NASA Logo
Ocean Color Science Software

ocssw V2022
converters.py
Go to the documentation of this file.
1 """This class hold the implementation of the converter system, which applies
2 post-process to decoded packet fields. This post-processing includes applying
3 linear/polynomial calibration curves, dictionary replacement, and time parsing.
4 """
5 
6 from datetime import datetime, timedelta
7 
8 import numpy as np
9 
10 __all__ = [
11  "EnumConverterMissingKey",
12  "Converter",
13  "PolyConverter",
14  "LinearConverter",
15  "EnumConverter",
16  "DatetimeConverter",
17  "StringifyBytesConverter",
18 ]
19 
20 
21 class EnumConverterMissingKey(RuntimeError):
22  """During conversion a value was encountered which did not have a
23  corresponding key in the replacement dictionary.
24  """
25 
26 
27 class Converter:
28  """Base class for all converter objects.
29 
30  This class is extended to create converters, and users may extend this
31  class to write their own custom converters.
32 
33  To write a converter, one must create a subclass and override either the method
34  `convert(*field_arrays)`. This method implements the conversion for an entire
35  sequence of decoded packet field values in a single call.
36  """
37 
38  def __init__(self):
39  raise NotImplementedError("This is a base class not meant to be instantiated directly")
40 
41  def convert(self, field_array):
42  """Convert a sequence of decoded packet field values.
43 
44  Parameters
45  ----------
46  field_array : NumPy array
47  decoded packet field values, must have at least one dimension
48 
49  Returns
50  -------
51  converted_field_array : NumPy array
52  converted form of the decoded packet field values
53  """
54  raise NotImplementedError("This method must be overridden by a subclass")
55 
56 
58  """Post-processing conversion which applies calibration using a series
59  of coefficients ordered from highest power to intercept.
60  """
61 
62  def __init__(self, coeffs):
63  """Instantiate a PolyConverter object
64 
65  Parameters
66  ----------
67  coeffs : list of float
68  Polynomial coefficients ordered from highest power to intercept.
69  """
70  self._coeffs = coeffs
71 
72  def convert(self, field_array):
73  """Apply the polynomial conversion.
74 
75  Parameters
76  ----------
77  field_array : NumPy array
78  decoded packet field values, must have at least one dimension
79 
80  Returns
81  -------
82  converted : NumPy array
83  converted form of the decoded packet field values
84  """
85  converted = np.zeros(field_array.shape, dtype=np.float64)
86 
87  for power, coeff in enumerate(reversed(self._coeffs)):
88  converted += coeff * field_array**power
89 
90  return converted
91 
92 
94  """Post-processing conversion which applies a linear (y=mx+b)
95  transformation.
96  """
97 
98  def __init__(self, slope, intercept):
99  """Instantiate a LinearConverter"""
100  super().__init__([slope, intercept])
101 
102 
104  """Post-processing conversion for applying dictionary replacement of
105  integers to strings.
106 
107  If during conversion a value is encountered which does not have a
108  corresponding key in the replacement dictionary, then a
109  `:py:class:`~ccsdspy.converters.EnumConverterMissingKey` exception
110  will be thrown.
111  """
112 
113  def __init__(self, replace_dict):
114  """Initialize a EnumConverter.
115 
116  Parameters
117  ----------
118  replace_dict : dict of int to string
119  Replacement dictionary mapping integer values to string values
120 
121  Raises
122  ------
123  TypeError
124  Either one of the keys of the replacement dictionary is not an
125  integer, or one of the values is not a string.
126  """
127  self._replace_dict = replace_dict
128 
129  for key, value in replace_dict.items():
130  if not isinstance(key, int):
131  raise TypeError(
132  f"Found key in EnumConverter replacement dictionary that is "
133  f"not an integer: {repr(key)}"
134  )
135  if not isinstance(value, str):
136  raise TypeError(
137  f"Found value in EnumConverter replacement dictionary that is "
138  f"not a string: {repr(value)}"
139  )
140 
141  def convert(self, field_array):
142  """Apply the enum replacement conversion.
143 
144  Parameters
145  ----------
146  field_array : NumPy array
147  decoded packet field values, must have at least one dimension
148 
149  Returns
150  -------
151  converted : NumPy array
152  converted form of the decoded packet field values
153  """
154  converted = np.zeros(field_array.shape, dtype=object)
155  converted_mask = np.zeros(field_array.shape, dtype=bool)
156 
157  for key, value in self._replace_dict.items():
158  converted[field_array == key] = value
159  converted_mask[field_array == key] = True
160 
161  if not converted_mask.all():
162  missing_keys = field_array[~converted_mask].tolist()
163 
165  f"The following were encountered which did not have "
166  f"corresponding keys in the replacement dictionary: "
167  f"{repr(missing_keys)}"
168  )
169 
170  return converted
171 
172 
174  """Post-processing conversion for converting timestamp fields to datetime
175  instances, computed using offset(s) from a reference time.
176 
177  This class supports the offsets stored in multiple input fields, for example
178  where one field is a coarse time (e.g. seconds) and a second field is a fine
179  time (e.g. nanoseconds). To use multiple input fields, pass a tuple of input
180  field names when this converter is added to the packet.
181  """
182 
183  _VALID_UNITS = (
184  "days",
185  "hours",
186  "minutes",
187  "seconds",
188  "milliseconds",
189  "microseconds",
190  "nanoseconds",
191  )
192  _MILLISECONDS_PER_SECOND = 1_000
193  _MICROSECONDS_PER_SECOND = 1_000_000
194  _NANOSECONDS_PER_SECOND = 1_000_000_000
195 
196  def __init__(self, since, units):
197  """Initialize a DatetimeConverter
198 
199  Parameters
200  ----------
201  since : datetime
202  Reference datetime. The time stored in the field(s) is considered an
203  offset to this reference. If this has timezone information attached to
204  it, so will the converted datetimes.
205  units : str or tuple of str
206  Units string of tuples of units strings for the offset of each
207  input field. Valid units are "days", "hours", "minutes",
208  "seconds", "milliseconds", "microseconds", and "nanoseconds".
209 
210  Raises
211  ------
212  TypeError
213  One of the input arguments is not of the correct type
214  ValueError
215  One or more of the units are invalid
216  """
217  if not isinstance(since, datetime):
218  raise TypeError("Argument 'since' must be an instance of datetime")
219 
220  if isinstance(units, str):
221  units_tuple = (units,)
222  elif isinstance(units, tuple):
223  units_tuple = units
224  else:
225  raise TypeError("Argument 'units' must be either a string or tuple")
226 
227  if not (set(units_tuple) <= set(self._VALID_UNITS)):
228  raise ValueError("One or more units are invalid")
229 
230  self._since = since
231  self._units = units_tuple
232 
233  def convert(self, *field_arrays):
234  """Apply the datetime conversion.
235 
236  Parameters
237  ----------
238  field_arrays : list of NumPy array
239  list of decoded packet field values, each must have at least one
240  dimension
241 
242  Returns
243  -------
244  converted : NumPy array of object (holding datetimes)
245  converted form of the decoded packet field values
246 
247  Raises
248  ------
249  ValueError
250  Too many or too few units were provided, as compared to the
251  input field arrays sent.
252  """
253  assert len(field_arrays) > 0, "Must have at least one input field"
254 
255  converted = []
256 
257  for field_values in zip(*field_arrays):
258  converted_time = self._since
259 
260  for unit, offset_raw in zip(self._units, field_values):
261  offset_raw = float(offset_raw)
262 
263  if unit == "days":
264  converted_time += timedelta(days=offset_raw)
265  elif unit == "hours":
266  converted_time += timedelta(hours=offset_raw)
267  elif unit == "minutes":
268  converted_time += timedelta(minutes=offset_raw)
269  elif unit == "seconds":
270  converted_time += timedelta(seconds=offset_raw)
271  elif unit == "milliseconds":
272  converted_time += timedelta(seconds=offset_raw / self._MILLISECONDS_PER_SECOND)
273  elif unit == "microseconds":
274  converted_time += timedelta(seconds=offset_raw / self._MICROSECONDS_PER_SECOND)
275  elif unit == "nanoseconds":
276  converted_time += timedelta(seconds=offset_raw / self._NANOSECONDS_PER_SECOND)
277 
278  converted.append(converted_time)
279 
280  converted = np.array(converted, dtype=object)
281 
282  return converted
283 
284 
286  """Post-processing conversion which converts byte arrays or multi-byte
287  numbers to strings in numeric representations such as binary, hexadecimal,
288  or octal.
289 
290  To convert individual bytes, the input field should be defined as a
291  `~ccsdspy.PacketArray` constructed with `data_type="uint"` and
292  `bit_length=8`. Otherwise, each element is converted as a single entity.
293 
294  If the field is an array, the shape of the array is retained. The strings
295  generated are not padded to a fixed length.
296 
297  The converted strings contain prefixes such as `0b` (binary), `0x` (hex),
298  or `0o` (octal). If the number is signed and negative, the prefixes change
299  to `-0b` (binary), `-0x` (hex), or `-0o` (octal).
300  """
301 
302  def __init__(self, format="hex"):
303  """Instantiate a StringifyBytesConverter object
304 
305  Parameters
306  ----------
307  format : {"bin", "hex", "oct"}
308  Format used to encode the bytes in a string.
309  """
310  if format not in ("bin", "hex", "oct"):
311  raise ValueError(
312  "The format= keyword passed to StringifyBytesConverter "
313  f"must be either 'bin', 'hex', or 'oct'. Got {repr(format)}"
314  )
315 
316  self._format = format
317 
318  def _stringify_number(self, num, nbytes):
319  """Internal helper method to convert a number to a string.
320 
321  Parameters
322  ----------
323  number : int
324  A single number to convert to string
325 
326  Returns
327  --------
328  as_string : the byte converted to a string using the format
329  specified when this object was created.
330  """
331  if self._format == "bin":
332  return bin(num)
333  elif self._format == "hex":
334  return hex(num)
335  else:
336  return oct(num)
337 
338  def convert(self, field_array):
339  """Apply the conversion.
340 
341  Parameters
342  ----------
343  field_array : NumPy array
344  decoded packet field values, must have at least two dimensions
345 
346  Returns
347  -------
348  converted : NumPy array
349  converted form of the converted packet field values
350  """
351  # field_arrays may either be a 1-D array, or an N-D array where N>1
352  # (this includes jagged arrays where the outer array is of
353  # dtype=object). These are implemented separately.
354  ndims = len(field_array.shape)
355 
356  if ndims == 1 and field_array.dtype != object:
357  converted = []
358 
359  for num in field_array:
360  as_string = self._stringify_number(num, field_array.itemsize)
361  converted.append(as_string)
362  else:
363  converted = []
364 
365  for i in range(field_array.shape[0]):
366  cur_array_flat = field_array[i].flatten()
367  n_items = cur_array_flat.shape[0]
368  cur_shape = field_array[i].shape
369 
370  # Loop over elements, converting individually
371  curr_array_strings = []
372 
373  for element in cur_array_flat:
374  as_string = self._stringify_number(element, cur_array_flat.itemsize)
375  curr_array_strings.append(as_string)
376 
377  # Put back into original array shape
378  curr_array_strings = np.array(curr_array_strings, dtype=object).reshape(cur_shape)
379  converted.append(curr_array_strings)
380 
381  converted = np.array(converted, dtype=object)
382 
383  return converted
def convert(self, field_array)
Definition: converters.py:41
def __init__(self, replace_dict)
Definition: converters.py:113
def _stringify_number(self, num, nbytes)
Definition: converters.py:318
def __init__(self, since, units)
Definition: converters.py:196
def convert(self, field_array)
Definition: converters.py:72
def __init__(self, coeffs)
Definition: converters.py:62
def convert(self, field_array)
Definition: converters.py:141
def __init__(self, slope, intercept)
Definition: converters.py:98
def convert(self, *field_arrays)
Definition: converters.py:233