monai.deploy.core.Fragment#

class monai.deploy.core.Fragment(self: holoscan.core._core.Fragment, arg0: object)[source]#

Bases: Fragment

Fragment class.

Fragment class.

Methods

__init__(self, arg0)

Fragment class.

add_data_logger(self, logger)

Add a data logger to the fragment.

add_default_green_context_pool(self, dev_id, ...)

Add a default green context pool to the fragment.

add_flow(*args, **kwargs)

Overloaded function.

add_operator(self, op)

Add an operator to the fragment.

compose(self)

The compose method of the Fragment.

config(*args, **kwargs)

Overloaded function.

config_keys(self)

The set of keys present in the fragment's configuration file.

enable_metadata(self, enabled)

Method to set whether operator metadata transmission is enabled by default for an operators added to this fragment.

from_config(self, key)

Retrieve parameters from the associated configuration.

get_default_green_context_pool(self)

Get the default green context pool associated with the fragment.

kwargs(self, key)

Retrieve a dictionary parameters from the associated configuration.

make_thread_pool(self, name, initialize_size)

Create a ThreadPool associated with this Fragment.

network_context(*args, **kwargs)

Overloaded function.

register_service(self, service[, id])

Register a fragment service instance.

run(self)

The run method of the Fragment.

run_async()

Run the fragment asynchronously using a shared executor.

scheduler(*args, **kwargs)

Overloaded function.

service(self, service_type[, id])

Retrieve a registered fragment service.

set_dynamic_flows(self, op, dynamic_flow_func)

Set a callback function to define dynamic flows for an operator at runtime.

shutdown_async_executor([wait])

Shuts down the shared asynchronous executor.

start_op()

Get or create the start operator for this fragment.

stop_execution(self[, op_name])

Stop the execution of all operators in the fragment.

track(self, num_start_messages_to_skip, ...)

The track method of the fragment (or application).

Attributes

application

The application associated with the fragment.

data_loggers

Get the data loggers associated with this fragment.

executor

Get the executor associated with the fragment.

graph

Get the computation graph (Graph node is an Operator) associated with the fragment.

is_metadata_enabled

Property to get or set the boolean controlling whether operator metadata transmission is enabled.

metadata_policy

The default metadata policy (holoscan.core.MetadataPolicy) associated with the fragment.

name

The fragment's name.

__init__(self: holoscan.core._core.Fragment, arg0: object) None[source]#

Fragment class.

add_data_logger(self: holoscan.core._core.Fragment, logger: holoscan.core._core.DataLogger) None#

Add a data logger to the fragment.

Parameters#

loggerholoscan.core.DataLogger

The shared pointer to the data logger to add.

add_default_green_context_pool(self: holoscan.core._core.Fragment, dev_id: int, sms_per_partition: list[int] = [], default_context_index: int = -1, min_sm_size: int = 2) holoscan::CudaGreenContextPool#

Add a default green context pool to the fragment.

Parameters#

dev_idint

The device ID to use for the green context pool.

sms_per_partitionlist[int], optional

The number of SMs per partition to use for the green context pool.

default_context_indexint, optional

The index of the default context to use for the green context pool.

min_sm_sizeint, optional

The minimum size of the SM to use for the green context pool.

add_flow(*args, **kwargs)#

Overloaded function.

  1. add_flow(self: holoscan.core._core.Fragment, upstream_op: holoscan.core._core.OperatorBase, downstream_op: holoscan.core._core.OperatorBase) -> None

  2. add_flow(self: holoscan.core._core.Fragment, upstream_op: holoscan.core._core.OperatorBase, downstream_op: holoscan.core._core.OperatorBase, port_pairs: set[tuple[str, str]]) -> None

  3. add_flow(self: holoscan.core._core.Fragment, upstream_op: holoscan.core._core.OperatorBase, downstream_op: holoscan.core._core.OperatorBase, connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None

  4. add_flow(self: holoscan.core._core.Fragment, upstream_op: holoscan.core._core.OperatorBase, downstream_op: holoscan.core._core.OperatorBase, port_pairs: set[tuple[str, str]], connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None

Connect two operators associated with the fragment.

Parameters#

upstream_opholoscan.core.Operator

Source operator.

downstream_opholoscan.core.Operator

Destination operator.

port_pairsSequence of (str, str) tuples

Sequence of ports to connect. The first element of each 2-tuple is a port from upstream_op while the second element is the port of downstream_op to which it connects.

Notes#

This is an overloaded function. Additional variants exist:

1.) For the Application class there is a variant where the first two arguments are of type holoscan.core.Fragment instead of holoscan.core.Operator. This variant is used in building multi-fragment applications.

2.) Subgraph variants: Connect operators and subgraphs in any combination
  • add_flow(operator, subgraph, port_pairs=None)

  • add_flow(subgraph, operator, port_pairs=None)

  • add_flow(subgraph, subgraph, port_pairs=None)

3.) All variants support connector_type parameter:
  • add_flow(…, connector_type)

  • add_flow(…, port_pairs, connector_type)

4.) There are also variants that omit the port_pairs argument that are applicable when there is only a single output on the upstream operator/fragment/subgraph and a single input on the downstream operator/fragment/subgraph.

  1. add_flow(self: holoscan.core._core.Fragment, upstream_op: holoscan.core._core.OperatorBase, downstream_subgraph: holoscan::Subgraph, port_pairs: set[tuple[str, str]] = set()) -> None

  2. add_flow(self: holoscan.core._core.Fragment, upstream_subgraph: holoscan::Subgraph, downstream_op: holoscan.core._core.OperatorBase, port_pairs: set[tuple[str, str]] = set()) -> None

  3. add_flow(self: holoscan.core._core.Fragment, upstream_subgraph: holoscan::Subgraph, downstream_subgraph: holoscan::Subgraph, port_pairs: set[tuple[str, str]] = set()) -> None

  4. add_flow(self: holoscan.core._core.Fragment, upstream_op: holoscan.core._core.OperatorBase, downstream_subgraph: holoscan::Subgraph, connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None

  5. add_flow(self: holoscan.core._core.Fragment, upstream_op: holoscan.core._core.OperatorBase, downstream_subgraph: holoscan::Subgraph, port_pairs: set[tuple[str, str]], connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None

  6. add_flow(self: holoscan.core._core.Fragment, upstream_subgraph: holoscan::Subgraph, downstream_op: holoscan.core._core.OperatorBase, connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None

  7. add_flow(self: holoscan.core._core.Fragment, upstream_subgraph: holoscan::Subgraph, downstream_op: holoscan.core._core.OperatorBase, port_pairs: set[tuple[str, str]], connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None

  8. add_flow(self: holoscan.core._core.Fragment, upstream_subgraph: holoscan::Subgraph, downstream_subgraph: holoscan::Subgraph, connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None

  9. add_flow(self: holoscan.core._core.Fragment, upstream_subgraph: holoscan::Subgraph, downstream_subgraph: holoscan::Subgraph, port_pairs: set[tuple[str, str]], connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None

add_operator(self: holoscan.core._core.Fragment, op: holoscan.core._core.OperatorBase) None#

Add an operator to the fragment.

Parameters#

opholoscan.core.Operator

The operator to add.

property application#

The application associated with the fragment.

Returns#

app : holoscan.core.Application

compose(self: holoscan.core._core.Fragment) None[source]#

The compose method of the Fragment.

This method should be called after config, but before run in order to compose the computation graph.

config(*args, **kwargs)#

Overloaded function.

  1. config(self: holoscan.core._core.Fragment, config_file: str, prefix: str = ‘’) -> None

Configuration class.

Represents configuration parameters as read from a YAML file.

Parameters#

configstr or holoscan.core.Config

The path to the configuration file (in YAML format) or a holoscan.core.Config object.

prefixstr, optional

Prefix path for the` config` file. Only available in the overloaded variant that takes a string for config.

Raises#

RuntimeError

If the config is a string path that is non-empty and the file doesn’t exist.

  1. config(self: holoscan.core._core.Fragment, arg0: holoscan.core._core.Config) -> None

  2. config(self: holoscan.core._core.Fragment) -> holoscan.core._core.Config

config_keys(self: holoscan.core._core.Fragment) set[str]#

The set of keys present in the fragment’s configuration file.

property data_loggers#

Get the data loggers associated with this fragment.

Returns#

list[holoscan.core.DataLogger]

A list of data loggers associated with this fragment.

enable_metadata(self: holoscan.core._core.Fragment, enabled: bool) None#

Method to set whether operator metadata transmission is enabled by default for an operators added to this fragment. Individual operators can override this default via Operator.enable_metadata.

property executor#

Get the executor associated with the fragment.

from_config(self: holoscan.core._core.Fragment, key: str) object#

Retrieve parameters from the associated configuration.

Parameters#

keystr

The key within the configuration file to retrieve. This can also be a specific component of the parameter via syntax ‘key.sub_key’.

Returns#

argsholoscan.core.ArgList

An argument list associated with the key.

get_default_green_context_pool(self: holoscan.core._core.Fragment) holoscan::CudaGreenContextPool#

Get the default green context pool associated with the fragment.

property graph#

Get the computation graph (Graph node is an Operator) associated with the fragment.

property is_metadata_enabled#

Property to get or set the boolean controlling whether operator metadata transmission is enabled.

Notes#

Setting metadata to be enabled/disabled via this method is deprecated. Please use enable_metadata instead.

kwargs(self: holoscan.core._core.Fragment, key: str) dict#

Retrieve a dictionary parameters from the associated configuration.

Parameters#

keystr

The key within the configuration file to retrieve. This can also be a specific component of the parameter via syntax ‘key.sub_key’.

Returns#

kwargsdict

A Python dict containing the parameters in the configuration file under the specified key.

make_thread_pool(self: holoscan.core._core.Fragment, name: str, initialize_size: int = 1) holoscan::ThreadPool#

Create a ThreadPool associated with this Fragment.

The add method must be used to add individual operators to the pool.

Parameters#

namestr

A name for the thread pool.

initialize_size1

The initial number of threads in the pool.

property metadata_policy#

The default metadata policy (holoscan.core.MetadataPolicy) associated with the fragment.

Individual operators can override this via Operator.metadata_policy.

The supported policies are:

  • MetadataPolicy.REJECT: Reject the new value if the key already exists

  • MetadataPolicy.UPDATE: Replace existing value with the new one if the key already exists

  • MetadataPolicy.INPLACE_UPDATE: Update the value stored within an existing MetadataObject in-place if the key already exists (in contrast to UPDATE which always replaces the existing MetadataObject with a new one).

  • MetadataPolicy.RAISE: Raise an exception if the key already exists

property name#

The fragment’s name.

Returns#

name : str

network_context(*args, **kwargs)#

Overloaded function.

  1. network_context(self: holoscan.core._core.Fragment, network_context: holoscan.core._core.NetworkContext) -> None

Assign a network context to the Fragment

Parameters#

network_contextholoscan.core.NetworkContext

A network_context class instance to be used by the underlying GXF executor. If unspecified, no network context will be used.

  1. network_context(self: holoscan.core._core.Fragment) -> holoscan.core._core.NetworkContext

Get the network context to be used by the Fragment

register_service(self: object, service: object, id: str = '') None#

Register a fragment service instance.

Registers an already created fragment service instance with the specified identifier. This allows the fragment service to be retrieved later using the service() method.

A service must be an instance of a class that inherits from holoscan.core.Resource or holoscan.core.FragmentService.

Parameters#

serviceholoscan.core.Resource or holoscan.core.FragmentService

The service instance to register.

idstr, optional

The identifier for the service registration. If empty: - For Resource instances: The resource’s name will be used as the identifier. - For other services: A unique identifier based on the service type will be generated. Note: When registering a Resource, the id parameter must be empty or will be ignored.

Raises#

TypeError

If the service is not a Resource or FragmentService instance.

RuntimeError

If the service registration fails.

Notes#

Resources are automatically wrapped in a DefaultFragmentService when registered. For Python-defined services, a unique identifier is generated if no id is provided.

run(self: holoscan.core._core.Fragment) None#

The run method of the Fragment.

This method runs the computation. It must have first been initialized via config and compose.

run_async()[source]#

Run the fragment asynchronously using a shared executor.

This method uses a shared ThreadPoolExecutor associated with this Application instance. The executor is created on the first call. Call shutdown_async_executor() when done with async runs to clean up resources.

Returns#

future : concurrent.futures.Future object

scheduler(*args, **kwargs)#

Overloaded function.

  1. scheduler(self: holoscan.core._core.Fragment, scheduler: holoscan.core._core.Scheduler) -> None

Assign a scheduler to the Fragment.

Parameters#

schedulerholoscan.core.Scheduler

A scheduler class instance to be used by the underlying GXF executor. If unspecified, the default is a holoscan.gxf.GreedyScheduler.

  1. scheduler(self: holoscan.core._core.Fragment) -> holoscan.core._core.Scheduler

Get the scheduler to be used by the Fragment.

service(self: object, service_type: type, id: str = '') object#

Retrieve a registered fragment service.

Retrieves a previously registered fragment service by its type and optional identifier. The lookup process checks the Python service registry first, then falls back to the C++ service registry if needed.

Parameters#

service_typetype

The type of the fragment service to retrieve. Must be a type that inherits from Resource or FragmentService.

idstr, optional

The identifier of the fragment service. If empty, retrieves by service type only. For Resources, this would typically be the resource’s name.

Returns#

object or None

The fragment service instance of the requested type, or None if not found. If the service wraps a Resource and a Resource type is requested, the unwrapped Resource instance is returned.

Notes#

For services that wrap Resources, the method will automatically unwrap and return the Resource if a Resource type is requested.

set_dynamic_flows(self: holoscan.core._core.Fragment, op: holoscan.core._core.OperatorBase, dynamic_flow_func: Callable) None#

Set a callback function to define dynamic flows for an operator at runtime.

This method allows operators to modify their connections with other operators during execution. The callback function is called after the operator executes and can add dynamic flows using the operator’s add_dynamic_flow() methods.

Parameters#

opholoscan.core.Operator

The operator for which to set dynamic flows.

dynamic_flow_funccallable

The callback function that defines the dynamic flows. Takes an operator as input and returns None.

shutdown_async_executor(wait=True)[source]#

Shuts down the shared asynchronous executor.

Call this method when the application instance is no longer needed and asynchronous runs initiated by run_async should terminate.

Parameters#

waitbool

If True (default), wait for running tasks to complete before shutting down. If False, shut down immediately.

start_op()[source]#

Get or create the start operator for this fragment.

This operator is nothing but the first operator that was added to the fragment. It has the name of <|start|> and has a condition of CountCondition(1). This Operator is used to start the execution of the fragment. Entry operators who want to start the execution of the fragment should connect to this operator.

If this method is not called, no start operator is created. Otherwise, the start operator is created if it does not exist, and the start operator is returned.

Returns#

Operator

The start operator instance. If it doesn’t exist, it will be created with a CountCondition(1).

stop_execution(self: holoscan.core._core.Fragment, op_name: str = '') None#

Stop the execution of all operators in the fragment.

This method is used to stop the execution of all operators in the fragment by setting the internal async condition of each operator to EVENT_NEVER state, which sets the scheduling condition to NEVER. Once stopped, the operators will not be scheduled for execution (the compute() method will not be called), which may lead to application termination depending on the application’s design.

Note that executing this method does not trigger the operators’ stop() method. The stop() method is called only when the scheduler deactivates all operators together.

Parameters#

op_namestr, optional

The name of the operator to stop. If empty, all operators will be stopped.

track(self: holoscan.core._core.Fragment, num_start_messages_to_skip: int = 10, num_last_messages_to_discard: int = 10, latency_threshold: int = 0, is_limited_tracking: bool = False) holoscan::DataFlowTracker#

The track method of the fragment (or application).

This method enables data frame flow tracking and returns a DataFlowTracker object which can be used to display metrics data for profiling an application.

Parameters#

num_start_messages_to_skipint

The number of messages to skip at the beginning.

num_last_messages_to_discardint

The number of messages to discard at the end.

latency_thresholdint

The minimum end-to-end latency in milliseconds to account for in the end-to-end latency metric calculations

is_limited_trackingbool, optional

If True, the tracking is limited to root and leaf nodes, minimizing the timestamps by avoiding intermediate operators.

Returns#

trackerholoscan.core.DataFlowTracker

The data flow tracker object that can be used to display metrics data for profiling along the different paths through the computation graph.