NASA Logo
Ocean Color Science Software

ocssw V2022
packet_types.py
Go to the documentation of this file.
1 """High-level Object-oriented API for the different types of packets
2 (FixedLength and VariableLength) supported by the package.
3 """
4 
5 import csv
6 import os
7 import warnings
8 
9 import numpy as np
10 
11 from .converters import Converter
12 from .decode import _decode_fixed_length, _decode_variable_length
13 from .packet_fields import PacketField, PacketArray
14 
15 
16 __author__ = "Daniel da Silva <mail@danieldasilva.org>"
17 
18 
20  """Base class of FixedLength and VariableLength. Not to be instantiated
21  directly.
22  """
23 
24  def _init(self, fields):
25  """
26  Parameters
27  ----------
28  fields : list of `ccsdspy.PacketField`
29  Layout of packet fields contained in the definition.
30  """
31  # List of PacketField instances
32  self._fields = fields[:]
33 
34  # Dictionary mapping input name to tuple (output_name: str, Converter instance)
35  self._converters = {}
36 
37  @classmethod
38  def from_file(cls, file):
39  """
40  Parameters
41  ----------
42  file : str
43  Path to file on the local file system that defines the packet fields.
44  Currently only supports csv files.
45  See :download:`simple_csv_3col.csv <../../ccsdspy/tests/data/packet_def/simple_csv_3col.csv>` # noqa: E501
46  and :download:`simple_csv_4col.csv <../../ccsdspy/tests/data/packet_def/simple_csv_4col.csv>` # noqa: E501
47 
48  Returns
49  -------
50  An instance of FixedLength.
51  """
52  file_extension = os.path.splitext(file)
53  if file_extension[1] == ".csv":
54  fields = _get_fields_csv_file(file)
55  else:
56  raise ValueError(f"File type {file_extension[1]} not supported.")
57 
58  return cls(fields)
59 
60  def add_converted_field(self, input_field_name, output_field_name, converter):
61  """Add a converted field to the packet definition, used to apply
62  post-processing transformations of decoded fields.
63 
64  Parameters
65  ----------
66  input_field_name : str or list/tuple
67  Name of input field, or list/tuple of names of fields. There must be field(s)
68  which exists in the packet definition corresponding to these name(s).
69  output_field_name : str
70  Name of output field. When the packet is decoded using `pkt.load()`,
71  a new field named this will be present in the output dictionary.
72  converter : instance of subclass of `:py:class:~ccsdspy.converters.Converter`
73  A converter object to apply post-processing conversions, such as
74  calibration curves or value replacement. Converter objects
75  can be found in`:py:mod:~ccsdspy.converters`.
76 
77  Raises
78  ------
79  TypeError
80  If one of the arguments is not of the correct type.
81  ValueError
82  The provided `input_field_name` is not present in the packet definition
83  """
84  if not isinstance(output_field_name, str):
85  raise TypeError("output_field_name must be a str")
86  if not isinstance(converter, Converter):
87  raise TypeError("converter must be an instance of a Converter subclass")
88 
89  # Get tuple of input field names for storing; this handles the input_field_name
90  # argument being either a str, or list/tuple
91  if isinstance(input_field_name, str):
92  input_field_names = (input_field_name,)
93  elif isinstance(input_field_name, (list, tuple)):
94  input_field_names = tuple(input_field_name)
95  else:
96  raise TypeError("input_field_name must be either str, list, or tuple")
97 
98  del input_field_name # don't use the variable again in this function
99 
100  # Check that each of the input field names exists in the packet, and report
101  # the missing fields if not
102  fields_in_packet_set = {field._name for field in self._fields}
103  input_field_names_set = set(input_field_names)
104  all_fields_present = input_field_names_set <= fields_in_packet_set # subset
105 
106  if not all_fields_present:
107  missing_fields = input_field_names_set - fields_in_packet_set # set op A \ B
108  raise ValueError(
109  "Some fields specified as inputs to converters were missing: "
110  f"{sorted(missing_fields)}"
111  )
112 
113  self._converters[input_field_names] = (output_field_name, converter)
114 
115 
117  """Define a fixed length packet to decode binary data.
118 
119  Fixed length packets correspond to packets that are the same length and
120  layout every time. A common example of this is housekeeping or status
121  messages.
122  """
123 
124  def __init__(self, fields):
125  """
126  Parameters
127  ----------
128  fields : list of :py:class:`~ccsdspy.PacketField` or :py:class:`~ccsdspy.PacketArray`
129  Layout of packet fields contained in the definition.
130 
131  Raises
132  ------
133  ValueError
134  one or more of the arguments are invalid
135  """
136  if any(isinstance(field._array_shape, str) for field in fields):
137  raise ValueError(
138  "The FixedLength class does not support variable fields. "
139  "Instead, use the VariableLength class."
140  )
141 
142  self._init(fields)
143 
144  def load(self, file, include_primary_header=False):
145  """Decode a file-like object containing a sequence of these packets.
146 
147  Parameters
148  ----------
149  file : str
150  Path to file on the local file system, or file-like object
151  include_primary_header : bool
152  If True, provides the primary header in the output
153 
154  Returns
155  -------
156  field_arrays : dict, string to NumPy array
157  dictionary mapping field names to NumPy arrays, with key order matching
158  the order of fields in the packet.
159 
160  Warns
161  -----
162  UserWarning
163  If the ccsds sequence count is not in order
164  UserWarning
165  If the ccsds sequence count is missing packets
166  UserWarning
167  If there are more than one APID
168  """
169  packet_arrays = _load(
170  file,
171  self._fields,
172  self._converters,
173  "fixed_length",
174  include_primary_header=True,
175  )
176 
177  # inspect the primary header and issue warning if appropriate
178  _inspect_primary_header_fields(packet_arrays)
179 
180  if not include_primary_header:
181  _delete_primary_header_fields(packet_arrays)
182 
183  return packet_arrays
184 
185 
187  """Define a variable length packet to decode binary data.
188 
189  Variable length packets are packets which have a different length each
190  time. Variable length fields are defined as `~ccsdspy.PacketArray` fields
191  where `array_shape="expand"` (causing the field to grow to fill the packet) or
192  `array_shape="other_field"` (causes the field named `other_field` to set the number
193  of elements in this array).
194 
195  Please note that while this class is able to parse fixed length packets, it
196  is much slower. Use the :py:class:`~ccsdspy.FixedLength` class instead.
197 
198  Rules for variable length packets:
199  * Do only specify a `~ccsdspy.PacketArray` with the `array_shape="other_field"`
200  when `other_field` precedes it in the packet definition
201  * Do not provide more than one expanding `~ccsdspy.PacketArray` with `array_shape="expand"`
202  * Do not specify the primary header fields manually
203  * Do not specify explicit bit_offsets (they will be computed automatically)
204  """
205 
206  def __init__(self, fields):
207  """
208  Parameters
209  ----------
210  fields : list of :py:class:`~ccsdspy.PacketField` or :py:class:`~ccsdspy.PacketArray`
211  Layout of packet fields contained in the definition. No more than
212  one field should have array_shape="expand". The field must have no
213  bit_offset's. Do not include the primary header fields.
214 
215  Raises
216  ------
217  ValueError
218  one or more of the arguments are invalid, or do not follow the
219  specified rules.
220  """
221  # Check there is only one expanding field in the packet definition
222  expand_arrays = [
223  field
224  for field in fields
225  if isinstance(field, PacketArray) and field._array_shape == "expand"
226  ]
227 
228  if len(expand_arrays) > 1:
229  raise ValueError(
230  "The VariableLength class does not support more than one field "
231  "with array_shape='expand', as the decoding process becomes "
232  "ambiguous."
233  )
234 
235  # Check variable fields with their sizes set by other fields only do so when
236  # the previous field precedes it
237  field_names = [field._name for field in fields]
238 
239  for i, field in enumerate(fields):
240  if (
241  isinstance(field, PacketArray)
242  and isinstance(field._array_shape, str)
243  and field._array_shape != "expand"
244  and field._array_shape not in field_names[:i]
245  ):
246  raise ValueError(
247  "The VariableLength class requires that variable fields with "
248  "their sizes set by other fields only do so when the "
249  "previous field precedes it."
250  )
251 
252  # Check that bit offsets are not set
253  if not all(field._bit_offset is None for field in fields):
254  raise ValueError(
255  "The VariableLength class does not support explicit bit "
256  "offsets. You must specify the entire packet so they can be "
257  "determined automatically."
258  )
259 
260  self._init(fields)
261 
262  def load(self, file, include_primary_header=False):
263  """Decode a file-like object containing a sequence of these packets.
264 
265  Parameters
266  ----------
267  file : str
268  Path to file on the local file system, or file-like object
269  include_primary_header : bool
270  If True, provides the primary header in the output
271 
272  Returns
273  -------
274  field_arrays : dict, string to NumPy array
275  dictionary mapping field names to NumPy arrays, with key order matching
276  the order of fields in the packet.
277 
278  Warns
279  -----
280  UserWarning
281  If the ccsds sequence count is not in order
282  UserWarning
283  If the ccsds sequence count is missing packets
284  UserWarning
285  If there are more than one APID
286  """
287  # The variable length decoder requires the full packet definition, so if
288  # they didn't want the primary header fields, we parse for them and then
289  # remove them after.
290  packet_arrays = _load(
291  file, self._fields, self._converters, "variable_length", include_primary_header=True
292  )
293 
294  # inspect the primary header and issue warning if appropriate
295  _inspect_primary_header_fields(packet_arrays)
296 
297  if not include_primary_header:
298  _delete_primary_header_fields(packet_arrays)
299 
300  return packet_arrays
301 
302 
303 def _inspect_primary_header_fields(packet_arrays):
304  """Inspects the primary header fields.
305 
306  Checks for the following issues
307  * all apids are the same
308  * sequence count is not missing any values
309  * sequence count is in order
310 
311  Parameters
312  -----------
313  packet_arrays
314  dictionary mapping field names to NumPy arrays, with key order matching
315  the order fields in the packet. Modified in place
316  """
317  seq_counts = packet_arrays["CCSDS_SEQUENCE_COUNT"]
318  start, end = seq_counts[0], seq_counts[-1]
319  missing_elements = sorted(set(range(start, end + 1)).difference(seq_counts))
320  if len(missing_elements) != 0:
321  warnings.warn(f"Missing packets found {missing_elements}.", UserWarning)
322 
323  if not np.all(seq_counts == np.sort(seq_counts)):
324  warnings.warn("Sequence count are out of order.", UserWarning)
325 
326  individual_ap_ids = set(packet_arrays["CCSDS_APID"])
327  if len(individual_ap_ids) != 1:
328  warnings.warn(f"Found multiple AP IDs {individual_ap_ids}.", UserWarning)
329 
330  return None
331 
332 
333 def _delete_primary_header_fields(packet_arrays):
334  """Modifies in place the packet arrays dictionary to delete primary
335  header fields.
336 
337  Parameters
338  -----------
339  packet_arrays
340  dictionary mapping field names to NumPy arrays, with key order matching
341  the order fields in the packet. Modified in place
342  """
343  header_fields = _prepend_primary_header_fields([])
344 
345  for header_field in header_fields:
346  del packet_arrays[header_field._name]
347 
348 
349 def _expand_array_fields(existing_fields):
350  """Expand arrays into multiple fields, one for each element.
351 
352  Returns a new list of fields as well as a data structure which can be used
353  to reverse this process. See the `_unexpand_field_arrays()` function to reverse
354  this process.
355 
356  Parameters
357  ----------
358  existing_fields : list of `ccsdspy.PacketField`
359  Layout of packet fields contained in the definition, with PacketArray
360 
361  Returns
362  -------
363  return_fields : list of `ccsdspy.PacketField`
364  Layout of packet fields contained in the definition, without PacketArray's
365  expand_history : dict
366  Dictionary mapping array name with shape/data-type and field expansions
367  """
368  return_fields = []
369  expand_history = {}
370 
371  for existing_field in existing_fields:
372  if existing_field._field_type != "array" or isinstance(existing_field._array_shape, str):
373  return_fields.append(existing_field)
374  continue
375 
376  array_shape = existing_field._array_shape
377  array_order = existing_field._array_order
378 
379  index_vecs = [np.arange(dim) for dim in array_shape]
380  index_grids = np.meshgrid(*index_vecs, indexing="ij")
381  indices_flat = [index_grid.flatten(order=array_order) for index_grid in index_grids]
382 
383  expand_history[existing_field._name] = {
384  "shape": array_shape,
385  "data_type": existing_field._data_type,
386  "fields": {},
387  }
388 
389  for i, indices in enumerate(zip(*indices_flat)):
390  name = f"{existing_field._name}[{','.join(map(str,indices))}]"
391  if existing_field._bit_offset is None:
392  bit_offset = None
393  else:
394  bit_offset = existing_field._bit_offset + i * existing_field._bit_length
395 
396  return_field = PacketField(
397  name=name,
398  data_type=existing_field._data_type,
399  bit_length=existing_field._bit_length,
400  bit_offset=bit_offset,
401  byte_order=existing_field._byte_order,
402  )
403 
404  expand_history[existing_field._name]["fields"][name] = indices
405  return_fields.append(return_field)
406 
407  return return_fields, expand_history
408 
409 
410 def _unexpand_field_arrays(field_arrays, expand_history):
411  """Reverse the array expansion process from `_expand_array_fields`.
412 
413  Parameters
414  ----------
415  field_arrays : dict, str to numpy array
416  Dictionary mapping field names to NumPy arrays, with key order matching
417  the order fields in the packet. Has a key for each array element.
418  expand_history : dict
419  Dictionary mapping array name with shape/data-type and field expansions
420 
421  Returns
422  -------
423  return_field_arrays : dict, str to array
424  Dictionary mapping field names to NumPy arrays, with key order matching
425  the order fields in the packet. Has keys mapping to full arrays.
426  """
427  npackets = list(field_arrays.values())[0].shape[0]
428  return_field_arrays = field_arrays.copy()
429 
430  for array_name, array_details in expand_history.items():
431  array_shape = (npackets,) + array_details["shape"]
432  array_dtype = field_arrays[list(array_details["fields"].keys())[0]].dtype
433  array = np.zeros(array_shape, dtype=array_dtype)
434 
435  for element_name, indices in array_details["fields"].items():
436  array.__setitem__((slice(None),) + indices, field_arrays[element_name])
437  del return_field_arrays[element_name]
438 
439  return_field_arrays[array_name] = array
440 
441  return return_field_arrays
442 
443 
444 def _prepend_primary_header_fields(existing_fields):
445  """Helper function that prepends primary header fields to a list of packet
446  fields, to support load(include_primary_header=True)
447 
448  Parameters
449  ----------
450  existing_fields : list of `ccsdspy.PacketField`
451  Non-primary header fields defined by the packet.
452 
453  Returns
454  -------
455  New list of fields with the primary header fields prepended.
456  """
457  return_fields = [
458  PacketField(
459  name="CCSDS_VERSION_NUMBER",
460  data_type="uint",
461  bit_length=3,
462  bit_offset=0,
463  ),
464  PacketField(
465  name="CCSDS_PACKET_TYPE",
466  data_type="uint",
467  bit_length=1,
468  bit_offset=3,
469  ),
470  PacketField(
471  name="CCSDS_SECONDARY_FLAG",
472  data_type="uint",
473  bit_length=1,
474  bit_offset=4,
475  ),
476  PacketField(name="CCSDS_APID", data_type="uint", bit_length=11, bit_offset=5),
477  PacketField(
478  name="CCSDS_SEQUENCE_FLAG",
479  data_type="uint",
480  bit_length=2,
481  bit_offset=16,
482  ),
483  PacketField(
484  name="CCSDS_SEQUENCE_COUNT",
485  data_type="uint",
486  bit_length=14,
487  bit_offset=18,
488  ),
489  PacketField(
490  name="CCSDS_PACKET_LENGTH",
491  data_type="uint",
492  bit_length=16,
493  bit_offset=32,
494  ),
495  ]
496 
497  return_fields.extend(existing_fields)
498 
499  return return_fields
500 
501 
502 def _get_fields_csv_file(csv_file):
503  """Parse a simple comma-delimited file that defines a packet.
504 
505  Should not include the CCSDS header. The minimum set of columns are (name,
506  data_type, bit_length). An optional bit_offset can also be provided.
507 
508  Parameters
509  ----------
510  csv_file : str
511  Path to file on the local file system
512 
513  Returns
514  -------
515  fields : list
516  A list of `PacketField` objects.
517  """
518  req_columns = ["name", "data_type", "bit_length"]
519 
520  with open(csv_file, "r") as fp:
521  fields = []
522  reader = csv.DictReader(fp, skipinitialspace=True)
523  headers = reader.fieldnames
524 
525  if headers is None:
526  raise RuntimeError("CSV file must not be empty")
527 
528  if not all((req_col in headers) for req_col in req_columns):
529  raise ValueError(f"Minimum required columns are {req_columns}.")
530 
531  for row in reader: # skip the header row
532  if "bit_offset" not in headers: # 3 col csv file
533  if (row["data_type"].count("(") == 1) and (row["data_type"].count(")") == 1):
534  data_type = row["data_type"].split("(")[0]
535  array_shape_str = row["data_type"][
536  row["data_type"].find("(") + 1 : row["data_type"].find(")")
537  ]
538  array_shape = tuple(map(int, array_shape_str.split(", ")))
539  fields.append(
540  PacketArray(
541  name=row["name"],
542  data_type=data_type,
543  bit_length=int(row["bit_length"]),
544  array_shape=(array_shape),
545  )
546  )
547  else:
548  fields.append(
549  PacketField(
550  name=row["name"],
551  data_type=row["data_type"],
552  bit_length=int(row["bit_length"]),
553  )
554  )
555  if "bit_offset" in headers: # 4 col csv file provides bit offsets
556  # TODO: Check the consistency of bit_offsets versus previous bit_lengths
557  if (row["data_type"].count("(") == 1) and (row["data_type"].count(")") == 1):
558  data_type = row["data_type"].split("(")[0]
559  array_shape_str = row["data_type"][
560  row["data_type"].find("(") + 1 : row["data_type"].find(")")
561  ]
562  array_shape = tuple(map(int, array_shape_str.split(", ")))
563  fields.append(
564  PacketArray(
565  name=row["name"],
566  data_type=data_type,
567  bit_length=int(row["bit_length"]),
568  bit_offset=int(row["bit_offset"]),
569  array_shape=array_shape,
570  )
571  )
572  else:
573  fields.append(
574  PacketField(
575  name=row["name"],
576  data_type=row["data_type"],
577  bit_length=int(row["bit_length"]),
578  bit_offset=int(row["bit_offset"]),
579  )
580  )
581 
582  return fields
583 
584 
585 def _load(file, fields, converters, decoder_name, include_primary_header=False):
586  """Decode a file-like object containing a sequence of these packets.
587 
588  Parameters
589  ----------
590  file: str
591  Path to file on the local file system, or file-like object
592  fields : list of `ccsdspy.PacketField`
593  Layout of packet fields contained in the definition.
594  converters : dict, str to tuple (str, Converter)
595  Dictionary of post-processing conversions. keys are input field names,
596  values are tuples of (output_field_name, Converter instance)
597  decoder_name: {'fixed_length', 'variable_length'}
598  String identifying which decoder to use.
599  include_primary_header: bool
600  If True, provides the primary header in the output
601 
602  Returns
603  -------
604  dictionary mapping field names to NumPy arrays, with key order matching
605  the order fields in the packet.
606 
607  Raises
608  ------
609  ValueError
610  the decoder_name is not one of the allowed values
611  """
612  if hasattr(file, "read"):
613  file_bytes = np.frombuffer(file.read(), "u1")
614  else:
615  file_bytes = np.fromfile(file, "u1")
616 
617  if include_primary_header:
618  fields = _prepend_primary_header_fields(fields)
619 
620  fields, expand_history = _expand_array_fields(fields)
621 
622  if decoder_name == "fixed_length":
623  field_arrays = _decode_fixed_length(file_bytes, fields)
624  elif decoder_name == "variable_length":
625  field_arrays = _decode_variable_length(file_bytes, fields)
626  else:
627  raise ValueError(
628  f"Invalid decoder_name 'f{decoder_name}' specified. Must be "
629  "either 'fixed_length', or 'variable_length'"
630  )
631 
632  field_arrays = _unexpand_field_arrays(field_arrays, expand_history)
633  field_arrays = _apply_converters(field_arrays, converters)
634 
635  return field_arrays
636 
637 
638 def _apply_converters(field_arrays, converters):
639  """Apply post-processing converters in place to a dictionary of field
640  arrays.
641 
642  Parameters
643  ----------
644  field_arrays : dict of string to NumPy arrays
645  The decoded packet field arrays without any post-processing applied
646  converters : dict, str to tuple (str, Converter)
647  Dictionary of post-processing conversions. keys are input field names,
648  values are tuples of (output_field_name, Converter instance)
649 
650  Returns
651  -------
652  converted_field_arrays : dict of string to NumPy arrays
653  The converted decoded packet field arrays, as a dictionary with the same
654  key as the passed `field_arrays`.
655  """
656  converted = field_arrays.copy()
657 
658  for input_field_names, (output_field_name, converter) in converters.items():
659  # Collect list of input arrays to pass as *args to converter function
660  input_arrays = []
661 
662  for input_field_name in input_field_names:
663  input_arrays.append(field_arrays[input_field_name])
664 
665  # Call converter function
666  converted[output_field_name] = converter.convert(*input_arrays)
667 
668  return converted
list(APPEND LIBS ${NETCDF_LIBRARIES}) find_package(GSL REQUIRED) include_directories($
Definition: CMakeLists.txt:8
def load(self, file, include_primary_header=False)
def load(self, file, include_primary_header=False)
def add_converted_field(self, input_field_name, output_field_name, converter)
Definition: packet_types.py:60
int count
Definition: decode_rs.h:79