From bef54689f08fcffce20c7a0f9541dfd7ee0df5e0 Mon Sep 17 00:00:00 2001 From: Alexander Tiderko Date: Fri, 15 Dec 2023 13:31:46 +0100 Subject: [PATCH] forward diagnostics messages to crossbar --- .../crossbar/runtime_interface.py | 43 +++++++++++++++++++ .../fkie_multimaster_pylib/settings.py | 2 +- .../monitor/service.py | 24 ++++++----- .../monitor_servicer.py | 32 +++++++++++++- 4 files changed, 89 insertions(+), 12 deletions(-) diff --git a/fkie_multimaster_pylib/fkie_multimaster_pylib/crossbar/runtime_interface.py b/fkie_multimaster_pylib/fkie_multimaster_pylib/crossbar/runtime_interface.py index 0c09fb0e..967f981e 100644 --- a/fkie_multimaster_pylib/fkie_multimaster_pylib/crossbar/runtime_interface.py +++ b/fkie_multimaster_pylib/fkie_multimaster_pylib/crossbar/runtime_interface.py @@ -504,3 +504,46 @@ def __init__(self, version: str, date: str) -> None: def __repr__(self) -> str: return f"DaemonVersion" + + +class DiagnosticArray: + """ + This message is used to send diagnostic information about the state of the host. +:param timestamp: +:param status: an array of components being reported on. + + """ + def __init__(self, timestamp: float, status: []) -> None: + self.timestamp = timestamp + self.status = status + + +class DiagnosticStatus: + """ + This message holds the status of an individual component of the host. + :param level: level of operation enumerated above + :param name: a description of the test/component reporting + :param message: a description of the status + :param hardware_id: a hardware unique string + :param values: an array of values associated with the status + """ + + # Possible levels of operations + class LevelType: + OK = 0 + WARN = 1 + ERROR = 2 + STALE = 3 + + class KeyValue: + def __init__(self, key: str, value: str) -> None: + self.key = key + self.value = value + + def __init__(self, level: LevelType, name: str, message: str, hardware_id: str, values: List[KeyValue]) -> None: + self.level = level + self.name = name + self.message = message + self.hardware_id = hardware_id + self.values = values + diff --git a/fkie_multimaster_pylib/fkie_multimaster_pylib/settings.py b/fkie_multimaster_pylib/fkie_multimaster_pylib/settings.py index 0991a76e..6a2ccfa1 100644 --- a/fkie_multimaster_pylib/fkie_multimaster_pylib/settings.py +++ b/fkie_multimaster_pylib/fkie_multimaster_pylib/settings.py @@ -53,7 +53,7 @@ def default(self): 'version': {':value': self.version, ':ro': True}, 'file': {':value': self.filename, ':ro': True}, 'grpc_timeout': {':value': 15.0, ':type': 'float', ':min': 0, ':default': 15.0, ':hint': "timeout for connection to remote gRPC-server"}, - 'use_diagnostics_agg': {':value': False, ':hint': "subscribes to '/diagnostics_agg' topic instead of '/diagnostics'"}, + 'use_diagnostics_agg': {':value': True, ':hint': "subscribes to '/diagnostics_agg' topic instead of '/diagnostics'"}, 'reset': {':value': False, ':hint': 'if this flag is set to True the configuration will be reseted'}, 'grpc_verbosity': {':value': 'INFO', ':alt': ['DEBUG', 'INFO', 'ERROR'], ':hint': 'change gRPC verbosity', ':need_restart': True}, 'grpc_poll_strategy': {':value': '', ':alt': ['', 'poll', 'epollex', 'epoll1'], ':hint': 'change the strategy if you get warnings. Empty sets to default.', ':need_restart': True} diff --git a/fkie_node_manager_daemon/src/fkie_node_manager_daemon/monitor/service.py b/fkie_node_manager_daemon/src/fkie_node_manager_daemon/monitor/service.py index 43377a68..5a0e1171 100644 --- a/fkie_node_manager_daemon/src/fkie_node_manager_daemon/monitor/service.py +++ b/fkie_node_manager_daemon/src/fkie_node_manager_daemon/monitor/service.py @@ -45,7 +45,6 @@ class DiagnosticObj(DiagnosticStatus): - def __init__(self, msg=DiagnosticStatus(), timestamp=0): self.msg = msg self.timestamp = timestamp @@ -67,21 +66,22 @@ def __gt__(self, item): class Service: - - def __init__(self, settings): + def __init__(self, settings, callbackDiagnostics=None): self._settings = settings self._mutex = threading.RLock() self._diagnostics = [] # DiagnosticObj - self.use_diagnostics_agg = settings.param( - 'global/use_diagnostics_agg', False) + self._callbackDiagnostics = callbackDiagnostics + self.use_diagnostics_agg = settings.param("global/use_diagnostics_agg", True) self._sub_diag_agg = None self._sub_diag = None if self.use_diagnostics_agg: self._sub_diag_agg = rospy.Subscriber( - '/diagnostics_agg', DiagnosticArray, self._callback_diagnostics) + "/diagnostics_agg", DiagnosticArray, self._callback_diagnostics + ) else: self._sub_diag = rospy.Subscriber( - '/diagnostics', DiagnosticArray, self._callback_diagnostics) + "/diagnostics", DiagnosticArray, self._callback_diagnostics + ) hostname = socket.gethostname() self.sensors = [] @@ -95,7 +95,7 @@ def __init__(self, settings): self._settings.add_reload_listener(self.reload_parameter) def reload_parameter(self, settings): - value = settings.param('global/use_diagnostics_agg', False) + value = settings.param("global/use_diagnostics_agg", True) if value != self.use_diagnostics_agg: if self._sub_diag is not None: self._sub_diag.unregister() @@ -105,10 +105,12 @@ def reload_parameter(self, settings): self._sub_diag_agg = None if value: self._sub_diag_agg = rospy.Subscriber( - '/diagnostics_agg', DiagnosticArray, self._callback_diagnostics) + "/diagnostics_agg", DiagnosticArray, self._callback_diagnostics + ) else: self._sub_diag = rospy.Subscriber( - '/diagnostics', DiagnosticArray, self._callback_diagnostics) + "/diagnostics", DiagnosticArray, self._callback_diagnostics + ) self.use_diagnostics_agg = value def _callback_diagnostics(self, msg): @@ -124,6 +126,8 @@ def _callback_diagnostics(self, msg): except Exception: diag_obj = DiagnosticObj(status, stamp) self._diagnostics.append(diag_obj) + if self._callbackDiagnostics: + self._callbackDiagnostics(msg) def get_system_diagnostics(self, filter_level=0, filter_ts=0): result = DiagnosticArray() diff --git a/fkie_node_manager_daemon/src/fkie_node_manager_daemon/monitor_servicer.py b/fkie_node_manager_daemon/src/fkie_node_manager_daemon/monitor_servicer.py index 8868bfaf..5261d3c5 100644 --- a/fkie_node_manager_daemon/src/fkie_node_manager_daemon/monitor_servicer.py +++ b/fkie_node_manager_daemon/src/fkie_node_manager_daemon/monitor_servicer.py @@ -41,6 +41,8 @@ from fkie_node_manager_daemon.monitor import Service, grpc_msg import fkie_multimaster_msgs.grpc.monitor_pb2_grpc as mgrpc import fkie_multimaster_msgs.grpc.monitor_pb2 as mmsg +from fkie_multimaster_pylib.crossbar.runtime_interface import DiagnosticArray +from fkie_multimaster_pylib.crossbar.runtime_interface import DiagnosticStatus from fkie_multimaster_pylib.crossbar.runtime_interface import SystemEnvironment from fkie_multimaster_pylib.crossbar.runtime_interface import SystemInformation from fkie_multimaster_pylib.crossbar.base_session import CrossbarBaseSession @@ -60,7 +62,7 @@ def __init__( Log.info("Create monitor servicer") mgrpc.MonitorServiceServicer.__init__(self) CrossbarBaseSession.__init__(self, loop, realm, port, test_env=test_env) - self._monitor = Service(settings) + self._monitor = Service(settings, self.diagnosticsCbPublisher) def stop(self): self._monitor.stop() @@ -91,6 +93,34 @@ def GetUser(self, request, context): reply.user = getpass.getuser() return reply + def _toCrossbarDiagnostics(self, rosmsg): + cbMsg = DiagnosticArray( + float(rosmsg.header.stamp.secs) + + float(rosmsg.header.stamp.nsecs) / 1000000000.0, [] + ) + for sensor in rosmsg.status: + values = [] + for v in sensor.values: + values.append(DiagnosticStatus.KeyValue(v.key, v.value)) + status = DiagnosticStatus( + sensor.level, sensor.name, sensor.message, sensor.hardware_id, values + ) + cbMsg.status.append(status) + return cbMsg + + def diagnosticsCbPublisher(self, rosmsg): + self.publish_to( + "ros.provider.diagnostics", + json.dumps(self._toCrossbarDiagnostics(rosmsg), cls=SelfEncoder), + ) + + @wamp.register("ros.provider.get_diagnostics") + def getDiagnostics(self) -> DiagnosticArray: + Log.info("crossbar: get diagnostics") + rosmsg = self._monitor.get_diagnostics(0, 0) + # copy message to the crossbar structure + return json.dumps(self._toCrossbarDiagnostics(rosmsg), cls=SelfEncoder) + @wamp.register("ros.provider.get_system_info") def getSystemInfo(self) -> SystemInformation: Log.info("crossbar: get system info")