Due to the lapse in federal government funding, NASA is not updating this website. We sincerely regret this inconvenience.
NASA Logo
Ocean Color Science Software

ocssw V2022
PacketUtils.py
Go to the documentation of this file.
1 import sys
2 import numpy as np
3 import datetime
4 
# Time between the TAI58 epoch (1958-01-01) and the Unix epoch (1970-01-01).
TAI58_OFFSET = datetime.datetime(1970, 1, 1) - datetime.datetime(1958, 1, 1)
# Fixed leap-second correction (seconds) subtracted when converting TAI58 times.
LEAPSEC = 37  # TODO: look up by year
7 
def decode_timestamp(seconds, subseconds):
    '''
    combine whole and fractional second counts into a single float

    seconds:    whole seconds (anything float() accepts)
    subseconds: fractional count; must expose .itemsize (e.g. a numpy
                integer scalar), because the field's byte width sets the
                scale: the count is divided by 2**(8 * itemsize)
    '''
    full_scale = 2 ** (8 * subseconds.itemsize)
    return float(seconds) + float(subseconds) / full_scale
10 
# Layout of a CCSDS timestamp: two big-endian unsigned fields.
CCSDS_timestamp = np.dtype([
    ('seconds', '>u4'),  # Time since start of epoch in seconds (TAI58)
    ('subsecs', '>u2'),  # fractional seconds
])  # total length = 6 bytes
15 
def parse_CCSDS_timestamp(timestr):
    '''
    convert one timestamp record to floating-point seconds

    timestr: record indexable by 'seconds' and 'subsecs' (e.g. one element
             of an array with dtype CCSDS_timestamp)
    '''
    return decode_timestamp(timestr['seconds'], timestr['subsecs'])
18 
def readTimestamp(data):
    '''
    decode the CCSDS timestamp at the start of a bytes-like buffer

    Only the first CCSDS_timestamp record in data is read; the result is
    the value returned by parse_CCSDS_timestamp for that record.
    '''
    record = np.frombuffer(data, dtype=CCSDS_timestamp, count=1)[0]
    return parse_CCSDS_timestamp(record)
23 
def tai58_as_datetime(tai58):
    '''
    convert tai58 (seconds since 1958, including leapsecs) to datetime object

    LEAPSEC is subtracted from tai58 and the result is added to the
    1958-01-01 epoch. Returns a naive datetime (no tzinfo).
    '''
    # Epoch arithmetic replaces datetime.utcfromtimestamp(), which is
    # deprecated since Python 3.12.
    tai58_epoch = datetime.datetime(1970, 1, 1) - TAI58_OFFSET
    # float() so numpy scalar inputs are accepted by timedelta.
    return tai58_epoch + datetime.timedelta(seconds=float(tai58) - LEAPSEC)
28 
def seconds_since(tai58, basetime=None):
    '''
    translate TAI58 to seconds since a baseline time

    tai58:    value passed to tai58_as_datetime
    basetime: datetime to measure from; when omitted (or falsy), midnight
              at the start of the converted time's own day is used
    Returns the difference in seconds as a float.
    '''
    dt = tai58_as_datetime(tai58)
    if not basetime:
        # default baseline: start of the same day
        basetime = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return (dt - basetime).total_seconds()
35 
def datetime_repr(dt):
    '''
    format a datetime as YYYY-MM-DDTHH:MM:SS.ffffffZ

    The trailing Z is a literal character; no timezone conversion is done.
    '''
    return dt.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
38 
39 #-------------------------------------------------------------------------------
40 
def getDict(structured_array):
    '''
    convert numpy structured array to dict of scalars and numpy arrays

    Only element 0 of each field is taken. Fields whose names start with
    PADDING or SPARE (case-insensitive) are left out.
    '''
    skipped_prefixes = ('PADDING', 'SPARE')  # meaningless fields
    return {
        name: structured_array[name][0]
        for name in structured_array.dtype.names
        if not name.upper().startswith(skipped_prefixes)
    }
54 
def pad_packet(data, length):
    '''
    pad or truncate bytearray to expected length

    Returns a new bytearray of exactly `length` bytes: the leading bytes of
    data, followed by zero bytes if data is shorter than length.
    '''
    packet = bytearray(length)  # zero-filled
    ncopy = min(len(data), length)
    packet[:ncopy] = data[:ncopy]
    return packet
63 
64 #-------------------------------------------------------------------------------
65 
# Layout of the cFE file header (big-endian fields).
cfeHeaderFields = np.dtype([
    ('filetype', 'S4'),              # type of file = "cFE1"
    ('subtype', '>u4'),              # 101d = downloaded PACE DS files
    ('length', '>u4'),               # length of CFE file header = 64 bytes
    ('SCID', 'S4'),                  # Spacecraft ID
    ('APID', '>u4'),                 # Application ID
    ('processorid', '>u4'),          # 1=Spacecraft, 2=OCI (30 in ETE files)
    ('fileopen_seconds', '>u4'),     # Time when file created
    ('fileopen_subseconds', '>u4'),  # Time when file created (fractional seconds)
    ('description', 'S32'),          # = "DS data storage file"
])  # total length = 64 bytes
77 
dsfHeaderFields = np.dtype([  # CFE/S Data Storage header
    ('fileclose_seconds', '>u4'),     # Time when file closed
    ('fileclose_subseconds', '>u4'),  # Time when file closed (fractional seconds)
    ('filetable_index', '>u2'),       # 0=events, 1=housekeeping
    ('filename_type', '>u2'),         # 1=count, 2=time (PACE uses count)
    ('filename', 'S64'),              # fully qualified file path
])  # total length = 76 bytes
85 
def readFileHeader(filehandle):
    '''
    read the cFE file header (and DS header, if present) from a binary file

    filehandle: object supporting tell(), seek() and read() that returns bytes

    Returns None, with the file position unchanged, when the next four
    bytes are not b'cFE1'. Otherwise returns a dict of the header fields
    (see getDict) plus:
      fileopen_ts / fileclose_ts : decoded timestamps as float seconds
      fileopen / fileclose       : the same times formatted by datetime_repr
    The fileclose entries and the DS header fields are present only when
    the description field starts with b'DS'.

    np.frombuffer raises ValueError if a read returns fewer bytes than the
    header it is decoding.
    '''
    # is a CFE header present?  Peek at the magic bytes, then rewind.
    start = filehandle.tell()
    magic = filehandle.read(4)
    filehandle.seek(start)
    if magic != b'cFE1':
        return None

    # read CFE header
    raw = filehandle.read(cfeHeaderFields.itemsize)
    header = getDict(np.frombuffer(raw, dtype=cfeHeaderFields, count=1))
    header['fileopen_ts'] = decode_timestamp(header['fileopen_seconds'],
                                             header['fileopen_subseconds'])
    header['fileopen'] = datetime_repr(tai58_as_datetime(header['fileopen_ts']))

    # append DS header, if it's there.
    if header['description'].startswith(b'DS'):
        raw = filehandle.read(dsfHeaderFields.itemsize)
        header.update(getDict(np.frombuffer(raw, dtype=dsfHeaderFields, count=1)))
        header['fileclose_ts'] = decode_timestamp(header['fileclose_seconds'],
                                                  header['fileclose_subseconds'])
        header['fileclose'] = datetime_repr(tai58_as_datetime(header['fileclose_ts']))

    return header
113 
114 #-------------------------------------------------------------------------------
def seconds_since(tai58, basetime=None)
Definition: PacketUtils.py:29
def readTimestamp(data)
Definition: PacketUtils.py:19
def parse_CCSDS_timestamp(timestr)
Definition: PacketUtils.py:16
def decode_timestamp(seconds, subseconds)
Definition: PacketUtils.py:8
def pad_packet(data, length)
Definition: PacketUtils.py:55
def tai58_as_datetime(tai58)
Definition: PacketUtils.py:24
def datetime_repr(dt)
Definition: PacketUtils.py:36
def getDict(structured_array)
Definition: PacketUtils.py:41