monai.deploy.core.Application#
- class monai.deploy.core.Application(self: holoscan.core._core.Application, argv: list[str] = [])[source]#
Bases:
ApplicationApplication class.
This constructor parses the command line for flags that are recognized by App Driver/Worker, and removes all recognized flags so users can use the remaining flags for their own purposes.
If the arguments are not specified, the arguments are retrieved from
sys.executableandsys.argv.The arguments after processing arguments (parsing Holoscan-specific flags and removing them) are accessible through the
argvattribute.Parameters#
- argvList[str]
The command line arguments to parse. The first item should be the path to the python executable. If not specified,
[sys.executable, *sys.argv]is used.
Examples#
>>> from holoscan.core import Application >>> import sys >>> Application().argv == sys.argv True >>> Application([]).argv == sys.argv True >>> Application([sys.executable, *sys.argv]).argv == sys.argv True >>> Application(["python3", "myapp.py", "--driver", "--address=10.0.0.1", "my_arg1"]).argv ['myapp.py', 'my_arg1']
Application class.
This constructor parses the command line for flags that are recognized by App Driver/Worker, and removes all recognized flags so users can use the remaining flags for their own purposes.
If the arguments are not specified, the arguments are retrieved from
sys.executableandsys.argv.The arguments after processing arguments (parsing Holoscan-specific flags and removing them) are accessible through the
argvattribute.Parameters#
- argvList[str]
The command line arguments to parse. The first item should be the path to the python executable. If not specified,
[sys.executable, *sys.argv]is used.
Examples#
>>> from holoscan.core import Application >>> import sys >>> Application().argv == sys.argv True >>> Application([]).argv == sys.argv True >>> Application([sys.executable, *sys.argv]).argv == sys.argv True >>> Application(["python3", "myapp.py", "--driver", "--address=10.0.0.1", "my_arg1"]).argv ['myapp.py', 'my_arg1']
Methods
__init__(self[, argv])Application class.
add_data_logger(self, logger)Add a data logger to the application.
add_default_green_context_pool(self, dev_id, ...)Add a default green context pool to the fragment.
add_flow(*args, **kwargs)Overloaded function.
add_fragment(self, frag)Add a fragment to the application.
add_operator(self, op)Add an operator to the application.
compose(self)The compose method of the application.
compose_graph(self)This is a wrapper around compose that only calls compose if the graph is not already composed.
config(*args, **kwargs)Overloaded function.
config_keys(self)The set of keys present in the fragment's configuration file.
enable_metadata(self, enabled)Method to set whether operator metadata transmission is enabled by default for fragments in this application.
from_config(self, key)Retrieve parameters from the associated configuration.
Get the default green context pool associated with the fragment.
init_app_context([runtime_env])Initializes the app context with arguments and well-known environment variables.
kwargs(self, key)Retrieve a dictionary parameters from the associated configuration.
make_thread_pool(self, name, initialize_size)Create a ThreadPool associated with this Fragment.
network_context(*args, **kwargs)Overloaded function.
register_service(self, service[, id])Register a fragment service instance.
run(self)The run method of the application.
Run the application asynchronously using a shared executor.
scheduler(*args, **kwargs)Overloaded function.
service(self, service_type[, id])Retrieve a registered fragment service.
set_dynamic_flows(self, op, dynamic_flow_func)Set a callback function to define dynamic flows for an operator at runtime.
shutdown_async_executor([wait])Shuts down the shared asynchronous executor.
start_op()Get or create the start operator for this application.
stop_execution(self[, op_name])Stop the execution of all operators in the fragment.
track(self, num_start_messages_to_skip, ...)The track method of the fragment (or application).
track_distributed(self[, ...])Attributes
The application associated with the fragment.
The command line arguments after processing flags.
Get the data loggers associated with this fragment.
The application's description.
Get the executor associated with the fragment.
Get the computation graph (Graph node is a Fragment) associated with the application.
Get the computation graph (Graph node is an Operator) associated with the fragment.
Whether operator metadata transmission is enabled by default for the application.
The default metadata policy (
holoscan.core.MetadataPolicy) associated with the application.The fragment's name.
The reference to the CLI options.
The application's version.
- __init__(self: holoscan.core._core.Application, argv: list[str] = []) None[source]#
Application class.
This constructor parses the command line for flags that are recognized by App Driver/Worker, and removes all recognized flags so users can use the remaining flags for their own purposes.
If the arguments are not specified, the arguments are retrieved from
sys.executableandsys.argv.The arguments after processing arguments (parsing Holoscan-specific flags and removing them) are accessible through the
argvattribute.Parameters#
- argvList[str]
The command line arguments to parse. The first item should be the path to the python executable. If not specified,
[sys.executable, *sys.argv]is used.
Examples#
>>> from holoscan.core import Application >>> import sys >>> Application().argv == sys.argv True >>> Application([]).argv == sys.argv True >>> Application([sys.executable, *sys.argv]).argv == sys.argv True >>> Application(["python3", "myapp.py", "--driver", "--address=10.0.0.1", "my_arg1"]).argv ['myapp.py', 'my_arg1']
- add_data_logger(self: holoscan.core._core.Application, logger: holoscan.core._core.DataLogger) None#
Add a data logger to the application.
This method adds a data logger to the application. For distributed applications, the data logger is added to each fragment in the fragment graph.
Parameters#
- loggerholoscan.core.DataLogger
The data logger to add.
- add_default_green_context_pool(self: holoscan.core._core.Fragment, dev_id: int, sms_per_partition: list[int] = [], default_context_index: int = -1, min_sm_size: int = 2) holoscan::CudaGreenContextPool#
Add a default green context pool to the fragment.
Parameters#
- dev_idint
The device ID to use for the green context pool.
- sms_per_partitionlist[int], optional
The number of SMs per partition to use for the green context pool.
- default_context_indexint, optional
The index of the default context to use for the green context pool.
- min_sm_sizeint, optional
The minimum size of the SM to use for the green context pool.
- add_flow(*args, **kwargs)#
Overloaded function.
add_flow(self: holoscan.core._core.Application, upstream_op: holoscan.core._core.OperatorBase, downstream_op: holoscan.core._core.OperatorBase) -> None
add_flow(self: holoscan.core._core.Application, upstream_op: holoscan.core._core.OperatorBase, downstream_op: holoscan.core._core.OperatorBase, port_pairs: set[tuple[str, str]]) -> None
add_flow(self: holoscan.core._core.Application, upstream_op: holoscan.core._core.OperatorBase, downstream_op: holoscan.core._core.OperatorBase, connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None
add_flow(self: holoscan.core._core.Application, upstream_op: holoscan.core._core.OperatorBase, downstream_op: holoscan.core._core.OperatorBase, port_pairs: set[tuple[str, str]], connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None
Connect two operators associated with the fragment.
Parameters#
- upstream_opholoscan.core.Operator
Source operator.
- downstream_opholoscan.core.Operator
Destination operator.
- port_pairsSequence of (str, str) tuples
Sequence of ports to connect. The first element of each 2-tuple is a port from upstream_op while the second element is the port of downstream_op to which it connects.
Notes#
This is an overloaded function. Additional variants exist:
1.) For the Application class there is a variant where the first two arguments are of type holoscan.core.Fragment instead of holoscan.core.Operator. This variant is used in building multi-fragment applications.
- 2.) Subgraph variants: Connect operators and subgraphs in any combination
add_flow(operator, subgraph, port_pairs=None)
add_flow(subgraph, operator, port_pairs=None)
add_flow(subgraph, subgraph, port_pairs=None)
- 3.) All variants support connector_type parameter:
add_flow(…, connector_type)
add_flow(…, port_pairs, connector_type)
4.) There are also variants that omit the port_pairs argument that are applicable when there is only a single output on the upstream operator/fragment/subgraph and a single input on the downstream operator/fragment/subgraph.
add_flow(self: holoscan.core._core.Application, upstream_frag: holoscan.core._core.Fragment, downstream_frag: holoscan.core._core.Fragment, port_pairs: set[tuple[str, str]]) -> None
add_flow(self: holoscan.core._core.Application, upstream_op: holoscan.core._core.OperatorBase, downstream_subgraph: holoscan.core._core.Subgraph, port_pairs: set[tuple[str, str]] = set()) -> None
add_flow(self: holoscan.core._core.Application, upstream_subgraph: holoscan.core._core.Subgraph, downstream_op: holoscan.core._core.OperatorBase, port_pairs: set[tuple[str, str]] = set()) -> None
add_flow(self: holoscan.core._core.Application, upstream_subgraph: holoscan.core._core.Subgraph, downstream_subgraph: holoscan.core._core.Subgraph, port_pairs: set[tuple[str, str]] = set()) -> None
add_flow(self: holoscan.core._core.Application, upstream_op: holoscan.core._core.OperatorBase, downstream_subgraph: holoscan.core._core.Subgraph, connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None
add_flow(self: holoscan.core._core.Application, upstream_op: holoscan.core._core.OperatorBase, downstream_subgraph: holoscan.core._core.Subgraph, port_pairs: set[tuple[str, str]], connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None
add_flow(self: holoscan.core._core.Application, upstream_subgraph: holoscan.core._core.Subgraph, downstream_op: holoscan.core._core.OperatorBase, connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None
add_flow(self: holoscan.core._core.Application, upstream_subgraph: holoscan.core._core.Subgraph, downstream_op: holoscan.core._core.OperatorBase, port_pairs: set[tuple[str, str]], connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None
add_flow(self: holoscan.core._core.Application, upstream_subgraph: holoscan.core._core.Subgraph, downstream_subgraph: holoscan.core._core.Subgraph, connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None
add_flow(self: holoscan.core._core.Application, upstream_subgraph: holoscan.core._core.Subgraph, downstream_subgraph: holoscan.core._core.Subgraph, port_pairs: set[tuple[str, str]], connector_type: holoscan.core._core.IOSpec.ConnectorType) -> None
- add_fragment(self: holoscan.core._core.Application, frag: holoscan.core._core.Fragment) None#
Add a fragment to the application.
Parameters#
- fragholoscan.core.Fragment
The fragment to add.
- add_operator(self: holoscan.core._core.Application, op: holoscan.core._core.OperatorBase) None#
Add an operator to the application.
Parameters#
- opholoscan.core.Operator
The operator to add.
- property application#
The application associated with the fragment.
Returns#
app : holoscan.core.Application
- property argv#
The command line arguments after processing flags. This does not include the python executable like sys.argv does.
Returns#
argv : list of str
- compose(self: holoscan.core._core.Application) None#
The compose method of the application.
This method should be called after
config, but before the graph starts running in order to compose the computation graph. This method will be called automatically byApplication.run, so it is not normally necessary to call it directly.
- compose_graph(self: holoscan.core._core.Application) None#
This is a wrapper around compose that only calls compose if the graph is not already composed.
- config(*args, **kwargs)#
Overloaded function.
config(self: holoscan.core._core.Fragment, config_file: str, prefix: str = ‘’) -> None
Configuration class.
Represents configuration parameters as read from a YAML file.
Parameters#
- configstr or holoscan.core.Config
The path to the configuration file (in YAML format) or a holoscan.core.Config object.
- prefixstr, optional
Prefix path for the` config` file. Only available in the overloaded variant that takes a string for config.
Raises#
- RuntimeError
If the config is a string path that is non-empty and the file doesn’t exist.
config(self: holoscan.core._core.Fragment, arg0: holoscan.core._core.Config) -> None
config(self: holoscan.core._core.Fragment) -> holoscan.core._core.Config
- config_keys(self: holoscan.core._core.Fragment) set[str]#
The set of keys present in the fragment’s configuration file.
- property data_loggers#
Get the data loggers associated with this fragment.
Returns#
- list[holoscan.core.DataLogger]
A list of data loggers associated with this fragment.
- enable_metadata(self: holoscan.core._core.Application, enabled: bool) None#
Method to set whether operator metadata transmission is enabled by default for fragments in this application. Individual fragments can override this default via Fragment.enable_metadata. Similarly individual operators can override their fragment default via Operator.enable_metadata.
- property executor#
Get the executor associated with the fragment.
- property fragment_graph#
Get the computation graph (Graph node is a Fragment) associated with the application.
- from_config(self: holoscan.core._core.Fragment, key: str) object#
Retrieve parameters from the associated configuration.
Parameters#
- keystr
The key within the configuration file to retrieve. This can also be a specific component of the parameter via syntax ‘key.sub_key’.
Returns#
- argsholoscan.core.ArgList
An argument list associated with the key.
- get_default_green_context_pool(self: holoscan.core._core.Fragment) holoscan::CudaGreenContextPool#
Get the default green context pool associated with the fragment.
- property graph#
Get the computation graph (Graph node is an Operator) associated with the fragment.
- init_app_context(runtime_env=None)#
Initializes the app context with arguments and well-known environment variables.
The arguments, if passed in, override the attributes set with environment variables.
- Parameters:
argv (Optional[List[str]], optional) – arguments passed to the program. Defaults to None.
- Returns:
the AppContext object
- Return type:
- property is_metadata_enabled#
Whether operator metadata transmission is enabled by default for the application.
Notes#
Setting metadata to be enabled/disabled via this method is deprecated. Please use enable_metadata instead.
- kwargs(self: holoscan.core._core.Fragment, key: str) dict#
Retrieve a dictionary parameters from the associated configuration.
Parameters#
- keystr
The key within the configuration file to retrieve. This can also be a specific component of the parameter via syntax ‘key.sub_key’.
Returns#
- kwargsdict
A Python dict containing the parameters in the configuration file under the specified key.
- make_thread_pool(self: holoscan.core._core.Fragment, name: str, initialize_size: int = 1) holoscan::ThreadPool#
Create a ThreadPool associated with this Fragment.
The add method must be used to add individual operators to the pool.
Parameters#
- namestr
A name for the thread pool.
- initialize_size1
The initial number of threads in the pool.
- property metadata_policy#
The default metadata policy (
holoscan.core.MetadataPolicy) associated with the application.Individual fragments of a distributed application can override this via
Fragment.metadata_policy. Similarly, individual operators can override this viaOperator.metadata_policy.The supported policies are:
MetadataPolicy.REJECT: Reject the new value if the key already exists
MetadataPolicy.UPDATE: Replace existing value with the new one if the key already exists
MetadataPolicy.INPLACE_UPDATE: Update the value stored within an existing MetadataObject in-place if the key already exists (in contrast to UPDATE which always replaces the existing MetadataObject with a new one).
MetadataPolicy.RAISE: Raise an exception if the key already exists
- network_context(*args, **kwargs)#
Overloaded function.
network_context(self: holoscan.core._core.Fragment, network_context: holoscan.core._core.NetworkContext) -> None
Assign a network context to the Fragment
Parameters#
- network_contextholoscan.core.NetworkContext
A network_context class instance to be used by the underlying GXF executor. If unspecified, no network context will be used.
network_context(self: holoscan.core._core.Fragment) -> holoscan.core._core.NetworkContext
Get the network context to be used by the Fragment
- register_service(self: object, service: object, id: str = '') None#
Register a fragment service instance.
Registers an already created fragment service instance with the specified identifier. This allows the fragment service to be retrieved later using the service() method.
A service must be an instance of a class that inherits from holoscan.core.Resource or holoscan.core.FragmentService.
Parameters#
- serviceholoscan.core.Resource or holoscan.core.FragmentService
The service instance to register.
- idstr, optional
The identifier for the service registration. If empty: - For Resource instances: The resource’s name will be used as the identifier. - For other services: A unique identifier based on the service type will be generated. Note: When registering a Resource, the id parameter must be empty or will be ignored.
Raises#
- TypeError
If the service is not a Resource or FragmentService instance.
- RuntimeError
If the service registration fails.
Notes#
Resources are automatically wrapped in a DefaultFragmentService when registered. For Python-defined services, a unique identifier is generated if no id is provided.
- run(self: holoscan.core._core.Application) None#
The run method of the application.
This method runs the computation. It must have first been initialized via config and compose.
- run_async()[source]#
Run the application asynchronously using a shared executor.
This method uses a shared ThreadPoolExecutor associated with this Application instance. The executor is created on the first call. Call shutdown_async_executor() when done with async runs to clean up resources.
Returns#
future :
concurrent.futures.Futureobject
- scheduler(*args, **kwargs)#
Overloaded function.
scheduler(self: holoscan.core._core.Fragment, scheduler: holoscan.core._core.Scheduler) -> None
Assign a scheduler to the Fragment.
Parameters#
- schedulerholoscan.core.Scheduler
A scheduler class instance to be used by the underlying GXF executor. If unspecified, the default is a holoscan.gxf.GreedyScheduler.
scheduler(self: holoscan.core._core.Fragment) -> holoscan.core._core.Scheduler
Get the scheduler to be used by the Fragment.
- service(self: object, service_type: type, id: str = '') object#
Retrieve a registered fragment service.
Retrieves a previously registered fragment service by its type and optional identifier. The lookup process checks the Python service registry first, then falls back to the C++ service registry if needed.
Parameters#
- service_typetype
The type of the fragment service to retrieve. Must be a type that inherits from Resource or FragmentService.
- idstr, optional
The identifier of the fragment service. If empty, retrieves by service type only. For Resources, this would typically be the resource’s name.
Returns#
- object or None
The fragment service instance of the requested type, or
Noneif not found. If the service wraps a Resource and a Resource type is requested, the unwrapped Resource instance is returned.
Notes#
For services that wrap Resources, the method will automatically unwrap and return the Resource if a Resource type is requested.
- set_dynamic_flows(self: holoscan.core._core.Fragment, op: holoscan.core._core.OperatorBase, dynamic_flow_func: Callable) None#
Set a callback function to define dynamic flows for an operator at runtime.
This method allows operators to modify their connections with other operators during execution. The callback function is called after the operator executes and can add dynamic flows using the operator’s add_dynamic_flow() methods.
Parameters#
- opholoscan.core.Operator
The operator for which to set dynamic flows.
- dynamic_flow_funccallable
The callback function that defines the dynamic flows. Takes an operator as input and returns
None.
- shutdown_async_executor(wait=True)[source]#
Shuts down the shared asynchronous executor.
Call this method when the application instance is no longer needed and asynchronous runs initiated by run_async should terminate.
Parameters#
- waitbool
If True (default), wait for running tasks to complete before shutting down. If False, shut down immediately.
- start_op()[source]#
Get or create the start operator for this application.
This operator is nothing but the first operator that was added to the application. It has the name of <|start|> and has a condition of CountCondition(1). This Operator is used to start the execution of the application. Entry operators who want to start the execution of the application should connect to this operator.
If this method is not called, no start operator is created. Otherwise, the start operator is created if it does not exist, and the start operator is returned.
Returns#
- Operator
The start operator instance. If it doesn’t exist, it will be created with a CountCondition(1).
- stop_execution(self: holoscan.core._core.Fragment, op_name: str = '') None#
Stop the execution of all operators in the fragment.
This method is used to stop the execution of all operators in the fragment by setting the internal async condition of each operator to EVENT_NEVER state, which sets the scheduling condition to NEVER. Once stopped, the operators will not be scheduled for execution (the compute() method will not be called), which may lead to application termination depending on the application’s design.
Note that executing this method does not trigger the operators’ stop() method. The stop() method is called only when the scheduler deactivates all operators together.
Parameters#
- op_namestr, optional
The name of the operator to stop. If empty, all operators will be stopped.
- track(self: holoscan.core._core.Fragment, num_start_messages_to_skip: int = 10, num_last_messages_to_discard: int = 10, latency_threshold: int = 0, is_limited_tracking: bool = False) holoscan::DataFlowTracker#
The track method of the fragment (or application).
This method enables data frame flow tracking and returns a DataFlowTracker object which can be used to display metrics data for profiling an application.
Parameters#
- num_start_messages_to_skipint
The number of messages to skip at the beginning.
- num_last_messages_to_discardint
The number of messages to discard at the end.
- latency_thresholdint
The minimum end-to-end latency in milliseconds to account for in the end-to-end latency metric calculations
- is_limited_trackingbool, optional
If
True, the tracking is limited to root and leaf nodes, minimizing the timestamps by avoiding intermediate operators.
Returns#
- trackerholoscan.core.DataFlowTracker
The data flow tracker object that can be used to display metrics data for profiling along the different paths through the computation graph.
- track_distributed(self: holoscan.core._core.Application, num_start_messages_to_skip: int = 10, num_last_messages_to_discard: int = 10, latency_threshold: int = 0, is_limited_tracking: bool = False) dict[str, holoscan::DataFlowTracker]#