# SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module provides a Python API for the core C++ API classes.
The `Application` class is the primary class that should be derived from to
create a custom application.
.. autosummary::
holoscan.core.Application
holoscan.core.Arg
holoscan.core.ArgContainerType
holoscan.core.ArgElementType
holoscan.core.ArgList
holoscan.core.ArgType
holoscan.core.AsyncDataLoggerResource
holoscan.core.AsyncQueuePolicy
holoscan.core.CLIOptions
holoscan.core.Clock
holoscan.core.ClockInterface
holoscan.core.Component
holoscan.core.ComponentSpec
holoscan.core.Condition
holoscan.core.ConditionBase
holoscan.core.ConditionType
holoscan.core.Config
holoscan.core.DataFlowMetric
holoscan.core.DataFlowTracker
holoscan.core.DataLogger
holoscan.core.DataLoggerResource
holoscan.core.DefaultFragmentService
holoscan.core.DLDevice
holoscan.core.DLDeviceType
holoscan.core.ExecutionContext
holoscan.core.Executor
holoscan.core.Fragment
holoscan.core.FlowInfo
holoscan.core.Fragment
holoscan.core.Graph
holoscan.core.FragmentService
holoscan.core.InputContext
holoscan.core.IOSpec
holoscan.core.Message
holoscan.core.MetadataDictionary
holoscan.core.MetadataPolicy
holoscan.core.MultiMessageConditionInfo
holoscan.core.NetworkContext
holoscan.core.Operator
holoscan.core.OperatorBase
holoscan.core.OperatorSpec
holoscan.core.OperatorStatus
holoscan.core.OutputContext
holoscan.core.ParameterFlag
holoscan.core.arg_to_py_object
holoscan.core.arglist_to_kwargs
holoscan.core.Resource
holoscan.core.ResourceBase
holoscan.core.SchedulingStatusType
holoscan.core.ServiceDriverEndpoint
holoscan.core.ServiceWorkerEndpoint
holoscan.core.Subgraph
holoscan.core.Tensor
holoscan.core.Tracker
holoscan.core.kwargs_to_arglist
holoscan.core.py_object_to_arg
"""
import logging
import sys
# Note: Python 3.7+ expects the threading module to be initialized (imported) before additional
# threads are created (by C++ modules using pybind11).
# Otherwise you will get an assert tlock.locked() error on exit.
# (CLARAHOLOS-765)
import threading as _threading # noqa: F401, I001
import warnings
from collections.abc import Callable
# Add ThreadPoolExecutor to imports if not already there
from concurrent.futures import ThreadPoolExecutor
# Check stack size and warn if insufficient
try:
import resource
# Get current stack size limit
soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_STACK)
# Recommended minimum stack size (32 MB)
RECOMMENDED_STACK_SIZE = 32 * 1024 * 1024
# Check if soft limit is set and is below recommended size
if soft_limit != resource.RLIM_INFINITY and soft_limit < RECOMMENDED_STACK_SIZE:
warnings.warn(
f"Current stack size ({soft_limit / (1024 * 1024):.1f} MB) is below the "
f"recommended minimum ({RECOMMENDED_STACK_SIZE / (1024 * 1024):.1f} MB). "
f"This may cause segmentation faults or crashes. "
f"Consider increasing the stack size with 'ulimit -s {RECOMMENDED_STACK_SIZE // 1024}'"
f", or if using Docker, launch the container with "
f"'--ulimit stack={RECOMMENDED_STACK_SIZE}'.",
RuntimeWarning,
stacklevel=1,
)
except (ImportError, OSError, ValueError):
# resource module not available (e.g., on Windows) or error getting limits
pass
# Import statements for the C++ API classes
from ..graphs._graphs import FragmentGraph, OperatorGraph
from ._core import Application as _Application
from ._core import (
Arg,
ArgContainerType,
ArgElementType,
ArgList,
ArgType,
AsyncDataLoggerResource,
AsyncQueuePolicy,
CLIOptions,
Clock,
ClockInterface,
Component,
ConditionBase,
ConditionType,
Config,
DataFlowMetric,
DataFlowTracker,
DataLogger,
DataLoggerResource,
DistributedAppService,
DLDevice,
DLDeviceType,
Executor,
FlowInfo,
FragmentService,
IOSpec,
Message,
MetadataDictionary,
MetadataPolicy,
MultiMessageConditionInfo,
NetworkContext,
OperatorBase,
OperatorStatus,
ParameterFlag,
ResourceBase,
Scheduler,
SchedulingStatusType,
ServiceDriverEndpoint,
ServiceWorkerEndpoint,
arg_to_py_object,
arglist_to_kwargs,
kwargs_to_arglist,
py_object_to_arg,
)
from ._core import DefaultFragmentService as _DefaultFragmentService
from ._core import Fragment as _Fragment
from ._core import PyComponentSpec as ComponentSpec
from ._core import PyExecutionContext as ExecutionContext
from ._core import PyInputContext as InputContext
from ._core import PyOperatorSpec as OperatorSpec
from ._core import PyOutputContext as OutputContext
from ._core import PyRegistryContext as _RegistryContext
from ._core import PyTensor as Tensor
from ._core import Subgraph as _Subgraph
from ._core import register_types as _register_types
# Get a logger instance for this module
logger = logging.getLogger(__name__)
Graph = OperatorGraph # define alias for backward compatibility
__all__ = [
"Application",
"Arg",
"ArgContainerType",
"ArgElementType",
"ArgList",
"ArgType",
"AsyncDataLoggerResource",
"AsyncQueuePolicy",
"CLIOptions",
"Clock",
"ClockInterface",
"Component",
"ComponentSpec",
"ConditionType",
"Condition",
"ConditionBase",
"Config",
"DataFlowMetric",
"DataFlowTracker",
"DataLogger",
"DataLoggerResource",
"DefaultFragmentService",
"DistributedAppService",
"DLDevice",
"DLDeviceType",
"ExecutionContext",
"Executor",
"FlowInfo",
"Fragment",
"FragmentGraph",
"FragmentService",
"Graph",
"InputContext",
"IOSpec",
"Message",
"MetadataDictionary",
"MetadataPolicy",
"MultiMessageConditionInfo",
"NetworkContext",
"Operator",
"OperatorBase",
"OperatorSpec",
"OperatorStatus",
"OperatorGraph",
"OutputContext",
"ParameterFlag",
"Resource",
"ResourceBase",
"Scheduler",
"SchedulingStatusType",
"ServiceDriverEndpoint",
"ServiceWorkerEndpoint",
"START_OPERATOR_NAME",
"Subgraph",
"Tensor",
"Tracker",
"arg_to_py_object",
"arglist_to_kwargs",
"io_type_registry",
"kwargs_to_arglist",
"py_object_to_arg",
]
# define aliases for backwards compatibility
_Condition = ConditionBase
_Operator = OperatorBase
_Resource = ResourceBase
# Define custom __repr__ method for MetadataDictionary
def metadata_repr(self):
items = {k: v for k, v in self.items()}
return f"{items}"
# need these imports for ThreadPool return type of Fragment.make_thread_pool to work
from ..gxf._gxf import GXFResource as _GXFResource # noqa: E402, F401, I001
from ..resources import ThreadPool as _ThreadPool # noqa: E402, F401, I001
MetadataDictionary.__repr__ = metadata_repr
# Defines the special operator name used to initiate application execution.
# The GXF framework requires entity names to not begin with double underscores,
# so this distinctive name pattern is chosen to prevent naming collisions.
# This constant mirrors the C++ definition of `holoscan::kStartOperatorName`
# found in holoscan/core/fragment.hpp
START_OPERATOR_NAME = "<|start|>"
[docs]
class Application(_Application):
[docs]
def __init__(self, argv=None, *args, **kwargs):
# If no arguments are provided, instead of letting the C++ API initialize the application
# from the command line (through '/proc/self/cmdline'), we initialize the application
# with the command line arguments retrieved from the Python interpreter.
# This is because the C++ API will not be able to discard arguments that are not meant for
# the Python application.
# For example, if the user runs the application with the following
# command line arguments:
# /usr/bin/python3 -m pytest -v -k test_init /workspace/holoscan-sdk/public/python/tests
# then the C++ API will get the following arguments:
# ['/usr/bin/python3', '-m', 'pytest', '-v', '-k', 'test_init',
# '/workspace/holoscan-sdk/public/python/tests']
# whereas the Python interpreter (sys.argv) will get the following arguments:
# ['/usr/lib/python3/dist-packages/pytest.py', '-v', '-k', 'test_init',
# '/workspace/holoscan-sdk/public/python/tests']
# For the reason above, we initialize the application with the arguments:
# [sys.executable, *sys.argv]
# which will be equivalent to the following command line arguments:
# ['/usr/bin/python3', '/usr/lib/python3/dist-packages/pytest.py', '-v', '-k',
# 'test_init', '/workspace/holoscan-sdk/public/python/tests']
# and ``Application().argv`` will return the same arguments as ``sys.argv``.
if not argv:
argv = [sys.executable, *sys.argv]
# It is recommended to not use super()
# (https://pybind11.readthedocs.io/en/stable/advanced/classes.html#overriding-virtual-functions-in-python)
_Application.__init__(self, argv, *args, **kwargs)
self._async_executor = None
self._async_executor_lock = _threading.Lock()
self._start_op = None
[docs]
def run_async(self):
"""Run the application asynchronously using a shared executor.
This method uses a shared ThreadPoolExecutor associated with this
Application instance. The executor is created on the first call.
Call `shutdown_async_executor()` when done with async runs
to clean up resources.
Returns
-------
future : ``concurrent.futures.Future`` object
"""
# Ensure only one thread creates the executor
with self._async_executor_lock:
if self._async_executor is None:
# Create the executor ONCE
self._async_executor = ThreadPoolExecutor(
max_workers=1, thread_name_prefix=f"HoloscanApp_{self.name}_Async"
)
# Submit the job to the shared executor
return self._async_executor.submit(self.run)
[docs]
def shutdown_async_executor(self, wait=True):
"""Shuts down the shared asynchronous executor.
Call this method when the application instance is no longer needed
and asynchronous runs initiated by `run_async` should terminate.
Parameters
----------
wait : bool
If True (default), wait for running tasks to complete before shutting down.
If False, shut down immediately.
"""
# Use the lock to prevent race conditions with run_async
with self._async_executor_lock:
if self._async_executor is not None:
# Shutting down async executor
self._async_executor.shutdown(wait=wait)
self._async_executor = None
[docs]
def start_op(self):
"""Get or create the start operator for this application.
This operator is nothing but the first operator that was added to the application.
It has the name of `<|start|>` and has a condition of `CountCondition(1)`.
This Operator is used to start the execution of the application.
Entry operators who want to start the execution of the application should connect to this
operator.
If this method is not called, no start operator is created.
Otherwise, the start operator is created if it does not exist, and the start operator is
returned.
Returns
-------
Operator
The start operator instance. If it doesn't exist, it will be created with
a CountCondition(1).
"""
from ..conditions import CountCondition # noqa: PLC0415
if not self._start_op:
self._start_op = Operator(self, CountCondition(self, 1), name=START_OPERATOR_NAME)
self.add_operator(self._start_op)
return self._start_op
# If we created a context via `gxf.context_create` then we would need to
# call `gxf.context_destroy` in a destructor. However, in the __init__
# here, the C++ API creates the GXFExecutor and its context and the
# C++ object will also take care of the context deletion.
# def __del__(self):
# context_destroy(self._context)
def __del__(self):
# This is best-effort cleanup, not guaranteed to be called reliably.
# Avoid potentially blocking calls or complex logic here.
if self._async_executor is not None:
# Non-blocking shutdown is safer in __del__ if possible,
# but might leave work unfinished or resources dangling longer.
# Using wait=False might be preferable here, but check implications.
try:
self._async_executor.shutdown(wait=False) # Try non-blocking first
except Exception as e:
logger.error(
f"Error during __del__ executor shutdown for Application {self.name}: {e}",
exc_info=True,
)
finally:
self._async_executor = None
# copy docstrings defined in core_pydoc.hpp
Application.__doc__ = _Application.__doc__
Application.__init__.__doc__ = _Application.__init__.__doc__
[docs]
class Fragment(_Fragment):
[docs]
def __init__(self, app=None, name="", *args, **kwargs):
if app is not None and not isinstance(app, _Application):
raise ValueError(
"The first argument to a Fragment's constructor must be the Application "
"to which it belongs."
)
# It is recommended to not use super()
# (https://pybind11.readthedocs.io/en/stable/advanced/classes.html#overriding-virtual-functions-in-python)
_Fragment.__init__(self, self, *args, **kwargs)
self.name = name
self.application = app
# Set the fragment config to the application config.
if app:
self.config(app.config())
self._async_executor = None
self._async_executor_lock = _threading.Lock()
self._start_op = None
# Initialize the Python service registry for PyFragment
self._python_service_registry = {}
[docs]
def compose(self):
pass
[docs]
def run_async(self):
"""Run the fragment asynchronously using a shared executor.
This method uses a shared ThreadPoolExecutor associated with this
Application instance. The executor is created on the first call.
Call `shutdown_async_executor()` when done with async runs
to clean up resources.
Returns
-------
future : ``concurrent.futures.Future`` object
"""
# Ensure only one thread creates the executor
with self._async_executor_lock:
if self._async_executor is None:
# Create the executor ONCE
self._async_executor = ThreadPoolExecutor(
max_workers=1, thread_name_prefix=f"HoloscanFragment_{self.name}_Async"
)
# Submit the job to the shared executor
return self._async_executor.submit(self.run)
[docs]
def shutdown_async_executor(self, wait=True):
"""Shuts down the shared asynchronous executor.
Call this method when the application instance is no longer needed
and asynchronous runs initiated by `run_async` should terminate.
Parameters
----------
wait : bool
If True (default), wait for running tasks to complete before shutting down.
If False, shut down immediately.
"""
# Use the lock to prevent race conditions with run_async
with self._async_executor_lock:
if self._async_executor is not None:
# Shutting down async executor
self._async_executor.shutdown(wait=wait)
self._async_executor = None
[docs]
def start_op(self):
"""Get or create the start operator for this fragment.
This operator is nothing but the first operator that was added to the fragment.
It has the name of `<|start|>` and has a condition of `CountCondition(1)`.
This Operator is used to start the execution of the fragment.
Entry operators who want to start the execution of the fragment should connect to this
operator.
If this method is not called, no start operator is created.
Otherwise, the start operator is created if it does not exist, and the start operator is
returned.
Returns
-------
Operator
The start operator instance. If it doesn't exist, it will be created with
a CountCondition(1).
"""
from ..conditions import CountCondition # noqa: PLC0415
if not self._start_op:
self._start_op = Operator(self, CountCondition(self, 1), name=START_OPERATOR_NAME)
self.add_operator(self._start_op)
return self._start_op
def __del__(self):
# This is best-effort cleanup, not guaranteed to be called reliably.
# Avoid potentially blocking calls or complex logic here.
if getattr(self, "_async_executor", None):
# Non-blocking shutdown is safer in __del__ if possible,
# but might leave work unfinished or resources dangling longer.
# Using wait=False might be preferable here, but check implications.
try:
self._async_executor.shutdown(wait=False) # Try non-blocking first
except Exception as e:
logger.error(
f"Error during __del__ executor shutdown for Fragment {self.name}: {e}",
exc_info=True,
)
finally:
self._async_executor = None
# copy docstrings defined in core_pydoc.hpp
Fragment.__doc__ = _Fragment.__doc__
Fragment.__init__.__doc__ = _Fragment.__init__.__doc__
class Subgraph(_Subgraph):
def __init__(self, fragment: _Fragment | _Subgraph, instance_name: str):
if not isinstance(fragment, (_Fragment, _Subgraph)):
raise ValueError(
"The first argument to a Subgraph's constructor must be the Fragment "
"(Application) or Subgraph to which it belongs."
)
# It is recommended to not use super()
# (https://pybind11.readthedocs.io/en/stable/advanced/classes.html#overriding-virtual-functions-in-python)
_Subgraph.__init__(self, self, fragment, instance_name)
# store Fragment as an attribute so it is accessible from Operator constructor, etc.
if isinstance(fragment, _Subgraph):
fragment = fragment.fragment
self.fragment = fragment
# Compose immediately after Python object is fully constructed
# This matches the behavior of Fragment::make_subgraph() in C++
if not self.is_composed():
self.compose()
self.set_composed(True)
def compose(self):
pass
def add_flow(
self,
upstream: OperatorBase,
downstream: OperatorBase,
port_pairs: set[tuple[str, str]] | None = None,
connector_type: IOSpec.ConnectorType | None = None,
):
"""
Add a flow between components within this Subgraph.
This method delegates to the fragment's add_flow method, providing a convenient
way to connect operators and subgraphs within a Subgraph's compose() method.
Parameters
----------
upstream : Operator or Subgraph
The upstream component
downstream : Operator or Subgraph
The downstream component
port_pairs : set of tuple of str, optional
Port connections as (upstream_port, downstream_port) pairs
connector_type : IOSpec.ConnectorType, optional
The connector type to use for the connection
"""
# Make sure operator names are updated to the qualified name via `add_operator`
#
# Note: The name will already be prefixed for operators where a Subgraph was passed as the
# first argument to the constructor, but it is still safe to call `add_operator` as it
# guards against adding a second copy of the prefix. This will allow robustly making sure
# the prefix is added even if the Operator was constructed by passing the Fragment as the
# first argument to the constructor or if the Python bindings of a C++ operator did not use
# the `get_fragment_ptr_name_pair` utility function to handle the prefixing automatically
# during operator construction.
if isinstance(upstream, OperatorBase):
self.add_operator(upstream)
if isinstance(downstream, OperatorBase):
self.add_operator(downstream)
# Call the PyFragment::add_flow method to handle the connections. This ensures that the
# overloads handling additions to the PyFragment's python_operator_registry_ are used.
if connector_type is not None:
if port_pairs is not None:
self.fragment.add_flow(upstream, downstream, port_pairs, connector_type)
else:
self.fragment.add_flow(upstream, downstream, connector_type)
else:
if port_pairs is not None:
self.fragment.add_flow(upstream, downstream, port_pairs)
else:
self.fragment.add_flow(upstream, downstream)
def set_dynamic_flows(self, op: OperatorBase, func: Callable):
"""Set a callback function to define dynamic flows for an operator at runtime.
This method allows operators to modify their connections with other operators during execution.
The callback function is called after the operator executes and can add dynamic flows using
the operator's `add_dynamic_flow` methods.
Parameters
----------
op : holoscan.core.Operator
The operator for which to set dynamic flows.
dynamic_flow_func : callable
The callback function that defines the dynamic flows. Takes an operator as input and returns
``None``.
"""
self.fragment.set_dynamic_flows(op, func)
# copy docstrings defined in core_pydoc.hpp
Subgraph.__doc__ = _Subgraph.__doc__
Subgraph.__init__.__doc__ = _Subgraph.__init__.__doc__
[docs]
class Operator(OperatorBase):
_readonly_attributes = [
"fragment",
"conditions",
"resources",
"operator_type",
"description",
]
def __setattr__(self, name, value):
if name in self._readonly_attributes:
raise AttributeError(f'cannot override read-only property "{name}"')
super().__setattr__(name, value)
[docs]
def __init__(self, fragment, *args, **kwargs):
if not isinstance(fragment, (_Fragment, _Subgraph)):
raise ValueError(
"The first argument to an Operator's constructor must be the Fragment "
"(Application) or Subgraph to which it belongs."
)
# It is recommended to not use super()
# (https://pybind11.readthedocs.io/en/stable/advanced/classes.html#overriding-virtual-functions-in-python)
OperatorBase.__init__(self, self, fragment, *args, **kwargs)
# Create a PyOperatorSpec object and pass it to the C++ API
spec = OperatorSpec(fragment=self.fragment, op=self)
self.spec = spec
# Call setup method in PyOperator class
self.setup(spec)
[docs]
def setup(self, spec: OperatorSpec):
"""Default implementation of setup method."""
pass
[docs]
def initialize(self):
"""Default implementation of initialize"""
pass
[docs]
def start(self):
"""Default implementation of start"""
pass
[docs]
def compute(self, op_input, op_output, context):
"""Default implementation of compute"""
pass
[docs]
def stop(self):
"""Default implementation of stop"""
pass
# copy docstrings defined in core_pydoc.hpp
Operator.__doc__ = OperatorBase.__doc__
Operator.__doc__.replace(
"Base class representing either a wrapped C++ operator or native Python operator.",
"Native Python operator class.",
)
Operator.__init__.__doc__ = OperatorBase.__init__.__doc__
Operator.__init__.__doc__.replace(
"Base class representing either a wrapped C++ operator or native Python operator.",
"Native Python operator class.",
)
[docs]
class Condition(ConditionBase):
_readonly_attributes = [
"fragment",
"condition_type",
"description",
]
def __setattr__(self, name, value):
if name in self._readonly_attributes:
raise AttributeError(f'cannot override read-only property "{name}"')
super().__setattr__(name, value)
[docs]
def __init__(self, fragment, *args, **kwargs):
if not isinstance(fragment, (_Fragment, _Subgraph)):
raise ValueError(
"The first argument to an Operator's constructor must be the Fragment "
"(Application) or Subgraph to which it belongs."
)
# Extract receiver/transmitter kwargs for automatic argument detection
# This enables user-supplied condition detection in the C++ backend
# It is recommended to not use super()
# (https://pybind11.readthedocs.io/en/stable/advanced/classes.html#overriding-virtual-functions-in-python)
ConditionBase.__init__(self, self, fragment, *args, **kwargs)
# Create a PyComponentSpec object and pass it to the C++ API
spec = ComponentSpec(fragment=self.fragment, component=self)
self.spec = spec
# Call setup method in PyCondition class
self.setup(spec)
# If a "receiver_name" or "transmitter_name" kwarg was passed in so the
# Operator::find_ports_used_by_condition_args() will avoid adding a default condition to
# that port, then we should also add a C++ Parameter with that name to avoid a warning
# being raised about specifying an argument for which there is no corresponding parameter
# defined.
receiver_name = kwargs.get("receiver_name")
transmitter_name = kwargs.get("transmitter_name")
spec_repr = repr(spec)
# add Parameter for "receiver_name" if it doesn't already exist
if receiver_name is not None and "receiver_name" not in spec_repr:
spec.param("receiver_name", receiver_name)
# add Parameter for "transmitter_name" if it doesn't already exist
if transmitter_name is not None and "transmitter_name" not in spec_repr:
spec.param("transmitter_name", transmitter_name)
[docs]
def setup(self, spec: ComponentSpec):
"""Default implementation of setup method."""
pass
[docs]
def initialize(self):
"""Default implementation of initialize"""
pass
[docs]
def update_state(self, timestamp):
"""Default implementation of update_state
Parameters
----------
timestamp : int
The timestamp at which the update_state method was called.
Notes
-----
This method is always called by the underlying GXF framework immediately before the
`Condition.check` method. In some cases, the `Condition.on_execute` method may also wish
to call this method.
"""
pass
[docs]
def check(self, timestamp: int) -> tuple[SchedulingStatusType, int | None]:
"""Default implementation of check.
Parameters
----------
timestamp : int
The timestamp at which the check method is called. This method is called by the
underlying GXF framework to determine whether an operator is ready to execute.
Returns
-------
status_type: SchedulingStatusType
The current status of the operator. See the documentation on native condition
creation for explanations of the various status types.
target_timestamp: int or None
Specifies a specific target timestamp at which the operator is expected to be ready.
This should only be provided if relevant (it helps the underlying framework avoid
overhead of repeated checks before the target time).
Notes
-----
The method should return SchedulingStatusType.READY when the desired condition has been met.
The operator will always execute with this default implementation that always execute with
this default implementation.
"""
return SchedulingStatusType.READY, None
[docs]
def on_execute(self, timestamp):
"""Default implementation of on_execute
Parameters
----------
timestamp : int
The timestamp at which the on_execute method was called.
Notes
-----
This method is called by the underlying GXF framework immediately after the
`Operator.compute` call for the operator to which the condition has been assigned.
"""
pass
# copy docstrings defined in core_pydoc.hpp
Condition.__doc__ = ConditionBase.__doc__
Condition.__doc__.replace(
"Base class representing either a wrapped C++ condition or native Python condition.",
"Native Python condition class.",
)
Condition.__init__.__doc__ = ConditionBase.__init__.__doc__
Condition.__init__.__doc__.replace(
"Base class representing either a wrapped C++ condition or native Python condition.",
"Native Python condition class.",
)
class Resource(ResourceBase):
_readonly_attributes = [
"fragment",
"resource_type",
"description",
]
def __setattr__(self, name, value):
if name in self._readonly_attributes:
raise AttributeError(f'cannot override read-only property "{name}"')
super().__setattr__(name, value)
def __init__(self, fragment, *args, **kwargs):
if not isinstance(fragment, (_Fragment, _Subgraph)):
raise ValueError(
"The first argument to an Operator's constructor must be the Fragment "
"(Application) or Subgraph to which it belongs."
)
# It is recommended to not use super()
# (https://pybind11.readthedocs.io/en/stable/advanced/classes.html#overriding-virtual-functions-in-python)
ResourceBase.__init__(self, self, fragment, *args, **kwargs)
# Create a PyComponentSpec object and pass it to the C++ API
spec = ComponentSpec(fragment=self.fragment, component=self)
self.spec = spec
# Call setup method in PyResource class
self.setup(spec)
def setup(self, spec: ComponentSpec):
"""Default implementation of setup method."""
pass
# copy docstrings defined in core_pydoc.hpp
Resource.__doc__ = ResourceBase.__doc__
Resource.__doc__.replace(
"Base class representing either a wrapped C++ resource or native Python resource.",
"Native Python resource class.",
)
Resource.__init__.__doc__ = ResourceBase.__init__.__doc__
Resource.__init__.__doc__.replace(
"Base class representing either a wrapped C++ resource or native Python resource.",
"Native Python resource class.",
)
class DefaultFragmentService(_DefaultFragmentService):
"""Base class for fragment services in Python.
Provides default implementations of virtual methods to avoid
infinite recursion issues with pybind11 trampolines.
"""
def __init__(self, resource=None, *args, **kwargs):
"""Initialize the fragment service.
Parameters
----------
resource : Resource, optional
The underlying resource for this service.
"""
# Call the C++ base class constructor
if resource is not None:
_DefaultFragmentService.__init__(self, resource, *args, **kwargs)
else:
_DefaultFragmentService.__init__(self, *args, **kwargs)
self._resource_ref = resource
def resource(self, new_resource=None):
"""Get or set the underlying Resource associated with this service.
This method is called by the C++ backend.
Parameters
----------
new_resource : Resource or None
If provided, sets the resource. If None, acts as getter.
Returns
-------
Resource or None
The associated resource when called as a getter.
"""
if new_resource is not None:
self._resource_ref = new_resource
# We also need to call the C++ base class's resource setter
super().resource(new_resource)
return self._resource_ref
class Tracker:
"""Context manager to add data flow tracking to an application."""
def __init__(
self,
app,
*,
filename=None,
num_buffered_messages=100,
num_start_messages_to_skip=10,
num_last_messages_to_discard=10,
latency_threshold=0,
is_limited_tracking=False,
):
"""
Parameters
----------
app : holoscan.core.Application
on which flow tracking should be applied.
filename : str or None, optional
If none, logging to file will be disabled. Otherwise, logging will
write to the specified file.
num_buffered_messages : int, optional
Controls the number of messages buffered between file writing when
`filename` is not ``None``.
num_start_messages_to_skip : int, optional
The number of messages to skip at the beginning of the execution. This does not affect
the log file or the number of source messages metric.
num_last_messages_to_discard : int, optional
The number of messages to discard at the end of the execution. This does not affect
the log file or the number of source messages metric.
latency_threshold : int, optional
The minimum end-to-end latency in milliseconds to account for in the end-to-end
latency metric calculations.
is_limited_tracking : bool, optional
If true, the tracking is limited to root and leaf nodes, minimizing the timestamps by
avoiding intermediate operators.
"""
self.app = app
# Check the number of fragment nodes to see if it is a distributed app.
# Use compose_graph(), not compose() to protect against repeated compose() calls.
self.app.compose_graph()
self.is_distributed_app = len(app.fragment_graph.get_nodes()) > 0
self.enable_logging = filename is not None
if self.enable_logging:
self.logging_kwargs = dict(
filename=filename,
num_buffered_messages=num_buffered_messages,
)
self.tracker_kwargs = dict(
num_start_messages_to_skip=num_start_messages_to_skip,
num_last_messages_to_discard=num_last_messages_to_discard,
latency_threshold=latency_threshold,
is_limited_tracking=is_limited_tracking,
)
def __enter__(self):
if self.is_distributed_app:
self.trackers = self.app.track_distributed(**self.tracker_kwargs)
for tracker in self.trackers.values():
if self.enable_logging:
tracker.enable_logging(**self.logging_kwargs)
return self.trackers
else:
self.tracker = self.app.track(**self.tracker_kwargs)
if self.enable_logging:
self.tracker.enable_logging(**self.logging_kwargs)
return self.tracker
def __exit__(self, exc_type, exc_value, exc_tb):
if self.enable_logging:
if self.is_distributed_app:
for tracker in self.trackers.values():
tracker.end_logging()
else:
self.tracker.end_logging()
_registry_context = _RegistryContext()
io_type_registry = _registry_context.registry()
_register_types(io_type_registry)