Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve type annotations for Python connectors. #498

Merged
1 commit merged into from
Mar 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 38 additions & 24 deletions fleetspeak_python/fleetspeak/server_connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,25 +357,31 @@ class ServiceClient(metaclass=abc.ABCMeta):
@abc.abstractmethod
def __init__(
self,
service_name,
):
service_name: str,
) -> None:
"""Abstract constructor for ServiceClient.

Args:
service_name: string; The Fleetspeak service name to communicate with.
service_name: The Fleetspeak service name to communicate with.
"""

@abc.abstractmethod
def Send(self, message):
def Send(
self,
message: common_pb2.Message,
) -> None:
"""Sends a message to the Fleetspeak server."""

@abc.abstractmethod
def Listen(self, process_callback):
def Listen(
self,
callback: Callable[[common_pb2.Message, grpc.ServicerContext], None],
) -> None:
"""Listens to messages from the Fleetspeak server.

Args:
process_callback: A callback to be executed when a messages arrives from
the Fleetspeak server. See the process argument of Servicer.__init__.
callback: A callback to be executed when a messages arrives from the
Fleetspeak server. See the process argument of Servicer.__init__.
"""


Expand All @@ -392,24 +398,23 @@ class InsecureGRPCServiceClient(ServiceClient):

def __init__(
self,
service_name,
fleetspeak_message_listen_address=None,
fleetspeak_server=None,
threadpool_size=5,
service_name: str,
fleetspeak_message_listen_address: Optional[str] = None,
fleetspeak_server: Optional[str] = None,
threadpool_size: int = 5,
):
"""Constructor.

Args:
service_name: string The name of the service to communicate as.
fleetspeak_message_listen_address: string The connection's read end
address. If unset, the argv flag fleetspeak_message_listen_address will
be used. If still unset, the connection will not be open for reading and
Listen() will raise NotConfigured.
fleetspeak_server: string The connection's write end address. If unset,
the argv flag fleetspeak_server will be used. If still unset, the
connection will not be open for writing and Send() will raise
NotConfigured.
threadpool_size: int The number of threads to use to process messages.
service_name: The name of the service to communicate as.
fleetspeak_message_listen_address: The connection's read end address. If
unset, the argv flag fleetspeak_message_listen_address will be used. If
still unset, the connection will not be open for reading and Listen()
will raise NotConfigured.
fleetspeak_server: The connection's write end address. If unset, the argv
flag fleetspeak_server will be used. If still unset, the connection will
not be open for writing and Send() will raise NotConfigured.
threadpool_size: The number of threads to use to process messages.

Raises:
NotConfigured:
Expand Down Expand Up @@ -449,24 +454,33 @@ def __init__(
"Fleetspeak GRPCService client connected to %s", fleetspeak_server
)

def Send(self, message):
def Send(
self,
message: common_pb2.Message,
) -> None:
"""Send one message.

Deprecated, users should migrate to call self.outgoing.InsertMessage
directly.

Args:
message: A message to send.
"""
if not self.outgoing:
raise NotConfigured("Send address not provided.")
self.outgoing.InsertMessage(message)

def Listen(self, process):
def Listen(
self,
callback: Callable[[common_pb2.Message, grpc.ServicerContext], None],
) -> None:
if self._listen_address is None:
raise NotConfigured("Listen address not provided.")
self._server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self._threadpool_size)
)
self._server.add_insecure_port(self._listen_address)
servicer = Servicer(process, self._service_name)
servicer = Servicer(callback, self._service_name)
grpcservice_pb2_grpc.add_ProcessorServicer_to_server(servicer, self._server)
self._server.start()
logging.info(
Expand Down
Loading