NASA Logo
Ocean Color Science Software

ocssw V2022
tlmgen_pace.py
Go to the documentation of this file.
1 #! /usr/bin/env python3
2 
3 import argparse
4 import glob
5 import logging
6 import os.path
7 import sys
8 from io import BytesIO
9 
10 import pandas as pd
11 from telemetry import ccsdspy
12 from telemetry.PacketUtils import *
13 
# Tool version string; printed at startup and useful in logs.
__version__ = "1.1.0 (2024-05-20)"
15 
16 
def main():
    """Convert PACE S-band housekeeping telemetry (HSK) packets to CSV.

    Reads one or more HSK files, parses CCSDS packets whose APIDs have a
    packet-definition CSV under the packet directory, applies optional
    linear (slope/intercept) conversions, and appends one row per
    (file, timestamp, mnemonic) to the output CSV.

    Returns:
        int: process exit status (see the argparse epilog for codes).
    """
    print("tlmgen_pace", __version__)

    # Read command line options
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter,
        description="Convert S-band housekeeping telemetry to CSV for specified fields",
        epilog="""
EXIT Status:
0 : Success
1 : Fatal error
101 : Non-fatal file open error
102 : Invalid file or instrument from CFE header
103 : Invalid packet header
104 : Invalid packet [header/datatype/length]
110 : No valid packets found
120 : Multiple warnings; see log
""",
    )
    parser.add_argument(
        "ifile",
        type=str,
        help="path to S-band telemetry (HSK) file OR list of input files, one per line",
    )
    parser.add_argument(
        "-o", "--ofile", type=str, help="output CSV file; defaults to ifile.csv"
    )
    parser.add_argument(
        "--packetdir",
        type=str,
        help="path to directory containing packet structures for desired mnemonics.",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        default=False,
        help="print status messages",
    )
    args = parser.parse_args()
    status = 0

    loglevel = logging.INFO if args.verbose else logging.WARNING
    logging.basicConfig(format="%(levelname)s: %(message)s", level=loglevel)

    # locate CSV directory holding the packet-structure definitions
    if args.packetdir is None:
        pktDir = os.path.join(
            os.getenv("OCDATAROOT"), "telemetry", "pace", "monitoring"
        )
    else:
        pktDir = args.packetdir
    if not os.path.exists(pktDir):
        logging.error(f"ERROR: The directory {pktDir} does not exist.")
        return 1

    # read linear conversions, if the table is present.
    # FIX: conversions was referenced below even when the CSV is absent,
    # raising NameError; default to None and skip conversion attachment.
    conversions = None
    csvfile = os.path.join(pktDir, "LinearConverters.csv")
    if os.path.exists(csvfile):
        conversions = pd.read_csv(csvfile)
        for column in ["slope", "intercept"]:
            # guard against an empty table before peeking at row 0
            if len(conversions) and isinstance(conversions[column][0], str):
                # NOTE(security): eval() of expressions from the conversions
                # CSV (e.g. fractional slopes like "1/16"). Acceptable only
                # because this file ships with OCDATAROOT, not user input —
                # do not point --packetdir at untrusted data.
                conversions[column] = [eval(v) for v in conversions[column]]

    # read packet definitions, one CSV per APID (e.g. APID0123.name.csv)
    packetDef = {}
    for csvfile in glob.glob(os.path.join(pktDir, "APID[0-9]*.*.csv")):
        apid = int(
            os.path.basename(csvfile).split(".", maxsplit=1)[0].replace("APID", "")
        )
        packetDef[apid] = ccsdspy.FixedLength.from_file(csvfile)

        # attach conversion, if present for this mnemonic
        if conversions is None:
            continue
        for name in [f._name for f in packetDef[apid]._fields]:
            row = conversions.loc[conversions["mnemonic"] == name]
            if len(row) > 0:
                # FIX: restore the converter construction (the call was
                # malformed); ccsdspy expects a Converter object such as
                # converters.LinearConverter(slope, intercept).
                packetDef[apid].add_converted_field(
                    name,
                    name,
                    ccsdspy.converters.LinearConverter(
                        slope=row.slope.values[0],
                        intercept=row.intercept.values[0],
                    ),
                )

    ofile = f"{args.ifile}.csv" if args.ofile is None else args.ofile

    # Is input file tlm or list of files?
    filelist = []
    infile = os.path.expandvars(args.ifile)

    try:
        with open(infile, mode="rt") as flist:  # try to read as list of files
            try:
                input_list = True
                for ifile in flist:
                    filelist.append(os.path.expandvars(ifile.rstrip()))
            except UnicodeDecodeError:
                input_list = False  # contains invalid byte - infile is binary

    except IOError as e:
        logging.error(f"{e}; exiting")
        return 1

    # FIX: a binary file can partially decode before raising
    # UnicodeDecodeError, leaving garbage entries in filelist; whenever the
    # input was not a valid list, treat it as a single HSK file.
    if not input_list or not filelist:
        filelist = [infile]

    writeheader = True  # write the CSV header only for the first batch

    # Step through all input files
    for filename in filelist:
        logging.info(f"Reading {filename}")  # FIX: message lacked the filename
        fname = os.path.basename(filename)
        dictList = []

        try:
            ifile = open(filename, mode="rb")
        except IOError as e:
            status = 120 if status > 0 else 101
            logging.warning(f"{e}; continuing")
            continue  # input_list errors already caught above

        # Read any file header(s)
        filehdr = readFileHeader(ifile)
        if filehdr:
            logging.info(filehdr)
            logging.info("")

        # Is it the right kind of file?
        # FIX: only index filehdr after confirming it is present; the
        # original indexed a possibly-empty header before the check.
        desired = bool(filehdr) and (
            filehdr["subtype"] == 101
            and filehdr["length"] == 64
            and filehdr["SCID"] == b"PACE"
            and filehdr["processorid"] in (1, 2, 30)
        )

        if not desired:
            status = 120 if status > 0 else 102
            if input_list:
                logging.warning(f"File {fname} has invalid header; continuing")
                ifile.close()  # FIX: avoid leaking the handle on skip
                continue  # go to next file
            else:
                logging.error(f"File {fname} has invalid header; returning")
                ifile.close()
                return status

        # read CCSDS packets
        for packet in ccsdspy.utils.iter_packet_bytes(
            ifile, include_primary_header=True
        ):
            data = packet[6:]  # payload after the 6-byte primary header
            header = ccsdspy.utils.read_primary_headers(BytesIO(packet))
            for k, v in header.items():
                header[k] = v[0]  # remove outer array
            logging.info(header)

            # check for invalid header
            if header["CCSDS_PACKET_LENGTH"] > 16378:
                status = 120 if status > 0 else 103
                logging.warning(
                    f"File {fname} contains invalid CCSDS packet header: {header}"
                )
                continue  # go to next packet

            # check for truncated data
            if len(data) < header["CCSDS_PACKET_LENGTH"] + 1:
                status = 120 if status > 0 else 104
                logging.warning(
                    f"File {fname} has unexpected EOF: expected"
                    f" {header['CCSDS_PACKET_LENGTH']+1} more bytes, got {len(data)}"
                )
                break  # done with this file

            # check for invalid timestamp
            if (
                header["CCSDS_SECONDARY_FLAG"] == 1
                and data[0] > 112  # after 2017-07-18T05:49:15Z
                and data[0] < 192  # before 2060-01-28T16:50:35Z
            ):
                timestamp = tai58_as_datetime(readTimestamp(data))
            else:
                continue  # done with this packet

            # parse defined mnemonics
            apid = header["CCSDS_APID"]
            if apid in packetDef:
                myDict = packetDef[apid].load(BytesIO(packet))
                for key, val in myDict.items():
                    if not key.endswith("time"):
                        dictList.append(
                            {
                                "filename": fname,
                                "time_val": timestamp.strftime(
                                    "%Y-%m-%d %H:%M:%S.%f"
                                )[:-3],  # trim to milliseconds
                                "var": key,
                                "value": val[0],
                                "alert_type": "",  # populated later
                                "recorded": "",  # database ingest datetime
                            }
                        )

        # end (for packet in packets)

        # close input file
        ifile.close()

        # append this file's records to the output CSV
        if len(dictList) > 0:
            logging.info(f"Writing {len(dictList)} records from {fname} to {ofile}")
            df = pd.DataFrame(dictList)
            df.to_csv(ofile, sep=",", index=False, header=writeheader, mode="a")
            writeheader = False

    # end (for filename in filelist)

    if writeheader:  # true if no records written
        logging.warning("No requested packets found.")
        status = 120 if status > 0 else 110

    if status:
        logging.warning(f"Exiting with status code {status}")
    return status
236 
237 
# Script entry point: propagate main()'s status code to the shell.
if __name__ == "__main__":
    sys.exit(main())
def read_primary_headers(file)
Definition: utils.py:109
void load(float x1, float v[], float y[])
def iter_packet_bytes(file, include_primary_header=True)
Definition: utils.py:33
void print(std::ostream &stream, const char *format)
Definition: PrintDebug.hpp:38
def readTimestamp(data)
Definition: PacketUtils.py:19
def main()
Definition: tlmgen_pace.py:17
def tai58_as_datetime(tai58)
Definition: PacketUtils.py:24