From 89581e1084dbf177d76d86044ac75435f50e8f3b Mon Sep 17 00:00:00 2001
From: Laura Lindzey <lindzey@uw.edu>
Date: Tue, 26 Apr 2022 12:47:27 -0700
Subject: [PATCH 1/7] First cut at centralizing config file parsing

---
 src/message_queue_node.py | 125 ++++++++++++++++++++++++++++----------
 1 file changed, 92 insertions(+), 33 deletions(-)

diff --git a/src/message_queue_node.py b/src/message_queue_node.py
index 17c2e577..44745d88 100755
--- a/src/message_queue_node.py
+++ b/src/message_queue_node.py
@@ -2,7 +2,7 @@
 from __future__ import unicode_literals
 
 from itertools import groupby
-from typing import Any, Deque, List, Optional, Tuple, Union
+from typing import Any, Deque, Dict, List, NewType, Optional, Tuple, Union
 
 import rospy
 from rospy.msg import AnyMsg
@@ -27,6 +27,66 @@ from version_node import version_node
 #TODO: move this
 from packet_dispatch_node import DecoderListEntry
 
+QueueId = NewType('QueueId', int)
+MessageCodecId = NewType('MessageCodecId', int)
+
+@dataclass
+class QueueParams:
+    subscribe_topic: str
+    publish_topic: str
+    codec_id: MessageCodecId
+    queue_order: str  # TODO: I'd like this to be an enum corresponding to 'fifo', 'lifo' (rigbht now, it's "if 'fifo': / else:")
+    queue_maxsize: int
+    priority: int
+    is_active: bool
+    destination: int
+    allow_fragmentation: bool
+
+class AcommsConfiguration(object):
+    """
+    Includes all the relevant configuration info for a list of queues and the codecs that they use.
+
+    Raises ValueError in case of an invalid configuration file.
+    """
+    def __init__(self, packet_codec_param):
+        print(packet_codec_param)
+        # raise(Exception)
+        # TODO: Fill these in!!! (Or decide that there's no need for them...)
+        self.topic_from_id: Dict[QueueId, str] = {}
+        self.id_from_topic: Dict[str, QueueId] = {}
+        # Queues are indexed by their ID because that's what goes in the header
+        # to identify the appropriate publication parameters for packet dispatch
+        self.queue_params: Dict[QueueId, QueueParams] = {}
+        self.message_codecs: Dict[MessageCodecId, QueueParams] = {}
+
+        config_version = packet_codec_param.get('config_version')
+        if config_version is None:
+            # Use legacy, where we have a one-to-one mapping between queues and message codecs
+            # All relevation is in the message_codecs field
+            for message_codec_param in packet_codec_param['message_codecs']:
+                both_ids = message_codec_param["id"]
+                self.queue_params[both_ids] = QueueParams(message_codec_param["subscribe_topic"],
+                                                          message_codec_param["publish_topic"],
+                                                          both_ids,
+                                                          message_codec_param["queue_order"],
+                                                          message_codec_param["queue_maxsize"],
+                                                          message_codec_param["priority"], # NB: current implementation MessageQueueNode has a default priority
+                                                          message_codec_param.get("is_active", True),
+                                                          message_codec_param["default_dest"], # NB: there's also a default destination at the MessageQueueNode level
+                                                          message_codec_param.get("allow_fragmentation", False),
+                                                          )
+                ros_type_name = message_codec_param['ros_type']
+                msg_class = roslib.message.get_message_class(ros_type_name)
+                if not msg_class:
+                    raise TypeError('Invalid ROS type "{}" for codec ID {}'
+                                    .format(ros_type_name, both_ids))
+                self.message_codecs[both_ids] = RosMessageCodec(ros_type=msg_class, fields_dict=message_codec_param['fields'])
+
+        else:
+            # Use new version that separates the queue and codec definitions.
+            raise(Exception("NYI"))
+
+
 
 @dataclass
 class QueuedMessage:
@@ -242,7 +302,7 @@ class MessageQueueNode(object):
     def __init__(self):
         rospy.init_node('message_queue', log_level=rospy.DEBUG)
         self.update_rate = rospy.get_param('~update_rate', 1)
-        self.unknown_dests_are_reachable = rospy.get_param('~unknown_dests_are_reachable', True)
+        self.unknown_dests_are_reachable: bool = rospy.get_param('~unknown_dests_are_reachable', True)
         self.default_dest = rospy.get_param('~default_dest', 121)
         self.default_priority = rospy.get_param('~default_priority', 10)
 
@@ -280,52 +340,51 @@ class MessageQueueNode(object):
         # when queuing for TX, we go from topic -> message -> packet
         # However, organizing it this way allows us to use a single config YAML for both TX and RX
         packet_codec_param = rospy.get_param('~packet_codecs')
-        self.packet_codecs = {}
+
+        # QUESTION(lindzey): Do we ever have more than one packet codec? I'm
+        #   not convinced that the code in this block actually handles that case
+        #   appropriately, unless different packet codecs are guaranteed to
+        #   have non-overlapping destinations and subscribe topics.
         for packet_codec_details in packet_codec_param:
             rospy.loginfo("Packet codec details: {}".format(packet_codec_details))
             packet_codec_class = packet_codecs[packet_codec_details['packet_codec']]
 
-            message_codecs = {}
-            for message_codec_params in packet_codec_details['message_codecs']:
-                ros_type_name = message_codec_params['ros_type']
-                msg_class = roslib.message.get_message_class(ros_type_name)
-                codec_id = message_codec_params['id']
-                if not msg_class:
-                    raise TypeError('Invalid ROS type "{}" for message codec {}'.format(ros_type_name, codec_id))
-                message_codec = RosMessageCodec(ros_type=msg_class, fields_dict=message_codec_params['fields'])
-                message_codecs[codec_id] = message_codec
+            # This assumes that there's only one ... which seems to be an assumption that the rest of the code also makes?
+            # TODO: consider replacing with queue_params, message_codecs = parse_acomms_configuration(...)
+            self.acomms_configuration = AcommsConfiguration(packet_codec_param[0])
 
-                # TODO clean this up
-                self.destination_reachable[
-                    message_codec_params['default_dest']] = True if self.unknown_dests_are_reachable else False
+            for queue_id, params in self.acomms_configuration.queue_params.items():
 
-                subscribe_topic = message_codec_params['subscribe_topic']
-                queue_active_status = message_codec_params.get('is_active', True)
-                priority = message_codec_params['priority']
+                # QUESTION: this makes it a non-optional parameter, but later, it is accessed with a default value.
+                #    Which do we want? Have the node's default propagate to the queue? Or not (and get rid of the node's)?
+                # default_dest = message_codec_params['default_dest']
+                # self.destination_reachable[default_dest] = True if self.unknown_dests_are_reachable else False
+                self.destination_reachable[params.destination] = self.unknown_dests_are_reachable
 
-                if queue_active_status:
-                    self.active_queue_names.add(subscribe_topic)
+                if params.is_active:
+                    self.active_queue_names.add(params.subscribe_topic)
 
                 new_queue = MessageQueue(master_message_queue=self._queued_messages,
-                                         dest_address=message_codec_params.get('default_dest', self.default_dest),
-                                         priority=message_codec_params.get('priority', self.default_priority),
-                                         header=message_codec_params['id'],
-                                         topic=subscribe_topic,
-                                         order=message_codec_params['queue_order'],
-                                         maxsize=message_codec_params['queue_maxsize'],
-                                         allow_fragmentation=message_codec_params.get('allow_fragmentation', False),
-                                         message_codec=message_codec,
+                                         dest_address=params.destination,
+                                         priority=params.priority,
+                                         header=queue_id,
+                                         topic=params.subscribe_topic,
+                                         order=params.queue_order,
+                                         maxsize=params.queue_maxsize,
+                                         allow_fragmentation=params.allow_fragmentation,
+                                         # QUESTION: Why does the queue own the codec?
+                                         message_codec=self.acomms_configuration.message_codecs[params.codec_id],
                                          )
-                self.topic_queue[subscribe_topic] = new_queue
-                rospy.Subscriber(subscribe_topic,
+                self.topic_queue[params.subscribe_topic] = new_queue
+                rospy.Subscriber(params.subscribe_topic,
                                  AnyMsg,
-                                 partial(self.on_incoming_message, topic_name=subscribe_topic))
+                                 partial(self.on_incoming_message, topic_name=params.subscribe_topic))
 
             # The packet codec initializer sets the packet codec on each message codec
-            packet_codec = packet_codec_class(message_codecs=message_codecs,
+            packet_codec = packet_codec_class(message_codecs=self.acomms_configuration.message_codecs,
                                               miniframe_header=packet_codec_details.get('miniframe_header', bytes()),
                                               dataframe_header=packet_codec_details.get('dataframe_header', bytes()))
-            rospy.loginfo("Added packet codec with {} message codecs".format(len(message_codecs)))
+            rospy.loginfo("Added packet codec with {} message codecs".format(len(self.acomms_configuration.message_codecs)))
 
         # Subscribe to the Neighbor Reachability updates (which may be published by more than one node
         rospy.Subscriber("neighbor_reachability", NeighborReachability, self.on_neighbor_reachability)
-- 
GitLab


From 7727d7197e29d452fbe680e4a6b69441872d2607 Mon Sep 17 00:00:00 2001
From: Laura Lindzey <lindzey@uw.edu>
Date: Wed, 27 Apr 2022 01:19:01 -0700
Subject: [PATCH 2/7] Pull config parsing out into separate file

---
 src/config_parser.py      | 65 ++++++++++++++++++++++++++++++
 src/message_queue_node.py | 84 ++++++---------------------------------
 2 files changed, 77 insertions(+), 72 deletions(-)
 create mode 100644 src/config_parser.py

diff --git a/src/config_parser.py b/src/config_parser.py
new file mode 100644
index 00000000..22cb0a92
--- /dev/null
+++ b/src/config_parser.py
@@ -0,0 +1,65 @@
+from dataclasses import dataclass
+from ltcodecs import RosMessageCodec
+import roslib
+from typing import Dict, NewType
+
+QueueId = NewType('QueueId', int)
+MessageCodecId = NewType('MessageCodecId', int)
+
+@dataclass
+class QueueParams:
+    subscribe_topic: str
+    publish_topic: str
+    codec_id: MessageCodecId
+    queue_order: str  # TODO: I'd like this to be an enum corresponding to 'fifo', 'lifo' (rigbht now, it's "if 'fifo': / else:")
+    queue_maxsize: int
+    priority: int
+    is_active: bool
+    destination: int
+    allow_fragmentation: bool
+
+
+def get_queue_params(packet_codec_param):
+    # Queues are indexed by their ID because that's what goes in the header
+    # to identify the appropriate publication parameters for packet dispatch
+    queue_params: Dict[QueueId, QueueParams] = {}
+    config_version = packet_codec_param.get('config_version')
+    if config_version is None:
+        # Use legacy, where we have a one-to-one mapping between queues and message codecs
+        # All relevation is in the message_codecs field
+        for message_codec_param in packet_codec_param['message_codecs']:
+            both_ids = message_codec_param["id"]
+            queue_params[both_ids] = QueueParams(message_codec_param["subscribe_topic"],
+                                                 message_codec_param["publish_topic"],
+                                                 both_ids,
+                                                 message_codec_param["queue_order"],
+                                                 message_codec_param["queue_maxsize"],
+                                                 message_codec_param["priority"], # NB: current implementation MessageQueueNode has a default priority
+                                                 message_codec_param.get("is_active", True),
+                                                 message_codec_param["default_dest"], # NB: there's also a default destination at the MessageQueueNode level
+                                                 message_codec_param.get("allow_fragmentation", False),
+                                                )
+    else:
+        # Use new version that separates the queue and codec definitions.
+        raise(Exception("NYI"))
+    return queue_params
+
+
+def get_message_codecs(packet_codec_param):
+    message_codecs: Dict[MessageCodecId, QueueParams] = {}
+    config_version = packet_codec_param.get('config_version')
+    if config_version is None:
+        # Use legacy, where we have a one-to-one mapping between queues and message codecs
+        # All relevation is in the message_codecs field
+        for message_codec_param in packet_codec_param['message_codecs']:
+            both_ids = message_codec_param["id"]
+            ros_type_name = message_codec_param['ros_type']
+            msg_class = roslib.message.get_message_class(ros_type_name)
+            if not msg_class:
+                raise TypeError('Invalid ROS type "{}" for codec ID {}'
+                                .format(ros_type_name, both_ids))
+            message_codecs[both_ids] = RosMessageCodec(ros_type=msg_class, fields_dict=message_codec_param['fields'])
+    else:
+        # Use new version that separates the queue and codec definitions.
+        raise(Exception("NYI"))
+    return message_codecs
\ No newline at end of file
diff --git a/src/message_queue_node.py b/src/message_queue_node.py
index 44745d88..6a5f203d 100755
--- a/src/message_queue_node.py
+++ b/src/message_queue_node.py
@@ -2,7 +2,7 @@
 from __future__ import unicode_literals
 
 from itertools import groupby
-from typing import Any, Deque, Dict, List, NewType, Optional, Tuple, Union
+from typing import Any, Deque, List, Optional, Tuple, Union
 
 import rospy
 from rospy.msg import AnyMsg
@@ -27,65 +27,7 @@ from version_node import version_node
 #TODO: move this
 from packet_dispatch_node import DecoderListEntry
 
-QueueId = NewType('QueueId', int)
-MessageCodecId = NewType('MessageCodecId', int)
-
-@dataclass
-class QueueParams:
-    subscribe_topic: str
-    publish_topic: str
-    codec_id: MessageCodecId
-    queue_order: str  # TODO: I'd like this to be an enum corresponding to 'fifo', 'lifo' (rigbht now, it's "if 'fifo': / else:")
-    queue_maxsize: int
-    priority: int
-    is_active: bool
-    destination: int
-    allow_fragmentation: bool
-
-class AcommsConfiguration(object):
-    """
-    Includes all the relevant configuration info for a list of queues and the codecs that they use.
-
-    Raises ValueError in case of an invalid configuration file.
-    """
-    def __init__(self, packet_codec_param):
-        print(packet_codec_param)
-        # raise(Exception)
-        # TODO: Fill these in!!! (Or decide that there's no need for them...)
-        self.topic_from_id: Dict[QueueId, str] = {}
-        self.id_from_topic: Dict[str, QueueId] = {}
-        # Queues are indexed by their ID because that's what goes in the header
-        # to identify the appropriate publication parameters for packet dispatch
-        self.queue_params: Dict[QueueId, QueueParams] = {}
-        self.message_codecs: Dict[MessageCodecId, QueueParams] = {}
-
-        config_version = packet_codec_param.get('config_version')
-        if config_version is None:
-            # Use legacy, where we have a one-to-one mapping between queues and message codecs
-            # All relevation is in the message_codecs field
-            for message_codec_param in packet_codec_param['message_codecs']:
-                both_ids = message_codec_param["id"]
-                self.queue_params[both_ids] = QueueParams(message_codec_param["subscribe_topic"],
-                                                          message_codec_param["publish_topic"],
-                                                          both_ids,
-                                                          message_codec_param["queue_order"],
-                                                          message_codec_param["queue_maxsize"],
-                                                          message_codec_param["priority"], # NB: current implementation MessageQueueNode has a default priority
-                                                          message_codec_param.get("is_active", True),
-                                                          message_codec_param["default_dest"], # NB: there's also a default destination at the MessageQueueNode level
-                                                          message_codec_param.get("allow_fragmentation", False),
-                                                          )
-                ros_type_name = message_codec_param['ros_type']
-                msg_class = roslib.message.get_message_class(ros_type_name)
-                if not msg_class:
-                    raise TypeError('Invalid ROS type "{}" for codec ID {}'
-                                    .format(ros_type_name, both_ids))
-                self.message_codecs[both_ids] = RosMessageCodec(ros_type=msg_class, fields_dict=message_codec_param['fields'])
-
-        else:
-            # Use new version that separates the queue and codec definitions.
-            raise(Exception("NYI"))
-
+from config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs
 
 
 @dataclass
@@ -347,18 +289,14 @@ class MessageQueueNode(object):
         #   have non-overlapping destinations and subscribe topics.
         for packet_codec_details in packet_codec_param:
             rospy.loginfo("Packet codec details: {}".format(packet_codec_details))
-            packet_codec_class = packet_codecs[packet_codec_details['packet_codec']]
-
-            # This assumes that there's only one ... which seems to be an assumption that the rest of the code also makes?
-            # TODO: consider replacing with queue_params, message_codecs = parse_acomms_configuration(...)
-            self.acomms_configuration = AcommsConfiguration(packet_codec_param[0])
+            packet_codec_class = packet_codecs[packet_codec_details['packet_codec']] # type: PacketCodec
 
-            for queue_id, params in self.acomms_configuration.queue_params.items():
+            queue_params = get_queue_params(packet_codec_details)
+            message_codecs = get_message_codecs(packet_codec_details)
 
-                # QUESTION: this makes it a non-optional parameter, but later, it is accessed with a default value.
+            for queue_id, params in queue_params.items():
+                # QUESTION: this made default_dest a non-optional parameter, but later, it is accessed with a default value.
                 #    Which do we want? Have the node's default propagate to the queue? Or not (and get rid of the node's)?
-                # default_dest = message_codec_params['default_dest']
-                # self.destination_reachable[default_dest] = True if self.unknown_dests_are_reachable else False
                 self.destination_reachable[params.destination] = self.unknown_dests_are_reachable
 
                 if params.is_active:
@@ -373,18 +311,20 @@ class MessageQueueNode(object):
                                          maxsize=params.queue_maxsize,
                                          allow_fragmentation=params.allow_fragmentation,
                                          # QUESTION: Why does the queue own the codec?
-                                         message_codec=self.acomms_configuration.message_codecs[params.codec_id],
+                                         message_codec=message_codecs[params.codec_id],
                                          )
                 self.topic_queue[params.subscribe_topic] = new_queue
                 rospy.Subscriber(params.subscribe_topic,
                                  AnyMsg,
                                  partial(self.on_incoming_message, topic_name=params.subscribe_topic))
 
+            # QUESTION(lindzey): It looks like the packet codec is never used?
+            #   => AAAAH. This is called for its side effects. Maybe rename to make more clear?
             # The packet codec initializer sets the packet codec on each message codec
-            packet_codec = packet_codec_class(message_codecs=self.acomms_configuration.message_codecs,
+            packet_codec = packet_codec_class(message_codecs=message_codecs,
                                               miniframe_header=packet_codec_details.get('miniframe_header', bytes()),
                                               dataframe_header=packet_codec_details.get('dataframe_header', bytes()))
-            rospy.loginfo("Added packet codec with {} message codecs".format(len(self.acomms_configuration.message_codecs)))
+            rospy.loginfo("Added packet codec with {} message codecs".format(len(message_codecs)))
 
         # Subscribe to the Neighbor Reachability updates (which may be published by more than one node
         rospy.Subscriber("neighbor_reachability", NeighborReachability, self.on_neighbor_reachability)
-- 
GitLab


From ad54f73e042be3646b47e73b233f36aab2921002 Mon Sep 17 00:00:00 2001
From: Laura Lindzey <lindzey@uw.edu>
Date: Wed, 27 Apr 2022 02:04:51 -0700
Subject: [PATCH 3/7] Packet dispatch now uses the config utility

---
 src/config_parser.py        | 13 ++++++++++++-
 src/packet_dispatch_node.py | 39 ++++++++++++++++++-------------------
 2 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/src/config_parser.py b/src/config_parser.py
index 22cb0a92..3245dd34 100644
--- a/src/config_parser.py
+++ b/src/config_parser.py
@@ -8,6 +8,7 @@ MessageCodecId = NewType('MessageCodecId', int)
 
 @dataclass
 class QueueParams:
+    msg_class: type
     subscribe_topic: str
     publish_topic: str
     codec_id: MessageCodecId
@@ -29,7 +30,13 @@ def get_queue_params(packet_codec_param):
         # All relevation is in the message_codecs field
         for message_codec_param in packet_codec_param['message_codecs']:
             both_ids = message_codec_param["id"]
-            queue_params[both_ids] = QueueParams(message_codec_param["subscribe_topic"],
+            ros_type_name = message_codec_param['ros_type']
+            msg_class = roslib.message.get_message_class(ros_type_name)
+            if not msg_class:
+                raise TypeError('Invalid ROS type "{}" for codec ID {}'
+                                .format(ros_type_name, both_ids))
+            queue_params[both_ids] = QueueParams(msg_class,
+                                                 message_codec_param["subscribe_topic"],
                                                  message_codec_param["publish_topic"],
                                                  both_ids,
                                                  message_codec_param["queue_order"],
@@ -52,6 +59,10 @@ def get_message_codecs(packet_codec_param):
         # Use legacy, where we have a one-to-one mapping between queues and message codecs
         # All relevation is in the message_codecs field
         for message_codec_param in packet_codec_param['message_codecs']:
+            # NOTE(lindzey): The following TODOs were copied over from packet_dispatch_node
+            # TODO: allow custom codecs
+            # like this: MyClass = getattr(importlib.import_module("module.submodule"), "MyClass")
+            # TODO: make this work with orthogonal packet codecs
             both_ids = message_codec_param["id"]
             ros_type_name = message_codec_param['ros_type']
             msg_class = roslib.message.get_message_class(ros_type_name)
diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py
index 6e9706f4..b94c7904 100755
--- a/src/packet_dispatch_node.py
+++ b/src/packet_dispatch_node.py
@@ -13,6 +13,8 @@ from acomms_codecs.ros_packet_codec import RosPacketCodec
 from acomms_codecs.packet_codecs import packet_codecs
 
 from version_node import version_node
+from config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs
+
 @dataclass
 class DecoderListEntry:
     #TODO: move header stuff into separate packetcodec class
@@ -87,27 +89,23 @@ class PacketDispatchNode(object):
         packet_codec_param = rospy.get_param('~packet_codecs')
         self.decoder_list = []
         for packet_codec in packet_codec_param:
-            message_codec_config = packet_codec.pop('message_codecs')
+            # QUESTION(lindzey): Storing the message codec dict as a member
+            #    variable mirrors the logic that was here before,
+            #    but seems to imply that there will only ever be a single
+            #    packet codec in the config file. Is that true?
+            self.message_codecs = get_message_codecs(packet_codec)
+            self.queue_params = get_queue_params(packet_codec)
+            self.message_publishers = {}
+            for queue_id, params in self.queue_params.items():
+                pub_name = rospy.names.canonicalize_name(params.publish_topic)
+                self.message_publishers[queue_id] = rospy.Publisher(
+                    pub_name, params.msg_class, queue_size=10)
+
+            packet_codec.pop('message_codecs')
             entry = DecoderListEntry(**packet_codec)
             self.decoder_list.append(entry)
             rospy.loginfo("Added decoder: {}".format(entry))
 
-            self.message_codecs = {}
-            self.message_publishers = {}
-            for codec_config in message_codec_config:
-                codec_id = codec_config['id']
-                ros_type_name = codec_config['ros_type']
-                msg_class = roslib.message.get_message_class(ros_type_name)
-                if not msg_class:
-                    raise TypeError('Invalid ROS type "{}" for message codec {}'.format(ros_type_name, codec_id))
-                # TODO: allow custom codecs
-                # like this: MyClass = getattr(importlib.import_module("module.submodule"), "MyClass")
-                # TODO: make this work with orthogonal packet codecs
-                self.message_codecs[codec_id] = RosMessageCodec(msg_class, codec_config['fields'])
-
-                pub_name = rospy.names.canonicalize_name(codec_config['publish_topic'])
-                self.message_publishers[codec_id] = rospy.Publisher(pub_name, msg_class, queue_size=10)
-
             # replace name with instance of packet codec
             # ToDO make this less ugly and stupid
             entry.packet_codec = packet_codecs[entry.packet_codec](self.message_codecs,
@@ -149,9 +147,10 @@ class PacketDispatchNode(object):
         # loop through until we are out of bits (ReadError will be raised)
         try:
             while True:
-                id = payload_bits.read('uint:8')
-                ros_msg = self.message_codecs[id].decode(payload_bits)
-                self.message_publishers[id].publish(ros_msg)
+                queue_id = payload_bits.read('uint:8')
+                codec_id = self.queue_params[queue_id].codec_id
+                ros_msg = self.message_codecs[codec_id].decode(payload_bits)
+                self.message_publishers[queue_id].publish(ros_msg)
         except KeyError:
             # We don't have a config entry that matches this ID
             rospy.loginfo("Unknown message ID ({}) received, unable to continue parsing packet".format(id))
-- 
GitLab


From 836d0c0fa1bd1863fad445af22eee1217216f82f Mon Sep 17 00:00:00 2001
From: Laura Lindzey <lindzey@uw.edu>
Date: Wed, 27 Apr 2022 02:17:27 -0700
Subject: [PATCH 4/7] (Very) Minor mypy cleanup in packet_dispatch_node

---
 src/packet_dispatch_node.py | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py
index b94c7904..25a5a33a 100755
--- a/src/packet_dispatch_node.py
+++ b/src/packet_dispatch_node.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 
-from typing import List, Tuple
+from typing import Dict, List, Tuple
 
 import rospy
 import roslib.message
@@ -21,14 +21,14 @@ class DecoderListEntry:
     codec_name: str
     miniframe_header: bytes
     dataframe_header: bytes
-    packet_codec: str
+    packet_codec: str  # TODO: Mypy isn't happy about this -- I think it should be a PacketCodec
     match_src: List[int] = field(default_factory=list)
     match_dest: List[int] = field(default_factory=list)
     except_src: List[int] = field(default_factory=list)
     except_dest: List[int] = field(default_factory=list)
     remove_headers: bool = True
 
-    def does_packet_match(self, received_packet: ReceivedPacket):
+    def does_packet_match(self, received_packet: ReceivedPacket) -> bool:
         # We work in this order:
         # 1. SRC
         # 2. DEST
@@ -64,7 +64,7 @@ class DecoderListEntry:
                 return False
         return True
 
-    def strip_headers(self, received_packet: ReceivedPacket):
+    def strip_headers(self, received_packet: ReceivedPacket) -> Tuple[bytes, bytes]:
         if self.remove_headers:
             miniframe_bytes = received_packet.packet.miniframe_bytes[len(self.miniframe_header):]
             dataframe_bytes = received_packet.packet.dataframe_bytes[len(self.dataframe_header):]
@@ -85,9 +85,8 @@ class PacketDispatchNode(object):
         except:
             rospy.logwarn("Unable to query version information")
 
-
         packet_codec_param = rospy.get_param('~packet_codecs')
-        self.decoder_list = []
+        self.decoder_list: List[DecoderListEntry] = []
         for packet_codec in packet_codec_param:
             # QUESTION(lindzey): Storing the message codec dict as a member
             #    variable mirrors the logic that was here before,
@@ -95,7 +94,7 @@ class PacketDispatchNode(object):
             #    packet codec in the config file. Is that true?
             self.message_codecs = get_message_codecs(packet_codec)
             self.queue_params = get_queue_params(packet_codec)
-            self.message_publishers = {}
+            self.message_publishers: Dict[QueueId, rospy.Publisher] = {}
             for queue_id, params in self.queue_params.items():
                 pub_name = rospy.names.canonicalize_name(params.publish_topic)
                 self.message_publishers[queue_id] = rospy.Publisher(
@@ -120,7 +119,7 @@ class PacketDispatchNode(object):
         rospy.loginfo("Message Dispatch started")
         rospy.spin()
 
-    def on_packet_rx(self, received_packet: ReceivedPacket):
+    def on_packet_rx(self, received_packet: ReceivedPacket) -> None:
         rospy.loginfo("Got incoming packet")
         # Decide what to do with the incoming packet, based on the match criteria
         matching_codecs = [d.packet_codec for d in self.decoder_list if d.does_packet_match(received_packet)]
@@ -134,7 +133,10 @@ class PacketDispatchNode(object):
             except KeyError:
                 rospy.logwarn("Unrecognized packet decoder: {}".format(d))
 
-    def decode_ros_acomms_packet(self, received_packet: ReceivedPacket, stripped_data: Tuple[bytes, bytes]):
+    def decode_ros_acomms_packet(self,
+                                 received_packet: ReceivedPacket,
+                                 stripped_data: Tuple[bytes, bytes]
+                                ) -> None:
         payload_bytes = stripped_data[0] + stripped_data[1]
         payload_bits = ConstBitStream(bytes=payload_bytes)
 
-- 
GitLab


From 6016971967c9d9ff8da1b935775df4ea8895dd60 Mon Sep 17 00:00:00 2001
From: Laura Lindzey <lindzey@uw.edu>
Date: Wed, 27 Apr 2022 02:48:21 -0700
Subject: [PATCH 5/7] Move the config parser into a module

---
 setup.py                                       | 2 +-
 src/codec_config_parser/__init__.py            | 1 +
 src/{ => codec_config_parser}/config_parser.py | 0
 src/message_queue_node.py                      | 2 +-
 src/packet_dispatch_node.py                    | 2 +-
 5 files changed, 4 insertions(+), 3 deletions(-)
 create mode 100644 src/codec_config_parser/__init__.py
 rename src/{ => codec_config_parser}/config_parser.py (100%)

diff --git a/setup.py b/setup.py
index 41030556..60a33446 100755
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@ from distutils.core import setup
 from catkin_pkg.python_setup import generate_distutils_setup
 
 d = generate_distutils_setup(
-    py_modules=['acomms_codecs'],
+    py_modules=['acomms_codecs', 'codec_config_parser'],
 )
 
 setup(**d)
diff --git a/src/codec_config_parser/__init__.py b/src/codec_config_parser/__init__.py
new file mode 100644
index 00000000..351cc68c
--- /dev/null
+++ b/src/codec_config_parser/__init__.py
@@ -0,0 +1 @@
+from .config_parser import MessageCodecId, QueueId, QueueParams, get_queue_params, get_message_codecs
\ No newline at end of file
diff --git a/src/config_parser.py b/src/codec_config_parser/config_parser.py
similarity index 100%
rename from src/config_parser.py
rename to src/codec_config_parser/config_parser.py
diff --git a/src/message_queue_node.py b/src/message_queue_node.py
index 6a5f203d..a699dbfc 100755
--- a/src/message_queue_node.py
+++ b/src/message_queue_node.py
@@ -27,7 +27,7 @@ from version_node import version_node
 #TODO: move this
 from packet_dispatch_node import DecoderListEntry
 
-from config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs
+from codec_config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs
 
 
 @dataclass
diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py
index 25a5a33a..d604936a 100755
--- a/src/packet_dispatch_node.py
+++ b/src/packet_dispatch_node.py
@@ -13,7 +13,7 @@ from acomms_codecs.ros_packet_codec import RosPacketCodec
 from acomms_codecs.packet_codecs import packet_codecs
 
 from version_node import version_node
-from config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs
+from codec_config_parser import QueueParams, QueueId, MessageCodecId, get_queue_params, get_message_codecs
 
 @dataclass
 class DecoderListEntry:
-- 
GitLab


From e49abb834f608f6092af1f850a5a764759c4defe Mon Sep 17 00:00:00 2001
From: Laura Lindzey <lindzey@uw.edu>
Date: Wed, 27 Apr 2022 03:32:39 -0700
Subject: [PATCH 6/7] Fix editing fails in a few comments

---
 src/codec_config_parser/config_parser.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/codec_config_parser/config_parser.py b/src/codec_config_parser/config_parser.py
index 3245dd34..d6fae8be 100644
--- a/src/codec_config_parser/config_parser.py
+++ b/src/codec_config_parser/config_parser.py
@@ -12,7 +12,7 @@ class QueueParams:
     subscribe_topic: str
     publish_topic: str
     codec_id: MessageCodecId
-    queue_order: str  # TODO: I'd like this to be an enum corresponding to 'fifo', 'lifo' (rigbht now, it's "if 'fifo': / else:")
+    queue_order: str  # TODO: I'd like this to be an enum corresponding to 'fifo', 'lifo'
     queue_maxsize: int
     priority: int
     is_active: bool
@@ -27,7 +27,6 @@ def get_queue_params(packet_codec_param):
     config_version = packet_codec_param.get('config_version')
     if config_version is None:
         # Use legacy, where we have a one-to-one mapping between queues and message codecs
-        # All relevation is in the message_codecs field
         for message_codec_param in packet_codec_param['message_codecs']:
             both_ids = message_codec_param["id"]
             ros_type_name = message_codec_param['ros_type']
@@ -57,7 +56,6 @@ def get_message_codecs(packet_codec_param):
     config_version = packet_codec_param.get('config_version')
     if config_version is None:
         # Use legacy, where we have a one-to-one mapping between queues and message codecs
-        # All relevation is in the message_codecs field
         for message_codec_param in packet_codec_param['message_codecs']:
             # NOTE(lindzey): The following TODOs were copied over from packet_dispatch_node
             # TODO: allow custom codecs
-- 
GitLab


From 4b5144fef966cb9fd45d23e28934966f59bcca26 Mon Sep 17 00:00:00 2001
From: Laura Lindzey <lindzey@uw.edu>
Date: Wed, 27 Apr 2022 23:01:59 -0700
Subject: [PATCH 7/7] fix install to work for other packages

---
 setup.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 60a33446..a1c86cbb 100755
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,9 @@ from distutils.core import setup
 from catkin_pkg.python_setup import generate_distutils_setup
 
 d = generate_distutils_setup(
-    py_modules=['acomms_codecs', 'codec_config_parser'],
+    py_modules=['acomms_codecs'],
+    packages=['codec_config_parser'],
+    package_dir={'': 'src'},
 )
 
 setup(**d)
-- 
GitLab