From 169aa95903dfab2b54815a37c3866b0d5c25f7a9 Mon Sep 17 00:00:00 2001 From: Eric Gallimore <egallimore@whoi.edu> Date: Wed, 31 May 2023 12:13:59 -0400 Subject: [PATCH 1/3] Added return values to encode functions: error flag and error message. --- src/ltcodecs/bool_codec.py | 7 ++++--- src/ltcodecs/bytes_codec.py | 12 ++++++++---- src/ltcodecs/ccl_latlon_bcd_codec.py | 5 +++-- src/ltcodecs/ccl_latlon_codec.py | 5 +++-- src/ltcodecs/field_codec.py | 13 +++++++++---- src/ltcodecs/fixed_len_array_codec.py | 11 +++++++---- src/ltcodecs/fixedint_codec.py | 5 ----- src/ltcodecs/float_codec.py | 5 +++-- src/ltcodecs/ieee_float_codec.py | 6 +++--- src/ltcodecs/linspace_float_codec.py | 5 +++-- src/ltcodecs/padding_codec.py | 5 +++-- src/ltcodecs/ros_message_codec.py | 4 +++- src/ltcodecs/ros_msg_field_codec.py | 12 ++++++++---- src/ltcodecs/rostime_codec.py | 6 ++++-- src/ltcodecs/string_codecs.py | 4 ++-- src/ltcodecs/string_enum_codec.py | 6 +++++- src/ltcodecs/variable_len_array_codec.py | 11 +++++++---- src/ltcodecs/varint_codec.py | 7 ++++--- 18 files changed, 79 insertions(+), 50 deletions(-) diff --git a/src/ltcodecs/bool_codec.py b/src/ltcodecs/bool_codec.py index 7c9aabf..4cd9432 100644 --- a/src/ltcodecs/bool_codec.py +++ b/src/ltcodecs/bool_codec.py @@ -1,4 +1,5 @@ -from bitstring import BitArray, ConstBitStream +from __future__ import annotations +from bitstring import BitArray, ConstBitStream, Bits from .field_codec import FieldCodec @@ -6,10 +7,10 @@ class BoolCodec(FieldCodec): def __init__(self, **kwargs): pass - def encode(self, value: bool): + def encode(self, value: bool) -> tuple[Bits, bool, bool, str]: value = bool(value) value_bits = BitArray(bool=value) - return value_bits, value + return value_bits, value, False, "" def decode(self, bits_to_decode: ConstBitStream): value = bits_to_decode.read('bool') diff --git a/src/ltcodecs/bytes_codec.py b/src/ltcodecs/bytes_codec.py index 933e558..2e8447b 100644 --- a/src/ltcodecs/bytes_codec.py +++ b/src/ltcodecs/bytes_codec.py @@ -1,19 +1,23 @@ -from bitstring import BitArray, ConstBitStream +from __future__ import annotations +from bitstring import BitArray, ConstBitStream, Bits from .field_codec import FieldCodec from .varint_codec import VarintCodec class BytesCodec(FieldCodec): - def __init__(self, max_length: int, **kwargs): + def __init__(self, max_length: int, fail_on_overflow=False, **kwargs): self.max_length = max_length self.length_codec = VarintCodec(min_value=0, max_value=self.max_length) + self.fail_on_overflow = fail_on_overflow - def encode(self, value: bytes): + def encode(self, value: bytes) -> tuple[Bits, bytes, bool, str]: + if self.fail_on_overflow and len(value) > self.max_length: + return Bits(), bytes(), True value = value[0:self.max_length] length_bits, _ = self.length_codec.encode(len(value)) value_bits = BitArray(bytes=value) value_bits.prepend(length_bits) - return value_bits, value + return value_bits, value, False, "" def decode(self, bits_to_decode: ConstBitStream): num_bytes = self.length_codec.decode(bits_to_decode) diff --git a/src/ltcodecs/ccl_latlon_bcd_codec.py b/src/ltcodecs/ccl_latlon_bcd_codec.py index 75e345a..693cdf6 100644 --- a/src/ltcodecs/ccl_latlon_bcd_codec.py +++ b/src/ltcodecs/ccl_latlon_bcd_codec.py @@ -1,3 +1,4 @@ +from __future__ import annotations from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec from math import modf, copysign @@ -9,7 +10,7 @@ class CclLatLonBcdCodec(FieldCodec): self.lat_not_lon = lat_not_lon pass - def encode(self, value: float): + def encode(self, value: float) -> tuple[Bits, float, bool, str]: sign = copysign(1, value) abs_value = abs(value) frac, degrees = modf(abs_value) @@ -26,7 +27,7 @@ class CclLatLonBcdCodec(FieldCodec): encoded_value = sign * (degrees + (dec_min / 10000) / 60) encoded_bits = Bits(hex=hex_string) - return encoded_bits, encoded_value + return encoded_bits, encoded_value, False, "" def decode(self, bits_to_decode: ConstBitStream): string_bytes = bits_to_decode.read('bytes:5') diff --git a/src/ltcodecs/ccl_latlon_codec.py b/src/ltcodecs/ccl_latlon_codec.py index 65b8e4b..ed272db 100644 --- a/src/ltcodecs/ccl_latlon_codec.py +++ b/src/ltcodecs/ccl_latlon_codec.py @@ -1,3 +1,4 @@ +from __future__ import annotations from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec from math import ceil, log2 @@ -7,11 +8,11 @@ class CclLatLonCodec(FieldCodec): def __init__(self, **kwargs): pass - def encode(self, value: float): + def encode(self, value: float) -> tuple[Bits, float, bool, str]: encoded_value = int(value * ((2 ** 23 - 1) / 180.0)) encoded_bits = Bits(intle=encoded_value, length=24) - return encoded_bits, encoded_value + return encoded_bits, encoded_value, False, "" def decode(self, bits_to_decode: ConstBitStream): scale = bits_to_decode.read('intle:24') diff --git a/src/ltcodecs/field_codec.py b/src/ltcodecs/field_codec.py index 1cf463d..d52564a 100644 --- a/src/ltcodecs/field_codec.py +++ b/src/ltcodecs/field_codec.py @@ -1,23 +1,28 @@ +from __future__ import annotations from abc import ABC, abstractmethod +from bitstring import ConstBitStream, Bits +from typing import Any + class FieldCodec(ABC): def __init__(self, **kwargs): pass @abstractmethod - def encode(self): + def encode(self, value: Any) -> tuple[Bits, Any, bool, str]: + # Returns: encoded bits, "rounded" value, encoding failed, error message pass @abstractmethod - def decode(self): + def decode(self, bits_to_decode: ConstBitStream): pass @property @abstractmethod - def max_length_bits(self): + def max_length_bits(self) -> int: pass @property @abstractmethod - def min_length_bits(self): + def min_length_bits(self) -> int: pass \ No newline at end of file diff --git a/src/ltcodecs/fixed_len_array_codec.py b/src/ltcodecs/fixed_len_array_codec.py index 4edf127..6509fe2 100644 --- a/src/ltcodecs/fixed_len_array_codec.py +++ b/src/ltcodecs/fixed_len_array_codec.py @@ -1,4 +1,5 @@ -from bitstring import BitArray, ConstBitStream +from __future__ import annotations +from bitstring import BitArray, ConstBitStream, Bits from .field_codec import FieldCodec import ltcodecs as ltcodecs from typing import List @@ -12,15 +13,17 @@ class FixedLenArrayCodec(FieldCodec): else: self.element_field_codec = ltcodecs.field_codec_classes[element_type]() - def encode(self, value: List): + def encode(self, value: List) -> tuple[Bits, List, bool, str]: value = value[0:self.length] value_bits = BitArray() encoded_value_list = [] for element in value: - element_bits, element_value = self.element_field_codec.encode(element) + element_bits, element_value, encode_failed, error_msg = self.element_field_codec.encode(element) + if encode_failed: + return Bits(), list(), True, error_msg value_bits.append(element_bits) encoded_value_list.append(element_value) - return value_bits, encoded_value_list + return value_bits, encoded_value_list, False, "" def decode(self, bits_to_decode: ConstBitStream): decoded_list = [] diff --git a/src/ltcodecs/fixedint_codec.py b/src/ltcodecs/fixedint_codec.py index fa8b986..82c358d 100644 --- a/src/ltcodecs/fixedint_codec.py +++ b/src/ltcodecs/fixedint_codec.py @@ -1,7 +1,4 @@ -from bitstring import ConstBitStream, Bits -from .field_codec import FieldCodec from .varint_codec import VarintCodec -from math import ceil, log2 class FixedIntCodec(VarintCodec): @@ -21,8 +18,6 @@ class FixedIntCodec(VarintCodec): self.little_endian = little_endian - - class Int8Codec(FixedIntCodec): def __init__(self, **kwargs): super(Int8Codec, self).__init__(num_bits=8, signed=True) diff --git a/src/ltcodecs/float_codec.py b/src/ltcodecs/float_codec.py index 496b202..d42e308 100644 --- a/src/ltcodecs/float_codec.py +++ b/src/ltcodecs/float_codec.py @@ -1,3 +1,4 @@ +from __future__ import annotations from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec from math import ceil, log2 @@ -13,7 +14,7 @@ class FloatCodec(FieldCodec): self.num_bits = ceil(log2(self.value_range)) #print("Float codec: precision {} num_bits {}".format(precision, self.num_bits)) - def encode(self, value: float): + def encode(self, value: float) -> tuple[Bits, float, bool, str]: value = float(value) if value < self.min_value: value = self.min_value @@ -25,7 +26,7 @@ class FloatCodec(FieldCodec): rounded_value = round(value - self.min_value, self.precision) + self.min_value # Used only for validation encoded_bits = Bits(uint=encoded_value, length=self.num_bits) - return encoded_bits, rounded_value + return encoded_bits, rounded_value, False, "" def decode(self, bits_to_decode: ConstBitStream): float_offset = bits_to_decode.read('uint:{}'.format(self.num_bits)) / 10 ** self.precision diff --git a/src/ltcodecs/ieee_float_codec.py b/src/ltcodecs/ieee_float_codec.py index ed5a949..bc887d5 100644 --- a/src/ltcodecs/ieee_float_codec.py +++ b/src/ltcodecs/ieee_float_codec.py @@ -1,6 +1,6 @@ +from __future__ import annotations from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec -from math import ceil, log2 class IeeeFloatCodec(FieldCodec): @@ -9,12 +9,12 @@ class IeeeFloatCodec(FieldCodec): raise ValueError("Only 32 or 64 bit widths are supported for IEEE floating point codecs") self.num_bits = num_bits - def encode(self, value: float): + def encode(self, value: float) -> tuple[Bits, float, bool, str]: value = float(value) encoded_bits = Bits(float=value, length=self.num_bits) encoded_value = encoded_bits.float ## print("encode varint {} bits as {}".format(num_bits, encoded_bits)) - return encoded_bits, encoded_value + return encoded_bits, encoded_value, False, "" def decode(self, bits_to_decode: ConstBitStream): value = bits_to_decode.read('floatbe:{}'.format(self.num_bits)) diff --git a/src/ltcodecs/linspace_float_codec.py b/src/ltcodecs/linspace_float_codec.py index 791a86f..5b1df2e 100644 --- a/src/ltcodecs/linspace_float_codec.py +++ b/src/ltcodecs/linspace_float_codec.py @@ -1,3 +1,4 @@ +from __future__ import annotations from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec from math import ceil, log2 @@ -30,7 +31,7 @@ class LinspaceFloatCodec(FieldCodec): # if the max value isn't an integer multiple of the resolution from the min value, it won't be encoded. self.num_bits = ceil(log2(self.num_values)) - def encode(self, value: float): + def encode(self, value: float) -> tuple[Bits, float, bool, str]: value = float(value) if value < self.min_value: value = self.min_value @@ -41,7 +42,7 @@ class LinspaceFloatCodec(FieldCodec): encoded_value = self.min_value + (discretized_offset * self.resolution) encoded_bits = Bits(uint=discretized_offset, length=self.num_bits) ## print("encode varint {} bits as {}".format(num_bits, encoded_bits)) - return encoded_bits, encoded_value + return encoded_bits, encoded_value, False, "" def decode(self, bits_to_decode: ConstBitStream): discretized_offset = bits_to_decode.read('uint:{}'.format(self.num_bits)) diff --git a/src/ltcodecs/padding_codec.py b/src/ltcodecs/padding_codec.py index 0acb607..6059758 100644 --- a/src/ltcodecs/padding_codec.py +++ b/src/ltcodecs/padding_codec.py @@ -1,3 +1,4 @@ +from __future__ import annotations from bitstring import BitArray, ConstBitStream from .field_codec import FieldCodec @@ -7,9 +8,9 @@ class PaddingCodec(FieldCodec): self.num_bits = int(num_bits) pass - def encode(self, value=None): + def encode(self, value=None) -> tuple[BitArray, None, bool, str]: value_bits = BitArray(uint=0, length=self.num_bits) - return value_bits, None + return value_bits, None, False, "" def decode(self, bits_to_decode: ConstBitStream): bits_to_decode.read('pad:{}'.format(self.num_bits)) diff --git a/src/ltcodecs/ros_message_codec.py b/src/ltcodecs/ros_message_codec.py index 87940c3..7b7aa6b 100644 --- a/src/ltcodecs/ros_message_codec.py +++ b/src/ltcodecs/ros_message_codec.py @@ -1,3 +1,5 @@ +from __future__ import annotations +from typing import Any import ltcodecs as ltcodecs import msgpack from crccheck.crc import Crc8SaeJ1850, Crc32Iscsi @@ -30,7 +32,7 @@ class RosMessageCodec(object): self.root_field_codec = ltcodecs.RosMsgFieldCodec(ros_type=ros_type, fields=fields_dict) - def encode(self, ros_msg): + def encode(self, ros_msg: rospy.AnyMsg) -> tuple[Bits, dict[Any]]: encoded_bits, encoded_dict = self.root_field_codec.encode(ros_msg) if self.checksum: diff --git a/src/ltcodecs/ros_msg_field_codec.py b/src/ltcodecs/ros_msg_field_codec.py index 19701c1..b7e4b02 100644 --- a/src/ltcodecs/ros_msg_field_codec.py +++ b/src/ltcodecs/ros_msg_field_codec.py @@ -1,9 +1,10 @@ +from __future__ import annotations import re from bitstring import ConstBitStream, BitArray from .field_codec import FieldCodec import ltcodecs as ltcodecs from rospy import AnyMsg -from typing import Union +from typing import Union, Any import operator import roslib.message @@ -77,7 +78,7 @@ class RosMsgFieldCodec(FieldCodec): field_params, e)) - def encode(self, message: AnyMsg, metadata=None): + def encode(self, message: AnyMsg, metadata=None) -> tuple[BitArray, dict[Any], bool, str]: # ROS messages use __slots__, so we can't use __dict__ or vars() to get the attributes as a dict message_dict = {} for field in message.__slots__: @@ -91,12 +92,15 @@ class RosMsgFieldCodec(FieldCodec): # Note that metadata encoding is done at the ros_msg_codec level, not here if not field_codec or isinstance(field_codec, str): continue - field_bits, encoded_dict[field_name] = field_codec.encode(message_dict[field_name]) + field_bits, encoded_dict[field_name], failed_to_encode, error_msg = field_codec.encode(message_dict[field_name]) + if failed_to_encode: + return BitArray(), dict(), True, error_msg encoded_bits.append(field_bits) except Exception as e: #print("Codec: {}, max len bits {}".format(field_codec, field_codec.max_length_bits)) - raise Exception('Error encoding field "{}" with codec {} (max len bits {})'.format(field_name, + error_msg = ('Error encoding field "{}" with codec {} (max len bits {})'.format(field_name, field_codec, field_codec.max_length_bits)).with_traceback(e.__traceback__) + return BitArray(), dict(), True, error_msg return encoded_bits, encoded_dict def decode(self, bits_to_decode: ConstBitStream, metadata=None): diff --git a/src/ltcodecs/rostime_codec.py b/src/ltcodecs/rostime_codec.py index 4c696d4..ef201ba 100644 --- a/src/ltcodecs/rostime_codec.py +++ b/src/ltcodecs/rostime_codec.py @@ -1,3 +1,4 @@ +from __future__ import annotations from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec from math import ceil, log2 @@ -14,7 +15,7 @@ class RosTimeCodec(FieldCodec): self.num_bits = ceil(log2(self.value_range)) #print("Float codec: precision {} num_bits {}".format(precision, self.num_bits)) - def encode(self, value: Time): + def encode(self, value: Time) -> tuple[Bits, Time, bool, str]: value = value.to_sec() if value < self.min_value: value = self.min_value @@ -24,9 +25,10 @@ class RosTimeCodec(FieldCodec): encoded_value = int(round(value - self.min_value, self.precision) * 10 ** self.precision) #print(value, self.min_value, self.precision) rounded_value = round(value - self.min_value, self.precision) + self.min_value # Used only for validation + rounded_value = Time.from_sec(rounded_value) encoded_bits = Bits(uint=encoded_value, length=self.num_bits) - return encoded_bits, rounded_value + return encoded_bits, rounded_value, False, "" def decode(self, bits_to_decode: ConstBitStream): float_offset = bits_to_decode.read('uint:{}'.format(self.num_bits)) / 10 ** self.precision diff --git a/src/ltcodecs/string_codecs.py b/src/ltcodecs/string_codecs.py index 0f25c85..749ea4b 100644 --- a/src/ltcodecs/string_codecs.py +++ b/src/ltcodecs/string_codecs.py @@ -34,7 +34,7 @@ class AsciiStringCodec(FieldCodec): self.tail = tail self.string_len_codec = VarintCodec(min_value=0, max_value=max_length) - def encode(self, value: str): + def encode(self, value: str) -> tuple[Bits, str, bool, str]: if not self.tail: value = value[0:self.max_length] else: @@ -64,7 +64,7 @@ class AsciiStringCodec(FieldCodec): compressed_bytes = string_bytes compressed_string = compressed_bytes.decode('ascii') - return encoded_bits, compressed_string + return encoded_bits, compressed_string, False, "" def decode(self, encoded_bits: ConstBitStream): num_chars = self.string_len_codec.decode(encoded_bits) diff --git a/src/ltcodecs/string_enum_codec.py b/src/ltcodecs/string_enum_codec.py index c71cd00..1dc7e60 100644 --- a/src/ltcodecs/string_enum_codec.py +++ b/src/ltcodecs/string_enum_codec.py @@ -29,6 +29,8 @@ class StringEnumCodec(FieldCodec): if self.strip: value = value.strip() + encode_failed = False + error_msg = "" if value in self.entries: index = self.entries.index(value) compressed_value = self.entries[index] @@ -39,10 +41,12 @@ class StringEnumCodec(FieldCodec): else: index = 0 compressed_value = self.entries[index] + encode_failed = True + error_msg = f"Failed to encode unknown string {value}, not in {self.entries}" encoded_index, _ = self.string_index_codec.encode(index) - return encoded_index, compressed_value + return encoded_index, compressed_value, encode_failed, error_msg def decode(self, encoded_bits: ConstBitStream): index = self.string_index_codec.decode(encoded_bits) diff --git a/src/ltcodecs/variable_len_array_codec.py b/src/ltcodecs/variable_len_array_codec.py index 687bd1f..0cf62f4 100644 --- a/src/ltcodecs/variable_len_array_codec.py +++ b/src/ltcodecs/variable_len_array_codec.py @@ -1,8 +1,9 @@ +from __future__ import annotations +from typing import Any from bitstring import BitArray, ConstBitStream from .field_codec import FieldCodec from .varint_codec import VarintCodec import ltcodecs as ltcodecs -from typing import List class VariableLenArrayCodec(FieldCodec): @@ -15,16 +16,18 @@ class VariableLenArrayCodec(FieldCodec): else: self.element_field_codec = ltcodecs.field_codec_classes[element_type]() - def encode(self, value: List): + def encode(self, value: list) -> tuple[BitArray, list[Any], bool, str]: value = value[0:self.max_length] length_bits, _ = self.length_codec.encode(len(value)) value_bits = BitArray(length_bits) encoded_value_list = [] for element in value: - element_bits, element_value = self.element_field_codec.encode(element) + element_bits, element_value, failed_to_encode, error_msg = self.element_field_codec.encode(element) + if failed_to_encode: + return BitArray(), [], True, error_msg value_bits.append(element_bits) encoded_value_list.append(element_value) - return value_bits, encoded_value_list + return value_bits, encoded_value_list, False, "" def decode(self, bits_to_decode: ConstBitStream): num_elements = self.length_codec.decode(bits_to_decode) diff --git a/src/ltcodecs/varint_codec.py b/src/ltcodecs/varint_codec.py index bf763a5..adf4410 100644 --- a/src/ltcodecs/varint_codec.py +++ b/src/ltcodecs/varint_codec.py @@ -1,3 +1,4 @@ +from __future__ import annotations from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec from math import ceil, log2 @@ -14,7 +15,7 @@ class VarintCodec(FieldCodec): self.num_bits = ceil(log2(num_values)) self.little_endian = little_endian - def encode(self, value: int): + def encode(self, value: int) -> tuple[Bits, int, bool, str]: value = int(value) if value < self.min_value: value = self.min_value @@ -28,9 +29,9 @@ class VarintCodec(FieldCodec): else: encoded_bits = Bits(uint=discretized_offset, length=self.num_bits) ## print("encode varint {} bits as {}".format(num_bits, encoded_bits)) - return encoded_bits, encoded_value + return encoded_bits, encoded_value, False, "" - def decode(self, bits_to_decode: ConstBitStream): + def decode(self, bits_to_decode: ConstBitStream) -> int: if self.little_endian: discretized_offset = bits_to_decode.read('uintle:{}'.format(self.num_bits)) else: -- GitLab From 5eae4e51ead7dc641f60e0c66680844f72bd24d7 Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Mon, 5 Jun 2023 17:24:51 -0400 Subject: [PATCH 2/3] Added exception, updated to pep8 standards --- .gitignore | 236 ++++++++++++- codecTest.py | 364 --------------------- setup.cfg | 2 +- src/ltcodecs.egg-info/SOURCES.txt | 29 -- src/ltcodecs.egg-info/dependency_links.txt | 1 - src/ltcodecs.egg-info/top_level.txt | 1 - src/ltcodecs/__init__.py | 82 ++--- src/ltcodecs/bool_codec.py | 40 ++- src/ltcodecs/bytes_codec.py | 38 ++- src/ltcodecs/ccl_latlon_bcd_codec.py | 31 +- src/ltcodecs/ccl_latlon_codec.py | 35 +- src/ltcodecs/exceptions.py | 16 + src/ltcodecs/field_codec.py | 27 +- src/ltcodecs/fixed_len_array_codec.py | 53 ++- src/ltcodecs/fixedint_codec.py | 58 +++- src/ltcodecs/float_codec.py | 53 ++- src/ltcodecs/ieee_float_codec.py | 44 ++- src/ltcodecs/linspace_float_codec.py | 62 +++- src/ltcodecs/padding_codec.py | 31 +- src/ltcodecs/ros_message_codec.py | 67 ++-- src/ltcodecs/ros_msg_field_codec.py | 130 +++++--- src/ltcodecs/rostime_codec.py | 64 +++- src/ltcodecs/string_codecs.py | 94 ++++-- src/ltcodecs/string_enum_codec.py | 53 ++- src/ltcodecs/variable_len_array_codec.py | 53 ++- src/ltcodecs/varint_codec.py | 57 +++- test_codecs.py | 349 ++++++++++++++++++++ 27 files changed, 1377 insertions(+), 693 deletions(-) delete mode 100755 codecTest.py delete mode 100644 src/ltcodecs.egg-info/SOURCES.txt delete mode 100644 src/ltcodecs.egg-info/dependency_links.txt delete mode 100644 src/ltcodecs.egg-info/top_level.txt create mode 100644 src/ltcodecs/exceptions.py create mode 100755 test_codecs.py diff --git a/.gitignore b/.gitignore index 002b3ba..1ffa0c7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,18 +1,218 @@ -src/field_codecs/__pycache__/varint_codec.cpython-38.pyc -src/field_codecs/__pycache__/variable_len_array_codec.cpython-38.pyc -src/field_codecs/__pycache__/string_codecs.cpython-38.pyc -src/field_codecs/__pycache__/rostime_codec.cpython-38.pyc -src/field_codecs/__pycache__/ros_msg_field_codec.cpython-38.pyc -src/field_codecs/__pycache__/ros_message_codec.cpython-38.pyc -src/field_codecs/__pycache__/padding_codec.cpython-38.pyc -src/field_codecs/__pycache__/linspace_float_codec.cpython-38.pyc -src/field_codecs/__pycache__/ieee_float_codec.cpython-38.pyc -src/field_codecs/__pycache__/float_codec.cpython-38.pyc -src/field_codecs/__pycache__/fixedint_codec.cpython-38.pyc -src/field_codecs/__pycache__/fixed_len_array_codec.cpython-38.pyc -src/field_codecs/__pycache__/field_codec.cpython-38.pyc -src/field_codecs/__pycache__/ccl_latlon_codec.cpython-38.pyc -src/field_codecs/__pycache__/ccl_latlon_bcd_codec.cpython-38.pyc -src/field_codecs/__pycache__/bytes_codec.cpython-38.pyc -src/field_codecs/__pycache__/bool_codec.cpython-38.pyc -src/field_codecs/__pycache__/__init__.cpython-38.pyc + +# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode,python,emacs +# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode,python,emacs + +### Emacs ### +# -*- mode: gitignore; -*- +*~ +\#*\# +/.emacs.desktop +/.emacs.desktop.lock +*.elc +auto-save-list +tramp +.\#* + +# Org-mode +.org-id-locations +*_archive + +# flymake-mode +*_flymake.* + +# eshell files +/eshell/history +/eshell/lastdir + +# elpa packages +/elpa/ + +# reftex files +*.rel + +# AUCTeX auto folder +/auto/ + +# cask packages +.cask/ +dist/ + +# Flycheck +flycheck_*.el + +# server auth directory +/server/ + +# projectiles files +.projectile + +# directory configuration +.dir-locals.el + +# network security +/network-security.data + + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +### VisualStudioCode ### +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +*.code-workspace + +# Local History for Visual Studio Code +.history/ + +### VisualStudioCode Patch ### +# Ignore all local history of files +.history +.ionide + +# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode,python,emacs +.vscode/settings.json +src/test.txt +launch/.vscode/settings.json +src/commandSender.py +src/database/databases/hostCommands.db +src/database/databases/remoteCommands.db +src/uploadedFiles/.gitkeep diff --git a/codecTest.py b/codecTest.py deleted file mode 100755 index 6911193..0000000 --- a/codecTest.py +++ /dev/null @@ -1,364 +0,0 @@ -#!/usr/bin/env python3 - -import unittest -from bitstring import ConstBitStream -import ltcodecs -import math -import time -import subprocess - -try: - import rospy - from acomms_tests.msg import testMsg -except: - pass - - - -class codecTest(unittest.TestCase): - def testBoolCodec(self): - '''test functionality of the boolean codec''' - - entryBool = True - codec = ltcodecs.BoolCodec() - encodedBool = codec.encode(entryBool) - - bitStream = ConstBitStream(encodedBool[0]) - - decodedBool = codec.decode(bitStream) - self.assertEqual(decodedBool, entryBool) - - def testStringCodec(self): - '''test functionality of the string codec''' - - entryString = "This is a test string" - codec = ltcodecs.string_codecs.AsciiStringCodec() - encodedStr = codec.encode(entryString) - - bitStream = ConstBitStream(encodedStr[0]) - - decodedStr = codec.decode(bitStream) - self.assertEqual(decodedStr, entryString) - - def testBytesCodec(self): - '''test functionality of the bytes codec''' - - entryString = "This is a test string" - entryBytes = entryString.encode() # encode entryString to bytes - - codec = ltcodecs.bytes_codec.BytesCodec(100) - encodedBytes = codec.encode(entryBytes) - - bitStream = ConstBitStream(encodedBytes[0]) - - decodedBytes = codec.decode(bitStream).decode() - self.assertEqual(decodedBytes, entryString) - - def testCclLatLonBcdCodec(self): - '''test functionality of the CclLatLonBcd codec''' - - entryCoord = 41.5 - - codec = ltcodecs.ccl_latlon_bcd_codec.CclLatLonBcdCodec(entryCoord) - encodedCoord = codec.encode(entryCoord) - - bitStream = ConstBitStream(encodedCoord[0]) - - decodedCoord = codec.decode(bitStream) - - self.assertEqual(math.ceil(decodedCoord), math.ceil(entryCoord)) - - def testCclLatLonCodec(self): - '''test functionality of the CclLatLon codec''' - - entryCoord = 41.5 - - codec = ltcodecs.ccl_latlon_codec.CclLatLonCodec() - encodedCoord = codec.encode(entryCoord) - - bitStream = ConstBitStream(encodedCoord[0]) - - decodedCoord = codec.decode(bitStream) - - self.assertAlmostEqual(math.ceil(decodedCoord), math.ceil(entryCoord)) - - def testInt8Codec(self): - '''test functionality of the signed integer 8 codec''' - - entryInt = -21 - - codec = ltcodecs.FixedIntCodec(8, True) - encodedInt = codec.encode(entryInt) - - bitStream = ConstBitStream(encodedInt[0]) - - decodedInt = codec.decode(bitStream) - - self.assertAlmostEqual(decodedInt, entryInt) - - def testInt16Codec(self): - '''test functionality of the signed integer 16 codec''' - - entryInt = -21 - - codec = ltcodecs.FixedIntCodec(16, True) - encodedInt = codec.encode(entryInt) - - bitStream = ConstBitStream(encodedInt[0]) - - decodedInt = codec.decode(bitStream) - - self.assertAlmostEqual(decodedInt, entryInt) - - def testInt32Codec(self): - '''test functionality of the signed integer 32 codec''' - - entryInt = 1 - - codec = ltcodecs.FixedIntCodec(32, True) - encodedInt = codec.encode(entryInt) - - bitStream = ConstBitStream(encodedInt[0]) - - decodedInt = codec.decode(bitStream) - - self.assertAlmostEqual(decodedInt, entryInt) - - def testInt64Codec(self): - '''test functionality of the signed integer 64 codec''' - - entryInt = 1 - - codec = ltcodecs.FixedIntCodec(64, True) - encodedInt = codec.encode(entryInt) - - bitStream = ConstBitStream(encodedInt[0]) - - decodedInt = codec.decode(bitStream) - - self.assertAlmostEqual(decodedInt, entryInt) - - def testUInt8Codec(self): - '''test functionality of the unsigned integer 8 codec''' - - entryInt = 21 - - codec = ltcodecs.FixedIntCodec(8) - encodedInt = codec.encode(entryInt) - - bitStream = ConstBitStream(encodedInt[0]) - - decodedInt = codec.decode(bitStream) - - self.assertAlmostEqual(decodedInt, entryInt) - - def testUInt16Codec(self): - '''test functionality of the unsigned integer 16 codec''' - - entryInt = 21 - - codec = ltcodecs.FixedIntCodec(16) - encodedInt = codec.encode(entryInt) - - bitStream = ConstBitStream(encodedInt[0]) - - decodedInt = codec.decode(bitStream) - - self.assertAlmostEqual(decodedInt, entryInt) - - def testUInt32Codec(self): - '''test functionality of the unsigned integer 32 codec''' - - entryInt = 21 - - codec = ltcodecs.FixedIntCodec(32) - encodedInt = codec.encode(entryInt) - - bitStream = ConstBitStream(encodedInt[0]) - - decodedInt = codec.decode(bitStream) - - self.assertAlmostEqual(decodedInt, entryInt) - - def testUInt64Codec(self): - '''test functionality of the unsigned integer 64 codec''' - - entryInt = 21 - - codec = ltcodecs.FixedIntCodec(64) - encodedInt = codec.encode(entryInt) - - bitStream = ConstBitStream(encodedInt[0]) - - decodedInt = codec.decode(bitStream) - - self.assertAlmostEqual(decodedInt, entryInt) - - def testFloatCodec(self): - '''test functionality of the Float codec''' - - entryFloat = 21.53 - - codec = ltcodecs.float_codec.FloatCodec(0, 100, 2) - encodedFloat = codec.encode(entryFloat) - - bitStream = ConstBitStream(encodedFloat[0]) - - decodedFloat = codec.decode(bitStream) - - self.assertAlmostEqual(decodedFloat, entryFloat) - - def testIeeeFloatCodec(self): - '''test functionality of the IEEE Float codec''' - - entryFloat = 21.23 - - codec = ltcodecs.ieee_float_codec.IeeeFloatCodec() - encodedFloat = codec.encode(entryFloat) - - bitStream = ConstBitStream(encodedFloat[0]) - - decodedFloat = codec.decode(bitStream) - - self.assertAlmostEqual(round(decodedFloat, 2), round(entryFloat, 2)) - - def testLinspaceFloatCodec(self): - '''test functionality of the Linspace Float codec''' - - entryFloat = 21.23 - - codec = ltcodecs.linspace_float_codec.LinspaceFloatCodec( - 0, 100, num_bits=32) - encodedFloat = codec.encode(entryFloat) - - bitStream = ConstBitStream(encodedFloat[0]) - - decodedFloat = codec.decode(bitStream) - - self.assertAlmostEqual(round(decodedFloat, 2), round(entryFloat, 2)) - - def testVarintCodec(self): - '''test functionality of the Varint codec''' - - entryInt = 21 - - codec = ltcodecs.VarintCodec(0, 100) - encodedInt = codec.encode(entryInt) - - bitStream = ConstBitStream(encodedInt[0]) - - decodedInt = codec.decode(bitStream) - - self.assertAlmostEqual(decodedInt, entryInt) - - def testFixedLenArrayCodec(self): - '''test functionality of the fixed length array codec''' - - entryArray = [1, 2, 3, 4] - - codec = ltcodecs.VariableLenArrayCodec("integer", 4, element_params={ - 'min_value': 0, 'max_value': 100}) - encodedInt = codec.encode(entryArray) - - bitStream = ConstBitStream(encodedInt[0]) - - decodedInt = codec.decode(bitStream) - - self.assertAlmostEqual(decodedInt, entryArray) - - def testVariableLenArrayCodec(self): - '''test functionality of the variable length array codec''' - - entryArray = [1, 2, 3, 4] - - codec = ltcodecs.VariableLenArrayCodec("integer", 4, element_params={ - 'min_value': 0, 'max_value': 100}) - encodedInt = codec.encode(entryArray) - - bitStream = ConstBitStream(encodedInt[0]) - - decodedInt = codec.decode(bitStream) - - self.assertAlmostEqual(decodedInt, entryArray) - - def testRosMessageCodec(self): - '''test functionality of the ros message codec''' - - try: - - entryMsg = testMsg() - entryMsg.val1 = 2 - entryMsg.val2 = 4 - - codec = ltcodecs.RosMessageCodec("acomms_tests/testMsg") - encodedMsg = codec.encode(entryMsg) - - bitStream = ConstBitStream(encodedMsg[0]) - - decodedMsg = codec.decode(bitStream) - - self.assertAlmostEqual(decodedMsg, entryMsg) - except Exception as e: - self.assertTrue(1 == 0, "Could not find ros package") - - def testRosMessageFieldCodec(self): - '''test functionality of the ros message field codec''' - try: - - entryMsg = testMsg() - entryMsg.val1 = 2 - entryMsg.val2 = 4 - - codec = ltcodecs.RosMsgFieldCodec("acomms_tests/testMsg") - encodedMsg = codec.encode(entryMsg) - - bitStream = ConstBitStream(encodedMsg[0]) - - decodedMsg = codec.decode(bitStream) - - self.assertAlmostEqual(decodedMsg, entryMsg) - except Exception as e: - self.assertTrue(1 == 0, "Could not find ros package") - - def testRosTimeCodec(self): - '''test functionality of the ros time field codec''' - - try: - - entryTime = rospy.Time.from_sec(time.time()) - - codec = ltcodecs.rostime_codec.RosTimeCodec(precision=20) - encodedTime = codec.encode(entryTime) - - bitStream = ConstBitStream(encodedTime[0]) - - decodedMsg = codec.decode(bitStream) - - self.assertEqual(decodedMsg, entryTime) - except Exception as e: - self.assertTrue(1 == 0, "Could not find ros package") - - def test_string_enum_codec(self): - '''test functionality of the StringEnum field codec''' - - try: - test_string = 'test2' - codec = ltcodecs.string_enum_codec.StringEnumCodec(entries=['Test1', 'test2', 'other strings here']) - encoded = codec.encode('test2') - bitstream = ConstBitStream(encoded[0]) - decoded = codec.decode(bitstream) - - self.assertEqual(test_string, decoded) - except Exception as e: - self.assertTrue(1 == 0, f"StringEnum codec fail: decoded {decoded} != {encoded}") - - -if __name__ == "__main__": - try: # check to make sure that ros is installed - subprocess.run( - ['roscore'], capture_output=True, timeout=2, text=True) - - rospy.init_node("temp_node") - except: - pass - unittest.main() diff --git a/setup.cfg b/setup.cfg index 2a5ce43..9eeccc4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = ltcodecs -version = 0.2.0 +version = 1.0.0 author = whoi author_email = egallimore@whoi.edu description = LT Codecs diff --git a/src/ltcodecs.egg-info/SOURCES.txt b/src/ltcodecs.egg-info/SOURCES.txt deleted file mode 100644 index 3220c4a..0000000 --- a/src/ltcodecs.egg-info/SOURCES.txt +++ /dev/null @@ -1,29 +0,0 @@ -LICENSE -README.md -pyproject.toml -setup.cfg -msg/EncodedAck.msg -msg/Packet.msg -msg/ReceivedPacket.msg -src/field_codecs/__init__.py -src/field_codecs/bool_codec.py -src/field_codecs/bytes_codec.py -src/field_codecs/ccl_latlon_bcd_codec.py -src/field_codecs/ccl_latlon_codec.py -src/field_codecs/field_codec.py -src/field_codecs/fixed_len_array_codec.py -src/field_codecs/fixedint_codec.py -src/field_codecs/float_codec.py -src/field_codecs/ieee_float_codec.py -src/field_codecs/linspace_float_codec.py -src/field_codecs/padding_codec.py -src/field_codecs/ros_message_codec.py -src/field_codecs/ros_msg_field_codec.py -src/field_codecs/rostime_codec.py -src/field_codecs/string_codecs.py -src/field_codecs/variable_len_array_codec.py -src/field_codecs/varint_codec.py -src/ltcodecs.egg-info/PKG-INFO -src/ltcodecs.egg-info/SOURCES.txt -src/ltcodecs.egg-info/dependency_links.txt -src/ltcodecs.egg-info/top_level.txt \ No newline at end of file diff --git a/src/ltcodecs.egg-info/dependency_links.txt b/src/ltcodecs.egg-info/dependency_links.txt deleted file mode 100644 index 8b13789..0000000 --- a/src/ltcodecs.egg-info/dependency_links.txt +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/ltcodecs.egg-info/top_level.txt b/src/ltcodecs.egg-info/top_level.txt deleted file mode 100644 index 89e8e3f..0000000 --- a/src/ltcodecs.egg-info/top_level.txt +++ /dev/null @@ -1 +0,0 @@ -field_codecs diff --git a/src/ltcodecs/__init__.py b/src/ltcodecs/__init__.py index 541f127..f750c2a 100644 --- a/src/ltcodecs/__init__.py +++ b/src/ltcodecs/__init__.py @@ -17,47 +17,49 @@ from .ccl_latlon_bcd_codec import CclLatLonBcdCodec from .padding_codec import PaddingCodec from .rostime_codec import RosTimeCodec from .ros_message_codec import RosMessageCodec +from .exceptions import EncodingFailed +field_codec_classes = { + "integer": VarintCodec, + "fixedint": FixedIntCodec, + "varint": VarintCodec, + "float": FloatCodec, + "linspace_float": LinspaceFloatCodec, + "linspace": LinspaceFloatCodec, + "uint8": UInt8Codec, + "uint16": UInt16Codec, + "uint32": UInt32Codec, + "uint64": UInt64Codec, + "int8": Int8Codec, + "int16": Int16Codec, + "int32": Int32Codec, + "int64": Int64Codec, + "float32": IeeeFloat32Codec, + "float64": IeeeFloat64Codec, + "bool": BoolCodec, + "string": AsciiStringCodec, + "ascii": AsciiStringCodec, + "bytes": BytesCodec, + "msg": RosMsgFieldCodec, + "ros_msg": RosMsgFieldCodec, + "variable_len_array": VariableLenArrayCodec, + "fixed_len_array": FixedLenArrayCodec, + "ccl_latlon": CclLatLonCodec, + "ccl_latlon_bcd": CclLatLonBcdCodec, + "pad": PaddingCodec, + "padding": PaddingCodec, + "time": RosTimeCodec, + "rostime": RosTimeCodec, + "string_enum": StringEnumCodec, +} -field_codec_classes = {'integer': VarintCodec, - 'fixedint': FixedIntCodec, - 'varint': VarintCodec, - 'float': FloatCodec, - 'linspace_float': LinspaceFloatCodec, - 'linspace': LinspaceFloatCodec, - 'uint8': UInt8Codec, - 'uint16': UInt16Codec, - 'uint32': UInt32Codec, - 'uint64': UInt64Codec, - 'int8': Int8Codec, - 'int16': Int16Codec, - 'int32': Int32Codec, - 'int64': Int64Codec, - 'float32': IeeeFloat32Codec, - 'float64': IeeeFloat64Codec, - 'bool': BoolCodec, - 'string': AsciiStringCodec, - 'ascii': AsciiStringCodec, - 'bytes': BytesCodec, - 'msg': RosMsgFieldCodec, - 'ros_msg': RosMsgFieldCodec, - 'variable_len_array': VariableLenArrayCodec, - 'fixed_len_array': FixedLenArrayCodec, - 'ccl_latlon': CclLatLonCodec, - 'ccl_latlon_bcd': CclLatLonBcdCodec, - 'pad': PaddingCodec, - 'padding': PaddingCodec, - 'time': RosTimeCodec, - 'rostime': RosTimeCodec, - 'string_enum': StringEnumCodec, - } +metadata_decoders = { + "src": "packet.src", + "dest": "packet.dest", + "dest_decoder": "packet.dest", + "toa": "cst.toa", + "snr_in": "cst.snr_in", +} -metadata_decoders = {'src': 'packet.src', - 'dest': 'packet.dest', - 'dest_decoder': 'packet.dest', - 'toa': 'cst.toa', - 'snr_in': 'cst.snr_in'} - - -metadata_encoders = {'dest': 'packet.dest'} +metadata_encoders = {"dest": "packet.dest"} diff --git a/src/ltcodecs/bool_codec.py b/src/ltcodecs/bool_codec.py index 4cd9432..d5080d2 100644 --- a/src/ltcodecs/bool_codec.py +++ b/src/ltcodecs/bool_codec.py @@ -1,23 +1,47 @@ +""" +ltcodecs.bool_codec +--------------------- + +This module contains the BoolCodec class, which is used to encode and decode bools. +""" + from __future__ import annotations from bitstring import BitArray, ConstBitStream, Bits from .field_codec import FieldCodec class BoolCodec(FieldCodec): - def __init__(self, **kwargs): + """ + codec for bools + """ + + def __init__(self, **kwargs) -> None: pass - def encode(self, value: bool) -> tuple[Bits, bool, bool, str]: + def encode(self, value: bool) -> tuple[Bits, bool]: + """ + encode a bool + + :param value: the bool to encode + """ value = bool(value) value_bits = BitArray(bool=value) - return value_bits, value, False, "" + return value_bits, value - def decode(self, bits_to_decode: ConstBitStream): - value = bits_to_decode.read('bool') + def decode(self, bits_to_decode: ConstBitStream) -> bool: + """ + decode a bool + + :param bits_to_decode: the bits to decode + """ + + value = bits_to_decode.read("bool") return value - def min_length_bits(self): + @property + def min_length_bits(self) -> int: return 1 - def max_length_bits(self): - return 1 \ No newline at end of file + @property + def max_length_bits(self) -> int: + return 1 diff --git a/src/ltcodecs/bytes_codec.py b/src/ltcodecs/bytes_codec.py index 2e8447b..0f5cb72 100644 --- a/src/ltcodecs/bytes_codec.py +++ b/src/ltcodecs/bytes_codec.py @@ -1,31 +1,55 @@ +""" +ltcodecs.bytes_codec +--------------------- + +This module contains the BytesCodec class, which is used to encode and decode bytes. +""" + from __future__ import annotations from bitstring import BitArray, ConstBitStream, Bits from .field_codec import FieldCodec from .varint_codec import VarintCodec +from .exceptions import EncodingFailed class BytesCodec(FieldCodec): - def __init__(self, max_length: int, fail_on_overflow=False, **kwargs): + """ + codec for bytes + """ + + def __init__(self, max_length: int, fail_on_overflow=False, **kwargs) -> None: self.max_length = max_length self.length_codec = VarintCodec(min_value=0, max_value=self.max_length) self.fail_on_overflow = fail_on_overflow - def encode(self, value: bytes) -> tuple[Bits, bytes, bool, str]: + def encode(self, value: bytes) -> tuple[Bits, bytes]: + """ + encode bytes + + :param value: the bytes to encode + """ if self.fail_on_overflow and len(value) > self.max_length: - return Bits(), bytes(), True - value = value[0:self.max_length] + raise EncodingFailed("BytesCodec: value is too long to encode") + value = value[0 : self.max_length] length_bits, _ = self.length_codec.encode(len(value)) value_bits = BitArray(bytes=value) value_bits.prepend(length_bits) - return value_bits, value, False, "" + return value_bits, value def decode(self, bits_to_decode: ConstBitStream): + """ + decode bytes + + :param bits_to_decode: the bits to decode + """ num_bytes = self.length_codec.decode(bits_to_decode) - value = bits_to_decode.read('bytes:' + str(num_bytes)) + value = bits_to_decode.read("bytes:" + str(num_bytes)) return value + @property def min_length_bits(self): return self.length_codec.max_length_bits + @property def max_length_bits(self): - return self.length_codec.max_length_bits + (8 * self.max_length) \ No newline at end of file + return self.length_codec.max_length_bits + (8 * self.max_length) diff --git a/src/ltcodecs/ccl_latlon_bcd_codec.py b/src/ltcodecs/ccl_latlon_bcd_codec.py index 693cdf6..8687c31 100644 --- a/src/ltcodecs/ccl_latlon_bcd_codec.py +++ b/src/ltcodecs/ccl_latlon_bcd_codec.py @@ -1,3 +1,10 @@ +""" +ltcodecs.ccl_latlon_bcd_codec +----------------------------- + +This module contains the the ccl latlon bcd codec +""" + from __future__ import annotations from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec @@ -5,12 +12,12 @@ from math import modf, copysign class CclLatLonBcdCodec(FieldCodec): - ''' This codec is horrifically inefficient, but included for compatibility with messages like MDAT_RANGER ''' + """This codec is horrifically inefficient, but included for compatibility with messages like MDAT_RANGER""" + def __init__(self, lat_not_lon=True, **kwargs): self.lat_not_lon = lat_not_lon - pass - def encode(self, value: float) -> tuple[Bits, float, bool, str]: + def encode(self, value: float) -> tuple[Bits, float]: sign = copysign(1, value) abs_value = abs(value) frac, degrees = modf(abs_value) @@ -19,23 +26,23 @@ class CclLatLonBcdCodec(FieldCodec): dec_min = int(minutes * 10000) if self.lat_not_lon: - sign_char = 'a' if sign > 0 else 'c' + sign_char = "a" if sign > 0 else "c" else: - sign_char = 'b' if sign > 0 else 'd' + sign_char = "b" if sign > 0 else "d" - hex_string = "{:03d}{}{:06d}".format(degrees, sign_char, dec_min) + hex_string = f"{degrees:03d}{sign_char}{dec_min:06d}" encoded_value = sign * (degrees + (dec_min / 10000) / 60) encoded_bits = Bits(hex=hex_string) - return encoded_bits, encoded_value, False, "" + return encoded_bits, encoded_value def decode(self, bits_to_decode: ConstBitStream): - string_bytes = bits_to_decode.read('bytes:5') - hex_string = ''.join('{:02x}'.format(x) for x in string_bytes) + string_bytes = bits_to_decode.read("bytes:5") + hex_string = "".join(f"{x:02x}" for x in string_bytes) degrees = int(hex_string[0:3]) - direction = 1 if hex_string[3] in ('a', 'b') else -1 - minutes = float(hex_string[4:10]) / 10000. + direction = 1 if hex_string[3] in ("a", "b") else -1 + minutes = float(hex_string[4:10]) / 10000.0 value = direction * (degrees + (minutes / 60)) return value @@ -46,4 +53,4 @@ class CclLatLonBcdCodec(FieldCodec): @property def min_length_bits(self): - return 40 \ No newline at end of file + return 40 diff --git a/src/ltcodecs/ccl_latlon_codec.py b/src/ltcodecs/ccl_latlon_codec.py index ed272db..34eada2 100644 --- a/src/ltcodecs/ccl_latlon_codec.py +++ b/src/ltcodecs/ccl_latlon_codec.py @@ -1,3 +1,10 @@ +""" +ltcodecs.ccl_latlon_codec +------------------------- + +ccl_latlon_codec encodes and decodes CCL latitude and longitude values +""" + from __future__ import annotations from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec @@ -5,17 +12,33 @@ from math import ceil, log2 class CclLatLonCodec(FieldCodec): - def __init__(self, **kwargs): + """ + codec for CCL latitude and longitude values + """ + + def __init__(self, **kwargs) -> None: pass - def encode(self, value: float) -> tuple[Bits, float, bool, str]: - encoded_value = int(value * ((2 ** 23 - 1) / 180.0)) + def encode(self, value: float) -> tuple[Bits, float]: + """ + encode value + + :param value: the value to encode + """ + + encoded_value = int(value * ((2**23 - 1) / 180.0)) encoded_bits = Bits(intle=encoded_value, length=24) - return encoded_bits, encoded_value, False, "" + return encoded_bits, encoded_value def decode(self, bits_to_decode: ConstBitStream): - scale = bits_to_decode.read('intle:24') + """ + decode value + + :param bits_to_decode: the bitstream to decode + """ + + scale = bits_to_decode.read("intle:24") value = scale * (180.0 / (2**23 - 1)) return value @@ -25,4 +48,4 @@ class CclLatLonCodec(FieldCodec): @property def min_length_bits(self): - return 24 \ No newline at end of file + return 24 diff --git a/src/ltcodecs/exceptions.py b/src/ltcodecs/exceptions.py new file mode 100644 index 0000000..4043e3c --- /dev/null +++ b/src/ltcodecs/exceptions.py @@ -0,0 +1,16 @@ +""" +ltcodecs.exceptions +------------------- + +This module contains custom exceptions for ltcodecs +""" + + +class EncodingFailed(Exception): + """ + Raised when encoding fails + """ + + def __init__(self, message: str = "Encoding failed") -> None: + self.message = message + super().__init__(self.message) diff --git a/src/ltcodecs/field_codec.py b/src/ltcodecs/field_codec.py index d52564a..83aa56b 100644 --- a/src/ltcodecs/field_codec.py +++ b/src/ltcodecs/field_codec.py @@ -1,3 +1,7 @@ +""" +abstract base class for field codecs +""" + from __future__ import annotations from abc import ABC, abstractmethod from bitstring import ConstBitStream, Bits @@ -5,16 +9,29 @@ from typing import Any class FieldCodec(ABC): - def __init__(self, **kwargs): + """ + abstract base class for field codecs + """ + + def __init__(self, **kwargs) -> None: pass @abstractmethod - def encode(self, value: Any) -> tuple[Bits, Any, bool, str]: - # Returns: encoded bits, "rounded" value, encoding failed, error message + def encode(self, value: Any) -> tuple[Bits, Any]: + """ + encodes value + + :param value: value to encode + """ pass @abstractmethod - def decode(self, bits_to_decode: ConstBitStream): + def decode(self, bits_to_decode: ConstBitStream) -> Any: + """ + decodes bits + + :param bits_to_decode: ConstBitStream to decode + """ pass @property @@ -25,4 +42,4 @@ class FieldCodec(ABC): @property @abstractmethod def min_length_bits(self) -> int: - pass \ No newline at end of file + pass diff --git a/src/ltcodecs/fixed_len_array_codec.py b/src/ltcodecs/fixed_len_array_codec.py index 6509fe2..6061520 100644 --- a/src/ltcodecs/fixed_len_array_codec.py +++ b/src/ltcodecs/fixed_len_array_codec.py @@ -1,31 +1,60 @@ +""" +ltcodecs.fixed_len_array_codec +------------------------------ + +This module contains the FixedLenArrayCodec class, which is used to encode and decode fixed length arrays. +""" + from __future__ import annotations from bitstring import BitArray, ConstBitStream, Bits from .field_codec import FieldCodec import ltcodecs as ltcodecs from typing import List +from .exceptions import EncodingFailed class FixedLenArrayCodec(FieldCodec): - def __init__(self, element_type: str, length: int, element_params=None, **kwargs): + """ + codec for fixed length arrays + """ + + def __init__( + self, element_type: str, length: int, element_params=None, **kwargs + ) -> None: self.length = length if element_params: - self.element_field_codec = ltcodecs.field_codec_classes[element_type](**element_params) + self.element_field_codec = ltcodecs.field_codec_classes[element_type]( + **element_params + ) else: self.element_field_codec = ltcodecs.field_codec_classes[element_type]() - def encode(self, value: List) -> tuple[Bits, List, bool, str]: - value = value[0:self.length] + def encode(self, value: List) -> tuple[Bits, List]: + """ + encode a fixed length array + + :param value: the fixed length array to encode + """ + value = value[0 : self.length] value_bits = BitArray() encoded_value_list = [] for element in value: - element_bits, element_value, encode_failed, error_msg = self.element_field_codec.encode(element) - if encode_failed: - return Bits(), list(), True, error_msg + ( + element_bits, + element_value, + ) = self.element_field_codec.encode(element) + value_bits.append(element_bits) encoded_value_list.append(element_value) - return value_bits, encoded_value_list, False, "" + return value_bits, encoded_value_list + + def decode(self, bits_to_decode: ConstBitStream) -> List: + """ + decode a fixed length array + + :param bits_to_decode: the bits to decode + """ - def decode(self, bits_to_decode: ConstBitStream): decoded_list = [] for i in range(self.length): element = self.element_field_codec.decode(bits_to_decode) @@ -33,9 +62,9 @@ class FixedLenArrayCodec(FieldCodec): return decoded_list @property - def min_length_bits(self): + def min_length_bits(self) -> int: return self.length * self.element_field_codec.max_length_bits @property - def max_length_bits(self): - return self.length * self.element_field_codec.max_length_bits \ No newline at end of file + def max_length_bits(self) -> int: + return self.length * self.element_field_codec.max_length_bits diff --git a/src/ltcodecs/fixedint_codec.py b/src/ltcodecs/fixedint_codec.py index 82c358d..2caf996 100644 --- a/src/ltcodecs/fixedint_codec.py +++ b/src/ltcodecs/fixedint_codec.py @@ -1,15 +1,33 @@ +""" +ltcodecs.fixedint_codec +----------------------- + +This module contains classes for fixed width integer codecs +""" + from .varint_codec import VarintCodec class FixedIntCodec(VarintCodec): - def __init__(self, num_bits: int, signed: bool = False, min_value: int = None, resolution: int = 1, - little_endian: bool = False, **kwargs): + """ + FixedIntCodec base class + """ + + def __init__( + self, + num_bits: int, + signed: bool = False, + min_value: int = None, + resolution: int = 1, + little_endian: bool = False, + **kwargs + ): self.num_bits = num_bits if min_value: self.min_value = int(min_value) else: if signed: - self.min_value = -1 * resolution * 2**(self.num_bits - 1) + self.min_value = -1 * resolution * 2 ** (self.num_bits - 1) else: self.min_value = 0 self.max_value = self.min_value + (2**self.num_bits * resolution) @@ -19,40 +37,72 @@ class FixedIntCodec(VarintCodec): class Int8Codec(FixedIntCodec): + """ + metaclass for int8 codec + """ + def __init__(self, **kwargs): super(Int8Codec, self).__init__(num_bits=8, signed=True) class Int16Codec(FixedIntCodec): + """ + metaclass for int16 codec + """ + def __init__(self, **kwargs): super(Int16Codec, self).__init__(num_bits=16, signed=True) class Int32Codec(FixedIntCodec): + """ + metaclass for int32 codec + """ + def __init__(self, **kwargs): super(Int32Codec, self).__init__(num_bits=32, signed=True) class Int64Codec(FixedIntCodec): + """ + metaclass for int64 codec + """ + def __init__(self, **kwargs): super(Int64Codec, self).__init__(num_bits=64, signed=True) class UInt8Codec(FixedIntCodec): + """ + metaclass for uint8 codec + """ + def __init__(self, **kwargs): super(UInt8Codec, self).__init__(num_bits=8, signed=False) class UInt16Codec(FixedIntCodec): + """ + metaclass for uint16 codec + """ + def __init__(self, **kwargs): super(UInt16Codec, self).__init__(num_bits=16, signed=False) class UInt32Codec(FixedIntCodec): + """ + metaclass for uint32 codec + """ + def __init__(self, **kwargs): super(UInt32Codec, self).__init__(num_bits=32, signed=False) class UInt64Codec(FixedIntCodec): + """ + metaclass for uint64 codec + """ + def __init__(self, **kwargs): - super(UInt32Codec, self).__init__(num_bits=64, signed=False) \ No newline at end of file + super(UInt32Codec, self).__init__(num_bits=64, signed=False) diff --git a/src/ltcodecs/float_codec.py b/src/ltcodecs/float_codec.py index d42e308..61297ed 100644 --- a/src/ltcodecs/float_codec.py +++ b/src/ltcodecs/float_codec.py @@ -1,3 +1,10 @@ +""" +ltcodecs.float_codec +----------------------------- + +This module contains the the float codec +""" + from __future__ import annotations from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec @@ -5,31 +12,55 @@ from math import ceil, log2 class FloatCodec(FieldCodec): - def __init__(self, min_value: float, max_value: float, precision: int, **kwargs): + """ + codec for floating point numbers + """ + + def __init__( + self, min_value: float, max_value: float, precision: int, **kwargs + ) -> None: self.max_value = float(max_value) self.min_value = float(min_value) self.precision = int(precision) - self.value_range = int(round(max_value - min_value, precision) * 10 ** precision) + self.value_range = int( + round(max_value - min_value, precision) * 10**precision + ) self.num_bits = ceil(log2(self.value_range)) - #print("Float codec: precision {} num_bits {}".format(precision, self.num_bits)) - def encode(self, value: float) -> tuple[Bits, float, bool, str]: + def encode(self, value: float) -> tuple[Bits, float]: + """ + encode a float + + :param value: the value to encode + """ value = float(value) if value < self.min_value: value = self.min_value elif value > self.max_value: value = self.max_value - encoded_value = int(round(value - self.min_value, self.precision) * 10 ** self.precision) - #print(value, self.min_value, self.precision) - rounded_value = round(value - self.min_value, self.precision) + self.min_value # Used only for validation + encoded_value = int( + round(value - self.min_value, self.precision) * 10**self.precision + ) + # print(value, self.min_value, self.precision) + rounded_value = ( + round(value - self.min_value, self.precision) + self.min_value + ) # Used only for validation encoded_bits = Bits(uint=encoded_value, length=self.num_bits) - return encoded_bits, rounded_value, False, "" + return encoded_bits, rounded_value - def decode(self, bits_to_decode: ConstBitStream): - float_offset = bits_to_decode.read('uint:{}'.format(self.num_bits)) / 10 ** self.precision + def decode(self, bits_to_decode: ConstBitStream) -> float: + """ + decode a float + + :param bits_to_decode: the bitstream to decode + """ + + float_offset = ( + bits_to_decode.read(f"uint:{self.num_bits}") / 10**self.precision + ) value = self.min_value + float_offset return value @@ -39,4 +70,4 @@ class FloatCodec(FieldCodec): @property def min_length_bits(self): - return self.num_bits \ No newline at end of file + return self.num_bits diff --git a/src/ltcodecs/ieee_float_codec.py b/src/ltcodecs/ieee_float_codec.py index bc887d5..26e7f96 100644 --- a/src/ltcodecs/ieee_float_codec.py +++ b/src/ltcodecs/ieee_float_codec.py @@ -1,23 +1,47 @@ +""" +ltcodecs.ieee_float_codec +------------------------- + +ieee_float_codec encodes and decodes IEEE floating point numbers +""" + from __future__ import annotations from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec class IeeeFloatCodec(FieldCodec): + """ + codec for IEEE floating point numbers + """ + def __init__(self, num_bits=32, **kwargs): if num_bits not in (32, 64): - raise ValueError("Only 32 or 64 bit widths are supported for IEEE floating point codecs") + raise ValueError( + "Only 32 or 64 bit widths are supported for IEEE floating point codecs" + ) self.num_bits = num_bits - def encode(self, value: float) -> tuple[Bits, float, bool, str]: + def encode(self, value: float) -> tuple[Bits, float]: + """ + encode value + + :param value: the value to encode + """ + value = float(value) encoded_bits = Bits(float=value, length=self.num_bits) encoded_value = encoded_bits.float - ## print("encode varint {} bits as {}".format(num_bits, encoded_bits)) - return encoded_bits, encoded_value, False, "" + return encoded_bits, encoded_value def decode(self, bits_to_decode: ConstBitStream): - value = bits_to_decode.read('floatbe:{}'.format(self.num_bits)) + """ + decode value + + :param bits_to_decode: the bitstream to decode + """ + + value = bits_to_decode.read(f"floatbe:{self.num_bits}") return value @property @@ -30,10 +54,18 @@ class IeeeFloatCodec(FieldCodec): class IeeeFloat32Codec(IeeeFloatCodec): + """ + metaclass for IEEE 32 bit floating point numbers + """ + def __init__(self, **kwargs): super(IeeeFloat32Codec, self).__init__(num_bits=32) class IeeeFloat64Codec(IeeeFloatCodec): + """ + metaclass for IEEE 64 bit floating point numbers + """ + def __init__(self, **kwargs): - super(IeeeFloat64Codec, self).__init__(num_bits=64) \ No newline at end of file + super(IeeeFloat64Codec, self).__init__(num_bits=64) diff --git a/src/ltcodecs/linspace_float_codec.py b/src/ltcodecs/linspace_float_codec.py index 5b1df2e..f22b5d6 100644 --- a/src/ltcodecs/linspace_float_codec.py +++ b/src/ltcodecs/linspace_float_codec.py @@ -1,28 +1,53 @@ +""" +ltcodecs.linspace_float_codec +----------------------------- + +This module contains the the linspace float codec +""" + from __future__ import annotations +from math import ceil, log2 from bitstring import ConstBitStream, Bits + from .field_codec import FieldCodec -from math import ceil, log2 class LinspaceFloatCodec(FieldCodec): - def __init__(self, min_value: float, max_value: float, - resolution: float = None, num_values: int = None, num_bits: int = None, **kwargs): + """ + LinspaceFloatCodec + """ + + def __init__( + self, + min_value: float, + max_value: float, + resolution: float = None, + num_values: int = None, + num_bits: int = None, + **kwargs, + ): self.max_value = float(max_value) self.min_value = float(min_value) self.value_range = max_value - min_value if num_values or num_bits: if resolution: - raise ValueError("LinspaceFloatCodec supports setting only one of num_values, num_bits, or resolution.") + raise ValueError( + "LinspaceFloatCodec supports setting only one of num_values, num_bits, or resolution." + ) if num_bits: if num_values: - raise ValueError("LinspaceFloatCodec supports setting either num_values or num_bits, not both.") + raise ValueError( + "LinspaceFloatCodec supports setting either num_values or num_bits, not both." + ) if num_bits < 1: - raise ValueError("LinspaceFloatCodec requires at least 1 bit (num_bits >= 1), you specified num_bits={}".format(num_bits)) - num_values = 2 ** num_bits + raise ValueError( + f"LinspaceFloatCodec requires at least 1 bit (num_bits >= 1), you specified num_bits={num_bits}" + ) + num_values = 2**num_bits if num_values < 2: raise ValueError( - "LinspaceFloatCodec requires at least 2 values (num_values >= 2), you specified num_values={}".format( - num_values)) + f"LinspaceFloatCodec requires at least 2 values (num_values >= 2), you specified num_values={num_values}" + ) resolution = self.value_range / (num_values - 1) self.resolution = float(resolution) @@ -31,7 +56,12 @@ class LinspaceFloatCodec(FieldCodec): # if the max value isn't an integer multiple of the resolution from the min value, it won't be encoded. self.num_bits = ceil(log2(self.num_values)) - def encode(self, value: float) -> tuple[Bits, float, bool, str]: + def encode(self, value: float) -> tuple[Bits, float]: + """ + encode value + + :param value: the value to encode + """ value = float(value) if value < self.min_value: value = self.min_value @@ -41,11 +71,13 @@ class LinspaceFloatCodec(FieldCodec): discretized_offset = int(offset // self.resolution) encoded_value = self.min_value + (discretized_offset * self.resolution) encoded_bits = Bits(uint=discretized_offset, length=self.num_bits) - ## print("encode varint {} bits as {}".format(num_bits, encoded_bits)) - return encoded_bits, encoded_value, False, "" + return encoded_bits, encoded_value - def decode(self, bits_to_decode: ConstBitStream): - discretized_offset = bits_to_decode.read('uint:{}'.format(self.num_bits)) + def decode(self, bits_to_decode: ConstBitStream) -> float: + """ + decode bits_to_decode into a value + """ + discretized_offset = bits_to_decode.read(f"uint:{self.num_bits}") value = self.min_value + (discretized_offset * self.resolution) return value @@ -55,4 +87,4 @@ class LinspaceFloatCodec(FieldCodec): @property def min_length_bits(self): - return self.num_bits \ No newline at end of file + return self.num_bits diff --git a/src/ltcodecs/padding_codec.py b/src/ltcodecs/padding_codec.py index 6059758..6385e01 100644 --- a/src/ltcodecs/padding_codec.py +++ b/src/ltcodecs/padding_codec.py @@ -1,23 +1,44 @@ +""" +ltcodecs.padding_codec +---------------------- + +padding_codec encodes and decodes padding +""" + from __future__ import annotations from bitstring import BitArray, ConstBitStream from .field_codec import FieldCodec class PaddingCodec(FieldCodec): - def __init__(self, num_bits, **kwargs): + """ + padding codec + """ + + def __init__(self, num_bits, **kwargs) -> None: self.num_bits = int(num_bits) pass - def encode(self, value=None) -> tuple[BitArray, None, bool, str]: + def encode(self, value=None) -> tuple[BitArray, None]: + """ + encode padding + + :param value: the value to encode + """ value_bits = BitArray(uint=0, length=self.num_bits) - return value_bits, None, False, "" + return value_bits, None def decode(self, bits_to_decode: ConstBitStream): - bits_to_decode.read('pad:{}'.format(self.num_bits)) + """ + decode padding + + :param bits_to_decode: the bitstream to decode + """ + bits_to_decode.read(f"pad:{self.num_bits}") return None def min_length_bits(self): return self.num_bits def max_length_bits(self): - return self.num_bits \ No newline at end of file + return self.num_bits diff --git a/src/ltcodecs/ros_message_codec.py b/src/ltcodecs/ros_message_codec.py index 7b7aa6b..4788b92 100644 --- a/src/ltcodecs/ros_message_codec.py +++ b/src/ltcodecs/ros_message_codec.py @@ -1,3 +1,10 @@ +""" +ltcodecs.ros_message_codec +-------------------------- + +This module contains the the ROS message codec +""" + from __future__ import annotations from typing import Any import ltcodecs as ltcodecs @@ -9,48 +16,65 @@ import operator from std_msgs.msg import Header import rospy + class RosMessageCodec(object): + """ + RosMessageCodec + """ + def __init__(self, ros_type, fields_dict: dict = None, checksum=None): self.ros_type = ros_type self.packet_codec = None self.checksum = checksum - + if fields_dict: self.metadata_encoder_fields = {} for field_name, field_params in fields_dict.items(): # We only support a few metadata fields for encoding - codec_name = field_params.get('codec', 'auto') + codec_name = field_params.get("codec", "auto") for field_name, field_params in fields_dict.items(): # We only support a few metadata fields for encoding - codec_name = field_params.get('codec', 'auto') + codec_name = field_params.get("codec", "auto") if codec_name in (ltcodecs.metadata_encoders.keys()): self.metadata_encoder_fields[field_name] = codec_name else: self.metadata_encoder_fields = None - self.root_field_codec = ltcodecs.RosMsgFieldCodec(ros_type=ros_type, fields=fields_dict) - + self.root_field_codec = ltcodecs.RosMsgFieldCodec( + ros_type=ros_type, fields=fields_dict + ) + def encode(self, ros_msg: rospy.AnyMsg) -> tuple[Bits, dict[Any]]: + """ + Encode a ROS message into a bitstream and metadata dictionary + + :param ros_msg: ROS message to encode + """ encoded_bits, encoded_dict = self.root_field_codec.encode(ros_msg) if self.checksum: msgpack_bytes = msgpack.packb(encoded_dict) - if self.checksum == 'crc8': + if self.checksum == "crc8": calculated_crc = Crc8SaeJ1850.calc(msgpack_bytes) encoded_bits.append(Bits(uint=calculated_crc, length=8)) - elif self.checksum == 'crc32': + elif self.checksum == "crc32": calculated_crc = Crc32Iscsi.calc(msgpack_bytes) encoded_bits.append(Bits(uint=calculated_crc, length=32)) - #print("Encoded CRC: {}".format(hex(calculated_crc))) metadata_dict = self._encode_metadata(ros_msg) return encoded_bits, metadata_dict - def decode(self, bits_to_decode: ConstBitStream, received_packet = None): - rospy.loginfo("Decoding ROS message {}".format(self.ros_type)) + def decode(self, bits_to_decode: ConstBitStream, received_packet=None): + """ + Decode a ROS message from a bitstream + + :param bits_to_decode: Bitstream to decode + """ + + rospy.loginfo(f"Decoding ROS message {self.ros_type}") # Now, check CRC, if required. if self.checksum: # We need to decode this as a dict separately, so we need a new copy of the received bitstream. @@ -62,24 +86,23 @@ class RosMessageCodec(object): if self.checksum: msgpack_bytes = msgpack.packb(decoded_dict) - # print("Msgpack length: {} bytes".format(len(msgpack_bytes))) - if self.checksum == 'crc8': + if self.checksum == "crc8": calculated_crc = Crc8SaeJ1850.calc(msgpack_bytes) # Next 8 bits of message are CRC - received_crc = bits_to_decode.read('uint:8') - elif self.checksum == 'crc32': + received_crc = bits_to_decode.read("uint:8") + elif self.checksum == "crc32": calculated_crc = Crc32Iscsi.calc(msgpack_bytes) - received_crc = bits_to_decode.read('uint:32') + received_crc = bits_to_decode.read("uint:32") - #print("Received CRC: {}".format(hex(received_crc))) if calculated_crc != received_crc: raise ValueError("Message CRC Mismatch") - rospy.loginfo("ROS Message: {}".format(ros_msg)) + rospy.loginfo(f"ROS Message: {ros_msg}") return ros_msg def _encode_metadata(self, ros_msg): - # Look through the fields dict for one of the magic keywords specified in field_codecs.py + """Look through the fields dict for one of the magic keywords specified in field_codecs.py""" + if self.metadata_encoder_fields: metadata_dict = {} for field_name, codec in self.metadata_encoder_fields.items(): @@ -90,9 +113,9 @@ class RosMessageCodec(object): @property def max_length_bits(self): - if self.checksum == 'crc8': + if self.checksum == "crc8": checksum_len = 8 - elif self.checksum == 'crc32': + elif self.checksum == "crc32": checksum_len = 32 else: checksum_len = 0 @@ -100,9 +123,9 @@ class RosMessageCodec(object): @property def min_length_bits(self): - if self.checksum == 'crc8': + if self.checksum == "crc8": checksum_len = 8 - elif self.checksum == 'crc32': + elif self.checksum == "crc32": checksum_len = 32 else: checksum_len = 0 diff --git a/src/ltcodecs/ros_msg_field_codec.py b/src/ltcodecs/ros_msg_field_codec.py index b7e4b02..9e39176 100644 --- a/src/ltcodecs/ros_msg_field_codec.py +++ b/src/ltcodecs/ros_msg_field_codec.py @@ -1,23 +1,40 @@ +""" +ltcodecs.ros_msg_codec +---------------------- + +This module contains the the field codec which is used to encode and decode ROS messages. +""" + from __future__ import annotations import re from bitstring import ConstBitStream, BitArray -from .field_codec import FieldCodec -import ltcodecs as ltcodecs +import operator + from rospy import AnyMsg from typing import Union, Any -import operator import roslib.message +import ltcodecs as ltcodecs + +from .field_codec import FieldCodec +from .exceptions import EncodingFailed + class RosMsgFieldCodec(FieldCodec): - def __init__(self, ros_type: Union[str, AnyMsg], fields: dict = None, **kwargs): + """ + field codec for ROS messages + """ + + def __init__( + self, ros_type: Union[str, AnyMsg], fields: dict = None, **kwargs + ) -> None: self.fields = fields self.ros_type = ros_type if isinstance(ros_type, str): msg_class = roslib.message.get_message_class(ros_type) if not msg_class: - raise TypeError('Invalid ROS type "{}" for message codec {}'.format(ros_type, id)) + raise TypeError(f'Invalid ROS type "{ros_type}" for message codec {id}') else: msg_class = ros_type self.ros_msg_class = msg_class @@ -34,29 +51,36 @@ class RosMsgFieldCodec(FieldCodec): for field_name in new_fields: msg_type = slots[field_name] if msg_type in ltcodecs.field_codec_classes: - field_params = {'codec': msg_type} + field_params = {"codec": msg_type} else: # There are two cases here: arrays or nested ROS messages - if msg_type.endswith(']'): + if msg_type.endswith("]"): # either a variable or fixed length array - matched = re.match(r'(?P<element_type>.*)\[(?P<array_len>\d*)\]', msg_type) - if matched['array_len']: - array_len = int(matched['array_len']) - field_params = {'codec': 'fixed_len_array', - 'length': array_len} + matched = re.match( + r"(?P<element_type>.*)\[(?P<array_len>\d*)\]", msg_type + ) + if matched["array_len"]: + array_len = int(matched["array_len"]) + field_params = { + "codec": "fixed_len_array", + "length": array_len, + } else: - field_params = {'codec': 'variable_len_array', - 'max_length': 10} # TODO: maybe take this as a parameter? + field_params = { + "codec": "variable_len_array", + "max_length": 10, + } # TODO: maybe take this as a parameter? # and the element type is either a standard type or a ROS message - if matched['element_type'] in ltcodecs.field_codec_classes: - field_params['element_type'] = matched['element_type'] + if matched["element_type"] in ltcodecs.field_codec_classes: + field_params["element_type"] = matched["element_type"] else: - field_params['element_type'] = 'msg' - field_params['element_params'] = {'ros_type': matched['element_type']} + field_params["element_type"] = "msg" + field_params["element_params"] = { + "ros_type": matched["element_type"] + } else: # It's a nested ROS message - field_params = {'codec': 'msg', - 'ros_type': msg_type} + field_params = {"codec": "msg", "ros_type": msg_type} new_fields[field_name] = field_params @@ -68,17 +92,24 @@ class RosMsgFieldCodec(FieldCodec): # print(field_name, field_params['codec']) # print(field_codecs.field_codec_classes[field_params['codec']]) try: - if field_params['codec'] not in ltcodecs.metadata_decoders.keys(): - self.field_codecs[field_name] = ltcodecs.field_codec_classes[field_params['codec']](**field_params) + if field_params["codec"] not in ltcodecs.metadata_decoders.keys(): + self.field_codecs[field_name] = ltcodecs.field_codec_classes[ + field_params["codec"] + ](**field_params) else: # The metadata codec is a string, which we use for a lookup later. - self.field_codecs[field_name] = field_params['codec'] - except KeyError as e: - raise KeyError("Error parsing codec config for {}. Got params:\n{}\nError: {}".format(field_name, - field_params, - e)) - - def encode(self, message: AnyMsg, metadata=None) -> tuple[BitArray, dict[Any], bool, str]: + self.field_codecs[field_name] = field_params["codec"] + except KeyError as err: + raise KeyError( + f"Error parsing codec config for {field_name}. Got params:\n{field_params}\nError: {err}" + ) from err + + def encode(self, message: AnyMsg, metadata=None) -> tuple[BitArray, dict[Any]]: + """ + encode a ROS message + + :param message: the message to encode + """ # ROS messages use __slots__, so we can't use __dict__ or vars() to get the attributes as a dict message_dict = {} for field in message.__slots__: @@ -92,19 +123,26 @@ class RosMsgFieldCodec(FieldCodec): # Note that metadata encoding is done at the ros_msg_codec level, not here if not field_codec or isinstance(field_codec, str): continue - field_bits, encoded_dict[field_name], failed_to_encode, error_msg = field_codec.encode(message_dict[field_name]) - if failed_to_encode: - return BitArray(), dict(), True, error_msg + ( + field_bits, + encoded_dict[field_name], + ) = field_codec.encode(message_dict[field_name]) + encoded_bits.append(field_bits) - except Exception as e: - #print("Codec: {}, max len bits {}".format(field_codec, field_codec.max_length_bits)) - error_msg = ('Error encoding field "{}" with codec {} (max len bits {})'.format(field_name, - field_codec, field_codec.max_length_bits)).with_traceback(e.__traceback__) - return BitArray(), dict(), True, error_msg + except Exception as err: + raise EncodingFailed( + f'Error encoding field "{field_name}" with codec {field_codec} (max len bits {field_codec.max_length_bits})' + ) from err return encoded_bits, encoded_dict def decode(self, bits_to_decode: ConstBitStream, metadata=None): - # We go through the bits in sequence until we are decoded. + """ + decode a ROS message + + :param bits_to_decode: the bits to decode + """ + + # go through the bits in sequence until we are decoded. # The ConstBitStream has an internal read pointer. decoded_message = {} for field_name, field_params in self.fields.items(): @@ -126,27 +164,31 @@ class RosMsgFieldCodec(FieldCodec): # if we don't recognize the metadata codec, just keep going continue - ## print("Ros decode field {}".format(field_name)) # pass metadata only to nested ros msg fields if isinstance(field_codec, type(self)): - decoded_message[field_name] = field_codec.decode(bits_to_decode, metadata=metadata) + decoded_message[field_name] = field_codec.decode( + bits_to_decode, metadata=metadata + ) else: decoded_message[field_name] = field_codec.decode(bits_to_decode) - ## print("Ros decode got {}".format(decoded_message)) return self.ros_msg_class(**decoded_message) def decode_as_dict(self, bits_to_decode: ConstBitStream): - # This function is used to generate an object that we can use for calculating CRCs in the message codec + """This function is used to generate an object that we can use for calculating CRCs in the message codec + It seems like a bit of a hack, but the alternative would require adding a second return value to every field + decoder. Since this only affects ROS messages, this keeps the bloat down. + + :param bits_to_decode: the bits to decode + """ # It seems like a bit of a hack, but the alternative would require adding a second return value to every field # decoder. Since this only affects ROS messages, this keeps the bloat down. decoded_message = {} for field_name, field_params in self.fields.items(): field_codec = self.field_codecs[field_name] - if hasattr(field_codec, 'decode_as_dict'): + if hasattr(field_codec, "decode_as_dict"): decoded_message[field_name] = field_codec.decode_as_dict(bits_to_decode) else: decoded_message[field_name] = field_codec.decode(bits_to_decode) - ## print("Ros decode got {}".format(decoded_message)) return decoded_message @property diff --git a/src/ltcodecs/rostime_codec.py b/src/ltcodecs/rostime_codec.py index ef201ba..fafff7c 100644 --- a/src/ltcodecs/rostime_codec.py +++ b/src/ltcodecs/rostime_codec.py @@ -1,37 +1,75 @@ +""" +ltcodecs.ros_time_codec +----------------------- + +This module contains the RosTimeCodec class, which is used to encode and decode ROS times. +""" + from __future__ import annotations -from bitstring import ConstBitStream, Bits -from .field_codec import FieldCodec + from math import ceil, log2 +from bitstring import ConstBitStream, Bits from rospy import Time +from .field_codec import FieldCodec + class RosTimeCodec(FieldCodec): - def __init__(self, precision: int = 0, epoch_start=1622520000, epoch_end=(2**31 - 1), **kwargs): + """ + codec for encoding and decoding ROS times + """ + + def __init__( + self, + precision: int = 0, + epoch_start=1622520000, + epoch_end=(2**31 - 1), + **kwargs, + ) -> None: self.max_value = epoch_end self.min_value = epoch_start self.precision = int(precision) - self.value_range = int(round(self.max_value - self.min_value, precision) * 10 ** precision) + self.value_range = int( + round(self.max_value - self.min_value, precision) * 10**precision + ) self.num_bits = ceil(log2(self.value_range)) - #print("Float codec: precision {} num_bits {}".format(precision, self.num_bits)) - def encode(self, value: Time) -> tuple[Bits, Time, bool, str]: + def encode(self, value: Time) -> tuple[Bits, Time]: + """ + encode rostime + + :param value: the float to encode + """ + value = value.to_sec() if value < self.min_value: value = self.min_value elif value > self.max_value: value = self.max_value - encoded_value = int(round(value - self.min_value, self.precision) * 10 ** self.precision) - #print(value, self.min_value, self.precision) - rounded_value = round(value - self.min_value, self.precision) + self.min_value # Used only for validation + encoded_value = int( + round(value - self.min_value, self.precision) * 10**self.precision + ) + # print(value, self.min_value, self.precision) + rounded_value = ( + round(value - self.min_value, self.precision) + self.min_value + ) # Used only for validation rounded_value = Time.from_sec(rounded_value) encoded_bits = Bits(uint=encoded_value, length=self.num_bits) - return encoded_bits, rounded_value, False, "" + return encoded_bits, rounded_value - def decode(self, bits_to_decode: ConstBitStream): - float_offset = bits_to_decode.read('uint:{}'.format(self.num_bits)) / 10 ** self.precision + def decode(self, bits_to_decode: ConstBitStream) -> Time: + """ + decode rostime + + :param bits_to_decode: the bitstream to decode + """ + + float_offset = ( + bits_to_decode.read(f"uint:{self.num_bits}") / 10**self.precision + ) value = Time.from_sec(self.min_value + float_offset) return value @@ -41,4 +79,4 @@ class RosTimeCodec(FieldCodec): @property def min_length_bits(self): - return self.num_bits \ No newline at end of file + return self.num_bits diff --git a/src/ltcodecs/string_codecs.py b/src/ltcodecs/string_codecs.py index 749ea4b..6f7b665 100644 --- a/src/ltcodecs/string_codecs.py +++ b/src/ltcodecs/string_codecs.py @@ -1,92 +1,124 @@ -from sys import byteorder +""" +ltcodecs.string_codecs +---------------------- + +This module contains codecs for encoding and decoding strings. +""" + +from __future__ import annotations from bitstring import BitArray, Bits, ConstBitStream, offsetcopy from .varint_codec import VarintCodec from .field_codec import FieldCodec -def to_sixbit_ascii(character: str): + +def to_sixbit_ascii(character: str) -> int: + """ + return the sixbit code corresponding to the ascii character + + :param character: the character to convert + """ upper_character = character.upper() - ascii_code = int(upper_character.encode('ascii')[0]) + ascii_code = int(upper_character.encode("ascii")[0]) - if ascii_code >= 0x40 and ascii_code <= 0x5f: + if ascii_code >= 0x40 and ascii_code <= 0x5F: sixbit_code = ascii_code - 0x40 - elif ascii_code >= 0x20 and ascii_code <= 0x3f: + elif ascii_code >= 0x20 and ascii_code <= 0x3F: sixbit_code = ascii_code else: - sixbit_code = 0x3f # out of bounds values are encoded as '?' + sixbit_code = 0x3F # out of bounds values are encoded as '?' return sixbit_code -def from_sixbit_ascii(sixbit_code: int): - if sixbit_code <= 0x1f: - ascii_code = sixbit_code + 0x40 - # Other characters map directly - else: - ascii_code = sixbit_code +def from_sixbit_ascii(sixbit_code: int) -> str: + """ + return the ascii character corresponding to the sixbit code - return bytes([ascii_code]).decode('ascii') + :param sixbit_code: the sixbit code to convert + """ + ascii_code = sixbit_code + 0x40 if sixbit_code <= 0x1F else sixbit_code + + return bytes([ascii_code]).decode("ascii") class AsciiStringCodec(FieldCodec): - def __init__(self, max_length: int = 128, bits_per_char: int = 7, tail=False, **kwargs): + """ + codec for encoding and decoding ascii strings + """ + + def __init__( + self, max_length: int = 128, bits_per_char: int = 7, tail=False, **kwargs + ) -> None: self.max_length = int(max_length) self.bits_per_char = bits_per_char self.tail = tail self.string_len_codec = VarintCodec(min_value=0, max_value=max_length) - def encode(self, value: str) -> tuple[Bits, str, bool, str]: + def encode(self, value: str) -> tuple[Bits, str]: + """ + encode a string into a bitstream + + :param value: the string to encode + """ if not self.tail: - value = value[0:self.max_length] + value = value[0 : self.max_length] else: - value = value[-self.max_length:] + value = value[-self.max_length :] length_bits, _ = self.string_len_codec.encode(len(value)) encoded_bits = BitArray() encoded_bits.append(length_bits) - string_bytes = value.encode('ascii') + string_bytes = value.encode("ascii") if self.bits_per_char == 7: compressed_bytes = bytearray() for sb in string_bytes: - if sb > 0x7f: + if sb > 0x7F: # Replace out of bounds values with "?" - sb = 0x3f + sb = 0x3F compressed_bytes.append(sb) encoded_bits.append(Bits(bytes=[sb], length=7, offset=1)) elif self.bits_per_char == 6: compressed_bytes = bytearray() for sb in string_bytes: sixbit_value = to_sixbit_ascii(sb) - compressed_byte = from_sixbit_ascii(sixbit_value).encode('ascii') + compressed_byte = from_sixbit_ascii(sixbit_value).encode("ascii") compressed_bytes.extend(compressed_byte) encoded_bits.append(Bits(bytes=[sixbit_value], length=6, offset=2)) else: encoded_bits.append(Bits(bytes=value)) compressed_bytes = string_bytes - compressed_string = compressed_bytes.decode('ascii') - return encoded_bits, compressed_string, False, "" + compressed_string = compressed_bytes.decode("ascii") + return encoded_bits, compressed_string + + def decode(self, encoded_bits: ConstBitStream) -> str: + """ + decode a string from a bitstream - def decode(self, encoded_bits: ConstBitStream): + :param encoded_bits: the bitstream to decode + """ num_chars = self.string_len_codec.decode(encoded_bits) if self.bits_per_char == 7: string_bytes = bytearray() for i in range(num_chars): - char_byte = encoded_bits.read('uint:7').to_bytes(1, 'big')[0] + char_byte = encoded_bits.read("uint:7").to_bytes(1, "big")[0] string_bytes.append(char_byte) elif self.bits_per_char == 6: new_string = "" for i in range(num_chars): - sixbit_code = encoded_bits.read('uint:6').to_bytes(1, 'big')[0] - new_string.append(from_sixbit_ascii(sixbit_code)) + sixbit_code = encoded_bits.read("uint:6").to_bytes(1, "big")[0] + new_string += from_sixbit_ascii(sixbit_code) return new_string else: - string_bytes = encoded_bits.read('bytes:{}'.format(num_chars)) - return string_bytes.decode('ascii') + string_bytes = encoded_bits.read(f"bytes:{num_chars}") + return string_bytes.decode("ascii") @property def max_length_bits(self): - return self.string_len_codec.max_length_bits + (self.max_length * self.bits_per_char) + return self.string_len_codec.max_length_bits + ( + self.max_length * self.bits_per_char + ) @property def min_length_bits(self): - return self.string_len_codec.max_length_bits \ No newline at end of file + return self.string_len_codec.max_length_bits diff --git a/src/ltcodecs/string_enum_codec.py b/src/ltcodecs/string_enum_codec.py index 1dc7e60..6f3fffd 100644 --- a/src/ltcodecs/string_enum_codec.py +++ b/src/ltcodecs/string_enum_codec.py @@ -1,12 +1,30 @@ +""" +ltcodecs.string_enum_codec +-------------------------- + +This module contains the StringEnumCodec class, which is used to encode and decode string enums. +""" +from __future__ import annotations from bitstring import ConstBitStream from .varint_codec import VarintCodec from .field_codec import FieldCodec from typing import List, Optional +from .exceptions import EncodingFailed class StringEnumCodec(FieldCodec): - def __init__(self, entries: List[str], unknown_value: Optional[str] = None, - case_sensitive: bool = False, strip: bool = False, **kwargs): + """ + codec for string enums + """ + + def __init__( + self, + entries: List[str], + unknown_value: Optional[str] = None, + case_sensitive: bool = False, + strip: bool = False, + **kwargs, + ) -> None: if case_sensitive: self.entries = entries else: @@ -21,16 +39,22 @@ class StringEnumCodec(FieldCodec): self.unknown_value = None min_value = 0 - self.string_index_codec = VarintCodec(min_value=min_value, max_value=len(self.entries)) + self.string_index_codec = VarintCodec( + min_value=min_value, max_value=len(self.entries) + ) + + def encode(self, value: str) -> tuple[ConstBitStream, str]: + """ + encode a string enum + + :param value: the string to encode + """ - def encode(self, value: str): if not self.case_sensitive: value = value.lower() if self.strip: value = value.strip() - encode_failed = False - error_msg = "" if value in self.entries: index = self.entries.index(value) compressed_value = self.entries[index] @@ -41,14 +65,21 @@ class StringEnumCodec(FieldCodec): else: index = 0 compressed_value = self.entries[index] - encode_failed = True - error_msg = f"Failed to encode unknown string {value}, not in {self.entries}" + raise EncodingFailed( + f"Failed to encode unknown string {value}, not in {self.entries}" + ) encoded_index, _ = self.string_index_codec.encode(index) - return encoded_index, compressed_value, encode_failed, error_msg + return encoded_index, compressed_value + + def decode(self, encoded_bits: ConstBitStream) -> str: + """ + decode a string enum from a bitstream + + :param encoded_bits: the bitstream to decode + """ - def decode(self, encoded_bits: ConstBitStream): index = self.string_index_codec.decode(encoded_bits) if index < 0: return self.unknown_value @@ -61,4 +92,4 @@ class StringEnumCodec(FieldCodec): @property def min_length_bits(self): - return self.string_index_codec.max_length_bits \ No newline at end of file + return self.string_index_codec.max_length_bits diff --git a/src/ltcodecs/variable_len_array_codec.py b/src/ltcodecs/variable_len_array_codec.py index 0cf62f4..dc32935 100644 --- a/src/ltcodecs/variable_len_array_codec.py +++ b/src/ltcodecs/variable_len_array_codec.py @@ -1,35 +1,60 @@ +""" +variable_len_array_codec.py +--------------------------- + +module for VariableLenArrayCodec +""" + from __future__ import annotations from typing import Any from bitstring import BitArray, ConstBitStream from .field_codec import FieldCodec from .varint_codec import VarintCodec import ltcodecs as ltcodecs +from .exceptions import EncodingFailed class VariableLenArrayCodec(FieldCodec): - def __init__(self, element_type: str, max_length: int, element_params=None, **kwargs): + """ + codec for variable-length array + """ + + def __init__( + self, element_type: str, max_length: int, element_params=None, **kwargs + ) -> None: self.max_length = max_length self.length_codec = VarintCodec(min_value=0, max_value=self.max_length) - print('element_type', element_type) + print("element_type", element_type) if element_params: - self.element_field_codec = ltcodecs.field_codec_classes[element_type](**element_params) + self.element_field_codec = ltcodecs.field_codec_classes[element_type]( + **element_params + ) else: self.element_field_codec = ltcodecs.field_codec_classes[element_type]() - def encode(self, value: list) -> tuple[BitArray, list[Any], bool, str]: - value = value[0:self.max_length] + def encode(self, value: list) -> tuple[BitArray, list[Any]]: + """ + encodes list of elements + """ + + value = value[0 : self.max_length] length_bits, _ = self.length_codec.encode(len(value)) value_bits = BitArray(length_bits) encoded_value_list = [] for element in value: - element_bits, element_value, failed_to_encode, error_msg = self.element_field_codec.encode(element) - if failed_to_encode: - return BitArray(), [], True, error_msg + ( + element_bits, + element_value, + ) = self.element_field_codec.encode(element) + value_bits.append(element_bits) encoded_value_list.append(element_value) - return value_bits, encoded_value_list, False, "" + return value_bits, encoded_value_list - def decode(self, bits_to_decode: ConstBitStream): + def decode(self, bits_to_decode: ConstBitStream) -> list: + """ + decodes list of elements from bits + """ num_elements = self.length_codec.decode(bits_to_decode) decoded_list = [] for i in range(num_elements): @@ -38,9 +63,11 @@ class VariableLenArrayCodec(FieldCodec): return decoded_list @property - def min_length_bits(self): + def min_length_bits(self) -> int: return self.length_codec.max_length_bits @property - def max_length_bits(self): - return self.length_codec.max_length_bits + (self.max_length * self.element_field_codec.max_length_bits) \ No newline at end of file + def max_length_bits(self) -> int: + return self.length_codec.max_length_bits + ( + self.max_length * self.element_field_codec.max_length_bits + ) diff --git a/src/ltcodecs/varint_codec.py b/src/ltcodecs/varint_codec.py index adf4410..3ce6600 100644 --- a/src/ltcodecs/varint_codec.py +++ b/src/ltcodecs/varint_codec.py @@ -1,11 +1,31 @@ +""" +ltcodecs.varint_codec +--------------------- + +This module contains the VarintCodec class, which is used to encode and decode variable-length integers. +""" + from __future__ import annotations + +from math import ceil, log2 + from bitstring import ConstBitStream, Bits from .field_codec import FieldCodec -from math import ceil, log2 class VarintCodec(FieldCodec): - def __init__(self, min_value: int, max_value: int, resolution: int = 1, little_endian: bool = False, **kwargs): + """ + codec for variable-length integers + """ + + def __init__( + self, + min_value: int, + max_value: int, + resolution: int = 1, + little_endian: bool = False, + **kwargs, + ) -> None: self.max_value = int(max_value) self.min_value = int(min_value) self.resolution = int(resolution) @@ -15,7 +35,12 @@ class VarintCodec(FieldCodec): self.num_bits = ceil(log2(num_values)) self.little_endian = little_endian - def encode(self, value: int) -> tuple[Bits, int, bool, str]: + def encode(self, value: int) -> tuple[Bits, int]: + """ + turns int into variable-length int + + :param value: int to encode + """ value = int(value) if value < self.min_value: value = self.min_value @@ -28,28 +53,32 @@ class VarintCodec(FieldCodec): encoded_bits = Bits(uintle=discretized_offset, length=self.num_bits) else: encoded_bits = Bits(uint=discretized_offset, length=self.num_bits) - ## print("encode varint {} bits as {}".format(num_bits, encoded_bits)) - return encoded_bits, encoded_value, False, "" + return encoded_bits, encoded_value def decode(self, bits_to_decode: ConstBitStream) -> int: + """ + decodes encoded varint + + :param bits_to_decode: ConstBitStream to decode + """ if self.little_endian: - discretized_offset = bits_to_decode.read('uintle:{}'.format(self.num_bits)) + discretized_offset = bits_to_decode.read(f"uintle:{self.num_bits}") else: - discretized_offset = bits_to_decode.read('uint:{}'.format(self.num_bits)) + discretized_offset = bits_to_decode.read(f"uint:{self.num_bits}") value = self.min_value + (discretized_offset * self.resolution) return value @property - def max_length_bits(self): + def max_length_bits(self) -> int: return self.num_bits @property - def min_length_bits(self): + def min_length_bits(self) -> int: return self.num_bits - def __repr__(self): - return "VarintCodec {:x}: min_value={}, max_value={}, resolution={}".format(id(self), - self.min_value, - self.max_value, - self.resolution) \ No newline at end of file + def __repr__(self) -> str: + """ + returns string representation of VarintCodec + """ + return f"VarintCodec {id(self):x}: min_value={self.min_value}, max_value={self.max_value}, resolution={self.resolution}" diff --git a/test_codecs.py b/test_codecs.py new file mode 100755 index 0000000..e7bdde6 --- /dev/null +++ b/test_codecs.py @@ -0,0 +1,349 @@ +#!/usr/bin/env python3 + +""" +Test suite for ltcodecs +""" +import time + +import unittest +from bitstring import ConstBitStream + +import rospy +from std_msgs.msg import String + +import ltcodecs + + +class CodecTest(unittest.TestCase): + """ + Test class for ltcodecs + """ + + def test_bool_codec(self) -> None: + """test functionality of the boolean codec""" + + input_bool = True + codec = ltcodecs.BoolCodec() + encoded_bool = codec.encode(input_bool) + + bit_stream = ConstBitStream(encoded_bool[0]) + + decoded_bool = codec.decode(bit_stream) + self.assertEqual(decoded_bool, input_bool) + + def test_string_codec(self) -> None: + """test functionality of the string codec""" + + input_str = "This is a test string" + codec = ltcodecs.string_codecs.AsciiStringCodec() + encoded_str = codec.encode(input_str) + + bit_stream = ConstBitStream(encoded_str[0]) + + decoded_str = codec.decode(bit_stream) + self.assertEqual(decoded_str, input_str) + + def test_bytes_codec(self) -> None: + """test functionality of the bytes codec""" + + input_str = "This is a test string" + input_bytes = input_str.encode() # encode entryString to bytes + + codec = ltcodecs.bytes_codec.BytesCodec(100) + encoded_byes = codec.encode(input_bytes) + + bit_stream = ConstBitStream(encoded_byes[0]) + + decoded_bytes = codec.decode(bit_stream).decode() + self.assertEqual(decoded_bytes, input_str) + + def test_ccl_latlon_bcd_codec(self) -> None: + """test functionality of the CclLatLonBcd codec""" + + input_coord = 41.5 + + codec = ltcodecs.ccl_latlon_bcd_codec.CclLatLonBcdCodec(input_coord) + encoded_coord = codec.encode(input_coord) + + bit_stream = ConstBitStream(encoded_coord[0]) + + decoded_coord = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_coord, input_coord) + + def test_ccl_latlon_codec(self) -> None: + """test functionality of the CclLatLon codec""" + + input_coord = 41.5 + + codec = ltcodecs.ccl_latlon_codec.CclLatLonCodec() + encoded_coord = codec.encode(input_coord) + + bit_stream = ConstBitStream(encoded_coord[0]) + + decoded_coord = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_coord, input_coord, delta=1) + + def test_int8_codec(self) -> None: + """test functionality of the signed integer 8 codec""" + + input_int = -21 + + codec = ltcodecs.FixedIntCodec(8, True) + encoded_int = codec.encode(input_int) + + bit_stream = ConstBitStream(encoded_int[0]) + + decoded_int = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_int, input_int, delta=1) + + def test_int16_codec(self) -> None: + """test functionality of the signed integer 16 codec""" + + input_int = -21 + + codec = ltcodecs.FixedIntCodec(16, True) + encoded_int = codec.encode(input_int) + + bit_stream = ConstBitStream(encoded_int[0]) + + decoded_int = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_int, input_int) + + def test_int32_codec(self) -> None: + """test functionality of the signed integer 32 codec""" + + input_int = 1 + + codec = ltcodecs.FixedIntCodec(32, True) + encoded_int = codec.encode(input_int) + + bit_stream = ConstBitStream(encoded_int[0]) + + decoded_int = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_int, input_int) + + def test_int64_codec(self) -> None: + """test functionality of the signed integer 64 codec""" + + input_int = 1 + + codec = ltcodecs.FixedIntCodec(64, True) + encoded_int = codec.encode(input_int) + + bit_stream = ConstBitStream(encoded_int[0]) + + decoded_int = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_int, input_int) + + def test_uint8_codec(self) -> None: + """test functionality of the unsigned integer 8 codec""" + + input_int = 21 + + codec = ltcodecs.FixedIntCodec(8) + encoded_int = codec.encode(input_int) + + bit_stream = ConstBitStream(encoded_int[0]) + + decoded_int = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_int, input_int) + + def test_uint16_codec(self) -> None: + """test functionality of the unsigned integer 16 codec""" + + input_int = 21 + + codec = ltcodecs.FixedIntCodec(16) + encoded_int = codec.encode(input_int) + + bit_stream = ConstBitStream(encoded_int[0]) + + decoded_int = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_int, input_int) + + def test_uint32_codec(self) -> None: + """test functionality of the unsigned integer 32 codec""" + + input_int = 21 + + codec = ltcodecs.FixedIntCodec(32) + encoded_int = codec.encode(input_int) + + bit_stream = ConstBitStream(encoded_int[0]) + + decoded_int = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_int, input_int) + + def test_uint64_codec(self) -> None: + """test functionality of the unsigned integer 64 codec""" + + input_int = 21 + + codec = ltcodecs.FixedIntCodec(64) + encoded_int = codec.encode(input_int) + + bit_stream = ConstBitStream(encoded_int[0]) + + decoded_int = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_int, input_int) + + def test_float_codec(self) -> None: + """test functionality of the Float codec""" + + input_float = 21.53 + + codec = ltcodecs.float_codec.FloatCodec(0, 100, 2) + encoded_float = codec.encode(input_float) + + bit_stream = ConstBitStream(encoded_float[0]) + + decoded_float = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_float, input_float) + + def test_ieee_float_codec(self) -> None: + """test functionality of the IEEE Float codec""" + + input_float = 21.23 + + codec = ltcodecs.ieee_float_codec.IeeeFloatCodec() + encoded_float = codec.encode(input_float) + + bit_stream = ConstBitStream(encoded_float[0]) + + decoded_float = codec.decode(bit_stream) + + self.assertAlmostEqual(round(decoded_float, 2), round(input_float, 2)) + + def test_linspace_float_codec(self) -> None: + """test functionality of the Linspace Float codec""" + + input_float = 21.23 + + codec = ltcodecs.linspace_float_codec.LinspaceFloatCodec(0, 100, num_bits=32) + encoded_float = codec.encode(input_float) + + bit_stream = ConstBitStream(encoded_float[0]) + + decoded_float = codec.decode(bit_stream) + + self.assertAlmostEqual(round(decoded_float, 2), round(input_float, 2)) + + def test_varint_codec(self) -> None: + """test functionality of the Varint codec""" + + input_int = 21 + + codec = ltcodecs.VarintCodec(0, 100) + encoded_int = codec.encode(input_int) + + bit_stream = ConstBitStream(encoded_int[0]) + + decoded_int = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_int, input_int) + + def test_fixed_length_array_codec(self) -> None: + """test functionality of the fixed length array codec""" + + input_array = [1, 2, 3, 4] + + codec = ltcodecs.VariableLenArrayCodec( + "integer", 4, element_params={"min_value": 0, "max_value": 100} + ) + encoded_int = codec.encode(input_array) + + bit_stream = ConstBitStream(encoded_int[0]) + + decoded_int = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_int, input_array) + + def test_variable_legnth_array_codec(self) -> None: + """test functionality of the variable length array codec""" + + input_array = [1, 2, 3, 4] + + codec = ltcodecs.VariableLenArrayCodec( + "integer", 4, element_params={"min_value": 0, "max_value": 100} + ) + encoded_int = codec.encode(input_array) + + bit_stream = ConstBitStream(encoded_int[0]) + + decoded_int = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_int, input_array) + + def test_ros_message_codec(self) -> None: + """test functionality of the ros message codec""" + + msg = String() + msg.data = "test" + + codec = ltcodecs.RosMessageCodec("std_msgs/String") + encoded_msg = codec.encode(msg) + + bit_stream = ConstBitStream(encoded_msg[0]) + + decoded_msg = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_msg, msg) + + def test_ros_message_field_codec(self) -> None: + """test functionality of the ros message field codec""" + + msg = String() + msg.data = "test" + + codec = ltcodecs.RosMsgFieldCodec("std_msgs/String") + encoded_msg = codec.encode(msg) + + bit_stream = ConstBitStream(encoded_msg[0]) + + decoded_msg = codec.decode(bit_stream) + + self.assertAlmostEqual(decoded_msg, msg) + + def test_ros_time_codec(self) -> None: + """test functionality of the ros time field codec""" + + input_time = rospy.Time.from_sec(time.time()) + + codec = ltcodecs.rostime_codec.RosTimeCodec(precision=20) + encoded_time = codec.encode(input_time) + + bit_stream = ConstBitStream(encoded_time[0]) + + decoded_time = codec.decode(bit_stream) + + self.assertEqual(decoded_time, input_time) + + def test_string_enum_codec(self) -> None: + """test functionality of the StringEnum field codec""" + + test_string = "test2" + codec = ltcodecs.string_enum_codec.StringEnumCodec( + entries=["Test1", "test2", "other strings here"] + ) + encoded = codec.encode("test2") + bit_stream = ConstBitStream(encoded[0]) + decoded = codec.decode(bit_stream) + + self.assertEqual(test_string, decoded) + + +if __name__ == "__main__": + rospy.init_node("temp_node") + + unittest.main() -- GitLab From 50fbfc9a18f3c6f9b02a189c714ddc293d50eb0e Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Mon, 5 Jun 2023 17:25:38 -0400 Subject: [PATCH 3/3] Added LZMA encoder --- src/ltcodecs/__init__.py | 2 ++ src/ltcodecs/lzma_codec.py | 56 ++++++++++++++++++++++++++++++++++++++ test_codecs.py | 14 ++++++++++ 3 files changed, 72 insertions(+) create mode 100644 src/ltcodecs/lzma_codec.py diff --git a/src/ltcodecs/__init__.py b/src/ltcodecs/__init__.py index f750c2a..5715356 100644 --- a/src/ltcodecs/__init__.py +++ b/src/ltcodecs/__init__.py @@ -17,6 +17,7 @@ from .ccl_latlon_bcd_codec import CclLatLonBcdCodec from .padding_codec import PaddingCodec from .rostime_codec import RosTimeCodec from .ros_message_codec import RosMessageCodec +from .lzma_codec import LzmaCodec from .exceptions import EncodingFailed field_codec_classes = { @@ -51,6 +52,7 @@ field_codec_classes = { "time": RosTimeCodec, "rostime": RosTimeCodec, "string_enum": StringEnumCodec, + "lzma": LzmaCodec, } metadata_decoders = { diff --git a/src/ltcodecs/lzma_codec.py b/src/ltcodecs/lzma_codec.py new file mode 100644 index 0000000..4363209 --- /dev/null +++ b/src/ltcodecs/lzma_codec.py @@ -0,0 +1,56 @@ +""" +ltcodecs.LzmaCodec +--------------------- + +This module contains the LzmaCodec class, which is used to encode and decode bytes usinng LZMA. +""" + +from __future__ import annotations +import lzma +from bitstring import ConstBitStream, Bits +from .field_codec import FieldCodec +from .exceptions import EncodingFailed + + +class LzmaCodec(FieldCodec): + """ + codec for bools + """ + + def __init__(self, **kwargs) -> None: + pass + + def encode(self, value: bytes) -> tuple[Bits, bytes]: + """ + encode bytes + + :param value: the bool to encode + """ + try: + compressed = lzma.compress(value) + except lzma.LZMAError as err: + raise EncodingFailed("LZMA encoding failed") from err + + value_bits = Bits(bytes=compressed) + return value_bits, value + + def decode(self, bits_to_decode: ConstBitStream) -> bytes: + """ + decode a bool + + :param bits_to_decode: the bits to decode + """ + try: + value = lzma.decompress(bits_to_decode.read("bytes")) + except lzma.LZMAError as err: + raise EncodingFailed("LZMA decoding failed") from err + + return value + + @property + def max_length_bits(self) -> None: + pass + + @property + def min_length_bits(self) -> None: + pass diff --git a/test_codecs.py b/test_codecs.py index e7bdde6..6916402 100755 --- a/test_codecs.py +++ b/test_codecs.py @@ -57,6 +57,20 @@ class CodecTest(unittest.TestCase): decoded_bytes = codec.decode(bit_stream).decode() self.assertEqual(decoded_bytes, input_str) + def test_lzma_codec(self) -> None: + """test functionality of the LZMA codec""" + + input_str = "This is a test string" + input_bytes = input_str.encode() # encode entryString to bytes + + codec = ltcodecs.lzma_codec.LzmaCodec() + encoded_byes = codec.encode(input_bytes) + + bit_stream = ConstBitStream(encoded_byes[0]) + + decoded_bytes = codec.decode(bit_stream).decode() + self.assertEqual(decoded_bytes, input_str) + def test_ccl_latlon_bcd_codec(self) -> None: """test functionality of the CclLatLonBcd codec""" -- GitLab