From 89581e1084dbf177d76d86044ac75435f50e8f3b Mon Sep 17 00:00:00 2001 From: Laura Lindzey <lindzey@uw.edu> Date: Tue, 26 Apr 2022 12:47:27 -0700 Subject: [PATCH 1/7] First cut at centralizing config file parsing --- src/message_queue_node.py | 125 ++++++++++++++++++++++++++++---------- 1 file changed, 92 insertions(+), 33 deletions(-) diff --git a/src/message_queue_node.py b/src/message_queue_node.py index 17c2e577..44745d88 100755 --- a/src/message_queue_node.py +++ b/src/message_queue_node.py @@ -2,7 +2,7 @@ from __future__ import unicode_literals from itertools import groupby -from typing import Any, Deque, List, Optional, Tuple, Union +from typing import Any, Deque, Dict, List, NewType, Optional, Tuple, Union import rospy from rospy.msg import AnyMsg @@ -27,6 +27,66 @@ from version_node import version_node #TODO: move this from packet_dispatch_node import DecoderListEntry +QueueId = NewType('QueueId', int) +MessageCodecId = NewType('MessageCodecId', int) + +@dataclass +class QueueParams: + subscribe_topic: str + publish_topic: str + codec_id: MessageCodecId + queue_order: str # TODO: I'd like this to be an enum corresponding to 'fifo', 'lifo' (rigbht now, it's "if 'fifo': / else:") + queue_maxsize: int + priority: int + is_active: bool + destination: int + allow_fragmentation: bool + +class AcommsConfiguration(object): + """ + Includes all the relevant configuration info for a list of queues and the codecs that they use. + + Raises ValueError in case of an invalid configuration file. + """ + def __init__(self, packet_codec_param): + print(packet_codec_param) + # raise(Exception) + # TODO: Fill these in!!! (Or decide that there's no need for them...) + self.topic_from_id: Dict[QueueId, str] = {} + self.id_from_topic: Dict[str, QueueId] = {} + # Queues are indexed by their ID because that's what goes in the header + # to identify the appropriate publication parameters for packet dispatch + self.queue_params: Dict[QueueId, QueueParams] = {} + self.message_codecs: Dict[MessageCodecId, QueueParams] = {} + + config_version = packet_codec_param.get('config_version') + if config_version is None: + # Use legacy, where we have a one-to-one mapping between queues and message codecs + # All relevation is in the message_codecs field + for message_codec_param in packet_codec_param['message_codecs']: + both_ids = message_codec_param["id"] + self.queue_params[both_ids] = QueueParams(message_codec_param["subscribe_topic"], + message_codec_param["publish_topic"], + both_ids, + message_codec_param["queue_order"], + message_codec_param["queue_maxsize"], + message_codec_param["priority"], # NB: current implementation MessageQueueNode has a default priority + message_codec_param.get("is_active", True), + message_codec_param["default_dest"], # NB: there's also a default destination at the MessageQueueNode level + message_codec_param.get("allow_fragmentation", False), + ) + ros_type_name = message_codec_param['ros_type'] + msg_class = roslib.message.get_message_class(ros_type_name) + if not msg_class: + raise TypeError('Invalid ROS type "{}" for codec ID {}' + .format(ros_type_name, both_ids)) + self.message_codecs[both_ids] = RosMessageCodec(ros_type=msg_class, fields_dict=message_codec_param['fields']) + + else: + # Use new version that separates the queue and codec definitions. + raise(Exception("NYI")) + + @dataclass class QueuedMessage: @@ -242,7 +302,7 @@ class MessageQueueNode(object): def __init__(self): rospy.init_node('message_queue', log_level=rospy.DEBUG) self.update_rate = rospy.get_param('~update_rate', 1) - self.unknown_dests_are_reachable = rospy.get_param('~unknown_dests_are_reachable', True) + self.unknown_dests_are_reachable: bool = rospy.get_param('~unknown_dests_are_reachable', True) self.default_dest = rospy.get_param('~default_dest', 121) self.default_priority = rospy.get_param('~default_priority', 10) @@ -280,52 +340,51 @@ class MessageQueueNode(object): # when queuing for TX, we go from topic -> message -> packet # However, organizing it this way allows us to use a single config YAML for both TX and RX packet_codec_param = rospy.get_param('~packet_codecs') - self.packet_codecs = {} + + # QUESTION(lindzey): Do we ever have more than one packet codec? I'm + # not convinced that the code in this block actually handles that case + # appropriately, unless different packet codecs are guaranteed to + # have non-overlapping destinations and subscribe topics. for packet_codec_details in packet_codec_param: rospy.loginfo("Packet codec details: {}".format(packet_codec_details)) packet_codec_class = packet_codecs[packet_codec_details['packet_codec']] - message_codecs = {} - for message_codec_params in packet_codec_details['message_codecs']: - ros_type_name = message_codec_params['ros_type'] - msg_class = roslib.message.get_message_class(ros_type_name) - codec_id = message_codec_params['id'] - if not msg_class: - raise TypeError('Invalid ROS type "{}" for message codec {}'.format(ros_type_name, codec_id)) - message_codec = RosMessageCodec(ros_type=msg_class, fields_dict=message_codec_params['fields']) - message_codecs[codec_id] = message_codec + # This assumes that there's only one ... which seems to be an assumption that the rest of the code also makes? + # TODO: consider replacing with queue_params, message_codecs = parse_acomms_configuration(...) + self.acomms_configuration = AcommsConfiguration(packet_codec_param[0]) - # TODO clean this up - self.destination_reachable[ - message_codec_params['default_dest']] = True if self.unknown_dests_are_reachable else False + for queue_id, params in self.acomms_configuration.queue_params.items(): - subscribe_topic = message_codec_params['subscribe_topic'] - queue_active_status = message_codec_params.get('is_active', True) - priority = message_codec_params['priority'] + # QUESTION: this makes it a non-optional parameter, but later, it is accessed with a default value. + # Which do we want? Have the node's default propagate to the queue? Or not (and get rid of the node's)? + # default_dest = message_codec_params['default_dest'] + # self.destination_reachable[default_dest] = True if self.unknown_dests_are_reachable else False + self.destination_reachable[params.destination] = self.unknown_dests_are_reachable - if queue_active_status: - self.active_queue_names.add(subscribe_topic) + if params.is_active: + self.active_queue_names.add(params.subscribe_topic) new_queue = MessageQueue(master_message_queue=self._queued_messages, - dest_address=message_codec_params.get('default_dest', self.default_dest), - priority=message_codec_params.get('priority', self.default_priority), - header=message_codec_params['id'], - topic=subscribe_topic, - order=message_codec_params['queue_order'], - maxsize=message_codec_params['queue_maxsize'], - allow_fragmentation=message_codec_params.get('allow_fragmentation', False), - message_codec=message_codec, + dest_address=params.destination, + priority=params.priority, + header=queue_id, + topic=params.subscribe_topic, + order=params.queue_order, + maxsize=params.queue_maxsize, + allow_fragmentation=params.allow_fragmentation, + # QUESTION: Why does the queue own the codec? + message_codec=self.acomms_configuration.message_codecs[params.codec_id], ) - self.topic_queue[subscribe_topic] = new_queue - rospy.Subscriber(subscribe_topic, + self.topic_queue[params.subscribe_topic] = new_queue + rospy.Subscriber(params.subscribe_topic, AnyMsg, - partial(self.on_incoming_message, topic_name=subscribe_topic)) + partial(self.on_incoming_message, topic_name=params.subscribe_topic)) # The packet codec initializer sets the packet codec on each message codec - packet_codec = packet_codec_class(message_codecs=message_codecs, + packet_codec = packet_codec_class(message_codecs=self.acomms_configuration.message_codecs, miniframe_header=packet_codec_details.get('miniframe_header', bytes()), dataframe_header=packet_codec_details.get('dataframe_header', bytes())) - rospy.loginfo("Added packet codec with {} message codecs".format(len(message_codecs))) + rospy.loginfo("Added packet codec with {} message codecs".format(len(self.acomms_configuration.message_codecs))) # Subscribe to the Neighbor Reachability updates (which may be published by more than one node rospy.Subscriber("neighbor_reachability", NeighborReachability, self.on_neighbor_reachability) -- GitLab From 7727d7197e29d452fbe680e4a6b69441872d2607 Mon Sep 17 00:00:00 2001 From: Laura Lindzey <lindzey@uw.edu> Date: Wed, 27 Apr 2022 01:19:01 -0700 Subject: [PATCH 2/7] Pull config parsing out into separate file --- src/config_parser.py | 65 ++++++++++++++++++++++++++++++ src/message_queue_node.py | 84 ++++++--------------------------------- 2 files changed, 77 insertions(+), 72 deletions(-) create mode 100644 src/config_parser.py diff --git a/src/config_parser.py b/src/config_parser.py new file mode 100644 index 00000000..22cb0a92 --- /dev/null +++ b/src/config_parser.py @@ -0,0 +1,65 @@ +from dataclasses import dataclass +from ltcodecs import RosMessageCodec +import roslib +from typing import Dict, NewType + +QueueId = NewType('QueueId', int) +MessageCodecId = NewType('MessageCodecId', int) + +@dataclass +class QueueParams: + subscribe_topic: str + publish_topic: str + codec_id: MessageCodecId + queue_order: str # TODO: I'd like this to be an enum corresponding to 'fifo', 'lifo' (rigbht now, it's "if 'fifo': / else:") + queue_maxsize: int + priority: int + is_active: bool + destination: int + allow_fragmentation: bool + + +def get_queue_params(packet_codec_param): + # Queues are indexed by their ID because that's what goes in the header + # to identify the appropriate publication parameters for packet dispatch + queue_params: Dict[QueueId, QueueParams] = {} + config_version = packet_codec_param.get('config_version') + if config_version is None: + # Use legacy, where we have a one-to-one mapping between queues and message codecs + # All relevation is in the message_codecs field + for message_codec_param in packet_codec_param['message_codecs']: + both_ids = message_codec_param["id"] + queue_params[both_ids] = QueueParams(message_codec_param["subscribe_topic"], + message_codec_param["publish_topic"], + both_ids, + message_codec_param["queue_order"], + message_codec_param["queue_maxsize"], + message_codec_param["priority"], # NB: current implementation MessageQueueNode has a default priority + message_codec_param.get("is_active", True), + message_codec_param["default_dest"], # NB: there's also a default destination at the MessageQueueNode level + message_codec_param.get("allow_fragmentation", False), + ) + else: + # Use new version that separates the queue and codec definitions. + raise(Exception("NYI")) + return queue_params + + +def get_message_codecs(packet_codec_param): + message_codecs: Dict[MessageCodecId, QueueParams] = {} + config_version = packet_codec_param.get('config_version') + if config_version is None: + # Use legacy, where we have a one-to-one mapping between queues and message codecs + # All relevation is in the message_codecs field + for message_codec_param in packet_codec_param['message_codecs']: + both_ids = message_codec_param["id"] + ros_type_name = message_codec_param['ros_type'] + msg_class = roslib.message.get_message_class(ros_type_name) + if not msg_class: + raise TypeError('Invalid ROS type "{}" for codec ID {}' + .format(ros_type_name, both_ids)) + message_codecs[both_ids] = RosMessageCodec(ros_type=msg_class, fields_dict=message_codec_param['fields']) + else: + # Use new version that separates the queue and codec definitions. + raise(Exception("NYI")) + return message_codecs \ No newline at end of file diff --git a/src/message_queue_node.py b/src/message_queue_node.py index 44745d88..6a5f203d 100755 --- a/src/message_queue_node.py +++ b/src/message_queue_node.py @@ -2,7 +2,7 @@ from __future__ import unicode_literals from itertools import groupby -from typing import Any, Deque, Dict, List, NewType, Optional, Tuple, Union +from typing import Any, Deque, List, Optional, Tuple, Union import rospy from rospy.msg import AnyMsg @@ -27,65 +27,7 @@ from version_node import version_node #TODO: move this from packet_dispatch_node import DecoderListEntry -QueueId = NewType('QueueId', int) -MessageCodecId = NewType('MessageCodecId', int) - -@dataclass -class QueueParams: - subscribe_topic: str - publish_topic: str - codec_id: MessageCodecId - queue_order: str # TODO: I'd like this to be an enum corresponding to 'fifo', 'lifo' (rigbht now, it's "if 'fifo': / else:") - queue_maxsize: int - priority: int - is_active: bool - destination: int - allow_fragmentation: bool - -class AcommsConfiguration(object): - """ - Includes all the relevant configuration info for a list of queues and the codecs that they use. - - Raises ValueError in case of an invalid configuration file. - """ - def __init__(self, packet_codec_param): - print(packet_codec_param) - # raise(Exception) - # TODO: Fill these in!!! (Or decide that there's no need for them...) - self.topic_from_id: Dict[QueueId, str] = {} - self.id_from_topic: Dict[str, QueueId] = {} - # Queues are indexed by their ID because that's what goes in the header - # to identify the appropriate publication parameters for packet dispatch - self.queue_params: Dict[QueueId, QueueParams] = {} - self.message_codecs: Dict[MessageCodecId, QueueParams] = {} - - config_version = packet_codec_param.get('config_version') - if config_version is None: - # Use legacy, where we have a one-to-one mapping between queues and message codecs - # All relevation is in the message_codecs field - for message_codec_param in packet_codec_param['message_codecs']: - both_ids = message_codec_param["id"] - self.queue_params[both_ids] = QueueParams(message_codec_param["subscribe_topic"], - message_codec_param["publish_topic"], - both_ids, - message_codec_param["queue_order"], - message_codec_param["queue_maxsize"], - message_codec_param["priority"], # NB: current implementation MessageQueueNode has a default priority - message_codec_param.get("is_active", True), - message_codec_param["default_dest"], # NB: there's also a default destination at the MessageQueueNode level - message_codec_param.get("allow_fragmentation", False), - ) - ros_type_name = message_codec_param['ros_type'] - msg_class = roslib.message.get_message_class(ros_type_name) - if not msg_class: - raise TypeError('Invalid ROS type "{}" for codec ID {}' - .format(ros_type_name, both_ids)) - self.message_codecs[both_ids] = RosMessageCodec(ros_type=msg_class, fields_dict=message_codec_param['fields']) - - else: - # Use new version that separates the queue and codec definitions. - raise(Exception("NYI")) - +from config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs @dataclass @@ -347,18 +289,14 @@ class MessageQueueNode(object): # have non-overlapping destinations and subscribe topics. for packet_codec_details in packet_codec_param: rospy.loginfo("Packet codec details: {}".format(packet_codec_details)) - packet_codec_class = packet_codecs[packet_codec_details['packet_codec']] - - # This assumes that there's only one ... which seems to be an assumption that the rest of the code also makes? - # TODO: consider replacing with queue_params, message_codecs = parse_acomms_configuration(...) - self.acomms_configuration = AcommsConfiguration(packet_codec_param[0]) + packet_codec_class = packet_codecs[packet_codec_details['packet_codec']] # type: PacketCodec - for queue_id, params in self.acomms_configuration.queue_params.items(): + queue_params = get_queue_params(packet_codec_details) + message_codecs = get_message_codecs(packet_codec_details) - # QUESTION: this makes it a non-optional parameter, but later, it is accessed with a default value. + for queue_id, params in queue_params.items(): + # QUESTION: this made default_dest a non-optional parameter, but later, it is accessed with a default value. # Which do we want? Have the node's default propagate to the queue? Or not (and get rid of the node's)? - # default_dest = message_codec_params['default_dest'] - # self.destination_reachable[default_dest] = True if self.unknown_dests_are_reachable else False self.destination_reachable[params.destination] = self.unknown_dests_are_reachable if params.is_active: @@ -373,18 +311,20 @@ class MessageQueueNode(object): maxsize=params.queue_maxsize, allow_fragmentation=params.allow_fragmentation, # QUESTION: Why does the queue own the codec? - message_codec=self.acomms_configuration.message_codecs[params.codec_id], + message_codec=message_codecs[params.codec_id], ) self.topic_queue[params.subscribe_topic] = new_queue rospy.Subscriber(params.subscribe_topic, AnyMsg, partial(self.on_incoming_message, topic_name=params.subscribe_topic)) + # QUESTION(lindzey): It looks like the packet codec is never used? + # => AAAAH. This is called for its side effects. Maybe rename to make more clear? # The packet codec initializer sets the packet codec on each message codec - packet_codec = packet_codec_class(message_codecs=self.acomms_configuration.message_codecs, + packet_codec = packet_codec_class(message_codecs=message_codecs, miniframe_header=packet_codec_details.get('miniframe_header', bytes()), dataframe_header=packet_codec_details.get('dataframe_header', bytes())) - rospy.loginfo("Added packet codec with {} message codecs".format(len(self.acomms_configuration.message_codecs))) + rospy.loginfo("Added packet codec with {} message codecs".format(len(message_codecs))) # Subscribe to the Neighbor Reachability updates (which may be published by more than one node rospy.Subscriber("neighbor_reachability", NeighborReachability, self.on_neighbor_reachability) -- GitLab From ad54f73e042be3646b47e73b233f36aab2921002 Mon Sep 17 00:00:00 2001 From: Laura Lindzey <lindzey@uw.edu> Date: Wed, 27 Apr 2022 02:04:51 -0700 Subject: [PATCH 3/7] Packet dispatch now uses the config utility --- src/config_parser.py | 13 ++++++++++++- src/packet_dispatch_node.py | 39 ++++++++++++++++++------------------- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/src/config_parser.py b/src/config_parser.py index 22cb0a92..3245dd34 100644 --- a/src/config_parser.py +++ b/src/config_parser.py @@ -8,6 +8,7 @@ MessageCodecId = NewType('MessageCodecId', int) @dataclass class QueueParams: + msg_class: type subscribe_topic: str publish_topic: str codec_id: MessageCodecId @@ -29,7 +30,13 @@ def get_queue_params(packet_codec_param): # All relevation is in the message_codecs field for message_codec_param in packet_codec_param['message_codecs']: both_ids = message_codec_param["id"] - queue_params[both_ids] = QueueParams(message_codec_param["subscribe_topic"], + ros_type_name = message_codec_param['ros_type'] + msg_class = roslib.message.get_message_class(ros_type_name) + if not msg_class: + raise TypeError('Invalid ROS type "{}" for codec ID {}' + .format(ros_type_name, both_ids)) + queue_params[both_ids] = QueueParams(msg_class, + message_codec_param["subscribe_topic"], message_codec_param["publish_topic"], both_ids, message_codec_param["queue_order"], @@ -52,6 +59,10 @@ def get_message_codecs(packet_codec_param): # Use legacy, where we have a one-to-one mapping between queues and message codecs # All relevation is in the message_codecs field for message_codec_param in packet_codec_param['message_codecs']: + # NOTE(lindzey): The following TODOs were copied over from packet_dispatch_node + # TODO: allow custom codecs + # like this: MyClass = getattr(importlib.import_module("module.submodule"), "MyClass") + # TODO: make this work with orthogonal packet codecs both_ids = message_codec_param["id"] ros_type_name = message_codec_param['ros_type'] msg_class = roslib.message.get_message_class(ros_type_name) diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py index 6e9706f4..b94c7904 100755 --- a/src/packet_dispatch_node.py +++ b/src/packet_dispatch_node.py @@ -13,6 +13,8 @@ from acomms_codecs.ros_packet_codec import RosPacketCodec from acomms_codecs.packet_codecs import packet_codecs from version_node import version_node +from config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs + @dataclass class DecoderListEntry: #TODO: move header stuff into separate packetcodec class @@ -87,27 +89,23 @@ class PacketDispatchNode(object): packet_codec_param = rospy.get_param('~packet_codecs') self.decoder_list = [] for packet_codec in packet_codec_param: - message_codec_config = packet_codec.pop('message_codecs') + # QUESTION(lindzey): Storing the message codec dict as a member + # variable mirrors the logic that was here before, + # but seems to imply that there will only ever be a single + # packet codec in the config file. Is that true? + self.message_codecs = get_message_codecs(packet_codec) + self.queue_params = get_queue_params(packet_codec) + self.message_publishers = {} + for queue_id, params in self.queue_params.items(): + pub_name = rospy.names.canonicalize_name(params.publish_topic) + self.message_publishers[queue_id] = rospy.Publisher( + pub_name, params.msg_class, queue_size=10) + + packet_codec.pop('message_codecs') entry = DecoderListEntry(**packet_codec) self.decoder_list.append(entry) rospy.loginfo("Added decoder: {}".format(entry)) - self.message_codecs = {} - self.message_publishers = {} - for codec_config in message_codec_config: - codec_id = codec_config['id'] - ros_type_name = codec_config['ros_type'] - msg_class = roslib.message.get_message_class(ros_type_name) - if not msg_class: - raise TypeError('Invalid ROS type "{}" for message codec {}'.format(ros_type_name, codec_id)) - # TODO: allow custom codecs - # like this: MyClass = getattr(importlib.import_module("module.submodule"), "MyClass") - # TODO: make this work with orthogonal packet codecs - self.message_codecs[codec_id] = RosMessageCodec(msg_class, codec_config['fields']) - - pub_name = rospy.names.canonicalize_name(codec_config['publish_topic']) - self.message_publishers[codec_id] = rospy.Publisher(pub_name, msg_class, queue_size=10) - # replace name with instance of packet codec # ToDO make this less ugly and stupid entry.packet_codec = packet_codecs[entry.packet_codec](self.message_codecs, @@ -149,9 +147,10 @@ class PacketDispatchNode(object): # loop through until we are out of bits (ReadError will be raised) try: while True: - id = payload_bits.read('uint:8') - ros_msg = self.message_codecs[id].decode(payload_bits) - self.message_publishers[id].publish(ros_msg) + queue_id = payload_bits.read('uint:8') + codec_id = self.queue_params[queue_id].codec_id + ros_msg = self.message_codecs[codec_id].decode(payload_bits) + self.message_publishers[queue_id].publish(ros_msg) except KeyError: # We don't have a config entry that matches this ID rospy.loginfo("Unknown message ID ({}) received, unable to continue parsing packet".format(id)) -- GitLab From 836d0c0fa1bd1863fad445af22eee1217216f82f Mon Sep 17 00:00:00 2001 From: Laura Lindzey <lindzey@uw.edu> Date: Wed, 27 Apr 2022 02:17:27 -0700 Subject: [PATCH 4/7] (Very) Minor mypy cleanup in packet_dispatch_node --- src/packet_dispatch_node.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py index b94c7904..25a5a33a 100755 --- a/src/packet_dispatch_node.py +++ b/src/packet_dispatch_node.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -from typing import List, Tuple +from typing import Dict, List, Tuple import rospy import roslib.message @@ -21,14 +21,14 @@ class DecoderListEntry: codec_name: str miniframe_header: bytes dataframe_header: bytes - packet_codec: str + packet_codec: str # TODO: Mypy isn't happy about this -- I think it should be a PacketCodec match_src: List[int] = field(default_factory=list) match_dest: List[int] = field(default_factory=list) except_src: List[int] = field(default_factory=list) except_dest: List[int] = field(default_factory=list) remove_headers: bool = True - def does_packet_match(self, received_packet: ReceivedPacket): + def does_packet_match(self, received_packet: ReceivedPacket) -> bool: # We work in this order: # 1. SRC # 2. DEST @@ -64,7 +64,7 @@ class DecoderListEntry: return False return True - def strip_headers(self, received_packet: ReceivedPacket): + def strip_headers(self, received_packet: ReceivedPacket) -> Tuple[bytes, bytes]: if self.remove_headers: miniframe_bytes = received_packet.packet.miniframe_bytes[len(self.miniframe_header):] dataframe_bytes = received_packet.packet.dataframe_bytes[len(self.dataframe_header):] @@ -85,9 +85,8 @@ class PacketDispatchNode(object): except: rospy.logwarn("Unable to query version information") - packet_codec_param = rospy.get_param('~packet_codecs') - self.decoder_list = [] + self.decoder_list: List[DecoderListEntry] = [] for packet_codec in packet_codec_param: # QUESTION(lindzey): Storing the message codec dict as a member # variable mirrors the logic that was here before, @@ -95,7 +94,7 @@ class PacketDispatchNode(object): # packet codec in the config file. Is that true? self.message_codecs = get_message_codecs(packet_codec) self.queue_params = get_queue_params(packet_codec) - self.message_publishers = {} + self.message_publishers: Dict[QueueId, rospy.Publisher] = {} for queue_id, params in self.queue_params.items(): pub_name = rospy.names.canonicalize_name(params.publish_topic) self.message_publishers[queue_id] = rospy.Publisher( @@ -120,7 +119,7 @@ class PacketDispatchNode(object): rospy.loginfo("Message Dispatch started") rospy.spin() - def on_packet_rx(self, received_packet: ReceivedPacket): + def on_packet_rx(self, received_packet: ReceivedPacket) -> None: rospy.loginfo("Got incoming packet") # Decide what to do with the incoming packet, based on the match criteria matching_codecs = [d.packet_codec for d in self.decoder_list if d.does_packet_match(received_packet)] @@ -134,7 +133,10 @@ class PacketDispatchNode(object): except KeyError: rospy.logwarn("Unrecognized packet decoder: {}".format(d)) - def decode_ros_acomms_packet(self, received_packet: ReceivedPacket, stripped_data: Tuple[bytes, bytes]): + def decode_ros_acomms_packet(self, + received_packet: ReceivedPacket, + stripped_data: Tuple[bytes, bytes] + ) -> None: payload_bytes = stripped_data[0] + stripped_data[1] payload_bits = ConstBitStream(bytes=payload_bytes) -- GitLab From 6016971967c9d9ff8da1b935775df4ea8895dd60 Mon Sep 17 00:00:00 2001 From: Laura Lindzey <lindzey@uw.edu> Date: Wed, 27 Apr 2022 02:48:21 -0700 Subject: [PATCH 5/7] Move the config parser into a module --- setup.py | 2 +- src/codec_config_parser/__init__.py | 1 + src/{ => codec_config_parser}/config_parser.py | 0 src/message_queue_node.py | 2 +- src/packet_dispatch_node.py | 2 +- 5 files changed, 4 insertions(+), 3 deletions(-) create mode 100644 src/codec_config_parser/__init__.py rename src/{ => codec_config_parser}/config_parser.py (100%) diff --git a/setup.py b/setup.py index 41030556..60a33446 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from distutils.core import setup from catkin_pkg.python_setup import generate_distutils_setup d = generate_distutils_setup( - py_modules=['acomms_codecs'], + py_modules=['acomms_codecs', 'codec_config_parser'], ) setup(**d) diff --git a/src/codec_config_parser/__init__.py b/src/codec_config_parser/__init__.py new file mode 100644 index 00000000..351cc68c --- /dev/null +++ b/src/codec_config_parser/__init__.py @@ -0,0 +1 @@ +from .config_parser import MessageCodecId, QueueId, QueueParams, get_queue_params, get_message_codecs \ No newline at end of file diff --git a/src/config_parser.py b/src/codec_config_parser/config_parser.py similarity index 100% rename from src/config_parser.py rename to src/codec_config_parser/config_parser.py diff --git a/src/message_queue_node.py b/src/message_queue_node.py index 6a5f203d..a699dbfc 100755 --- a/src/message_queue_node.py +++ b/src/message_queue_node.py @@ -27,7 +27,7 @@ from version_node import version_node #TODO: move this from packet_dispatch_node import DecoderListEntry -from config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs +from codec_config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs @dataclass diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py index 25a5a33a..d604936a 100755 --- a/src/packet_dispatch_node.py +++ b/src/packet_dispatch_node.py @@ -13,7 +13,7 @@ from acomms_codecs.ros_packet_codec import RosPacketCodec from acomms_codecs.packet_codecs import packet_codecs from version_node import version_node -from config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs +from codec_config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs @dataclass class DecoderListEntry: -- GitLab From e49abb834f608f6092af1f850a5a764759c4defe Mon Sep 17 00:00:00 2001 From: Laura Lindzey <lindzey@uw.edu> Date: Wed, 27 Apr 2022 03:32:39 -0700 Subject: [PATCH 6/7] Fix editing fails in a few comments --- src/codec_config_parser/config_parser.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/codec_config_parser/config_parser.py b/src/codec_config_parser/config_parser.py index 3245dd34..d6fae8be 100644 --- a/src/codec_config_parser/config_parser.py +++ b/src/codec_config_parser/config_parser.py @@ -12,7 +12,7 @@ class QueueParams: subscribe_topic: str publish_topic: str codec_id: MessageCodecId - queue_order: str # TODO: I'd like this to be an enum corresponding to 'fifo', 'lifo' (rigbht now, it's "if 'fifo': / else:") + queue_order: str # TODO: I'd like this to be an enum corresponding to 'fifo', 'lifo' queue_maxsize: int priority: int is_active: bool @@ -27,7 +27,6 @@ def get_queue_params(packet_codec_param): config_version = packet_codec_param.get('config_version') if config_version is None: # Use legacy, where we have a one-to-one mapping between queues and message codecs - # All relevation is in the message_codecs field for message_codec_param in packet_codec_param['message_codecs']: both_ids = message_codec_param["id"] ros_type_name = message_codec_param['ros_type'] @@ -57,7 +56,6 @@ def get_message_codecs(packet_codec_param): config_version = packet_codec_param.get('config_version') if config_version is None: # Use legacy, where we have a one-to-one mapping between queues and message codecs - # All relevation is in the message_codecs field for message_codec_param in packet_codec_param['message_codecs']: # NOTE(lindzey): The following TODOs were copied over from packet_dispatch_node # TODO: allow custom codecs -- GitLab From 4b5144fef966cb9fd45d23e28934966f59bcca26 Mon Sep 17 00:00:00 2001 From: Laura Lindzey <lindzey@uw.edu> Date: Wed, 27 Apr 2022 23:01:59 -0700 Subject: [PATCH 7/7] fix install to work for other packages --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 60a33446..a1c86cbb 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,9 @@ from distutils.core import setup from catkin_pkg.python_setup import generate_distutils_setup d = generate_distutils_setup( - py_modules=['acomms_codecs', 'codec_config_parser'], + py_modules=['acomms_codecs'], + packages=['codec_config_parser'], + package_dir={'': 'src'}, ) setup(**d) -- GitLab