From ef9a328da25cb3e33633c9b7e5166281699b6689 Mon Sep 17 00:00:00 2001 From: Eric Gallimore <egallimore@whoi.edu> Date: Mon, 3 Jun 2024 12:49:49 -0700 Subject: [PATCH 01/10] WIP: Add release gateway --- CMakeLists.txt | 1 + msg/AddressedReleaseCommand.msg | 5 + src/release_gateway_node.py | 315 ++++++++++++++++++++++++++++++++ 3 files changed, 321 insertions(+) create mode 100644 msg/AddressedReleaseCommand.msg create mode 100755 src/release_gateway_node.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 4f19803..f8847e6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -51,6 +51,7 @@ find_package(catkin REQUIRED COMPONENTS add_message_files(FILES AddressedCclMessage.msg AddressedHexCclMessage.msg + AddressedReleaseCommand.msg BasicVehicleCommand.msg ) diff --git a/msg/AddressedReleaseCommand.msg b/msg/AddressedReleaseCommand.msg new file mode 100644 index 0000000..19c488d --- /dev/null +++ b/msg/AddressedReleaseCommand.msg @@ -0,0 +1,5 @@ +Header header + +ros_acomms_net_msgs/NetHeader net_header + +int16 release_id \ No newline at end of file diff --git a/src/release_gateway_node.py b/src/release_gateway_node.py new file mode 100755 index 0000000..64b86c9 --- /dev/null +++ b/src/release_gateway_node.py @@ -0,0 +1,315 @@ +#!/usr/bin/env python3 +from functools import partial +import sys +from dataclasses import dataclass + +import rospy +from std_msgs.msg import Header, String +from diagnostic_msgs.msg import DiagnosticStatus, KeyValue +from ros_acomms_net_msgs.msg import AppResponseHeader, GenericAppResponse +from ros_acomms_uuv.msg import AddressedReleaseCommand +from ros_acomms_msgs.msg import UpdateQueuedMessage + +from threading import Event +from collections import deque + +@dataclass +class ReleaseCmdQueueEntry: + namespace: str + release_id: int + gateway_id: int + tries_remaining: int + pending_msg: AddressedReleaseCommand + waiting_for_ack: Event + + +class ReleaseGatewayNode: + def __init__(self): + rospy.init_node('release_gateway') + rospy.loginfo("Starting Release Gateway Node...") + + self.src_id = int(rospy.get_param('~src')) + self.num_release_tries = rospy.get_param('~num_release_tries', 5) + self.release_timeout_sec = rospy.get_param('~release_timeout_sec', 5) + self.manual_transmit_topic = rospy.get_param('~manual_transmit_topic', 'tdma/nmea_to_modem') + self.manual_transmit_txd_topic = rospy.get_param('~manual_transmit_txd_topic', 'nmea_to_modem') + self.manual_transmit_status_topic = rospy.get_param('~manual_transmit_status_topic', 'tdma/manual_transmit_status') + + # topic to publish nmea. Tdma will handle and send when active + self.manual_transmit_publisher = rospy.Publisher(self.manual_transmit_topic, String, queue_size=5) + # when set, we cache new requests until we are done (or timed out) + self.release_transaction_in_progress = Event() + self.release_transaction_in_progress.clear() + self.current_release_pending_transmit = Event() + self.current_release_pending_transmit.clear() + self.current_release_id = None + + self.release_ns_lookup = {} + self.release_cmd_queues = {} + release_namespaces = rospy.get_param('~release_namespaces') + try: + for element in release_namespaces: + # release_namespaces: + # - ['/releases/release_a', 91] # we are the gateway for release_a + # - ['/releases/release_b', 92, 40] # 40 is the gateway for release_b + ns = element[0] + release_id = int(element[1]) + # by default, we are the gateway if none is passed + gateway_id = self.src_id + if len(element) == 3: + gateway_id = int(element[2]) + + self.release_ns_lookup[release_id] = ns + self.release_cmd_queues[ns] = ReleaseCmdQueueEntry(namespace=ns, + release_id=release_id, + gateway_id=gateway_id, + tries_remaining=0, + pending_msg=None, + waiting_for_ack=Event()) + except Exception as e: + rospy.logfatal(f'Unable to process release_namespaces parameter: {e}') + sys.exit() + if len(self.release_cmd_queues) <= 0: + rospy.logfatal(f'No vehicle namespaces were specified, unable to do anything') + sys.exit() + + self.response_publisher = rospy.Publisher('release_command_response', GenericAppResponse, queue_size=10) + + # inbound subscriber (from network) + rospy.Subscriber('/release_cmd_inbound', AddressedReleaseCommand, self.on_release_cmd_inbound) + # outbound publisher (to network) + self.release_cmd_publisher = rospy.Publisher('/release_cmd_outbound', AddressedReleaseCommand, queue_size=5, latch=True) + + # Set up message subscribers + for ns, queue_entry in self.release_cmd_queues.items(): + binary_topic_name = rospy.names.canonicalize_name(ns + '/release_command') + rospy.Subscriber(binary_topic_name, AddressedReleaseCommand, partial(self.on_release_cmd_to_send, ns=ns)) + # queue_entry.waiting_for_ack is a Threading.Event type. + # when queue_entry.waiting_for_ack.is_set(), we have transmitted our pending_msg and are waiting for an ack + queue_entry.waiting_for_ack.clear() + + rospy.Subscriber('update_release_cmd_queue', UpdateQueuedMessage, self.on_update_queue) + + # use this topic for capturing acks to release commands we sent + rospy.Subscriber('nmea_from_modem', String, self.on_nmea_from_modem) + # topic for updates on the manual queue (happens on insert and transmit) + rospy.Subscriber(self.manual_transmit_status_topic, DiagnosticStatus, self.on_manual_transmit_status) + + def on_manual_transmit_status(self, msg): + if 'on_transmit' in msg.name: + if self.release_transaction_in_progress.is_set(): + # this may be the msg in progress going out + update_msg = UpdateQueuedMessage(message_id=self.current_release_id, event=1) + update_msg.header.stamp = rospy.get_time() + self.on_update_queue(update_msg=update_msg, application_state=AppResponseHeader.APPLICATION_STATE_EXECUTING) + + rospy.logdebug(f'release_gateway: on_transmit!') + self.current_release_pending_transmit.clear() + + def pending_msg_to_nmea(self, pending_msg: AddressedReleaseCommand): + dest = pending_msg.release_id + command_string = f"$CCCMD,RLS,{dest},0,0,0," + return command_string + + def on_endpoint_release_cmd(self, queue_entry): + if queue_entry.tries_remaining >= 1: + # make sure we have atleast 1 try left + self.current_release_id = queue_entry.release_id + self.release_transaction_in_progress.set() + self.current_release_pending_transmit.set() + queue_entry.waiting_for_ack.set() + + nmea_msg = self.pending_msg_to_nmea(queue_entry.pending_msg) + self.manual_transmit_publisher.publish(data=nmea_msg) + + queue_entry.tries_remaining -= 1 + update_msg = UpdateQueuedMessage(message_id=queue_entry.release_id, event=1) + update_msg.header.stamp = rospy.get_time() + self.on_update_queue(update_msg=update_msg, application_state=AppResponseHeader.APPLICATION_STATE_QUEUED) + else: + rospy.logerr(f'ERROR: on_endpoint_release_cmd called on inactive queue: {queue_entry}') + + def on_nmea_from_modem(self, msg: String): + if '$CACMR,RLR,' not in msg.data: + return + + # Release ACK: $CACMR,RLR,isodate,SRC,DEST,status,SNR_in,SNR_out,batt_stat,position_fields*CS + tokens = msg.data.split(',') + timestamp_string = tokens[2] + src = int(tokens[3]) + dest = int(tokens[4]) + status = int(tokens[5]) + + rospy.loginfo(f"Heard release ACK from src {src} dest {dest}.") + # Make sure that this ack is for us + if dest != self.src_id: + rospy.loginfo(f"Overheard release ACK message for a different modem (src {src} dest {dest}), ignoring it.") + return + + matching_queue = list(filter(lambda q: q.release_id == src, self.release_cmd_queues.values())) + if len(matching_queue) == 0: + rospy.loginfo(f"Got ACK message from {src}, but we aren't sending release commands to this address.") + return + + if len(matching_queue) > 1: + rospy.logerr(f"Invalid mapping of release destinations. Ensure that each destination is unique in the configuration.") + return + + this_queue: ReleaseCmdQueueEntry = matching_queue[0] + + # Update network that a pending command has been ackd + if self.release_transaction_in_progress.is_set(): + update_msg = UpdateQueuedMessage(message_id=self.current_release_id, event=2) + update_msg.header.stamp = rospy.get_time() + self.on_update_queue(update_msg=update_msg, application_state=AppResponseHeader.APPLICATION_STATE_SUCCEEDED) + + # # OK, mark it as acked + self.clear_current_transaction() + + # and update the status + this_queue.tries_remaining = 0 + this_queue.pending_msg = None + this_queue.waiting_for_ack.clear() + + return + + def on_release_cmd_to_send(self, msg: AddressedReleaseCommand, ns: str) -> None: + # We treat -1 as a special case where we look up the dest based on the topic + if msg.release_id == -1: + msg.release_id = self.release_cmd_queues[ns].release_id + + # We treat -1 as not being set, in this case, we get the endpoint from our lookup (release_cmd_queues) + if msg.net_header.net_destination == -1: + # modem with gateway_id is the last hop for this msg. the gateway is responsible to tracking [this] msg + # .. through the link layer while giving updates to the network layer via GenericAppResponse + msg.net_header.net_destination = self.release_cmd_queues[ns].gateway_id + + gateway_id = msg.net_header.net_destination + rospy.loginfo(f'Got a release command for dest {msg.release_id} via {gateway_id}, publishing on release_cmd_outbound') + self.release_cmd_publisher.publish(msg) + + def on_release_cmd_inbound(self, msg: AddressedReleaseCommand) -> None: + gateway_id = msg.net_header.net_destination + if gateway_id != self.src_id: + rospy.logwarn(f'WARNING: Not the gateway for this release command! {msg}') + return + else: + rospy.loginfo(f'We are the gateway for this release command: {msg}') + # is this command stomping on one in progress for this release? + ns = self.release_ns_lookup[msg.release_id] + + if self.release_cmd_queues[ns].waiting_for_ack.is_set(): + rospy.logwarn(f"WARNING: Release command for dest {msg.release_id} recvd while waiting_for_ack on pending_msg: {self.release_cmd_queues[ns].pending_msg}. Ignoring new msg") + return + + # set this here so we known we shouldn't start countdown to timeout yet. + # when the nmea is transmitted, we clear this and start our timeout countdown + self.current_release_pending_transmit.set() + # We are going to send the packet ourselves (and wait for the ack ourselves). + self.release_cmd_queues[ns].pending_msg = msg + # -1 signals that this queue has not been sent yet + self.release_cmd_queues[ns].tries_remaining = -1 + + if self.release_transaction_in_progress.is_set(): + rospy.logwarn(f"WARNING: release command for dest {msg.release_id}, recvd while release_transaction_in_progress. Caching request") + else: + rospy.logwarn(f"Queued release command for dest {msg.release_id} via {gateway_id}" + " (myself) " if gateway_id == self.src_id else "") + + def on_update_queue(self, update_msg: UpdateQueuedMessage, application_state: int) -> None: + # We track only one message per destination, so use that as the ID. + matching_queue = list(filter(lambda q: q.release_id == update_msg.message_id, self.release_cmd_queues.values())) + if len(matching_queue) == 0: + rospy.logerr(f"Invalid message id / dest id.") + return + + this_queue = matching_queue[0] + if update_msg.event != UpdateQueuedMessage.EVENT_LINK_QUEUED: + self.current_release_pending_transmit.clear() + self.send_response(this_queue, application_state) + + def clear_current_transaction(self): + {queue_entry.waiting_for_ack.clear() for queue_entry in self.release_cmd_queues.values()} + self.release_transaction_in_progress.clear() + self.current_release_pending_transmit.clear() + self.current_release_id = None + + def send_response(self, queue_entry: ReleaseCmdQueueEntry, new_state) -> None: + response_header = AppResponseHeader(net_source=queue_entry.pending_msg.net_header.net_source, + net_destination=queue_entry.pending_msg.net_header.net_destination, + transaction_id=queue_entry.pending_msg.net_header.transaction_id, + originator_uuid=queue_entry.pending_msg.net_header.originator_uuid, + application_state=new_state) + response_msg = GenericAppResponse(header=Header(stamp=rospy.get_rostime()), + response_header=response_header) + self.response_publisher.publish(response_msg) + + def handle_timeout(self): + active_queue_entry = None + + for ns, queue_entry in self.release_cmd_queues.items(): + if queue_entry.waiting_for_ack.is_set(): + # this queue is the queue that timed out + queue_entry.waiting_for_ack.clear() + + if queue_entry.tries_remaining > 0: + rospy.logwarn(f'WARNING: Timed out on release command. {queue_entry.tries_remaining} tries remaining') + active_queue_entry = queue_entry + else: + # if no more tries_remaining, send an update event=0 will create app resp: APPLICATION_STATE_FAILED + rospy.logwarn(f'WARNING: Timed out on release command. No tries left! APPLICATION_STATE_FAILED') + update_msg = UpdateQueuedMessage(message_id=queue_entry.release_id, event=-1) + update_msg.header.stamp = rospy.get_time() + self.on_update_queue(update_msg=update_msg, application_state=AppResponseHeader.APPLICATION_STATE_TIMEOUT) + + if active_queue_entry is None: + # used all tries, we are done with this msg, ready to send another + self.clear_current_transaction() + + return active_queue_entry + + def spin(self): + rate = rospy.Rate(5) + while not rospy.is_shutdown(): + if self.release_transaction_in_progress.is_set(): + # make sure we have sent the message and we're not just pending tx + if self.current_release_pending_transmit.is_set(): + rate.sleep() + continue + + rospy.logdebug(f"Waiting on release ACK. Timeout: {self.release_timeout_sec} seconds") + + start_t = rospy.get_time() + timed_out = True + while rospy.get_time() - start_t < self.release_timeout_sec: + rate.sleep() + + if not self.release_transaction_in_progress.is_set(): + rospy.logwarn(f'Got release ACK!') + timed_out = False + break + + if timed_out: + rospy.logwarn(f'Timed out waiting for release ACK!') + active_queue_entry = self.handle_timeout() + if active_queue_entry is not None: + # we haven't used up all our tries for the active msg + rospy.logwarn(f'{active_queue_entry.tries_remaining} tries remaining for {active_queue_entry}') + self.on_endpoint_release_cmd(queue_entry=active_queue_entry) + continue + else: + # we're not waiting for an ACK, see if we have other commands queued now + for ns, queue_entry in self.release_cmd_queues.items(): + if queue_entry.tries_remaining == -1: + # this queue hasn't be txd yet + # Setting tries_remaining has the effect of marking the packet as active + queue_entry.tries_remaining = self.num_release_tries + self.on_endpoint_release_cmd(queue_entry=queue_entry) + + rate.sleep() + +if __name__ == '__main__': + try: + release_gateway = ReleaseGatewayNode() + release_gateway.spin() + except rospy.ROSInterruptException: + rospy.loginfo("ReleaseGatewayNode shutdown (interrupt)") -- GitLab From 8a46ce7cfb4d2b4899fc5129d5dea6b70ae46620 Mon Sep 17 00:00:00 2001 From: Eric Gallimore <egallimore@whoi.edu> Date: Mon, 3 Jun 2024 14:51:16 -0700 Subject: [PATCH 02/10] Add single topic for release commands. --- src/release_gateway_node.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/release_gateway_node.py b/src/release_gateway_node.py index 64b86c9..9d43b06 100755 --- a/src/release_gateway_node.py +++ b/src/release_gateway_node.py @@ -2,6 +2,7 @@ from functools import partial import sys from dataclasses import dataclass +from typing import Union import rospy from std_msgs.msg import Header, String @@ -46,6 +47,9 @@ class ReleaseGatewayNode: self.release_ns_lookup = {} self.release_cmd_queues = {} + + self.release_command_topic = rospy.get_param('~release_command_topic', '/release_command') + release_namespaces = rospy.get_param('~release_namespaces') try: for element in release_namespaces: @@ -81,6 +85,7 @@ class ReleaseGatewayNode: self.release_cmd_publisher = rospy.Publisher('/release_cmd_outbound', AddressedReleaseCommand, queue_size=5, latch=True) # Set up message subscribers + rospy.Subscriber(self.release_command_topic, AddressedReleaseCommand, self.on_release_cmd_to_send) for ns, queue_entry in self.release_cmd_queues.items(): binary_topic_name = rospy.names.canonicalize_name(ns + '/release_command') rospy.Subscriber(binary_topic_name, AddressedReleaseCommand, partial(self.on_release_cmd_to_send, ns=ns)) @@ -173,16 +178,22 @@ class ReleaseGatewayNode: return - def on_release_cmd_to_send(self, msg: AddressedReleaseCommand, ns: str) -> None: + def on_release_cmd_to_send(self, msg: AddressedReleaseCommand, ns: Union[str, None] = None) -> None: # We treat -1 as a special case where we look up the dest based on the topic if msg.release_id == -1: - msg.release_id = self.release_cmd_queues[ns].release_id + if ns: + msg.release_id = self.release_cmd_queues[ns].release_id + else: + rospy.logwarn("Can't lookup release id without namespace") # We treat -1 as not being set, in this case, we get the endpoint from our lookup (release_cmd_queues) if msg.net_header.net_destination == -1: # modem with gateway_id is the last hop for this msg. the gateway is responsible to tracking [this] msg # .. through the link layer while giving updates to the network layer via GenericAppResponse - msg.net_header.net_destination = self.release_cmd_queues[ns].gateway_id + if ns: + msg.net_header.net_destination = self.release_cmd_queues[ns].gateway_id + else: + msg.net_header.net_destination = self.src_id gateway_id = msg.net_header.net_destination rospy.loginfo(f'Got a release command for dest {msg.release_id} via {gateway_id}, publishing on release_cmd_outbound') -- GitLab From 219a3480e0a19ab8be79c3bd7df60a6a3d0e385e Mon Sep 17 00:00:00 2001 From: Eric Gallimore <egallimore@whoi.edu> Date: Mon, 3 Jun 2024 16:02:12 -0700 Subject: [PATCH 03/10] WIP: switch to a single topic for release commands. --- src/release_gateway_node.py | 173 ++++++++---------------------------- 1 file changed, 38 insertions(+), 135 deletions(-) diff --git a/src/release_gateway_node.py b/src/release_gateway_node.py index 9d43b06..371a7b8 100755 --- a/src/release_gateway_node.py +++ b/src/release_gateway_node.py @@ -44,39 +44,14 @@ class ReleaseGatewayNode: self.current_release_pending_transmit = Event() self.current_release_pending_transmit.clear() self.current_release_id = None + self.current_release_message = None + self.current_tries_remaining = 0 self.release_ns_lookup = {} self.release_cmd_queues = {} self.release_command_topic = rospy.get_param('~release_command_topic', '/release_command') - release_namespaces = rospy.get_param('~release_namespaces') - try: - for element in release_namespaces: - # release_namespaces: - # - ['/releases/release_a', 91] # we are the gateway for release_a - # - ['/releases/release_b', 92, 40] # 40 is the gateway for release_b - ns = element[0] - release_id = int(element[1]) - # by default, we are the gateway if none is passed - gateway_id = self.src_id - if len(element) == 3: - gateway_id = int(element[2]) - - self.release_ns_lookup[release_id] = ns - self.release_cmd_queues[ns] = ReleaseCmdQueueEntry(namespace=ns, - release_id=release_id, - gateway_id=gateway_id, - tries_remaining=0, - pending_msg=None, - waiting_for_ack=Event()) - except Exception as e: - rospy.logfatal(f'Unable to process release_namespaces parameter: {e}') - sys.exit() - if len(self.release_cmd_queues) <= 0: - rospy.logfatal(f'No vehicle namespaces were specified, unable to do anything') - sys.exit() - self.response_publisher = rospy.Publisher('release_command_response', GenericAppResponse, queue_size=10) # inbound subscriber (from network) @@ -93,8 +68,6 @@ class ReleaseGatewayNode: # when queue_entry.waiting_for_ack.is_set(), we have transmitted our pending_msg and are waiting for an ack queue_entry.waiting_for_ack.clear() - rospy.Subscriber('update_release_cmd_queue', UpdateQueuedMessage, self.on_update_queue) - # use this topic for capturing acks to release commands we sent rospy.Subscriber('nmea_from_modem', String, self.on_nmea_from_modem) # topic for updates on the manual queue (happens on insert and transmit) @@ -103,12 +76,9 @@ class ReleaseGatewayNode: def on_manual_transmit_status(self, msg): if 'on_transmit' in msg.name: if self.release_transaction_in_progress.is_set(): - # this may be the msg in progress going out - update_msg = UpdateQueuedMessage(message_id=self.current_release_id, event=1) - update_msg.header.stamp = rospy.get_time() - self.on_update_queue(update_msg=update_msg, application_state=AppResponseHeader.APPLICATION_STATE_EXECUTING) + self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_EXECUTING) - rospy.logdebug(f'release_gateway: on_transmit!') + rospy.loginfo(f'release_gateway: command transmitted') self.current_release_pending_transmit.clear() def pending_msg_to_nmea(self, pending_msg: AddressedReleaseCommand): @@ -116,23 +86,16 @@ class ReleaseGatewayNode: command_string = f"$CCCMD,RLS,{dest},0,0,0," return command_string - def on_endpoint_release_cmd(self, queue_entry): - if queue_entry.tries_remaining >= 1: - # make sure we have atleast 1 try left - self.current_release_id = queue_entry.release_id + def on_endpoint_release_cmd(self): + if self.current_tries_remaining > 0: self.release_transaction_in_progress.set() self.current_release_pending_transmit.set() - queue_entry.waiting_for_ack.set() - nmea_msg = self.pending_msg_to_nmea(queue_entry.pending_msg) + nmea_msg = self.pending_msg_to_nmea(self.current_release_message) self.manual_transmit_publisher.publish(data=nmea_msg) - queue_entry.tries_remaining -= 1 - update_msg = UpdateQueuedMessage(message_id=queue_entry.release_id, event=1) - update_msg.header.stamp = rospy.get_time() - self.on_update_queue(update_msg=update_msg, application_state=AppResponseHeader.APPLICATION_STATE_QUEUED) - else: - rospy.logerr(f'ERROR: on_endpoint_release_cmd called on inactive queue: {queue_entry}') + self.current_release_pending_transmit.clear() + self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_QUEUED) def on_nmea_from_modem(self, msg: String): if '$CACMR,RLR,' not in msg.data: @@ -160,40 +123,19 @@ class ReleaseGatewayNode: rospy.logerr(f"Invalid mapping of release destinations. Ensure that each destination is unique in the configuration.") return - this_queue: ReleaseCmdQueueEntry = matching_queue[0] - # Update network that a pending command has been ackd if self.release_transaction_in_progress.is_set(): - update_msg = UpdateQueuedMessage(message_id=self.current_release_id, event=2) - update_msg.header.stamp = rospy.get_time() - self.on_update_queue(update_msg=update_msg, application_state=AppResponseHeader.APPLICATION_STATE_SUCCEEDED) + self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_SUCCEEDED) # # OK, mark it as acked self.clear_current_transaction() - # and update the status - this_queue.tries_remaining = 0 - this_queue.pending_msg = None - this_queue.waiting_for_ack.clear() - return - def on_release_cmd_to_send(self, msg: AddressedReleaseCommand, ns: Union[str, None] = None) -> None: - # We treat -1 as a special case where we look up the dest based on the topic - if msg.release_id == -1: - if ns: - msg.release_id = self.release_cmd_queues[ns].release_id - else: - rospy.logwarn("Can't lookup release id without namespace") - - # We treat -1 as not being set, in this case, we get the endpoint from our lookup (release_cmd_queues) + def on_release_cmd_to_send(self, msg: AddressedReleaseCommand) -> None: + # We treat -1 as not being set, and we just send it ourselves if msg.net_header.net_destination == -1: - # modem with gateway_id is the last hop for this msg. the gateway is responsible to tracking [this] msg - # .. through the link layer while giving updates to the network layer via GenericAppResponse - if ns: - msg.net_header.net_destination = self.release_cmd_queues[ns].gateway_id - else: - msg.net_header.net_destination = self.src_id + msg.net_header.net_destination = self.src_id gateway_id = msg.net_header.net_destination rospy.loginfo(f'Got a release command for dest {msg.release_id} via {gateway_id}, publishing on release_cmd_outbound') @@ -206,78 +148,47 @@ class ReleaseGatewayNode: return else: rospy.loginfo(f'We are the gateway for this release command: {msg}') - # is this command stomping on one in progress for this release? - ns = self.release_ns_lookup[msg.release_id] - if self.release_cmd_queues[ns].waiting_for_ack.is_set(): - rospy.logwarn(f"WARNING: Release command for dest {msg.release_id} recvd while waiting_for_ack on pending_msg: {self.release_cmd_queues[ns].pending_msg}. Ignoring new msg") + if self.release_transaction_in_progress.is_set(): + rospy.logwarn(f"WARNING: Release command for dest {msg.release_id} recvd while waiting_for_ack on pending_msg: {self.current_release_id}. Ignoring new msg") + self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_REJECTED, msg) return # set this here so we known we shouldn't start countdown to timeout yet. # when the nmea is transmitted, we clear this and start our timeout countdown self.current_release_pending_transmit.set() - # We are going to send the packet ourselves (and wait for the ack ourselves). - self.release_cmd_queues[ns].pending_msg = msg - # -1 signals that this queue has not been sent yet - self.release_cmd_queues[ns].tries_remaining = -1 - - if self.release_transaction_in_progress.is_set(): - rospy.logwarn(f"WARNING: release command for dest {msg.release_id}, recvd while release_transaction_in_progress. Caching request") - else: - rospy.logwarn(f"Queued release command for dest {msg.release_id} via {gateway_id}" + " (myself) " if gateway_id == self.src_id else "") - - def on_update_queue(self, update_msg: UpdateQueuedMessage, application_state: int) -> None: - # We track only one message per destination, so use that as the ID. - matching_queue = list(filter(lambda q: q.release_id == update_msg.message_id, self.release_cmd_queues.values())) - if len(matching_queue) == 0: - rospy.logerr(f"Invalid message id / dest id.") - return - - this_queue = matching_queue[0] - if update_msg.event != UpdateQueuedMessage.EVENT_LINK_QUEUED: - self.current_release_pending_transmit.clear() - self.send_response(this_queue, application_state) + self.release_transaction_in_progress.set() def clear_current_transaction(self): - {queue_entry.waiting_for_ack.clear() for queue_entry in self.release_cmd_queues.values()} self.release_transaction_in_progress.clear() self.current_release_pending_transmit.clear() self.current_release_id = None - - def send_response(self, queue_entry: ReleaseCmdQueueEntry, new_state) -> None: - response_header = AppResponseHeader(net_source=queue_entry.pending_msg.net_header.net_source, - net_destination=queue_entry.pending_msg.net_header.net_destination, - transaction_id=queue_entry.pending_msg.net_header.transaction_id, - originator_uuid=queue_entry.pending_msg.net_header.originator_uuid, - application_state=new_state) + self.current_release_message = None + self.current_release_id = 0 + + def publish_app_state(self, application_state, release_message: Union[AddressedReleaseCommand, None]=None) -> None: + if not release_message: + release_message = self.current_release_message + response_header = AppResponseHeader(net_source=release_message.net_header.net_source, + net_destination=release_message.net_header.net_destination, + transaction_id=release_message.net_header.transaction_id, + originator_uuid=release_message.net_header.originator_uuid, + application_state=application_state) response_msg = GenericAppResponse(header=Header(stamp=rospy.get_rostime()), response_header=response_header) self.response_publisher.publish(response_msg) def handle_timeout(self): - active_queue_entry = None + if self.current_tries_remaining > 0: + rospy.logwarn(f'WARNING: Timed out on release command. {self.current_tries_remaining} tries remaining') + else: + # if no more tries_remaining, send an update event=0 will create app resp: APPLICATION_STATE_FAILED + rospy.logwarn(f'WARNING: Timed out on release command. No tries left! APPLICATION_STATE_FAILED') + self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_TIMEOUT) - for ns, queue_entry in self.release_cmd_queues.items(): - if queue_entry.waiting_for_ack.is_set(): - # this queue is the queue that timed out - queue_entry.waiting_for_ack.clear() - - if queue_entry.tries_remaining > 0: - rospy.logwarn(f'WARNING: Timed out on release command. {queue_entry.tries_remaining} tries remaining') - active_queue_entry = queue_entry - else: - # if no more tries_remaining, send an update event=0 will create app resp: APPLICATION_STATE_FAILED - rospy.logwarn(f'WARNING: Timed out on release command. No tries left! APPLICATION_STATE_FAILED') - update_msg = UpdateQueuedMessage(message_id=queue_entry.release_id, event=-1) - update_msg.header.stamp = rospy.get_time() - self.on_update_queue(update_msg=update_msg, application_state=AppResponseHeader.APPLICATION_STATE_TIMEOUT) - - if active_queue_entry is None: # used all tries, we are done with this msg, ready to send another self.clear_current_transaction() - return active_queue_entry - def spin(self): rate = rospy.Rate(5) while not rospy.is_shutdown(): @@ -301,23 +212,15 @@ class ReleaseGatewayNode: if timed_out: rospy.logwarn(f'Timed out waiting for release ACK!') - active_queue_entry = self.handle_timeout() - if active_queue_entry is not None: + self.handle_timeout() + if self.current_tries_remaining > 0: # we haven't used up all our tries for the active msg - rospy.logwarn(f'{active_queue_entry.tries_remaining} tries remaining for {active_queue_entry}') - self.on_endpoint_release_cmd(queue_entry=active_queue_entry) + self.on_endpoint_release_cmd() continue - else: - # we're not waiting for an ACK, see if we have other commands queued now - for ns, queue_entry in self.release_cmd_queues.items(): - if queue_entry.tries_remaining == -1: - # this queue hasn't be txd yet - # Setting tries_remaining has the effect of marking the packet as active - queue_entry.tries_remaining = self.num_release_tries - self.on_endpoint_release_cmd(queue_entry=queue_entry) rate.sleep() + if __name__ == '__main__': try: release_gateway = ReleaseGatewayNode() -- GitLab From 4959b103e4b24ffa7fb62e396026f9d898ace8c5 Mon Sep 17 00:00:00 2001 From: Eric Gallimore <egallimore@whoi.edu> Date: Mon, 3 Jun 2024 16:03:24 -0700 Subject: [PATCH 04/10] Fix bug in failed notification --- src/release_gateway_node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/release_gateway_node.py b/src/release_gateway_node.py index 371a7b8..e679125 100755 --- a/src/release_gateway_node.py +++ b/src/release_gateway_node.py @@ -151,7 +151,7 @@ class ReleaseGatewayNode: if self.release_transaction_in_progress.is_set(): rospy.logwarn(f"WARNING: Release command for dest {msg.release_id} recvd while waiting_for_ack on pending_msg: {self.current_release_id}. Ignoring new msg") - self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_REJECTED, msg) + self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_REJECTED, release_message=msg) return # set this here so we known we shouldn't start countdown to timeout yet. -- GitLab From 2e6501b52e1a53a16e0188fa185a92f5ad646da9 Mon Sep 17 00:00:00 2001 From: Eric Gallimore <egallimore@whoi.edu> Date: Mon, 3 Jun 2024 16:12:36 -0700 Subject: [PATCH 05/10] Update to handle network transmit updates. --- src/release_gateway_node.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/release_gateway_node.py b/src/release_gateway_node.py index e679125..c231dee 100755 --- a/src/release_gateway_node.py +++ b/src/release_gateway_node.py @@ -68,11 +68,19 @@ class ReleaseGatewayNode: # when queue_entry.waiting_for_ack.is_set(), we have transmitted our pending_msg and are waiting for an ack queue_entry.waiting_for_ack.clear() + rospy.Subscriber('update_release_cmd_queue', UpdateQueuedMessage, self.on_update_queue) + # use this topic for capturing acks to release commands we sent rospy.Subscriber('nmea_from_modem', String, self.on_nmea_from_modem) # topic for updates on the manual queue (happens on insert and transmit) rospy.Subscriber(self.manual_transmit_status_topic, DiagnosticStatus, self.on_manual_transmit_status) + def on_update_queue(self, update_msg: UpdateQueuedMessage) -> None: + # We only track one message at a time right now + if update_msg.event == UpdateQueuedMessage.EVENT_LINK_TRANSMITTED: + self.current_release_pending_transmit.clear() + self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_QUEUED) + def on_manual_transmit_status(self, msg): if 'on_transmit' in msg.name: if self.release_transaction_in_progress.is_set(): -- GitLab From 28ff8ef8f95dfc1ec7f5b276e8a54e319ba3ed29 Mon Sep 17 00:00:00 2001 From: Eric Gallimore <egallimore@whoi.edu> Date: Mon, 3 Jun 2024 16:13:24 -0700 Subject: [PATCH 06/10] Removed unused code --- src/release_gateway_node.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/release_gateway_node.py b/src/release_gateway_node.py index c231dee..6f094af 100755 --- a/src/release_gateway_node.py +++ b/src/release_gateway_node.py @@ -61,13 +61,6 @@ class ReleaseGatewayNode: # Set up message subscribers rospy.Subscriber(self.release_command_topic, AddressedReleaseCommand, self.on_release_cmd_to_send) - for ns, queue_entry in self.release_cmd_queues.items(): - binary_topic_name = rospy.names.canonicalize_name(ns + '/release_command') - rospy.Subscriber(binary_topic_name, AddressedReleaseCommand, partial(self.on_release_cmd_to_send, ns=ns)) - # queue_entry.waiting_for_ack is a Threading.Event type. - # when queue_entry.waiting_for_ack.is_set(), we have transmitted our pending_msg and are waiting for an ack - queue_entry.waiting_for_ack.clear() - rospy.Subscriber('update_release_cmd_queue', UpdateQueuedMessage, self.on_update_queue) # use this topic for capturing acks to release commands we sent -- GitLab From ce89dd82860cf20515712386bf2637bc6aa82ffb Mon Sep 17 00:00:00 2001 From: Eric Gallimore <egallimore@whoi.edu> Date: Mon, 3 Jun 2024 16:19:22 -0700 Subject: [PATCH 07/10] Added codecs, removed unused code. --- codecs/AddressedReleaseCommand.yaml | 2 ++ launch/net_codec_config_release.yaml | 18 ++++++++++++++++++ src/release_gateway_node.py | 11 ++--------- 3 files changed, 22 insertions(+), 9 deletions(-) create mode 100644 codecs/AddressedReleaseCommand.yaml create mode 100644 launch/net_codec_config_release.yaml diff --git a/codecs/AddressedReleaseCommand.yaml b/codecs/AddressedReleaseCommand.yaml new file mode 100644 index 0000000..50f7d70 --- /dev/null +++ b/codecs/AddressedReleaseCommand.yaml @@ -0,0 +1,2 @@ +release_id: + codec: uint8 \ No newline at end of file diff --git a/launch/net_codec_config_release.yaml b/launch/net_codec_config_release.yaml new file mode 100644 index 0000000..2dfd6b9 --- /dev/null +++ b/launch/net_codec_config_release.yaml @@ -0,0 +1,18 @@ + +message_codecs: + - id: 111 + subscribe_topic: '/release_command_outbound' + publish_topic: '/release_command_inbound' + ros_type: 'ros_acomms_uuv/AddressedReleaseCommand' + default_dest: 120 + fields: !ros_include { package: ros_acomms_uuv, extension: codecs/AddressedReleaseCommand.yaml } + + - id: 112 + subscribe_topic: 'release_command_response' + ros_type: 'ros_acomms_net_msgs/GenericAppResponse' + default_dest: 120 + fields: + header: + codec: 'msg' + ros_type: 'std_msgs/Header' + fields: { 'stamp': { 'codec': 'rostime' } } diff --git a/src/release_gateway_node.py b/src/release_gateway_node.py index 6f094af..7450bd6 100755 --- a/src/release_gateway_node.py +++ b/src/release_gateway_node.py @@ -55,25 +55,18 @@ class ReleaseGatewayNode: self.response_publisher = rospy.Publisher('release_command_response', GenericAppResponse, queue_size=10) # inbound subscriber (from network) - rospy.Subscriber('/release_cmd_inbound', AddressedReleaseCommand, self.on_release_cmd_inbound) + rospy.Subscriber('/release_command_inbound', AddressedReleaseCommand, self.on_release_cmd_inbound) # outbound publisher (to network) - self.release_cmd_publisher = rospy.Publisher('/release_cmd_outbound', AddressedReleaseCommand, queue_size=5, latch=True) + self.release_cmd_publisher = rospy.Publisher('/release_command_outbound', AddressedReleaseCommand, queue_size=5, latch=True) # Set up message subscribers rospy.Subscriber(self.release_command_topic, AddressedReleaseCommand, self.on_release_cmd_to_send) - rospy.Subscriber('update_release_cmd_queue', UpdateQueuedMessage, self.on_update_queue) # use this topic for capturing acks to release commands we sent rospy.Subscriber('nmea_from_modem', String, self.on_nmea_from_modem) # topic for updates on the manual queue (happens on insert and transmit) rospy.Subscriber(self.manual_transmit_status_topic, DiagnosticStatus, self.on_manual_transmit_status) - def on_update_queue(self, update_msg: UpdateQueuedMessage) -> None: - # We only track one message at a time right now - if update_msg.event == UpdateQueuedMessage.EVENT_LINK_TRANSMITTED: - self.current_release_pending_transmit.clear() - self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_QUEUED) - def on_manual_transmit_status(self, msg): if 'on_transmit' in msg.name: if self.release_transaction_in_progress.is_set(): -- GitLab From 7f2fc0817276fc7187d4774060e191282549c7f0 Mon Sep 17 00:00:00 2001 From: Eric Gallimore <egallimore@whoi.edu> Date: Mon, 3 Jun 2024 16:20:03 -0700 Subject: [PATCH 08/10] Fixed topic name in log message --- src/release_gateway_node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/release_gateway_node.py b/src/release_gateway_node.py index 7450bd6..00efcd2 100755 --- a/src/release_gateway_node.py +++ b/src/release_gateway_node.py @@ -132,7 +132,7 @@ class ReleaseGatewayNode: msg.net_header.net_destination = self.src_id gateway_id = msg.net_header.net_destination - rospy.loginfo(f'Got a release command for dest {msg.release_id} via {gateway_id}, publishing on release_cmd_outbound') + rospy.loginfo(f'Got a release command for dest {msg.release_id} via {gateway_id}, publishing') self.release_cmd_publisher.publish(msg) def on_release_cmd_inbound(self, msg: AddressedReleaseCommand) -> None: -- GitLab From 9f986fdc6352063d0860417ffb28253658b9d42a Mon Sep 17 00:00:00 2001 From: Eric Gallimore <egallimore@whoi.edu> Date: Mon, 3 Jun 2024 16:28:34 -0700 Subject: [PATCH 09/10] Fixed some stuff. --- src/release_gateway_node.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/src/release_gateway_node.py b/src/release_gateway_node.py index 00efcd2..8263768 100755 --- a/src/release_gateway_node.py +++ b/src/release_gateway_node.py @@ -9,19 +9,8 @@ from std_msgs.msg import Header, String from diagnostic_msgs.msg import DiagnosticStatus, KeyValue from ros_acomms_net_msgs.msg import AppResponseHeader, GenericAppResponse from ros_acomms_uuv.msg import AddressedReleaseCommand -from ros_acomms_msgs.msg import UpdateQueuedMessage from threading import Event -from collections import deque - -@dataclass -class ReleaseCmdQueueEntry: - namespace: str - release_id: int - gateway_id: int - tries_remaining: int - pending_msg: AddressedReleaseCommand - waiting_for_ack: Event class ReleaseGatewayNode: @@ -71,6 +60,7 @@ class ReleaseGatewayNode: if 'on_transmit' in msg.name: if self.release_transaction_in_progress.is_set(): self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_EXECUTING) + self.current_tries_remaining -= 1 rospy.loginfo(f'release_gateway: command transmitted') self.current_release_pending_transmit.clear() @@ -88,8 +78,8 @@ class ReleaseGatewayNode: nmea_msg = self.pending_msg_to_nmea(self.current_release_message) self.manual_transmit_publisher.publish(data=nmea_msg) - self.current_release_pending_transmit.clear() self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_QUEUED) + rospy.loginfo(f"Queued manual command: {nmea_msg}") def on_nmea_from_modem(self, msg: String): if '$CACMR,RLR,' not in msg.data: @@ -150,7 +140,10 @@ class ReleaseGatewayNode: # set this here so we known we shouldn't start countdown to timeout yet. # when the nmea is transmitted, we clear this and start our timeout countdown - self.current_release_pending_transmit.set() + self.current_release_message = msg + self.current_release_id = msg.release_id + self.current_tries_remaining = self.num_release_tries + self.current_release_pending_transmit.clear() self.release_transaction_in_progress.set() def clear_current_transaction(self): @@ -162,7 +155,11 @@ class ReleaseGatewayNode: def publish_app_state(self, application_state, release_message: Union[AddressedReleaseCommand, None]=None) -> None: if not release_message: - release_message = self.current_release_message + if self.current_release_message is not None: + release_message = self.current_release_message + else: + rospy.logwarn("Can't publish app state without an active release message.") + return response_header = AppResponseHeader(net_source=release_message.net_header.net_source, net_destination=release_message.net_header.net_destination, transaction_id=release_message.net_header.transaction_id, -- GitLab From d7a8d8c841838543a8a519152a8fabbb97bb3477 Mon Sep 17 00:00:00 2001 From: caileighf <cfitzgerald@whoi.edu> Date: Tue, 4 Jun 2024 15:48:15 +0000 Subject: [PATCH 10/10] changed second field in RLS command (CCCMD,RLS,{dest},[changed from 0 to 1],0,0,. added config command strings added manual tx publisher (without /tdma/). removed logic for ignoring ACKs we want to handle them --- src/release_gateway_node.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/release_gateway_node.py b/src/release_gateway_node.py index 8263768..fc8b0dc 100755 --- a/src/release_gateway_node.py +++ b/src/release_gateway_node.py @@ -27,6 +27,7 @@ class ReleaseGatewayNode: # topic to publish nmea. Tdma will handle and send when active self.manual_transmit_publisher = rospy.Publisher(self.manual_transmit_topic, String, queue_size=5) + self.manual_transmit_txd_publisher = rospy.Publisher(self.manual_transmit_txd_topic, String, queue_size=5) # when set, we cache new requests until we are done (or timed out) self.release_transaction_in_progress = Event() self.release_transaction_in_progress.clear() @@ -67,7 +68,7 @@ class ReleaseGatewayNode: def pending_msg_to_nmea(self, pending_msg: AddressedReleaseCommand): dest = pending_msg.release_id - command_string = f"$CCCMD,RLS,{dest},0,0,0," + command_string = f"$CCCMD,RLS,{dest},1,0,0," return command_string def on_endpoint_release_cmd(self): @@ -76,6 +77,9 @@ class ReleaseGatewayNode: self.current_release_pending_transmit.set() nmea_msg = self.pending_msg_to_nmea(self.current_release_message) + self.manual_transmit_txd_publisher.publish(data=f'CCCFG,BND,0') + self.manual_transmit_txd_publisher.publish(data=f'CCCFG,FC0,26000') + self.manual_transmit_txd_publisher.publish(data=f'CCCFG,BW0,4000') self.manual_transmit_publisher.publish(data=nmea_msg) self.publish_app_state(application_state=AppResponseHeader.APPLICATION_STATE_QUEUED) @@ -94,18 +98,9 @@ class ReleaseGatewayNode: rospy.loginfo(f"Heard release ACK from src {src} dest {dest}.") # Make sure that this ack is for us - if dest != self.src_id: + if src != self.src_id: rospy.loginfo(f"Overheard release ACK message for a different modem (src {src} dest {dest}), ignoring it.") - return - - matching_queue = list(filter(lambda q: q.release_id == src, self.release_cmd_queues.values())) - if len(matching_queue) == 0: - rospy.loginfo(f"Got ACK message from {src}, but we aren't sending release commands to this address.") - return - - if len(matching_queue) > 1: - rospy.logerr(f"Invalid mapping of release destinations. Ensure that each destination is unique in the configuration.") - return + #return # Update network that a pending command has been ackd if self.release_transaction_in_progress.is_set(): @@ -147,6 +142,7 @@ class ReleaseGatewayNode: self.release_transaction_in_progress.set() def clear_current_transaction(self): + self.manual_transmit_txd_publisher.publish(data=f'CCCFG,BND,3') self.release_transaction_in_progress.clear() self.current_release_pending_transmit.clear() self.current_release_id = None -- GitLab