From 823da935f9afccb40273f9049a15736fb213c235 Mon Sep 17 00:00:00 2001
From: Brennan <brennanmk2200@gmail.com>
Date: Wed, 29 Sep 2021 16:49:20 -0400
Subject: [PATCH 1/4] Created Queue DB and refactored message_queue_node

---
 .gitignore                              |   1 +
 database/controllers/queueController.py | 218 ++++++++++++++
 database/databases/queue.db             | Bin 0 -> 57344 bytes
 src/message_queue_node.py               | 369 ++++++++++--------------
 src/packet_dispatch_node.py             |   2 +-
 5 files changed, 379 insertions(+), 211 deletions(-)
 create mode 100644 database/controllers/queueController.py
 create mode 100644 database/databases/queue.db

diff --git a/.gitignore b/.gitignore
index 79f33fdd..c0706b96 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@
 /src/__pycache__
 /src/acomms_codecs/__pycache__
 /src/acomms_codecs/field_codecs/__pycache__
+.vscode/settings.json
diff --git a/database/controllers/queueController.py b/database/controllers/queueController.py
new file mode 100644
index 00000000..3843ca3e
--- /dev/null
+++ b/database/controllers/queueController.py
@@ -0,0 +1,218 @@
+from time import time
+from typing import Optional
+from xmlrpc.client import Boolean
+from peewee import *
+import os
+import rospkg
+import json
+from rospy_message_converter import message_converter
+import rospy
+
+rospack = rospkg.RosPack()
+
+DB_PATH = os.path.join(rospack.get_path('ros_acomms'),
+                       'src/database/databases/queue.db')
+db = SqliteDatabase(DB_PATH)  # database location
+
+
+class BaseModel(Model):
+    class Meta:
+        database = db
+
+
+class Queue(Model):  # database model
+
+    identifier = IntegerField()  # Unique ID
+    message_raw = TextField()
+    message_encoded = TextField()
+    message_type = TextField()
+    dest = IntegerField()
+    payload_bits = TextField()
+    length_bits = IntegerField()
+    posistion = IntegerField()  # Pos in queue (relative to topic)
+    fragmentation_tracker = BooleanField()
+    allow_fragmentation = BooleanField()
+    transmitted = BooleanField()  # Transmitted state
+    completed = BooleanField()  # Transmission completed
+    message_type = IntegerField()  # Corresponds to message table
+    direction = BooleanField()
+
+    def get_next_message_smaller_than_bits(self, size_in_bits, dest_address=None, packet_codec=None,
+                                           check_reachability=True):
+        try:
+            for val in Queue.select():
+                next_message = None
+
+                if dest_address and (dest_address != val.dest):
+                    # Move on to check the next message
+                    rospy.logdebug("Disqualified: dest_address")
+                    continue
+                if not val.allow_fragmentation and val.length_bits > size_in_bits:
+                    rospy.logdebug("Disqualified: too large, no fragmentation")
+                    continue
+                break
+
+            return next_message
+
+        except:
+            return None
+
+    def bit_count(self):
+        count = 0
+
+        for val in Queue.select():
+            count += val.length_bits
+
+        return count
+
+    def message_count(self):
+        counter = 0
+        for val in Queue.select():
+            counter += 1
+        return counter
+
+    def removeMessage(self, idVal):
+        Queue.delete().where(
+            Queue.identifier == idVal).execute()
+
+    def getNextMessage(self, message_type):
+        for val in Queue.select():
+            if(val.message_type == message_type):
+                if(val.completed == False):
+                    return val.message_encoded
+
+    def clearLifoMessages(self, messageType):
+        Queue.delete().where(Queue.message_type == messageType).execute()
+
+    def popLeft(self):
+        Queue.delete().where(Queue.identifier == 0).execute()
+
+    def popRight(self):
+        Queue.delete().where(Queue.identifier == self.getLastIterator()).execute()
+
+    def updateTransmit(self, msg, transmitVal):
+        dictionary = message_converter.convert_ros_message_to_dictionary(
+            msg)
+        message_str = json.dumps(dictionary)
+        Queue.update(transmitted=transmitVal).where(
+            Queue.message == message_str).execute()
+
+    def addRight(self, message, message_codecVal, destVal, payload_bitsVal, length_bitsVal, time_addedVal, transmittedVal, next_hop_ackedVal, priorityVal, fragmentation_trackerVal, allow_fragmentationVal, is_activeVal, directionVal):
+        dictionary = message_converter.convert_ros_message_to_dictionary(
+            message)
+        message_raw = json.dumps(dictionary)
+
+        idVal = self.getLastIterator + 1
+        countVal = 0
+        for val in Queue.select():
+            countVal += 1
+        posistionVal = countVal + 1
+
+        Queue.create(identifier=idVal,
+                     message_raw=message_raw,
+                     message_codec=message_codecVal,
+                     dest=destVal,
+                     payload_bits=payload_bitsVal,
+                     length_bits=length_bitsVal,
+                     time_added=time_addedVal,
+                     transmitted=transmittedVal,
+                     next_hop_acked=next_hop_ackedVal,
+                     posistion=posistionVal,
+                     fragmentation_tracker=fragmentation_trackerVal,
+                     allow_fragmentation=allow_fragmentationVal,
+                     is_active=is_activeVal,
+                     direction=directionVal)
+
+    def addLeft(self, message, message_codecVal, destVal, payload_bitsVal, length_bitsVal, time_addedVal, transmittedVal, next_hop_ackedVal, priorityVal, fragmentation_trackerVal, allow_fragmentationVal, is_activeVal, directionVal):
+        dictionary = message_converter.convert_ros_message_to_dictionary(
+            message)
+        message_raw = json.dumps(dictionary)
+
+        idVal = self.getLastIterator + 1
+
+        for val in Queue.select():
+            posistionVal = val.posistion - 1
+
+        Queue.create(identifier=idVal,
+                     message=message_raw,
+                     message_codec=message_codecVal,
+                     dest=destVal,
+                     payload_bits=payload_bitsVal,
+                     length_bits=length_bitsVal,
+                     time_added=time_addedVal,
+                     transmitted=transmittedVal,
+                     next_hop_acked=next_hop_ackedVal,
+                     posistionVal=posistionVal,
+                     fragmentation_tracker=fragmentation_trackerVal,
+                     allow_fragmentation=allow_fragmentationVal,
+                     is_active=is_activeVal,
+                     direction=directionVal)
+
+    def displayTable(self):
+        for command in Queue.select():
+            print(command.output)
+
+    def returnTable(self):
+        return Queue.select()
+
+    def getLastIterator(self):
+        highVal = -1
+        for val in Queue.select():
+            if(val.identifier > highVal):
+                highVal = val.identifier
+        return highVal
+
+
+class Topics(BaseModel):  # Gets created at yaml read
+    message_type = IntegerField()
+    message_name = TextField()
+    priority = IntegerField()
+    is_active = BooleanField()
+
+    def addEntry(self, message_typeVal, priorityVal, is_activeVal, directionVal = None):
+        Topics.create(message_type=message_typeVal, priority=priorityVal,
+                      is_active=is_activeVal, direction=directionVal)
+
+
+class msgEvents(BaseModel):
+    identifier = IntegerField()
+    time = TimeField()
+    source = IntegerField()
+    dest = IntegerField()
+
+    def removeEntry(self, idVal):
+        msgEvents.delete().where(
+            msgEvents.identifier == idVal).execute()
+
+    def addEntry(self, identifierVal, timeVal, sourceVal, destVal):
+        msgEvents.create(identifier=identifierVal, time=timeVal,
+                         source=sourceVal, dest=destVal)
+
+    def getLastIterator(self):
+        highVal = -1
+        for val in msgEvents.select():
+            if(val.identifier > highVal):
+                highVal = val.identifier
+        return highVal
+
+
+class Fragmentation(BaseModel):
+    identifier = IntegerField()
+    time = TimeField()
+    source = IntegerField()
+    dest = IntegerField()
+
+    def removeEntry(self, idVal):
+        Fragmentation.delete().where(
+            Fragmentation.identifier == idVal).execute()
+
+    def addEntry(self, identifierVal, timeVal, sourceVal, destVal):
+        Fragmentation.create(identifier=identifierVal,
+                             time=timeVal, source=sourceVal, dest=destVal)
+
+    def getLastIterator(self):
+        highVal = -1
+        for val in Fragmentation.select():
+            if(val.identifier > highVal):
+                highVal = val.identifier
+        return highVal
diff --git a/database/databases/queue.db b/database/databases/queue.db
new file mode 100644
index 0000000000000000000000000000000000000000..393053933e96565b6bfe691bf6e093f117b0355d
GIT binary patch
literal 57344
zcmeI&O>Wab6aZkS{1hk@cC0FmvH}UQ;Q&yIg(^aT5G1+`%@iWlN$WUJ)`$ymA<n^V
zcqxq%RIy_P-zZOGy?K7}e7lML`c-Gt<a98pt3i_<6t{|2t9X=BQEYrGiefY3Ld5l=
zxcoilixG>_T3q~jjvn_P{<skZt<7-}KZ5`P0t5&UAV7cs0Rja63xO{;TkE&Cwpw40
zo58!oyg#X`!FV`b%xgQna=Txq{`TWenc9nVJKbrIhV8W5?Uzr>Uh2N+r|z3hC%x?L
zKHu)`rDx^dy%Z&R+>8!JxjtVS%V*vCpO11o^~<-><E+ne@%+MM)*Q{6U-_u3b2F>Q
zJCkAbID4GPPA9W^|F_IAPvfR#MKh{$v|3Gl$b;pcqDeJ4In3kLiaH<9qEB~Lt!Aq{
zoetjT<v!xx$;VuOs^cKPT9{Ar{_NCwHG42R%ui0AZf<aQem0Lozd?Wi0RjXF5FkK+
z009C72>km38?nXtyZuWMmm}6Au0&joxE3*Ak8c2q`QHWeZvxgL3<3lQ5FkK+009C7
z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N
z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+
z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly
zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAn?}&egZ|O
BgT?>=

literal 0
HcmV?d00001

diff --git a/src/message_queue_node.py b/src/message_queue_node.py
index 659dac90..fbef4f8d 100755
--- a/src/message_queue_node.py
+++ b/src/message_queue_node.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 from __future__ import unicode_literals
 
-from itertools import groupby
+from itertools import count, groupby
 from typing import Union
 
 import rospy
@@ -17,13 +17,13 @@ from ros_acomms.srv import GetNextPacketData, GetNextPacketDataRequest, GetNextP
 from ros_acomms.msg import NeighborReachability, SummaryMessageCount, QueueStatus, EncodedAck
 from bitstring import Bits, BitStream
 from fragmentation_tracker import FragmentationTracker, FragmentationStartHeader, FragmentationContHeader, FragmentationAckHeader
-
-from acomms_codecs.ros_message_codec import RosMessageCodec
+from database.controllers.queueController import *
+from field_codecs import RosMessageCodec
 from acomms_codecs.ros_packet_codec import RosPacketCodec
-
 from acomms_codecs.packet_codecs import packet_codecs
-
-#TODO: move this
+import json
+from rospy_message_converter import message_converter
+# TODO: move this
 from packet_dispatch_node import DecoderListEntry
 
 
@@ -47,11 +47,14 @@ class QueuedMessage:
 
     def get_min_fragment_size_bits(self):
         if self.fragmentation_tracker is None:
-            size = (FragmentationStartHeader.length_bits + self.fragment_size_bits)
+            size = (FragmentationStartHeader.length_bits +
+                    self.fragment_size_bits)
         elif self.fragmentation_tracker.start_header_acked:
-            size = (FragmentationContHeader.length_bits + self.fragment_size_bits)
+            size = (FragmentationContHeader.length_bits +
+                    self.fragment_size_bits)
         else:
-            size = (FragmentationStartHeader.length_bits + self.fragment_size_bits)
+            size = (FragmentationStartHeader.length_bits +
+                    self.fragment_size_bits)
 
         rospy.logdebug("Minimum fragment size: {0}".format(size))
         return size
@@ -73,10 +76,12 @@ class QueuedMessage:
             if self.fragmentation_tracker is None:
                 sequence_number = dest_sequence_nums[self.dest]
                 dest_sequence_nums[self.dest] += 1
-                rospy.logdebug("Queued Message length bits: {0}".format(self.length_bits))
+                rospy.logdebug(
+                    "Queued Message length bits: {0}".format(self.length_bits))
                 self.sequence_number = sequence_number
                 self.fragmentation_tracker = FragmentationTracker(sequence_number, 0, self.dest,
-                                                                  int(self.time_added.timestamp()), self.length_bits,
+                                                                  int(self.time_added.timestamp(
+                                                                  )), self.length_bits,
                                                                   self.fragment_size_bits,
                                                                   payload_bits=self.payload_bits)
 
@@ -96,19 +101,21 @@ class QueuedMessage:
             except:
                 rospy.logdebug("No acked bytes yet")
 
-            message_bits = self.fragmentation_tracker.next_fragment_to_transfer(size_in_bits)
+            message_bits = self.fragmentation_tracker.next_fragment_to_transfer(
+                size_in_bits)
             message_header = self.queue.header + 128
 
-        rospy.logdebug("Returning {0} of {1} bits".format(message_bits.length, self.payload_bits.length))
+        rospy.logdebug("Returning {0} of {1} bits".format(
+            message_bits.length, self.payload_bits.length))
         if remove_from_queue and not fragmented:
             self.queue.mark_transmitted(self)
 
         return message_header, message_bits
 
+
 class MessageQueue(object):
-    def __init__(self, master_message_queue, dest_address, header, message_codec=None, ack_next_hop=False, ack_end_to_end=False,
+    def __init__(self, dest_address, header, message_codec=None, ack_next_hop=False, ack_end_to_end=False,
                  priority=1, order='lifo', maxsize=100, packet_codec=None, allow_fragmentation=False, is_active=True, roundrobin_tx=False):
-        self.master_message_queue = master_message_queue
         # Acks are TODO
         self.ack_next_hop = ack_next_hop
         self.ack_end_to_end = ack_end_to_end
@@ -123,13 +130,13 @@ class MessageQueue(object):
         self.is_active = is_active
 
         self._last_tx_time = datetime.utcnow()
-        self._queue = deque(maxlen=maxsize)
 
     def priority_update(self, priority_requested):
-        rospy.logwarn("Priority request for msg queue: {}".format(priority_requested))
+        rospy.logwarn(
+            "Priority request for msg queue: {}".format(priority_requested))
         self.priority = priority_requested
         return self.priority
-    
+
     def append(self, message):
         encoded_bits, metadata = self.message_codec.encode(message)
         rospy.loginfo("Append: {}".format(encoded_bits))
@@ -138,75 +145,54 @@ class MessageQueue(object):
             if 'dest' in metadata:
                 dest = metadata['dest']
 
-        queued_message = QueuedMessage(message=message,
-                                       message_codec=self.message_codec,
-                                       queue=self,
-                                       dest=dest,
-                                       payload_bits=encoded_bits,
-                                       length_bits=encoded_bits.length,
-                                       time_added=datetime.utcnow(),
-                                       transmitted=False,
-                                       next_hop_acked=False,
-                                       priority=self.priority,
-                                       fragmentation_tracker=None,
-                                       allow_fragmentation=self.allow_fragmentation,
-                                       is_active = self.is_active
-                                       )
-
         if self.order == 'fifo':
             if len(self._queue) < self._queue.maxlen:
                 # drop this message if the queue is full.
-                self._queue.appendleft(queued_message)
+                Queue.addRight(
+                    message,
+                    encoded_bits,
+                    self.dest,
+                    encoded_bits.length,
+                    datetime.utcnow(),
+                    None,
+                    self.allow_fragmentation,
+                    self.message_codec,
+                )
                 # master message queue draws from the top
-                self.master_message_queue.append(queued_message)
         else:
             # For LIFO, we threw away the old messages
-            if len(self._queue) == self._queue.maxlen:
-                # drop the oldest message
-                message_to_remove = self._queue.popleft()
-                self.master_message_queue.remove(message_to_remove)
-            self._queue.append(queued_message)
-            self.master_message_queue.insert(0, queued_message)
-
-        rospy.logdebug_throttle(5, "Added message to queue, {} messages".format(len(self._queue)))
+            Queue.clearLifoMessages(self.message_codec)
+            Queue.addLeft(
+                message,
+                encoded_bits,
+                self.dest,
+                encoded_bits.length,
+                datetime.utcnow(),
+                None,
+                self.allow_fragmentation,
+                self.message_codec,
+                0 #0 represents an outbound message
+            )
+
+            Queue.clearTopic(message_type)
+
+        rospy.logdebug_throttle(
+            5, "Added message to queue")
 
     def mark_transmitted(self, queued_message):
         # type: (QueuedMessage) -> None
-        if queued_message in self._queue:
-            queued_message.transmitted = True
-            self._last_tx_time = datetime.utcnow()
-            if (not self.ack_next_hop) and (not self.ack_end_to_end):
-                self._queue.remove(queued_message)
-                self.master_message_queue.remove(queued_message)
-
-    def get_next_message(self, remove_from_queue=True):
-        if len(self._queue) == 0:
-            return None
-        queued_message = self._queue[-1]
-        if remove_from_queue:
-            self._queue.pop()
-            self._last_tx_time = datetime.utcnow()
-        return queued_message
+        Queue.updateTransmit(queued_message, True)
+        self._last_tx_time = datetime.utcnow()
+
+    def get_next_message(self, message_type):
+        return Queue.getNextMessage(message_type)
 
     def get_next_message_smaller_than(self, size_in_bytes, remove_from_queue=False):
-        return self.get_next_message_smaller_than_bits(size_in_bits=size_in_bytes*8, remove_from_queue = remove_from_queue)
-
-    def get_next_message_smaller_than_bits(self, size_in_bits, remove_from_queue=False):
-        if len(self._queue) == 0:
-            return None
-        for qm in reversed(self._queue):
-            if qm.length_bits <= size_in_bits:
-                if remove_from_queue:
-                    self._queue.remove(qm)
-                    self._last_tx_time = datetime.utcnow()
-                return qm
+        return Queue.get_next_message_smaller_than_bits(size_in_bits=size_in_bytes*8)
 
     @property
     def bit_count(self):
-        count = 0
-        for msg in self._queue:
-            count += msg.length_bits
-        return count
+        return Queue.bit_count()
 
     @property
     def byte_count(self):
@@ -214,7 +200,7 @@ class MessageQueue(object):
 
     @property
     def message_count(self):
-        return len(self._queue)
+        return Queue.message_count()
 
     @property
     def time_since_last_tx(self):
@@ -227,42 +213,44 @@ class MessageQueue(object):
 
 class MessageQueueNode(object):
     def __init__(self):
+        self.topic_list = [] #List of dict for topics
+
         rospy.init_node('message_queue', log_level=rospy.DEBUG)
         self.update_rate = rospy.get_param('~update_rate', 1)
-        self.unknown_dests_are_reachable = rospy.get_param('~unknown_dests_are_reachable', True)
+        self.unknown_dests_are_reachable = rospy.get_param(
+            '~unknown_dests_are_reachable', True)
         self.default_dest = rospy.get_param('~default_dest', 121)
         self.default_priority = rospy.get_param('~default_priority', 10)
 
-        self.queues = {}  # type: dict[str, MessageQueue]
-        self.topic_queue = {}
         self.topic_headers = {}
 
         self.destination_reachable = {}
         self.dest_sequence_num = defaultdict(lambda: 1)
 
-        self.queues_by_priority = defaultdict(list)
         self.priority_update = False
-        self._queued_messages = []
-        
+
         self.queue_active_names = []
-    
 
         # Status publishers
-        self.queue_status_pub = rospy.Publisher('queue_status', QueueStatus, queue_size=10, latch=True)
+        self.queue_status_pub = rospy.Publisher(
+            'queue_status', QueueStatus, queue_size=10, latch=True)
 
         # Active Queues Service
-        self.queue_active_service = rospy.Service('queue_active', QueueActive, self.handle_queue_active)
-        
+        self.queue_active_service = rospy.Service(
+            'queue_active', QueueActive, self.handle_queue_active)
+
         # Dynamic Priority Service
-        self.priority_update_service = rospy.Service('priority_update', PriorityUpdate, self.handle_priority_update)
-        
+        self.priority_update_service = rospy.Service(
+            'priority_update', PriorityUpdate, self.handle_priority_update)
+
         # this is a little awkward, since it's conceptually backward here.
         # when queuing for TX, we go from topic -> message -> packet
         # However, organizing it this way allows us to use a single config YAML for both TX and RX
         packet_codec_param = rospy.get_param('~packet_codecs')
         self.packet_codecs = {}
         for packet_codec_details in packet_codec_param:
-            rospy.loginfo("Packet codec details: {}".format(packet_codec_details))
+            rospy.loginfo("Packet codec details: {}".format(
+                packet_codec_details))
             packet_codec_class = packet_codecs[packet_codec_details['packet_codec']]
 
             message_codecs = {}
@@ -270,129 +258,114 @@ class MessageQueueNode(object):
                 ros_type_name = message_codec_params['ros_type']
                 msg_class = roslib.message.get_message_class(ros_type_name)
                 if not msg_class:
-                    raise TypeError('Invalid ROS type "{}" for message codec {}'.format(ros_type_name, id))
-                message_codec = RosMessageCodec(ros_type=msg_class, fields_dict=message_codec_params['fields'])
+                    raise TypeError(
+                        'Invalid ROS type "{}" for message codec {}'.format(ros_type_name, id))
+                message_codec = RosMessageCodec(
+                    ros_type=msg_class, fields_dict=message_codec_params['fields'])
                 message_codecs[message_codec_params['id']] = message_codec
 
-                # TODO clean this up
                 self.destination_reachable[
                     message_codec_params['default_dest']] = True if self.unknown_dests_are_reachable else False
 
                 actual_topic_name = message_codec_params['subscribe_topic']
-                queue_active_status = message_codec_params.get('is_active', True)
+                queue_active_status = message_codec_params.get(
+                    'is_active', True)
                 priority = message_codec_params['priority']
 
-                
                 if queue_active_status:
                     self.queue_active_names.append(actual_topic_name)
-                    
+
                 if not self.priority_update:
                     # No priority update requested, use message codec params
-                    rospy.logwarn("Priority update is {}".format(self.priority_update))
+                    rospy.logwarn("Priority update is {}".format(
+                        self.priority_update))
                     priority = message_codec_params['priority']
                 else:
-                    rospy.logwarn("Priority update is {}".format(self.priority_update))
+                    rospy.logwarn("Priority update is {}".format(
+                        self.priority_update))
                     priority = self.queue_priority_desired
                     # Priority variable is set, now change bool to False to reset for future
                     self.priority_update = False
                 
-                new_queue = MessageQueue(master_message_queue=self._queued_messages,
-                                         dest_address=message_codec_params.get('default_dest', self.default_dest),
-                                         priority=message_codec_params.get('priority', self.default_priority),
-                                         header=message_codec_params['id'],
-                                         order=message_codec_params['queue_order'],
-                                         maxsize=message_codec_params['queue_maxsize'],
-                                         allow_fragmentation=message_codec_params.get('allow_fragmentation', False),
-                                         is_active=message_codec_params.get('is_active', True),
-                                         message_codec=message_codec,
-                                         )
-                self.topic_queue[actual_topic_name] = new_queue
+                topic = {message_codec_params['id']:message_codec} #Create new topic *used to codec)
+                
+                self.topic_list.append(topic) #append topic to list
+
+                Topics.addEntry(
+                    message_codec_params['id'],
+                    ros_type_name,
+                    priority,
+                    message_codec_params.get(
+                        'is_active', True),
+                )
+                
                 rospy.Subscriber(actual_topic_name,
                                  AnyMsg,
                                  partial(self.on_incoming_message, topic_name=actual_topic_name))
 
             # The packet codec initializer sets the packet codec on each message codec
             packet_codec = packet_codec_class(message_codecs=message_codecs,
-                                              miniframe_header=packet_codec_details.get('miniframe_header', bytes()),
+                                              miniframe_header=packet_codec_details.get(
+                                                  'miniframe_header', bytes()),
                                               dataframe_header=packet_codec_details.get('dataframe_header', bytes()))
-            rospy.loginfo("Added packet codec with {} message codecs".format(len(message_codecs)))
+            rospy.loginfo("Added packet codec with {} message codecs".format(
+                len(message_codecs)))
 
         # Subscribe to the Neighbor Reachability updates (which may be published by more than one node
-        rospy.Subscriber("neighbor_reachability", NeighborReachability, self.on_neighbor_reachability)
+        rospy.Subscriber("neighbor_reachability",
+                         NeighborReachability, self.on_neighbor_reachability)
 
         self.get_packet_data_service = rospy.Service('get_next_packet_data', GetNextPacketData,
                                                      self.handle_get_next_packet)
 
         # Subscribe to ACK message to update queues
-        rospy.Subscriber("from_acomms/encoded_ack", EncodedAck, self.on_ack_received)
+        rospy.Subscriber("from_acomms/encoded_ack",
+                         EncodedAck, self.on_ack_received)
 
         rospy.loginfo("Message Queue started")
         self.spin()
 
-    def update_queue_status(self):
-        highest_priority = -1
-        queued_dest_addresses = set()
-        message_counts = {}
-
-        for key, group in groupby(self._queued_messages, lambda x: (x.priority, x.dest)):
-            group_count = sum(1 for _ in group)
-            message_counts[key] = group_count
-            highest_priority = max(highest_priority, key[0])
-            queued_dest_addresses.add(key[1])
-
-        total_message_count = len(self._queued_messages)
-        '''
-        for m in self._queued_messages:
-            message_counts.setdefault((m.priority, m.dest_address), 0)
-            if q.message_count:
-                message_counts[(q.priority, q.dest_address)] += q.message_count
-                total_message_count += q.message_count
-                highest_priority = max(highest_priority, q.priority)
-                queued_dest_addresses.add(q.dest_address)
-        '''
-
-        # Build the QueueStatus message
-        hdr = Header(stamp=rospy.get_rostime())
-        summary_message_counts = []
-        for k,v in message_counts.items():
-            summary_message_counts.append(SummaryMessageCount(priority=k[0], dest_address=k[1], message_count=v))
-        msg = QueueStatus(header=hdr, message_count=total_message_count, highest_priority=highest_priority,
-                          queued_dest_addresses=queued_dest_addresses, summary_message_counts=summary_message_counts)
-        self.queue_status_pub.publish(msg)
-
     def on_incoming_message(self, msg_data, topic_name):
         # type: (AnyMsg, str) -> None
         connection_header = msg_data._connection_header['type'].split('/')
         ros_pkg = connection_header[0] + '.msg'
         msg_type = connection_header[1]
-        #print 'Message type detected as ' + msg_type
+        # print 'Message type detected as ' + msg_type
         msg_class = getattr(import_module(ros_pkg), msg_type)
         msg = msg_class()
         msg.deserialize(msg_data._buff)
 
-        rospy.logdebug("On incoming message: {} ({})".format(topic_name, msg_type))
-        
+        rospy.logdebug("On incoming message: {} ({})".format(
+            topic_name, msg_type))
+
         # Check if incoming msg topic name matches any of the active queue topic names
-        if topic_name in self.queue_active_names:
-            rospy.logdebug("TOPIC: {} in active queue list, appending msg".format(topic_name))
-            self.topic_queue[topic_name].append(msg)
-            self.update_queue_status()
-        else:
-            rospy.logdebug("TOPIC: {} not in active queue list, skipping msg".format(topic_name))
+        found = False
+        for val in Topics.select():
+            if (topic_name == val.message_name):
+                rospy.logdebug(
+                    "TOPIC: {} in active queue list, appending msg".format(topic_name))
+                found = True
 
+        if (found == False):
+            rospy.logdebug(
+                "TOPIC: {} not in active queue list, skipping msg".format(topic_name))
 
     def on_neighbor_reachability(self, msg: NeighborReachability):
         self.destination_reachable[msg.dest] = msg.reachable
 
     def on_ack_received(self, msg: EncodedAck):
-        ack = FragmentationAckHeader.from_bits(BitStream(bytes=msg.encoded_ack))
-        rospy.logdebug("Received ACK with src: {0} seq num: {1}". format(msg.src, ack.sequence_num))
-        rospy.logdebug("Ack blocks {}-{}".format(ack.blocks[0][0], ack.blocks[-1][1]))
+        ack = FragmentationAckHeader.from_bits(
+            BitStream(bytes=msg.encoded_ack))
+        rospy.logdebug("Received ACK with src: {0} seq num: {1}". format(
+            msg.src, ack.sequence_num))
+        rospy.logdebug(
+            "Ack blocks {}-{}".format(ack.blocks[0][0], ack.blocks[-1][1]))
         rospy.logdebug("ACK N-Blocks: {}".format(ack.n_blocks))
 
-        for message in self._queued_messages:
-            rospy.logdebug("Message dest: {0} Sequence number: {1}".format(message.dest, message.sequence_number))
-            if message.dest == msg.src and message.sequence_number == ack.sequence_num:
+        for message in Queue.returnTable():
+            rospy.logdebug("Message dest: {0} Sequence number: {1}".format(
+                message.dest, message.sequence_number))
+            if message.dest == msg.src and message.identifier == ack.sequence_num:
                 if len(ack.blocks) == 1 and ack.blocks[0][0] == 0 and ack.blocks[-1][-1] == 1 and ack.ack_flag == 0:
                     rospy.logwarn(
                         "Empty ACK received indicated the message recipient does not have start fragment for this message")
@@ -400,8 +373,10 @@ class MessageQueueNode(object):
                     return
 
                 rospy.logdebug("Recording blocks ACKED")
-                message.fragmentation_tracker.record_blocks_acked(ack.blocks, ack.ack_flag)
-                rospy.logdebug("Payload size={0}".format(message.fragmentation_tracker.payload_size_blocks))
+                message.fragmentation_tracker.record_blocks_acked(
+                    ack.blocks, ack.ack_flag)
+                rospy.logdebug("Payload size={0}".format(
+                    message.fragmentation_tracker.payload_size_blocks))
 
                 if message.fragmentation_tracker.acked():
                     # Remove this message from the queue
@@ -415,50 +390,15 @@ class MessageQueueNode(object):
         msg.queue.mark_transmitted(msg)
 
     def get_next_message(self, remove_from_queue=False, check_reachability=True):
-        # type: (bool) -> Union[None, QueuedMessage]
-        # Use the master message queue for this
-        next_message = None
-        for message in sorted(self._queued_messages, key=lambda m: m.priority, reverse=True):
-            if check_reachability and not self.destination_reachable[message.dest]:
-                continue
-            next_message = message
-            break
-        return next_message
+        return Queue.getNextMessage()
 
     def get_next_message_smaller_than_bits(self, size_in_bits, dest_address=None, packet_codec=None,
                                            check_reachability=True):
-        # type: (int, bool, int, None, bool) -> Union[None, QueuedMessage]
-        next_message = None
-
-        for message in sorted(self._queued_messages, key=lambda m: m.priority, reverse=True):
-            if check_reachability and not self.destination_reachable[message.dest]:
-                rospy.logdebug("Disqualified: Reachability")
-                continue
-            if dest_address and (dest_address != message.dest):
-                # Move on to check the next message
-                rospy.logdebug("Disqualified: dest_address")
-                continue
-            if packet_codec and (packet_codec != message.message_codec.packet_codec):
-                # Move on to check the next message
-                rospy.logdebug("Disqualified: packet_codec")
-                continue
-            if not message.allow_fragmentation and message.length_bits > size_in_bits:
-                rospy.logdebug("Disqualified: too large, no fragmentation")
-                continue
-            if message.allow_fragmentation and message.get_min_fragment_size_bits() > size_in_bits:
-                rospy.logdebug("Disqualified: fragments too large")
-                continue
-            if message.allow_fragmentation and message.get_min_fragment_size_bits() == 0:
-                rospy.logdebug("Disqualified: no fragments to send at this time")
-                continue
-
-            next_message = message
-            break
-
-        return next_message
+        return Queue.get_next_message_smaller_than_bits(size_in_bits, dest_address, packet_codec, check_reachability)
 
     def handle_priority_update(self, request):
-        rospy.loginfo("Handling new priority update, new priority requested: {}".format(request.priority_desired))
+        rospy.loginfo("Handling new priority update, new priority requested: {}".format(
+            request.priority_desired))
         self.priority_update = True
         self.queue_name = request.topic_name
         self.queue_priority_desired = request.priority_desired
@@ -466,7 +406,7 @@ class MessageQueueNode(object):
         self.topic_queue[topic_name].priority_update(request.priority_desired)
         response = True
         return response
-        
+
     def handle_queue_active(self, request):
         rospy.loginfo("Handling new dynamic queue request")
         if not request.set_active:
@@ -480,17 +420,19 @@ class MessageQueueNode(object):
             self.queue_active_status = True
             response = True
         return response
-    
+
     def handle_get_next_packet(self, req):
         # type: (GetNextPacketDataRequest) -> GetNextPacketDataResponse
-        rospy.loginfo("Getting next packet data for transmission:\r\n" + str(req))
+        rospy.loginfo(
+            "Getting next packet data for transmission:\r\n" + str(req))
         num_miniframe_bytes = req.num_miniframe_bytes
         num_dataframe_bytes = req.num_dataframe_bytes
 
         # To get the next packet, the first thing we do is get the highest priority message to transmit
         # Then, we get a packet using its packet codec.
         # This is also where we should eventually evaluate which rate to use
-        highest_priority_message = self.get_next_message(remove_from_queue=False)
+        highest_priority_message = self.get_next_message(
+            remove_from_queue=False)
         if not highest_priority_message:
             return GetNextPacketDataResponse(num_miniframe_bytes=0, num_dataframe_bytes=0, num_messages=0)
 
@@ -500,20 +442,25 @@ class MessageQueueNode(object):
                                                                                            num_dataframe_bytes,
                                                                                            self)
 
-        #for msg in messages_in_packet:
+        # for msg in messages_in_packet:
         #    self.mark_transmitted(msg)
 
         return GetNextPacketDataResponse(dest=highest_priority_message.dest,
                                          miniframe_bytes=miniframe_bytes,
                                          dataframe_bytes=dataframe_bytes,
                                          queued_message_ids=[],
-                                         num_miniframe_bytes=len(miniframe_bytes),
-                                         num_dataframe_bytes=len(dataframe_bytes),
+                                         num_miniframe_bytes=len(
+                                             miniframe_bytes),
+                                         num_dataframe_bytes=len(
+                                             dataframe_bytes),
                                          num_messages=len(messages_in_packet))
 
     @property
     def message_count(self):
-        return sum([q.message_count for q in self.queues.values()])
+        count = 0
+        for val in Queue.returnTable():
+            count += 1
+        return count
 
     @property
     def age_of_oldest_queue(self):
@@ -521,19 +468,21 @@ class MessageQueueNode(object):
         return max([q.time_since_last_tx for q in self.queues.values()])
 
     def get_filtered_message_count(self, dest_address, min_priority):
-        filtered_queues = [q for q in self.queues.values() if q.priority >= min_priority]
-        filtered_queues = [q for q in filtered_queues if q.dest_address == dest_address]
+        filtered_queues = [
+            val for val in Queue.returnTable() if val.priority >= min_priority]
+        filtered_queues = [
+                val for val in Queue.returnTable() if val.dest_address == dest_address]
         if len(filtered_queues) == 0:
             return 0
         else:
-            return sum([q.message_count for q in filtered_queues])
+            return sum([1 for q in filtered_queues])
 
     def spin(self):
         # Issue status periodically.
         rate = rospy.Rate(self.update_rate)
         while not rospy.is_shutdown():
-            #self.time_since_last_tx_pub.publish(rospy.Duration.from_sec(self.age_of_oldest_queue.total_seconds()))
-            #self.messages_in_queues_pub.publish(Int32(self.message_count))
+            # self.time_since_last_tx_pub.publish(rospy.Duration.from_sec(self.age_of_oldest_queue.total_seconds()))
+            # self.messages_in_queues_pub.publish(Int32(self.message_count))
             self.update_queue_status()
             rate.sleep()
 
diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py
index 9b56e567..401d8dd6 100755
--- a/src/packet_dispatch_node.py
+++ b/src/packet_dispatch_node.py
@@ -7,7 +7,7 @@ import roslib.message
 from dataclasses import dataclass, field
 from ros_acomms.msg import ReceivedPacket, Packet, CST
 from bitstring import ConstBitStream, ReadError
-
+from database.controllers.queueController import *
 from acomms_codecs.ros_message_codec import RosMessageCodec
 from acomms_codecs.packet_codecs import packet_codecs
 
-- 
GitLab


From f2ac7bcf61f85415ac02ae714ab27a5f836c9364 Mon Sep 17 00:00:00 2001
From: Brennan Miller-Klugman <millerklugmanb@wit.edu>
Date: Thu, 30 Sep 2021 09:46:20 -0400
Subject: [PATCH 2/4] Added verbose commenting for queueController and updated
 packet_dispatch_node to add table entries

---
 .gitignore                              |  1 +
 database/controllers/queueController.py | 84 ++++++++++++++++++++-----
 src/packet_dispatch_node.py             | 44 ++++++++-----
 3 files changed, 100 insertions(+), 29 deletions(-)

diff --git a/.gitignore b/.gitignore
index c0706b96..e5ee6dab 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@
 /src/acomms_codecs/__pycache__
 /src/acomms_codecs/field_codecs/__pycache__
 .vscode/settings.json
+.vscode/c_cpp_properties.json
diff --git a/database/controllers/queueController.py b/database/controllers/queueController.py
index 3843ca3e..e868ea0f 100644
--- a/database/controllers/queueController.py
+++ b/database/controllers/queueController.py
@@ -24,21 +24,23 @@ class Queue(Model):  # database model
 
     identifier = IntegerField()  # Unique ID
     message_raw = TextField()
-    message_encoded = TextField()
-    message_type = TextField()
-    dest = IntegerField()
-    payload_bits = TextField()
-    length_bits = IntegerField()
-    posistion = IntegerField()  # Pos in queue (relative to topic)
-    fragmentation_tracker = BooleanField()
-    allow_fragmentation = BooleanField()
-    transmitted = BooleanField()  # Transmitted state
-    completed = BooleanField()  # Transmission completed
-    message_type = IntegerField()  # Corresponds to message table
-    direction = BooleanField()
+    message_encoded = TextField(null=True)
+    message_type = TextField(null=True)
+    dest = IntegerField(null=True)
+    payload_bits = TextField(null=True)
+    length_bits = IntegerField(null=True)
+    posistion = IntegerField(null=True)  # Pos in queue (relative to topic)
+    fragmentation_tracker = BooleanField(null=True)
+    allow_fragmentation = BooleanField(null=True)
+    transmitted = BooleanField(null=True)  # Transmitted state
+    completed = BooleanField(null=True)  # Transmission completed
+    message_type = IntegerField(null=True)  # Corresponds to message table
+    direction = BooleanField(null=True)
 
     def get_next_message_smaller_than_bits(self, size_in_bits, dest_address=None, packet_codec=None,
                                            check_reachability=True):
+        '''get_next_message_smaller_than_bits returns the next message in the queue with a bit count smaller then the specified parameter'''
+
         try:
             for val in Queue.select():
                 next_message = None
@@ -58,6 +60,8 @@ class Queue(Model):  # database model
             return None
 
     def bit_count(self):
+        '''bit_count is used to count bits across all messages'''
+
         count = 0
 
         for val in Queue.select():
@@ -66,38 +70,69 @@ class Queue(Model):  # database model
         return count
 
     def message_count(self):
+        '''message_count is used to count all unsent messages across every topic type'''
+
         counter = 0
         for val in Queue.select():
-            counter += 1
+            if (val.transmitted == False):
+                counter += 1
         return counter
 
     def removeMessage(self, idVal):
+        '''removeMessage is used to delete a message with an identifier of idVal'''
+
         Queue.delete().where(
             Queue.identifier == idVal).execute()
 
     def getNextMessage(self, message_type):
+        '''getNextMessage is used to get the next message within a specific message type queue'''
+
         for val in Queue.select():
             if(val.message_type == message_type):
                 if(val.completed == False):
                     return val.message_encoded
 
     def clearLifoMessages(self, messageType):
+        '''clearLifoMessages is used to delete all entries within a certain message type'''
+
         Queue.delete().where(Queue.message_type == messageType).execute()
 
     def popLeft(self):
+        '''popLeft is used to delete the left most value from the queue'''
+
         Queue.delete().where(Queue.identifier == 0).execute()
 
     def popRight(self):
+        '''popRight is used to delete the right most value from the queue'''
         Queue.delete().where(Queue.identifier == self.getLastIterator()).execute()
 
     def updateTransmit(self, msg, transmitVal):
+        '''updateTransmit function is used to update a database entry's transmit value parameter'''
+
         dictionary = message_converter.convert_ros_message_to_dictionary(
             msg)
         message_str = json.dumps(dictionary)
         Queue.update(transmitted=transmitVal).where(
             Queue.message == message_str).execute()
 
+    def addRecieve(self, message, payloadBits):
+        '''Add recieve function is used to create database entries on message recieve'''
+
+        dictionary = message_converter.convert_ros_message_to_dictionary(
+            message)
+        message_raw = json.dumps(dictionary)
+
+        Queue.create(identifier=self.getLastIterator + 1,
+                     message_raw=message_raw,
+                     payload_bits=payloadBits,
+                     transmitted=True,
+                     is_active=False,
+                     direction=1  # Direction 1 represents incomming
+                     )
+
     def addRight(self, message, message_codecVal, destVal, payload_bitsVal, length_bitsVal, time_addedVal, transmittedVal, next_hop_ackedVal, priorityVal, fragmentation_trackerVal, allow_fragmentationVal, is_activeVal, directionVal):
+        '''Add right function is used to create database entries on the right side of the queue'''
+
         dictionary = message_converter.convert_ros_message_to_dictionary(
             message)
         message_raw = json.dumps(dictionary)
@@ -124,6 +159,8 @@ class Queue(Model):  # database model
                      direction=directionVal)
 
     def addLeft(self, message, message_codecVal, destVal, payload_bitsVal, length_bitsVal, time_addedVal, transmittedVal, next_hop_ackedVal, priorityVal, fragmentation_trackerVal, allow_fragmentationVal, is_activeVal, directionVal):
+        '''Add right function is used to create database entries on the left side of the queue'''
+
         dictionary = message_converter.convert_ros_message_to_dictionary(
             message)
         message_raw = json.dumps(dictionary)
@@ -149,13 +186,18 @@ class Queue(Model):  # database model
                      direction=directionVal)
 
     def displayTable(self):
+        '''*Test Function* used to display all values within the database'''
+
         for command in Queue.select():
             print(command.output)
 
     def returnTable(self):
+        '''Retruns contect of select() function'''
+
         return Queue.select()
 
     def getLastIterator(self):
+        '''Gets the highest iterator value in the database'''
         highVal = -1
         for val in Queue.select():
             if(val.identifier > highVal):
@@ -169,7 +211,9 @@ class Topics(BaseModel):  # Gets created at yaml read
     priority = IntegerField()
     is_active = BooleanField()
 
-    def addEntry(self, message_typeVal, priorityVal, is_activeVal, directionVal = None):
+    def addEntry(self, message_typeVal, priorityVal, is_activeVal, directionVal=None):
+        '''addEntry function creates a table entry based on function parameters'''
+
         Topics.create(message_type=message_typeVal, priority=priorityVal,
                       is_active=is_activeVal, direction=directionVal)
 
@@ -181,14 +225,20 @@ class msgEvents(BaseModel):
     dest = IntegerField()
 
     def removeEntry(self, idVal):
+        '''removeEntry function removes a table entry based on the specified idVal'''
+
         msgEvents.delete().where(
             msgEvents.identifier == idVal).execute()
 
     def addEntry(self, identifierVal, timeVal, sourceVal, destVal):
+        '''addEntry function creates a table entry based on function parameters'''
+
         msgEvents.create(identifier=identifierVal, time=timeVal,
                          source=sourceVal, dest=destVal)
 
     def getLastIterator(self):
+        '''Gets the highest iterator value in the database'''
+
         highVal = -1
         for val in msgEvents.select():
             if(val.identifier > highVal):
@@ -203,14 +253,20 @@ class Fragmentation(BaseModel):
     dest = IntegerField()
 
     def removeEntry(self, idVal):
+        '''removeEntry function removes a table entry based on the specified idVal'''
+
         Fragmentation.delete().where(
             Fragmentation.identifier == idVal).execute()
 
     def addEntry(self, identifierVal, timeVal, sourceVal, destVal):
+        '''addEntry function creates a table entry based on function parameters'''
+
         Fragmentation.create(identifier=identifierVal,
                              time=timeVal, source=sourceVal, dest=destVal)
 
     def getLastIterator(self):
+        '''Gets the highest iterator value in the database'''
+
         highVal = -1
         for val in Fragmentation.select():
             if(val.identifier > highVal):
diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py
index 401d8dd6..bb9990aa 100755
--- a/src/packet_dispatch_node.py
+++ b/src/packet_dispatch_node.py
@@ -10,10 +10,12 @@ from bitstring import ConstBitStream, ReadError
 from database.controllers.queueController import *
 from acomms_codecs.ros_message_codec import RosMessageCodec
 from acomms_codecs.packet_codecs import packet_codecs
+from database.controllers.queueController import *
+
 
 @dataclass
 class DecoderListEntry:
-    #TODO: move header stuff into separate packetcodec class
+    # TODO: move header stuff into separate packetcodec class
     codec_name: str
     miniframe_header: bytes
     dataframe_header: bytes
@@ -52,18 +54,22 @@ class DecoderListEntry:
                 return False
         if len(self.miniframe_header) > 0:
             if bytes(self.miniframe_header) != received_packet.packet.miniframe_bytes[:len(self.miniframe_header)]:
-                print("Miniframe header mismatch: {} not {}".format(received_packet.packet.miniframe_bytes[:len(self.miniframe_header)], self.miniframe_header))
+                print("Miniframe header mismatch: {} not {}".format(
+                    received_packet.packet.miniframe_bytes[:len(self.miniframe_header)], self.miniframe_header))
                 return False
         if len(self.dataframe_header) > 0:
             if bytes(self.dataframe_header) != received_packet.packet.dataframe_bytes[:len(self.dataframe_header)]:
-                print("Dataframe header mismatch: {} not {}".format(received_packet.packet.dataframe_bytes[:len(self.dataframe_header)], self.dataframe_header))
+                print("Dataframe header mismatch: {} not {}".format(
+                    received_packet.packet.dataframe_bytes[:len(self.dataframe_header)], self.dataframe_header))
                 return False
         return True
 
     def strip_headers(self, received_packet: ReceivedPacket):
         if self.remove_headers:
-            miniframe_bytes = received_packet.packet.miniframe_bytes[len(self.miniframe_header):]
-            dataframe_bytes = received_packet.packet.dataframe_bytes[len(self.dataframe_header):]
+            miniframe_bytes = received_packet.packet.miniframe_bytes[len(
+                self.miniframe_header):]
+            dataframe_bytes = received_packet.packet.dataframe_bytes[len(
+                self.dataframe_header):]
         else:
             miniframe_bytes = received_packet.packet.miniframe_bytes
             dataframe_bytes = received_packet.packet.dataframe_bytes
@@ -73,7 +79,6 @@ class DecoderListEntry:
 class PacketDispatchNode(object):
     def __init__(self):
         rospy.init_node('packet_dispatch')
-
         packet_codec_param = rospy.get_param('~packet_codecs')
         self.decoder_list = []
         for packet_codec in packet_codec_param:
@@ -89,14 +94,18 @@ class PacketDispatchNode(object):
                 ros_type_name = codec_config['ros_type']
                 msg_class = roslib.message.get_message_class(ros_type_name)
                 if not msg_class:
-                    raise TypeError('Invalid ROS type "{}" for message codec {}'.format(ros_type_name, id))
+                    raise TypeError(
+                        'Invalid ROS type "{}" for message codec {}'.format(ros_type_name, id))
                 # TODO: allow custom codecs
                 # like this: MyClass = getattr(importlib.import_module("module.submodule"), "MyClass")
                 # TODO: make this work with orthogonal packet codecs
-                self.message_codecs[id] = RosMessageCodec(msg_class, codec_config['fields'])
+                self.message_codecs[id] = RosMessageCodec(
+                    msg_class, codec_config['fields'])
 
-                pub_name = rospy.names.canonicalize_name(codec_config['publish_topic'])
-                self.message_publishers[id] = rospy.Publisher(pub_name, msg_class, queue_size=10)
+                pub_name = rospy.names.canonicalize_name(
+                    codec_config['publish_topic'])
+                self.message_publishers[id] = rospy.Publisher(
+                    pub_name, msg_class, queue_size=10)
 
             # replace name with instance of packet codec
             # ToDO make this less ugly and stupid
@@ -105,7 +114,8 @@ class PacketDispatchNode(object):
                                                                    miniframe_header=entry.miniframe_header,
                                                                    dataframe_header=entry.dataframe_header)
 
-            rospy.loginfo("Receive packet codec initialized with {} message codecs.".format(len(self.message_codecs)))
+            rospy.loginfo("Receive packet codec initialized with {} message codecs.".format(
+                len(self.message_codecs)))
 
         rospy.Subscriber('packet_rx', ReceivedPacket, self.on_packet_rx)
 
@@ -115,10 +125,12 @@ class PacketDispatchNode(object):
     def on_packet_rx(self, received_packet: ReceivedPacket):
         rospy.loginfo("Got incoming packet")
         # Decide what to do with the incoming packet, based on the match criteria
-        matching_codecs = [d.packet_codec for d in self.decoder_list if d.does_packet_match(received_packet)]
+        matching_codecs = [
+            d.packet_codec for d in self.decoder_list if d.does_packet_match(received_packet)]
 
         if not matching_codecs:
-            rospy.loginfo("Dispatcher received packet that matched no packet decoders.")
+            rospy.loginfo(
+                "Dispatcher received packet that matched no packet decoders.")
 
         for d in matching_codecs:
             try:
@@ -141,10 +153,12 @@ class PacketDispatchNode(object):
             while True:
                 id = payload_bits.read('uint:8')
                 ros_msg = self.message_codecs[id].decode(payload_bits)
+                Queue.addRecieve(ros_msg, payload_bits)  # Update database with inbound message
                 self.message_publishers[id].publish(ros_msg)
         except KeyError:
             # We don't have a config entry that matches this ID
-            rospy.loginfo("Unknown message ID ({}) received, unable to continue parsing packet".format(id))
+            rospy.loginfo(
+                "Unknown message ID ({}) received, unable to continue parsing packet".format(id))
             return
 
         except ReadError:
@@ -158,4 +172,4 @@ if __name__ == '__main__':
         rospy.loginfo("Message Dispatch shutdown")
     except rospy.ROSInterruptException:
         node.close()
-        rospy.loginfo("Message Dispatch shutdown (interrupt)")
\ No newline at end of file
+        rospy.loginfo("Message Dispatch shutdown (interrupt)")
-- 
GitLab


From 0495b456720e8279179925a2c1ac3cf89e65beb6 Mon Sep 17 00:00:00 2001
From: Brennan Miller-Klugman <millerklugmanb@wit.edu>
Date: Thu, 30 Sep 2021 10:50:18 -0400
Subject: [PATCH 3/4] Moved location of databases folder, made minor updates to
 fix errors

---
 .../queueController.cpython-38.pyc            | Bin 0 -> 8693 bytes
 .../database}/controllers/queueController.py  |  18 +++++++++++++++---
 {database => src/database}/databases/queue.db | Bin 57344 -> 57344 bytes
 src/packet_dispatch_node.py                   |   4 ++--
 4 files changed, 17 insertions(+), 5 deletions(-)
 create mode 100644 src/database/controllers/__pycache__/queueController.cpython-38.pyc
 rename {database => src/database}/controllers/queueController.py (95%)
 mode change 100644 => 100755
 rename {database => src/database}/databases/queue.db (96%)

diff --git a/src/database/controllers/__pycache__/queueController.cpython-38.pyc b/src/database/controllers/__pycache__/queueController.cpython-38.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..c7fdbc985d52cd8e48975fa9268321a97eeef8d9
GIT binary patch
literal 8693
zcmcIpOOxBi5yp!o$R)W??doYmiesCKtaa=pPPrUKUdxj0ShOsy;uwjG#&TwNu}c!%
z0jLLyn;hh-octeFm6QHNPPwFVN#&HnC5KcFuAEZ2*;P@#o&i9DTFH(blZD2-FqrQC
zx~HdcvsSBU_|^Y()BE%Dn)Y|9O#X_fT*MU`x~4IW=^d@BxtdO8qoa2X7ab^@9kXk>
zmMmMH!aB7TIz_iA?<Kb+@3w2>UhI^+6}KYC7CJNT3@;s2?`v+27kBChv%GdN$4d8g
zX5Tm5dA`8s@9FO16^)fy<r9rn_|iSoJ%aiStD;`zM^Ilzy~gUO*JXVL^;tHD`kbsE
zMSY$vpuQmM$M|u!xNC_2@Wq1@e3>8PNBPQ@bzj#tzI;z}Px51+r}$COr}zr!)2whq
zYb<?6(rM_i75QC`yZv52@`Ik&LAkUR1Rd`6P}I*hLFc<X481MhY6rbrTtr;N6%m9j
zuN`!|;qV0WB5%VB`D$7SSN8^dz|XUdHoa%^XQFZuS9lzRYp%{T*8o>KGr*N`&u~p<
zVue;*Uc;1E1I9a#OadjUN#-Ud;ihXb9giBrdiLn~t32|WjY4d<T0O7JTdla#YIOrP
z=uo-ZYV8fYPSRtr4QMuD=7A<^m|c?3udi+gUB0>@c(3R6R@+{*>-So>!_`o<SI44|
z)pUYYNGlRSr^7`ek2kW0^ZmQAMN2z^SA+(Lt&3UQnhi^o>9vGsF++r!$4gPAI<?s#
zOSsbKss4zlGI;~2nY_6PBv`xvTI2<i{inKH%1SmjleTi!R>|6CvQm}Tl5zE{1l%WW
zb6IIVd&17z7T6hPvBD>?307pKPc(Ol*{qEF5msR{xG%FRtKq)F>TDMGqil}N<9>`S
zutnUDvn6%}_Y-WHt>Au=9c9OGKgEu-6SzOcPO?+DKh2(EPvh<|hn>d#w8YnU;==oc
z(RJY4>J2)bX1f6SrN0si<TT#E6>fqIuSeQ}eqXz!eYCa@k?j{phB!AeMui<?*A(X>
z^T1*T8Qxw6ZH4eH(BeqnwZvPp&s2TrZ(>C81MLRf5B)y`_-|;xkhEb9f4#+{R*&C_
zTB+5AU5`N2inhI8Yr~I1NAPGMdZ80-bBBgH$xz4d$u?<y&TT*1c0319IPGB2i=1R4
z8luL9ectvrea@V|Ctw}0?B?+JB|jvikR7i&jE7OnV@$vY3>*2rC<vU6C${)Grx!Sz
z!rSWdUgVJ{X_N#RXIu<<hqt5HqIu#1Yz5G_JHZy?8-uNQ5v&EbTe(N$nGWx5Mcb0s
zMk%&1#b#U${UL9`>PP}{b#hs8wePieA+|QWMcZ9yZ}awUOK`8f?QQrSKe`*6x4ceV
zofPYYNs%?hXTDtSX9*1Cw=~PptGZ>>jf!sQb2`zYfxkX{UZU#!mrvFvCzk%mO6iht
zb5H=l^bBzIePg7Lj2+_@ZC~H3j+B#o8G-=Eq*y<61|hT<1&O95O1w6t9XgP3ijPpx
zOB<$CjaWcgtf+>gxWxrIn>b1BneP))I_2zT7ttG%tI<r$sOnW?IICFBMm(M~Y|VMj
zmcbeLVg~jw@Wspt4z1xksRcZSOX6(M3jw)|K__9FW1qSUdG!6Z6W#4|$@t~ZrrD2Y
zBH{JIt{+95K`v4ZCHXoo4i!W#x2AVU7&7!QEg+(T|8P->BR8&1E<XKH*}^52O~O`+
zJ}@F}$Arg#7df!*Q*fdb{ifjE;1<8CHsMKSV{e4bI$p|tV)8x5XAlY*uyBG+)PC-D
znhy$zMXIOb!tHG?cw8BejZ6FvZx15=z)+GX$S0?<P|L071{es*RcfZK+YeLrb7OMs
z6VB<9?7xUYhVwHD4I~0#0~gqZl9FXW95=K*n;F>P&EXB$R}+r!5SgIoQNKv85WdlK
z5|7u;Bu3zHq`$n3Fkx&MaWz#Sjfl$#b^0W17SE;~yhr{fg^hRttQ-oNdVaAVVq9WC
z31XHqpE1ExPG<irWUjUmz=?PL%^;Cbco=M8K7fhvc_@J+1=L1-=b@A^DUBx+%(KMe
zb3`Pw4U`%RD(+13z&aV*l=GN>WHlbiK-z>kTv6OVGA8Fn$Z#KPGX#^&W)whP>j(XJ
z`DV^FCe}imW{0}EK^QsM7X}<#SI`~roYGk|#dq<}2fDO`X^u+C^b{GSeAE;lNzWsz
zr|3dL-a#Q}@fumX<e)HN^%pSFb$@I7aV!Yg_gEg9jiLhcSZCyY<5__5T`!E@#$NA5
zfe@rb@e+|Kt=Z`UcG0PrL}Z;YDc9B}Lr#ChVWy)Wph8@Z^!xh085sxW2ocb}6<Irl
zk-_v&^@AeHrI95$w<YIFcQ*%p#L#>_@yyQVpx365ViLk?2n;%nwk4A9K8KhneN1UK
z3v3#?ID<7c8)ZS>Dn6YYW?B@!rzRZ*E#}LYc>)poP8js!0vmMuVd6X0d!-G;rQ|6o
zLii53d6?aB%guHb_}#cNo-wnMRv3wCdDru72UuiCdoT7`-FRpr)TokwO`Ju3@FRMl
z4O*h@iF(j>icDY8Miw+;FbevuPm9=h=!jSxzVkX`2sPV2Hq%3u&_;-b?I))Kawbq!
z>OPXk#c8Z({W35qL6|y=mhf&<@OHNocuWR%Qb%#wr^99B--6?1KIS0xHxW{j=ERSP
zyh7y1L{ylm)*^`AL>1yyl--$R_BBL};x!t7fyflrD+q9}D^BmAJ6s0I<c=uENdI39
z=adDEA81P@(F6F=d5jh$Wxs<$W&?F;19jNI>`2?eNp(frpJT@Ud{j6n%5z0^e_=Gw
zbc8_jduOSH5sOg)HZnJw#Yw}2o(iMls5G)ii=)bDW>g*3nE7cP8nd9W!f;(`Oge_c
zjOGdYBB!W4N{D!Xf(B~YqZ5+UoeCW5Qhic_@kAhmpj#}0SMgIK6j8<%$}7;ZfitDj
zx{STIgFY0yi(Rzb4?;gAy~am!(SHjDj1fYR4#Y(wuY)uj_MzG*g^zb_v4(16N%?Hj
ze>|6(wmch<J(lR`K!w>!;88<;;@m;?+L#I1ow^o80aTyQvqViZec=ZJ2M($nJ(Mr%
zSw7^{L+#ZJ_p+8jAGHbWeh+-PwhU6KgCa;6Q7TSqnnJftU?o)i7DGb{xyIP8=qhlv
zad^p4K3)2E>i5%$a+oL8XPEyOl_z69!+U*yel)*RV){{Se_`)gAl*I$=?fXs4T*G_
z92(yEMo5?2|KUJyBp4^~h)YEB$QEx>8|@_GGLf4^-Xb!MSGCVfd#fqDUZE!?Xcxzz
z6>n23oh`)Ah`dYWDv>6U_lR60@;;I4L?{Xu>qN#rj&Py^Qn|<}P?VrcijaPXQQ<j|
z2~5ay-4pSU1UtzbxnRejsc=abI6mGWaZ<;b7AM&q^B$#1?BsZJ;p{pL(z%@M@@KQ7
zEc3&D$Ga;Ju(v2o%feLhP3f*f88bqkJkG`i7+^1oi@_l3527SEjd7yucD){(HjHZW
zigj<J!+(!SLW*BBv#Jjll$)O%kmHg9QsPu{NxQ8I>EMR-;WO8HBycdrVZ0Y%^CEYu
z(jd<_vW*mL%a(m;{57MpJaTP+zzafpon{W_)ygJ%a|<KUXX2;eFPBPK%8pnT4%|3;
zrwN4z#&GotM{bE^yA5YSI`NYrF^Q6s>{2TE&mQ8JOD53f{y?3g-4acb`ll(u%We9P
z7_83DM&g3>;St5#RE*j%o6YscnW@BRTzwnT-Qwc1&pS*77Ugvu3RPWh^Q(A=I;%`s
zG1*;QO%mPefw2*vp{LFhp`AxOMZ_WU3=y&w@gkA$6M31)4~hJQ2;o8eoX7`6eo5q4
zM5N)5ZBUjda*!xEhC)ayCO2G^c?8NOO#SH){IIfYSM0J~v~9b9zlHLwebPRG7Il1W
zW18HL&~On~NGzo(_go5?(!nKegO-t$MI?x{Y(<)9Aug^{5+%&1M=8{&Sd6%p!4gsz
z2=!Hvg5aLMpvYkU^Gk3MZ{m=7Hv_(-!Vn~;9DFwDa22^b^Z|iU=$J(=S?C_Z#9zRh
zVmsYl#YI$;xWEv^v2vm+2zRq1I-T7KgV$)p*hh}lKpKc?f0S$dD{9Hn+|~`vcsQI@
z+@DWJH1uX;W(tVnW15{7C_E51%N)>W1;MF(gUCf9n?%O%mn8`UV&X5jq)FHSf&44x
zs4w>;S8x$mNZ$SlDka6j&(gOQsMcf_?iE&GMcijt3Es+%%Q&&Tc?*ev@av}>;61LW
zjL(#=RpVoIP0nVVgjRVfgzsD6N|5hT$p&&O>YRF(N@M9s?ZHn{dQ86fALyS<Mjgp`
zJ3lchm+AkCP=;G$m>fuyt04s-w;c=w5+>wZn(*gx*=wjgNN`vi=hlEiw?y$I70T*C
zLZ;m1^BhTx{1emUp|~*SB}L=^!|{uc@FsY1xp<ipmJGJgIwfi8!v00;s(?_qK=~_M
zA08X8Y|3*tA5;%D7J7wSUWYi8?<H}HT_Pk);S-UT@>s7)%l#WJc|;zY3%p=avhuB(
zOzraIr;e|0Hewqb0Pj8G_bcLE3J3914(G>m3;L#57dx~pLoJK&EILFEL1~HBq0oSE
z5djfg=5l^5-v(BtyuKW(wejjQte(oRZW^zmo99&WEd4XR^9x*u0gn3U3v(Bl(^jkD
z2<0BI`}n;rVN!4HPPZfa?ep!94_7NF&UA}?&Tn%b8$lQs>C4#emdtCoEAlg0mX%5}
zRX7tD@t9bm`AXM=@EQ)vaf!xK!RmX_HgZ;h--~VV)uOsvy0q51_WJr;ZvDm{zJ&2h
z$>(iRr4<z9_jUOtSw*8{Hp&%98JB5LO>NjMxm$;BNoEBut+}(ClU9I+namHMQj#BE
zF0D<#CKG0CzotIHUm)3pFM;4o66Lp&KT3ve)G6;}+QwXA?l;=2bG5mXb0=(LL54_0
F{lA{*9cTam

literal 0
HcmV?d00001

diff --git a/database/controllers/queueController.py b/src/database/controllers/queueController.py
old mode 100644
new mode 100755
similarity index 95%
rename from database/controllers/queueController.py
rename to src/database/controllers/queueController.py
index e868ea0f..f7f0dac0
--- a/database/controllers/queueController.py
+++ b/src/database/controllers/queueController.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 from time import time
 from typing import Optional
 from xmlrpc.client import Boolean
@@ -11,16 +13,16 @@ import rospy
 rospack = rospkg.RosPack()
 
 DB_PATH = os.path.join(rospack.get_path('ros_acomms'),
-                       'src/database/databases/queue.db')
-db = SqliteDatabase(DB_PATH)  # database location
+                       'database/databases/queue.db')
 
+db = SqliteDatabase(DB_PATH)  # database location
 
 class BaseModel(Model):
     class Meta:
         database = db
 
 
-class Queue(Model):  # database model
+class Queue(BaseModel):  # database model
 
     identifier = IntegerField()  # Unique ID
     message_raw = TextField()
@@ -272,3 +274,13 @@ class Fragmentation(BaseModel):
             if(val.identifier > highVal):
                 highVal = val.identifier
         return highVal
+
+if __name__ == '__main__':
+    '''This script can be ran standalone to create DB tables and check that classes are properly working'''
+    db.create_tables([Queue, Fragmentation, msgEvents, Topics])
+
+    queueDB = Queue()
+    fragmentationDB = Fragmentation()
+    msgEventsDB = msgEvents()
+    topicDB = Topics()
+
diff --git a/database/databases/queue.db b/src/database/databases/queue.db
similarity index 96%
rename from database/databases/queue.db
rename to src/database/databases/queue.db
index 393053933e96565b6bfe691bf6e093f117b0355d..7c2457ad825c8d0829018be3f0e0772827aef903 100644
GIT binary patch
delta 966
zcmb`GPjAyO7{(L2CZnvOJ)o=tX;FnXDot?U4%(kEO)6!YGHnuv$W2VKNa8GZSM}7j
z)9#=hCT?is3qZfX4txpjNPGrvyz!c>h8`#7wd}{Q?O&ePKE5g(Ull$+tez{1Vn7ri
zbVbpAd|G{8J6E2*{8<J?D=jKUWK_#97J8-MrNwWDU+z8|HZ|R>)zlH=s2yNVW8$&v
zc79{mUUO{ASzB+~R$Yi^m&TJph#5m2?DBq$>(+M5vA679tM$gQT6@jrs#VVtA{0V`
zV;{iG&1{?ynh@Tfxs)&$d7K>0*tl_0F6yRfE~}?~0ejfRT`U@1MituGSL0QVnxNkQ
zAQ>i(JnCcrIsx!~%y<q5;~4b=ihQ?CILng)9Ci5ni8k>d&83VmPG}@H7^KdC`Yu78
z5Jw!PGcHe%_W>u<N1z~}Jy%Mbo`w|>3kl~b<;`Ip4P&U8EM|X|R%d@Y+b0PIYbR(a
zS!1|4r<+Sl>S$jW8nO;P0!)^7G}+iphikAk;6zk+`?Wn?T}Hc!H=}z0`kJq440j5;
zxw4{;Tmd1OAPtHVj3K)!%6(f%Z|?|OhDAZtMhu5=LxQ|BCy~@MT@8TZvDz3%#^>q-
gZSoq^x3>Us527@FWc|8-QK{tL_ljN_?^u7_zy9zfRR910

delta 91
zcmZoTz}#?vd4jayBnAcsJ|JcWVjcztw*3=zjQJ-q=-J=o1&T28<uUN(Efy5u+nmQY
jQ&5x}D8>TBtU$~L#Oy%Ku~|mp3IAq=f(Q9PLcsw55#bS(

diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py
index bb9990aa..1abcadd7 100755
--- a/src/packet_dispatch_node.py
+++ b/src/packet_dispatch_node.py
@@ -7,7 +7,6 @@ import roslib.message
 from dataclasses import dataclass, field
 from ros_acomms.msg import ReceivedPacket, Packet, CST
 from bitstring import ConstBitStream, ReadError
-from database.controllers.queueController import *
 from acomms_codecs.ros_message_codec import RosMessageCodec
 from acomms_codecs.packet_codecs import packet_codecs
 from database.controllers.queueController import *
@@ -153,7 +152,8 @@ class PacketDispatchNode(object):
             while True:
                 id = payload_bits.read('uint:8')
                 ros_msg = self.message_codecs[id].decode(payload_bits)
-                Queue.addRecieve(ros_msg, payload_bits)  # Update database with inbound message
+                # Update database with inbound message
+                Queue.addRecieve(ros_msg, payload_bits)
                 self.message_publishers[id].publish(ros_msg)
         except KeyError:
             # We don't have a config entry that matches this ID
-- 
GitLab


From 8e7532305923ed81980db77789098cab648a5b0a Mon Sep 17 00:00:00 2001
From: Brennan Miller-Klugman <millerklugmanb@wit.edu>
Date: Thu, 30 Sep 2021 16:36:48 -0400
Subject: [PATCH 4/4] Attempted to repair message queue system, adding messages
 working

---
 .../queueController.cpython-38.pyc            | Bin 8693 -> 9098 bytes
 src/database/controllers/queueController.py   |  45 +++++++++++-------
 src/database/databases/queue.db               | Bin 57344 -> 57344 bytes
 src/message_queue_node.py                     |  22 +++++----
 src/packet_dispatch_node.py                   |   1 -
 5 files changed, 41 insertions(+), 27 deletions(-)

diff --git a/src/database/controllers/__pycache__/queueController.cpython-38.pyc b/src/database/controllers/__pycache__/queueController.cpython-38.pyc
index c7fdbc985d52cd8e48975fa9268321a97eeef8d9..c21c3b64073ec4afee8fda7cbb740f79a542e89d 100644
GIT binary patch
delta 2645
zcma)8TWl0n7@jk`J3D(X+skg-?e?a?K&9M70UJsw<svPFE+_+7hn>T+bi2DfGX>gg
z?E^lUG=hg`ZH+!O@d0gOOpJ*SJ{TjBL`lp%sL`0H4{A&#KB)NrXSOXgk}#Y3=3M@B
z&i{Y^<?NsPUO5mw5s3sPc#<_IjmsOaM^hwyedC>j*9dE(wR6q1fi}`4P2{}ugh()w
zS~b#eRnl8%BVd{)0b6MTu#L9Owa-gL;$7spuhnAN?!_7MHc7{cBvDBxrliZ%V-i!I
zCAydT+}7eQk4(tIVvqMWAr6l!Nun>VQ(~mC7F1+O0L{Z1;p6zSrHP`=(!9OCmY?%~
z*@+TP$Yw^-8q3=@HJ2p)LXxto?5O$^Y2=x};0&m;{P>{dfp;Ig*7E=+nFi&iWn1c(
z&NfYZDxQm^LPotTjIC(`2(#m1+cW2#N12XkZ^e62uE?V@+YBwA>8Z998HgB`#y6Yt
zu%vmXhI7;&qF4A|fx&G*--BmpK=V6Frc};nEk`Z0e2L}lNhdr;Ez8K!5qq*sSq&(~
z&`53sTekZI3wpFrx=bzmSapeB|6qS`_&^5MI-W5rS2h3jU|qZpc9CQ+@e-8;i9%GF
ze-ONxScfO9N7#a}0YH;kBcBOvB{}|H=q+_E3@s2CYOk+T%cb%WdfqPZsqmX?SJrAE
z<HW$kBZg%kvMDp{5)+%Tr_fnJTzEX6yI_y=k;tc{ldq2+O5?GPziP01v}+>U4m14K
z=zcQ6??i6}JD{5x&=jWecVexu`tw*v?`{}m&mi<5AXSc%Wz>M<cS1+2?zIQndZ=tn
z7D|TMm$xm}3nQ!#^$G_K(>zYId3uSyv^ZF^hU~?&7)FT}WoQ=b18^csZpoHRnq^2P
z!>|$I=>+Zk7#)GFsL49{8*6s-C2+1EVL!l-rZNRPt8_*+SA5I{psUphC$l{~5x>>c
zivp+s>jKbK5!EM+0{Hdc_y#h&sMV&)463}OEn3SCB0Q)gJA`AM2!|0)Asj(it!m&=
zRDBj#3Vmytg1HF_e^=L~oxtWvgy#@2sj75i3yolDgj2wNGg;t0_0xbK)*nuYRQ5q0
z6m3T-P1xlLdjSJ6)u542_&~!YMU(>(PLU9!_5>?tjB^G08UMcF#(K61wuS6p&KTjm
z1-EeM{tFO$E^qHP)%uY4FZlb3p&1dlfegK359DdVbh+M!8j5sS#UWblg!@XR0yT>6
zf)xq{w_9;3x}3Elv?Dx@@C3q>2s;p-M%X2R=h$9s9zf6$P9qoy=MZ-CZ{#HZC>i6o
zlL`JqGESoWZgR`aF_?UKP1Tj<I_sxuwc-Yr$AYG6dQIQUL=TyY>3><$!=?wyRDgBE
zwi&hta43rryYyh&53eYdNlB`eYzb>)MXq=%-U^iRSs#C`@hJ5P;#SI7(HNs_GpxKn
z8)R{g8$XI$)M85shIJ1-r7sWQ4(PfJZsJB2rY_&p6i9vxW0ndad4fci<a<*wQBnD?
zsRW;`k8EjzDO;K&qFRTii7i*iRk+l~fs|p1oq=fx8njxJS$38mZAs>l{vYABya19X
zOr~nxLcPh%Tjhc=DFRO;Lt2CdK!V|I%0dVfLB!5^Y~>Mz`8jMYR|Iwuy9PS91yBDP
zUMmWK#ku;Eg9_{BHR%_5Pcv^C#RU&mT(`h(6m0y{945c>VCu}K3m~X@-P$EX+fl62
z1j|xU5Qd<*i=8lc7obA&8>w|<h+|fuN_Y8JseB@fTX<YSk#(3R%oI|XJAd;+OK-O@
z6=(E72I`-i4|pOgA&A@=!<MjB1ab?W{R_NRTNSv7;3un!s<2V8wT`D71365pM-1+z
zzZ0tIF})DT0n=wH@C}-NQ-yEH3_xH9oyY()a$~e;8+N`_bVHoatGhZ_0S<$^g2Cq@
z{31DMNSObTej#2&dD)dm7Fl_jy*Qt0EiJJpHvB*Kl04bAjf|u3yHH3W%X}<(Uhsi)
zjyf_jX5@<_BU%T)*0$S+7oNj!x1~uv|E=w`AwpMpRGf#=2sX1)dBn!zXz3C6O00Gr
z|M&!*p#6PL?EL+Nz(5H8FYLbr3|Y|e+xW+%zfXi{g5PY9ksbWw_Q&dbYvMJjn&zM!
Lm*cMUz<vJ#*w;#o

delta 2084
zcma)+U2GIp6vyYz?#@nkyB}=Z{b;w_WkKi&@=*|#FU!&{A7VjhAr9a&-MPD6y1TR7
zovqZAV1kK>PmpV(@kQ|Cfj3MOUrbE=c)%DEVnQ?%<HrLU<C8Hy5Tba_9a^Bi*rvbU
zx#!$_|L5Fu&l`LG+8v$<g#r@%guk9JZVp}wr-^)N&8@ZUAKG=fkH%?=CTXJLy+VjY
z;};}7O;f;rngnKO0yx0;lEb8L{$27Ri3f-zQAsDJq|4L;RxkBkkme^nG9lVL_1+}C
zdaN`>OVdS0jnb4cS*=y=S;sSD)SUT)N`$0hpoS#?nujIfpW`c8?S{?L{AK?yE70l$
zZDurD^{Q=CbHU3myeI>|SJk^D$>%zrCo=y*Np9HyYi#K<xgw27Cv)fIIr+3aN2X<_
z!pLJTjJ&c$q$AQ9)s&CG;e0qSkuHpBJ{E=bOdQ-<r>&Mzp+$SPN%{3af5_Fyo<Ns0
zPX$*iDhwqLKODT4ScS8z5yGy)NRwHTr#gp-!4GtPu<|jOx=KWxEQF2%VHi37rqw(^
z%XXP}hu#Z5*lLh}7V1%OFE+%#52eX8zZ1Gna{Tk~z6>7b`0uM^;y|C9MNhp|;aX&r
zSlo)-2rLK3@q(tXL0*Us!0K`|H@pQmeG2h30^@d+5?qu`*=G2FT;PqctpGjPG-hj-
zVdkrLi*3j0o&0(<L(b0s8eK+q;Q<V)LDIr(P*?#@Xd!_m%cLdNg@$Jlc;QaKuGVSM
zFimQ*9DgRZB_BuIFk%;AOjDVHA4`Oywg<Pw^6;s3-<QfiirwgS<)POafUb&KIcn74
z%HHmJoV-5&MOTKLN68CnF+?_kcvK@B#km}!fS5q+MJ$!t@qeW5!IeU97wcWjE!&6F
z`w`C}4)Ai%+MY*)U}>~W@K1XB$ou@;o^wRw6Y>2CQK!Bpt2S&$vD$XCZHt=wIIfXT
z`A_i~MLe(}ej+hIzT|U>HwS;a2S4zR7VBJG*j2Z*b^lbgGUev#=WLe`bNv-RlpH&c
zx9W6^)3bJ=N^2(DiGtQ6M71x;8%FAM<}Is6jfT76LE1z<*WpsUW|l!@5i1dE5gQO2
z5t|TCO0b6Qz~OGhQN%ICal{G4X1*+)<i}DGk3tk)O|2p-znI#%0}uLdlhPH@u>4eo
zw$)*JP1*FkM)ZJ5pd0ucW!!33ORX{fPIfbYv3K8juL#poA=MUx9`<mV%B+L}#KpRA
zMvp91KDG`_=OOesf*40EdHpK9f>G#tv)_TSr2-_6N@PjC_ew-4;oJHGd5w%|LDqrm
zv5T`H;uJz$fdM0Zv1c`mI%O1%lZatN8B>%(vERU73pY^-WD9>G2wq74sZ`K$6(7zf
z(@R7K8LeA0ROr0TTm2{2nc%B=-3Kqj*il+mo0X_|MPpEiVl&LkRmcnfyMHy=&#|n^
zF!ej%k*UT`;uap)QIzT+zu1=)b@vCql^I@t3LL}<ddE2Q3%8c=FgA@4byvj^?&%hR
z50Cwk8v_YaftXH(Lj`73>a<}ScGYU|Yw4|FHU&P=;TpI~gv)PdBR<@T@H5$$VyDqA
zyLOC#)fSce`JJq_pj>RQM0tq6wR{t)W43*0zl&O;IC9itv2Iiw#iEwwk=!;P_FtVJ
z%w<S7Z{&_27L_E@CN{>))<bSnF51Rqjka|EnKo_Hk-Q!$KO7cJ1flDX&On?k`0(3U
q!XtT3Jl#=8-CnoMc8%`zz>cjU{=$lt%eF>C(R8#=m1DA71pfjGvCvWg

diff --git a/src/database/controllers/queueController.py b/src/database/controllers/queueController.py
index f7f0dac0..53626f4a 100755
--- a/src/database/controllers/queueController.py
+++ b/src/database/controllers/queueController.py
@@ -13,7 +13,7 @@ import rospy
 rospack = rospkg.RosPack()
 
 DB_PATH = os.path.join(rospack.get_path('ros_acomms'),
-                       'database/databases/queue.db')
+                       'src/database/databases/queue.db')
 
 db = SqliteDatabase(DB_PATH)  # database location
 
@@ -24,8 +24,8 @@ class BaseModel(Model):
 
 class Queue(BaseModel):  # database model
 
-    identifier = IntegerField()  # Unique ID
-    message_raw = TextField()
+    identifier = IntegerField(null=True)  # Unique ID
+    message_raw = TextField(null=True)
     message_encoded = TextField(null=True)
     message_type = TextField(null=True)
     dest = IntegerField(null=True)
@@ -86,11 +86,17 @@ class Queue(BaseModel):  # database model
         Queue.delete().where(
             Queue.identifier == idVal).execute()
 
-    def getNextMessage(self, message_type):
+    def getNextMessage(self):
         '''getNextMessage is used to get the next message within a specific message type queue'''
+        lowestPriority = -1
+        lowestPriorityMSGType = None
+        for msg in Topics:
+            if(msg.priority > lowestPriority):
+                lowestPriority = msg.priority
+                lowestPriorityMSGType = msg.message_type
 
         for val in Queue.select():
-            if(val.message_type == message_type):
+            if(val.message_type == lowestPriorityMSGType):
                 if(val.completed == False):
                     return val.message_encoded
 
@@ -208,18 +214,23 @@ class Queue(BaseModel):  # database model
 
 
 class Topics(BaseModel):  # Gets created at yaml read
-    message_type = IntegerField()
-    message_name = TextField()
-    priority = IntegerField()
-    is_active = BooleanField()
+    message_type = IntegerField(null=True)
+    message_name = TextField(null=True)
+    priority = IntegerField(null=True)
+    is_active = BooleanField(null=True)
+    direction = IntegerField(null=True)
 
-    def addEntry(self, message_typeVal, priorityVal, is_activeVal, directionVal=None):
+    def addEntry(self, message_typeVal, message_nameVal, priorityVal, is_activeVal, directionVal=None):
         '''addEntry function creates a table entry based on function parameters'''
 
-        Topics.create(message_type=message_typeVal, priority=priorityVal,
+        Topics.create(message_type=message_typeVal, message_name=message_nameVal, priority=priorityVal,
                       is_active=is_activeVal, direction=directionVal)
 
+    def displayTable(self):
+        '''*Test Function* used to display all values within the database'''
 
+        for command in Topics.select():
+            print(command)
 class msgEvents(BaseModel):
     identifier = IntegerField()
     time = TimeField()
@@ -249,10 +260,10 @@ class msgEvents(BaseModel):
 
 
 class Fragmentation(BaseModel):
-    identifier = IntegerField()
-    time = TimeField()
-    source = IntegerField()
-    dest = IntegerField()
+    identifier = IntegerField(null=True)
+    time = TimeField(null=True)
+    source = IntegerField(null=True)
+    dest = IntegerField(null=True)
 
     def removeEntry(self, idVal):
         '''removeEntry function removes a table entry based on the specified idVal'''
@@ -277,10 +288,12 @@ class Fragmentation(BaseModel):
 
 if __name__ == '__main__':
     '''This script can be ran standalone to create DB tables and check that classes are properly working'''
-    db.create_tables([Queue, Fragmentation, msgEvents, Topics])
+    db.drop_tables([Queue, Fragmentation, msgEvents, Topics])
 
+    db.create_tables([Queue, Fragmentation, msgEvents, Topics])
     queueDB = Queue()
     fragmentationDB = Fragmentation()
     msgEventsDB = msgEvents()
     topicDB = Topics()
 
+    topicDB.displayTable()
diff --git a/src/database/databases/queue.db b/src/database/databases/queue.db
index 7c2457ad825c8d0829018be3f0e0772827aef903..5af9916508eb874eefde8083b0ff4cb3bc8252fd 100644
GIT binary patch
literal 57344
zcmeI*&u`mg9LI5|ZIZV6wLu^h=<0Trx`+u5a6w{>u8mMx*RhfcCNxEE@}#wv#F_21
ztGLik`wMnK+FyY92RI=90whixI3Ohc1jHHfwd17r)Fsh+S<u^(o8<AwPoC%XwJo_h
z_=`K6rmOV2Wj761e_y+x#bVkgx~^&I?=(%z$~q})QqvOu$m^7>y(QLr^ZZxo<K64O
zXC#oAy{Bbc*^8Oo^wZS5d|*KU0R#|0009ILc-aJgJ@<f)uO*U&xw+WMj%(~Tlxwxk
zs?+^Dy|!IiEtmB2>W$5kzSM18(yuI;wIzLHt6ciDw5@O5F6&!6o16OQ+Z(r5x9{pV
zOLwp8OHJiC#-6IUM{U&)huT|4Q`*a=ugcQcwoS`6-J@{ObSg&GH4h?t*GyYUqt)ss
zizna2lZ8Sd_P9I#2OZT>{-0!Y^nTljgEwoc<(hR<+5PVc4c#^l`y=m7Q?;sAO$kdd
zqndKu@V;&1s9_nk%C6};{ZK=-_T2rTE%5X-EXQ;rla$7~ZR|B=RED>q;@U>_espw3
zqhTFZBC+rQWRI59G+oyl<mq=+t!7(_9&PMR>e<?zp?|zSl`JeQ#7@5SkDjKprw(M4
zPXA3JT7`c5i&vt5#EFV)`bW{mtx~wvu{w73^tcIciqxgQp?G|CGFezziJcVuX+#tw
zAL7yJM7F*7lp-^c56?_MS|Z~vem&u%AIJUC?i!A2%F);8znO}Tvfn<+2#b$$v1DO>
zKKA&5-@6<kO`}zFdao1FzI)B1bS?8Z<IL(;K{f}ymHmr`cPgE+v3vT2H&`9F-I3ER
zIQMMjcI?)gh=h;yA*S->-oYeYg?HLlM&xv98V}{94JT~xV&k}5oo2W-ID<~-A>CUC
z%085<Tx3-0p{jO#=kV@zv!T}gD~LCxqIYjDH~lOKAb<b@2q1s}0tg_000Iag@c$P`
z$P~Tb`n`V#q-9OXIxFjptkbf}@Bf;YiOcHcCS+wn009ILKmY**5I_I{1Q0*~fiV%_
z_x~{|6vIRS0R#|0009ILKmY**5J2D@0^I+f1CL!1KmY**5I_I{1Q0*~0R#{j69Ml3
z$D~jU69EJeKmY**5I_I{1Q0*~fpZ9O|9=iVc0~XI1Q0*~0R#|0009ILKwwM+y!-#f
z)V!AaBllzONv@guEVrEfH~U-mhwQi6O7^2{A@g_Um(2H>N11z>4>GgqKhr;_pQaDe
zchc{qr&E8Weo8$_nW>MZ3l;<rKmY**5I_I{1jbunVKJ%2K8($1seRRG*E@}hYgrBF
zJ!RXLy;{AWFX|GwC4t#!z_z-NIV@H8t!@92VfiZy61^$Wnde7KE!RHEuUwMkh9sw>
zN$<&BYgVgndXMUsC-vo*=Owr%!DKXOD@VE8^4vC8t>4Hmza69!gHnrcN$Q&P6_2L8
zFWqT-&lk)uzZrBi6^#TBRtx5Nc}`NR($VCZDSwKWF250kCZeJCk-KlTf}R!&Np0r3
z^mJCH+t0%5($ByGy(WPX3zV1WXa#yzk|P!96$uV6(8VA%xIh;qHKahfprgSB%1UZz
zfigj8V1d%Z3zU+61{P>m0wWe^Mxvt?Xj+mZ6(}je;RQ+rslf$`OKM1grh<+J7idya
zLklz!ga#HUHnc$FeSv252q1s}0tg_000IagfB*srAb<b@2q1s}0tg_000IagfB*sr
zAb<b@2q1s}0tg_000IagfB*srAb<b@2q1s}0tg_000IagfB*srAb<b@2q1s}0tg_0
k00IagfB*srAb<b@2q1s}0tg_000IagfB*sryc`1m0qW;issI20

delta 300
zcmZoTz}#?vd4iM>_jCpZ20kEW24Z#w2DT#;b&RBdqI&i>d4WP~ToMd?d3=Jrc0A==
z&$%Qv3kodY<ZAR`V;7f{WNa#)oX(Y0ui)n&qTm<m<D;XXl$%;yoS2>(pO=`Ms-zI&
z8Ud6+$QBf3<`-p_R4OTW`h~c<y9OakW){aMCYNNE0rjEFYBuw-vWtt0GqzTiBqrsg
z7M7-#LI`%}pvjip-jn%wd8A-=7bTXXS-p7%_i0AX0A_Y^b#=z3_{llEr9d4an+5rV
igm}4t{^SN?9w6r3Y@l$(e=~~!v%qG7h7a)*12_Sn?@pHh

diff --git a/src/message_queue_node.py b/src/message_queue_node.py
index fbef4f8d..6392a66f 100755
--- a/src/message_queue_node.py
+++ b/src/message_queue_node.py
@@ -24,7 +24,6 @@ from acomms_codecs.packet_codecs import packet_codecs
 import json
 from rospy_message_converter import message_converter
 # TODO: move this
-from packet_dispatch_node import DecoderListEntry
 
 
 @dataclass
@@ -148,7 +147,7 @@ class MessageQueue(object):
         if self.order == 'fifo':
             if len(self._queue) < self._queue.maxlen:
                 # drop this message if the queue is full.
-                Queue.addRight(
+                self.queueDB.addRight(
                     message,
                     encoded_bits,
                     self.dest,
@@ -161,8 +160,8 @@ class MessageQueue(object):
                 # master message queue draws from the top
         else:
             # For LIFO, we threw away the old messages
-            Queue.clearLifoMessages(self.message_codec)
-            Queue.addLeft(
+            self.queueDB.clearLifoMessages(self.message_codec)
+            self.queueDB.addLeft(
                 message,
                 encoded_bits,
                 self.dest,
@@ -213,6 +212,11 @@ class MessageQueue(object):
 
 class MessageQueueNode(object):
     def __init__(self):
+        self.queueDB = Queue()
+        self.fragmentationDB = Fragmentation()
+        self.msgEventsDB = msgEvents()
+        self.topicDB = Topics()
+
         self.topic_list = [] #List of dict for topics
 
         rospy.init_node('message_queue', log_level=rospy.DEBUG)
@@ -291,14 +295,13 @@ class MessageQueueNode(object):
                 
                 self.topic_list.append(topic) #append topic to list
 
-                Topics.addEntry(
+                self.topicDB.addEntry(
                     message_codec_params['id'],
                     ros_type_name,
-                    priority,
+                    int(priority),
                     message_codec_params.get(
                         'is_active', True),
                 )
-                
                 rospy.Subscriber(actual_topic_name,
                                  AnyMsg,
                                  partial(self.on_incoming_message, topic_name=actual_topic_name))
@@ -341,7 +344,7 @@ class MessageQueueNode(object):
         # Check if incoming msg topic name matches any of the active queue topic names
         found = False
         for val in Topics.select():
-            if (topic_name == val.message_name):
+            if (topic_name in val.message_name):
                 rospy.logdebug(
                     "TOPIC: {} in active queue list, appending msg".format(topic_name))
                 found = True
@@ -390,7 +393,7 @@ class MessageQueueNode(object):
         msg.queue.mark_transmitted(msg)
 
     def get_next_message(self, remove_from_queue=False, check_reachability=True):
-        return Queue.getNextMessage()
+        return self.queueDB.getNextMessage()
 
     def get_next_message_smaller_than_bits(self, size_in_bits, dest_address=None, packet_codec=None,
                                            check_reachability=True):
@@ -483,7 +486,6 @@ class MessageQueueNode(object):
         while not rospy.is_shutdown():
             # self.time_since_last_tx_pub.publish(rospy.Duration.from_sec(self.age_of_oldest_queue.total_seconds()))
             # self.messages_in_queues_pub.publish(Int32(self.message_count))
-            self.update_queue_status()
             rate.sleep()
 
 
diff --git a/src/packet_dispatch_node.py b/src/packet_dispatch_node.py
index 1abcadd7..e62fdb97 100755
--- a/src/packet_dispatch_node.py
+++ b/src/packet_dispatch_node.py
@@ -153,7 +153,6 @@ class PacketDispatchNode(object):
                 id = payload_bits.read('uint:8')
                 ros_msg = self.message_codecs[id].decode(payload_bits)
                 # Update database with inbound message
-                Queue.addRecieve(ros_msg, payload_bits)
                 self.message_publishers[id].publish(ros_msg)
         except KeyError:
             # We don't have a config entry that matches this ID
-- 
GitLab