Skip to content

Commit

Permalink
forward diagnostics messages to crossbar
Browse files Browse the repository at this point in the history
  • Loading branch information
atiderko committed Dec 15, 2023
1 parent f387f42 commit bef5468
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,3 +504,46 @@ def __init__(self, version: str, date: str) -> None:

def __repr__(self) -> str:
return f"DaemonVersion<version: {self.version}, date: {self.date}>"


class DiagnosticArray:
"""
This message is used to send diagnostic information about the state of the host.
:param timestamp:
:param status: an array of components being reported on.
"""
def __init__(self, timestamp: float, status: []) -> None:
self.timestamp = timestamp
self.status = status


class DiagnosticStatus:
"""
This message holds the status of an individual component of the host.
:param level: level of operation enumerated above
:param name: a description of the test/component reporting
:param message: a description of the status
:param hardware_id: a hardware unique string
:param values: an array of values associated with the status
"""

# Possible levels of operations
class LevelType:
OK = 0
WARN = 1
ERROR = 2
STALE = 3

class KeyValue:
def __init__(self, key: str, value: str) -> None:
self.key = key
self.value = value

def __init__(self, level: LevelType, name: str, message: str, hardware_id: str, values: List[KeyValue]) -> None:
self.level = level
self.name = name
self.message = message
self.hardware_id = hardware_id
self.values = values

2 changes: 1 addition & 1 deletion fkie_multimaster_pylib/fkie_multimaster_pylib/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def default(self):
'version': {':value': self.version, ':ro': True},
'file': {':value': self.filename, ':ro': True},
'grpc_timeout': {':value': 15.0, ':type': 'float', ':min': 0, ':default': 15.0, ':hint': "timeout for connection to remote gRPC-server"},
'use_diagnostics_agg': {':value': False, ':hint': "subscribes to '/diagnostics_agg' topic instead of '/diagnostics'"},
'use_diagnostics_agg': {':value': True, ':hint': "subscribes to '/diagnostics_agg' topic instead of '/diagnostics'"},
'reset': {':value': False, ':hint': 'if this flag is set to True the configuration will be reseted'},
'grpc_verbosity': {':value': 'INFO', ':alt': ['DEBUG', 'INFO', 'ERROR'], ':hint': 'change gRPC verbosity', ':need_restart': True},
'grpc_poll_strategy': {':value': '', ':alt': ['', 'poll', 'epollex', 'epoll1'], ':hint': 'change the strategy if you get warnings. Empty sets to default.', ':need_restart': True}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@


class DiagnosticObj(DiagnosticStatus):

def __init__(self, msg=DiagnosticStatus(), timestamp=0):
self.msg = msg
self.timestamp = timestamp
Expand All @@ -67,21 +66,22 @@ def __gt__(self, item):


class Service:

def __init__(self, settings):
def __init__(self, settings, callbackDiagnostics=None):
self._settings = settings
self._mutex = threading.RLock()
self._diagnostics = [] # DiagnosticObj
self.use_diagnostics_agg = settings.param(
'global/use_diagnostics_agg', False)
self._callbackDiagnostics = callbackDiagnostics
self.use_diagnostics_agg = settings.param("global/use_diagnostics_agg", True)
self._sub_diag_agg = None
self._sub_diag = None
if self.use_diagnostics_agg:
self._sub_diag_agg = rospy.Subscriber(
'/diagnostics_agg', DiagnosticArray, self._callback_diagnostics)
"/diagnostics_agg", DiagnosticArray, self._callback_diagnostics
)
else:
self._sub_diag = rospy.Subscriber(
'/diagnostics', DiagnosticArray, self._callback_diagnostics)
"/diagnostics", DiagnosticArray, self._callback_diagnostics
)
hostname = socket.gethostname()

self.sensors = []
Expand All @@ -95,7 +95,7 @@ def __init__(self, settings):
self._settings.add_reload_listener(self.reload_parameter)

def reload_parameter(self, settings):
value = settings.param('global/use_diagnostics_agg', False)
value = settings.param("global/use_diagnostics_agg", True)
if value != self.use_diagnostics_agg:
if self._sub_diag is not None:
self._sub_diag.unregister()
Expand All @@ -105,10 +105,12 @@ def reload_parameter(self, settings):
self._sub_diag_agg = None
if value:
self._sub_diag_agg = rospy.Subscriber(
'/diagnostics_agg', DiagnosticArray, self._callback_diagnostics)
"/diagnostics_agg", DiagnosticArray, self._callback_diagnostics
)
else:
self._sub_diag = rospy.Subscriber(
'/diagnostics', DiagnosticArray, self._callback_diagnostics)
"/diagnostics", DiagnosticArray, self._callback_diagnostics
)
self.use_diagnostics_agg = value

def _callback_diagnostics(self, msg):
Expand All @@ -124,6 +126,8 @@ def _callback_diagnostics(self, msg):
except Exception:
diag_obj = DiagnosticObj(status, stamp)
self._diagnostics.append(diag_obj)
if self._callbackDiagnostics:
self._callbackDiagnostics(msg)

def get_system_diagnostics(self, filter_level=0, filter_ts=0):
result = DiagnosticArray()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
from fkie_node_manager_daemon.monitor import Service, grpc_msg
import fkie_multimaster_msgs.grpc.monitor_pb2_grpc as mgrpc
import fkie_multimaster_msgs.grpc.monitor_pb2 as mmsg
from fkie_multimaster_pylib.crossbar.runtime_interface import DiagnosticArray
from fkie_multimaster_pylib.crossbar.runtime_interface import DiagnosticStatus
from fkie_multimaster_pylib.crossbar.runtime_interface import SystemEnvironment
from fkie_multimaster_pylib.crossbar.runtime_interface import SystemInformation
from fkie_multimaster_pylib.crossbar.base_session import CrossbarBaseSession
Expand All @@ -60,7 +62,7 @@ def __init__(
Log.info("Create monitor servicer")
mgrpc.MonitorServiceServicer.__init__(self)
CrossbarBaseSession.__init__(self, loop, realm, port, test_env=test_env)
self._monitor = Service(settings)
self._monitor = Service(settings, self.diagnosticsCbPublisher)

def stop(self):
self._monitor.stop()
Expand Down Expand Up @@ -91,6 +93,34 @@ def GetUser(self, request, context):
reply.user = getpass.getuser()
return reply

def _toCrossbarDiagnostics(self, rosmsg):
cbMsg = DiagnosticArray(
float(rosmsg.header.stamp.secs)
+ float(rosmsg.header.stamp.nsecs) / 1000000000.0, []
)
for sensor in rosmsg.status:
values = []
for v in sensor.values:
values.append(DiagnosticStatus.KeyValue(v.key, v.value))
status = DiagnosticStatus(
sensor.level, sensor.name, sensor.message, sensor.hardware_id, values
)
cbMsg.status.append(status)
return cbMsg

def diagnosticsCbPublisher(self, rosmsg):
self.publish_to(
"ros.provider.diagnostics",
json.dumps(self._toCrossbarDiagnostics(rosmsg), cls=SelfEncoder),
)

@wamp.register("ros.provider.get_diagnostics")
def getDiagnostics(self) -> DiagnosticArray:
Log.info("crossbar: get diagnostics")
rosmsg = self._monitor.get_diagnostics(0, 0)
# copy message to the crossbar structure
return json.dumps(self._toCrossbarDiagnostics(rosmsg), cls=SelfEncoder)

@wamp.register("ros.provider.get_system_info")
def getSystemInfo(self) -> SystemInformation:
Log.info("crossbar: get system info")
Expand Down

0 comments on commit bef5468

Please sign in to comment.