1 """This class hold the implementation of the converter system, which applies
2 post-process to decoded packet fields. This post-processing includes applying
3 linear/polynomial calibration curves, dictionary replacement, and time parsing.
6 from datetime
import datetime, timedelta
11 "EnumConverterMissingKey",
17 "StringifyBytesConverter",
22 """During conversion a value was encountered which did not have a
23 corresponding key in the replacement dictionary.
28 """Base class for all converter objects.
30 This class is extended to create converters, and users may extend this
31 class to write their own custom converters.
33 To write a converter, one must create a subclass and override either the method
34 `convert(*field_arrays)`. This method implements the conversion for an entire
35 sequence of decoded packet field values in a single call.
39 raise NotImplementedError(
"This is a base class not meant to be instantiated directly")
42 """Convert a sequence of decoded packet field values.
46 field_array : NumPy array
47 decoded packet field values, must have at least one dimension
51 converted_field_array : NumPy array
52 converted form of the decoded packet field values
54 raise NotImplementedError(
"This method must be overridden by a subclass")
58 """Post-processing conversion which applies calibration using a series
59 of coefficients ordered from highest power to intercept.
63 """Instantiate a PolyConverter object
67 coeffs : list of float
68 Polynomial coefficients ordered from highest power to intercept.
73 """Apply the polynomial conversion.
77 field_array : NumPy array
78 decoded packet field values, must have at least one dimension
82 converted : NumPy array
83 converted form of the decoded packet field values
85 converted = np.zeros(field_array.shape, dtype=np.float64)
87 for power, coeff
in enumerate(reversed(self.
_coeffs)):
88 converted += coeff * field_array**power
94 """Post-processing conversion which applies a linear (y=mx+b)
99 """Instantiate a LinearConverter"""
100 super().
__init__([slope, intercept])
104 """Post-processing conversion for applying dictionary replacement of
107 If during conversion a value is encountered which does not have a
108 corresponding key in the replacement dictionary, then a
109 `:py:class:`~ccsdspy.converters.EnumConverterMissingKey` exception
114 """Initialize a EnumConverter.
118 replace_dict : dict of int to string
119 Replacement dictionary mapping integer values to string values
124 Either one of the keys of the replacement dictionary is not an
125 integer, or one of the values is not a string.
129 for key, value
in replace_dict.items():
130 if not isinstance(key, int):
132 f
"Found key in EnumConverter replacement dictionary that is "
133 f
"not an integer: {repr(key)}"
135 if not isinstance(value, str):
137 f
"Found value in EnumConverter replacement dictionary that is "
138 f
"not a string: {repr(value)}"
142 """Apply the enum replacement conversion.
146 field_array : NumPy array
147 decoded packet field values, must have at least one dimension
151 converted : NumPy array
152 converted form of the decoded packet field values
154 converted = np.zeros(field_array.shape, dtype=object)
155 converted_mask = np.zeros(field_array.shape, dtype=bool)
158 converted[field_array == key] = value
159 converted_mask[field_array == key] =
True
161 if not converted_mask.all():
162 missing_keys = field_array[~converted_mask].tolist()
165 f
"The following were encountered which did not have "
166 f
"corresponding keys in the replacement dictionary: "
167 f
"{repr(missing_keys)}"
174 """Post-processing conversion for converting timestamp fields to datetime
175 instances, computed using offset(s) from a reference time.
177 This class supports the offsets stored in multiple input fields, for example
178 where one field is a coarse time (e.g. seconds) and a second field is a fine
179 time (e.g. nanoseconds). To use multiple input fields, pass a tuple of input
180 field names when this converter is added to the packet.
192 _MILLISECONDS_PER_SECOND = 1_000
193 _MICROSECONDS_PER_SECOND = 1_000_000
194 _NANOSECONDS_PER_SECOND = 1_000_000_000
197 """Initialize a DatetimeConverter
202 Reference datetime. The time stored in the field(s) is considered an
203 offset to this reference. If this has timezone information attached to
204 it, so will the converted datetimes.
205 units : str or tuple of str
206 Units string of tuples of units strings for the offset of each
207 input field. Valid units are "days", "hours", "minutes",
208 "seconds", "milliseconds", "microseconds", and "nanoseconds".
213 One of the input arguments is not of the correct type
215 One or more of the units are invalid
217 if not isinstance(since, datetime):
218 raise TypeError(
"Argument 'since' must be an instance of datetime")
220 if isinstance(units, str):
221 units_tuple = (units,)
222 elif isinstance(units, tuple):
225 raise TypeError(
"Argument 'units' must be either a string or tuple")
228 raise ValueError(
"One or more units are invalid")
234 """Apply the datetime conversion.
238 field_arrays : list of NumPy array
239 list of decoded packet field values, each must have at least one
244 converted : NumPy array of object (holding datetimes)
245 converted form of the decoded packet field values
250 Too many or too few units were provided, as compared to the
251 input field arrays sent.
253 assert len(field_arrays) > 0,
"Must have at least one input field"
257 for field_values
in zip(*field_arrays):
258 converted_time = self.
_since
260 for unit, offset_raw
in zip(self.
_units, field_values):
261 offset_raw =
float(offset_raw)
264 converted_time += timedelta(days=offset_raw)
265 elif unit ==
"hours":
266 converted_time += timedelta(hours=offset_raw)
267 elif unit ==
"minutes":
268 converted_time += timedelta(minutes=offset_raw)
269 elif unit ==
"seconds":
270 converted_time += timedelta(seconds=offset_raw)
271 elif unit ==
"milliseconds":
273 elif unit ==
"microseconds":
275 elif unit ==
"nanoseconds":
278 converted.append(converted_time)
280 converted = np.array(converted, dtype=object)
286 """Post-processing conversion which converts byte arrays or multi-byte
287 numbers to strings in numeric representations such as binary, hexadecimal,
290 To convert individual bytes, the input field should be defined as a
291 `~ccsdspy.PacketArray` constructed with `data_type="uint"` and
292 `bit_length=8`. Otherwise, each element is converted as a single entity.
294 If the field is an array, the shape of the array is retained. The strings
295 generated are not padded to a fixed length.
297 The converted strings contain prefixes such as `0b` (binary), `0x` (hex),
298 or `0o` (octal). If the number is signed and negative, the prefixes change
299 to `-0b` (binary), `-0x` (hex), or `-0o` (octal).
303 """Instantiate a StringifyBytesConverter object
307 format : {"bin", "hex", "oct"}
308 Format used to encode the bytes in a string.
310 if format
not in (
"bin",
"hex",
"oct"):
312 "The format= keyword passed to StringifyBytesConverter "
313 f
"must be either 'bin', 'hex', or 'oct'. Got {repr(format)}"
318 def _stringify_number(self, num, nbytes):
319 """Internal helper method to convert a number to a string.
324 A single number to convert to string
328 as_string : the byte converted to a string using the format
329 specified when this object was created.
339 """Apply the conversion.
343 field_array : NumPy array
344 decoded packet field values, must have at least two dimensions
348 converted : NumPy array
349 converted form of the converted packet field values
354 ndims = len(field_array.shape)
356 if ndims == 1
and field_array.dtype != object:
359 for num
in field_array:
361 converted.append(as_string)
365 for i
in range(field_array.shape[0]):
366 cur_array_flat = field_array[i].flatten()
367 n_items = cur_array_flat.shape[0]
368 cur_shape = field_array[i].shape
371 curr_array_strings = []
373 for element
in cur_array_flat:
375 curr_array_strings.append(as_string)
378 curr_array_strings = np.array(curr_array_strings, dtype=object).reshape(cur_shape)
379 converted.append(curr_array_strings)
381 converted = np.array(converted, dtype=object)