1 """High-level Object-oriented API for the different types of packets
2 (FixedLength and VariableLength) supported by the package.
11 from .converters
import Converter
12 from .decode
import _decode_fixed_length, _decode_variable_length
13 from .packet_fields
import PacketField, PacketArray
16 __author__ =
"Daniel da Silva <mail@danieldasilva.org>"
20 """Base class of FixedLength and VariableLength. Not to be instantiated
24 def _init(self, fields):
28 fields : list of `ccsdspy.PacketField`
29 Layout of packet fields contained in the definition.
43 Path to file on the local file system that defines the packet fields.
44 Currently only supports csv files.
45 See :download:`simple_csv_3col.csv <../../ccsdspy/tests/data/packet_def/simple_csv_3col.csv>` # noqa: E501
46 and :download:`simple_csv_4col.csv <../../ccsdspy/tests/data/packet_def/simple_csv_4col.csv>` # noqa: E501
50 An instance of FixedLength.
52 file_extension = os.path.splitext(file)
53 if file_extension[1] ==
".csv":
54 fields = _get_fields_csv_file(file)
56 raise ValueError(f
"File type {file_extension[1]} not supported.")
61 """Add a converted field to the packet definition, used to apply
62 post-processing transformations of decoded fields.
66 input_field_name : str or list/tuple
67 Name of input field, or list/tuple of names of fields. There must be field(s)
68 which exists in the packet definition corresponding to these name(s).
69 output_field_name : str
70 Name of output field. When the packet is decoded using `pkt.load()`,
71 a new field named this will be present in the output dictionary.
72 converter : instance of subclass of `:py:class:~ccsdspy.converters.Converter`
73 A converter object to apply post-processing conversions, such as
74 calibration curves or value replacement. Converter objects
75 can be found in`:py:mod:~ccsdspy.converters`.
80 If one of the arguments is not of the correct type.
82 The provided `input_field_name` is not present in the packet definition
84 if not isinstance(output_field_name, str):
85 raise TypeError(
"output_field_name must be a str")
86 if not isinstance(converter, Converter):
87 raise TypeError(
"converter must be an instance of a Converter subclass")
91 if isinstance(input_field_name, str):
92 input_field_names = (input_field_name,)
93 elif isinstance(input_field_name, (list, tuple)):
94 input_field_names = tuple(input_field_name)
96 raise TypeError(
"input_field_name must be either str, list, or tuple")
102 fields_in_packet_set = {field._name
for field
in self.
_fields}
103 input_field_names_set =
set(input_field_names)
104 all_fields_present = input_field_names_set <= fields_in_packet_set
106 if not all_fields_present:
107 missing_fields = input_field_names_set - fields_in_packet_set
109 "Some fields specified as inputs to converters were missing: "
110 f
"{sorted(missing_fields)}"
113 self.
_converters[input_field_names] = (output_field_name, converter)
117 """Define a fixed length packet to decode binary data.
119 Fixed length packets correspond to packets that are the same length and
120 layout every time. A common example of this is housekeeping or status
128 fields : list of :py:class:`~ccsdspy.PacketField` or :py:class:`~ccsdspy.PacketArray`
129 Layout of packet fields contained in the definition.
134 one or more of the arguments are invalid
136 if any(isinstance(field._array_shape, str)
for field
in fields):
138 "The FixedLength class does not support variable fields. "
139 "Instead, use the VariableLength class."
144 def load(self, file, include_primary_header=False):
145 """Decode a file-like object containing a sequence of these packets.
150 Path to file on the local file system, or file-like object
151 include_primary_header : bool
152 If True, provides the primary header in the output
156 field_arrays : dict, string to NumPy array
157 dictionary mapping field names to NumPy arrays, with key order matching
158 the order of fields in the packet.
163 If the ccsds sequence count is not in order
165 If the ccsds sequence count is missing packets
167 If there are more than one APID
169 packet_arrays = _load(
174 include_primary_header=
True,
178 _inspect_primary_header_fields(packet_arrays)
180 if not include_primary_header:
181 _delete_primary_header_fields(packet_arrays)
187 """Define a variable length packet to decode binary data.
189 Variable length packets are packets which have a different length each
190 time. Variable length fields are defined as `~ccsdspy.PacketArray` fields
191 where `array_shape="expand"` (causing the field to grow to fill the packet) or
192 `array_shape="other_field"` (causes the field named `other_field` to set the number
193 of elements in this array).
195 Please note that while this class is able to parse fixed length packets, it
196 is much slower. Use the :py:class:`~ccsdspy.FixedLength` class instead.
198 Rules for variable length packets:
199 * Do only specify a `~ccsdspy.PacketArray` with the `array_shape="other_field"`
200 when `other_field` precedes it in the packet definition
201 * Do not provide more than one expanding `~ccsdspy.PacketArray` with `array_shape="expand"`
202 * Do not specify the primary header fields manually
203 * Do not specify explicit bit_offsets (they will be computed automatically)
210 fields : list of :py:class:`~ccsdspy.PacketField` or :py:class:`~ccsdspy.PacketArray`
211 Layout of packet fields contained in the definition. No more than
212 one field should have array_shape="expand". The field must have no
213 bit_offset's. Do not include the primary header fields.
218 one or more of the arguments are invalid, or do not follow the
225 if isinstance(field, PacketArray)
and field._array_shape ==
"expand"
228 if len(expand_arrays) > 1:
230 "The VariableLength class does not support more than one field "
231 "with array_shape='expand', as the decoding process becomes "
237 field_names = [field._name
for field
in fields]
239 for i, field
in enumerate(fields):
241 isinstance(field, PacketArray)
242 and isinstance(field._array_shape, str)
243 and field._array_shape !=
"expand"
244 and field._array_shape
not in field_names[:i]
247 "The VariableLength class requires that variable fields with "
248 "their sizes set by other fields only do so when the "
249 "previous field precedes it."
253 if not all(field._bit_offset
is None for field
in fields):
255 "The VariableLength class does not support explicit bit "
256 "offsets. You must specify the entire packet so they can be "
257 "determined automatically."
262 def load(self, file, include_primary_header=False):
263 """Decode a file-like object containing a sequence of these packets.
268 Path to file on the local file system, or file-like object
269 include_primary_header : bool
270 If True, provides the primary header in the output
274 field_arrays : dict, string to NumPy array
275 dictionary mapping field names to NumPy arrays, with key order matching
276 the order of fields in the packet.
281 If the ccsds sequence count is not in order
283 If the ccsds sequence count is missing packets
285 If there are more than one APID
290 packet_arrays = _load(
295 _inspect_primary_header_fields(packet_arrays)
297 if not include_primary_header:
298 _delete_primary_header_fields(packet_arrays)
303 def _inspect_primary_header_fields(packet_arrays):
304 """Inspects the primary header fields.
306 Checks for the following issues
307 * all apids are the same
308 * sequence count is not missing any values
309 * sequence count is in order
314 dictionary mapping field names to NumPy arrays, with key order matching
315 the order fields in the packet. Modified in place
317 seq_counts = packet_arrays[
"CCSDS_SEQUENCE_COUNT"]
318 start, end = seq_counts[0], seq_counts[-1]
319 missing_elements = sorted(
set(
range(start, end + 1)).difference(seq_counts))
320 if len(missing_elements) != 0:
321 warnings.warn(f
"Missing packets found {missing_elements}.", UserWarning)
323 if not np.all(seq_counts == np.sort(seq_counts)):
324 warnings.warn(
"Sequence count are out of order.", UserWarning)
326 individual_ap_ids =
set(packet_arrays[
"CCSDS_APID"])
327 if len(individual_ap_ids) != 1:
328 warnings.warn(f
"Found multiple AP IDs {individual_ap_ids}.", UserWarning)
def _delete_primary_header_fields(packet_arrays):
    """Modifies in place the packet arrays dictionary to delete primary
    header fields.

    Parameters
    ----------
    packet_arrays : dict
        dictionary mapping field names to NumPy arrays, with key order matching
        the order fields in the packet. Modified in place
    """
    # Prepending onto an empty list yields exactly the primary header fields,
    # whose names are the keys to remove.
    for primary_field in _prepend_primary_header_fields([]):
        del packet_arrays[primary_field._name]
349 def _expand_array_fields(existing_fields):
350 """Expand arrays into multiple fields, one for each element.
352 Returns a new list of fields as well as a data structure which can be used
353 to reverse this process. See the `_unexpand_field_arrays()` function to reverse
358 existing_fields : list of `ccsdspy.PacketField`
359 Layout of packet fields contained in the definition, with PacketArray
363 return_fields : list of `ccsdspy.PacketField`
364 Layout of packet fields contained in the definition, without PacketArray's
365 expand_history : dict
366 Dictionary mapping array name with shape/data-type and field expansions
371 for existing_field
in existing_fields:
372 if existing_field._field_type !=
"array" or isinstance(existing_field._array_shape, str):
373 return_fields.append(existing_field)
376 array_shape = existing_field._array_shape
377 array_order = existing_field._array_order
379 index_vecs = [np.arange(dim)
for dim
in array_shape]
380 index_grids = np.meshgrid(*index_vecs, indexing=
"ij")
381 indices_flat = [index_grid.flatten(order=array_order)
for index_grid
in index_grids]
383 expand_history[existing_field._name] = {
384 "shape": array_shape,
385 "data_type": existing_field._data_type,
389 for i, indices
in enumerate(zip(*indices_flat)):
390 name = f
"{existing_field._name}[{','.join(map(str,indices))}]"
391 if existing_field._bit_offset
is None:
394 bit_offset = existing_field._bit_offset + i * existing_field._bit_length
398 data_type=existing_field._data_type,
399 bit_length=existing_field._bit_length,
400 bit_offset=bit_offset,
401 byte_order=existing_field._byte_order,
404 expand_history[existing_field._name][
"fields"][name] = indices
405 return_fields.append(return_field)
407 return return_fields, expand_history
410 def _unexpand_field_arrays(field_arrays, expand_history):
411 """Reverse the array expansion process from `_expand_array_fields`.
415 field_arrays : dict, str to numpy array
416 Dictionary mapping field names to NumPy arrays, with key order matching
417 the order fields in the packet. Has a key for each array element.
418 expand_history : dict
419 Dictionary mapping array name with shape/data-type and field expansions
423 return_field_arrays : dict, str to array
424 Dictionary mapping field names to NumPy arrays, with key order matching
425 the order fields in the packet. Has keys mapping to full arrays.
427 npackets =
list(field_arrays.values())[0].shape[0]
428 return_field_arrays = field_arrays.copy()
430 for array_name, array_details
in expand_history.items():
431 array_shape = (npackets,) + array_details[
"shape"]
432 array_dtype = field_arrays[
list(array_details[
"fields"].keys())[0]].dtype
433 array = np.zeros(array_shape, dtype=array_dtype)
435 for element_name, indices
in array_details[
"fields"].items():
436 array.__setitem__((slice(
None),) + indices, field_arrays[element_name])
437 del return_field_arrays[element_name]
439 return_field_arrays[array_name] = array
441 return return_field_arrays
def _prepend_primary_header_fields(existing_fields):
    """Helper function that prepends primary header fields to a list of packet
    fields, to support load(include_primary_header=True)

    Parameters
    ----------
    existing_fields : list of `ccsdspy.PacketField`
        Non-primary header fields defined by the packet.

    Returns
    -------
    New list of fields with the primary header fields prepended.
    """
    # Bit layout of the 6-byte CCSDS Space Packet primary header.
    # NOTE(review): only the APID widths were visible here; the remaining
    # widths/offsets follow the CCSDS 133.0-B standard — confirm upstream.
    return_fields = [
        PacketField(name="CCSDS_VERSION_NUMBER", data_type="uint", bit_length=3, bit_offset=0),
        PacketField(name="CCSDS_PACKET_TYPE", data_type="uint", bit_length=1, bit_offset=3),
        PacketField(name="CCSDS_SECONDARY_FLAG", data_type="uint", bit_length=1, bit_offset=4),
        PacketField(name="CCSDS_APID", data_type="uint", bit_length=11, bit_offset=5),
        PacketField(name="CCSDS_SEQUENCE_FLAG", data_type="uint", bit_length=2, bit_offset=16),
        PacketField(name="CCSDS_SEQUENCE_COUNT", data_type="uint", bit_length=14, bit_offset=18),
        PacketField(name="CCSDS_PACKET_LENGTH", data_type="uint", bit_length=16, bit_offset=32),
    ]

    return_fields.extend(existing_fields)

    return return_fields
def _get_fields_csv_file(csv_file):
    """Parse a simple comma-delimited file that defines a packet.

    Should not include the CCSDS header. The minimum set of columns are (name,
    data_type, bit_length). An optional bit_offset can also be provided.

    Parameters
    ----------
    csv_file : str
        Path to file on the local file system

    Returns
    -------
    fields : list
        A list of `PacketField` objects.

    Raises
    ------
    RuntimeError
        If the CSV file is empty.
    ValueError
        If the CSV file is missing one of the required columns.
    """
    req_columns = ["name", "data_type", "bit_length"]
    fields = []

    with open(csv_file, "r") as fp:
        reader = csv.DictReader(fp, skipinitialspace=True)
        headers = reader.fieldnames

        if headers is None:
            raise RuntimeError("CSV file must not be empty")

        if not all((req_col in headers) for req_col in req_columns):
            raise ValueError(f"Minimum required columns are {req_columns}.")

        # Previously the array-shape parsing below was duplicated verbatim in
        # separate 3-column and 4-column branches; a single loop with an
        # optional bit_offset keyword keeps the two layouts in sync.
        has_bit_offset = "bit_offset" in headers

        for row in reader:
            data_type, array_shape = _parse_csv_data_type(row["data_type"])
            extra_kwargs = {}
            if has_bit_offset:
                extra_kwargs["bit_offset"] = int(row["bit_offset"])

            if array_shape is not None:
                fields.append(
                    PacketArray(
                        name=row["name"],
                        data_type=data_type,
                        bit_length=int(row["bit_length"]),
                        array_shape=array_shape,
                        **extra_kwargs,
                    )
                )
            else:
                fields.append(
                    PacketField(
                        name=row["name"],
                        data_type=data_type,
                        bit_length=int(row["bit_length"]),
                        **extra_kwargs,
                    )
                )

    return fields


def _parse_csv_data_type(data_type_str):
    """Split a csv ``data_type`` cell into a base type and optional shape.

    A cell like ``uint(1, 4)`` denotes an array: base type ``uint`` with
    shape ``(1, 4)``. A cell with no parenthesized suffix is a scalar.

    Parameters
    ----------
    data_type_str : str
        Contents of the data_type column for one row.

    Returns
    -------
    tuple of (str, tuple or None)
        The base data type, and the array shape (or None for scalars).
    """
    if data_type_str.count("(") == 1 and data_type_str.count(")") == 1:
        base_type = data_type_str.split("(")[0]
        shape_str = data_type_str[data_type_str.find("(") + 1 : data_type_str.find(")")]
        # Split on "," rather than ", " so that "uint(1,4)" (no space) also
        # parses; int() tolerates the surrounding whitespace either way.
        return base_type, tuple(int(dim) for dim in shape_str.split(","))

    return data_type_str, None
def _load(file, fields, converters, decoder_name, include_primary_header=False):
    """Decode a file-like object containing a sequence of these packets.

    Parameters
    ----------
    file : str
        Path to file on the local file system, or file-like object
    fields : list of `ccsdspy.PacketField`
        Layout of packet fields contained in the definition.
    converters : dict, str to tuple (str, Converter)
        Dictionary of post-processing conversions. keys are input field names,
        values are tuples of (output_field_name, Converter instance)
    decoder_name: {'fixed_length', 'variable_length'}
        String identifying which decoder to use.
    include_primary_header: bool
        If True, provides the primary header in the output

    Returns
    -------
    field_arrays : dict
        dictionary mapping field names to NumPy arrays, with key order matching
        the order fields in the packet.

    Raises
    ------
    ValueError
        the decoder_name is not one of the allowed values
    """
    # Accept either an open file-like object or a path on disk.
    if hasattr(file, "read"):
        file_bytes = np.frombuffer(file.read(), "u1")
    else:
        file_bytes = np.fromfile(file, "u1")

    if include_primary_header:
        fields = _prepend_primary_header_fields(fields)

    # Fixed-shape arrays are decoded element-by-element; expand them here and
    # reassemble the full arrays after decoding.
    fields, expand_history = _expand_array_fields(fields)

    if decoder_name == "fixed_length":
        field_arrays = _decode_fixed_length(file_bytes, fields)
    elif decoder_name == "variable_length":
        field_arrays = _decode_variable_length(file_bytes, fields)
    else:
        # BUG FIX: the message previously read "'f{decoder_name}'" — a stray
        # literal "f" inside the quotes garbled the reported name.
        raise ValueError(
            f"Invalid decoder_name '{decoder_name}' specified. Must be "
            "either 'fixed_length', or 'variable_length'"
        )

    field_arrays = _unexpand_field_arrays(field_arrays, expand_history)
    field_arrays = _apply_converters(field_arrays, converters)

    return field_arrays
638 def _apply_converters(field_arrays, converters):
639 """Apply post-processing converters in place to a dictionary of field
644 field_arrays : dict of string to NumPy arrays
645 The decoded packet field arrays without any post-processing applied
646 converters : dict, str to tuple (str, Converter)
647 Dictionary of post-processing conversions. keys are input field names,
648 values are tuples of (output_field_name, Converter instance)
652 converted_field_arrays : dict of string to NumPy arrays
653 The converted decoded packet field arrays, as a dictionary with the same
654 key as the passed `field_arrays`.
656 converted = field_arrays.copy()
658 for input_field_names, (output_field_name, converter)
in converters.items():
662 for input_field_name
in input_field_names:
663 input_arrays.append(field_arrays[input_field_name])
666 converted[output_field_name] = converter.convert(*input_arrays)