OB.DAAC Logo
NASA Logo
Ocean Color Science Software

ocssw V2022
PacketUtils.py
Go to the documentation of this file.
1 import numpy as np
2 from tlm.timestamp import *
3 
4 def getDict(structured_array):
5  '''
6  convert numpy structured array to dict of scalars and numpy arrays
7  '''
8  myDict = {}
9 
10  for key in structured_array.dtype.names:
11  if ( key.upper().startswith('PADDING') or
12  key.upper().startswith('SPARE') ):
13  continue # skip meaningless fields
14  myDict[key] = structured_array[key][0]
15 
16  return myDict
17 
18 
19 def getbits(var,i=0):
20  return np.unpackbits(var[i,np.newaxis].view(dtype='|u1'))
21 
22 def as_int(bitlist):
23  # convert an arbitrary-length list of bits to integer
24  result = 0
25  for bit in bitlist:
26  result = (result << 1) | bit
27  return result
28 
29 def pad_packet(data, length):
30  # pad or truncate bytearray to expected length
31  packet = bytearray(length)
32  if len(data) > length:
33  packet[:] = data[0:length]
34  else:
35  packet[0:len(data)] = data
36  return packet
37 
38 #-------------------------------------------------------------------------------
39 
40 cfeHeaderFields = np.dtype([
41  ('filetype', 'S4'), # "type of file = "cFE1"
42  ('subtype', '>u4'), # 101d = downloaded PACE DS files
43  ('length', '>u4'), # length of CFE file header = 64 bytes
44  ('SCID', 'S4'), # Spacecraft ID
45  ('APID', '>u4'), # Application ID
46  ('processorid', '>u4'), # 1=Spacecraft, 2=OCI (30 in ETE files)
47  ('fileopen_seconds', '>u4'), # Time when file created
48  ('fileopen_subseconds', '>u4'), # Time when file created (fractional seconds)
49  ('description', 'S32'), # = "DS data storage file"
50 ]) # total length = 64 bytes
51 
52 dsfHeaderFields = np.dtype([ # CFE/S Data Storage header
53  ('fileclose_seconds', '>u4'), # Time when file closed
54  ('fileclose_subseconds', '>u4'), # Time when file closed (fractional seconds)
55  ('filetable_index', '>u2'), # 0=events, 1=housekeeping
56  ('filename_type', '>u2'), # 1=count, 2=time (PACE uses count)
57  ('filename', 'S64'), # fully qualified file path
58 ]) # total length = 76 bytes
59 
60 def readFileHeader(filehandle):
61 
62  # is a CFE header present?
63  pos = filehandle.tell()
64  fourchars = filehandle.read(4)
65  filehandle.seek(pos) # rewind to original position
66  if fourchars != b'cFE1':
67  return None
68 
69  # read CFE header
70  data = filehandle.read(cfeHeaderFields.itemsize)
71  tmp = np.frombuffer(data, dtype=cfeHeaderFields, count=1)
72  myDict = getDict(tmp)
73  myDict['fileopen_ts'] = decode_timestamp(myDict['fileopen_seconds'],myDict['fileopen_subseconds'])
74  myDict['fileopen'] = datetime_repr(tai58_as_datetime(myDict['fileopen_ts']))
75 
76  # append DS header, if it's there.
77  if myDict['description'].startswith(b'DS'):
78  data = filehandle.read(dsfHeaderFields.itemsize)
79  tmp = np.frombuffer(data, dtype=dsfHeaderFields, count=1)
80  myDict.update(getDict(tmp))
81  myDict['fileclose_ts'] = decode_timestamp(myDict['fileclose_seconds'],myDict['fileclose_subseconds'])
82  myDict['fileclose'] = datetime_repr(tai58_as_datetime(myDict['fileclose_ts']))
83 
84  return myDict
85 
86 #-------------------------------------------------------------------------------
87 
88 primaryHeaderFields = np.dtype([
89  ('words', '>u2', 3), # Unpack bits later
90 ]) # total length = 6 bytes
91 
92 def primaryHeader(data):
93 
94  # unpack each 16-bit word
95  tmp = np.frombuffer(data, dtype=primaryHeaderFields, count=1)
96  words = tmp['words'][0]
97  myDict = {}
98 
99  # WORD_00
100  bits = getbits(words, 0)
101  myDict['version'] = as_int(bits[0:0+3]) # Packet Version Number
102  myDict['ptype'] = bits[3] # Packet Type (0=tlm;1=cmd)
103  myDict['secondary'] = bits[4] # Secondary Header Flag
104  myDict['APID'] = as_int(bits[5:5+11]) # Application Process Identifier
105 
106  # WORD_01
107  bits = getbits(words, 1)
108  myDict['grouping'] = as_int(bits[0:0+2]) # Sequence (00=cont., 01=first, 10=last, 11=only)
109  myDict['sequence'] = as_int(bits[2:2+11]) # Packet Sequence Count
110 
111  # WORD_02
112  myDict['length'] = words[2] # Packet Data Length (total bytes - 1)
113 
114  # no conversions needed
115  return myDict
116 
117 #-------------------------------------------------------------------------------
118 
119 def readPacket(filehandle):
120 
121  # Read packetField 1: packet primary header
122  try:
123  rawhdr = filehandle.read(primaryHeaderFields.itemsize)
124  header = primaryHeader(rawhdr)
125  datalen = header['length']
126  assert datalen > 0
127  except:
128  return None, None, None
129 
130  # Read packetField 2: data
131  data = filehandle.read(datalen+1)
132 
133  return header, data, rawhdr
def getDict(structured_array)
Definition: PacketUtils.py:4
def decode_timestamp(seconds, subseconds)
Definition: timestamp.py:7
def getbits(var, i=0)
Definition: PacketUtils.py:19
def pad_packet(data, length)
Definition: PacketUtils.py:29
def as_int(bitlist)
Definition: PacketUtils.py:22
def tai58_as_datetime(tai58)
Definition: timestamp.py:23
def datetime_repr(dt)
Definition: timestamp.py:35