diff --git a/requirements.txt b/requirements.txt index 158f42abc16d05c36ced4308d860033cf43ede7d..b4ada5a6f1333f9a27be8d6daace2990402ee020 100755 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,6 @@ crccheck bitstring msgpack ltcodecs -whoi-gitver \ No newline at end of file +whoi-gitver +peewee +rospy_message_converter \ No newline at end of file diff --git a/src/database/app.py b/src/database/app.py new file mode 100644 index 0000000000000000000000000000000000000000..47763cda4e955070eecff16322528548119fe423 --- /dev/null +++ b/src/database/app.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +from controllers.queueController import * +from flask import Flask, render_template, jsonify, request, redirect, url_for + +import rospkg +import os + +import rospy + +rospack = rospkg.RosPack() + +UPLOAD_FOLDER = os.path.join(rospack.get_path('helpful_tools'), 'src/uploadedFiles') +ALLOWED_EXTENSIONS = {'txt', 'py'} + +app = Flask(__name__) +global commandEnter +global PythonEnter + +app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER +def allowed_file(filename): + return '.' in filename and \ + filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS + +@app.route("/", methods=["GET", "POST"]) +def dataViewer(): #dataviewer is the main page of the web application + return render_template("dataViewer.html") #display templates/dataViewer template + +@app.route("/settings", methods=["GET", "POST"]) +def settings(): #dataviewer is the main page of the web application + if request.method == "POST": #on post send form data to either commandEnterInterface or pythonController nodes + try: + if(request.form.get('confirm' )== "on"): + Queue.resetTable() + except: + print("Error: Invalid Entry") + return render_template("settings.html") #display templates/dataViewer template + +@app.route("/update", methods=['POST']) #update endpoint used to fresh database on a timeout +def update(): + + data = queueDB.returnTable() + + formatData = [] + + for command in data: #format data as dict which will be sent as a JSON + formatCommand = {"id": command.identifier, "message_raw": command.message_raw, "message_type": command.message_type, + "posistion": command.posistion, "direction": command.direction} + formatData.append(formatCommand) + return jsonify(formatData) + + +if __name__ == '__main__': + #Clean temp file directory + for file in os.listdir(UPLOAD_FOLDER): + os.remove(os.path.join(UPLOAD_FOLDER, file)) + + #DANGER + #This is test code used to reset databases, only to be used in test env + #Queue.drop_table() + + #Queue.drop_table() + + queueDB = Queue() + #Start flask application + app.run(host='0.0.0.0', port=8000) + + diff --git a/src/database/controllers/__pycache__/queueController.cpython-38.pyc b/src/database/controllers/__pycache__/queueController.cpython-38.pyc new file mode 100644 index 0000000000000000000000000000000000000000..88a1c03d47deab8117d579da720e59e18bb0d6f5 Binary files /dev/null and b/src/database/controllers/__pycache__/queueController.cpython-38.pyc differ diff --git a/src/database/controllers/queueController.py b/src/database/controllers/queueController.py new file mode 100644 index 0000000000000000000000000000000000000000..46478a02ec040494a3acffa97931958ce79d01a8 --- /dev/null +++ b/src/database/controllers/queueController.py @@ -0,0 +1,340 @@ +#!/usr/bin/env python3 + +from time import time +from typing import Optional +from xmlrpc.client import Boolean +from peewee import * +import os +import rospkg +import json +from rospy_message_converter import message_converter +import rospy + +rospack = rospkg.RosPack() + +DB_PATH = os.path.join(rospack.get_path('ros_acomms'), + 'src/database/databases/queue.db') + +db = SqliteDatabase(DB_PATH) # database location + + +class BaseModel(Model): + class Meta: + database = db + + +class Queue(BaseModel): # database model + + identifier = IntegerField(null=True) # Unique ID + message_raw = TextField(null=True) + message_encoded = TextField(null=True) + message_type = TextField(null=True) + dest = IntegerField(null=True) + payload_bits = TextField(null=True) + length_bits = IntegerField(null=True) + posistion = IntegerField(null=True) # Pos in queue (relative to topic) + fragmentation_tracker = TextField(null=True) + allow_fragmentation = BooleanField(null=True) + transmitted = BooleanField(null=True) # Transmitted state + completed = BooleanField(null=True) # Transmission completed + message_type = IntegerField(null=True) # Corresponds to message table + direction = IntegerField(null=True) + sequence_number = IntegerField(null=True) + topic_name = TextField(null=True) + ros_pkg = TextField(null=True) + header = TextField(null=True) + def get_next_message_smaller_than_bits(self, size_in_bits, dest_address=None, packet_codec=None, + check_reachability=True): + '''get_next_message_smaller_than_bits returns the next message in the queue with a bit count smaller then the specified parameter''' + + try: + for val in Queue.select(): + next_message = None + + if dest_address and (dest_address != val.dest): + # Move on to check the next message + rospy.logdebug("Disqualified: dest_address") + continue + if not val.allow_fragmentation and val.length_bits > size_in_bits: + rospy.logdebug("Disqualified: too large, no fragmentation") + continue + break + + return next_message + + except: + return None + + def bit_count(self): + '''bit_count is used to count bits across all messages''' + + count = 0 + + for val in Queue.select(): + count += val.length_bits + + return count + + def message_count(self): + '''message_count is used to count all unsent messages across every topic type''' + + counter = 0 + for val in Queue.select(): + if (val.transmitted == False): + counter += 1 + return counter + + def removeMessage(self, idVal): + '''removeMessage is used to delete a message with an identifier of idVal''' + + Queue.delete().where( + Queue.identifier == idVal).execute() + + def getNextMessage(self): + '''getNextMessage is used to get the next message within a specific message type queue''' + lowestPriority = -1 + lowestPriorityMSGType = None + for msg in Topics: + if(msg.priority > lowestPriority): + lowestPriority = msg.priority + lowestPriorityMSGType = msg.message_type + + for val in Queue.select(): + if(val.message_type == lowestPriorityMSGType): + if(val.completed == False): + return val.message_encoded + + def clearLifoMessages(self, messageType): + '''clearLifoMessages is used to delete all entries within a certain message type''' + + Queue.delete().where(Queue.message_type == messageType).execute() + + def markACK(self, messageVal): + print(messageVal.id) + print("HERE DAD HERE HAER") + Queue.update(completed = True).where(messageVal.id == Queue.identifier).execute() + + def popLeft(self): + '''popLeft is used to delete the left most value from the queue''' + + Queue.delete().where(Queue.identifier == 0).execute() + + def popRight(self): + '''popRight is used to delete the right most value from the queue''' + Queue.delete().where(Queue.identifier == self.getLastIterator()).execute() + + def updateTransmit(self, msg, transmitVal): + '''updateTransmit function is used to update a database entry's transmit value parameter''' + Queue.update(transmitted=transmitVal).where( + str(msg.payload_bits) == str(msg)).execute() + + def updatePayload(self, msg, payloadbits): + '''updateTransmit function is used to update a database entry's transmit value parameter''' + + for queue in Queue: + if(str(msg) == str(queue.message_raw)): + queue.payload_bits=payloadbits + queue.save() + + def addValue(self, message, msg_type): + '''Add function is used to create database entries to the queue''' + + idVal = self.getLastIterator() + 1 + order = None + countVal = -1 + + for val in Topics.select(): + if (msg_type in val.message_name): + order = val.order + messagePointer = val.identifier + + if(order == 'fifo'): + for val in Queue.select(): + if (val.message_type == messagePointer): + countVal += 1 + + posistionVal = countVal + 1 + + Queue.create(identifier=idVal, + message_raw=message, + message_type=msg_type, + posistion = posistionVal, + direction = "Outgoing" + ) + else: + self.clearLifoMessages(messagePointer) #clear all messages within specified type + for val in Queue.select(): + if (val.message_type == messagePointer): + countVal += 1 + posistionVal = countVal + 1 + + Queue.create(identifier=idVal, + message_raw=message, + message_type=messagePointer, + posistion = posistionVal, + direction = "Outgoing" + + ) + + def appendQueue(self, message, msg_type, ros_pkgVal, topic_nameVal,headerVal): + '''Add function is used to create database entries to the queue''' + + idVal = self.getLastIterator() + 1 + order = None + countVal = -1 + + for val in Topics.select(): + if (msg_type in val.message_name): + order = val.order + messagePointer = val.identifier + + if(order == 'fifo'): + for val in Queue.select(): + if (val.message_type == messagePointer): + countVal += 1 + + posistionVal = countVal + 1 + + Queue.create(identifier=idVal, + message_raw=message, + message_type=msg_type, + posistion = posistionVal, + direction = "Outgoing", + ros_pkg = ros_pkgVal, + topic_name = topic_nameVal, + header=headerVal + ) + else: + self.clearLifoMessages(messagePointer) #clear all messages within specified type + for val in Queue.select(): + if (val.message_type == messagePointer): + countVal += 1 + posistionVal = countVal + 1 + + Queue.create(identifier=idVal, + message_raw=message, + message_type=messagePointer, + posistion = posistionVal, + direction = "Outgoing", + ros_pkg = ros_pkgVal, + topic_name = topic_nameVal, + header=headerVal + ) + + def displayTable(self): + '''*Test Function* used to display all values within the database''' + + for command in Queue.select(): + print(command.transmitted) + + def returnTable(self): + '''Retruns contect of select() function''' + + return Queue.select() + + def getLastIterator(self): + '''Gets the highest iterator value in the database''' + highVal = -1 + for val in Queue.select(): + if(val.identifier > highVal): + highVal = val.identifier + return highVal + + def resetTable(self): + Queue.delete().execute() + + +class Topics(BaseModel): # Gets created at yaml read + identifier = IntegerField() + message_type = IntegerField(null=True) + message_name = TextField(null=True) + priority = IntegerField(null=True) + is_active = BooleanField(null=True) + order = TextField(null=True) + + def addEntry(self, message_nameVal, priorityVal, is_activeVal, orderVal): + '''addEntry function creates a table entry based on function parameters''' + + Topics.create(identifier = self.getLastIterator() + 1, message_name=message_nameVal, priority=priorityVal, + is_active=is_activeVal, order=orderVal) + + def displayTable(self): + '''*Test Function* used to display all values within the database''' + + for command in Topics.select(): + print(command.message_name) + + def getLastIterator(self): + '''Gets the highest iterator value in the database''' + highVal = 0 + for val in Topics.select(): + if(val.identifier > highVal): + highVal = val.identifier + return highVal + +class msgEvents(BaseModel): + identifier = IntegerField() + time = TimeField() + source = IntegerField() + dest = IntegerField() + + def removeEntry(self, idVal): + '''removeEntry function removes a table entry based on the specified idVal''' + + msgEvents.delete().where( + msgEvents.identifier == idVal).execute() + + def addEntry(self, identifierVal, timeVal, sourceVal, destVal): + '''addEntry function creates a table entry based on function parameters''' + + msgEvents.create(identifier=identifierVal, time=timeVal, + source=sourceVal, dest=destVal) + + def getLastIterator(self): + '''Gets the highest iterator value in the database''' + + highVal = -1 + for val in msgEvents.select(): + if(val.identifier > highVal): + highVal = val.identifier + return highVal + + +class Fragmentation(BaseModel): + identifier = IntegerField(null=True) + time = TimeField(null=True) + source = IntegerField(null=True) + dest = IntegerField(null=True) + + def removeEntry(self, idVal): + '''removeEntry function removes a table entry based on the specified idVal''' + + Fragmentation.delete().where( + Fragmentation.identifier == idVal).execute() + + def addEntry(self, identifierVal, timeVal, sourceVal, destVal): + '''addEntry function creates a table entry based on function parameters''' + + Fragmentation.create(identifier=identifierVal, + time=timeVal, source=sourceVal, dest=destVal) + + def getLastIterator(self): + '''Gets the highest iterator value in the database''' + + highVal = -1 + for val in Fragmentation.select(): + if(val.identifier > highVal): + highVal = val.identifier + return highVal + + +if __name__ == '__main__': + '''This script can be ran standalone to create DB tables and check that classes are properly working''' + #db.drop_tables([Queue, Fragmentation, msgEvents, Topics]) + db.create_tables([Queue, Fragmentation, msgEvents, Topics]) + queueDB = Queue() + fragmentationDB = Fragmentation() + msgEventsDB = msgEvents() + topicDB = Topics() + + queueDB.displayTable() diff --git a/src/database/databases/queue.db b/src/database/databases/queue.db new file mode 100644 index 0000000000000000000000000000000000000000..f1d38a478ba58b84c13ebfd891d56abe58c7f27d Binary files /dev/null and b/src/database/databases/queue.db differ diff --git a/src/database/templates/dataViewer.html b/src/database/templates/dataViewer.html new file mode 100644 index 0000000000000000000000000000000000000000..fa0a79fe656c2d4a3e643599a98e5c6ccf940318 --- /dev/null +++ b/src/database/templates/dataViewer.html @@ -0,0 +1,236 @@ +<!DOCTYPE html> +<html> + +<head> + <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script> + <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/js/bootstrap.min.js"></script> + <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css"> + + <style> + div.fixed { + position: fixed; + left: 0; + bottom: 0; + width: 100%; + } + + .ButtonClass { + font-size: 16px; + width: 10%; + background-color: #5865f2; + color: white; + border: none; + float: right; + position: absolute; + right: 0; + bottom: 0; + } + + .form-select { + color: black; + height: 10vh; + width: 20%; + float: right; + position: absolute; + left: 0; + bottom: 0; + } + </style> + <title></title> +</head> + +<body> + <style> + body { + background-color: #23272a; + overflow: hidden; + margin: 0; + padding: 0; + height: 100%; + width: 100%; + font-family: Arial, Helvetica, sans-serif; + } + + .navbar { + overflow: hidden; + background-color: #2c2f33; + } + + .navbar a { + float: left; + font-size: 16px; + color: white; + text-align: center; + padding: 14px 16px; + text-decoration: none; + } + + .dropdown { + float: left; + overflow: hidden; + } + + .dropdown .dropbtn { + font-size: 16px; + border: none; + outline: none; + color: white; + padding: 14px 16px; + background-color: inherit; + font-family: inherit; + margin: 0; + } + + .navbar a:hover, + .dropdown:hover .dropbtn { + background-color: #5865f2; + } + + .dropdown-content { + display: none; + position: absolute; + background-color: #f9f9f9; + width: 120px; + box-shadow: 0px 8px 16px 0px rgba(0, 0, 0, 0.2); + z-index: 1; + } + + .dropdown-content a { + float: none; + color: black; + padding: 12px 16px; + text-decoration: none; + display: block; + text-align: left; + } + + .dropdown-content a:hover { + background-color: #ddd; + } + + .dropdown:hover .dropdown-content { + display: block; + } + + .test { + height: 100vh; + } + + a:link { + text-decoration: none; + } + + a:visited { + text-decoration: none; + } + + #table-wrapper { + position: relative; + } + + #table-scroll { + height: 95vh; + overflow: auto; + margin-top: 20px; + } + + #table-wrapper table { + width: 100%; + } + + #table-wrapper table * { + color: white; + } + + #table-wrapper table thead th .text { + position: absolute; + top: -20px; + z-index: 2; + height: 20px; + width: 35%; + } + + th, + td { + text-align: left; + } + + tr:nth-child(even) { + background-color: #2c2f33; + } + + input:focus { + outline: none !important; + } + </style> + + <div class="navbar"> + <a href="#">Home</a> + <a href="/settings" style="float:right"><i class="fa fa-cog"></i></a> + + + </div> + <div id="table-wrapper"> + <div id="table-scroll"> + <table id="myTable"> + <thead> + <tr> + <th>ID</th> + <th>Raw Message</th> + <th>Message Type</th> + <th>Queue Position</th> + <th>Direction</th> + </tr> + </thead> + <tbody></tbody> + </table> + </div> + </div> + <script> + setInterval(function() { + $.ajax({ + url: "/update", + type: "POST", + success: function(response) { + console.log(response); + for (commands in response) { + try { + var table = document.getElementById("myTable"); + var x = table.rows[response[commands].id + 1].cells[0]; + + var updateVal = response[commands].id + 1; + table.rows[updateVal].cells[0].innerHTML = response[commands].id; + table.rows[updateVal].cells[1].innerHTML = response[commands].message_raw; + table.rows[updateVal].cells[2].innerHTML = response[commands].message_type; + table.rows[updateVal].cells[3].innerHTML = response[commands].posistion; + table.rows[updateVal].cells[4].innerHTML = response[commands].direction; + + } catch (error) { + var table = document.getElementById("myTable"); + var row = table.insertRow( + document.getElementById("myTable").rows.length + ); + var id = row.insertCell(0); + var message_raw = row.insertCell(1); + var message_type = row.insertCell(2); + var posistion = row.insertCell(3); + var direction = row.insertCell(4); + + id.innerHTML = response[commands].id; + message_raw.innerHTML = response[commands].message_raw; + message_type.innerHTML = response[commands].message_type; + posistion.innerHTML = response[commands].posistion; + direction.innerHTML = response[commands].direction; + } + } + }, + error: function(error) { + console.log(error); + }, + }); + }, 1000); + </script> + +</body> + +</html> \ No newline at end of file diff --git a/src/database/templates/settings.html b/src/database/templates/settings.html new file mode 100644 index 0000000000000000000000000000000000000000..b1e32332549f3083a4e28b0f378e7e29c4114e98 --- /dev/null +++ b/src/database/templates/settings.html @@ -0,0 +1,142 @@ +<!DOCTYPE html> +<html> + +<head> + <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script> + <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/js/bootstrap.min.js"></script> + <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css"> + <title></title> +</head> + +<body> + <style> + body { + background-color: #23272a; + overflow: hidden; + margin: 0; + padding: 0; + height: 100%; + width: 100%; + font-family: Arial, Helvetica, sans-serif; + } + + .navbar { + overflow: hidden; + background-color: #2c2f33; + } + + .navbar a { + float: left; + font-size: 16px; + color: white; + text-align: center; + padding: 14px 16px; + text-decoration: none; + } + + .dropdown { + float: left; + overflow: hidden; + } + + .dropdown .dropbtn { + font-size: 16px; + border: none; + outline: none; + color: white; + padding: 14px 16px; + background-color: inherit; + font-family: inherit; + margin: 0; + } + + .navbar a:hover, + .dropdown:hover .dropbtn { + background-color: #5865f2; + } + + .dropdown-content { + display: none; + position: absolute; + background-color: #f9f9f9; + width: 120px; + box-shadow: 0px 8px 16px 0px rgba(0, 0, 0, 0.2); + z-index: 1; + } + + .dropdown-content a { + float: none; + color: black; + padding: 12px 16px; + text-decoration: none; + display: block; + text-align: left; + } + + .dropdown-content a:hover { + background-color: #ddd; + } + + .dropdown:hover .dropdown-content { + display: block; + } + + .test { + height: 100vh; + } + + a:link { + text-decoration: none; + } + + a:visited { + text-decoration: none; + } + + hr.solid { + border-top: 1px solid #999; + width: 50%; + position: relative; + } + + .ButtonClass { + font-size: 16px; + width: 10%; + background-color: #5865f2; + color: white; + border: none; + float: right; + position: relative; + right: 0; + bottom: 0; + } + </style> + + <div class="navbar"> + <a href="/">Home</a> + <a href="/settings" style="float:right"><i class="fa fa-cog"></i></a> + + </div> + <h2 class="h3 mb-4 page-title" style="color:white; position:relative; left:25%;">Settings</h2> + <hr class="solid"> + + <h5 class="mb-0 mt-5" style="color:white; position:relative; left:25%;">Reset Database</h5> + <div class="card" style="background-color:#2c2f33; position: relative; top: 5%; left: 25%; width: 50%; text-align: left;color:white;font-size: 16;"> + <div class="card-body"> + <br></bt> <b>Warning:</b> This option will delete all database data<br> + <div class="form-check"> + <form enctype="multipart/form-data" method="post" action="#"> + <label class="form-check-label" for="flexCheckDefault" style="font-size: 16px;"> + <br>Are you sure you want to continue? + </label> + <input class="form-check-input" type="checkbox" name="confirm" id="flexCheckDefault" /> + + <input type="submit" value="Submit" class="ButtonClass" style="position:absolute; height: 100%" /> + </form> + + </div> + </div> + </div> +</body> + +</html> \ No newline at end of file diff --git a/src/message_queue_node.py b/src/message_queue_node.py index 452b7b76b102752a7fa049f837f1c21adb7b07e2..0e7e201f184197a8ad0da870225330d8cfb5469c 100755 --- a/src/message_queue_node.py +++ b/src/message_queue_node.py @@ -21,6 +21,8 @@ from fragmentation_tracker import FragmentationTracker, FragmentationStartHeader from field_codecs import RosMessageCodec from acomms_codecs.ros_packet_codec import RosPacketCodec from acomms_codecs.packet_codecs import packet_codecs +from rospy_message_converter import json_message_converter +from database.controllers.queueController import * #TODO: move this from packet_dispatch_node import DecoderListEntry @@ -120,6 +122,7 @@ class MessageQueue(object): self.message_codec = message_codec self.allow_fragmentation = allow_fragmentation self.is_active = is_active + self.queueDB = Queue() self._last_tx_time = datetime.utcnow() self._queue = deque(maxlen=maxsize) @@ -152,6 +155,7 @@ class MessageQueue(object): is_active = self.is_active ) + self.queueDB.updatePayload(message,encoded_bits) if self.order == 'fifo': if len(self._queue) < self._queue.maxlen: # drop this message if the queue is full. @@ -173,6 +177,7 @@ class MessageQueue(object): # type: (QueuedMessage) -> None if queued_message in self._queue: queued_message.transmitted = True + self.queueDB.updateTransmit(queued_message, True) self._last_tx_time = datetime.utcnow() if (not self.ack_next_hop) and (not self.ack_end_to_end): self._queue.remove(queued_message) @@ -236,6 +241,9 @@ class MessageQueueNode(object): self.topic_queue = {} self.topic_headers = {} + self.queueDB = Queue() + self.topicDB = Topics() + self.destination_reachable = {} self.dest_sequence_num = defaultdict(lambda: 1) @@ -305,6 +313,7 @@ class MessageQueueNode(object): is_active=message_codec_params.get('is_active', True), message_codec=message_codec, ) + self.topicDB.addEntry(ros_type_name, message_codec_params.get('priority', self.default_priority), message_codec_params.get('is_active', True), message_codec_params['queue_order']) self.topic_queue[actual_topic_name] = new_queue rospy.Subscriber(actual_topic_name, AnyMsg, @@ -324,6 +333,16 @@ class MessageQueueNode(object): # Subscribe to ACK message to update queues rospy.Subscriber("from_acomms/encoded_ack", EncodedAck, self.on_ack_received) + table = self.queueDB.returnTable() + + for entries in table: + if(entries.transmitted != True): + namespace = entries.ros_pkg + entries.topic_name + + message = json_message_converter.convert_json_to_ros_message(namespace, entries.message_raw) + + self.topic_queue[entries.topic_name].append(message) + self.update_queue_status() rospy.loginfo("Message Queue started") self.spin() @@ -374,8 +393,13 @@ class MessageQueueNode(object): # Check if incoming msg topic name matches any of the active queue topic names if topic_name in self.queue_active_names: rospy.logdebug("TOPIC: {} in active queue list, appending msg".format(topic_name)) + json_str = json_message_converter.convert_ros_message_to_json(msg) + self.queueDB.appendQueue(json_str, msg_type, connection_header[0], topic_name, connection_header) + self.topic_queue[topic_name].append(msg) self.update_queue_status() + + else: rospy.logdebug("TOPIC: {} not in active queue list, skipping msg".format(topic_name))