1 """Utils for the CCSDSPy package."""
3 __author__ =
"Daniel da Silva <mail@danieldasilva.org>"
10 from .
import VariableLength, PacketArray
11 from .constants
import BITS_PER_BYTE, PRIMARY_HEADER_NUM_BYTES
18 return decode._get_packet_total_bytes(primary_header_bytes)
24 return decode._get_packet_apid(primary_header_bytes)
29 get_packet_total_bytes.__doc__ = decode._get_packet_total_bytes.__doc__
30 get_packet_apid.__doc__ = decode._get_packet_apid.__doc__
34 """Iterate through packets as raw bytes objects, in the order they appear in a file.
36 This function works with mixed files containing multiple APIDs, which may
37 include both fixed length and variable length packets.
39 If the end of the last packet doesn't align with the end of the file, a warning is issued.
44 Path to file on the local file system, or file-like object
45 include_primary_header : bool
46 If set to False, excludes the primary header bytes (the first six)
51 Bytes associated with each packet as it appears in the file. When
52 include_primary_header=False, the primary header bytes are excluded.
54 if hasattr(file,
"read"):
55 file_bytes = np.frombuffer(file.read(),
"u1")
57 file_bytes = np.fromfile(file,
"u1")
61 if include_primary_header:
64 delta_idx = PRIMARY_HEADER_NUM_BYTES
66 while offset < len(file_bytes):
68 file_bytes[offset : offset + PRIMARY_HEADER_NUM_BYTES].tobytes()
70 packet_bytes = file_bytes[offset + delta_idx : offset + packet_nbytes].tobytes()
74 offset += packet_nbytes
76 if offset != len(file_bytes):
77 missing_bytes = offset - len(file_bytes)
79 f
"File appears truncated-- missing {missing_bytes} byte (or " "maybe garbage at end)"
81 warnings.warn(message)
85 """Retrieve a list of bytes objects corresponding to each packet in a file.
87 This function works with mixed files containing multiple APIDs, which may
88 include both fixed length and variable length packets.
90 If the end of the last packet doesn't align with the end of the file, a warning is issued.
95 Path to file on the local file system, or file-like object
96 include_primary_header : bool
97 If set to False, excludes the primary header bytes (the first six)
101 packet_bytes : list of bytes
102 List of bytes objects associated with each packet as it appears in the
103 file. When include_primary_header=False, each byte object will have its
104 primary header bytes excluded.
110 """Read primary header fields and return contents as a dictionary.
113 This function works with mixed files containing multiple APIDs, which may
114 include both fixed length and variable length packets.
118 file : str, file-like
119 Path to file on the local file system, or file-like object
123 header_arrays : dict, string to NumPy array
124 Dictionary mapping header names to NumPy arrays. The header names are:
125 `CCSDS_VERSION_NUMBER`, `CCSDS_PACKET_TYPE`, `CCSDS_SECONDARY_FLAG`,
126 `CCSDS_SEQUENCE_FLAG`, `CCSDS_APID`, `CCSDS_SEQUENCE_COUNT`,
127 `CCSDS_PACKET_LENGTH`
129 pkt = VariableLength(
132 name=
"unused", data_type=
"uint", bit_length=BITS_PER_BYTE, array_shape=
"expand"
137 header_arrays = pkt.load(file, include_primary_header=
True)
138 del header_arrays[
"unused"]
144 """Split a stream of mixed APIDs into separate streams by APID.
146 This works with a mix of both fixed length and variable length packets.
150 mixed_file: str, file-like
151 Path to file on the local file system, or file-like object
152 valid_apids: list of int, None
153 Optional list of valid APIDs. If specified, a warning will be issued when
154 an APID is encountered outside this list.
158 stream_by_apid : dict, int to :py:class:`~io.BytesIO`
159 Dictionary mapping integer apid number to BytesIO instance with the file
160 pointer at the beginning of the stream.
163 if valid_apids
is not None:
164 valid_apids =
set(valid_apids)
171 if valid_apids
is not None and apid
not in valid_apids:
172 warnings.warn(f
"Found unknown APID {apid}")
174 if apid
not in stream_by_apid:
175 stream_by_apid[apid] = BytesIO()
177 stream_by_apid[apid].write(packet_bytes)
179 for stream
in stream_by_apid.values():
182 return stream_by_apid
186 """Count the number of packets in a file and check if there are any
187 missing bytes in the last packet.
189 This function works with mixed files containing multiple APIDs, which may
190 include both fixed length and variable length packets. When used with
191 multiple APIDs, it simply returns the total number of packets of any APID.
193 If the end of the last packet doesn't align with the end of the file, a warning is issued.
197 file : str, file-like
198 Path to file on the local file system, or file-like object
199 return_missing_bytes : bool, optional
200 Also return the number of missing bytes at the end of the file. This
201 is the number of bytes which would need to be added to the file to
202 complete the last packet expected (as set by the packet length in
203 the last packet's primary header).
208 Number of complete packets in the file
209 missing_bytes : int, optional
210 The number of bytes which would need to be added to the file to
211 complete the last packet expected (as set by the packet length in
212 the last packet's primary header).
214 if hasattr(file,
"read"):
215 file_bytes = np.frombuffer(file.read(),
"u1")
217 file_bytes = np.fromfile(file,
"u1")
222 while offset < len(file_bytes):
224 file_bytes[offset : offset + PRIMARY_HEADER_NUM_BYTES].tobytes()
226 offset += packet_nbytes
229 missing_bytes = offset - len(file_bytes)
231 if offset != len(file_bytes):
232 missing_bytes = offset - len(file_bytes)
234 f
"File appears truncated-- missing {missing_bytes} byte (or " "maybe garbage at end)"
236 warnings.warn(message)
238 if return_missing_bytes:
239 return num_packets, missing_bytes