From 823da935f9afccb40273f9049a15736fb213c235 Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Wed, 29 Sep 2021 16:49:20 -0400 Subject: [PATCH 1/4] Created Queue DB and refactored message_queue_node --- .gitignore | 1 + database/controllers/queueController.py | 218 ++++++++++++++ database/databases/queue.db | Bin 0 -> 57344 bytes src/message_queue_node.py | 369 ++++++++++-------------- src/packet_dispatch_node.py | 2 +- 5 files changed, 379 insertions(+), 211 deletions(-) create mode 100644 database/controllers/queueController.py create mode 100644 database/databases/queue.db diff --git a/.gitignore b/.gitignore index 79f33fdd..c0706b96 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ /src/__pycache__ /src/acomms_codecs/__pycache__ /src/acomms_codecs/field_codecs/__pycache__ +.vscode/settings.json diff --git a/database/controllers/queueController.py b/database/controllers/queueController.py new file mode 100644 index 00000000..3843ca3e --- /dev/null +++ b/database/controllers/queueController.py @@ -0,0 +1,218 @@ +from time import time +from typing import Optional +from xmlrpc.client import Boolean +from peewee import * +import os +import rospkg +import json +from rospy_message_converter import message_converter +import rospy + +rospack = rospkg.RosPack() + +DB_PATH = os.path.join(rospack.get_path('ros_acomms'), + 'src/database/databases/queue.db') +db = SqliteDatabase(DB_PATH) # database location + + +class BaseModel(Model): + class Meta: + database = db + + +class Queue(Model): # database model + + identifier = IntegerField() # Unique ID + message_raw = TextField() + message_encoded = TextField() + message_type = TextField() + dest = IntegerField() + payload_bits = TextField() + length_bits = IntegerField() + posistion = IntegerField() # Pos in queue (relative to topic) + fragmentation_tracker = BooleanField() + allow_fragmentation = BooleanField() + transmitted = BooleanField() # Transmitted state + completed = BooleanField() # Transmission completed + message_type = IntegerField() # Corresponds to message table + direction = BooleanField() + + def get_next_message_smaller_than_bits(self, size_in_bits, dest_address=None, packet_codec=None, + check_reachability=True): + try: + for val in Queue.select(): + next_message = None + + if dest_address and (dest_address != val.dest): + # Move on to check the next message + rospy.logdebug("Disqualified: dest_address") + continue + if not val.allow_fragmentation and val.length_bits > size_in_bits: + rospy.logdebug("Disqualified: too large, no fragmentation") + continue + break + + return next_message + + except: + return None + + def bit_count(self): + count = 0 + + for val in Queue.select(): + count += val.length_bits + + return count + + def message_count(self): + counter = 0 + for val in Queue.select(): + counter += 1 + return counter + + def removeMessage(self, idVal): + Queue.delete().where( + Queue.identifier == idVal).execute() + + def getNextMessage(self, message_type): + for val in Queue.select(): + if(val.message_type == message_type): + if(val.completed == False): + return val.message_encoded + + def clearLifoMessages(self, messageType): + Queue.delete().where(Queue.message_type == messageType).execute() + + def popLeft(self): + Queue.delete().where(Queue.identifier == 0).execute() + + def popRight(self): + Queue.delete().where(Queue.identifier == self.getLastIterator()).execute() + + def updateTransmit(self, msg, transmitVal): + dictionary = message_converter.convert_ros_message_to_dictionary( + msg) + message_str = json.dumps(dictionary) + Queue.update(transmitted=transmitVal).where( + Queue.message == message_str).execute() + + def addRight(self, message, message_codecVal, destVal, payload_bitsVal, length_bitsVal, time_addedVal, transmittedVal, next_hop_ackedVal, priorityVal, fragmentation_trackerVal, allow_fragmentationVal, is_activeVal, directionVal): + dictionary = message_converter.convert_ros_message_to_dictionary( + message) + message_raw = json.dumps(dictionary) + + idVal = self.getLastIterator + 1 + countVal = 0 + for val in Queue.select(): + countVal += 1 + posistionVal = countVal + 1 + + Queue.create(identifier=idVal, + message_raw=message_raw, + message_codec=message_codecVal, + dest=destVal, + payload_bits=payload_bitsVal, + length_bits=length_bitsVal, + time_added=time_addedVal, + transmitted=transmittedVal, + next_hop_acked=next_hop_ackedVal, + posistion=posistionVal, + fragmentation_tracker=fragmentation_trackerVal, + allow_fragmentation=allow_fragmentationVal, + is_active=is_activeVal, + direction=directionVal) + + def addLeft(self, message, message_codecVal, destVal, payload_bitsVal, length_bitsVal, time_addedVal, transmittedVal, next_hop_ackedVal, priorityVal, fragmentation_trackerVal, allow_fragmentationVal, is_activeVal, directionVal): + dictionary = message_converter.convert_ros_message_to_dictionary( + message) + message_raw = json.dumps(dictionary) + + idVal = self.getLastIterator + 1 + + for val in Queue.select(): + posistionVal = val.posistion - 1 + + Queue.create(identifier=idVal, + message=message_raw, + message_codec=message_codecVal, + dest=destVal, + payload_bits=payload_bitsVal, + length_bits=length_bitsVal, + time_added=time_addedVal, + transmitted=transmittedVal, + next_hop_acked=next_hop_ackedVal, + posistionVal=posistionVal, + fragmentation_tracker=fragmentation_trackerVal, + allow_fragmentation=allow_fragmentationVal, + is_active=is_activeVal, + direction=directionVal) + + def displayTable(self): + for command in Queue.select(): + print(command.output) + + def returnTable(self): + return Queue.select() + + def getLastIterator(self): + highVal = -1 + for val in Queue.select(): + if(val.identifier > highVal): + highVal = val.identifier + return highVal + + +class Topics(BaseModel): # Gets created at yaml read + message_type = IntegerField() + message_name = TextField() + priority = IntegerField() + is_active = BooleanField() + + def addEntry(self, message_typeVal, priorityVal, is_activeVal, directionVal = None): + Topics.create(message_type=message_typeVal, priority=priorityVal, + is_active=is_activeVal, direction=directionVal) + + +class msgEvents(BaseModel): + identifier = IntegerField() + time = TimeField() + source = IntegerField() + dest = IntegerField() + + def removeEntry(self, idVal): + msgEvents.delete().where( + msgEvents.identifier == idVal).execute() + + def addEntry(self, identifierVal, timeVal, sourceVal, destVal): + msgEvents.create(identifier=identifierVal, time=timeVal, + source=sourceVal, dest=destVal) + + def getLastIterator(self): + highVal = -1 + for val in msgEvents.select(): + if(val.identifier > highVal): + highVal = val.identifier + return highVal + + +class Fragmentation(BaseModel): + identifier = IntegerField() + time = TimeField() + source = IntegerField() + dest = IntegerField() + + def removeEntry(self, idVal): + Fragmentation.delete().where( + Fragmentation.identifier == idVal).execute() + + def addEntry(self, identifierVal, timeVal, sourceVal, destVal): + Fragmentation.create(identifier=identifierVal, + time=timeVal, source=sourceVal, dest=destVal) + + def getLastIterator(self): + highVal = -1 + for val in Fragmentation.select(): + if(val.identifier > highVal): + highVal = val.identifier + return highVal diff --git a/database/databases/queue.db b/database/databases/queue.db new file mode 100644 index 0000000000000000000000000000000000000000..393053933e96565b6bfe691bf6e093f117b0355d GIT binary patch literal 57344 zcmeI&O>Wab6aZkS{1hk@cC0FmvH}UQ;Q&yIg(^aT5G1+`%@iWlN$WUJ)`$ymA<n^V zcqxq%RIy_P-zZOGy?K7}e7lML`c-Gt<a98pt3i_<6t{|2t9X=BQEYrGiefY3Ld5l= zxcoilixG>_T3q~jjvn_P{<skZt<7-}KZ5`P0t5&UAV7cs0Rja63xO{;TkE&Cwpw40 zo58!oyg#X`!FV`b%xgQna=Txq{`TWenc9nVJKbrIhV8W5?Uzr>Uh2N+r|z3hC%x?L zKHu)`rDx^dy%Z&R+>8!JxjtVS%V*vCpO11o^~<-><E+ne@%+MM)*Q{6U-_u3b2F>Q zJCkAbID4GPPA9W^|F_IAPvfR#MKh{$v|3Gl$b;pcqDeJ4In3kLiaH<9qEB~Lt!Aq{ zoetjT<v!xx$;VuOs^cKPT9{Ar{_NCwHG42R%ui0AZf<aQem0Lozd?Wi0RjXF5FkK+ z009C72>km38?nXtyZuWMmm}6Au0&joxE3*Ak8c2q`QHWeZvxgL3<3lQ5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAn?}&egZ|O BgT?>= literal 0 HcmV?d00001 diff --git a/src/message_queue_node.py b/src/message_queue_node.py index 659dac90..fbef4f8d 100755 --- a/src/message_queue_node.py +++ b/src/message_queue_node.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 from __future__ import unicode_literals -from itertools import groupby +from itertools import count, groupby from typing import Union import rospy @@ -17,13 +17,13 @@ from ros_acomms.srv import GetNextPacketData, GetNextPacketDataRequest, GetNextP from ros_acomms.msg import NeighborReachability, SummaryMessageCount, QueueStatus, EncodedAck from bitstring import Bits, BitStream from fragmentation_tracker import FragmentationTracker, FragmentationStartHeader, FragmentationContHeader, FragmentationAckHeader - -from acomms_codecs.ros_message_codec import RosMessageCodec +from database.controllers.queueController import * +from field_codecs import RosMessageCodec from acomms_codecs.ros_packet_codec import RosPacketCodec - from acomms_codecs.packet_codecs import packet_codecs - -#TODO: move this +import json +from rospy_message_converter import message_converter +# TODO: move this from packet_dispatch_node import DecoderListEntry @@ -47,11 +47,14 @@ class QueuedMessage: def get_min_fragment_size_bits(self): if self.fragmentation_tracker is None: - size = (FragmentationStartHeader.length_bits + self.fragment_size_bits) + size = (FragmentationStartHeader.length_bits + + self.fragment_size_bits) elif self.fragmentation_tracker.start_header_acked: - size = (FragmentationContHeader.length_bits + self.fragment_size_bits) + size = (FragmentationContHeader.length_bits + + self.fragment_size_bits) else: - size = (FragmentationStartHeader.length_bits + self.fragment_size_bits) + size = (FragmentationStartHeader.length_bits + + self.fragment_size_bits) rospy.logdebug("Minimum fragment size: {0}".format(size)) return size @@ -73,10 +76,12 @@ class QueuedMessage: if self.fragmentation_tracker is None: sequence_number = dest_sequence_nums[self.dest] dest_sequence_nums[self.dest] += 1 - rospy.logdebug("Queued Message length bits: {0}".format(self.length_bits)) + rospy.logdebug( + "Queued Message length bits: {0}".format(self.length_bits)) self.sequence_number = sequence_number self.fragmentation_tracker = FragmentationTracker(sequence_number, 0, self.dest, - int(self.time_added.timestamp()), self.length_bits, + int(self.time_added.timestamp( + )), self.length_bits, self.fragment_size_bits, payload_bits=self.payload_bits) @@ -96,19 +101,21 @@ class QueuedMessage: except: rospy.logdebug("No acked bytes yet") - message_bits = self.fragmentation_tracker.next_fragment_to_transfer(size_in_bits) + message_bits = self.fragmentation_tracker.next_fragment_to_transfer( + size_in_bits) message_header = self.queue.header + 128 - rospy.logdebug("Returning {0} of {1} bits".format(message_bits.length, self.payload_bits.length)) + rospy.logdebug("Returning {0} of {1} bits".format( + message_bits.length, self.payload_bits.length)) if remove_from_queue and not fragmented: self.queue.mark_transmitted(self) return message_header, message_bits + class MessageQueue(object): - def __init__(self, master_message_queue, dest_address, header, message_codec=None, ack_next_hop=False, ack_end_to_end=False, + def __init__(self, dest_address, header, message_codec=None, ack_next_hop=False, ack_end_to_end=False, priority=1, order='lifo', maxsize=100, packet_codec=None, allow_fragmentation=False, is_active=True, roundrobin_tx=False): - self.master_message_queue = master_message_queue # Acks are TODO self.ack_next_hop = ack_next_hop self.ack_end_to_end = ack_end_to_end @@ -123,13 +130,13 @@ class MessageQueue(object): self.is_active = is_active self._last_tx_time = datetime.utcnow() - self._queue = deque(maxlen=maxsize) def priority_update(self, priority_requested): - rospy.logwarn("Priority request for msg queue: {}".format(priority_requested)) + rospy.logwarn( + "Priority request for msg queue: {}".format(priority_requested)) self.priority = priority_requested return self.priority - + def append(self, message): encoded_bits, metadata = self.message_codec.encode(message) rospy.loginfo("Append: {}".format(encoded_bits)) @@ -138,75 +145,54 @@ class MessageQueue(object): if 'dest' in metadata: dest = metadata['dest'] - queued_message = QueuedMessage(message=message, - message_codec=self.message_codec, - queue=self, - dest=dest, - payload_bits=encoded_bits, - length_bits=encoded_bits.length, - time_added=datetime.utcnow(), - transmitted=False, - next_hop_acked=False, - priority=self.priority, - fragmentation_tracker=None, - allow_fragmentation=self.allow_fragmentation, - is_active = self.is_active - ) - if self.order == 'fifo': if len(self._queue) < self._queue.maxlen: # drop this message if the queue is full. - self._queue.appendleft(queued_message) + Queue.addRight( + message, + encoded_bits, + self.dest, + encoded_bits.length, + datetime.utcnow(), + None, + self.allow_fragmentation, + self.message_codec, + ) # master message queue draws from the top - self.master_message_queue.append(queued_message) else: # For LIFO, we threw away the old messages - if len(self._queue) == self._queue.maxlen: - # drop the oldest message - message_to_remove = self._queue.popleft() - self.master_message_queue.remove(message_to_remove) - self._queue.append(queued_message) - self.master_message_queue.insert(0, queued_message) - - rospy.logdebug_throttle(5, "Added message to queue, {} messages".format(len(self._queue))) + Queue.clearLifoMessages(self.message_codec) + Queue.addLeft( + message, + encoded_bits, + self.dest, + encoded_bits.length, + datetime.utcnow(), + None, + self.allow_fragmentation, + self.message_codec, + 0 #0 represents an outbound message + ) + + Queue.clearTopic(message_type) + + rospy.logdebug_throttle( + 5, "Added message to queue") def mark_transmitted(self, queued_message): # type: (QueuedMessage) -> None - if queued_message in self._queue: - queued_message.transmitted = True - self._last_tx_time = datetime.utcnow() - if (not self.ack_next_hop) and (not self.ack_end_to_end): - self._queue.remove(queued_message) - self.master_message_queue.remove(queued_message) - - def get_next_message(self, remove_from_queue=True): - if len(self._queue) == 0: - return None - queued_message = self._queue[-1] - if remove_from_queue: - self._queue.pop() - self._last_tx_time = datetime.utcnow() - return queued_message + Queue.updateTransmit(queued_message, True) + self._last_tx_time = datetime.utcnow() + + def get_next_message(self, message_type): + return Queue.getNextMessage(message_type) def get_next_message_smaller_than(self, size_in_bytes, remove_from_queue=False): - return self.get_next_message_smaller_than_bits(size_in_bits=size_in_bytes*8, remove_from_queue = remove_from_queue) - - def get_next_message_smaller_than_bits(self, size_in_bits, remove_from_queue=False): - if len(self._queue) == 0: - return None - for qm in reversed(self._queue): - if qm.length_bits <= size_in_bits: - if remove_from_queue: - self._queue.remove(qm) - self._last_tx_time = datetime.utcnow() - return qm + return Queue.get_next_message_smaller_than_bits(size_in_bits=size_in_bytes*8) @property def bit_count(self): - count = 0 - for msg in self._queue: - count += msg.length_bits - return count + return Queue.bit_count() @property def byte_count(self): @@ -214,7 +200,7 @@ class MessageQueue(object): @property def message_count(self): - return len(self._queue) + return Queue.message_count() @property def time_since_last_tx(self): @@ -227,42 +213,44 @@ class MessageQueue(object): class MessageQueueNode(object): def __init__(self): + self.topic_list = [] #List of dict for topics + rospy.init_node('message_queue', log_level=rospy.DEBUG) self.update_rate = rospy.get_param('~update_rate', 1) - self.unknown_dests_are_reachable = rospy.get_param('~unknown_dests_are_reachable', True) + self.unknown_dests_are_reachable = rospy.get_param( + '~unknown_dests_are_reachable', True) self.default_dest = rospy.get_param('~default_dest', 121) self.default_priority = rospy.get_param('~default_priority', 10) - self.queues = {} # type: dict[str, MessageQueue] - self.topic_queue = {} self.topic_headers = {} self.destination_reachable = {} self.dest_sequence_num = defaultdict(lambda: 1) - self.queues_by_priority = defaultdict(list) self.priority_update = False - self._queued_messages = [] - + self.queue_active_names = [] - # Status publishers - self.queue_status_pub = rospy.Publisher('queue_status', QueueStatus, queue_size=10, latch=True) + self.queue_status_pub = rospy.Publisher( + 'queue_status', QueueStatus, queue_size=10, latch=True) # Active Queues Service - self.queue_active_service = rospy.Service('queue_active', QueueActive, self.handle_queue_active) - + self.queue_active_service = rospy.Service( + 'queue_active', QueueActive, self.handle_queue_active) + # Dynamic Priority Service - self.priority_update_service = rospy.Service('priority_update', PriorityUpdate, self.handle_priority_update) - + self.priority_update_service = rospy.Service( + 'priority_update', PriorityUpdate, self.handle_priority_update) + # this is a little awkward, since it's conceptually backward here. # when queuing for TX, we go from topic -> message -> packet # However, organizing it this way allows us to use a single config YAML for both TX and RX packet_codec_param = rospy.get_param('~packet_codecs') self.packet_codecs = {} for packet_codec_details in packet_codec_param: - rospy.loginfo("Packet codec details: {}".format(packet_codec_details)) + rospy.loginfo("Packet codec details: {}".format( + packet_codec_details)) packet_codec_class = packet_codecs[packet_codec_details['packet_codec']] message_codecs = {} @@ -270,129 +258,114 @@ class MessageQueueNode(object): ros_type_name = message_codec_params['ros_type'] msg_class = roslib.message.get_message_class(ros_type_name) if not msg_class: - raise TypeError('Invalid ROS type "{}" for message codec {}'.format(ros_type_name, id)) - message_codec = RosMessageCodec(ros_type=msg_class, fields_dict=message_codec_params['fields']) + raise TypeError( + 'Invalid ROS type "{}" for message codec {}'.format(ros_type_name, id)) + message_codec = RosMessageCodec( + ros_type=msg_class, fields_dict=message_codec_params['fields']) message_codecs[message_codec_params['id']] = message_codec - # TODO clean this up self.destination_reachable[ message_codec_params['default_dest']] = True if self.unknown_dests_are_reachable else False actual_topic_name = message_codec_params['subscribe_topic'] - queue_active_status = message_codec_params.get('is_active', True) + queue_active_status = message_codec_params.get( + 'is_active', True) priority = message_codec_params['priority'] - if queue_active_status: self.queue_active_names.append(actual_topic_name) - + if not self.priority_update: # No priority update requested, use message codec params - rospy.logwarn("Priority update is {}".format(self.priority_update)) + rospy.logwarn("Priority update is {}".format( + self.priority_update)) priority = message_codec_params['priority'] else: - rospy.logwarn("Priority update is {}".format(self.priority_update)) + rospy.logwarn("Priority update is {}".format( + self.priority_update)) priority = self.queue_priority_desired # Priority variable is set, now change bool to False to reset for future self.priority_update = False - new_queue = MessageQueue(master_message_queue=self._queued_messages, - dest_address=message_codec_params.get('default_dest', self.default_dest), - priority=message_codec_params.get('priority', self.default_priority), - header=message_codec_params['id'], - order=message_codec_params['queue_order'], - maxsize=message_codec_params['queue_maxsize'], - allow_fragmentation=message_codec_params.get('allow_fragmentation', False), - is_active=message_codec_params.get('is_active', True), - message_codec=message_codec, - ) - self.topic_queue[actual_topic_name] = new_queue + topic = {message_codec_params['id']:message_codec} #Create new topic *used to codec) + + self.topic_list.append(topic) #append topic to list + + Topics.addEntry( + message_codec_params['id'], + ros_type_name, + priority, + message_codec_params.get( + 'is_active', True), + ) + rospy.Subscriber(actual_topic_name, AnyMsg, partial(self.on_incoming_message, topic_name=actual_topic_name)) # The packet codec initializer sets the packet codec on each message codec packet_codec = packet_codec_class(message_codecs=message_codecs, - miniframe_header=packet_codec_details.get('miniframe_header', bytes()), + miniframe_header=packet_codec_details.get( + 'miniframe_header', bytes()), dataframe_header=packet_codec_details.get('dataframe_header', bytes())) - rospy.loginfo("Added packet codec with {} message codecs".format(len(message_codecs))) + rospy.loginfo("Added packet codec with {} message codecs".format( + len(message_codecs))) # Subscribe to the Neighbor Reachability updates (which may be published by more than one node - rospy.Subscriber("neighbor_reachability", NeighborReachability, self.on_neighbor_reachability) + rospy.Subscriber("neighbor_reachability", + NeighborReachability, self.on_neighbor_reachability) self.get_packet_data_service = rospy.Service('get_next_packet_data', GetNextPacketData, self.handle_get_next_packet) # Subscribe to ACK message to update queues - rospy.Subscriber("from_acomms/encoded_ack", EncodedAck, self.on_ack_received) + rospy.Subscriber("from_acomms/encoded_ack", + EncodedAck, self.on_ack_received) rospy.loginfo("Message Queue started") self.spin() - def update_queue_status(self): - highest_priority = -1 - queued_dest_addresses = set() - message_counts = {} - - for key, group in groupby(self._queued_messages, lambda x: (x.priority, x.dest)): - group_count = sum(1 for _ in group) - message_counts[key] = group_count - highest_priority = max(highest_priority, key[0]) - queued_dest_addresses.add(key[1]) - - total_message_count = len(self._queued_messages) - ''' - for m in self._queued_messages: - message_counts.setdefault((m.priority, m.dest_address), 0) - if q.message_count: - message_counts[(q.priority, q.dest_address)] += q.message_count - total_message_count += q.message_count - highest_priority = max(highest_priority, q.priority) - queued_dest_addresses.add(q.dest_address) - ''' - - # Build the QueueStatus message - hdr = Header(stamp=rospy.get_rostime()) - summary_message_counts = [] - for k,v in message_counts.items(): - summary_message_counts.append(SummaryMessageCount(priority=k[0], dest_address=k[1], message_count=v)) - msg = QueueStatus(header=hdr, message_count=total_message_count, highest_priority=highest_priority, - queued_dest_addresses=queued_dest_addresses, summary_message_counts=summary_message_counts) - self.queue_status_pub.publish(msg) - def on_incoming_message(self, msg_data, topic_name): # type: (AnyMsg, str) -> None connection_header = msg_data._connection_header['type'].split('/') ros_pkg = connection_header[0] + '.msg' msg_type = connection_header[1] - #print 'Message type detected as ' + msg_type + # print 'Message type detected as ' + msg_type msg_class = getattr(import_module(ros_pkg), msg_type) msg = msg_class() msg.deserialize(msg_data._buff) - rospy.logdebug("On incoming message: {} ({})".format(topic_name, msg_type)) - + rospy.logdebug("On incoming message: {} ({})".format( + topic_name, msg_type)) + # Check if incoming msg topic name matches any of the active queue topic names - if topic_name in self.queue_active_names: - rospy.logdebug("TOPIC: {} in active queue list, appending msg".format(topic_name)) - self.topic_queue[topic_name].append(msg) - self.update_queue_status() - else: - rospy.logdebug("TOPIC: {} not in active queue list, skipping msg".format(topic_name)) + found = False + for val in Topics.select(): + if (topic_name == val.message_name): + rospy.logdebug( + "TOPIC: {} in active queue list, appending msg".format(topic_name)) + found = True + if (found == False): + rospy.logdebug( + "TOPIC: {} not in active queue list, skipping msg".format(topic_name)) def on_neighbor_reachability(self, msg: NeighborReachability): self.destination_reachable[msg.dest] = msg.reachable def on_ack_received(self, msg: EncodedAck): - ack = FragmentationAckHeader.from_bits(BitStream(bytes=msg.encoded_ack)) - rospy.logdebug("Received ACK with src: {0} seq num: {1}". format(msg.src, ack.sequence_num)) - rospy.logdebug("Ack blocks {}-{}".format(ack.blocks[0][0], ack.blocks[-1][1])) + ack = FragmentationAckHeader.from_bits( + BitStream(bytes=msg.encoded_ack)) + rospy.logdebug("Received ACK with src: {0} seq num: {1}". format( + msg.src, ack.sequence_num)) + rospy.logdebug( + "Ack blocks {}-{}".format(ack.blocks[0][0], ack.blocks[-1][1])) rospy.logdebug("ACK N-Blocks: {}".format(ack.n_blocks)) - for message in self._queued_messages: - rospy.logdebug("Message dest: {0} Sequence number: {1}".format(message.dest, message.sequence_number)) - if message.dest == msg.src and message.sequence_number == ack.sequence_num: + for message in Queue.returnTable(): + rospy.logdebug("Message dest: {0} Sequence number: {1}".format( + message.dest, message.sequence_number)) + if message.dest == msg.src and message.identifier == ack.sequence_num: if len(ack.blocks) == 1 and ack.blocks[0][0] == 0 and ack.blocks[-1][-1] == 1 and ack.ack_flag == 0: rospy.logwarn( "Empty ACK received indicated the message recipient does not have start fragment for this message") @@ -400,8 +373,10 @@ class MessageQueueNode(object): return rospy.logdebug("Recording blocks ACKED") - message.fragmentation_tracker.record_blocks_acked(ack.blocks, ack.ack_flag) - rospy.logdebug("Payload size={0}".format(message.fragmentation_tracker.payload_size_blocks)) + message.fragmentation_tracker.record_blocks_acked( + ack.blocks, ack.ack_flag) + rospy.logdebug("Payload size={0}".format( + message.fragmentation_tracker.payload_size_blocks)) if message.fragmentation_tracker.acked(): # Remove this message from the queue @@ -415,50 +390,15 @@ class MessageQueueNode(object): msg.queue.mark_transmitted(msg) def get_next_message(self, remove_from_queue=False, check_reachability=True): - # type: (bool) -> Union[None, QueuedMessage] - # Use the master message queue for this - next_message = None - for message in sorted(self._queued_messages, key=lambda m: m.priority, reverse=True): - if check_reachability and not self.destination_reachable[message.dest]: - continue - next_message = message - break - return next_message + return Queue.getNextMessage() def get_next_message_smaller_than_bits(self, size_in_bits, dest_address=None, packet_codec=None, check_reachability=True): - # type: (int, bool, int, None, bool) -> Union[None, QueuedMessage] - next_message = None - - for message in sorted(self._queued_messages, key=lambda m: m.priority, reverse=True): - if check_reachability and not self.destination_reachable[message.dest]: - rospy.logdebug("Disqualified: Reachability") - continue - if dest_address and (dest_address != message.dest): - # Move on to check the next message - rospy.logdebug("Disqualified: dest_address") - continue - if packet_codec and (packet_codec != message.message_codec.packet_codec): - # Move on to check the next message - rospy.logdebug("Disqualified: packet_codec") - continue - if not message.allow_fragmentation and message.length_bits > size_in_bits: - rospy.logdebug("Disqualified: too large, no fragmentation") - continue - if message.allow_fragmentation and message.get_min_fragment_size_bits() > size_in_bits: - rospy.logdebug("Disqualified: fragments too large") - continue - if message.allow_fragmentation and message.get_min_fragment_size_bits() == 0: - rospy.logdebug("Disqualified: no fragments to send at this time") - continue - - next_message = message - break - - return next_message + return Queue.get_next_message_smaller_than_bits(size_in_bits, dest_address, packet_codec, check_reachability) def handle_priority_update(self, request): - rospy.loginfo("Handling new priority update, new priority requested: {}".format(request.priority_desired)) + rospy.loginfo("Handling new priority update, new priority requested: {}".format( + request.priority_desired)) self.priority_update = True self.queue_name = request.topic_name self.queue_priority_desired = request.priority_desired @@ -466,7 +406,7 @@ class MessageQueueNode(object): self.topic_queue[topic_name].priority_update(request.priority_desired) response = True return response - + def handle_queue_active(self, request): rospy.loginfo("Handling new dynamic queue request") if not request.set_active: @@ -480,17 +420,19 @@ class MessageQueueNode(object): self.queue_active_status = True response = True return response - + def handle_get_next_packet(self, req): # type: (GetNextPacketDataRequest) -> GetNextPacketDataResponse - rospy.loginfo("Getting next packet data for transmission:\r\n" + str(req)) + rospy.loginfo( + "Getting next packet data for transmission:\r\n" + str(req)) num_miniframe_bytes = req.num_miniframe_bytes num_dataframe_bytes = req.num_dataframe_bytes # To get the next packet, the first thing we do is get the highest priority message to transmit # Then, we get a packet using its packet codec. # This is also where we should eventually evaluate which rate to use - highest_priority_message = self.get_next_message(remove_from_queue=False) + highest_priority_message = self.get_next_message( + remove_from_queue=False) if not highest_priority_message: return GetNextPacketDataResponse(num_miniframe_bytes=0, num_dataframe_bytes=0, num_messages=0) @@ -500,20 +442,25 @@ class MessageQueueNode(object): num_dataframe_bytes, self) - #for msg in messages_in_packet: + # for msg in messages_in_packet: # self.mark_transmitted(msg) return GetNextPacketDataResponse(dest=highest_priority_message.dest, miniframe_bytes=miniframe_bytes, dataframe_bytes=dataframe_bytes, queued_message_ids=[], - num_miniframe_bytes=len(miniframe_bytes), - num_dataframe_bytes=len(dataframe_bytes), + num_miniframe_bytes=len( + miniframe_bytes), + num_dataframe_bytes=len( + dataframe_bytes), num_messages=len(messages_in_packet)) @property def message_count(self): - return sum([q.message_count for q in self.queues.values()]) + count = 0 + for val in Queue.returnTable(): + count += 1 + return count @property def age_of_oldest_queue(self): @@ -521,19 +468,21 @@ class MessageQueueNode(object): return max([q.time_since_last_tx for q in self.queues.values()]) def get_filtered_message_count(self, dest_address, min_priority): - filtered_queues = [q for q in self.queues.values() if q.priority >= min_priority] - filtered_queues = [q for q in filtered_queues if q.dest_address == dest_address] + filtered_queues = [ + val for val in Queue.returnTable() if val.priority >= min_priority] + filtered_queues = [ + val for val in Queue.returnTable() if val.dest_address == dest_address] if len(filtered_queues) == 0: return 0 else: - return sum([q.message_count for q in filtered_queues]) + return sum([1 for q in filtered_queues]) def spin(self): # Issue status periodically. rate = rospy.Rate(self.update_rate) while not rospy.is_shutdown(): - #self.time_since_last_tx_pub.publish(rospy.Duration.from_sec(self.age_of_oldest_queue.total_seconds())) - #self.messages_in_queues_pub.publish(Int32(self.message_count)) + # self.time_since_last_tx_pub.publish(rospy.Duration.from_sec(self.age_of_oldest_queue.total_seconds())) + # self.messages_in_queues_pub.publish(Int32(self.message_count)) self.update_queue_status() rate.sleep() diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py index 9b56e567..401d8dd6 100755 --- a/src/packet_dispatch_node.py +++ b/src/packet_dispatch_node.py @@ -7,7 +7,7 @@ import roslib.message from dataclasses import dataclass, field from ros_acomms.msg import ReceivedPacket, Packet, CST from bitstring import ConstBitStream, ReadError - +from database.controllers.queueController import * from acomms_codecs.ros_message_codec import RosMessageCodec from acomms_codecs.packet_codecs import packet_codecs -- GitLab From f2ac7bcf61f85415ac02ae714ab27a5f836c9364 Mon Sep 17 00:00:00 2001 From: Brennan Miller-Klugman <millerklugmanb@wit.edu> Date: Thu, 30 Sep 2021 09:46:20 -0400 Subject: [PATCH 2/4] Added verbose commenting for queueController and updated packet_dispatch_node to add table entries --- .gitignore | 1 + database/controllers/queueController.py | 84 ++++++++++++++++++++----- src/packet_dispatch_node.py | 44 ++++++++----- 3 files changed, 100 insertions(+), 29 deletions(-) diff --git a/.gitignore b/.gitignore index c0706b96..e5ee6dab 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ /src/acomms_codecs/__pycache__ /src/acomms_codecs/field_codecs/__pycache__ .vscode/settings.json +.vscode/c_cpp_properties.json diff --git a/database/controllers/queueController.py b/database/controllers/queueController.py index 3843ca3e..e868ea0f 100644 --- a/database/controllers/queueController.py +++ b/database/controllers/queueController.py @@ -24,21 +24,23 @@ class Queue(Model): # database model identifier = IntegerField() # Unique ID message_raw = TextField() - message_encoded = TextField() - message_type = TextField() - dest = IntegerField() - payload_bits = TextField() - length_bits = IntegerField() - posistion = IntegerField() # Pos in queue (relative to topic) - fragmentation_tracker = BooleanField() - allow_fragmentation = BooleanField() - transmitted = BooleanField() # Transmitted state - completed = BooleanField() # Transmission completed - message_type = IntegerField() # Corresponds to message table - direction = BooleanField() + message_encoded = TextField(null=True) + message_type = TextField(null=True) + dest = IntegerField(null=True) + payload_bits = TextField(null=True) + length_bits = IntegerField(null=True) + posistion = IntegerField(null=True) # Pos in queue (relative to topic) + fragmentation_tracker = BooleanField(null=True) + allow_fragmentation = BooleanField(null=True) + transmitted = BooleanField(null=True) # Transmitted state + completed = BooleanField(null=True) # Transmission completed + message_type = IntegerField(null=True) # Corresponds to message table + direction = BooleanField(null=True) def get_next_message_smaller_than_bits(self, size_in_bits, dest_address=None, packet_codec=None, check_reachability=True): + '''get_next_message_smaller_than_bits returns the next message in the queue with a bit count smaller then the specified parameter''' + try: for val in Queue.select(): next_message = None @@ -58,6 +60,8 @@ class Queue(Model): # database model return None def bit_count(self): + '''bit_count is used to count bits across all messages''' + count = 0 for val in Queue.select(): @@ -66,38 +70,69 @@ class Queue(Model): # database model return count def message_count(self): + '''message_count is used to count all unsent messages across every topic type''' + counter = 0 for val in Queue.select(): - counter += 1 + if (val.transmitted == False): + counter += 1 return counter def removeMessage(self, idVal): + '''removeMessage is used to delete a message with an identifier of idVal''' + Queue.delete().where( Queue.identifier == idVal).execute() def getNextMessage(self, message_type): + '''getNextMessage is used to get the next message within a specific message type queue''' + for val in Queue.select(): if(val.message_type == message_type): if(val.completed == False): return val.message_encoded def clearLifoMessages(self, messageType): + '''clearLifoMessages is used to delete all entries within a certain message type''' + Queue.delete().where(Queue.message_type == messageType).execute() def popLeft(self): + '''popLeft is used to delete the left most value from the queue''' + Queue.delete().where(Queue.identifier == 0).execute() def popRight(self): + '''popRight is used to delete the right most value from the queue''' Queue.delete().where(Queue.identifier == self.getLastIterator()).execute() def updateTransmit(self, msg, transmitVal): + '''updateTransmit function is used to update a database entry's transmit value parameter''' + dictionary = message_converter.convert_ros_message_to_dictionary( msg) message_str = json.dumps(dictionary) Queue.update(transmitted=transmitVal).where( Queue.message == message_str).execute() + def addRecieve(self, message, payloadBits): + '''Add recieve function is used to create database entries on message recieve''' + + dictionary = message_converter.convert_ros_message_to_dictionary( + message) + message_raw = json.dumps(dictionary) + + Queue.create(identifier=self.getLastIterator + 1, + message_raw=message_raw, + payload_bits=payloadBits, + transmitted=True, + is_active=False, + direction=1 # Direction 1 represents incomming + ) + def addRight(self, message, message_codecVal, destVal, payload_bitsVal, length_bitsVal, time_addedVal, transmittedVal, next_hop_ackedVal, priorityVal, fragmentation_trackerVal, allow_fragmentationVal, is_activeVal, directionVal): + '''Add right function is used to create database entries on the right side of the queue''' + dictionary = message_converter.convert_ros_message_to_dictionary( message) message_raw = json.dumps(dictionary) @@ -124,6 +159,8 @@ class Queue(Model): # database model direction=directionVal) def addLeft(self, message, message_codecVal, destVal, payload_bitsVal, length_bitsVal, time_addedVal, transmittedVal, next_hop_ackedVal, priorityVal, fragmentation_trackerVal, allow_fragmentationVal, is_activeVal, directionVal): + '''Add right function is used to create database entries on the left side of the queue''' + dictionary = message_converter.convert_ros_message_to_dictionary( message) message_raw = json.dumps(dictionary) @@ -149,13 +186,18 @@ class Queue(Model): # database model direction=directionVal) def displayTable(self): + '''*Test Function* used to display all values within the database''' + for command in Queue.select(): print(command.output) def returnTable(self): + '''Retruns contect of select() function''' + return Queue.select() def getLastIterator(self): + '''Gets the highest iterator value in the database''' highVal = -1 for val in Queue.select(): if(val.identifier > highVal): @@ -169,7 +211,9 @@ class Topics(BaseModel): # Gets created at yaml read priority = IntegerField() is_active = BooleanField() - def addEntry(self, message_typeVal, priorityVal, is_activeVal, directionVal = None): + def addEntry(self, message_typeVal, priorityVal, is_activeVal, directionVal=None): + '''addEntry function creates a table entry based on function parameters''' + Topics.create(message_type=message_typeVal, priority=priorityVal, is_active=is_activeVal, direction=directionVal) @@ -181,14 +225,20 @@ class msgEvents(BaseModel): dest = IntegerField() def removeEntry(self, idVal): + '''removeEntry function removes a table entry based on the specified idVal''' + msgEvents.delete().where( msgEvents.identifier == idVal).execute() def addEntry(self, identifierVal, timeVal, sourceVal, destVal): + '''addEntry function creates a table entry based on function parameters''' + msgEvents.create(identifier=identifierVal, time=timeVal, source=sourceVal, dest=destVal) def getLastIterator(self): + '''Gets the highest iterator value in the database''' + highVal = -1 for val in msgEvents.select(): if(val.identifier > highVal): @@ -203,14 +253,20 @@ class Fragmentation(BaseModel): dest = IntegerField() def removeEntry(self, idVal): + '''removeEntry function removes a table entry based on the specified idVal''' + Fragmentation.delete().where( Fragmentation.identifier == idVal).execute() def addEntry(self, identifierVal, timeVal, sourceVal, destVal): + '''addEntry function creates a table entry based on function parameters''' + Fragmentation.create(identifier=identifierVal, time=timeVal, source=sourceVal, dest=destVal) def getLastIterator(self): + '''Gets the highest iterator value in the database''' + highVal = -1 for val in Fragmentation.select(): if(val.identifier > highVal): diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py index 401d8dd6..bb9990aa 100755 --- a/src/packet_dispatch_node.py +++ b/src/packet_dispatch_node.py @@ -10,10 +10,12 @@ from bitstring import ConstBitStream, ReadError from database.controllers.queueController import * from acomms_codecs.ros_message_codec import RosMessageCodec from acomms_codecs.packet_codecs import packet_codecs +from database.controllers.queueController import * + @dataclass class DecoderListEntry: - #TODO: move header stuff into separate packetcodec class + # TODO: move header stuff into separate packetcodec class codec_name: str miniframe_header: bytes dataframe_header: bytes @@ -52,18 +54,22 @@ class DecoderListEntry: return False if len(self.miniframe_header) > 0: if bytes(self.miniframe_header) != received_packet.packet.miniframe_bytes[:len(self.miniframe_header)]: - print("Miniframe header mismatch: {} not {}".format(received_packet.packet.miniframe_bytes[:len(self.miniframe_header)], self.miniframe_header)) + print("Miniframe header mismatch: {} not {}".format( + received_packet.packet.miniframe_bytes[:len(self.miniframe_header)], self.miniframe_header)) return False if len(self.dataframe_header) > 0: if bytes(self.dataframe_header) != received_packet.packet.dataframe_bytes[:len(self.dataframe_header)]: - print("Dataframe header mismatch: {} not {}".format(received_packet.packet.dataframe_bytes[:len(self.dataframe_header)], self.dataframe_header)) + print("Dataframe header mismatch: {} not {}".format( + received_packet.packet.dataframe_bytes[:len(self.dataframe_header)], self.dataframe_header)) return False return True def strip_headers(self, received_packet: ReceivedPacket): if self.remove_headers: - miniframe_bytes = received_packet.packet.miniframe_bytes[len(self.miniframe_header):] - dataframe_bytes = received_packet.packet.dataframe_bytes[len(self.dataframe_header):] + miniframe_bytes = received_packet.packet.miniframe_bytes[len( + self.miniframe_header):] + dataframe_bytes = received_packet.packet.dataframe_bytes[len( + self.dataframe_header):] else: miniframe_bytes = received_packet.packet.miniframe_bytes dataframe_bytes = received_packet.packet.dataframe_bytes @@ -73,7 +79,6 @@ class DecoderListEntry: class PacketDispatchNode(object): def __init__(self): rospy.init_node('packet_dispatch') - packet_codec_param = rospy.get_param('~packet_codecs') self.decoder_list = [] for packet_codec in packet_codec_param: @@ -89,14 +94,18 @@ class PacketDispatchNode(object): ros_type_name = codec_config['ros_type'] msg_class = roslib.message.get_message_class(ros_type_name) if not msg_class: - raise TypeError('Invalid ROS type "{}" for message codec {}'.format(ros_type_name, id)) + raise TypeError( + 'Invalid ROS type "{}" for message codec {}'.format(ros_type_name, id)) # TODO: allow custom codecs # like this: MyClass = getattr(importlib.import_module("module.submodule"), "MyClass") # TODO: make this work with orthogonal packet codecs - self.message_codecs[id] = RosMessageCodec(msg_class, codec_config['fields']) + self.message_codecs[id] = RosMessageCodec( + msg_class, codec_config['fields']) - pub_name = rospy.names.canonicalize_name(codec_config['publish_topic']) - self.message_publishers[id] = rospy.Publisher(pub_name, msg_class, queue_size=10) + pub_name = rospy.names.canonicalize_name( + codec_config['publish_topic']) + self.message_publishers[id] = rospy.Publisher( + pub_name, msg_class, queue_size=10) # replace name with instance of packet codec # ToDO make this less ugly and stupid @@ -105,7 +114,8 @@ class PacketDispatchNode(object): miniframe_header=entry.miniframe_header, dataframe_header=entry.dataframe_header) - rospy.loginfo("Receive packet codec initialized with {} message codecs.".format(len(self.message_codecs))) + rospy.loginfo("Receive packet codec initialized with {} message codecs.".format( + len(self.message_codecs))) rospy.Subscriber('packet_rx', ReceivedPacket, self.on_packet_rx) @@ -115,10 +125,12 @@ class PacketDispatchNode(object): def on_packet_rx(self, received_packet: ReceivedPacket): rospy.loginfo("Got incoming packet") # Decide what to do with the incoming packet, based on the match criteria - matching_codecs = [d.packet_codec for d in self.decoder_list if d.does_packet_match(received_packet)] + matching_codecs = [ + d.packet_codec for d in self.decoder_list if d.does_packet_match(received_packet)] if not matching_codecs: - rospy.loginfo("Dispatcher received packet that matched no packet decoders.") + rospy.loginfo( + "Dispatcher received packet that matched no packet decoders.") for d in matching_codecs: try: @@ -141,10 +153,12 @@ class PacketDispatchNode(object): while True: id = payload_bits.read('uint:8') ros_msg = self.message_codecs[id].decode(payload_bits) + Queue.addRecieve(ros_msg, payload_bits) # Update database with inbound message self.message_publishers[id].publish(ros_msg) except KeyError: # We don't have a config entry that matches this ID - rospy.loginfo("Unknown message ID ({}) received, unable to continue parsing packet".format(id)) + rospy.loginfo( + "Unknown message ID ({}) received, unable to continue parsing packet".format(id)) return except ReadError: @@ -158,4 +172,4 @@ if __name__ == '__main__': rospy.loginfo("Message Dispatch shutdown") except rospy.ROSInterruptException: node.close() - rospy.loginfo("Message Dispatch shutdown (interrupt)") \ No newline at end of file + rospy.loginfo("Message Dispatch shutdown (interrupt)") -- GitLab From 0495b456720e8279179925a2c1ac3cf89e65beb6 Mon Sep 17 00:00:00 2001 From: Brennan Miller-Klugman <millerklugmanb@wit.edu> Date: Thu, 30 Sep 2021 10:50:18 -0400 Subject: [PATCH 3/4] Moved location of databases folder, made minor updates to fix errors --- .../queueController.cpython-38.pyc | Bin 0 -> 8693 bytes .../database}/controllers/queueController.py | 18 +++++++++++++++--- {database => src/database}/databases/queue.db | Bin 57344 -> 57344 bytes src/packet_dispatch_node.py | 4 ++-- 4 files changed, 17 insertions(+), 5 deletions(-) create mode 100644 src/database/controllers/__pycache__/queueController.cpython-38.pyc rename {database => src/database}/controllers/queueController.py (95%) mode change 100644 => 100755 rename {database => src/database}/databases/queue.db (96%) diff --git a/src/database/controllers/__pycache__/queueController.cpython-38.pyc b/src/database/controllers/__pycache__/queueController.cpython-38.pyc new file mode 100644 index 0000000000000000000000000000000000000000..c7fdbc985d52cd8e48975fa9268321a97eeef8d9 GIT binary patch literal 8693 zcmcIpOOxBi5yp!o$R)W??doYmiesCKtaa=pPPrUKUdxj0ShOsy;uwjG#&TwNu}c!% z0jLLyn;hh-octeFm6QHNPPwFVN#&HnC5KcFuAEZ2*;P@#o&i9DTFH(blZD2-FqrQC zx~HdcvsSBU_|^Y()BE%Dn)Y|9O#X_fT*MU`x~4IW=^d@BxtdO8qoa2X7ab^@9kXk> zmMmMH!aB7TIz_iA?<Kb+@3w2>UhI^+6}KYC7CJNT3@;s2?`v+27kBChv%GdN$4d8g zX5Tm5dA`8s@9FO16^)fy<r9rn_|iSoJ%aiStD;`zM^Ilzy~gUO*JXVL^;tHD`kbsE zMSY$vpuQmM$M|u!xNC_2@Wq1@e3>8PNBPQ@bzj#tzI;z}Px51+r}$COr}zr!)2whq zYb<?6(rM_i75QC`yZv52@`Ik&LAkUR1Rd`6P}I*hLFc<X481MhY6rbrTtr;N6%m9j zuN`!|;qV0WB5%VB`D$7SSN8^dz|XUdHoa%^XQFZuS9lzRYp%{T*8o>KGr*N`&u~p< zVue;*Uc;1E1I9a#OadjUN#-Ud;ihXb9giBrdiLn~t32|WjY4d<T0O7JTdla#YIOrP z=uo-ZYV8fYPSRtr4QMuD=7A<^m|c?3udi+gUB0>@c(3R6R@+{*>-So>!_`o<SI44| z)pUYYNGlRSr^7`ek2kW0^ZmQAMN2z^SA+(Lt&3UQnhi^o>9vGsF++r!$4gPAI<?s# zOSsbKss4zlGI;~2nY_6PBv`xvTI2<i{inKH%1SmjleTi!R>|6CvQm}Tl5zE{1l%WW zb6IIVd&17z7T6hPvBD>?307pKPc(Ol*{qEF5msR{xG%FRtKq)F>TDMGqil}N<9>`S zutnUDvn6%}_Y-WHt>Au=9c9OGKgEu-6SzOcPO?+DKh2(EPvh<|hn>d#w8YnU;==oc z(RJY4>J2)bX1f6SrN0si<TT#E6>fqIuSeQ}eqXz!eYCa@k?j{phB!AeMui<?*A(X> z^T1*T8Qxw6ZH4eH(BeqnwZvPp&s2TrZ(>C81MLRf5B)y`_-|;xkhEb9f4#+{R*&C_ zTB+5AU5`N2inhI8Yr~I1NAPGMdZ80-bBBgH$xz4d$u?<y&TT*1c0319IPGB2i=1R4 z8luL9ectvrea@V|Ctw}0?B?+JB|jvikR7i&jE7OnV@$vY3>*2rC<vU6C${)Grx!Sz z!rSWdUgVJ{X_N#RXIu<<hqt5HqIu#1Yz5G_JHZy?8-uNQ5v&EbTe(N$nGWx5Mcb0s zMk%&1#b#U${UL9`>PP}{b#hs8wePieA+|QWMcZ9yZ}awUOK`8f?QQrSKe`*6x4ceV zofPYYNs%?hXTDtSX9*1Cw=~PptGZ>>jf!sQb2`zYfxkX{UZU#!mrvFvCzk%mO6iht zb5H=l^bBzIePg7Lj2+_@ZC~H3j+B#o8G-=Eq*y<61|hT<1&O95O1w6t9XgP3ijPpx zOB<$CjaWcgtf+>gxWxrIn>b1BneP))I_2zT7ttG%tI<r$sOnW?IICFBMm(M~Y|VMj zmcbeLVg~jw@Wspt4z1xksRcZSOX6(M3jw)|K__9FW1qSUdG!6Z6W#4|$@t~ZrrD2Y zBH{JIt{+95K`v4ZCHXoo4i!W#x2AVU7&7!QEg+(T|8P->BR8&1E<XKH*}^52O~O`+ zJ}@F}$Arg#7df!*Q*fdb{ifjE;1<8CHsMKSV{e4bI$p|tV)8x5XAlY*uyBG+)PC-D znhy$zMXIOb!tHG?cw8BejZ6FvZx15=z)+GX$S0?<P|L071{es*RcfZK+YeLrb7OMs z6VB<9?7xUYhVwHD4I~0#0~gqZl9FXW95=K*n;F>P&EXB$R}+r!5SgIoQNKv85WdlK z5|7u;Bu3zHq`$n3Fkx&MaWz#Sjfl$#b^0W17SE;~yhr{fg^hRttQ-oNdVaAVVq9WC z31XHqpE1ExPG<irWUjUmz=?PL%^;Cbco=M8K7fhvc_@J+1=L1-=b@A^DUBx+%(KMe zb3`Pw4U`%RD(+13z&aV*l=GN>WHlbiK-z>kTv6OVGA8Fn$Z#KPGX#^&W)whP>j(XJ z`DV^FCe}imW{0}EK^QsM7X}<#SI`~roYGk|#dq<}2fDO`X^u+C^b{GSeAE;lNzWsz zr|3dL-a#Q}@fumX<e)HN^%pSFb$@I7aV!Yg_gEg9jiLhcSZCyY<5__5T`!E@#$NA5 zfe@rb@e+|Kt=Z`UcG0PrL}Z;YDc9B}Lr#ChVWy)Wph8@Z^!xh085sxW2ocb}6<Irl zk-_v&^@AeHrI95$w<YIFcQ*%p#L#>_@yyQVpx365ViLk?2n;%nwk4A9K8KhneN1UK z3v3#?ID<7c8)ZS>Dn6YYW?B@!rzRZ*E#}LYc>)poP8js!0vmMuVd6X0d!-G;rQ|6o zLii53d6?aB%guHb_}#cNo-wnMRv3wCdDru72UuiCdoT7`-FRpr)TokwO`Ju3@FRMl z4O*h@iF(j>icDY8Miw+;FbevuPm9=h=!jSxzVkX`2sPV2Hq%3u&_;-b?I))Kawbq! z>OPXk#c8Z({W35qL6|y=mhf&<@OHNocuWR%Qb%#wr^99B--6?1KIS0xHxW{j=ERSP zyh7y1L{ylm)*^`AL>1yyl--$R_BBL};x!t7fyflrD+q9}D^BmAJ6s0I<c=uENdI39 z=adDEA81P@(F6F=d5jh$Wxs<$W&?F;19jNI>`2?eNp(frpJT@Ud{j6n%5z0^e_=Gw zbc8_jduOSH5sOg)HZnJw#Yw}2o(iMls5G)ii=)bDW>g*3nE7cP8nd9W!f;(`Oge_c zjOGdYBB!W4N{D!Xf(B~YqZ5+UoeCW5Qhic_@kAhmpj#}0SMgIK6j8<%$}7;ZfitDj zx{STIgFY0yi(Rzb4?;gAy~am!(SHjDj1fYR4#Y(wuY)uj_MzG*g^zb_v4(16N%?Hj ze>|6(wmch<J(lR`K!w>!;88<;;@m;?+L#I1ow^o80aTyQvqViZec=ZJ2M($nJ(Mr% zSw7^{L+#ZJ_p+8jAGHbWeh+-PwhU6KgCa;6Q7TSqnnJftU?o)i7DGb{xyIP8=qhlv zad^p4K3)2E>i5%$a+oL8XPEyOl_z69!+U*yel)*RV){{Se_`)gAl*I$=?fXs4T*G_ z92(yEMo5?2|KUJyBp4^~h)YEB$QEx>8|@_GGLf4^-Xb!MSGCVfd#fqDUZE!?Xcxzz z6>n23oh`)Ah`dYWDv>6U_lR60@;;I4L?{Xu>qN#rj&Py^Qn|<}P?VrcijaPXQQ<j| z2~5ay-4pSU1UtzbxnRejsc=abI6mGWaZ<;b7AM&q^B$#1?BsZJ;p{pL(z%@M@@KQ7 zEc3&D$Ga;Ju(v2o%feLhP3f*f88bqkJkG`i7+^1oi@_l3527SEjd7yucD){(HjHZW zigj<J!+(!SLW*BBv#Jjll$)O%kmHg9QsPu{NxQ8I>EMR-;WO8HBycdrVZ0Y%^CEYu z(jd<_vW*mL%a(m;{57MpJaTP+zzafpon{W_)ygJ%a|<KUXX2;eFPBPK%8pnT4%|3; zrwN4z#&GotM{bE^yA5YSI`NYrF^Q6s>{2TE&mQ8JOD53f{y?3g-4acb`ll(u%We9P z7_83DM&g3>;St5#RE*j%o6YscnW@BRTzwnT-Qwc1&pS*77Ugvu3RPWh^Q(A=I;%`s zG1*;QO%mPefw2*vp{LFhp`AxOMZ_WU3=y&w@gkA$6M31)4~hJQ2;o8eoX7`6eo5q4 zM5N)5ZBUjda*!xEhC)ayCO2G^c?8NOO#SH){IIfYSM0J~v~9b9zlHLwebPRG7Il1W zW18HL&~On~NGzo(_go5?(!nKegO-t$MI?x{Y(<)9Aug^{5+%&1M=8{&Sd6%p!4gsz z2=!Hvg5aLMpvYkU^Gk3MZ{m=7Hv_(-!Vn~;9DFwDa22^b^Z|iU=$J(=S?C_Z#9zRh zVmsYl#YI$;xWEv^v2vm+2zRq1I-T7KgV$)p*hh}lKpKc?f0S$dD{9Hn+|~`vcsQI@ z+@DWJH1uX;W(tVnW15{7C_E51%N)>W1;MF(gUCf9n?%O%mn8`UV&X5jq)FHSf&44x zs4w>;S8x$mNZ$SlDka6j&(gOQsMcf_?iE&GMcijt3Es+%%Q&&Tc?*ev@av}>;61LW zjL(#=RpVoIP0nVVgjRVfgzsD6N|5hT$p&&O>YRF(N@M9s?ZHn{dQ86fALyS<Mjgp` zJ3lchm+AkCP=;G$m>fuyt04s-w;c=w5+>wZn(*gx*=wjgNN`vi=hlEiw?y$I70T*C zLZ;m1^BhTx{1emUp|~*SB}L=^!|{uc@FsY1xp<ipmJGJgIwfi8!v00;s(?_qK=~_M zA08X8Y|3*tA5;%D7J7wSUWYi8?<H}HT_Pk);S-UT@>s7)%l#WJc|;zY3%p=avhuB( zOzraIr;e|0Hewqb0Pj8G_bcLE3J3914(G>m3;L#57dx~pLoJK&EILFEL1~HBq0oSE z5djfg=5l^5-v(BtyuKW(wejjQte(oRZW^zmo99&WEd4XR^9x*u0gn3U3v(Bl(^jkD z2<0BI`}n;rVN!4HPPZfa?ep!94_7NF&UA}?&Tn%b8$lQs>C4#emdtCoEAlg0mX%5} zRX7tD@t9bm`AXM=@EQ)vaf!xK!RmX_HgZ;h--~VV)uOsvy0q51_WJr;ZvDm{zJ&2h z$>(iRr4<z9_jUOtSw*8{Hp&%98JB5LO>NjMxm$;BNoEBut+}(ClU9I+namHMQj#BE zF0D<#CKG0CzotIHUm)3pFM;4o66Lp&KT3ve)G6;}+QwXA?l;=2bG5mXb0=(LL54_0 F{lA{*9cTam literal 0 HcmV?d00001 diff --git a/database/controllers/queueController.py b/src/database/controllers/queueController.py old mode 100644 new mode 100755 similarity index 95% rename from database/controllers/queueController.py rename to src/database/controllers/queueController.py index e868ea0f..f7f0dac0 --- a/database/controllers/queueController.py +++ b/src/database/controllers/queueController.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + from time import time from typing import Optional from xmlrpc.client import Boolean @@ -11,16 +13,16 @@ import rospy rospack = rospkg.RosPack() DB_PATH = os.path.join(rospack.get_path('ros_acomms'), - 'src/database/databases/queue.db') -db = SqliteDatabase(DB_PATH) # database location + 'database/databases/queue.db') +db = SqliteDatabase(DB_PATH) # database location class BaseModel(Model): class Meta: database = db -class Queue(Model): # database model +class Queue(BaseModel): # database model identifier = IntegerField() # Unique ID message_raw = TextField() @@ -272,3 +274,13 @@ class Fragmentation(BaseModel): if(val.identifier > highVal): highVal = val.identifier return highVal + +if __name__ == '__main__': + '''This script can be ran standalone to create DB tables and check that classes are properly working''' + db.create_tables([Queue, Fragmentation, msgEvents, Topics]) + + queueDB = Queue() + fragmentationDB = Fragmentation() + msgEventsDB = msgEvents() + topicDB = Topics() + diff --git a/database/databases/queue.db b/src/database/databases/queue.db similarity index 96% rename from database/databases/queue.db rename to src/database/databases/queue.db index 393053933e96565b6bfe691bf6e093f117b0355d..7c2457ad825c8d0829018be3f0e0772827aef903 100644 GIT binary patch delta 966 zcmb`GPjAyO7{(L2CZnvOJ)o=tX;FnXDot?U4%(kEO)6!YGHnuv$W2VKNa8GZSM}7j z)9#=hCT?is3qZfX4txpjNPGrvyz!c>h8`#7wd}{Q?O&ePKE5g(Ull$+tez{1Vn7ri zbVbpAd|G{8J6E2*{8<J?D=jKUWK_#97J8-MrNwWDU+z8|HZ|R>)zlH=s2yNVW8$&v zc79{mUUO{ASzB+~R$Yi^m&TJph#5m2?DBq$>(+M5vA679tM$gQT6@jrs#VVtA{0V` zV;{iG&1{?ynh@Tfxs)&$d7K>0*tl_0F6yRfE~}?~0ejfRT`U@1MituGSL0QVnxNkQ zAQ>i(JnCcrIsx!~%y<q5;~4b=ihQ?CILng)9Ci5ni8k>d&83VmPG}@H7^KdC`Yu78 z5Jw!PGcHe%_W>u<N1z~}Jy%Mbo`w|>3kl~b<;`Ip4P&U8EM|X|R%d@Y+b0PIYbR(a zS!1|4r<+Sl>S$jW8nO;P0!)^7G}+iphikAk;6zk+`?Wn?T}Hc!H=}z0`kJq440j5; zxw4{;Tmd1OAPtHVj3K)!%6(f%Z|?|OhDAZtMhu5=LxQ|BCy~@MT@8TZvDz3%#^>q- gZSoq^x3>Us527@FWc|8-QK{tL_ljN_?^u7_zy9zfRR910 delta 91 zcmZoTz}#?vd4jayBnAcsJ|JcWVjcztw*3=zjQJ-q=-J=o1&T28<uUN(Efy5u+nmQY jQ&5x}D8>TBtU$~L#Oy%Ku~|mp3IAq=f(Q9PLcsw55#bS( diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py index bb9990aa..1abcadd7 100755 --- a/src/packet_dispatch_node.py +++ b/src/packet_dispatch_node.py @@ -7,7 +7,6 @@ import roslib.message from dataclasses import dataclass, field from ros_acomms.msg import ReceivedPacket, Packet, CST from bitstring import ConstBitStream, ReadError -from database.controllers.queueController import * from acomms_codecs.ros_message_codec import RosMessageCodec from acomms_codecs.packet_codecs import packet_codecs from database.controllers.queueController import * @@ -153,7 +152,8 @@ class PacketDispatchNode(object): while True: id = payload_bits.read('uint:8') ros_msg = self.message_codecs[id].decode(payload_bits) - Queue.addRecieve(ros_msg, payload_bits) # Update database with inbound message + # Update database with inbound message + Queue.addRecieve(ros_msg, payload_bits) self.message_publishers[id].publish(ros_msg) except KeyError: # We don't have a config entry that matches this ID -- GitLab From 8e7532305923ed81980db77789098cab648a5b0a Mon Sep 17 00:00:00 2001 From: Brennan Miller-Klugman <millerklugmanb@wit.edu> Date: Thu, 30 Sep 2021 16:36:48 -0400 Subject: [PATCH 4/4] Attempted to repair message queue system, adding messages working --- .../queueController.cpython-38.pyc | Bin 8693 -> 9098 bytes src/database/controllers/queueController.py | 45 +++++++++++------- src/database/databases/queue.db | Bin 57344 -> 57344 bytes src/message_queue_node.py | 22 +++++---- src/packet_dispatch_node.py | 1 - 5 files changed, 41 insertions(+), 27 deletions(-) diff --git a/src/database/controllers/__pycache__/queueController.cpython-38.pyc b/src/database/controllers/__pycache__/queueController.cpython-38.pyc index c7fdbc985d52cd8e48975fa9268321a97eeef8d9..c21c3b64073ec4afee8fda7cbb740f79a542e89d 100644 GIT binary patch delta 2645 zcma)8TWl0n7@jk`J3D(X+skg-?e?a?K&9M70UJsw<svPFE+_+7hn>T+bi2DfGX>gg z?E^lUG=hg`ZH+!O@d0gOOpJ*SJ{TjBL`lp%sL`0H4{A&#KB)NrXSOXgk}#Y3=3M@B z&i{Y^<?NsPUO5mw5s3sPc#<_IjmsOaM^hwyedC>j*9dE(wR6q1fi}`4P2{}ugh()w zS~b#eRnl8%BVd{)0b6MTu#L9Owa-gL;$7spuhnAN?!_7MHc7{cBvDBxrliZ%V-i!I zCAydT+}7eQk4(tIVvqMWAr6l!Nun>VQ(~mC7F1+O0L{Z1;p6zSrHP`=(!9OCmY?%~ z*@+TP$Yw^-8q3=@HJ2p)LXxto?5O$^Y2=x};0&m;{P>{dfp;Ig*7E=+nFi&iWn1c( z&NfYZDxQm^LPotTjIC(`2(#m1+cW2#N12XkZ^e62uE?V@+YBwA>8Z998HgB`#y6Yt zu%vmXhI7;&qF4A|fx&G*--BmpK=V6Frc};nEk`Z0e2L}lNhdr;Ez8K!5qq*sSq&(~ z&`53sTekZI3wpFrx=bzmSapeB|6qS`_&^5MI-W5rS2h3jU|qZpc9CQ+@e-8;i9%GF ze-ONxScfO9N7#a}0YH;kBcBOvB{}|H=q+_E3@s2CYOk+T%cb%WdfqPZsqmX?SJrAE z<HW$kBZg%kvMDp{5)+%Tr_fnJTzEX6yI_y=k;tc{ldq2+O5?GPziP01v}+>U4m14K z=zcQ6??i6}JD{5x&=jWecVexu`tw*v?`{}m&mi<5AXSc%Wz>M<cS1+2?zIQndZ=tn z7D|TMm$xm}3nQ!#^$G_K(>zYId3uSyv^ZF^hU~?&7)FT}WoQ=b18^csZpoHRnq^2P z!>|$I=>+Zk7#)GFsL49{8*6s-C2+1EVL!l-rZNRPt8_*+SA5I{psUphC$l{~5x>>c zivp+s>jKbK5!EM+0{Hdc_y#h&sMV&)463}OEn3SCB0Q)gJA`AM2!|0)Asj(it!m&= zRDBj#3Vmytg1HF_e^=L~oxtWvgy#@2sj75i3yolDgj2wNGg;t0_0xbK)*nuYRQ5q0 z6m3T-P1xlLdjSJ6)u542_&~!YMU(>(PLU9!_5>?tjB^G08UMcF#(K61wuS6p&KTjm z1-EeM{tFO$E^qHP)%uY4FZlb3p&1dlfegK359DdVbh+M!8j5sS#UWblg!@XR0yT>6 zf)xq{w_9;3x}3Elv?Dx@@C3q>2s;p-M%X2R=h$9s9zf6$P9qoy=MZ-CZ{#HZC>i6o zlL`JqGESoWZgR`aF_?UKP1Tj<I_sxuwc-Yr$AYG6dQIQUL=TyY>3><$!=?wyRDgBE zwi&hta43rryYyh&53eYdNlB`eYzb>)MXq=%-U^iRSs#C`@hJ5P;#SI7(HNs_GpxKn z8)R{g8$XI$)M85shIJ1-r7sWQ4(PfJZsJB2rY_&p6i9vxW0ndad4fci<a<*wQBnD? zsRW;`k8EjzDO;K&qFRTii7i*iRk+l~fs|p1oq=fx8njxJS$38mZAs>l{vYABya19X zOr~nxLcPh%Tjhc=DFRO;Lt2CdK!V|I%0dVfLB!5^Y~>Mz`8jMYR|Iwuy9PS91yBDP zUMmWK#ku;Eg9_{BHR%_5Pcv^C#RU&mT(`h(6m0y{945c>VCu}K3m~X@-P$EX+fl62 z1j|xU5Qd<*i=8lc7obA&8>w|<h+|fuN_Y8JseB@fTX<YSk#(3R%oI|XJAd;+OK-O@ z6=(E72I`-i4|pOgA&A@=!<MjB1ab?W{R_NRTNSv7;3un!s<2V8wT`D71365pM-1+z zzZ0tIF})DT0n=wH@C}-NQ-yEH3_xH9oyY()a$~e;8+N`_bVHoatGhZ_0S<$^g2Cq@ z{31DMNSObTej#2&dD)dm7Fl_jy*Qt0EiJJpHvB*Kl04bAjf|u3yHH3W%X}<(Uhsi) zjyf_jX5@<_BU%T)*0$S+7oNj!x1~uv|E=w`AwpMpRGf#=2sX1)dBn!zXz3C6O00Gr z|M&!*p#6PL?EL+Nz(5H8FYLbr3|Y|e+xW+%zfXi{g5PY9ksbWw_Q&dbYvMJjn&zM! Lm*cMUz<vJ#*w;#o delta 2084 zcma)+U2GIp6vyYz?#@nkyB}=Z{b;w_WkKi&@=*|#FU!&{A7VjhAr9a&-MPD6y1TR7 zovqZAV1kK>PmpV(@kQ|Cfj3MOUrbE=c)%DEVnQ?%<HrLU<C8Hy5Tba_9a^Bi*rvbU zx#!$_|L5Fu&l`LG+8v$<g#r@%guk9JZVp}wr-^)N&8@ZUAKG=fkH%?=CTXJLy+VjY z;};}7O;f;rngnKO0yx0;lEb8L{$27Ri3f-zQAsDJq|4L;RxkBkkme^nG9lVL_1+}C zdaN`>OVdS0jnb4cS*=y=S;sSD)SUT)N`$0hpoS#?nujIfpW`c8?S{?L{AK?yE70l$ zZDurD^{Q=CbHU3myeI>|SJk^D$>%zrCo=y*Np9HyYi#K<xgw27Cv)fIIr+3aN2X<_ z!pLJTjJ&c$q$AQ9)s&CG;e0qSkuHpBJ{E=bOdQ-<r>&Mzp+$SPN%{3af5_Fyo<Ns0 zPX$*iDhwqLKODT4ScS8z5yGy)NRwHTr#gp-!4GtPu<|jOx=KWxEQF2%VHi37rqw(^ z%XXP}hu#Z5*lLh}7V1%OFE+%#52eX8zZ1Gna{Tk~z6>7b`0uM^;y|C9MNhp|;aX&r zSlo)-2rLK3@q(tXL0*Us!0K`|H@pQmeG2h30^@d+5?qu`*=G2FT;PqctpGjPG-hj- zVdkrLi*3j0o&0(<L(b0s8eK+q;Q<V)LDIr(P*?#@Xd!_m%cLdNg@$Jlc;QaKuGVSM zFimQ*9DgRZB_BuIFk%;AOjDVHA4`Oywg<Pw^6;s3-<QfiirwgS<)POafUb&KIcn74 z%HHmJoV-5&MOTKLN68CnF+?_kcvK@B#km}!fS5q+MJ$!t@qeW5!IeU97wcWjE!&6F z`w`C}4)Ai%+MY*)U}>~W@K1XB$ou@;o^wRw6Y>2CQK!Bpt2S&$vD$XCZHt=wIIfXT z`A_i~MLe(}ej+hIzT|U>HwS;a2S4zR7VBJG*j2Z*b^lbgGUev#=WLe`bNv-RlpH&c zx9W6^)3bJ=N^2(DiGtQ6M71x;8%FAM<}Is6jfT76LE1z<*WpsUW|l!@5i1dE5gQO2 z5t|TCO0b6Qz~OGhQN%ICal{G4X1*+)<i}DGk3tk)O|2p-znI#%0}uLdlhPH@u>4eo zw$)*JP1*FkM)ZJ5pd0ucW!!33ORX{fPIfbYv3K8juL#poA=MUx9`<mV%B+L}#KpRA zMvp91KDG`_=OOesf*40EdHpK9f>G#tv)_TSr2-_6N@PjC_ew-4;oJHGd5w%|LDqrm zv5T`H;uJz$fdM0Zv1c`mI%O1%lZatN8B>%(vERU73pY^-WD9>G2wq74sZ`K$6(7zf z(@R7K8LeA0ROr0TTm2{2nc%B=-3Kqj*il+mo0X_|MPpEiVl&LkRmcnfyMHy=&#|n^ zF!ej%k*UT`;uap)QIzT+zu1=)b@vCql^I@t3LL}<ddE2Q3%8c=FgA@4byvj^?&%hR z50Cwk8v_YaftXH(Lj`73>a<}ScGYU|Yw4|FHU&P=;TpI~gv)PdBR<@T@H5$$VyDqA zyLOC#)fSce`JJq_pj>RQM0tq6wR{t)W43*0zl&O;IC9itv2Iiw#iEwwk=!;P_FtVJ z%w<S7Z{&_27L_E@CN{>))<bSnF51Rqjka|EnKo_Hk-Q!$KO7cJ1flDX&On?k`0(3U q!XtT3Jl#=8-CnoMc8%`zz>cjU{=$lt%eF>C(R8#=m1DA71pfjGvCvWg diff --git a/src/database/controllers/queueController.py b/src/database/controllers/queueController.py index f7f0dac0..53626f4a 100755 --- a/src/database/controllers/queueController.py +++ b/src/database/controllers/queueController.py @@ -13,7 +13,7 @@ import rospy rospack = rospkg.RosPack() DB_PATH = os.path.join(rospack.get_path('ros_acomms'), - 'database/databases/queue.db') + 'src/database/databases/queue.db') db = SqliteDatabase(DB_PATH) # database location @@ -24,8 +24,8 @@ class BaseModel(Model): class Queue(BaseModel): # database model - identifier = IntegerField() # Unique ID - message_raw = TextField() + identifier = IntegerField(null=True) # Unique ID + message_raw = TextField(null=True) message_encoded = TextField(null=True) message_type = TextField(null=True) dest = IntegerField(null=True) @@ -86,11 +86,17 @@ class Queue(BaseModel): # database model Queue.delete().where( Queue.identifier == idVal).execute() - def getNextMessage(self, message_type): + def getNextMessage(self): '''getNextMessage is used to get the next message within a specific message type queue''' + lowestPriority = -1 + lowestPriorityMSGType = None + for msg in Topics: + if(msg.priority > lowestPriority): + lowestPriority = msg.priority + lowestPriorityMSGType = msg.message_type for val in Queue.select(): - if(val.message_type == message_type): + if(val.message_type == lowestPriorityMSGType): if(val.completed == False): return val.message_encoded @@ -208,18 +214,23 @@ class Queue(BaseModel): # database model class Topics(BaseModel): # Gets created at yaml read - message_type = IntegerField() - message_name = TextField() - priority = IntegerField() - is_active = BooleanField() + message_type = IntegerField(null=True) + message_name = TextField(null=True) + priority = IntegerField(null=True) + is_active = BooleanField(null=True) + direction = IntegerField(null=True) - def addEntry(self, message_typeVal, priorityVal, is_activeVal, directionVal=None): + def addEntry(self, message_typeVal, message_nameVal, priorityVal, is_activeVal, directionVal=None): '''addEntry function creates a table entry based on function parameters''' - Topics.create(message_type=message_typeVal, priority=priorityVal, + Topics.create(message_type=message_typeVal, message_name=message_nameVal, priority=priorityVal, is_active=is_activeVal, direction=directionVal) + def displayTable(self): + '''*Test Function* used to display all values within the database''' + for command in Topics.select(): + print(command) class msgEvents(BaseModel): identifier = IntegerField() time = TimeField() @@ -249,10 +260,10 @@ class msgEvents(BaseModel): class Fragmentation(BaseModel): - identifier = IntegerField() - time = TimeField() - source = IntegerField() - dest = IntegerField() + identifier = IntegerField(null=True) + time = TimeField(null=True) + source = IntegerField(null=True) + dest = IntegerField(null=True) def removeEntry(self, idVal): '''removeEntry function removes a table entry based on the specified idVal''' @@ -277,10 +288,12 @@ class Fragmentation(BaseModel): if __name__ == '__main__': '''This script can be ran standalone to create DB tables and check that classes are properly working''' - db.create_tables([Queue, Fragmentation, msgEvents, Topics]) + db.drop_tables([Queue, Fragmentation, msgEvents, Topics]) + db.create_tables([Queue, Fragmentation, msgEvents, Topics]) queueDB = Queue() fragmentationDB = Fragmentation() msgEventsDB = msgEvents() topicDB = Topics() + topicDB.displayTable() diff --git a/src/database/databases/queue.db b/src/database/databases/queue.db index 7c2457ad825c8d0829018be3f0e0772827aef903..5af9916508eb874eefde8083b0ff4cb3bc8252fd 100644 GIT binary patch literal 57344 zcmeI*&u`mg9LI5|ZIZV6wLu^h=<0Trx`+u5a6w{>u8mMx*RhfcCNxEE@}#wv#F_21 ztGLik`wMnK+FyY92RI=90whixI3Ohc1jHHfwd17r)Fsh+S<u^(o8<AwPoC%XwJo_h z_=`K6rmOV2Wj761e_y+x#bVkgx~^&I?=(%z$~q})QqvOu$m^7>y(QLr^ZZxo<K64O zXC#oAy{Bbc*^8Oo^wZS5d|*KU0R#|0009ILc-aJgJ@<f)uO*U&xw+WMj%(~Tlxwxk zs?+^Dy|!IiEtmB2>W$5kzSM18(yuI;wIzLHt6ciDw5@O5F6&!6o16OQ+Z(r5x9{pV zOLwp8OHJiC#-6IUM{U&)huT|4Q`*a=ugcQcwoS`6-J@{ObSg&GH4h?t*GyYUqt)ss zizna2lZ8Sd_P9I#2OZT>{-0!Y^nTljgEwoc<(hR<+5PVc4c#^l`y=m7Q?;sAO$kdd zqndKu@V;&1s9_nk%C6};{ZK=-_T2rTE%5X-EXQ;rla$7~ZR|B=RED>q;@U>_espw3 zqhTFZBC+rQWRI59G+oyl<mq=+t!7(_9&PMR>e<?zp?|zSl`JeQ#7@5SkDjKprw(M4 zPXA3JT7`c5i&vt5#EFV)`bW{mtx~wvu{w73^tcIciqxgQp?G|CGFezziJcVuX+#tw zAL7yJM7F*7lp-^c56?_MS|Z~vem&u%AIJUC?i!A2%F);8znO}Tvfn<+2#b$$v1DO> zKKA&5-@6<kO`}zFdao1FzI)B1bS?8Z<IL(;K{f}ymHmr`cPgE+v3vT2H&`9F-I3ER zIQMMjcI?)gh=h;yA*S->-oYeYg?HLlM&xv98V}{94JT~xV&k}5oo2W-ID<~-A>CUC z%085<Tx3-0p{jO#=kV@zv!T}gD~LCxqIYjDH~lOKAb<b@2q1s}0tg_000Iag@c$P` z$P~Tb`n`V#q-9OXIxFjptkbf}@Bf;YiOcHcCS+wn009ILKmY**5I_I{1Q0*~fiV%_ z_x~{|6vIRS0R#|0009ILKmY**5J2D@0^I+f1CL!1KmY**5I_I{1Q0*~0R#{j69Ml3 z$D~jU69EJeKmY**5I_I{1Q0*~fpZ9O|9=iVc0~XI1Q0*~0R#|0009ILKwwM+y!-#f z)V!AaBllzONv@guEVrEfH~U-mhwQi6O7^2{A@g_Um(2H>N11z>4>GgqKhr;_pQaDe zchc{qr&E8Weo8$_nW>MZ3l;<rKmY**5I_I{1jbunVKJ%2K8($1seRRG*E@}hYgrBF zJ!RXLy;{AWFX|GwC4t#!z_z-NIV@H8t!@92VfiZy61^$Wnde7KE!RHEuUwMkh9sw> zN$<&BYgVgndXMUsC-vo*=Owr%!DKXOD@VE8^4vC8t>4Hmza69!gHnrcN$Q&P6_2L8 zFWqT-&lk)uzZrBi6^#TBRtx5Nc}`NR($VCZDSwKWF250kCZeJCk-KlTf}R!&Np0r3 z^mJCH+t0%5($ByGy(WPX3zV1WXa#yzk|P!96$uV6(8VA%xIh;qHKahfprgSB%1UZz zfigj8V1d%Z3zU+61{P>m0wWe^Mxvt?Xj+mZ6(}je;RQ+rslf$`OKM1grh<+J7idya zLklz!ga#HUHnc$FeSv252q1s}0tg_000IagfB*srAb<b@2q1s}0tg_000IagfB*sr zAb<b@2q1s}0tg_000IagfB*srAb<b@2q1s}0tg_000IagfB*srAb<b@2q1s}0tg_0 k00IagfB*srAb<b@2q1s}0tg_000IagfB*sryc`1m0qW;issI20 delta 300 zcmZoTz}#?vd4iM>_jCpZ20kEW24Z#w2DT#;b&RBdqI&i>d4WP~ToMd?d3=Jrc0A== z&$%Qv3kodY<ZAR`V;7f{WNa#)oX(Y0ui)n&qTm<m<D;XXl$%;yoS2>(pO=`Ms-zI& z8Ud6+$QBf3<`-p_R4OTW`h~c<y9OakW){aMCYNNE0rjEFYBuw-vWtt0GqzTiBqrsg z7M7-#LI`%}pvjip-jn%wd8A-=7bTXXS-p7%_i0AX0A_Y^b#=z3_{llEr9d4an+5rV igm}4t{^SN?9w6r3Y@l$(e=~~!v%qG7h7a)*12_Sn?@pHh diff --git a/src/message_queue_node.py b/src/message_queue_node.py index fbef4f8d..6392a66f 100755 --- a/src/message_queue_node.py +++ b/src/message_queue_node.py @@ -24,7 +24,6 @@ from acomms_codecs.packet_codecs import packet_codecs import json from rospy_message_converter import message_converter # TODO: move this -from packet_dispatch_node import DecoderListEntry @dataclass @@ -148,7 +147,7 @@ class MessageQueue(object): if self.order == 'fifo': if len(self._queue) < self._queue.maxlen: # drop this message if the queue is full. - Queue.addRight( + self.queueDB.addRight( message, encoded_bits, self.dest, @@ -161,8 +160,8 @@ class MessageQueue(object): # master message queue draws from the top else: # For LIFO, we threw away the old messages - Queue.clearLifoMessages(self.message_codec) - Queue.addLeft( + self.queueDB.clearLifoMessages(self.message_codec) + self.queueDB.addLeft( message, encoded_bits, self.dest, @@ -213,6 +212,11 @@ class MessageQueue(object): class MessageQueueNode(object): def __init__(self): + self.queueDB = Queue() + self.fragmentationDB = Fragmentation() + self.msgEventsDB = msgEvents() + self.topicDB = Topics() + self.topic_list = [] #List of dict for topics rospy.init_node('message_queue', log_level=rospy.DEBUG) @@ -291,14 +295,13 @@ class MessageQueueNode(object): self.topic_list.append(topic) #append topic to list - Topics.addEntry( + self.topicDB.addEntry( message_codec_params['id'], ros_type_name, - priority, + int(priority), message_codec_params.get( 'is_active', True), ) - rospy.Subscriber(actual_topic_name, AnyMsg, partial(self.on_incoming_message, topic_name=actual_topic_name)) @@ -341,7 +344,7 @@ class MessageQueueNode(object): # Check if incoming msg topic name matches any of the active queue topic names found = False for val in Topics.select(): - if (topic_name == val.message_name): + if (topic_name in val.message_name): rospy.logdebug( "TOPIC: {} in active queue list, appending msg".format(topic_name)) found = True @@ -390,7 +393,7 @@ class MessageQueueNode(object): msg.queue.mark_transmitted(msg) def get_next_message(self, remove_from_queue=False, check_reachability=True): - return Queue.getNextMessage() + return self.queueDB.getNextMessage() def get_next_message_smaller_than_bits(self, size_in_bits, dest_address=None, packet_codec=None, check_reachability=True): @@ -483,7 +486,6 @@ class MessageQueueNode(object): while not rospy.is_shutdown(): # self.time_since_last_tx_pub.publish(rospy.Duration.from_sec(self.age_of_oldest_queue.total_seconds())) # self.messages_in_queues_pub.publish(Int32(self.message_count)) - self.update_queue_status() rate.sleep() diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py index 1abcadd7..e62fdb97 100755 --- a/src/packet_dispatch_node.py +++ b/src/packet_dispatch_node.py @@ -153,7 +153,6 @@ class PacketDispatchNode(object): id = payload_bits.read('uint:8') ros_msg = self.message_codecs[id].decode(payload_bits) # Update database with inbound message - Queue.addRecieve(ros_msg, payload_bits) self.message_publishers[id].publish(ros_msg) except KeyError: # We don't have a config entry that matches this ID -- GitLab