diff --git a/ros_acomms/src/acomms_codecs/ros_packet_codec.py b/ros_acomms/src/acomms_codecs/ros_packet_codec.py index b341c2013a4a6153595ec103abbbd34b6eade36d..5ac3772b3e2a4c925f80efdb63da33e746b13bde 100644 --- a/ros_acomms/src/acomms_codecs/ros_packet_codec.py +++ b/ros_acomms/src/acomms_codecs/ros_packet_codec.py @@ -16,6 +16,7 @@ import hashlib # single message ACK # multiple message ACK + # Helper functions def setattr_nested(obj, attr, value): def rgetattr(obj, attr, *args): @@ -30,10 +31,12 @@ def setattr_nested(obj, attr, value): class PacketCodec(BasePacketCodec): def __init__(self, message_codecs: Dict, message_publishers=None, allow_fragmentation=True, minimum_fragment_size_bits=12*8, - sequence_number_bits=8, + sequence_number_bits=8, # TODO: this is not currently used; suggest removing in future miniframe_header=[0xac], dataframe_header=[], **kwargs): - super(PacketCodec, self).__init__(message_codecs, message_publishers=message_publishers, - miniframe_header=miniframe_header, dataframe_header=dataframe_header) + super(PacketCodec, self).__init__(message_codecs=message_codecs, + message_publishers=message_publishers, + miniframe_header=miniframe_header, + dataframe_header=dataframe_header) self.allow_fragmentation = allow_fragmentation self.minimum_fragment_size_bits = minimum_fragment_size_bits @@ -101,12 +104,13 @@ class PacketCodec(BasePacketCodec): miniframe_bytes = miniframe_bits.tobytes() dataframe_bytes = dataframe_bits.tobytes() - rospy.loginfo( - f"RosPacketCodec encoded {len(messages_in_packet)} msgs in {len(miniframe_bytes) + len(dataframe_bytes)} bytes: {miniframe_bytes.hex()} | {dataframe_bytes.hex()}") + rospy.loginfo(f"RosPacketCodec encoded {len(messages_in_packet)} msgs " + f"in {len(miniframe_bytes) + len(dataframe_bytes)} bytes: " + f"{miniframe_bytes.hex()} | {dataframe_bytes.hex()}") return messages_in_packet, miniframe_bytes, dataframe_bytes - def decode_packet(self, received_packet: ReceivedPacket) -> None: + def decode_packet(self, received_packet: ReceivedPacket, **kwargs) -> None: if not self.message_publishers: # This codec is encode-only return @@ -119,12 +123,20 @@ class PacketCodec(BasePacketCodec): payload_bytes = miniframe_bytes + dataframe_bytes payload_bits = ConstBitStream(bytes=payload_bytes) - self.decode_payload(payload_bits, received_packet=received_packet) + if 'pass_thrus' in kwargs and kwargs['pass_thrus'] is not None: + self.decode_payload(payload_bits, received_packet=received_packet, pass_thrus=kwargs['pass_thrus']) + else: + self.decode_payload(payload_bits, received_packet=received_packet) def decode_payload(self, payload_bits: BitStream, received_packet: Optional[ReceivedPacket] = None, src: Optional[int] = None, set_fields: Dict[str, Any] = {}, - publish_message: bool = True) -> None: + publish_message: bool = True, # TODO: arg added in b9feeb9a but not used... + pass_thrus: Dict[List[int], rospy.Publisher, bool, bool] = {}) -> None: + + # flag for indicating if a ReceivedPacket has already met the pass_thru criteria + packet_already_passed: bool = False + if not self.message_publishers: # This codec is encode-only return @@ -137,9 +149,13 @@ class PacketCodec(BasePacketCodec): # Get the SRC for this packet, which can be set explictly or in the received_packet if src is None: - #rospy.logwarn("missing SRC") if received_packet: src = received_packet.packet.src + else: + # src being NoneType handling is further-managed below + # TODO: can we set an Unknown SRC to 999 (or something) here? + # src = 999 + pass # loop through until we are out of bits (ReadError will be raised) try: @@ -151,40 +167,90 @@ class PacketCodec(BasePacketCodec): Raise: KeyError if id < 128: + message_codec = self.message_codecs[id] - rospy.loginfo("ROS Message decode: ID {} with {}".format(id, message_codec)) - rospy.logdebug(payload_bits) + rospy.loginfo(f"ROS Message decode: ID [{id}] using " + f"RosType Codec [{message_codec.__dict__['ros_type'].__name__}]") + rospy.logdebug(f"Payload Bits: {payload_bits}") + + # IMPORTANT STEP: decode payload bits into a ROS message ros_msg = self.message_codecs[id].decode(payload_bits, received_packet) + + # gateway steps for passing-thru messages + if pass_thrus and id in pass_thrus['ids']: + + # 1. forward the ReceivedPacket, unless it has already been passed-thru + if received_packet is not None and not packet_already_passed: + rospy.logdebug(f"Passing-thru codec id [{id}] (found in {pass_thrus['ids']})") + pass_thrus['publisher'].publish(received_packet) + rospy.loginfo(f"Passing-thru msg codec id: {id} on " + f"[{pass_thrus['publisher'].resolved_name}]") + packet_already_passed = True + + # 2. if in gateway-only mode, skip the local publish + if pass_thrus['gateway_only']: + # relevant payload bits were just decoded, + # safe to just continue (and NOT publish anything locally) + rospy.logwarn(f"Gateway-Only [{pass_thrus['gateway_only']}], " + f"not publishing Codec ID [{id}] locally...") + continue + + # decode meta packets + if pass_thrus and isinstance(ros_msg, ReceivedPacket) and pass_thrus['decode_meta_packets']: + # we're Rx'ing a ReceivedPacket (how meta)... + # continue decoding the packet payload to get the original message (including msg SRC) + try: + rospy.loginfo(f"Encountered [Meta]ReceivedPacket ID [{id}]") + rospy.logdebug(f"Decoding the following [Meta]ReceivedPacket:\n{ros_msg}") + self.decode_packet(ros_msg, pass_thrus=pass_thrus) + except Exception as _e: + rospy.logerr(f"Error decoding [Meta]ReceivedPacket: {(_e,)}") + else: + rospy.logwarn(f'Finished decoding [Meta]ReceivedPacket.') + pass # continue decoding... + else: # continue decoding... + pass + for field, value in set_fields.items(): # Only set the field if the message already has it. The exception handling covers this. # ROS messages aren't dictionaries, so we can't just add fields. try: setattr_nested(ros_msg, field, value) except NameError or AttributeError as e: - rospy.loginfo(f"Message decode: could not set {field} on {ros_msg}. Error: {e}") + rospy.logwarn(f"Message decode: could not set {field} on {ros_msg}. Error: {e}") + try: if isinstance(self.message_publishers[id], rospy.Publisher): self.message_publishers[id].publish(ros_msg) + rospy.loginfo(f"Published msg id: {id} on [{self.message_publishers[id].resolved_name}]") else: if src is not None: try: self.message_publishers[id][src].publish(ros_msg) + rospy.loginfo(f"Published msg id: {id} on " + f"[{self.message_publishers[id][src].resolved_name}]") except KeyError: - rospy.loginfo("Got a message from an un-named SRC {}, not publishing it".format( - src)) + if 999 in self.message_publishers[id]: + self.message_publishers[id][999].publish(ros_msg) + rospy.logwarn(f"Published msg id: {id} on " + f"[{self.message_publishers[id][999].resolved_name}]") + else: + rospy.logwarn(f"Got a message from an un-named SRC [{src}], not publishing it") + else: - rospy.logwarn("Got a message that needs metadata to publish, but no metadata was supplied") - rospy.logdebug("Published msg id: {0}".format(id)) + rospy.logwarn("Got a message that needs metadata to publish, " + "but no metadata was supplied") except Exception as e: rospy.logerr("Error publishing ROS message: {}".format(e)) else: frag_type = payload_bits.read('uint:8') payload_bits.pos -= 8 rospy.logdebug("frag_type: {0}".format(frag_type)) + # Received a fragment here if frag_type == 0: rospy.logdebug("New start fragment") - # Parsse new fragment and add to dictionary + # Parse new fragment and add to dictionary hdrbits = payload_bits.read(FragmentationStartHeader.length_bits) hdr = FragmentationStartHeader.from_bits(frag_type, hdrbits) @@ -203,7 +269,8 @@ class PacketCodec(BasePacketCodec): hdr.end_fragment_index) try: - existing = self.incoming_fragmented_messages[(received_packet.cst.src, hdr.sequence_num)] + existing = self.incoming_fragmented_messages[(received_packet.cst.src, + hdr.sequence_num)] rospy.logdebug( "Existing FragmentationTracker with this sequence_num {0}".format(hdr.sequence_num)) if existing.unix_time == incoming.unix_time: @@ -218,15 +285,15 @@ class PacketCodec(BasePacketCodec): self.incoming_fragmented_messages[ (received_packet.cst.src, hdr.sequence_num)] = incoming except KeyError as e: - rospy.logwarn( - "Could not create new fragmentation tracker for incoming message") - #TODO: read out the bits so we can resynchronize if there are other messages in this packet + rospy.logwarn("Could not create new fragmentation tracker for incoming message") + # TODO: read out the bits so we can resynchronize if + # there are other messages in this packet return - ack = FragmentationAckHeader(incoming.sequence_num, 1, incoming.transferred_blocks) - msg = EncodedAck(src=received_packet.packet.dest, dest=received_packet.packet.src, - encoded_ack=ack.as_bits.bytes) + msg = EncodedAck(src=received_packet.packet.dest, + dest=received_packet.packet.src, + encoded_ack=ack.as_bits.bytes) self.encoded_ack_pub.publish(msg) elif frag_type == 1: @@ -259,9 +326,8 @@ class PacketCodec(BasePacketCodec): encoded_ack=ack.as_bits.bytes) self.encoded_ack_pub.publish(msg) incoming = None - rospy.logwarn( - "Fragmented packets out of sync. Receiving continuation headers but do not have start header.") - + rospy.logwarn("Fragmented packets out of sync. " + "Receiving continuation headers but do not have start header.") else: # Ack... pass @@ -269,8 +335,8 @@ class PacketCodec(BasePacketCodec): try: rospy.logdebug(incoming.transferred_blocks) rospy.logdebug("Incoming size: {0}".format(incoming.payload_size_blocks)) - except: - rospy.logdebug("Couldn't print transferred blocks") + except Exception as _e: + rospy.logdebug(f"Couldn't print transferred blocks ({_e})") if incoming is not None: if incoming.complete(): @@ -278,10 +344,11 @@ class PacketCodec(BasePacketCodec): # Store that we have published incoming message incoming.published = True - # Parse the message with the appropraite codec + # Parse the message with the appropriate codec message_codec = self.message_codecs[id % 128] rospy.loginfo("ROS Message decode: ID {} with {}".format(id % 128, message_codec)) - ros_msg = self.message_codecs[id % 128].decode(ConstBitStream(incoming.payload_bits), received_packet) + ros_msg = self.message_codecs[id % 128].decode(ConstBitStream(incoming.payload_bits), + received_packet) if isinstance(self.message_publishers[id % 128], rospy.Publisher): self.message_publishers[id % 128].publish(ros_msg) @@ -289,9 +356,11 @@ class PacketCodec(BasePacketCodec): try: self.message_publishers[id % 128][received_packet.packet.src].publish(ros_msg) except KeyError: - rospy.loginfo("Got a message from an un-named SRC {}, not publishing it".format(received_packet.packet.src)) + rospy.loginfo("Got a message from an un-named SRC {}, " + "not publishing it".format(received_packet.packet.src)) - #TODO: Keep a record of published messages for now but eventually we need to clean them up + # TODO: Keep a record of published messages for now but... + # eventually we need to clean them up #self.incoming_fragmented_messages.pop((incoming.fragment_src, incoming.sequence_num)) else: diff --git a/ros_acomms/src/packet_dispatch_node.py b/ros_acomms/src/packet_dispatch_node.py index 1e49bc04eeff15f79e9bfd3aafc64425edefcc57..68cc098ca9fa5175c3686fa5c8378070c643304c 100755 --- a/ros_acomms/src/packet_dispatch_node.py +++ b/ros_acomms/src/packet_dispatch_node.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +from sys import argv from typing import Dict, List, Tuple import rospy @@ -77,8 +78,8 @@ class DecoderListEntry: return f"DecoderListEntry: {self.__dict__}" class PacketDispatchNode(object): - def __init__(self): - rospy.init_node('packet_dispatch') + def __init__(self, logger_level: str = 'DEBUG'): + rospy.init_node('packet_dispatch', log_level=getattr(rospy, logger_level)) try: version = version_node() @@ -87,6 +88,53 @@ class PacketDispatchNode(object): except: rospy.logwarn("Unable to query version information") + # PACKET_GATEWAY + # if we Rx a Packet matching a pass-thru Codec Message ID from the list, + # publish the ros_acomms/ReceivedPacket to the pass-thru topic + _pass_thru_codec_ids: List[int] = rospy.get_param('~pass_thru_codec_ids', []) + + if isinstance(_pass_thru_codec_ids, List): + pass + elif isinstance(_pass_thru_codec_ids, int): + _pass_thru_codec_ids = [_pass_thru_codec_ids] + elif isinstance(_pass_thru_codec_ids, str) and _pass_thru_codec_ids.lower() in ('any', 'all'): + _pass_thru_codec_ids = list(range(128)) + else: + rospy.logwarn(f'Invalid pass_thru_codec_ids param: {_pass_thru_codec_ids} (type: {type(_pass_thru_codec_ids)})') + rospy.logwarn('Valid types: <List[int,]>, <int>, or <str> ("any", "all")') + rospy.logwarn('Setting pass_thru_codec_ids to [] (effectively nothing is passed thru...)') + _pass_thru_codec_ids = [] + + # pass-thru ReceivedPackets on the following topic... + _pass_thru_topic: str = rospy.get_param('~pass_thru_topic', '') + + # if gateway_only_mode is True, then we ONLY pass-thru the matching packets, and do NOT publish locally + _gateway_only_mode: bool = rospy.get_param('~gateway_only_mode', False) + + # META_PACKET_DECODER + # on the far-end, if we Rx and decode a ReceivedPacket, we can re-run + # the decode_packet method to get to the original ReceivedPacket data + # e.g. a ReceivedPacket payload IS A ReceivedPacket (how meta), so decode it again... + # CAVEAT: probably ONLY use this with the above-mentioned PACKET_GATEWAY setup + # i.e. the Gateway's packet_publisher node is set up with params: + # pass_thru_codec_ids, pass_thru_topic, gateway_only_mode + _decode_meta_packets: bool = rospy.get_param('~decode_meta_packets', False) + + self.pass_thrus: dict = None + # if no pass-thru-related params are set, don't update the pass-thru dict + # False = any(list([], '', False, False)) + if any((_pass_thru_codec_ids, _pass_thru_topic, _gateway_only_mode, _decode_meta_packets)): + # if no pass-thru topic is set, set a default topic <str> so Publisher doesn't throw ValueError + _pass_thru_topic = '/pass_thru_topic' if not _pass_thru_topic else _pass_thru_topic + self.pass_thrus: dict = {'ids': _pass_thru_codec_ids, + 'publisher': rospy.Publisher(_pass_thru_topic, ReceivedPacket, queue_size=10), + 'gateway_only': _gateway_only_mode, + 'decode_meta_packets': _decode_meta_packets} + rospy.logdebug(f'Pass-Thrus Object: {self.pass_thrus}') + + # initialize the packet decoder-helper attributes + self.decoder_list: List[DecoderListEntry] = None + self.message_publishers: Dict[QueueId, rospy.Publisher] = None self.setup_codecs() @@ -99,53 +147,56 @@ class PacketDispatchNode(object): load_custom_codecs() packet_codec_param = read_packet_codec_yaml() - self.decoder_list: List[DecoderListEntry] = [] + self.decoder_list = [] for packet_codec in packet_codec_param: # QUESTION(lindzey): Storing the message codec dict as a member # variable mirrors the logic that was here before, # but seems to imply that there will only ever be a single # packet codec in the config file. Is that true? - self.message_codecs = get_message_codecs(packet_codec) - self.queue_params = get_queue_params(packet_codec) + message_codecs = get_message_codecs(packet_codec) + queue_params = get_queue_params(packet_codec) # Convert SRC IDs to integers, since yaml dict keys have to be strings try: src_names: dict = {int(k): v for k, v in packet_codec.get('src_names', {}).items()} except Exception as e: rospy.logerr(f"Error parsing src_names parameter: {e}") - self.message_publishers: Dict[QueueId, rospy.Publisher] = {} - for queue_id, params in self.queue_params.items(): - if params.publish_topic: - if ("{src}" in params.publish_topic) or ("{name}" in params.publish_topic): - # In this case, we will have a dict of message publishers that correspond to different SRC IDs - for src, name in src_names.items(): - pub_name = params.publish_topic.format(src=src, name=name) - pub_name = rospy.names.canonicalize_name(pub_name) - if queue_id not in self.message_publishers.keys(): - self.message_publishers[queue_id] = {} - self.message_publishers[queue_id][src] = rospy.Publisher(pub_name, params.msg_class, - queue_size=10) - else: - pub_name = rospy.names.canonicalize_name(params.publish_topic) - self.message_publishers[queue_id] = rospy.Publisher( - pub_name, params.msg_class, queue_size=10) - - packet_codec.pop('message_codecs') - try: + self.message_publishers = {} + for queue_id, params in queue_params.items(): + if not params.publish_topic: + continue + + if ("{src}" in params.publish_topic) or ("{name}" in params.publish_topic): + # In this case, we will have a dict of message publishers that correspond to different SRC IDs + for src, name in src_names.items(): + pub_name = params.publish_topic.format(src=src, name=name) + pub_name = rospy.names.canonicalize_name(pub_name) + if queue_id not in self.message_publishers.keys(): + self.message_publishers[queue_id] = {} + self.message_publishers[queue_id][src] = rospy.Publisher(pub_name, params.msg_class, + queue_size=10) + else: + pub_name = rospy.names.canonicalize_name(params.publish_topic) + self.message_publishers[queue_id] = rospy.Publisher( + pub_name, params.msg_class, queue_size=10) + + if 'message_codecs' in packet_codec: + packet_codec.pop('message_codecs') + if 'src_names' in packet_codec: packet_codec.pop('src_names') - except: - pass + entry = DecoderListEntry(**packet_codec) self.decoder_list.append(entry) rospy.loginfo("Added decoder: {}".format(entry)) # replace name with instance of packet codec # ToDO make this less ugly and stupid - entry.packet_codec = packet_codecs[entry.packet_codec].PacketCodec(self.message_codecs, - message_publishers=self.message_publishers, - **packet_codec) + entry.packet_codec = (packet_codecs[entry.packet_codec]. + PacketCodec(message_codecs, + message_publishers=self.message_publishers, + **packet_codec)) - rospy.loginfo("Receive packet codec initialized with {} message codecs.".format(len(self.message_codecs))) + rospy.loginfo("Receive packet codec initialized with {} message codecs.".format(len(message_codecs))) def on_packet_rx(self, received_packet: ReceivedPacket) -> None: rospy.loginfo("Got incoming packet") @@ -157,17 +208,23 @@ class PacketDispatchNode(object): for d in matching_codecs: try: - d.decode_packet(received_packet) + if self.pass_thrus is not None: + d.decode_packet(received_packet, pass_thrus=self.pass_thrus) + else: + d.decode_packet(received_packet) except KeyError: rospy.logwarn("Unrecognized packet decoder: {}".format(d)) except Exception as e: rospy.logerr(f'ERROR Decoding Packet: {(e,)}', exc_info=True) + if __name__ == '__main__': + _arg1 = argv[1].upper() if len(argv) > 1 else None + _log_level = _arg1 if _arg1 is not None and _arg1 in ('INFO', 'WARN', 'ERROR') else 'DEBUG' try: - node = PacketDispatchNode() + node = PacketDispatchNode(logger_level=_log_level) rospy.loginfo("Message Dispatch shutdown") except rospy.ROSInterruptException: node.close() - rospy.loginfo("Message Dispatch shutdown (interrupt)") \ No newline at end of file + rospy.loginfo("Message Dispatch shutdown (interrupt)") diff --git a/ros_acomms_tests/assets/test_gateway_dispatcher_diagram.png b/ros_acomms_tests/assets/test_gateway_dispatcher_diagram.png new file mode 100644 index 0000000000000000000000000000000000000000..3241df49b6412492dab595a8f04779ffe056bb01 Binary files /dev/null and b/ros_acomms_tests/assets/test_gateway_dispatcher_diagram.png differ diff --git a/ros_acomms_tests/launch/test_gateway_dispatcher.launch b/ros_acomms_tests/launch/test_gateway_dispatcher.launch new file mode 100644 index 0000000000000000000000000000000000000000..67f7c9c54281ad54e3d0ab7157d76bd9080ffd49 --- /dev/null +++ b/ros_acomms_tests/launch/test_gateway_dispatcher.launch @@ -0,0 +1,131 @@ +<launch> + <env name="ROSCONSOLE_FORMAT" value="[${severity}] [${time}] [${node}]: ${message}"/> + + <param name="use_sim_time" value="true"/> + + <group ns="modem0"> + + <node name="chat_node" pkg="ros_acomms_tests" type="chat_node.py" output="screen" respawn="true" respawn_delay="20"> + <param name="chat_timer" value="120" type="int" /> + </node> + + <node name="another_chat_node" pkg="ros_acomms_tests" type="chat_node.py" output="screen" respawn="true" respawn_delay="20"> + <param name="chat_timer" value="60" type="int" /> + <param name="chat_topic" value="another/chat" type="str" /> + </node> + + <node name="modem_sim_node" pkg="ros_acomms_modeling" type="modem_sim_node.py" output="screen" > + <param name="center_frequency_hz" value="10000" type="int" /> + <param name="bandwidth_hz" value="5000" type="int" /> + <param name="SRC" value="0" type="int" /> + </node> + + <node name="tdma" pkg="ros_acomms" type="tdma_node.py" output="screen" required="true" respawn="false"> + <param name="num_slots" value="2"/> + <param name="active_slots" value="0"/> + <param name="slot_duration_seconds" value="30" /> + </node> + + <node name="message_queue_node" pkg="ros_acomms" type="message_queue_node.py" output="screen" > + <param name="packet_codec_file" value="$( find ros_acomms_tests )/launch/test_gateway_dispatcher_config.yaml"/> + <param name="codec_directory" value="$( find ros_acomms_tests )/launch/"/> + </node> + + <node name="packet_dispatch" pkg="ros_acomms" type="packet_dispatch_node.py" output="screen" > + <param name="packet_codec_file" value="$( find ros_acomms_tests )/launch/test_gateway_dispatcher_config.yaml"/> + <param name="codec_directory" value="$( find ros_acomms_tests )/launch/"/> + </node> + + <node name="modem_location_sim" pkg="ros_acomms_modeling" type="modem_location_sim_node.py" /> + </group> + + <group ns="modem1"> + <node name="modem_sim_node" pkg="ros_acomms_modeling" type="modem_sim_node.py" output="screen" > + <param name="center_frequency_hz" value="10000" type="int" /> + <param name="bandwidth_hz" value="5000" type="int" /> + <param name="SRC" value="1" type="int" /> + </node> + + <node name="tdma" pkg="ros_acomms" type="tdma_node.py" output="screen" required="true" respawn="false"> + <param name="num_slots" value="2"/> + <param name="active_slots" value="1"/> + <param name="slot_duration_seconds" value="30" /> + </node> + + <node name="message_queue_node" pkg="ros_acomms" type="message_queue_node.py" output="screen" > + <param name="packet_codec_file" value="$( find ros_acomms_tests )/launch/test_gateway_dispatcher_config.yaml"/> + <param name="codec_directory" value="$( find ros_acomms_tests )/launch/"/> + </node> + + <node name="packet_dispatch" pkg="ros_acomms" type="packet_dispatch_node.py" output="screen" > + <param name="packet_codec_file" value="$( find ros_acomms_tests )/launch/test_gateway_dispatcher_config.yaml"/> + <param name="codec_directory" value="$( find ros_acomms_tests )/launch/"/> + <rosparam param="pass_thru_codec_ids"> + - 0 + - 42 + </rosparam> + <param name="pass_thru_topic" value="/modem1/iridium/pass_thru" type="str"/> + <param name="gateway_only_mode" value="true" type="bool"/> + </node> + + <node name="modem_location_sim" pkg="ros_acomms_modeling" type="modem_location_sim_node.py" /> + + <group ns="iridium"> + <node name="modem_sim_node" pkg="ros_acomms_modeling" type="modem_sim_node.py" output="screen" > + <param name="center_frequency_hz" value="25000" type="int" /> + <param name="bandwidth_hz" value="5000" type="int" /> + <param name="SRC" value="2" type="int" /> + </node> + + <node name="tdma" pkg="ros_acomms" type="tdma_node.py" output="screen" required="true" respawn="false"> + <param name="num_slots" value="2"/> + <param name="active_slots" value="0"/> + <param name="slot_duration_seconds" value="30" /> + </node> + + <node name="message_queue_node" pkg="ros_acomms" type="message_queue_node.py" output="screen" > + <param name="packet_codec_file" value="$( find ros_acomms_tests )/launch/test_gateway_dispatcher_config.yaml"/> + <param name="codec_directory" value="$( find ros_acomms_tests )/launch/"/> + </node> + + <node name="modem_location_sim" pkg="ros_acomms_modeling" type="modem_location_sim_node.py" /> + + </group> <!-- iridium --> + </group> <!-- modem1 --> + + <group ns="modem2"> + <node name="modem_sim_node" pkg="ros_acomms_modeling" type="modem_sim_node.py" output="screen" > + <param name="center_frequency_hz" value="25000" type="int" /> + <param name="bandwidth_hz" value="5000" type="int" /> + <param name="SRC" value="3" type="int" /> + </node> + + <node name="tdma" pkg="ros_acomms" type="tdma_node.py" output="screen" required="true" respawn="false"> + <param name="num_slots" value="2"/> + <param name="active_slots" value="1"/> + <param name="slot_duration_seconds" value="30" /> + </node> + + <node name="packet_dispatch" pkg="ros_acomms" type="packet_dispatch_node.py" output="screen" > + <param name="packet_codec_file" value="$( find ros_acomms_tests )/launch/test_gateway_dispatcher_config.yaml"/> + <param name="codec_directory" value="$( find ros_acomms_tests )/launch/"/> + + <!-- if isinstance(payload, ReceivedPacket), continue decoding payload; else don't --> + <param name="decode_meta_packets" value="true" type="bool"/> +<!-- <param name="decode_meta_packets" value="false" type="bool"/> --> + + </node> + + <node name="modem_location_sim" pkg="ros_acomms_modeling" type="modem_location_sim_node.py" /> + </group> + + <node name="sim_packet_performance_node" pkg="ros_acomms_modeling" type="sim_packet_performance_node.py" /> + + <node name="sim_transmission_loss_node" pkg="ros_acomms_modeling" type="sim_transmission_loss_node.py" > + <param name="water_depth" value="20" type="int" /> + <param name="bellhop_arrivals" value="false" type="bool" /> + </node> + + <node name="clock_generator" pkg="ros_acomms_modeling" type="clock_generator.py" /> + +</launch> diff --git a/ros_acomms_tests/launch/test_gateway_dispatcher_config.yaml b/ros_acomms_tests/launch/test_gateway_dispatcher_config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..fdc9a3b02df52d183b442a5e323bf733963158d6 --- /dev/null +++ b/ros_acomms_tests/launch/test_gateway_dispatcher_config.yaml @@ -0,0 +1,210 @@ +- codec_name: default + match_src: [] + match_dest: [] + except_src: [] + except_dest: [] + packet_codec: ros + miniframe_header: [0xac] + dataframe_header: [] + remove_headers: true + src_names: + "0": 'modem0' + "1": 'modem1' + "2": 'modem1_ird' + "3": 'modem2' + "999": 'unknown' + message_codecs: + + - id: 0 + subscribe_topic: "chat" + publish_topic: "chat/{name}" + ros_type: "std_msgs/String" + default_dest: 121 + queue_order: fifo + queue_maxsize: 1 + allow_fragmentation: false + priority: 40 + fields: + data: + codec: string + max_length: 100 + + - id: 1 + subscribe_topic: "another/chat" + publish_topic: "another/chat/{name}" + ros_type: "std_msgs/String" + default_dest: 121 + queue_order: fifo + queue_maxsize: 1 + allow_fragmentation: false + priority: 30 + fields: + data: + codec: string + max_length: 100 + + - id: 2 + message_codec: default + subscribe_topic: "sst" + publish_topic: "{name}/sst" + ros_type: "ros_acomms_msgs/SST" + default_dest: 121 + queue_order: lifo + queue_maxsize: 5 + allow_fragmentation: false + priority: 40 + fields: + time: + codec: rostime + precision: 3 + + in_water_spl_dB: + codec: float + min_value: 0 + max_value: 300 + precision: 3 + + detector: + codec: integer + min_value: 0 + max_value: 10 + + num_samples: + codec: uint16 + + summary_min: + codec: float + min_value: 0 + max_value: 300 + precision: 3 + + summary_lower_quartile: + codec: float + min_value: 0 + max_value: 300 + precision: 3 + + summary_median: + codec: float + min_value: 0 + max_value: 300 + precision: 3 + + summary_upper_quartile: + codec: float + min_value: 0 + max_value: 300 + precision: 3 + + summary_max: + codec: float + min_value: 0 + max_value: 300 + precision: 3 + + summary_len: + codec: uint16 + + - id: 3 + message_codec: default + subscribe_topic: "pass_thru" +# publish_topic: "null" + publish_topic: "packet_rx" + ros_type: "ros_acomms_msgs/ReceivedPacket" + default_dest: 121 + queue_order: lifo + queue_maxsize: 1 + allow_fragmentation: false + priority: 90 + fields: + header: + codec: msg + ros_type: "std_msgs/Header" + fields: + stamp: + codec: rostime + epoch_end: 1924952399 + packet: + codec: msg + ros_type: "ros_acomms_msgs/Packet" + fields: + src: + codec: uint16 + min_value: 0 + max_value: 300 + dest: + codec: uint16 + min_value: 0 + max_value: 300 + packet_type: + codec: uint8 + min_value: 0 + max_value: 255 + miniframe_rate: + codec: uint8 + min_value: 0 + max_value: 255 + dataframe_rate: + codec: uint8 + min_value: 0 + max_value: 255 + miniframe_bytes: + codec: variable_len_array + element_type: uint8 + max_length: 100 + dataframe_bytes: + codec: variable_len_array + element_type: uint8 + max_length: 200 + cst: + codec: msg + ros_type: "ros_acomms_msgs/CST" + fields: + mfd_pow: + codec: integer + min_value: 0 + max_value: 1000 + mfd_ratio: + codec: integer + min_value: 0 + max_value: 1000 + mfd_spl: + codec: integer + min_value: 0 + max_value: 1000 + agn: + codec: integer + min_value: 0 + max_value: 1000 + src: + codec: integer + min_value: 0 + max_value: 255 + snr_rss: + codec: integer + min_value: 0 + max_value: 1000 + snr_in: + codec: float + precision: 2 + min_value: -1500 + max_value: 1500 + snr_out: + codec: float + precision: 2 + min_value: -1500 + max_value: 1500 + snr_sym: + codec: float + precision: 2 + min_value: -1500 + max_value: 1500 + mse: + codec: float + precision: 2 + min_value: -1500 + max_value: 1500 + dqf: + codec: integer + min_value: -1000 + max_value: 1000 diff --git a/ros_acomms_tests/src/chat_node.py b/ros_acomms_tests/src/chat_node.py new file mode 100755 index 0000000000000000000000000000000000000000..11e559cca04c174dc61a0796601e4626cde926c3 --- /dev/null +++ b/ros_acomms_tests/src/chat_node.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python3 +"""Publish a std_msgs/String message to the 'chat' topic every X minutes.""" + +# import builtins +from datetime import datetime + +# import externals +import rospy + +# import locals +from std_msgs.msg import String + + +class Chatter(object): + def __init__(self): + rospy.init_node('chat_node') + + _chat_timer = rospy.get_param('~chat_timer', 120) + _chat_topic = rospy.get_param('~chat_topic', 'chat') + + self.chat_pub = rospy.Publisher(_chat_topic, String, queue_size=10) + + rospy.Timer(rospy.Duration(_chat_timer), self.on_chat_timer) + + rospy.loginfo(f'Chat Node Initialized [{_chat_topic}]') + rospy.spin() + + def on_chat_timer(self, event: rospy.timer.TimerEvent = None) -> None: + _now = rospy.get_time() + now = datetime.utcfromtimestamp(_now).strftime('%Y-%m-%d %H:%M:%S.%f') + + chat_string = f'{now} Hello World! {_now}' + rospy.loginfo(f"Publishing '{chat_string}' to [{self.chat_pub.resolved_name}]") + + self.chat_pub.publish(String(data=chat_string)) + + +if __name__ == "__main__": + try: + Chatter() + except rospy.ROSInterruptException: + pass + finally: + rospy.loginfo('Data Collection Chatter Node Terminated')