NASA Logo
Ocean Color Science Software

ocssw V2022
utils.py
Go to the documentation of this file.
1 """Utils for the CCSDSPy package."""
2 
3 __author__ = "Daniel da Silva <mail@danieldasilva.org>"
4 
5 from io import BytesIO
6 import warnings
7 
8 import numpy as np
9 
10 from . import VariableLength, PacketArray
11 from .constants import BITS_PER_BYTE, PRIMARY_HEADER_NUM_BYTES
12 from . import decode
13 
14 
def get_packet_total_bytes(primary_header_bytes):
    """Placeholder docstring; replaced at import time.

    The module-level assignment that follows the wrapper definitions copies
    the docstring of ``decode._get_packet_total_bytes`` onto this function.
    """
    # Public alias for the private implementation in the decode module.
    total_bytes = decode._get_packet_total_bytes(primary_header_bytes)
    return total_bytes
19 
20 
def get_packet_apid(primary_header_bytes):
    """Placeholder docstring; replaced at import time.

    The module-level assignment that follows the wrapper definitions copies
    the docstring of ``decode._get_packet_apid`` onto this function.
    """
    # Public alias for the private implementation in the decode module.
    apid = decode._get_packet_apid(primary_header_bytes)
    return apid
25 
26 
# The public wrappers defined above delegate to private functions in the decode
# module. Reuse those functions' docstrings here instead of maintaining a
# second copy of the same text.
get_packet_apid.__doc__ = decode._get_packet_apid.__doc__
get_packet_total_bytes.__doc__ = decode._get_packet_total_bytes.__doc__
31 
32 
def iter_packet_bytes(file, include_primary_header=True):
    """Yield each packet in a file as a raw bytes object, in file order.

    Packets are located by walking the file header-to-header: the primary
    header at the current position gives the total packet size, which is the
    distance to the next header. Because only the primary header is consulted,
    this works on mixed files containing several APIDs and both fixed length
    and variable length packets.

    The whole file is read into memory before the first packet is yielded.

    If the end of the last packet does not coincide with the end of the file,
    a warning is issued once iteration is exhausted. In that case the final
    yielded item holds only the bytes that are actually present in the file.

    Parameters
    ----------
    file : str, file-like
        Path to file on the local file system, or file-like object
    include_primary_header : bool
        If set to False, excludes the primary header bytes (the first six)

    Yields
    ------
    packet_bytes : bytes
        Bytes associated with each packet as it appears in the file. When
        include_primary_header=False, the primary header bytes are excluded.
    """
    # Anything exposing ``read`` is treated as an open file; otherwise the
    # argument is handed to NumPy as a path.
    if hasattr(file, "read"):
        raw = np.frombuffer(file.read(), "u1")
    else:
        raw = np.fromfile(file, "u1")

    total_nbytes = len(raw)

    # Number of leading bytes to drop from every yielded packet.
    skip = 0 if include_primary_header else PRIMARY_HEADER_NUM_BYTES

    cursor = 0

    while cursor < total_nbytes:
        header = raw[cursor : cursor + PRIMARY_HEADER_NUM_BYTES].tobytes()
        packet_end = cursor + get_packet_total_bytes(header)

        yield raw[cursor + skip : packet_end].tobytes()

        cursor = packet_end

    # Landing exactly on the end of the file means every packet was complete.
    if cursor == total_nbytes:
        return

    missing_bytes = cursor - total_nbytes
    warnings.warn(
        f"File appears truncated-- missing {missing_bytes} byte (or maybe garbage at end)"
    )
82 
83 
def split_packet_bytes(file, include_primary_header=True):
    """Return every packet in a file as a list of bytes objects.

    This is the eager counterpart of `iter_packet_bytes`: it runs that
    generator to completion and collects what it yields. It therefore works
    with mixed files containing multiple APIDs, which may include both fixed
    length and variable length packets.

    If end of last packet doesn't align with end of file, a warning is issued.

    Parameters
    ----------
    file : str, file-like
        Path to file on the local file system, or file-like object
    include_primary_header : bool
        If set to False, excludes the primary header bytes (the first six)

    Returns
    -------
    packet_bytes : list of bytes
        List of bytes objects associated with each packet as it appears in the
        file. When include_primary_header=False, each byte object will have its
        primary header bytes excluded.
    """
    packets = iter_packet_bytes(file, include_primary_header=include_primary_header)
    return [*packets]
107 
108 
def read_primary_headers(file):
    """Read primary header fields and return contents as a dictionary
    of arrays.

    This function works with mixed files containing multiple APIDs, which may
    include both fixed length and variable length packets.

    Parameters
    ----------
    file : str, file-like
        Path to file on the local file system, or file-like object

    Returns
    -------
    header_arrays : dict, string to NumPy array
        Dictionary mapping header names to NumPy arrays. The header names are:
        `CCSDS_VERSION_NUMBER`, `CCSDS_PACKET_TYPE`, `CCSDS_SECONDARY_FLAG`,
        `CCSDS_SEQUENCE_FLAG`, `CCSDS_APID`, `CCSDS_SEQUENCE_COUNT`,
        `CCSDS_PACKET_LENGTH`
    """
    # Describe every packet body as a single byte array declared with
    # array_shape="expand", so the same definition is used regardless of the
    # packet's APID or length. Only the primary header fields are wanted.
    # NOTE(review): presumably "expand" makes the array absorb the rest of the
    # packet -- confirm against the PacketArray documentation.
    pkt = VariableLength(
        [
            PacketArray(
                name="unused", data_type="uint", bit_length=BITS_PER_BYTE, array_shape="expand"
            )
        ]
    )

    header_arrays = pkt.load(file, include_primary_header=True)

    # Drop the placeholder body field so only primary header fields remain.
    del header_arrays["unused"]

    return header_arrays
141 
142 
def split_by_apid(mixed_file, valid_apids=None):
    """Separate a stream of mixed APIDs into one in-memory stream per APID.

    Each packet is appended, primary header included, to the stream for the
    APID read from its primary header. Packets keep their relative order
    within each stream. This works with a mix of both fixed length and
    variable length packets.

    Parameters
    ----------
    mixed_file: str, file-like
        Path to file on the local file system, or file-like object
    valid_apids: list of int, None
        Optional list of valid APIDs. If specified, warning will be issued when
        an APID is encountered outside this list. Packets with such an APID are
        still written to a stream of their own.

    Returns
    -------
    stream_by_apid : dict, int to :py:class:`~io.BytesIO`
        Dictionary mapping integer apid number to BytesIO instance with the file
        pointer at the beginning of the stream.
    """
    # A set gives constant-time membership tests; None disables the check.
    known_apids = None if valid_apids is None else set(valid_apids)

    stream_by_apid = {}

    for packet_bytes in iter_packet_bytes(mixed_file):
        header = packet_bytes[:PRIMARY_HEADER_NUM_BYTES]
        apid = get_packet_apid(header)

        if known_apids is not None and apid not in known_apids:
            warnings.warn(f"Found unknown APID {apid}")

        # Create the output stream the first time an APID is seen.
        stream = stream_by_apid.get(apid)
        if stream is None:
            stream = stream_by_apid[apid] = BytesIO()

        stream.write(packet_bytes)

    # Rewind every stream so callers can read from the start.
    for stream in stream_by_apid.values():
        stream.seek(0)

    return stream_by_apid
183 
184 
def count_packets(file, return_missing_bytes=False):
    """Count the number of complete packets in a file and check if there are
    any missing bytes in the last packet.

    This function works with mixed files containing multiple APIDs, which may
    include both fixed length and variable length packets. When used with
    multiple APIDs, it simply returns the total number of packets of any APID.

    Packets are located by reading the total size from each primary header and
    jumping to the next header; packet bodies are never decoded. The whole
    file is read into memory first.

    If end of last packet doesn't align with end of file, a warning is issued.

    Parameters
    ----------
    file : str, file-like
        Path to file on the local file system, or file-like object
    return_missing_bytes : bool, optional
        Also return the number of missing bytes at the end of the file. This
        is the number of bytes which would need to be added to the file to
        complete the last packet expected (as set by the packet length in
        the last packet's primary header).

    Returns
    -------
    num_packets : int
        Number of complete packets in the file. A final packet whose header
        announces more bytes than the file contains is not counted.
    missing_bytes : int, optional
        The number of bytes which would need to be added to the file to
        complete the last packet expected (as set by the packet length in
        the last packet's primary header). Zero when the file ends exactly on
        a packet boundary. Only returned when return_missing_bytes is True.
    """
    if hasattr(file, "read"):
        file_bytes = np.frombuffer(file.read(), "u1")
    else:
        file_bytes = np.fromfile(file, "u1")

    file_nbytes = len(file_bytes)
    offset = 0
    num_packets = 0

    while offset < file_nbytes:
        packet_nbytes = get_packet_total_bytes(
            file_bytes[offset : offset + PRIMARY_HEADER_NUM_BYTES].tobytes()
        )
        offset += packet_nbytes

        # A packet is complete only if it ends at or before the end of the
        # file. A truncated final packet overshoots and is left out, which
        # keeps the count in line with the documented return value.
        if offset <= file_nbytes:
            num_packets += 1

    # The loop exits with offset >= file_nbytes, so this is never negative.
    missing_bytes = offset - file_nbytes

    if missing_bytes != 0:
        message = (
            f"File appears truncated-- missing {missing_bytes} byte (or " "maybe garbage at end)"
        )
        warnings.warn(message)

    if return_missing_bytes:
        return num_packets, missing_bytes
    else:
        return num_packets
def split_by_apid(mixed_file, valid_apids=None)
Definition: utils.py:143
def read_primary_headers(file)
Definition: utils.py:109
def split_packet_bytes(file, include_primary_header=True)
Definition: utils.py:84
list(APPEND LIBS ${NETCDF_LIBRARIES}) find_package(GSL REQUIRED) include_directories($
Definition: CMakeLists.txt:8
def iter_packet_bytes(file, include_primary_header=True)
Definition: utils.py:33
def get_packet_total_bytes(primary_header_bytes)
Definition: utils.py:15
def get_packet_apid(primary_header_bytes)
Definition: utils.py:21
def count_packets(file, return_missing_bytes=False)
Definition: utils.py:185