diff --git a/ros_acomms/src/acomms_codecs/ros_packet_codec.py b/ros_acomms/src/acomms_codecs/ros_packet_codec.py index 3aece9784efe49002273dc20686897e583e6ab8b..9b906a7539408473d51bd948f86504dd61eac6c2 100644 --- a/ros_acomms/src/acomms_codecs/ros_packet_codec.py +++ b/ros_acomms/src/acomms_codecs/ros_packet_codec.py @@ -39,6 +39,7 @@ class PacketCodec(BasePacketCodec): self.minimum_fragment_size_bits = minimum_fragment_size_bits self.header_size_bits = 8 + self.codec_overhead_bits = self.header_size_bits + len(miniframe_header) * 8 + len(dataframe_header) * 8 self.incoming_fragmented_messages = dict() diff --git a/ros_acomms/src/acomms_driver_node.py b/ros_acomms/src/acomms_driver_node.py index ecc2f47f802971ab3c050999febfefcf4ce8a498..9533a1514e4df69ce90d335a9e128df7dba12c24 100755 --- a/ros_acomms/src/acomms_driver_node.py +++ b/ros_acomms/src/acomms_driver_node.py @@ -37,12 +37,13 @@ from ros_acomms_msgs.msg import ( PingTranspondersReply, ReceivedPacket, ) +from ros_acomms_msgs.srv import QueryModemParam, QueryModemParamRequest, QueryModemParamResponse from ros_acomms_msgs.srv import PingModem, PingModemResponse from ros_acomms_msgs.srv import PingTransponders, PingTranspondersResponse from ros_acomms_msgs.srv import QueueTxPacket, QueueTxPacketResponse +from ros_acomms_msgs.srv import QueryModemSrcRequest, QueryModemSrcResponse, QueryModemSrc from ros_acomms.cfg import acomms_driverConfig from std_msgs.msg import Header, String, Time - from version_node import version_node # Import NodeStatus if it is available @@ -130,6 +131,7 @@ class AcommsDriverNode(object): self.use_janus_packets = rospy.get_param("~use_janus_packets", False) self.use_tdp = rospy.get_param("~use_tdp", False) + rospy.loginfo(f'Params\n{rospy.get_param("~")}') self.pending_packets = deque() # Figure out current log level of this rosnode to pass to pyacomms @@ -229,6 +231,16 @@ class AcommsDriverNode(object): "queue_tx_packet", QueueTxPacket, self.handle_queue_tx_packet ) + # Can be used to get the SRC address of the modem, and also serves as a mechanism to block until the acomms + # driver is up + self.query_modem_src_service = rospy.Service( + "query_modem_src", QueryModemSrc, self.handle_query_modem_src + ) + + self.query_modem_param_service = rospy.Service( + "query_modem_param", QueryModemParam, self.handle_query_modem_param + ) + self.reconfigure_server = DynamicReconfigureServer( acomms_driverConfig, self.reconfigure ) @@ -241,6 +253,10 @@ class AcommsDriverNode(object): cycle_timeout_factor = self.um.config_data['CTO'] * missed_checkin_factor return True if secs_since_checkin < cycle_timeout_factor else False + def handle_query_modem_src(self, query: QueryModemSrcRequest) -> QueryModemSrcResponse: + response = QueryModemSrcResponse(src=self.um.id) + return response + def on_modem_msg(self, parsed_obj: Union[Message, Ack, Casst, NavStatistics, NavTurnTimes, NavPingDigitalTransponder, NavPingGenericTransponder], msg: Message = None) -> None: @@ -475,6 +491,24 @@ class AcommsDriverNode(object): return response + def handle_query_modem_param(self, req: QueryModemParamRequest) -> QueryModemParamResponse: + """handler for rospy QueryModemParam service class + Returns: + QueryModemParamResponse.value <str> : value of the requested parameter + - value is of type <str> because uModem param values can be combination of + - <int>, <str>, <dict> (from Micromodem.get_config call), etc + caller must know what type of data to expect in response + """ + rospy.logdebug("Requesting modem param: {}".format(req.param)) + try: + param_dict = self.um.get_config(req.param) + if param_dict is None: + raise rospy.ServiceException(f"Unable to Execute Service query_modem_param (param returned None)") + except KeyError as e: + raise rospy.ServiceException(f"Unable to Execute Service query_modem_param: {e}") + v = str(param_dict) if len(param_dict) > 1 else str(param_dict[req.param]) + return QueryModemParamResponse(value=v) + def handle_ping_modem(self, request): """handler for rospy PingModem service class diff --git a/ros_acomms/src/link_layer_feedback_node.py b/ros_acomms/src/link_layer_feedback_node.py index 86ab3955bf5eab6312427f40b0d539cd00afdb68..a2d58007456e84dea2ab6f61226b46c3178f1cbb 100755 --- a/ros_acomms/src/link_layer_feedback_node.py +++ b/ros_acomms/src/link_layer_feedback_node.py @@ -45,6 +45,7 @@ class LinkLayerFeedbackNode: if not self.last_sst: rospy.logwarn_throttle(10, "No SST information is available, not saving CST") + return # We track the most recent CSTs associated with each sender # Only update if the header decoded (so we have a src address) and it's a PSK packet (3, 4, or 5) @@ -82,6 +83,10 @@ class LinkLayerFeedbackNode: # encode it message_bits, message_metadata = self.link_stats_feedback_codec.encode(new_feedback_msg) + # Make sure it is small enough + if len(message_bits) > request.max_size_in_bits: + return GetNextQueuedMessageResponse(has_message=False) + return GetNextQueuedMessageResponse(has_message=True, message_id=1, dest_address=self.destination_address, diff --git a/ros_acomms/src/mac_utils.py b/ros_acomms/src/mac_utils.py index 5e964158367f356cc09325a582fc8c850795c8de..67a957543d73b7a862381e5b3abf7b65da381e57 100755 --- a/ros_acomms/src/mac_utils.py +++ b/ros_acomms/src/mac_utils.py @@ -118,7 +118,12 @@ class ManualTransmitRequestQueue(object): msg = None try: msg = self.next_manual_transmit.subscriber_queue.popleft() - self.next_manual_transmit.publisher.publish(msg) + # handle multiple lines of nmea if ganged together by newlines + lines = msg.data.split('\n') + # even if there are no newlines, this will publish the single command + for line in lines: + if line: + self.next_manual_transmit.publisher.publish(data=line) except: # deque is empty self.next_manual_transmit = None @@ -152,7 +157,7 @@ class ManualTransmitRequestQueue(object): self.topic_map[topic] = ManualTransmitRequest( subscriber_queue=deque(maxlen=maxlen), - publisher=rospy.Publisher(topic, msg_type, queue_size=1), + publisher=rospy.Publisher(topic, msg_type, queue_size=10), publish_topic=topic, subscribe_topic=self.get_tdma_ns_topic(topic=topic), ) diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py index 145f4a86a69a72f3d1c625dc7c3c63d0863cf4b5..4818419342bda93b2f88ad4875c17105026840fb 100755 --- a/ros_acomms/src/message_queue_node.py +++ b/ros_acomms/src/message_queue_node.py @@ -16,6 +16,7 @@ from datetime import datetime, timedelta import time from ros_acomms_msgs.srv import GetNextPacketData, GetNextPacketDataRequest, GetNextPacketDataResponse from ros_acomms_msgs.srv import GetNextQueuedMessage, GetNextQueuedMessageRequest, GetNextQueuedMessageResponse +from ros_acomms_msgs.srv import QueryModemSrc, QueryModemSrcRequest, QueryModemSrcResponse from ros_acomms_msgs.msg import UpdateQueuedMessage from ros_acomms_msgs.msg import (EncodedAck, NeighborReachability, QueueEnable, QueuePriority, QueueStatus, QueueSummary, SummaryMessageCount) @@ -88,6 +89,12 @@ class QueuedMessage: rospy.logdebug("Getting next bits up to: {0}".format(size_in_bits)) rospy.logdebug("Self lengthbits: {0}".format(self.length_bits)) + if self.length_bits > size_in_bits: + rospy.logwarn(f"Got {self.length_bits} bits, but we can only fit {size_in_bits}") + + if self.length_bits == 0: + rospy.logerr("Message payload length is 0, which shouldn't happen") + if self.length_bits <= size_in_bits: message_bits = self.payload_bits message_header = self.queue.message_codec_id @@ -271,7 +278,18 @@ class MessageQueueNode(object): self.default_priority = rospy.get_param('~default_priority', 10) src_param_name = rospy.search_param('src') - self.src = rospy.get_param(src_param_name) + if src_param_name: + self.src = rospy.get_param(src_param_name) + else: + rospy.loginfo("src parameter was not found. Querying src ID from modem... (Requires that modem driver or sim node is up)") + try: + rospy.wait_for_service('query_modem_src') + query_modem_src = rospy.ServiceProxy('query_modem_src', QueryModemSrc) + src_response: QueryModemSrcResponse = query_modem_src() + self.src = src_response.src + except rospy.ServiceException as e: + rospy.logerr("Error calling query_modem_src service. Setting src to -1, which may break packet codecs.") + self.src = -1 try: version = version_node() @@ -327,13 +345,13 @@ class MessageQueueNode(object): rospy.loginfo(f'Loading Custom Codecs...') try: load_custom_codecs() - except Exceptions as e: + except Exception as e: rospy.logwarn(f'WARNING: Problem Loading Custom Codecs: {(e,)}', exc_info=True) rospy.loginfo(f'Reading Packet Codecs...') try: packet_codec_param = read_packet_codec_yaml() - except Exceptions as e: + except Exception as e: rospy.logwarn(f'WARNING: Problem Reading Packet Codecs: {(e,)}', exc_info=True) packet_codec_param = [] @@ -582,9 +600,11 @@ class MessageQueueNode(object): packet_codec: Optional[str] = None, check_reachability: bool = True, minimum_priority: int = None, - exclude_messages: Optional[List[QueuedMessage]] = []) -> Optional[ - tuple[str, int]]: + exclude_messages: Optional[List[QueuedMessage]] = None) -> tuple[ + Optional['DynamicQueue'], Optional[int]]: + if exclude_messages is None: + exclude_messages = [] start_time = time.time() # for profiling for dynamic_queue in self.dynamic_queues.values(): dynamic_queue_name = dynamic_queue.dynamic_message_service @@ -634,18 +654,22 @@ class MessageQueueNode(object): rospy.logdebug(f"Dynamic queue query took {time.time()-start_time} s") # Now, figure out what the highest priority dynamic message is, if any, - # .. and handle gating if minimum_priority is passed + # and handle gating if minimum_priority is passed priorities = {} for queue, response in self.current_packet_dynamic_queue_cache.items(): if not response.has_message: continue + if response.data_size_in_bits > max_size_in_bits: + rospy.logerr(f"Queue response from {queue} has size {response.data_size_in_bits}, larger than max {max_size_in_bits}. Skipping.") + continue + if minimum_priority is not None: # check if this response priority meet min if response.priority < minimum_priority: rospy.logdebug_throttle(1, f"dynamic_queue: response.priority is < minimum_priority({minimum_priority}). response.priority: ({response.priority})") continue - + priorities[queue] = response.priority if len(priorities) > 0: diff --git a/ros_acomms/src/tdma_node.py b/ros_acomms/src/tdma_node.py index 460bfa63a32017cf3625481838591d896eece097..e4b4b7144eacae28cc09971799e4c894dd7977db 100755 --- a/ros_acomms/src/tdma_node.py +++ b/ros_acomms/src/tdma_node.py @@ -49,6 +49,7 @@ class TdmaMacNode(object): self.manual_transmit_queue = ManualTransmitRequestQueue(maxlen=rospy.get_param('~maxlen_manual_queue', 1)) # this is how sub classes should add manual transmit topics in their init self.manual_transmit_queue.add_manual_transmit_topic(topic='nmea_to_modem', msg_type=String) + self.allow_manual_tx_during_priority_slots = rospy.get_param('~allow_manual_tx_during_priority_slots', False) if not subclass: rospy.loginfo("~*+ tdma_mac node running.") @@ -182,7 +183,7 @@ class TdmaMacNode(object): if req_max_bytes is None: # invalid value. Will throw an exception shutting down this node if dataframe: err = f'Must set valid maximum_dataframe_bytes for rate {rate}. ' - else: err += f'Must set valid maximum_miniframe_bytes for rate {rate}. ' + else: err = f'Must set valid maximum_miniframe_bytes for rate {rate}. ' err += f'For rates other than [1,3,5] there is no default. Using non FDP rate requires passing max_bytes!' rospy.logfatal(err) raise rospy.ROSInterruptException(err) @@ -260,10 +261,8 @@ class TdmaMacNode(object): # wrap around calculation, current_slot is after slot of interest # Need remaining seconds in cycle. then add seconds until slot of interest remaining_time_to_slot += self.slot_duration_seconds * ((self.num_slots - 1) - current_slot) - if slot > 0: - # slot 0 is a special case. We don't add this additional slot_duration because, - # .. we have the partial remaining_slot_seconds added initially - remaining_time_to_slot += self.slot_duration_seconds + # now add the slot cycles between 0 and the slot of interest (if slot=0, we don't add any time of course) + remaining_time_to_slot += self.slot_duration_seconds * slot return remaining_time_to_slot @@ -350,12 +349,14 @@ class TdmaMacNode(object): continue def send_next_packet(self, insert_at_head=False, minimum_priority=None): - # this will not send a manual request when minimum_priority is passed! - if minimum_priority is None and self.manual_transmit_queue: - msg = self.manual_transmit_queue.handle_active_queue() - if msg is not None: - rospy.logwarn(f'NOTICE: Sent manual transmit request instead of packet: {msg}') - return 0 + # this will not send a manual request when minimum_priority is passed + # .. unless ~allow_manual_tx_during_priority_slots is set to True (defaults to False) + if minimum_priority is None or self.allow_manual_tx_during_priority_slots: + if self.manual_transmit_queue: + msg = self.manual_transmit_queue.handle_active_queue() + if msg is not None: + rospy.logwarn(f'NOTICE: Sent manual transmit request instead of packet: {msg}') + return 0 # TODO: handle other packet sizes packet_data_response = self.get_next_packet_data(num_miniframe_bytes=self.maximum_miniframe_bytes, diff --git a/ros_acomms_modeling/src/modem_sim_node.py b/ros_acomms_modeling/src/modem_sim_node.py index 0fee80926030a3262804e20eca1fc99784c841ed..2be7356a5703aa48aa53ccf206566e7a10e28204 100755 --- a/ros_acomms_modeling/src/modem_sim_node.py +++ b/ros_acomms_modeling/src/modem_sim_node.py @@ -4,7 +4,8 @@ import roslib.message from rospy import AnyMsg from std_msgs.msg import Header, Float32 from ros_acomms_msgs.msg import PingReply, CST, XST, TransmittedPacket, ReceivedPacket, Packet, SST -from ros_acomms_msgs.srv import PingModem, PingModemResponse, QueueTxPacket, QueueTxPacketResponse +from ros_acomms_msgs.srv import PingModem, PingModemResponse, QueueTxPacket, QueueTxPacketResponse, \ + QueryModemSrc, QueryModemSrcRequest, QueryModemSrcResponse from ros_acomms_modeling.msg import SimPacket from ros_acomms_modeling.srv import SimTransmissionLoss, SimTransmissionLossRequest, SimPacketPerformance, \ SimPacketPerformanceRequest, SimPacketPerformanceResponse, SimTravelTime, SimTravelTimeRequest, SimTravelTimeResponse @@ -241,11 +242,21 @@ class ModemSimNode(object): # '/acoustic_channel', SimPacket, self.on_acoustic_channel_sim_packet, tcp_nodelay=True) '/acoustic_channel', SimPacket, self.on_acoustic_channel_sim_packet) + # Can be used to get the SRC address of the modem, and also serves as a mechanism to block until the acomms + # simulator is up + self.query_modem_src_service = rospy.Service( + "query_modem_src", QueryModemSrc, self.handle_query_modem_src + ) + rospy.loginfo("Modem sim node running{}. SRC={}, FC={} Hz, BW={} Hz".format( " with sim ticks active" if self.use_sim_tick else "", self.src, self.fc, self.bw)) rospy.spin() + def handle_query_modem_src(self, query: QueryModemSrcRequest) -> QueryModemSrcResponse: + response = QueryModemSrcResponse(src=self.src) + return response + def on_ambient_noise(self, msg: Float32): self.ambient_noise = msg.data diff --git a/ros_acomms_msgs/CMakeLists.txt b/ros_acomms_msgs/CMakeLists.txt index 52cc89a23df8ac738cb63fe8e6f6bee7443073ea..c0c5e535f3696dae0219e4f3531a79a0928d52b7 100644 --- a/ros_acomms_msgs/CMakeLists.txt +++ b/ros_acomms_msgs/CMakeLists.txt @@ -80,13 +80,14 @@ add_message_files(FILES XST.msg ) - add_service_files(FILES GetNextPacketData.srv GetNextQueuedMessage.srv PingModem.srv PingTransponders.srv + QueryModemParam.srv QueueTxPacket.srv + QueryModemSrc.srv ) ## Generate actions in the 'action' folder diff --git a/ros_acomms_msgs/srv/QueryModemParam.srv b/ros_acomms_msgs/srv/QueryModemParam.srv new file mode 100644 index 0000000000000000000000000000000000000000..8e1f6239101d3bd924db99b52f94bf7271cd4bd1 --- /dev/null +++ b/ros_acomms_msgs/srv/QueryModemParam.srv @@ -0,0 +1,3 @@ +string param +--- +string value diff --git a/ros_acomms_msgs/srv/QueryModemSrc.srv b/ros_acomms_msgs/srv/QueryModemSrc.srv new file mode 100644 index 0000000000000000000000000000000000000000..44865360752f721d5e520c81e0c0832df312172f --- /dev/null +++ b/ros_acomms_msgs/srv/QueryModemSrc.srv @@ -0,0 +1,4 @@ +# Request has no parameters +--- +# Response +uint8 src \ No newline at end of file diff --git a/ros_acomms_tests/src/test_tdma_extended.py b/ros_acomms_tests/src/test_tdma_extended.py index 8fc0a442c16ec3973c13910f4f74d5cbe015a07e..0d6403136b811cecce65d8bf1f461043dd1f10fb 100644 --- a/ros_acomms_tests/src/test_tdma_extended.py +++ b/ros_acomms_tests/src/test_tdma_extended.py @@ -175,7 +175,7 @@ class TestTdmaExtended: self.logger.info(f'Software Mute:: Waiting for msg on /modem3/from_acomms/test_msg, timeout=20') self.logger.info(f'Software Mute:: Attempt {num_timed_out} of {max_timed_out_attempts}') ret = rospy.wait_for_message( - "/modem3/from_acomms/test_msg", UInt8, timeout=100) + "/modem3/from_acomms/test_msg", UInt8, timeout=20) assert ret.data != 123, f'Software mute failed on modem2, test_msg: {ret} was still transmitted!' except rospy.exceptions.ROSException: num_timed_out += 1 @@ -193,6 +193,8 @@ class TestTdmaExtended: self.logger.info(f'Testing Always Send Test Data data in miniframes') max_timed_out_attempts = 2 num_timed_out = 0 + # sometimes we are so fast we get the last packet sent with all zeros. Only allow this on the first + got_prior_packet = False queue_publisher = rospy.Publisher('/modem3/test_msg', UInt8, queue_size=5) time.sleep(2) @@ -209,10 +211,21 @@ class TestTdmaExtended: ret = rospy.wait_for_message( "/modem0/packet_rx", ReceivedPacket, timeout=100) if ret.packet.src == 3: - assert any(ret.packet.miniframe_bytes), f'Packet from modem3 is all zeros for miniframe_bytes {ret.packet.miniframe_bytes}. Published test_msg not transmitted' - assert ret.packet.miniframe_bytes[-64:] == bytes([0] * 64), f'Packet from modem3 should have 64 zero padded bits at the end {ret.packet.miniframe_bytes}.' - self.logger.info(f'Always Send Test Data miniframes:: Got valid zero padded packet with 64 zero padded bytes at end of miniframe_bytes from modem3! {ret.packet} ') - break + try: + assert any(ret.packet.miniframe_bytes), f'Packet from modem3 is all zeros for miniframe_bytes {ret.packet.miniframe_bytes}. Published test_msg not transmitted' + assert ret.packet.miniframe_bytes[-64:] == bytes([0] * 64), f'Packet from modem3 should have 64 zero padded bits at the end {ret.packet.miniframe_bytes}.' + except AssertionError: + # if we have not already gotten a prior message, re-raise + if not got_prior_packet and num_timed_out == 0: + # we allow 1 old packet so we can avoid watching tdma status + got_prior_packet = True + self.logger.info(f'Always Send Test Data miniframes:: got prior packet from modem3.. allowing once.') + continue + else: + raise + else: + self.logger.info(f'Always Send Test Data miniframes:: Got valid zero padded packet with 64 zero padded bytes at end of miniframe_bytes from modem3! {ret.packet} ') + break except rospy.exceptions.ROSException: num_timed_out += 1 self.logger.info(f'Always Send Test Data miniframes:: Timed-out waiting for msg on /modem0/packet_rx')