Due to the lapse in federal government funding, NASA is not updating this website. We sincerely regret this inconvenience.
NASA Logo
Ocean Color Science Software

ocssw V2022
PacketUtils.py
Go to the documentation of this file.
1 import numpy as np
2 from tlm.timestamp import *
3 
def getDict(structured_array):
    '''
    flatten the first record of a numpy structured array into a plain dict

    Each named field of the array's dtype becomes a key; its value is that
    field's entry for record 0 (a scalar, or an array for sub-array fields).
    Fields whose names start with PADDING or SPARE (case-insensitive) are
    omitted.
    '''
    filler_prefixes = ('PADDING', 'SPARE')  # meaningless fields to skip
    return {
        name: structured_array[name][0]
        for name in structured_array.dtype.names
        if not name.upper().startswith(filler_prefixes)
    }
17 
18 
def getbits(var,i=0):
    '''
    return the bits of element i of numpy array var as an array of 0/1 values

    The element is reinterpreted as raw bytes in its stored byte order and
    each byte is expanded by np.unpackbits.
    '''
    element = var[i, np.newaxis]           # keep one axis so the element can be re-viewed
    raw_bytes = element.view(dtype='|u1')  # same memory, seen as unsigned bytes
    return np.unpackbits(raw_bytes)
21 
def as_int(bitlist):
    '''
    convert an arbitrary-length sequence of bits to a Python integer

    bitlist is iterated from most significant bit to least significant bit;
    each item may be a Python int or a numpy integer scalar (e.g. the uint8
    values produced by np.unpackbits).  An empty sequence yields 0.
    '''
    result = 0
    for bit in bitlist:
        # int() keeps the accumulator a Python int.  OR-ing in a numpy
        # scalar would turn it into a fixed-width numpy integer that
        # silently wraps once the value no longer fits that width.
        result = (result << 1) | int(bit)
    return result
28 
def pad_packet(data, length):
    '''
    return a new bytearray of exactly length bytes holding data

    Data longer than length is truncated; shorter data is followed by
    zero bytes.
    '''
    padded = bytearray(length)           # starts out all zeros
    keep = min(len(data), length)        # number of bytes that fit
    padded[0:keep] = data[0:keep]
    return padded
37 
38 #-------------------------------------------------------------------------------
39 
# Record layout of the cFE file header: 64 bytes, integers big-endian.
cfeHeaderFields = np.dtype([
    ('filetype',            'S4'),   # type of file = "cFE1"
    ('subtype',             '>u4'),  # 101d = downloaded PACE DS files
    ('length',              '>u4'),  # length of cFE file header = 64 bytes
    ('SCID',                'S4'),   # spacecraft ID
    ('APID',                '>u4'),  # application ID
    ('processorid',         '>u4'),  # 1 = Spacecraft, 2 = OCI (30 in ETE files)
    ('fileopen_seconds',    '>u4'),  # time when file was created
    ('fileopen_subseconds', '>u4'),  # time when file was created (fractional seconds)
    ('description',         'S32'),  # = "DS data storage file"
])
51 
# Record layout of the CFE/S Data Storage header: 76 bytes, integers big-endian.
dsfHeaderFields = np.dtype([
    ('fileclose_seconds',    '>u4'),  # time when file was closed
    ('fileclose_subseconds', '>u4'),  # time when file was closed (fractional seconds)
    ('filetable_index',      '>u2'),  # 0 = events, 1 = housekeeping
    ('filename_type',        '>u2'),  # 1 = count, 2 = time (PACE uses count)
    ('filename',             'S64'),  # fully qualified file path
])
59 
def readFileHeader(filehandle):
    '''
    read the cFE file header, and the DS header when present, starting at
    the current position of filehandle

    Returns None, with the file position restored, when the next four bytes
    are not b'cFE1'.  Otherwise returns a dict of the cFE header fields plus
    'fileopen_ts' and 'fileopen'.  When the description field starts with
    b'DS', the DS header that follows is read too and its fields,
    'fileclose_ts' and 'fileclose' are added to the dict.
    '''
    # peek at the file type tag without consuming it
    start = filehandle.tell()
    magic = filehandle.read(4)
    filehandle.seek(start)
    if magic != b'cFE1':
        return None

    # cFE header
    raw = filehandle.read(cfeHeaderFields.itemsize)
    header = getDict(np.frombuffer(raw, dtype=cfeHeaderFields, count=1))
    header['fileopen_ts'] = decode_timestamp(
        header['fileopen_seconds'], header['fileopen_subseconds'])
    header['fileopen'] = datetime_repr(tai58_as_datetime(header['fileopen_ts']))

    # nothing more to read unless a DS header follows
    if not header['description'].startswith(b'DS'):
        return header

    # DS header
    raw = filehandle.read(dsfHeaderFields.itemsize)
    header.update(getDict(np.frombuffer(raw, dtype=dsfHeaderFields, count=1)))
    header['fileclose_ts'] = decode_timestamp(
        header['fileclose_seconds'], header['fileclose_subseconds'])
    header['fileclose'] = datetime_repr(tai58_as_datetime(header['fileclose_ts']))

    return header
85 
86 #-------------------------------------------------------------------------------
87 
primaryHeaderFields = np.dtype([
    ('words', '>u2', 3),    # three big-endian 16-bit words; bit fields are split out in primaryHeader
])                          # total length = 6 bytes

def primaryHeader(data):
    '''
    decode the 6-byte packet primary header at the start of data

    data is any bytes-like buffer of at least 6 bytes; np.frombuffer raises
    ValueError when it is shorter.  Returns a dict of Python ints:

      version    Packet Version Number (3 bits)
      ptype      Packet Type (0=tlm; 1=cmd)
      secondary  Secondary Header Flag
      APID       Application Process Identifier (11 bits)
      grouping   Sequence flags (00=cont., 01=first, 10=last, 11=only)
      sequence   Packet Sequence Count (14 bits)
      length     Packet Data Length field; readPacket reads length + 1
                 bytes of packet data after the primary header
    '''
    record = np.frombuffer(data, dtype=primaryHeaderFields, count=1)

    # Convert to Python ints up front so that callers doing arithmetic on
    # the fields (e.g. length + 1) cannot overflow a fixed-width numpy type.
    word0, word1, word2 = (int(word) for word in record['words'][0])

    myDict = {}

    # WORD_00: version(3) | type(1) | secondary header flag(1) | APID(11)
    myDict['version'] = (word0 >> 13) & 0x7
    myDict['ptype'] = (word0 >> 12) & 0x1
    myDict['secondary'] = (word0 >> 11) & 0x1
    myDict['APID'] = word0 & 0x7FF

    # WORD_01: sequence flags(2) | sequence count(14)
    # The count fills all 14 remaining bits of the word, not 11.
    myDict['grouping'] = (word1 >> 14) & 0x3
    myDict['sequence'] = word1 & 0x3FFF

    # WORD_02: the whole word is the length field
    myDict['length'] = word2

    return myDict
116 
117 #-------------------------------------------------------------------------------
118 
def readPacket(filehandle):
    '''
    read one packet (primary header + data) from filehandle

    Returns (header, data, rawhdr): the decoded primary header dict, the
    packet data bytes that follow it, and the raw primary header bytes.
    Returns (None, None, None) when the header cannot be read or decoded
    (e.g. at end of file) or when its length field is zero.
    '''
    # Read packetField 1: packet primary header
    try:
        rawhdr = filehandle.read(primaryHeaderFields.itemsize)
        header = primaryHeader(rawhdr)
        # Python int, so datalen + 1 below cannot overflow a 16-bit numpy scalar
        datalen = int(header['length'])
    except Exception:
        # Best effort: any read/decode failure means "no packet".  Unlike a
        # bare except, this lets KeyboardInterrupt and SystemExit propagate.
        return None, None, None

    # Explicit check rather than assert, which is stripped under python -O
    if datalen <= 0:
        return None, None, None

    # Read packetField 2: data
    data = filehandle.read(datalen + 1)

    return header, data, rawhdr
def getbits(var, i=0)
Definition: PacketUtils.py:19
def as_int(bitlist)
Definition: PacketUtils.py:22
def decode_timestamp(seconds, subseconds)
Definition: PacketUtils.py:8
def pad_packet(data, length)
Definition: PacketUtils.py:55
def tai58_as_datetime(tai58)
Definition: PacketUtils.py:24
def datetime_repr(dt)
Definition: PacketUtils.py:36
def getDict(structured_array)
Definition: PacketUtils.py:41