monai.deploy.operators.MonaiBundleInferenceOperator#

class monai.deploy.operators.MonaiBundleInferenceOperator(fragment, *args, app_context, input_mapping, output_mapping, model_name='', bundle_path=None, bundle_config_names=<monai.deploy.operators.monai_bundle_inference_operator.BundleConfigNames object>, **kwargs)[source]#

Bases: InferenceOperator

This inference operator automates the inference operation for a given MONAI Bundle.

This inference operator configures itself based on the parsed data from a MONAI bundle file. This file is included with a MAP as a TorchScript file with added bundle metadata or as a zipped bundle with weights. The class configures how to do pre- and post-processing and inference, selects which device to use, and states its inputs, outputs, and dependencies. Its compute method is meant to be general purpose for almost any bundle, so that it handles any input specified in the bundle and produces output as specified, using the inference object the bundle defines. A number of methods define parts of this behavior; users may wish to override these to change behavior as needed for specific bundles.

The input(s) and output(s) for this operator need to be provided when an instance is created, and their labels need to correspond to the bundle network input and output names, which are also used as the keys in the pre- and post-processing.

For image input and output, the type is the Image class. For output of probabilities, the type is Dict.

This operator is expected to be linked with both source and destination operators, e.g. receiving an Image object from the DICOMSeriesToVolumeOperator, and passing a segmentation Image to the DICOMSegmentationWriterOperator. In such cases, the I/O storage type can only be IN_MEMORY due to the restrictions imposed by the application executor.

For the time being, the inputs and outputs of this operator are limited to in-memory objects.

Create an instance of this class, associated with an Application/Fragment.

Parameters:
  • fragment (Fragment) – An instance of the Application class which is derived from Fragment.

  • app_context (AppContext) – Object holding the I/O and model paths, and potentially loaded models.

  • input_mapping (List[IOMapping]) – Defines the inputs’ name, type, and storage type.

  • output_mapping (List[IOMapping]) – Defines the outputs’ name, type, and storage type.

  • model_name (Optional[str], optional) – Name of the model/bundle, needed in multi-model case. Defaults to “”.

  • bundle_path (Optional[str], optional) – Known path to the bundle file. Defaults to None.

  • bundle_config_names (BundleConfigNames, optional) – Relevant config item names in the bundle. Defaults to DEFAULT_BundleConfigNames.
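
As a rough illustration of how these parameters fit together, the sketch below builds the operator inside a helper function. The import paths, the labels "image" and "pred", the config name "inference", and the `name` keyword (passed through `**kwargs`) are assumptions modelled on typical SDK usage, not details confirmed on this page; adjust them to the bundle in use. The imports are deferred into the function body so the snippet can be loaded without the SDK installed.

```python
# Assumed labels: they must match the bundle's network input and output names.
INPUT_LABEL = "image"   # hypothetical bundle input name
OUTPUT_LABEL = "pred"   # hypothetical bundle output name


def make_bundle_operator(fragment, app_context, model_name=""):
    """Build a MonaiBundleInferenceOperator with in-memory image I/O.

    `fragment` is the Application (or Fragment) that owns the operator and
    `app_context` is its AppContext. Import paths below are assumptions.
    """
    # Deferred imports: only needed when the function is actually called.
    from monai.deploy.core.domain import Image
    from monai.deploy.core.io_type import IOType
    from monai.deploy.operators.monai_bundle_inference_operator import (
        BundleConfigNames,
        IOMapping,
        MonaiBundleInferenceOperator,
    )

    return MonaiBundleInferenceOperator(
        fragment,
        app_context=app_context,
        # Each IOMapping gives a label, a data type, and a storage type.
        input_mapping=[IOMapping(INPUT_LABEL, Image, IOType.IN_MEMORY)],
        output_mapping=[IOMapping(OUTPUT_LABEL, Image, IOType.IN_MEMORY)],
        model_name=model_name,  # only needed in the multi-model case
        bundle_config_names=BundleConfigNames(config_names=["inference"]),
        name="bundle_inference_op",  # assumed to be forwarded via **kwargs
    )
```

The helper would typically be called from the application's compose step, with the returned operator then linked to its source and destination operators.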

Methods

__init__(fragment, *args, app_context, ...)

Create an instance of this class, associated with an Application/Fragment.

add_arg(*args, **kwargs)

Overloaded function.

add_dynamic_flow(*args, **kwargs)

Overloaded function.

compute(op_input, op_output, context)

Infers with the input(s) and saves the prediction result(s) to output

enable_metadata(self, enable)

Configure whether or not the metadata feature is enabled for this operator.

find_all_flow_info(self, predicate)

Find all flow info objects in the operator's next flows that match a given condition.

find_flow_info(self, predicate)

Find a flow info in the operator's next flows based on a given predicate.

initialize()

Default implementation of initialize

post_process(data, *args, **kwargs)

Processes the output list/dictionary with the stored transform sequence self._postproc.

pre_process(data, *args, **kwargs)

Processes the input dictionary with the stored transform sequence self._preproc.

predict(data, *args, **kwargs)

Predicts output using the inferer.

queue_policy(self, port_name, port_type, policy)

Set the queue policy to be used by an input (or output) port's receiver (or transmitter).

receiver(self, port_name)

Get the receiver used by an input port.

resource(self, name)

Resources associated with the operator.

service(self, service_type[, id])

Retrieve a registered fragment service through the component's fragment.

setup(spec)

Default implementation of setup method.

start()

Default implementation of start

stop()

Default implementation of stop

stop_execution(self)

Stop the execution of the operator.

transmitter(self, port_name)

Get the transmitter used by an output port.

Attributes

INPUT_EXEC_PORT_NAME

MODEL_LOCAL_PATH

OUTPUT_EXEC_PORT_NAME

args

The list of arguments associated with the component.

async_condition

The internal asynchronous condition for the operator.

bundle_path

The path of the MONAI Bundle model.

conditions

Conditions associated with the operator.

description

YAML formatted string describing the operator.

execution_context

The execution context for the operator.

fragment

The fragment (holoscan.core.Fragment) that the operator belongs to.

id

The identifier of the component.

is_metadata_enabled

Boolean indicating whether the fragment this operator belongs to has metadata transmission enabled.

known_io_data_types

kw_preprocessed_inputs

metadata

The metadata dictionary (holoscan.core.MetadataDictionary) associated with the operator.

metadata_policy

The metadata policy (holoscan.core.MetadataPolicy) associated with the operator.

model_name

name

The name of the operator.

next_flows

Get the list of flow information for connections to downstream operators.

operator_type

The operator type.

parser

The ConfigParser object.

resources

Resources associated with the operator.

spec

The operator spec (holoscan.core.OperatorSpec) associated with the operator.

class OperatorType(self: holoscan.core._core.OperatorBase.OperatorType, value: int)#

Bases: pybind11_object

Enum class for operator types used by the executor.

  • NATIVE: Native operator.

  • GXF: GXF operator.

  • VIRTUAL: Virtual operator. (for internal use, not intended for use by application authors)

Members:

NATIVE

GXF

VIRTUAL

property name#
__init__(fragment, *args, app_context, input_mapping, output_mapping, model_name='', bundle_path=None, bundle_config_names=<monai.deploy.operators.monai_bundle_inference_operator.BundleConfigNames object>, **kwargs)[source]#

Create an instance of this class, associated with an Application/Fragment.

Parameters:
  • fragment (Fragment) – An instance of the Application class which is derived from Fragment.

  • app_context (AppContext) – Object holding the I/O and model paths, and potentially loaded models.

  • input_mapping (List[IOMapping]) – Defines the inputs’ name, type, and storage type.

  • output_mapping (List[IOMapping]) – Defines the outputs’ name, type, and storage type.

  • model_name (Optional[str], optional) – Name of the model/bundle, needed in multi-model case. Defaults to “”.

  • bundle_path (Optional[str], optional) – Known path to the bundle file. Defaults to None.

  • bundle_config_names (BundleConfigNames, optional) – Relevant config item names in the bundle. Defaults to DEFAULT_BundleConfigNames.

add_arg(*args, **kwargs)#

Overloaded function.

  1. add_arg(self: holoscan.core._core.OperatorBase, arg: holoscan.core._core.Arg) -> None

Add an argument to the component.

  2. add_arg(self: holoscan.core._core.OperatorBase, arg: holoscan.core._core.ArgList) -> None

Add a list of arguments to the component.

  3. add_arg(self: holoscan.core._core.OperatorBase, **kwargs) -> None

Add arguments to the component via Python kwargs.

  4. add_arg(self: holoscan.core._core.OperatorBase, arg: holoscan.core._core.ConditionBase) -> None

  5. add_arg(self: holoscan.core._core.OperatorBase, arg: holoscan.core._core.ResourceBase) -> None

Add a condition or resource to the Operator.

This can be used to add a condition or resource to an operator after it has already been constructed.

Parameters#

arg : holoscan.core.Condition or holoscan.core.Resource

The condition or resource to add.

add_dynamic_flow(*args, **kwargs)#

Overloaded function.

  1. add_dynamic_flow(self: holoscan.core._core.OperatorBase, flow: holoscan.core._core.FlowInfo) -> None

  2. add_dynamic_flow(self: holoscan.core._core.OperatorBase, flows: list[holoscan.core._core.FlowInfo]) -> None

  3. add_dynamic_flow(self: holoscan.core._core.OperatorBase, next_op: holoscan.core._core.OperatorBase, next_input_port_name: str = '') -> None

Add a dynamic flow from this operator to another operator.

Parameters#

next_op : holoscan.core.Operator

The downstream operator to connect to.

next_input_port_name : str, optional

The name of the input port on the downstream operator to connect to. If not specified, the first available input port will be used.

Notes#

This method has several overloads to support different ways of creating dynamic flows:

  1. add_dynamic_flow(next_op: Operator, next_input_port_name: str = '') - Basic connection using default output port. This is the simplest form for connecting two operators when you only need to specify the destination.

  2. add_dynamic_flow(curr_output_port_name: str, next_op: Operator, next_input_port_name: str = '') - Connection with explicit output port specification. Use this when the source operator has multiple output ports and you need to specify which one to use.

  3. add_dynamic_flow(flow: FlowInfo) - Connection using a FlowInfo object, which encapsulates all connection details including:

    • Source operator and its output port specification

    • Destination operator and its input port specification

    • Port names and associated IOSpecs

    This is useful for complex connections or when reusing connection patterns.

  4. add_dynamic_flow(flows: List[FlowInfo]) - Batch connection using multiple FlowInfo objects. Use this to set up multiple connections in a single call, which is more efficient than making multiple individual connections.

The FlowInfo class provides a complete description of a flow connection between operators, including all port specifications and naming. It’s particularly useful when you need to:

  • Store and reuse connection patterns

  • Create complex routing configurations

  • Handle dynamic port specifications

  • Manage multiple connections systematically

  4. add_dynamic_flow(self: holoscan.core._core.OperatorBase, curr_output_port_name: str, next_op: holoscan.core._core.OperatorBase, next_input_port_name: str = '') -> None

property args#

The list of arguments associated with the component.

Returns#

arglist : holoscan.core.ArgList

property async_condition#

The internal asynchronous condition for the operator.

This property provides access to the internal asynchronous condition for the operator, which controls the scheduling of the operator’s compute method.

Returns#

condition : holoscan.conditions.AsynchronousCondition

An instance of holoscan.conditions.AsynchronousCondition that is the internal asynchronous condition for the operator.

Notes#

This object is only accessible after the executor has called Operator.initialize() via run() or run_async(). If accessed during Application.compose(), it will return None.

property bundle_path: Path | None#

The path of the MONAI Bundle model.

compute(op_input, op_output, context)[source]#

Infers with the input(s) and saves the prediction result(s) to output

Parameters:
  • op_input (InputContext) – An input context for the operator.

  • op_output (OutputContext) – An output context for the operator.

  • context (ExecutionContext) – An execution context for the operator.

property conditions#

Conditions associated with the operator.

property description#

YAML formatted string describing the operator.

enable_metadata(self: holoscan.core._core.OperatorBase, enable: bool) -> None#

Configure whether or not the metadata feature is enabled for this operator. If it is not set, the default value will be determined by the enable_metadata setting from the Fragment that this operator belongs to.

property execution_context#

The execution context for the operator.

This property provides access to the execution context of the operator, which contains information about the current execution environment including scheduling details.

Returns#

holoscan.core.ExecutionContext

The execution context object for this operator.

find_all_flow_info(self: holoscan.core._core.OperatorBase, predicate: Callable[[holoscan.core._core.FlowInfo], bool]) -> list[holoscan.core._core.FlowInfo]#

Find all flow info objects in the operator’s next flows that match a given condition.

Parameters#

predicate : callable

A function that takes a FlowInfo object and returns a boolean.

Returns#

list[holoscan.core.FlowInfo]

List of matching FlowInfo objects.

find_flow_info(self: holoscan.core._core.OperatorBase, predicate: Callable[[holoscan.core._core.FlowInfo], bool]) -> holoscan.core._core.FlowInfo#

Find a flow info in the operator’s next flows based on a given predicate.

Parameters#

predicate : callable

A function that takes a FlowInfo object and returns a boolean.

Returns#

holoscan.core.FlowInfo or None

The first matching FlowInfo object, or None if not found.
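
The difference between find_flow_info (first match or None) and find_all_flow_info (every match) can be sketched with a plain-Python model. The `Flow` record and its field names are hypothetical stand-ins for FlowInfo, and the two helper functions only mimic the documented return behavior; they are not the Holoscan implementation.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass(frozen=True)
class Flow:
    """Hypothetical stand-in for FlowInfo; field names are illustrative only."""
    output_port: str
    next_op: str
    input_port: str


def find_first(flows: List[Flow], predicate: Callable[[Flow], bool]) -> Optional[Flow]:
    # Mirrors find_flow_info: the first flow satisfying the predicate, else None.
    return next((flow for flow in flows if predicate(flow)), None)


def find_all(flows: List[Flow], predicate: Callable[[Flow], bool]) -> List[Flow]:
    # Mirrors find_all_flow_info: every flow satisfying the predicate.
    return [flow for flow in flows if predicate(flow)]


flows = [
    Flow("pred", "seg_writer", "seg_image"),
    Flow("pred", "viewer", "image"),
    Flow("probs", "report", "scores"),
]

first_pred = find_first(flows, lambda f: f.output_port == "pred")
all_pred = find_all(flows, lambda f: f.output_port == "pred")
missing = find_first(flows, lambda f: f.next_op == "does_not_exist")
```

With the real API, the predicate would receive FlowInfo objects from the operator's next_flows instead of these records.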

property fragment#

The fragment (holoscan.core.Fragment) that the operator belongs to.

property id#

The identifier of the component.

The identifier is initially set to -1, and will become a valid value when the component is initialized.

With the default executor (holoscan.gxf.GXFExecutor), the identifier is set to the GXF component ID.

Returns#

id : int

initialize()#

Default implementation of initialize

property is_metadata_enabled#

Boolean indicating whether the fragment this operator belongs to has metadata transmission enabled.

property metadata#

The metadata dictionary (holoscan.core.MetadataDictionary) associated with the operator.

property metadata_policy#

The metadata policy (holoscan.core.MetadataPolicy) associated with the operator.

The supported policies are:

  • MetadataPolicy.REJECT: Reject the new value if the key already exists

  • MetadataPolicy.UPDATE: Replace existing value with the new one if the key already exists

  • MetadataPolicy.INPLACE_UPDATE: Update the value stored within an existing MetadataObject in-place if the key already exists (in contrast to UPDATE which always replaces the existing MetadataObject with a new one).

  • MetadataPolicy.RAISE: Raise an exception if the key already exists
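
The four policies differ only in what happens when a key already exists. The following is a small stand-alone model of that behavior, assuming a plain dict as the store; `Policy`, `Box`, and `set_item` are hypothetical names for illustration and do not use the Holoscan classes.

```python
from enum import Enum, auto


class Policy(Enum):
    """Hypothetical stand-in for holoscan.core.MetadataPolicy."""
    REJECT = auto()
    UPDATE = auto()
    INPLACE_UPDATE = auto()
    RAISE = auto()


class Box:
    """Hypothetical stand-in for a MetadataObject: a mutable holder for one value."""

    def __init__(self, value):
        self.value = value


def set_item(store, key, value, policy):
    """Write `value` under `key`, applying `policy` if the key already exists."""
    if key not in store:
        store[key] = Box(value)           # no conflict: every policy just inserts
    elif policy is Policy.REJECT:
        pass                              # keep the existing value, drop the new one
    elif policy is Policy.UPDATE:
        store[key] = Box(value)           # replace the holder object itself
    elif policy is Policy.INPLACE_UPDATE:
        store[key].value = value          # keep the holder, change its contents
    elif policy is Policy.RAISE:
        raise KeyError(f"key {key!r} already exists")


store = {}
set_item(store, "patient_id", "A", Policy.RAISE)   # first write never conflicts
original = store["patient_id"]

set_item(store, "patient_id", "B", Policy.REJECT)
rejected_value = store["patient_id"].value

set_item(store, "patient_id", "C", Policy.INPLACE_UPDATE)
same_holder = store["patient_id"] is original

set_item(store, "patient_id", "D", Policy.UPDATE)
new_holder = store["patient_id"] is not original
```

The distinction between UPDATE and INPLACE_UPDATE only matters to code that holds a reference to the existing object: after an in-place update that reference sees the new value, while after UPDATE it still sees the old one.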

property name#

The name of the operator.

property next_flows#

Get the list of flow information for connections to downstream operators.

Returns#

list[holoscan.core.FlowInfo]

List of flow information objects describing connections to downstream operators.

property operator_type#

The operator type.

holoscan.core.Operator.OperatorType enum representing the type of the operator. The two types currently implemented are native and GXF.

property parser: ConfigParser | None#

The ConfigParser object.

post_process(data, *args, **kwargs)[source]#

Processes the output list/dictionary with the stored transform sequence self._postproc.

The “processed_inputs”, in fact the metadata in it, need to be passed in so that the invertible transforms in the post processing can work properly.

Return type:

Union[Image, Any, Tuple[Any, ...], Dict[Any, Any]]

pre_process(data, *args, **kwargs)[source]#

Processes the input dictionary with the stored transform sequence self._preproc.

Return type:

Union[Image, Any, Tuple[Any, ...], Dict[Any, Any]]

predict(data, *args, **kwargs)[source]#

Predicts output using the inferer.

Return type:

Union[Image, Any, Tuple[Any, ...], Dict[Any, Any]]
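
Since pre_process, predict, and post_process share the signature (data, *args, **kwargs), a subclass can override one stage and delegate the rest to the parent. The sketch below shows only that override pattern: `BaseOp` is a placeholder standing in for MonaiBundleInferenceOperator, and the thresholding step and the "pred" key are hypothetical. A real subclass would also need to forward the constructor arguments (fragment, app_context, and the mappings) to the parent.

```python
class BaseOp:
    """Placeholder for MonaiBundleInferenceOperator, reduced to one stage method."""

    def post_process(self, data, *args, **kwargs):
        # The real method applies the bundle's stored post-processing transforms.
        return data


class ThresholdingOp(BaseOp):
    """Hypothetical subclass that binarizes probabilities after the parent's step."""

    def __init__(self, threshold=0.5):
        self.threshold = threshold

    def post_process(self, data, *args, **kwargs):
        # Run the inherited post-processing first, then add the custom step.
        result = super().post_process(data, *args, **kwargs)
        if isinstance(result, dict):
            return {
                key: [1 if value >= self.threshold else 0 for value in values]
                for key, values in result.items()
            }
        return result  # leave non-dict outputs untouched


op = ThresholdingOp(threshold=0.5)
binarized = op.post_process({"pred": [0.2, 0.7, 0.5]})
passthrough = op.post_process("not a dict")
```

Calling super() first keeps the bundle-defined transforms in effect, so the override adds to the configured behavior instead of replacing it.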

queue_policy(self: holoscan.core._core.OperatorBase, port_name: str, port_type: holoscan.core._core.IOSpec.IOType = <IOType.INPUT: 0>, policy: holoscan.core._core.IOSpec.QueuePolicy = <QueuePolicy.FAULT: 2>) -> None#

Set the queue policy to be used by an input (or output) port’s receiver (or transmitter).

Parameters#

port_name : str

The name of the port.

port_type : IOSpec.IOType, optional

Enum indicating whether port_name corresponds to an input port or output port.

policy : IOSpec.QueuePolicy, optional

The queue policy to set. Valid values are:

  • QueuePolicy.POP : If the queue is full, pop the oldest item, then add the new one.

  • QueuePolicy.REJECT : If the queue is full, reject (discard) the new item.

  • QueuePolicy.FAULT : If the queue is full, log a warning and reject the new item.

Returns#

None
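
To make the three queue policies concrete, here is a stand-alone model of a bounded queue that follows the descriptions above. `Policy` and `push` are hypothetical names; this is an illustration of the documented semantics, not the Holoscan receiver or transmitter code.

```python
import logging
from collections import deque
from enum import Enum, auto

logger = logging.getLogger(__name__)


class Policy(Enum):
    """Hypothetical stand-in for IOSpec.QueuePolicy."""
    POP = auto()
    REJECT = auto()
    FAULT = auto()


def push(queue, item, capacity, policy):
    """Try to enqueue `item`; return True if it ended up in the queue."""
    if len(queue) < capacity:
        queue.append(item)
        return True
    if policy is Policy.POP:
        queue.popleft()        # discard the oldest item to make room
        queue.append(item)
        return True
    if policy is Policy.FAULT:
        logger.warning("queue full, rejecting %r", item)
    return False               # REJECT and FAULT both discard the new item


queue = deque([1, 2])          # already at capacity 2
accepted_pop = push(queue, 3, capacity=2, policy=Policy.POP)
after_pop = list(queue)
accepted_reject = push(queue, 4, capacity=2, policy=Policy.REJECT)
accepted_fault = push(queue, 5, capacity=2, policy=Policy.FAULT)
```

In this model REJECT and FAULT leave the queue contents identical; the only difference is the logged warning, which matches the descriptions of the two policies.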

receiver(self: holoscan.core._core.OperatorBase, port_name: str) -> Optional[holoscan::Receiver]#

Get the receiver used by an input port.

Parameters#

port_name : str

The name of the input port.

Returns#

receiver : holoscan.resources.Receiver or None

The receiver used by this input port. Will be None if the port does not exist.

resource(self: holoscan.core._core.OperatorBase, name: str) -> object | None#

Resources associated with the operator.

Parameters#

name : str

The name of the resource to retrieve.

Returns#

holoscan.core.Resource or None

The resource with the given name. If no resource with the given name is found, None is returned.

property resources#

Resources associated with the operator.

service(self: object, service_type: type, id: str = '') -> object#

Retrieve a registered fragment service through the component’s fragment.

This method delegates to the fragment’s service() method to retrieve a previously registered fragment service by its type and optional identifier. Returns None if no fragment service is found with the specified type and identifier.

Parameters#

service_type : type

The type of the fragment service to retrieve. Must be a type that inherits from Resource or FragmentService.

id : str, optional

The identifier of the fragment service. If empty, retrieves by service type only. For Resources, this would typically be the resource’s name.

Returns#

object or None

The fragment service instance of the requested type, or None if not found. If the service wraps a Resource and a Resource type is requested, the unwrapped Resource instance is returned.

Raises#

RuntimeError

If the component has no associated fragment or if the fragment’s service method cannot be accessed.

Notes#

This is a convenience method that internally calls the fragment’s service() method. For services that wrap Resources, the method will automatically unwrap and return the Resource if a Resource type is requested.

setup(spec)[source]#

Default implementation of setup method.

property spec#

The operator spec (holoscan.core.OperatorSpec) associated with the operator.

start()#

Default implementation of start

stop()#

Default implementation of stop

stop_execution(self: holoscan.core._core.OperatorBase) -> None#

Stop the execution of the operator.

This method is used to stop the execution of the operator by setting the internal async condition to EVENT_NEVER state, which sets the scheduling condition to NEVER. Once stopped, the operator will not be scheduled for execution (the compute() method will not be called).

Note that executing this method does not trigger the operator’s stop() method. The stop() method is called only when the scheduler deactivates all operators together.

transmitter(self: holoscan.core._core.OperatorBase, port_name: str) -> Optional[holoscan::Transmitter]#

Get the transmitter used by an output port.

Parameters#

port_name : str

The name of the output port.

Returns#

transmitter : holoscan.resources.Transmitter or None

The transmitter used by this output port. Will be None if the port does not exist.