NASA Logo
Ocean Color Science Software

ocssw V2022
PacketUtils.py
Go to the documentation of this file.
1 import sys
2 import numpy as np
3 import datetime
4 
5 TAI58_OFFSET = datetime.datetime(1970, 1, 1) - datetime.datetime(1958, 1, 1)
6 LEAPSEC = 37 # TODO: look up by year
7 
8 def decode_timestamp(seconds,subseconds):
9  return float(seconds) + float(subseconds) / pow(2,8*subseconds.itemsize)
10 
11 CCSDS_timestamp = np.dtype([
12  ('seconds', '>u4'), # Time since start of epoch in seconds (TAI58)
13  ('subsecs', '>u2'), # fractional seconds
14 ]) # total length = 6 bytes
15 
16 def parse_CCSDS_timestamp(timestr):
17  return decode_timestamp(timestr['seconds'], timestr['subsecs'])
18 
19 def readTimestamp(data):
20  tmp = np.frombuffer(data, dtype=CCSDS_timestamp, count=1)
21  ts = parse_CCSDS_timestamp(tmp[0])
22  return ts
23 
24 def tai58_as_datetime(tai58):
25  # convert tai58 (seconds since 1958, including leapsecs) to datetime object
26  dt = datetime.datetime.utcfromtimestamp(tai58 - LEAPSEC) - TAI58_OFFSET
27  return dt # seconds since Jan 1, 1970
28 
29 def seconds_since(tai58, basetime=None):
30  # translate TAI58 to seconds since a baseline time (defaults to start of day)
31  dt = tai58_as_datetime(tai58)
32  if not basetime:
33  basetime = dt.replace(hour=0, minute=0, second=0, microsecond=0)
34  return (dt - basetime).total_seconds()
35 
36 def datetime_repr(dt):
37  return dt.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
38 
39 #-------------------------------------------------------------------------------
40 
41 def getDict(structured_array):
42  '''
43  convert numpy structured array to dict of scalars and numpy arrays
44  '''
45  myDict = {}
46 
47  for key in structured_array.dtype.names:
48  if ( key.upper().startswith('PADDING') or
49  key.upper().startswith('SPARE') ):
50  continue # skip meaningless fields
51  myDict[key] = structured_array[key][0]
52 
53  return myDict
54 
55 def pad_packet(data, length):
56  # pad or truncate bytearray to expected length
57  packet = bytearray(length)
58  if len(data) > length:
59  packet[:] = data[0:length]
60  else:
61  packet[0:len(data)] = data
62  return packet
63 
64 #-------------------------------------------------------------------------------
65 
66 cfeHeaderFields = np.dtype([
67  ('filetype', 'S4'), # "type of file = "cFE1"
68  ('subtype', '>u4'), # 101d = downloaded PACE DS files
69  ('length', '>u4'), # length of CFE file header = 64 bytes
70  ('SCID', 'S4'), # Spacecraft ID
71  ('APID', '>u4'), # Application ID
72  ('processorid', '>u4'), # 1=Spacecraft, 2=OCI (30 in ETE files)
73  ('fileopen_seconds', '>u4'), # Time when file created
74  ('fileopen_subseconds', '>u4'), # Time when file created (fractional seconds)
75  ('description', 'S32'), # = "DS data storage file"
76 ]) # total length = 64 bytes
77 
78 dsfHeaderFields = np.dtype([ # CFE/S Data Storage header
79  ('fileclose_seconds', '>u4'), # Time when file closed
80  ('fileclose_subseconds', '>u4'), # Time when file closed (fractional seconds)
81  ('filetable_index', '>u2'), # 0=events, 1=housekeeping
82  ('filename_type', '>u2'), # 1=count, 2=time (PACE uses count)
83  ('filename', 'S64'), # fully qualified file path
84 ]) # total length = 76 bytes
85 
86 def readFileHeader(filehandle):
87 
88  # is a CFE header present?
89  pos = filehandle.tell()
90  fourchars = filehandle.read(4)
91  filehandle.seek(pos) # rewind to original position
92  if fourchars != b'cFE1':
93  return None
94 
95  # read CFE header
96  data = filehandle.read(cfeHeaderFields.itemsize)
97  tmp = np.frombuffer(data, dtype=cfeHeaderFields, count=1)
98  myDict = getDict(tmp)
99  myDict['fileopen_ts'] = decode_timestamp(myDict['fileopen_seconds'],
100  myDict['fileopen_subseconds'])
101  myDict['fileopen'] = datetime_repr(tai58_as_datetime(myDict['fileopen_ts']))
102 
103  # append DS header, if it's there.
104  if myDict['description'].startswith(b'DS'):
105  data = filehandle.read(dsfHeaderFields.itemsize)
106  tmp = np.frombuffer(data, dtype=dsfHeaderFields, count=1)
107  myDict.update(getDict(tmp))
108  myDict['fileclose_ts'] = decode_timestamp(myDict['fileclose_seconds'],
109  myDict['fileclose_subseconds'])
110  myDict['fileclose'] = datetime_repr(tai58_as_datetime(myDict['fileclose_ts']))
111 
112  return myDict
113 
114 #-------------------------------------------------------------------------------
def seconds_since(tai58, basetime=None)
Definition: PacketUtils.py:29
def readTimestamp(data)
Definition: PacketUtils.py:19
def parse_CCSDS_timestamp(timestr)
Definition: PacketUtils.py:16
def decode_timestamp(seconds, subseconds)
Definition: PacketUtils.py:8
def pad_packet(data, length)
Definition: PacketUtils.py:55
def tai58_as_datetime(tai58)
Definition: PacketUtils.py:24
def datetime_repr(dt)
Definition: PacketUtils.py:36
def getDict(structured_array)
Definition: PacketUtils.py:41