# Copyright 2022 MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import shutil
import tempfile
from ast import Bytes
from pathlib import Path
from typing import Dict, Optional
import numpy as np
from monai.deploy.utils.importutil import optional_import
nib, _ = optional_import("nibabel")
sitk, _ = optional_import("SimpleITK")
label, _ = optional_import("skimage.measure", name="label")
measure, _ = optional_import("skimage", name="measure")
mesh, _ = optional_import("stl", name="mesh")
resize, _ = optional_import("skimage.transform", name="resize")
trimesh, _ = optional_import("trimesh")
import monai.deploy.core as md
from monai.deploy.core import ExecutionContext, Image, InputContext, IOType, Operator, OutputContext
__all__ = ["STLConversionOperator", "STLConverter"]
[docs]@md.input("image", Image, IOType.IN_MEMORY)
@md.output("stl_output", Bytes, IOType.IN_MEMORY) # Only available when run as non-leaf operator
# nibabel is required by the dependent class STLConverter.
@md.env(
pip_packages=["numpy>=1.21", "nibabel >= 3.2.1", "numpy-stl>=2.12.0", "scikit-image>=0.17.2", "trimesh>=3.8.11"]
)
class STLConversionOperator(Operator):
"""Converts volumetric image to surface mesh in STL format, file output only.
Only when used as a non-leaf operator is the output of STL binary stored in memory idenfied by the output label.
If a file path is provided, the STL binary will be saved in the the application's output folder of the current run.
"""
[docs] def __init__(
self, output_file=None, class_id=None, is_smooth=True, keep_largest_connected_component=True, *args, **kwargs
):
"""Creates an object to generate a surface mesh and saves it as an STL file if the path is provided.
Args:
output_file (str, optional): output STL file relative path. Default to None for no file output.
class_id (array, optional): Class label ids. Defaults to None.
is_smooth (bool, optional): smoothing or not. Defaults to True.
keep_largest_connected_component (bool, optional): Defaults to True.
"""
super().__init__(*args, **kwargs)
self._logger = logging.getLogger("{}.{}".format(__name__, type(self).__name__))
self._class_id = class_id
self._is_smooth = is_smooth
self._keep_largest_connected_component = keep_largest_connected_component
self._output_file = output_file if output_file and len(str(output_file)) > 0 else None
self._converter = STLConverter(*args, **kwargs)
[docs] def compute(self, op_input: InputContext, op_output: OutputContext, context: ExecutionContext):
"""Gets the input (image), processes it and sets results in the output.
When used in a leaf operator, this function cannot set its output as in-memory object due to
current limitation.
If a file path is provided, the STL binary will be saved in the the application's output
folder of the current run.
Args:
op_input (InputContext): An input context for the operator.
op_output (OutputContext): An output context for the operator.
context (ExecutionContext): An execution context for the operator.
"""
input_image = op_input.get("image")
if not input_image:
raise ValueError("Input is None.")
# Use the app's current run output folder as parent to the STL output path.
if self._output_file and len(str(self._output_file)) > 0:
_output_file = context.output.get().path / self._output_file
_output_file.parent.mkdir(parents=True, exist_ok=True)
self._logger.info(f"Output will be saved in file {_output_file}.")
stl_bytes = self._convert(input_image, _output_file)
try:
# TODO: Need a way to find if the operator is run as leaf node in order to
# avoid setting in_memory object.
if self.op_info.get_storage_type("output", "stl_output") == IOType.IN_MEMORY:
op_output.set(stl_bytes)
except Exception as ex:
self._logger.warn(f"In_memory output cannot be used when run as non-leaf operator. {ex}")
def _convert(self, image: Image, output_file: Optional[Path] = None):
"""
Args:
image (Image): object with the image (ndarray in DHW) and its metadata dictionary.
output_file (Path, optional): output file path. Default None for no file output.
Returns:
Bytes: Bytes of the binary of STL file
"""
# Use path in the output_file arg if provided.
if isinstance(output_file, Path):
output_file.parent.mkdir(exist_ok=True)
else:
output_file = self._output_file
return self._converter.convert(
image=image,
output_file=output_file,
class_ids=self._class_id,
is_smooth=self._is_smooth,
keep_largest_connected_component=self._keep_largest_connected_component,
)
[docs]class STLConverter(object):
"""Converts volumetric image to surface mesh in STL"""
[docs] def __init__(self, *args, **kwargs):
"""Creates an instance to generate a surface mesh in STL with an Image object."""
super().__init__(*args, **kwargs)
self._logger = logging.getLogger("{}.{}".format(__name__, type(self).__name__))
[docs] def convert(
self,
image: Image,
output_file: Optional[Path] = None,
class_ids=None,
is_smooth=True,
keep_largest_connected_component=True,
):
"""
Args:
image (Image): object with the image (ndarray of DHW index order) and its metadata dictionary.
output_file (str): output STL file path. Default to None for not saving output file.
class_id (array, optional): Class label id. Defaults to None.
is_smooth (bool, optional): smoothing or not. Defaults to True.
keep_largest_connected_component (bool, optional): Defaults to True.
Returns:
Bytes of the binary STL file.
"""
if not image or not isinstance(image, Image):
raise ValueError("image is not a Image object.")
if isinstance(output_file, Path):
output_file.parent.mkdir(parents=True, exist_ok=True)
s_image = self.SpatialImage(image)
nda = s_image.image_array
self._logger.info(f"Image ndarray shape:{nda.shape}")
if keep_largest_connected_component:
nda = STLConverter.get_largest_cc(nda)
res = s_image.spacing
if res is None:
raise ValueError("Image spacing/resolution is missing.")
# In case image has been re-oriented from the original
affine = s_image.original_affine
if (
affine is not None
and s_image.affine is not None
and np.sum(np.abs(s_image.original_affine - s_image.affine)) > 1e-7
):
codes = nib.orientations.axcodes2ornt(nib.orientations.aff2axcodes(np.linalg.inv(affine)))
nda = nib.orientations.apply_orientation(np.squeeze(nda), codes)
new_nda = np.zeros(shape=nda.shape, dtype=np.uint8)
if class_ids is None:
new_nda += (nda > 0).astype(np.uint8)
elif isinstance(class_ids, list):
for class_id in class_ids:
new_nda += (nda == class_id).astype(np.uint8)
else:
try:
new_nda += (nda == class_ids).astype(np.uint8)
except ValueError as err:
err_msg = "That was no valid value for class_id."
self._logger.error(err_msg)
raise ValueError(err_msg) from err
max_res = np.amin(res)
target_shape = []
for _j in range(3):
length = float(nda.shape[_j]) * res[_j] / max_res
length = int(np.round(length))
target_shape.append(length)
new_nda = STLConverter.resize_volume(nda, output_shape=target_shape)
verts, faces, _, _ = measure.marching_cubes(new_nda, level=0.5, step_size=5)
for _j in range(3):
verts[:, _j] = (verts[:, _j] + 0.5) * float(nda.shape[_j]) / float(new_nda.shape[_j]) - 0.5
itk_image = s_image.itk_image
for _j in range(verts.shape[0]):
vert = (float(verts[_j, 0]), float(verts[_j, 1]), float(verts[_j, 2]))
vert = itk_image.TransformContinuousIndexToPhysicalPoint(vert)
verts[_j, :] = np.array(vert)
# Write out the STL file, and then load into trimesh
try:
temp_folder = tempfile.mkdtemp()
raw_stl_filename = os.path.join(temp_folder, "temp.stl")
STLConverter.write_stl(verts, faces, raw_stl_filename)
mesh_data = trimesh.load(raw_stl_filename)
if is_smooth:
trimesh.smoothing.filter_taubin(mesh_data, iterations=20)
final_file_path = output_file if output_file else os.path.join(temp_folder, "surface_mesh.stl")
mesh_data.export(final_file_path)
with open(str(final_file_path), "rb") as r_file:
stl_bytes = r_file.read()
finally:
shutil.rmtree(temp_folder)
return stl_bytes
# Helper functions
@staticmethod
def get_largest_cc(nda):
logging.debug("ndarray shape: {}".format(nda.shape))
labels = label(nda)
# assume at least 1 CC
assert labels.max() != 0
largest_cc = labels == np.argmax(np.bincount(labels.flat)[1:]) + 1
largest_cc = largest_cc.astype(np.uint8)
return largest_cc
@staticmethod
def resize_volume(nda, output_shape, order=1, preserve_range=True, anti_aliasing=False):
return resize(
nda, output_shape, order=order, mode="constant", preserve_range=preserve_range, anti_aliasing=anti_aliasing
)
@staticmethod
def write_stl(verts, faces, filename):
# Create the mesh
cube = mesh.Mesh(np.zeros(faces.shape[0], dtype=mesh.Mesh.dtype))
for i, f in enumerate(faces):
for j in range(3):
cube.vectors[i][j] = verts[f[j], :]
cube.save(os.path.splitext(filename)[0] + ".stl")
# Helper class for wrapping the App SDK Image object
#
[docs] class SpatialImage:
"""Object encapsulating a spatial volume image instance of Image.
Channel is not supported in this version.
"""
[docs] def __init__(self, image: Image, dtype=np.float32):
"""Creates an instance.
Args:
image(Image): An instance of Image.
dtype (Numpy type, optional): Defaults to np.float32.
"""
self._logger = logging.getLogger("{}.{}".format(__name__, type(self).__name__))
if not image or not isinstance(image, Image):
raise ValueError("Argument is not a Image object.")
self._image = image
self._dtype = dtype
self._props: Dict = {}
""" Properties may include some or all of the following
img_array
shape
spacing
original_affine
affine
"""
self._read_from_in_mem_image(self._image)
@property
def image_array(self):
"""Image data in Numpy array, or None"""
return self._props.get("img_array", None)
@property
def itk_image(self):
"""ITK image object created from the encapsulated image object, or None"""
return self._props.get("itk_image", None)
@property
def shape(self):
"""Shape of image array, or None"""
return self._props.get("shape", None)
@property
def spacing(self):
"""Pixel spacing of original image, aka resolution, or None"""
return self._props.get("spacing", None)
@property
def original_affine(self):
"""Original affine of the image, or None"""
return self._props.get("original_affine", None)
@property
def affine(self):
"""Affine of the re-oriented image data, or None"""
return self._props.get("affine", None)
[docs] def set_property(self, key: str, value):
"""Sets an image property
Args:
key (str): key of the property
value: value of the property
"""
self._props[key] = value
[docs] def get_property(self, key: str, default=None):
"""Gets value of the specified property
Args:
key (str): key of the property
default: default value if the property does not exist.
Returns:
the value of the property, or the default value if property does not exist.
"""
return self._props.get(key, default)
[docs] def get_data(self):
"""Returns the image array in ndarray"""
return self._props.get("img_array", None)
def _load_data(self, image):
img_array = image.asnumpy()
img_meta_dict = image.metadata()
shape = np.asarray(image.asnumpy().shape)
spacing = np.asarray(
(
img_meta_dict["row_pixel_spacing"],
img_meta_dict["col_pixel_spacing"],
img_meta_dict["depth_pixel_spacing"],
)
)
original_affine = img_meta_dict["nifti_affine_transform"]
affine = original_affine
itk_image = sitk.GetImageFromArray(img_array)
itk_image.SetSpacing(
[
float(img_meta_dict["row_pixel_spacing"]),
float(img_meta_dict["col_pixel_spacing"]),
float(img_meta_dict["depth_pixel_spacing"]),
]
)
direction = []
direction.extend(img_meta_dict["row_direction_cosine"])
direction.extend(img_meta_dict["col_direction_cosine"])
direction.extend(img_meta_dict["depth_direction_cosine"])
itk_image.SetDirection(direction)
return img_array, affine, original_affine, shape, spacing, itk_image
def _read_from_in_mem_image(self, image):
"""Parse the in-memory image for the attributes.
Args:
image (Image): App SDK Image instance.
Returns:
An instance of SpacialImage.
"""
img_array, affine, original_affine, shape, spacing, itk_image = self._load_data(image)
num_dims = len(img_array.shape)
img_array = img_array.astype(self._dtype)
if num_dims == 2:
self._logger.info("2D image")
elif num_dims == 3:
self._logger.info("3D image")
elif num_dims <= 5:
# if 4d data, we assume 4th dimension is channels.
# if 5d data, try to squeeze 5th dimension.
if num_dims == 5:
img_array = np.squeeze(img_array)
if len(img_array.shape) != 4:
raise ValueError("Cannot squeeze 5D image to 4D; object doesn't support time based data.")
if self.is_channels_first:
self._logger.info("4D image, channel first")
else:
self._logger.info("4D image, channel last")
else:
raise NotImplementedError("Object does not support image of dims {}".format(num_dims))
self._props["original_affine"] = original_affine
self._props["affine"] = affine
self._props["spacing"] = spacing
self._props["shape"] = shape
self._props["img_array"] = img_array
self._props["itk_image"] = itk_image
def test():
from monai.deploy.operators.dicom_data_loader_operator import DICOMDataLoaderOperator
from monai.deploy.operators.dicom_series_selector_operator import DICOMSeriesSelectorOperator
from monai.deploy.operators.dicom_series_to_volume_operator import DICOMSeriesToVolumeOperator
current_file_dir = Path(__file__).parent.resolve()
data_path = current_file_dir.joinpath("../../../examples/ai_spleen_seg_data/dcm")
loader = DICOMDataLoaderOperator()
series_selector = DICOMSeriesSelectorOperator()
dcm_to_volume_op = DICOMSeriesToVolumeOperator()
stl_writer = STLConversionOperator()
# Testing with the main entry functions
study_list = loader.load_data_to_studies(data_path.absolute())
study_selected_series_list = series_selector.filter(None, study_list)
image = dcm_to_volume_op.convert_to_image(study_selected_series_list)
stl_writer._convert(image, Path("stl/test.stl"))
if __name__ == "__main__":
test()