NASA Logo
Ocean Color Science Software

ocssw V2022
multilevel_processor.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 """
4 Program to perform multilevel processing (previously known as the
5 seadas_processor and sometimes referred to as the 'uber' processor).
6 """
7 
8 try:
9  import configparser
10 except ImportError:
11  import ConfigParser as configparser
12 
13 import datetime
14 import logging
15 import optparse
16 import os
17 import re
18 import subprocess
19 import sys
20 import tarfile
21 import time
22 import traceback
23 
26 import mlp.mlp_utils as mlp_utils
27 import mlp.benchmark_timer as benchmark_timer
28 import seadasutils.MetaUtils as MetaUtils
29 import mlp.name_finder_utils as name_finder_utils
30 import mlp.obpg_data_file as obpg_data_file
31 import seadasutils.ProcUtils as ProcUtils
32 import mlp.processor as processor
33 import mlp.processing_rules as processing_rules
34 import mlp.uber_par_file_reader as uber_par_file_reader
35 #import product
36 
37 __version__ = '1.0.6'
38 
39 __author__ = 'melliott'
40 
class ProcessorConfig(object):
    """
    Configuration data for the program which needs to be widely available.
    """
    SECS_PER_DAY = 86400

    def __init__(self, hidden_dir, ori_dir, verbose, overwrite, use_existing,
                 deletefiles, combine_files, out_dir=None):
        """
        Create the configuration: ensure hidden_dir exists, record the run
        options, and load (or create) the saved options file in hidden_dir.
        """
        self.prog_name = os.path.basename(sys.argv[0])

        if not os.path.exists(hidden_dir):
            try:
                os.mkdir(hidden_dir)
            except OSError:
                # BUG FIX: sys.exc_info()[1] is an exception *object*, not a
                # string; the original called .find() on it directly, which
                # raises AttributeError.  Convert to str before searching.
                if str(sys.exc_info()[1]).find('Permission denied:') != -1:
                    log_and_exit('Error! Unable to create directory {0}'.\
                                 format(hidden_dir))
        self.hidden_dir = hidden_dir
        self.original_dir = ori_dir
        self.verbose = verbose
        self.deletefiles = deletefiles
        self.overwrite = overwrite
        self.use_existing = use_existing
        self.combine_files = combine_files
        self.get_anc = True
        if out_dir:
            self.output_dir = out_dir
            self.output_dir_is_settable = False
        else:
            # Default to the current directory; may be changed later if
            # specified in the par file or on the command line.
            self.output_dir = '.'
            self.output_dir_is_settable = True
        cfg_file_path = os.path.join(self.hidden_dir, 'seadas_ocssw.cfg')
        if os.path.exists(cfg_file_path):
            self._read_saved_options(cfg_file_path)
        else:
            self.max_file_age = 2592000  # number of seconds in 30 days
            self._write_default_cfg_file(cfg_file_path)
        ProcessorConfig._instance = self

    def _read_saved_options(self, cfg_path):
        """
        Gets options stored in the program's configuration file.
        """
        try:
            cfg_parser = configparser.ConfigParser()
            cfg_parser.read(cfg_path)
            try:
                # Stored value looks like "30 # units are days"; keep only
                # the leading number and convert days to seconds.
                self.max_file_age = ProcessorConfig.SECS_PER_DAY * \
                                    int(cfg_parser.get('main',
                                                       'par_file_age').\
                                        split(' ', 2)[0])
            except configparser.NoSectionError as nse:
                print('nse: ' + str(nse))
                print('sys.exc_info(): ')
                for msg in sys.exc_info():
                    print(' ' + str(msg))
                log_and_exit('Error! Configuration file has no "main" ' +
                             'section.')
        except configparser.NoOptionError:
            log_and_exit('Error! The "main" section of the configuration ' +
                         'file does not specify a "par_file_age".')
        except configparser.MissingSectionHeaderError:
            log_and_exit('Error! Bad configuration file, no section headers ' +
                         'found.')

    def _set_temp_dir(self):
        """
        Returns the directory to use for temporary files: /tmp when it is a
        writable directory, otherwise the current working directory; exits
        with an error if neither is usable.
        """
        if os.path.exists('/tmp') and os.path.isdir('/tmp') and \
           os.access('/tmp', os.W_OK):
            return '/tmp'
        else:
            cwd = os.getcwd()
            if os.path.exists(cwd) and os.path.isdir(cwd) and \
               os.access(cwd, os.W_OK):
                return cwd
            else:
                log_and_exit('Error! Unable to establish a temporary ' +
                             'directory.')

    def _write_default_cfg_file(self, cfg_path):
        """
        Writes out a configuration file using default values.
        """
        with open(cfg_path, 'wt') as cfg_file:
            cfg_file.write('[main]\n')
            cfg_file.write('par_file_age=30 # units are days\n')
131 
class Sensor(object):
    """
    Sensor contains the recipe and processing methods for general sensors.

    Each run_* method builds the command line for one OCSSW program and
    returns the exit status from execute_command().
    """
    def __init__(self):
        self.name = 'general'
        # target type -> processing rule (target, source types, action,
        # batch flag).  Insertion order matters only via rules_order below.
        self.rules_dict = {
            'level 1a': processing_rules.build_rule('level 1a', ['level 0'],
                                                    self.run_bottom_error, False),
            'l1brsgen': processing_rules.build_rule('l1brsgen', ['l1'],
                                                    self.run_l1brsgen, False),
            'l2brsgen': processing_rules.build_rule('l2brsgen', ['l2gen'],
                                                    self.run_l2brsgen, False),
            'level 1b': processing_rules.build_rule('level 1b', ['level 1a'],
                                                    self.run_l1b, False),
            'l2gen': processing_rules.build_rule('l2gen', ['l1'], self.run_l2gen,
                                                 False),
            'l2extract': processing_rules.build_rule('l2extract', ['l2gen'],
                                                     self.run_l2extract, False),
            'l2bin': processing_rules.build_rule('l2bin', ['l2gen'], self.run_l2bin,
                                                 True),
            'l3mapgen': processing_rules.build_rule('l3mapgen', ['l2bin'],
                                                    self.run_l3mapgen, False)
        }
        self.rules_order = ['level 1a', 'l1brsgen', 'level 1b', 'l2gen',
                            'l2extract', 'l2brsgen', 'l2bin', 'l3mapgen']
        # (The original assigned self.name = 'general' a second time here;
        # the redundant assignment has been removed.)
        self.require_geo = False
        self.recipe = processing_rules.RuleSet('General rules', self.rules_dict,
                                               self.rules_order)

    def run_bottom_error(self, proc):
        """
        Exits with an error message when there is an attempt to process a source
        file at the lowest level of a rule chain.
        """
        err_msg = 'Error! Attempting to create {0} product, but no creation program is known.'.format(proc.target_type)
        log_and_exit(err_msg)

    def run_l1b(self, proc):
        """
        Sets up and runs an executable program to produce a Level 1B file.
        """
        # todo: replace l1bgen with the appropriate proc.whatever
        prog = os.path.join(proc.ocssw_bin, 'l1bgen_generic')
        args = ['ifile={}'.format(proc.input_file),
                'ofile={}'.format(proc.output_file)]
        if proc.geo_file is not None:
            args.append('geofile={}'.format(proc.geo_file))
        args.extend(get_options(proc.par_data))
        cmd = [prog]
        cmd.extend(args)
        return execute_command(cmd)

    def run_l1brsgen(self, proc):
        """
        Runs the l1brsgen executable.
        """
        prog = os.path.join(proc.ocssw_bin, 'l1brsgen')
        cmd = [prog]
        cmd.extend(get_options(proc.par_data))
        cmd.extend(['ifile={}'.format(proc.input_file),
                    'ofile={}'.format(proc.output_file)])
        if proc.geo_file:
            cmd.append('geofile={}'.format(proc.geo_file))
        logging.debug('Executing: "%s"', " ".join(str(x) for x in cmd))
        status = execute_command(cmd)
        return status

    def run_l2bin(self, proc):
        """
        Set up for and perform L2 binning.
        """
        prog = os.path.join(proc.ocssw_bin, 'l2bin')
        if not os.path.exists(prog):
            print("Error! Cannot find executable needed for {0}".\
                  format(proc.rule_set.rules[proc.target_type].action))
        args = ['infile={}'.format(proc.input_file),
                'ofile={}'.format(proc.output_file)]
        args.extend(get_options(proc.par_data))
        cmd = [prog]
        cmd.extend(args)
        if cfg_data.verbose:
            print('l2bin cmd: {}'.format(" ".join(str(x) for x in cmd)))
        ret_val = execute_command(cmd)
        if ret_val != 0:
            if os.path.exists(proc.output_file):
                # A nonzero exit with an existing output file is treated as
                # recoverable; continue with the produced bin file.
                msg = '-I- The l2bin program returned a status value of {0}. Proceeding with processing, using the output l2 bin file {1}'.format(ret_val, proc.output_file)
                logging.info(msg)
                ret_val = 0
            else:
                msg = '-I- The l2bin program produced a bin file with no data. No further processing will be done.'
                sys.exit(msg)
        return ret_val

    def run_l2brsgen(self, proc):
        """
        Runs the l2brsgen executable.
        """
        logging.debug("In run_l2brsgen")
        prog = os.path.join(proc.ocssw_bin, 'l2brsgen')
        cmd = [prog]
        cmd.extend(get_options(proc.par_data))
        cmd.extend(['ifile={}'.format(proc.input_file),
                    'ofile={}'.format(proc.output_file)])
        logging.debug('Executing: "%s"', " ".join(str(x) for x in cmd))
        status = execute_command(cmd)
        return status

    def run_l2extract(self, proc):
        """
        Set up and run l2extract.
        """
        if 'SWlon' in proc.par_data and 'SWlat' in proc.par_data and \
           'NElon' in proc.par_data and 'NElat' in proc.par_data:
            start_line, end_line, start_pixel, end_pixel = get_extract_params(proc)
            if (start_line is None) or (end_line is None) or (start_pixel is None) \
               or (end_pixel is None):
                err_msg = 'Error! Could not compute coordinates for l2extract.'
                log_and_exit(err_msg)
            l2extract_prog = os.path.join(proc.ocssw_bin, 'l2extract')
            l2extract_cmd = [l2extract_prog, proc.input_file,
                             str(start_pixel), str(end_pixel),
                             str(start_line), str(end_line), '1', '1',
                             proc.output_file]
            logging.debug('Executing l2extract command: {}'.format(
                " ".join(str(x) for x in l2extract_cmd)))
            status = execute_command(l2extract_cmd)
            return status
        else:
            err_msg = 'Error! Geographical coordinates not specified for l2extract.'
            log_and_exit(err_msg)

    def run_l2gen(self, proc):
        """
        Set up for and perform L2 processing.
        """
        if cfg_data.get_anc:
            # Fetch ancillary data for the input before running l2gen.
            getanc_prog = build_executable_path('getanc')
            getanc_cmd = [getanc_prog, proc.input_file]
            logging.debug('running getanc command: {}'.format(
                " ".join(str(x) for x in getanc_cmd)))
            execute_command(getanc_cmd)
        l2gen_prog = os.path.join(proc.ocssw_bin, 'l2gen')
        if not os.path.exists(l2gen_prog):
            print("Error! Cannot find executable needed for {0}".\
                  format(proc.rule_set.rules[proc.target_type].action))
        par_name = build_l2gen_par_file(proc.par_data, proc.input_file,
                                        proc.geo_file, proc.output_file)
        logging.debug('L2GEN_FILE=' + proc.output_file)

        args = 'par={}/{}'.format(cfg_data.original_dir, par_name)
        l2gen_cmd = [l2gen_prog, args]
        if cfg_data.verbose or DEBUG:
            logging.debug('l2gen cmd: {}'.format(" ".join(str(x) for x in l2gen_cmd)))
        return execute_command(l2gen_cmd)

    def run_l3bin(self, proc):
        """
        Set up and run the l3bin program.
        """
        prog = os.path.join(proc.ocssw_bin, 'l3bin')
        if not os.path.exists(prog):
            print("Error! Cannot find executable needed for {0}".\
                  format(proc.rule_set.rules[proc.target_type].action))
        args = ['ifile={}'.format(proc.input_file)]
        for key in proc.par_data:
            if (key != 'odir') and (key != 'ofile') and not key.lower() in FILE_USE_OPTS:
                args.append("{}={}".format(key, proc.par_data[key]))
        args.append("in={}".format(proc.input_file))
        args.append("out={}".format(proc.output_file))
        cmd = [prog]
        cmd.extend(args)
        logging.debug('Executing l3bin command: {}'.format(
            " ".join(str(x) for x in cmd)))
        ret_val = execute_command(cmd)
        if ret_val != 0:
            if os.path.exists(proc.output_file):
                # BUG FIX: the original called print(msg='...'), which raises
                # TypeError ('msg' is not a valid keyword argument for
                # print).  Build the message once, then print and log it.
                msg = '-I- The l3bin program returned a status value of {0}. Proceeding with processing, using the output l2 bin file {1}'.format(
                    ret_val, proc.output_file)
                print(msg)
                logging.info(msg)
                ret_val = 0
        return ret_val

    def run_l3mapgen(self, proc):
        """
        Set up and run the l3mapgen program.
        """
        prog = os.path.join(proc.ocssw_bin, 'l3mapgen')
        if not os.path.exists(prog):
            print("Error! Cannot find executable needed for {0}".\
                  format(proc.rule_set.rules[proc.target_type].action))
        args = ['ifile={}'.format(proc.input_file)]
        for key in proc.par_data:
            if (key != 'odir') and (key != 'ofile') and not key.lower() in FILE_USE_OPTS:
                args.append('{}={}'.format(key, proc.par_data[key]))
        args.append('ofile={}'.format(proc.output_file))
        cmd = [prog]
        cmd.extend(args)
        logging.debug('Executing l3mapgen command: "%s"', " ".join(str(x) for x in cmd))
        return execute_command(cmd)
376 
378  """
379  Sensor GOCI contains GOCI specific recipe and procssing methods.
380  """
    def __init__(self):
        """Build the GOCI rule set, rule ordering, and recipe."""
        self.name = 'goci'
        # target type -> rule (target, source types, action, batch flag).
        self.rules_dict = {
            'level 1a': processing_rules.build_rule('level 1a', ['level 0'],
                                                    self.run_bottom_error, False),
            'l1brsgen': processing_rules.build_rule('l1brsgen', ['l1'],
                                                    self.run_l1brsgen, False),
            'l2brsgen': processing_rules.build_rule('l2brsgen', ['l2gen'],
                                                    self.run_l2brsgen, False),
            # 'l1mapgen': processing_rules.build_rule('l1mapgen', ['l1'],
            #                                         self.run_l1mapgen, False),
            # 'l2mapgen': processing_rules.build_rule('l2mapgen', ['l2gen'],
            #                                         self.run_l2mapgen, False),
            'level 1b': processing_rules.build_rule('level 1b', ['level 1a'],
                                                    self.run_l1b, False),
            'l2gen': processing_rules.build_rule('l2gen', ['level 1b'], self.run_l2gen,
                                                 False),
            'l2extract': processing_rules.build_rule('l2extract', ['l2gen'],
                                                     self.run_l2extract, False),
            'l2bin': processing_rules.build_rule('l2bin', ['l2gen'], self.run_l2bin,
                                                 True),
            #'l3bin': processing_rules.build_rule('l3bin', ['l2bin'], self.run_l3bin,
            #                                     True),
            'l3mapgen': processing_rules.build_rule('l3mapgen', ['l2bin'],
                                                    self.run_l3mapgen, False)
            # 'smigen': processing_rules.build_rule('smigen', ['l3bin'], self.run_smigen,
            #                                       False)
        }
        # Order in which rules are considered when building a processing chain.
        self.rules_order = ['level 1a', 'l1brsgen', 'level 1b', 'l2gen',
                            'l2extract', 'l2brsgen', 'l2bin', 'l3mapgen']
        # GOCI products do not require a separate geolocation file.
        self.require_geo = False
        self.recipe = processing_rules.RuleSet('GOCI rules', self.rules_dict, self.rules_order)
414 
416  """
417  Sensor HAWKEYE contains HAWKEYE specific recipe and procssing methods.
418  """
    def __init__(self):
        """Build the HAWKEYE rule set, rule ordering, and recipe."""
        self.name = 'hawkeye'
        # target type -> rule (target, source types, action, batch flag).
        self.rules_dict = {
            'level 1a': processing_rules.build_rule('level 1a', ['nothing lower'],
                                                    self.run_bottom_error, False),
            'l1brsgen': processing_rules.build_rule('l1brsgen', ['level 1a', 'geo'],
                                                    self.run_l1brsgen, False),
            # 'l1mapgen': processing_rules.build_rule('l1mapgen', ['level 1a', 'geo'],
            #                                         self.run_l1mapgen, False),
            'geo': processing_rules.build_rule('geo', ['level 1a'],
                                               self.run_geo, False),
            'l2gen': processing_rules.build_rule('l2gen', ['level 1a', 'geo'],
                                                 self.run_l2gen, False),
            'l2extract': processing_rules.build_rule('l2extract', ['l2gen'],
                                                     self.run_l2extract, False),
            'l2brsgen': processing_rules.build_rule('l2brsgen', ['l2gen'],
                                                    self.run_l2brsgen, False),
            # 'l2mapgen': processing_rules.build_rule('l2mapgen', ['l2gen'],
            #                                         self.run_l2mapgen, False),
            'l2bin': processing_rules.build_rule('l2bin', ['l2gen'], self.run_l2bin,
                                                 True),
            #'l3bin': processing_rules.build_rule('l3bin', ['l2bin'], self.run_l3bin,
            #                                     True),
            'l3mapgen': processing_rules.build_rule('l3mapgen', ['l2bin'],
                                                    self.run_l3mapgen, False, False),
            # 'smigen': processing_rules.build_rule('smigen', ['l3bin'], self.run_smigen,
            #                                       False)
        }
        # Order in which rules are considered when building a processing chain.
        self.rules_order = ['level 1a', 'geo', 'l1brsgen',
                            'l2gen', 'l2extract', 'l2bin',
                            'l2brsgen', 'l3mapgen']
        # HAWKEYE processing requires a geolocation file.
        self.require_geo = True
        self.recipe = processing_rules.RuleSet('HAWKEYE Rules', self.rules_dict, self.rules_order)
453 
454  def run_geo(self, proc):
455  """
456  Set up and run the geolocate_hawkeye program, returning the exit status of the run.
457  """
458  logging.debug('In run_geolocate_hawkeye')
459  prog = build_executable_path('geolocate_hawkeye')
460 
463  if not prog:
464  err_msg = 'Error! Cannot find program geolocate_hawkeye.'
465  logging.info(err_msg)
466  sys.exit(err_msg)
467  args = [proc.input_file, proc.output_file]
468  args.extend(get_options(proc.par_data))
469  cmd = [prog]
470  cmd.extend(args)
471  # cmd = [prog, args]
472  logging.debug('\nRunning: {}'.format(" ".join(str(x) for x in cmd)))
473  return execute_command(cmd)
474 
476  """
477  Sensor MERIS contains MERIS specific recipe and processing methods.
478 
479  Rule format:
480  target type (string), source types (list of strings), batch processing
481  flag (Boolean), action to take (function name)
482  """
    def __init__(self):
        """Build the MERIS rule set, rule ordering, and recipe."""
        self.name = 'meris'
        # target type -> rule (target, source types, action, batch flag).
        self.rules_dict = {
            'level 1a': processing_rules.build_rule('level 1a', ['level 0'],
                                                    self.run_bottom_error, False),
            'l1brsgen': processing_rules.build_rule('l1brsgen', ['l1'],
                                                    self.run_l1brsgen, False),
            'l2brsgen': processing_rules.build_rule('l2brsgen', ['l2gen'],
                                                    self.run_l2brsgen, False),
            # 'l1mapgen': processing_rules.build_rule('l1mapgen', ['l1'],
            #                                         self.run_l1mapgen, False),
            # 'l2mapgen': processing_rules.build_rule('l2mapgen', ['l2gen'],
            #                                         self.run_l2mapgen, False),
            'level 1b': processing_rules.build_rule('level 1b', ['level 1a'],
                                                    self.run_l1b, False),
            'l2gen': processing_rules.build_rule('l2gen', ['level 1b'], self.run_l2gen,
                                                 False),
            'l2extract': processing_rules.build_rule('l2extract', ['l2gen'],
                                                     self.run_l2extract, False),
            'l2bin': processing_rules.build_rule('l2bin', ['l2gen'], self.run_l2bin,
                                                 True),
            #'l3bin': processing_rules.build_rule('l3bin', ['l2bin'], self.run_l3bin,
            #                                     True),
            'l3mapgen': processing_rules.build_rule('l3mapgen', ['l2bin'],
                                                    self.run_l3mapgen, False),
            # 'smigen': processing_rules.build_rule('smigen', ['l3bin'], self.run_smigen,
            #                                       False)
        }
        # Order in which rules are considered when building a processing chain.
        self.rules_order = ['level 1a', 'l1brsgen', 'level 1b', 'l2gen',
                            'l2extract', 'l2brsgen', 'l2bin', 'l3mapgen']
        # MERIS products do not require a separate geolocation file.
        self.require_geo = False
        self.recipe = processing_rules.RuleSet('MERIS rules', self.rules_dict, self.rules_order)
516 
518  """
519  Sensor MODIS contains MODIS specific recipe and processing methods.
520  """
    def __init__(self):
        """Build the MODIS rule set, rule ordering, and recipe."""
        self.name = 'modis'
        # target type -> rule (target, source types, action, batch flag).
        self.rules_dict = {
            'level 0': processing_rules.build_rule('level 0', ['nothing lower'],
                                                   self.run_bottom_error, False),
            'level 1a': processing_rules.build_rule('level 1a', ['level 0'],
                                                    self.run_l1a, False),
            'l1brsgen': processing_rules.build_rule('l1brsgen', ['level 1b', 'geo'],
                                                    self.run_l1brsgen, False),
            # 'l1mapgen': processing_rules.build_rule('l1mapgen', ['level 1b', 'geo'],
            #                                         self.run_l1mapgen, False),
            'geo': processing_rules.build_rule('geo', ['level 1a'], self.run_geo,
                                               False),
            'l1aextract': processing_rules.build_rule('l1aextract',
                                                      ['level 1a', 'geo'],
                                                      self.run_l1aextract,
                                                      False),
            'level 1b': processing_rules.build_rule('level 1b',
                                                    ['level 1a', 'geo'],
                                                    self.run_l1b, False),
            'l2gen': processing_rules.build_rule('l2gen', ['level 1b', 'geo'],
                                                 self.run_l2gen, False),
            'l2extract': processing_rules.build_rule('l2extract', ['l2gen'],
                                                     self.run_l2extract, False),
            'l2brsgen': processing_rules.build_rule('l2brsgen', ['l2gen'],
                                                    self.run_l2brsgen, False),
            # 'l2mapgen': processing_rules.build_rule('l2mapgen', ['l2gen'],
            #                                         self.run_l2mapgen, False),
            'l2bin': processing_rules.build_rule('l2bin', ['l2gen'], self.run_l2bin,
                                                 True),
            #'l3bin': processing_rules.build_rule('l3bin', ['l2bin'], self.run_l3bin,
            #                                     True),
            'l3mapgen': processing_rules.build_rule('l3mapgen', ['l2bin'],
                                                    self.run_l3mapgen, False, False),
            # 'smigen': processing_rules.build_rule('smigen', ['l3bin'], self.run_smigen,
            #                                       False)
        }
        # Order in which rules are considered when building a processing chain.
        self.rules_order = ['level 0', 'level 1a', 'geo', 'l1aextract',
                            'level 1b', 'l1brsgen', 'l2gen', 'l2extract',
                            'l2bin', 'l2brsgen', 'l3mapgen']
        # MODIS processing requires a geolocation file.
        self.require_geo = True
        self.recipe = processing_rules.RuleSet('MODIS Rules', self.rules_dict, self.rules_order)
564 
565  def run_l1aextract(self, proc):
566  """
567  Set up and run l1aextract_modis.
568  """
569  if 'SWlon' in proc.par_data and 'SWlat' in proc.par_data and\
570  'NElon' in proc.par_data and 'NElat' in proc.par_data:
571  start_line, end_line, start_pixel, end_pixel = get_extract_params(proc)
572  if (start_line is None) or (end_line is None) or (start_pixel is None)\
573  or (end_pixel is None):
574  err_msg = 'Error! Cannot find l1aextract_modis coordinates.'
575  log_and_exit(err_msg)
576  l1aextract_prog = os.path.join(proc.ocssw_bin, 'l1aextract_modis')
577  l1aextract_cmd = [l1aextract_prog, proc.input_file,
578  str(start_pixel), str(end_pixel),
579  str(start_line), str(end_line),
580  proc.output_file]
581  logging.debug('Executing l1aextract_modis command: "%s"',
582  " ".join(str(x) for x in l1aextract_cmd))
583  status = execute_command(l1aextract_cmd)
584  return status
585 
586  def run_geo(self, proc):
587  """
588  Sets up and runs the MODIS GEO script.
589  """
590  prog = build_executable_path('modis_GEO')
591  # os.path.join(proc.ocssw_root, 'run', 'scripts', 'modis_GEO')
592  args = [proc.input_file]
593  args.append('--output={}'.format(proc.output_file))
594  args.extend(get_options(proc.par_data))
595  # cmd = [prog, args]
596  cmd = [prog]
597  cmd.extend(args)
598  logging.debug('\nRunning: {}'.format(" ".join(str(x) for x in cmd)))
599  return execute_command(cmd)
600 
601  def run_l1a(self, proc):
602  """
603  Sets up and runs the MODIS L1A script.
604  """
605  prog = build_executable_path('modis_L1A')
606  args = [proc.input_file]
607  args.append('--output={}'.format(proc.output_file))
608  args.extend(get_options(proc.par_data))
609  cmd = [prog]
610  cmd.extend(args)
611  # logging.debug("\nRunning: " + cmd)
612  logging.debug('\nRunning: {}'.format(" ".join(str(x) for x in cmd)))
613  return execute_command(cmd)
614 
615  def run_l1b(self, proc):
616  """
617  Runs the L1B script.
618  """
619  prog = build_executable_path('modis_L1B')
620  args = ['-o', proc.output_file]
621  args.extend(get_options(proc.par_data))
622  # The following is no longer needed, but kept for reference.
623  # args += ' --lutdir $OCSSWROOT/run/var/modisa/cal/EVAL --lutver=6.1.15.1z'
624  args.append(proc.input_file)
625  if not proc.geo_file is None:
626  args.append(proc.geo_file)
627  cmd = [prog]
628  cmd.extend(args)
629  logging.debug('\nRunning: {}'.format(" ".join(str(x) for x in cmd)))
630  return execute_command(cmd)
631 
633  """
634  Sensor SeaWiFS contains SeaWiFS sepcific recipe and processing method.
635  """
    def __init__(self):
        """Build the SeaWiFS rule set, rule ordering, and recipe."""
        self.name = 'seawifs'
        # target type -> rule (target, source types, action, batch flag).
        self.rules_dict = {
            'level 1a': processing_rules.build_rule('level 1a', ['level 0'],
                                                    self.run_bottom_error, False),
            'l1aextract': processing_rules.build_rule('l1aextract',
                                                      ['level 1a'],
                                                      self.run_l1aextract,
                                                      False),
            'l1brsgen': processing_rules.build_rule('l1brsgen', ['l1'],
                                                    self.run_l1brsgen, False),
            # 'l1mapgen': processing_rules.build_rule('l1mapgen', ['l1'],
            #                                         self.run_l1mapgen, False),
            'level 1b': processing_rules.build_rule('level 1b', ['level 1a'],
                                                    self.run_l1b, False),
            'l2gen': processing_rules.build_rule('l2gen', ['l1'], self.run_l2gen,
                                                 False),
            'l2extract': processing_rules.build_rule('l2extract', ['l2gen'],
                                                     self.run_l2extract, False),
            'l2brsgen': processing_rules.build_rule('l2brsgen', ['l2gen'],
                                                    self.run_l2brsgen, False),
            # 'l2mapgen': processing_rules.build_rule('l2mapgen', ['l2gen'],
            #                                         self.run_l2mapgen, False),
            'l2bin': processing_rules.build_rule('l2bin', ['l2gen'], self.run_l2bin,
                                                 True),
            #'l3bin': processing_rules.build_rule('l3bin', ['l2bin'], self.run_l3bin,
            #                                     True, False),
            'l3mapgen': processing_rules.build_rule('l3mapgen', ['l2bin'],
                                                    self.run_l3mapgen, False, False),
            # 'smigen': processing_rules.build_rule('smigen', ['l3bin'], self.run_smigen,
            #                                       False)
        }
        # Order in which rules are considered when building a processing chain.
        self.rules_order = ['level 1a', 'l1aextract', 'l1brsgen',
                            'level 1b', 'l2gen', 'l2extract',
                            'l2brsgen', 'l2bin', 'l3mapgen']
        # SeaWiFS products do not require a separate geolocation file.
        self.require_geo = False
        self.recipe = processing_rules.RuleSet("SeaWiFS Rules", self.rules_dict, self.rules_order)
674 
675  def run_l1aextract(self, proc):
676  """
677  Set up and run l1aextract_seawifs.
678  """
679  if 'SWlon' in proc.par_data and 'SWlat' in proc.par_data and\
680  'NElon' in proc.par_data and 'NElat' in proc.par_data:
681  start_line, end_line, start_pixel, end_pixel = get_extract_params(proc)
682  if (start_line is None) or (end_line is None) or (start_pixel is None)\
683  or (end_pixel is None):
684  err_msg = 'Error! Cannot compute l1aextract_seawifs coordinates.'
685  log_and_exit(err_msg)
686  l1aextract_prog = os.path.join(proc.ocssw_bin, 'l1aextract_seawifs')
687  l1aextract_cmd = [l1aextract_prog, proc.input_file,
688  str(start_pixel), str(end_pixel),
689  str(start_line), str(end_line), '1', '1',
690  proc.output_file]
691  logging.debug('Executing l1aextract_seawifs command: "%s"',
692  " ".join(str(x) for x in l1aextract_cmd))
693  status = execute_command(l1aextract_cmd)
694  return status
695 
697  """
698  Sensor VIIRS contains VIIRS sepcific recipe and processing method..
699  """
    def __init__(self):
        """Build the VIIRS rule set, rule ordering, and recipe."""
        self.name = 'viirs'
        # target type -> rule (target, source types, action, batch flag).
        self.rules_dict = {
            'level 1a': processing_rules.build_rule('level 1a', ['nothing lower'],
                                                    self.run_bottom_error, False),
            'l1brsgen': processing_rules.build_rule('l1brsgen', ['l1', 'geo'],
                                                    self.run_l1brsgen, False),
            # 'l1mapgen': processing_rules.build_rule('l1mapgen', ['l1', 'geo'],
            #                                         self.run_l1mapgen, False),
            'geo': processing_rules.build_rule('geo', ['level 1a'],
                                               self.run_geo, False),
            'l1aextract': processing_rules.build_rule('l1aextract',
                                                      ['level 1a', 'geo'],
                                                      self.run_l1aextract,
                                                      False),
            'level 1b': processing_rules.build_rule('level 1b', ['level 1a', 'geo'],
                                                    self.run_l1b, False),
            'l2gen': processing_rules.build_rule('l2gen', ['l1', 'geo'],
                                                 self.run_l2gen, False),
            'l2extract': processing_rules.build_rule('l2extract', ['l2gen'],
                                                     self.run_l2extract, False),
            'l2brsgen': processing_rules.build_rule('l2brsgen', ['l2gen'],
                                                    self.run_l2brsgen, False),
            # 'l2mapgen': processing_rules.build_rule('l2mapgen', ['l2gen'],
            #                                         self.run_l2mapgen, False),
            'l2bin': processing_rules.build_rule('l2bin', ['l2gen'], self.run_l2bin,
                                                 True),
            #'l3bin': processing_rules.build_rule('l3bin', ['l2bin'], self.run_l3bin,
            #                                     True),
            'l3mapgen': processing_rules.build_rule('l3mapgen', ['l2bin'],
                                                    self.run_l3mapgen, False, False),
            # 'smigen': processing_rules.build_rule('smigen', ['l3bin'], self.run_smigen,
            #                                       False)
        }
        # Order in which rules are considered when building a processing chain.
        self.rules_order = ['level 1a', 'geo', 'l1aextract', 'level 1b', 'l1brsgen',
                            'l2gen', 'l2extract', 'l2bin',
                            'l2brsgen', 'l3mapgen']
        # VIIRS processing requires a geolocation file.
        self.require_geo = True
        self.recipe = processing_rules.RuleSet('VIIRS Rules', self.rules_dict, self.rules_order)
740 
741  def run_geo(self, proc):
742  """
743  Set up and run the geolocate_viirs program, returning the exit status of the run.
744  """
745  logging.debug('In run_geolocate_viirs')
746  prog = build_executable_path('geolocate_viirs')
747 
750  if not prog:
751  err_msg = 'Error! Cannot find program geolocate_viirs.'
752  logging.info(err_msg)
753  sys.exit(err_msg)
754  args = ['-ifile={}'.format(proc.input_file), '-geofile_mod={}'.format(proc.output_file)]
755  args.extend(get_options(proc.par_data))
756  cmd = [prog]
757  cmd.extend(args)
758  # cmd = [prog, args]
759  logging.debug('\nRunning: {}'.format(" ".join(str(x) for x in cmd)))
760  return execute_command(cmd)
761 
762  def run_l1b(self, proc):
763  logging.debug('In run_viirs_l1b')
764  prog = build_executable_path('calibrate_viirs')
765  # prog='/accounts/melliott/seadas/ocssw/bin/calibrate_viirs'
766 
767  args = ['ifile={}'.format(proc.input_file), 'l1bfile_mod={}'.format(proc.output_file)]
768  args.extend(get_options(proc.par_data))
769  # The following is no longer needed, but kept for reference.
770  # args += ' --lutdir $OCSSWROOT/run/var/modisa/cal/EVAL --lutver=6.1.15.1z'
771  # args += ' ' + proc.input_file
772  if proc.geo_file:
773  pass
774  # args += ' geofile=' + proc.geo_file
775  cmd = [prog]
776  cmd.extend(args)
777  # cmd = [prog, args]
778  logging.debug('\nRunning: {}'.format(" ".join(str(x) for x in cmd)))
779  return execute_command(cmd)
780 
781  def run_l1aextract(self, proc):
782  """
783  Set up and run l1aextract_viirs.
784  """
785  if 'SWlon' in proc.par_data and 'SWlat' in proc.par_data and\
786  'NElon' in proc.par_data and 'NElat' in proc.par_data:
787  start_line, end_line, start_pixel, end_pixel = get_extract_params(proc)
788  elif 'sline' in proc.par_data and 'eline' in proc.par_data and\
789  'spixl' in proc.par_data and 'epixl' in proc.par_data:
790  start_line = proc.par_data['sline']
791  end_line = proc.par_data['eline']
792  start_pixel = proc.par_data['spixl']
793  end_pixel = proc.par_data['epixl']
794 
795  if (start_line is None) or (end_line is None) or (start_pixel is None)\
796  or (end_pixel is None):
797  err_msg = 'Error! Cannot find l1aextract_viirs coordinates.'
798  log_and_exit(err_msg)
799  l1aextract_prog = os.path.join(proc.ocssw_bin, 'l1aextract_viirs')
800  l1aextract_cmd = [l1aextract_prog, proc.input_file,
801  str(start_pixel), str(end_pixel),
802  str(start_line), str(end_line),
803  proc.output_file]
804  logging.debug('Executing l1aextract_viirs command: "%s"',
805  " ".join(str(x) for x in l1aextract_cmd))
806  status = execute_command(l1aextract_cmd)
807  return status
808 
def get_obpg_data_file_object(file_specification):
    """
    Returns an obpg_data_file object for the file named in file_specification.
    """
    # NOTE(review): relies on mlp.get_obpg_file_type being importable via the
    # mlp package — confirm the module-level imports cover it.
    typer = mlp.get_obpg_file_type.ObpgFileTyper(file_specification)
    file_type, sensor = typer.get_file_type()
    start_time, end_time = typer.get_file_times()
    return obpg_data_file.ObpgDataFile(file_specification, file_type, sensor,
                                       start_time, end_time, typer.attributes)
820 
def build_executable_path(prog_name):
    """
    Returns the full path of the program named in prog_name, searching the
    'bin' and 'scripts' subdirectories of OCSSWROOT_DIR in that order.
    None is returned if the program is not found.
    """
    for subdir in ('bin', 'scripts'):
        candidate = os.path.join(OCSSWROOT_DIR, subdir, prog_name)
        if os.path.exists(candidate):
            return candidate
    return None
834 
def build_file_list_file(filename, file_list):
    """
    Write the names in file_list, one per line, to the file named filename.
    """
    with open(filename, 'wt') as list_file:
        list_file.writelines(entry + '\n' for entry in file_list)
842 
def build_l2gen_par_file(par_contents, input_file, geo_file, output_file):
    """
    Write a par file for l2gen into the hidden work directory and return
    its path.  The file name carries a timestamp so concurrent runs do
    not collide.
    """
    timestamp = datetime.datetime.today().strftime('%Y%m%d%H%M%S')
    par_path = os.path.join(cfg_data.hidden_dir, 'L2_' + timestamp + '.par')
    # Options written explicitly below, plus the file-handling options
    # that only steer this program, must not be passed through to l2gen.
    excluded = ('ifile', 'geofile', 'ofile', 'odir')
    with open(par_path, 'wt') as par_file:
        par_file.write('# Automatically generated par file for l2gen\n')
        par_file.write('ifile=' + input_file + '\n')
        if geo_file is not None:
            par_file.write('geofile=' + geo_file + '\n')
        par_file.write('ofile=' + output_file + '\n')
        for l2_opt in par_contents:
            if l2_opt not in excluded and l2_opt not in FILE_USE_OPTS:
                par_file.write(l2_opt + '=' + par_contents[l2_opt] + '\n')
    return par_path
862 
def check_options(options):
    """
    Check command line options.

    Exits via log_and_exit() when an input file was named but does not
    exist; otherwise returns None.
    """
    if options.ifile and not os.path.exists(options.ifile):
        err_msg = 'Error! The specified input file, {0}, does not exist.'. \
                  format(options.ifile)
        log_and_exit(err_msg)
877 
def clean_files(delete_list):
    """
    Delete unwanted files created during processing.

    Two groups are removed: every path in delete_list, and any .par
    files in the hidden work directory older than cfg_data.max_file_age.
    When cfg_data.verbose is set, each deletion and a final count are
    printed.
    """
    if cfg_data.verbose:
        print ("Cleaning up files")
        sys.stdout.flush()
    files_deleted = 0
    # Delete any files in the delete list. These are "intermediate" files
    # which were needed to complete processing, but which weren't explicitly
    # requested as output targets.
    for filepath in delete_list:
        if cfg_data.verbose:
            print ('Deleting {0}'.format(filepath))
            sys.stdout.flush()
        os.remove(filepath)
        files_deleted += 1
    # Delete hidden par files older than the cut off age
    hidden_files = os.listdir(cfg_data.hidden_dir)
    par_files = [f for f in hidden_files if f.endswith('.par')]
    for par_file in par_files:
        par_path = os.path.join(cfg_data.hidden_dir, par_file)
        # Age in seconds since the par file was last modified.
        file_age = round(time.time()) - os.path.getmtime(par_path)
        if file_age > cfg_data.max_file_age:
            if cfg_data.verbose:
                print ('Deleting {0}'.format(par_path))
                sys.stdout.flush()
            os.remove(par_path)
            files_deleted += 1
    # Summarize what was deleted for the user.
    if cfg_data.verbose:
        if not files_deleted:
            print ('No files were found for deletion.')
            sys.stdout.flush()
        elif files_deleted == 1:
            print ('One file was deleted.')
            sys.stdout.flush()
        else:
            print ('A total of {0} files were deleted.'.format(files_deleted))
            sys.stdout.flush()
917 
def create_levels_list(rules_sets):
    """
    Merge the processing levels of every rules set into one ordered list.

    Each element is a (level_name, [rules_set_names]) pair.  The first
    rules set seeds the list (its lowest level is skipped); levels from
    the remaining sets are either appended to an existing entry or
    inserted just after their predecessor level (at the front when the
    predecessor is unknown).
    """
    set_names = list(rules_sets.keys())
    first_set = set_names[0]
    logging.debug('set_key = %s', (first_set))
    levels = [(lvl, [first_set])
              for lvl in rules_sets[first_set].rules_order[1:]]
    for set_name in set_names[1:]:
        for lvl_name in rules_sets[set_name].rules_order[1:]:
            known_names = [entry[0] for entry in levels]
            if lvl_name in known_names:
                levels[known_names.index(lvl_name)][1].append(set_name)
            else:
                order = rules_sets[set_name].rules_order
                predecessor = order[order.index(lvl_name) - 1]
                if predecessor in known_names:
                    insert_at = known_names.index(predecessor) + 1
                else:
                    insert_at = 0
                levels.insert(insert_at, (lvl_name, [set_name]))
    return levels
938 
939 
def create_help_message(rules_sets):
    """
    Creates the message to be displayed when help is provided.
    """
    # Build the section table from the levels of every rules set so the
    # help text stays in sync with the defined processing recipes.
    level_names = create_levels_list(rules_sets)
    # %prog is substituted with the program name by optparse.
    message = """
    %prog [options] parameter_file

    The parameter_file is similar to, but not exactly like, parameter
    files for OCSSW processing programs:
    - It has sections separated by headers which are denoted by "["
    and "]".
    The section named "main" is required. Its allowed options are:
    ifile - Required entry naming the input file(s) to be processed.
    use_ancillary - use near real time ancillary data
    deletefiles - delete all the intermediate data files genereated
    overwrite - overwrite any data files which already exist
    use_existing - use any data files which already exist
    combine_files - l2bin bin all the L2 files together instead of bin each L2 file separately

    Simultaneous use of both the overwrite and use_existing options
    is not permitted.

    The names for other sections are the programs for which that section's
    entries are to be applied. Intermediate sections which are required for the
    final level of processing do not need to be defined if their default options
    are acceptable. A section can be empty. The final level of processing
    must have a section header, even if no entries appear within that section.
    - Entries within a section appear as key=value. Comma separated lists of
    values can be used when appropriate.
    - Comments are marked by "#"; anything appearing on a line after that
    character is ignored. A line beginning with a "#" is completely ignored.

    In addition to the main section, the following sections are allowed:
        Section name:          Applicable Instrument(s):
        -------------          -------------------------\n"""

    # One line per level: "name:" padded to 24 columns, then the
    # comma-separated rules sets that define it.
    lvl_name_help = ''
    for lname in level_names:
        lvl_name_help += '         {0:24s}{1}\n'.\
                         format(lname[0] + ':', ', '.join(lname[1]))

    message += lvl_name_help
    message += """
    Example:

       # Sample par file for %prog.
       [main]
       ifile=2010345034027.L1A_LAC
       [l2gen]
       l2prod=chlor_a
       # final processing level
    """
    return message
994 
def do_processing(sensors_sets, par_file, cmd_line_ifile=None):
    """
    Perform the processing for each step (element of processor_list) needed.

    Reads the par file, folds its file-handling options into the global
    cfg_data (command line settings win), determines the type of every
    input file, and hands the source files to get_processors() for the
    actual processing chain.  Files recorded as intermediate are removed
    afterwards, even when processing fails.
    """
    global input_file_data
    #todo: Break this up into smaller parts!
    files_to_delete = []
    input_files_list = []
    (par_contnts, input_files_list) = get_par_file_contents(par_file,
                                                            FILE_USE_OPTS)
    if cmd_line_ifile:
        # An ifile given on the command line overrides any ifile entry
        # in the par file.
        skip_par_ifile = True
        if os.path.exists(cmd_line_ifile):
            input_files_list = [cmd_line_ifile]
        else:
            msg = 'Error! Specified ifile {0} does not exist.'.\
                  format(cmd_line_ifile)
            sys.exit(msg)
    else:
        skip_par_ifile = False
    if par_contnts['main']:
        if (not skip_par_ifile) and (not 'ifile' in par_contnts['main']):
            msg = 'Error! No ifile specified in the main section of {0}.'.\
                  format(par_file)
            sys.exit(msg)
        # Avoid overwriting file options that are already turned on in cfg_data
        # (from command line input).
        deletefiles, use_existing, overwrite, combine_files = get_file_handling_opts(par_contnts)
        if deletefiles:
            cfg_data.deletefiles = True
        if use_existing:
            cfg_data.use_existing = True
        if overwrite:
            cfg_data.overwrite = True
        if combine_files:
            cfg_data.combine_files = True
        # use_ancillary=0 explicitly disables ancillary data retrieval.
        if 'use_ancillary' in par_contnts['main'] and \
           int(par_contnts['main']['use_ancillary']) == 0:
            cfg_data.get_anc = False
        if 'odir' in par_contnts['main']:
            dname = par_contnts['main']['odir']
            if os.path.exists(dname):
                if os.path.isdir(dname):
                    # A command line output directory wins over the par
                    # file's odir entry.
                    if cfg_data.output_dir_is_settable:
                        cfg_data.output_dir = os.path.realpath(dname)
                    else:
                        log_msg = 'Ignoring par file specification for output directory, {0}; using command line value, {1}.'.format(par_contnts['main']['odir'], cfg_data.output_dir)
                        logging.info(log_msg)
                else:
                    msg = 'Error! {0} is not a directory.'.format(dname)
                    sys.exit(msg)
            else:
                msg = 'Error! {0} does not exist.'.format(dname)
                sys.exit(msg)

    logging.debug('cfg_data.overwrite: ' + str(cfg_data.overwrite))
    logging.debug('cfg_data.use_existing: ' + str(cfg_data.use_existing))
    logging.debug('cfg_data.deletefiles: ' + str(cfg_data.deletefiles))
    logging.debug('cfg_data.combine_files: ' + str(cfg_data.combine_files))
    if cfg_data.overwrite and cfg_data.use_existing:
        err_msg = 'Error! Incompatible options overwrite and use_existing were found in {0}.'.format(par_file)
        log_and_exit(err_msg)
    if len(input_files_list) == 1:
        # A single plain-text, non-metadata input is treated as a file
        # list file naming the real inputs.
        if MetaUtils.is_ascii_file(input_files_list[0]) and not MetaUtils.is_metadata_file(input_files_list[0]):
            input_files_list = read_file_list_file(input_files_list[0])
    input_file_data = get_input_files_type_data(input_files_list)
    if not input_file_data:
        log_and_exit('No valid data files were specified for processing.')
    logging.debug("input_file_data: " + str(input_file_data))
    src_files = get_source_files(input_file_data)
    sys.stdout.flush()
    try:
        get_processors(src_files, input_file_data, par_contnts, files_to_delete)
    except Exception:
        if DEBUG:
            # Full traceback only in DEBUG runs; otherwise keep the
            # message generic.
            err_msg = get_traceback_message()
            log_and_exit(err_msg)
        else:
            err_msg = "Unrecoverable error encountered in processing."
            log_and_exit(err_msg)
    finally:
        # Intermediate files are cleaned up even when processing fails.
        clean_files(files_to_delete)
    if cfg_data.verbose:
        print ("Processing complete.")
        sys.stdout.flush()
    logging.debug("Processing complete.")
    return
1082 
def execute_command(command):
    """
    Run the given command in a subprocess, log its stdout/stderr, and
    return the command's exit status.
    """
    if DEBUG:
        print ("Entering execute_command, cfg_data.verbose =",
               cfg_data.verbose)
        log_msg = 'Executing command:\n {0}'.format(command)
        logging.debug(log_msg)
    completed = subprocess.run(command, capture_output=True, text=True,
                               shell=False)
    logging.info(completed.stdout)
    logging.info(completed.stderr)
    if cfg_data.verbose:
        print (completed.stdout)
    return completed.returncode
1103 
def extract_par_section(par_contents, section):
    """
    Return a copy of one named section (e.g. L1a, GEO, L1B, L2, ...) of
    the parsed "par" file contents.
    """
    return {key: value for key, value in par_contents[section].items()}
1112 
def find_geo_file(inp_file):
    """
    Look for a GEO file matching inp_file ("L1B" -> "GEO", "_LAC"
    dropped) in the same directory.  Returns the GEO path when it exists
    on disk, otherwise None.
    """
    geo_name = os.path.basename(inp_file).replace("L1B", "GEO")
    geo_name = geo_name.replace("_LAC", "")
    candidate = os.path.join(os.path.dirname(inp_file), geo_name)
    return candidate if os.path.exists(candidate) else None
1127 
def find_geo_file2(inp_file, instrument, lvl):
    """
    Search for a GEO file corresponding to inp_file for the given
    instrument ('hawkeye', 'modis', or 'viirs') and processing level.

    Returns the GEO file path when it exists on disk, otherwise None.
    Bug fix: an unrecognized instrument/level combination previously
    raised UnboundLocalError; it now returns None as well.
    """
    src_dir = os.path.dirname(inp_file)
    src_base = os.path.basename(inp_file)
    geo_base = None
    if instrument.find('hawkeye') != -1:
        # Hawkeye GEO files are HDF.
        geo_base = src_base.replace("L1A", "GEO").replace("nc", "hdf")
    elif instrument.find('modis') != -1:
        if lvl.find('level 1a') != -1:
            geo_base_tmp = src_base.replace("L1A", "GEO").replace("_LAC", "")
            if geo_base_tmp.find('MODIS') != -1:
                geo_base = geo_base_tmp.replace("nc", "hdf")
            else:
                geo_base = geo_base_tmp
        elif lvl.find('level 1b') != -1:
            geo_base = src_base.replace("L1B", "GEO").replace("_LAC", "")
    elif instrument.find('viirs') != -1:
        geo_base_tmp = None
        if lvl.find('level 1a') != -1:
            geo_base_tmp = src_base.replace("L1A", "GEO-M")
        elif lvl.find('level 1b') != -1:
            geo_base_tmp = src_base.replace("L1B", "GEO")
        if geo_base_tmp is not None:
            if geo_base_tmp.find('VIIRS') != -1:
                # Older VIIRS names use HDF with an underscored GEO_M.
                geo_base = geo_base_tmp.replace("nc", "hdf").\
                           replace("GEO-M", "GEO_M")
            else:
                geo_base = geo_base_tmp
    if geo_base is None:
        # No naming rule applies, so there is no GEO file to look for.
        return None
    geo_file = os.path.join(src_dir, geo_base)
    if not os.path.exists(geo_file):
        geo_file = None
    return geo_file
1164 
def find_viirs_geo_file(proc, first_svm_file):
    """
    Return the GMTCO geolocation file name matching first_svm_file, or
    None when no such file exists on disk.
    """
    candidate = first_svm_file.replace('SVM01', 'GMTCO').rstrip()
    return candidate if os.path.exists(candidate) else None
1174 
def get_batch_output_name(file_set, suffix):
    """
    Build the output file name for a "batch" run, i.e. a process that can
    accept multiple inputs, such as l2bin or l3bin.

    A single input keeps its stem; for multiple inputs the stem is the
    shared mission prefix (when all inputs share one) followed by the
    earliest data date and, if different, the latest data date.
    """
    mission_prefixes = ['A', 'C', 'O', 'S', 'T']
    stem = 'out'
    if not len(file_set): # == 0:
        err_msg = "Error! An output file name could not be determined."
        log_and_exit(err_msg)
    elif len(file_set) == 1:
        stem = os.path.splitext(file_set[0])[0]
    else:
        earliest_file = latest_file = file_set[0]
        earliest_date = get_file_date(earliest_file)
        latest_date = get_file_date(latest_file)
        for cur_file in file_set[1:]:
            cur_date = get_file_date(cur_file)
            if cur_date < earliest_date:
                earliest_file, earliest_date = cur_file, cur_date
            elif cur_date > latest_date:
                latest_file, latest_date = cur_file, cur_date
        if (earliest_file[0] == latest_file[0]) and \
           (earliest_file[0] in mission_prefixes):
            stem = earliest_file[0]
        else:
            stem = ''
        early_stamp = earliest_date.strftime('%Y%j')
        late_stamp = latest_date.strftime('%Y%j')
        if early_stamp == late_stamp:
            stem += early_stamp
        else:
            stem += early_stamp + late_stamp
    return ''.join([stem, '.', suffix])
1212 
def get_data_file_option(par_contents, opt_text):
    """
    Return the boolean value of the option named opt_text from the main
    section of par_contents.

    When the option is absent, combine_files defaults to True and every
    other option defaults to False.
    """
    opt_found = bool(re.search('combine_files', opt_text))
    if opt_text in par_contents['main']:
        opt_str = par_contents['main'][opt_text].upper()
        opt_found = mlp_utils.is_option_value_true(opt_str)
    return opt_found
1227 
def get_extract_params(proc):
    """
    Run the lonlat2pixline program and return the parameters found.

    NOTE(review): the original "def" line of this function was lost in
    extraction; the signature is reconstructed from the call site in
    run_l1aextract (get_extract_params(proc)) — confirm against the
    upstream source.

    Returns (start_line, end_line, start_pixel, end_pixel); any value
    not reported by lonlat2pixline is returned as None.
    """
    if proc.geo_file:
        # MODIS
        in_file = proc.geo_file
    else:
        # SeaWiFS
        in_file = proc.input_file
    args = [in_file,
            proc.par_data['SWlon'],
            proc.par_data['SWlat'],
            proc.par_data['NElon'],
            proc.par_data['NElat']]
    lonlat_prog = os.path.join(proc.ocssw_bin, 'lonlat2pixline')
    lonlat_cmd = [lonlat_prog]
    lonlat_cmd.extend(args)
    logging.debug('Executing lonlat2pixline command: "%s"', " ".join(str(x) for x in lonlat_cmd))
    process_output = subprocess.run(lonlat_cmd, capture_output=True, text=True, shell=False)
    lonlat_output = process_output.stdout.splitlines()
    start_line = None
    end_line = None
    start_pixel = None
    end_pixel = None
    # lonlat2pixline reports its results as key=value lines.
    for line in lonlat_output:
        line_text = str(line).strip("'")
        if 'sline' in line_text:
            start_line = int(line_text.split('=')[1])
        if 'eline' in line_text:
            end_line = int(line_text.split('=')[1])
        if 'spixl' in line_text:
            start_pixel = int(line_text.split('=')[1])
        if 'epixl' in line_text:
            end_pixel = int(line_text.split('=')[1])
    return start_line, end_line, start_pixel, end_pixel
1264 
def get_file_date(filename):
    """
    Get a Python datetime from a recognized file name's year and day of
    year.

    Recognized patterns: standard OBPG names (a mission letter followed
    by YYYYDDD), bare YYYYDDD names (some Aquarius), and Suomi NPP names
    containing "_npp_dYYYYDDD_".  Exits via log_and_exit() for any other
    name.
    """
    base_filename = os.path.basename(filename)
    if re.match(r'[ACMOQSTV]\d\d\d\d\d\d\d.*', base_filename):
        year = int(base_filename[1:5])
        doy = int(base_filename[5:8])
    elif re.match(r'\d\d\d\d\d\d\d.*', base_filename):
        # Some Aquarius
        year = int(base_filename[0:4])
        doy = int(base_filename[4:7])
    elif re.match(r'\w*_npp_d\d\d\d\d\d\d\d_.*', base_filename):
        # NPP: strip everything through "_npp_d", leaving "YYYYDDD...".
        prefix_removed_name = re.sub(r'\w*_npp_d', '', base_filename)
        year = int(prefix_removed_name[0:4])
        # Bug fix: the day of year occupies characters 4-6 (was [5:7],
        # which dropped the first digit of the day).
        doy = int(prefix_removed_name[4:7])
    else:
        err_msg = 'Unable to determine date for {0}'.format(filename)
        log_and_exit(err_msg)
    file_date = datetime.datetime(year, 1, 1) + datetime.timedelta(doy - 1)
    return file_date
1287 
def get_file_handling_opts(par_contents):
    """
    Return the (deletefiles, use_existing, overwrite, combine_files)
    file-handling option values found in par_contents.
    """
    option_names = ('deletefiles', 'use_existing', 'overwrite',
                    'combine_files')
    return tuple(get_data_file_option(par_contents, name)
                 for name in option_names)
1297 
def get_input_files(par_data):
    """
    Collect input files from the uber par file's ifile entry, a file
    list file (infilelist), or both, with duplicates removed.

    Returns None when neither source yields any file names.
    """
    from_ifiles = []
    from_infilelist = []
    main_sect = par_data['main']
    if 'ifile' in main_sect:
        # Drop any trailing comment, then split on whitespace after
        # normalizing common separator characters to spaces.
        inp_file_str = main_sect['ifile'].split('#', 2)[0]
        from_ifiles = re.sub(r'[\t,:\[\]()"\']', ' ', inp_file_str).split()
    if 'infilelist' in main_sect:
        infilelist_name = main_sect['infilelist']
        if os.path.isfile(infilelist_name) and \
           os.access(infilelist_name, os.R_OK):
            with open(infilelist_name, 'rt') as in_file_list_file:
                from_infilelist = [ln.rstrip()
                                   for ln in in_file_list_file.readlines()
                                   if not re.match(r'^\s*#', ln)]
    if not from_ifiles and not from_infilelist:
        return None
    # Make sure there are no duplicates. Tests with timeit showed that
    # list(set()) is much faster than a "uniqify" function.
    return list(set(from_ifiles + from_infilelist))
1324 
def get_input_files_type_data(input_files_list):
    """
    Map each input file to its (file_type, instrument) pair.

    File types reported by the typer are translated to internal
    type/processor names via the converter table; files whose type
    cannot be determined are reported and omitted from the result.
    """
    converter = {
        'geo': 'geo',
        'level 0': 'level 0',
        'level 1 browse data': 'l1brsgen',
        'level 1a': 'level 1a',
        'level 1b': 'level 1b',
        'level 1c': 'level 1c',
        'sdr': 'level 1b',
        'level 2': 'l2gen',
        'level 3 binned': 'l3bin'
    }
    input_file_type_data = {}
    for inp_file in input_files_list:
        file_typer = mlp.get_obpg_file_type.ObpgFileTyper(inp_file)
        file_type, file_instr = file_typer.get_file_type()
        type_key = file_type.lower()
        if type_key in converter:
            input_file_type_data[inp_file] = (converter[type_key],
                                              file_instr.lower())
        else:
            warn_msg = "Warning: Unable to determine a type for file {0}. It will not be processed.".format(inp_file)
            print (warn_msg)
            logging.info(warn_msg)
    return input_file_type_data
1366 
def get_intermediate_processors(sensor, existing_procs, rules, lowest_source_level):
    """
    Build processor objects for products which are needed, but were not
    explicitly requested in the par file.
    """
    existing_products = [proc.target_type for proc in existing_procs]
    needed = get_intermediate_products(existing_products, rules,
                                       lowest_source_level)
    # Only products without an existing processor get a new one.
    return [processor.Processor(sensor, rules, prod, {},
                                cfg_data.hidden_dir)
            for prod in needed if prod not in existing_products]
1384 
def get_intermediate_products(existing_prod_names, ruleset,
                              lowest_source_level):
    """
    Return the sorted, de-duplicated list of programs required to
    produce the named products from the lowest source level, even though
    they were not explicitly named in the par file.
    """
    required_progs = []
    for prog in existing_prod_names:
        candidates = get_required_programs(prog, ruleset,
                                           lowest_source_level)
        if candidates is not None:
            required_progs.extend(candidates)
    required_progs = uniqify_list(required_progs)
    required_progs.sort()
    return required_progs
1401 
def get_l2_extension():
    """
    Returns the extension for an L2 file.  For the time being, this is
    just '.L2'; however, different extensions may be wanted in the
    future, thus this function is in place.

    NOTE(review): the original "def" line was lost in extraction; the
    name is reconstructed from the docstring and OCSSW naming
    conventions — confirm against the upstream source.
    """
    return '.L2'
1409 
def get_l3bin_extension():
    """
    Returns the extension for an L3 Binned file.  For the time being,
    this is just '.L3b'; however, different extensions may be wanted in
    the future, thus this function is in place.

    NOTE(review): the original "def" line was lost in extraction; the
    name is reconstructed from the docstring and OCSSW naming
    conventions — confirm against the upstream source.
    """
    return '.L3b'
1417 
def get_lowest_source_level(source_files):
    """
    Return the key of source_files corresponding to the lowest
    processing level, using the fixed level ordering below.
    """
    order = ['level 1a', 'geo', 'level 1b', 'l2gen',
             'l2bin', 'l3mapgen']
    keys = list(source_files.keys())
    if len(keys) == 1:
        return keys[0]
    # min() returns the first key with the smallest position in order.
    return min(keys, key=order.index)
1433 
def get_options(par_data):
    """
    Convert a program's par file section into a list of command line
    option strings ("key=value", or the bare key for valueless entries).

    The ofile/odir entries and the file-handling options are managed by
    this program itself and are excluded.
    """
    options = []
    for key in par_data:
        if key in ('ofile', 'odir') or key.lower() in FILE_USE_OPTS:
            continue
        if par_data[key]:
            options.append('{}={}'.format(key, par_data[key]))
        else:
            options.append(key)
    return options
1447 
def get_output_name2(inp_files, targ_prog, suite=None, oformt=None, res=None):
    """
    Determine the output name targ_prog would produce if run on
    inp_files, given the optional suite/format/resolution settings.
    """
    cl_opts = optparse.Values()
    cl_opts.suite = suite
    cl_opts.oformat = oformt
    cl_opts.resolution = res
    if isinstance(inp_files, list):
        file_objs = inp_files
    else:
        file_objs = [get_obpg_data_file_object(inp_files)]
    return mlp.get_output_name_utils.get_output_name(file_objs, targ_prog,
                                                     cl_opts)
1464 
def get_output_name3(input_name, input_files, suffix):
    """
    Determine the output name for a program to be run.

    MODIS level 0 inputs are renamed to the standard
    <mission letter><time stamp>.L1A form, taking the start time from a
    matching .const constructor file when one exists (or from a MOD00.P
    style name); every other input keeps its stem with the new suffix.
    """
    # Todo: rename to get_output_name and delete other get_output_name
    output_name = None
    if input_name in input_files:
        if input_files[input_name][0] == 'level 0' and \
           input_files[input_name][1].find('modis') != -1:
            if input_files[input_name][1].find('aqua') != -1:
                first_char = 'A'
            else:
                first_char = 'T'
            time_stamp = ''
            if os.path.exists(input_name + '.const'):
                with open(input_name + '.const') as constructor_file:
                    constructor_data = constructor_file.readlines()
                # NOTE(review): assumes a starttime= line is present in
                # the .const file; otherwise start_time is unbound below
                # — confirm the constructor file format.
                for line in constructor_data:
                    if line.find('starttime=') != -1:
                        # Bug fix: take the remainder of the line after
                        # '=' (was line[idx + 1], a single character).
                        start_time = line[line.find('=') + 1:].strip()
                        break
                time_stamp = ProcUtils.date_convert(start_time, 't', 'j')
            else:
                if re.match(r'MOD00.P\d\d\d\d\d\d\d\.\d\d\d\d', input_name):
                    time_stamp = input_name[7:14] + input_name[15:19] + '00'
                else:
                    err_msg = "Cannot determine time stamp for input file {0}".\
                              format(input_name)
                    log_and_exit(err_msg)
            output_name = first_char + time_stamp + '.L1A'
        else:
            (dirname, basename) = os.path.split(input_name)
            basename_parts = basename.rsplit('.', 2)
            output_name = os.path.join(dirname, basename_parts[0] + '.' +
                                       suffix)
    else:
        (dirname, basename) = os.path.split(input_name)
        basename_parts = basename.rsplit('.', 2)
        output_name = os.path.join(dirname, basename_parts[0] + '.' + suffix)
    return output_name
1506 
def get_par_file_contents(par_file, acceptable_single_keys):
    """
    Return the contents of the input "par" file.

    Section titles are normalized through the acceptable_par_keys
    mapping (e.g. "l1a" and "l1agen" both become "level 1a"); an
    unrecognized section title is a fatal error, as is a missing "main"
    section.  Returns (par_contents, input_files_list), the latter
    gathered from the main section by get_input_files().
    """
    # Maps every accepted section title (including aliases) to the
    # canonical program/level name used internally.
    acceptable_par_keys = {
        'level 0' : 'level 0', 'l0' : 'level 0',
        'level 1a' : 'level 1a', 'l1a' : 'level 1a', 'l1agen': 'level 1a',
        'modis_L1A': 'level 1a',
        'l1brsgen': 'l1brsgen',
        # 'l1mapgen': 'l1mapgen',
        'l1aextract': 'l1aextract',
        'l1aextract_modis': 'l1aextract_modis',
        'l1aextract_seawifs' : 'l1aextract_seawifs',
        'l1aextract_viirs' : 'l1aextract_viirs',
        'l1brsgen' : 'l1brsgen',
        'geo' : 'geo', 'modis_GEO': 'geo', 'geolocate_viirs': 'geo',
        'geolocate_hawkeye': 'geo',
        'level 1b' : 'level 1b', 'l1b' : 'level 1b', 'l1bgen' : 'level 1b',
        'modis_L1B': 'level 1b', 'calibrate_viirs': 'level 1b',
        'level 2' : 'l2gen',
        'l2gen' : 'l2gen',
        'l2bin' : 'l2bin',
        'l2brsgen' : 'l2brsgen',
        'l2extract' : 'l2extract',
        # 'l2mapgen' : 'l2mapgen',
        #'l3bin' : 'l3bin',
        'l3mapgen' : 'l3mapgen',
        # 'smigen' : 'smigen',
        'main' : 'main'
    }
    if cfg_data.verbose:
        print ("Processing %s" % par_file)
    par_reader = uber_par_file_reader.ParReader(par_file,
                                                acceptable_single_keys,
                                                acceptable_par_keys)
    par_contents = par_reader.read_par_file()
    ori_keys = list(par_contents.keys())
    for key in ori_keys:
        if key in acceptable_par_keys:
            if key != acceptable_par_keys[key]:
                # Replace the alias with its canonical section name.
                par_contents[acceptable_par_keys[key]] = par_contents[key]
                del par_contents[key]
        else:
            acc_key_str = ', '.join(list(acceptable_par_keys.keys()))
            err_msg = """Error! Parameter file {0} contains a section titled "{1}", which is not a recognized program.
The recognized programs are: {2}""".format(par_file, key, acc_key_str)

            log_and_exit(err_msg)
    if 'main' in par_contents:
        input_files_list = get_input_files(par_contents)
    else:
        err_msg = 'Error! Could not find section "main" in {0}'.format(par_file)
        log_and_exit(err_msg)
    return par_contents, input_files_list
1561 
def get_processors2(sensor, par_contents, rules, lowest_source_level):
    """
    Build the list of processors needed to satisfy the par file
    sections, including any required intermediate processors, sorted
    into processing order.
    """
    processors = []
    for key in par_contents:
        if key == 'main':
            continue
        section_contents = extract_par_section(par_contents, key)
        processors.append(processor.Processor(sensor, rules, key,
                                              section_contents,
                                              cfg_data.hidden_dir))
    if processors:
        processors.sort() # needs sorted for get_intermediate_processors
        processors += get_intermediate_processors(sensor, processors, rules,
                                                  lowest_source_level)
        processors.sort()
    return processors
1579 
def exe_processor(proc, src_files, src_lvl):
    """
    Execute a single processor and return (output_file, used_existing).

    Batch-capable processors run once over all source files of src_lvl
    when file combining is enabled; otherwise the processor runs on its
    single input.  When the target type has no action defined, nothing
    is run (and None is returned implicitly).
    """
    if proc.out_directory == cfg_data.hidden_dir:
        proc.out_directory = cfg_data.output_dir
    if proc.requires_batch_processing() and cfg_data.combine_files:
        logging.debug('Performing batch processing for ' + str(proc))
        return run_batch_processor(proc, src_files[src_lvl]), False
    if proc.rule_set.rules[proc.target_type].action:
        logging.debug('Performing nonbatch processing for ' + str(proc))
        out_file, used_existing = run_nonbatch_processor(proc)
        if not out_file:
            print('The {0} processor produced no output files.'.format(proc.target_type), flush=True)
            msg = 'The {0} processor produced no output files.'.format(proc.target_type)
            logging.debug(msg)
        return out_file, used_existing
    msg = '-I- There is no way to create {0} files for {1}.'.format(proc.target_type, proc.instrument)
    logging.info(msg)
1611 
def get_processors(src_files, input_files, par_contents, files_to_delete):
    """
    Determine how to chain the processors together.

    Walks the sections of par_contents in order, inserting implied
    intermediate steps (e.g. an l2gen step before l2extract/l2brsgen/l2bin),
    then runs each needed processor via exe_processor(), threading each
    step's output file into src_files for the next step.

    :param src_files: dict mapping source level -> list of files; mutated
                      in place as outputs are produced and inputs consumed
    :param input_files: dict of input file metadata (currently unused here)
    :param par_contents: parsed par-file sections keyed by program name
    :param files_to_delete: list, appended with intermediate files to remove
    """
    order = ['level 0', 'level 1a', 'level 1c', 'geo', 'l1aextract',
             'level 1b', 'l1brsgen', 'l2gen', 'l2extract', 'l2brsgen',
             'l2bin', 'l3mapgen']
    key_list = list(par_contents.keys())
    last_key = key_list[-1]
    # Insert implied prerequisite sections (with empty par data) so that the
    # processing chain below is complete.
    if 'l2extract' in key_list:
        if not ('l2gen' in key_list):
            pos = key_list.index('l2extract')
            key_list.insert(pos, 'l2gen')
            items = list(par_contents.items())
            items.insert(pos, ('l2gen', {}))
            par_contents = dict(items)
    elif 'l2brsgen' in key_list:
        if not ('l2gen' in key_list):
            pos = key_list.index('l2brsgen')
            key_list.insert(pos, 'l2gen')
            items = list(par_contents.items())
            items.insert(pos, ('l2gen', {}))
            par_contents = dict(items)
    elif 'l2bin' in key_list:
        # if not ('l2gen' in key_list or 'l2extract' in key_list):
        if not ('l2gen' in key_list):
            pos = key_list.index('l2bin')
            key_list.insert(pos, 'l2gen')
            items = list(par_contents.items())
            items.insert(pos, ('l2gen', {}))
            par_contents = dict(items)
    elif 'l3mapgen' in key_list:
        if not ('l2bin' in key_list):
            pos = key_list.index('l3mapgen')
            key_list.insert(pos, 'l2bin')
            items = list(par_contents.items())
            items.insert(pos, ('l2bin', {}))
            par_contents = dict(items)
        # if not ('l2gen' in key_list or 'l2extract' in key_list):
        if not ('l2gen' in key_list):
            pos = key_list.index('l2bin')
            key_list.insert(pos, 'l2gen')
            items = list(par_contents.items())
            items.insert(pos, ('l2gen', {}))
            par_contents = dict(items)
    for key in key_list:
        if key != 'main':
            section_contents = extract_par_section(par_contents, key)
            if not order.index(key) > order.index('l2gen'):
                # Target is at or below l2gen: process per-instrument files.
                src_lvls = list(src_files.keys())
                if not key in src_files:
                    src_files[key] = []
                for src_lvl in src_lvls:
                    if order.index(src_lvl) < order.index('l2gen'):
                        for file in src_files[src_lvl]:
                            file_typer = mlp.get_obpg_file_type.ObpgFileTyper(file)
                            instrument = file_typer.get_file_type()[1].lower().split()[0]
                            # instrument = input_files[file][1].split()[0]
                            logging.debug("instrument: " + instrument)
                            if instrument in sensors_sets:
                                rules = sensors_sets[instrument].recipe
                                sensor = sensors_sets[instrument]
                            else:
                                rules = sensors_sets['general'].recipe
                                sensor = sensors_sets['general']
                            proc = processor.Processor(sensor, rules, key, section_contents,
                                                       cfg_data.hidden_dir)
                            proc.input_file = file
                            if file_typer.get_file_type()[0].lower().find('level 0') == -1:
                                # Non level-0 input.
                                if sensor.require_geo and key != 'geo':
                                    if 'geo' in src_lvls:
                                        proc.geo_file = src_files['geo'][0]
                                    else:
                                        proc.geo_file = find_geo_file2(proc.input_file, instrument, src_lvl)

                                        if not proc.geo_file:
                                            if src_lvl.find('level 1b') != -1:
                                                err_msg = 'Error! Cannot produce GEO file for {0}, Need level 1a file'.format(file)
                                                # log_and_exit(err_msg)
                                                # Bug fix: logging.debug() does not accept a
                                                # flush keyword; passing it raised TypeError.
                                                logging.debug(err_msg)
                                                print(err_msg)
                                                print('Skipping processing on file {0}.'.format(file), flush=True)
                                                logging.debug('')
                                                log_msg = 'Skipping processing on file {0}.'.format(file)
                                                logging.debug(log_msg)
                                                continue
                                            else:
                                                proc_geo = processor.Processor(sensor, rules, 'geo', {},
                                                                               cfg_data.hidden_dir)
                                                proc_geo.input_file = file
                                                print('Running geo on file {0}.'.format(file), flush=True)
                                                logging.debug('')
                                                log_msg = 'Processing for geo:'
                                                logging.debug(log_msg)
                                                proc.geo_file, used_existing = exe_processor(proc_geo, src_files, src_lvl)
                                                if not proc.geo_file:
                                                    print('Skipping processing on file {0}.'.format(file), flush=True)
                                                    logging.debug('')
                                                    log_msg = 'Skipping processing on file {0}.'.format(file)
                                                    logging.debug(log_msg)
                                                    continue
                                                if cfg_data.deletefiles and not used_existing:
                                                    if proc.geo_file:
                                                        files_to_delete.append(proc.geo_file)
                                                if used_existing:
                                                    print('Used existing geo file {0}.'.format(proc.geo_file))
                                                    logging.debug('')
                                                    log_msg = 'Used existing geo file {0}.'.format(proc.geo_file)
                                                    logging.debug(log_msg)
                                if key == 'l2gen' and sensor.require_l1b_for_l2gen and src_lvl.find('level 1b') == -1:
                                    proc_l1b = processor.Processor(sensor, rules, 'level 1b', {},
                                                                   cfg_data.hidden_dir)
                                    if sensor.require_geo:
                                        proc_l1b.input_file = file
                                        proc_l1b.geo_file = proc.geo_file
                                    print('Running level 1b on file {0}.'.format(file), flush=True)
                                    logging.debug('')
                                    log_msg = 'Processing for level 1b:'
                                    logging.debug(log_msg)
                                    proc.input_file, used_existing = exe_processor(proc_l1b, src_files, src_lvl)
                                    if not proc.input_file:
                                        print('Skipping processing on file {0}.'.format(file), flush=True)
                                        logging.debug('')
                                        log_msg = 'Skipping processing on file {0}.'.format(file)
                                        logging.debug(log_msg)
                                        continue
                                    if cfg_data.deletefiles and not used_existing:
                                        if proc.input_file:
                                            files_to_delete.append(proc.input_file)
                                    if used_existing:
                                        print('Used existing level 1b file {0}.'.format(proc.input_file))
                                        logging.debug('')
                                        log_msg = 'Used existing level 1b file {0}.'.format(proc.input_file)
                                        logging.debug(log_msg)
                                if proc.input_file:
                                    file_input = proc.input_file
                                    print('Running {0} on file {1}.'.format(proc.target_type, proc.input_file), flush=True)
                                    logging.debug('')
                                    log_msg = 'Processing for {0}:'.format(proc.target_type)
                                    logging.debug(log_msg)
                                    out_file, used_existing = exe_processor(proc, src_files, src_lvl)
                                    if out_file:
                                        src_files[key].append(out_file)
                                    else:
                                        # Bug fix: flush=True was previously passed to
                                        # str.format() (where it was silently swallowed)
                                        # instead of print().
                                        print('Skipping processing on file {0}.'.format(file_input), flush=True)
                                        logging.debug('')
                                        log_msg = 'Skipping processing on file {0}.'.format(file_input)
                                        logging.debug(log_msg)
                                        continue
                                if cfg_data.deletefiles and not used_existing and key != last_key:
                                    if out_file and key.find('brsgen') == -1:
                                        files_to_delete.append(out_file)
                                if used_existing:
                                    print('Used existing file {0}.'.format(out_file))
                                    logging.debug('')
                                    log_msg = 'Used existing file {0}.'.format(out_file)
                                    logging.debug(log_msg)
                                if key.find('l1aextract') != -1:
                                    # Extracted L1A output replaces the original L1A input.
                                    src_files['level 1a'] = src_files[key]
                                    del src_files['l1aextract']
                                    if 'geo' in src_files:
                                        del src_files['geo']
                                        src_lvls.remove('geo')
                                    # input_files[src_files[src_lvl][0]] = input_files[file]
                            else:
                                # Level-0 input: bring it up to level 1a first.
                                if key != 'level 1a':
                                    proc_l1a = processor.Processor(sensor, rules, 'level 1a', {},
                                                                   cfg_data.hidden_dir)
                                    proc_l1a.input_file = file
                                    print('Running level 1a on file {0}.'.format(file), flush=True)
                                    logging.debug('')
                                    log_msg = 'Processing for level 1a:'
                                    logging.debug(log_msg)
                                    proc.input_file, used_existing = exe_processor(proc_l1a, src_files, src_lvl)
                                    if not proc.input_file:
                                        print('Skipping processing on file {0}.'.format(file), flush=True)
                                        logging.debug('')
                                        log_msg = 'Skipping processing on file {0}.'.format(file)
                                        logging.debug(log_msg)
                                        continue
                                    if cfg_data.deletefiles and not used_existing:
                                        if proc.input_file:
                                            files_to_delete.append(proc.input_file)
                                    if used_existing:
                                        print('Used existing level 1a file {0}.'.format(proc.input_file))
                                        logging.debug('')
                                        log_msg = 'Used existing level 1a file {0}.'.format(proc.input_file)
                                        logging.debug(log_msg)
                                if proc.input_file and sensor.require_geo and key != 'geo' and key != 'level 1a':
                                    proc.geo_file = find_geo_file2(proc.input_file, instrument, 'level 1a')
                                    if not proc.geo_file:
                                        proc_geo = processor.Processor(sensor, rules, 'geo', {},
                                                                       cfg_data.hidden_dir)
                                        proc_geo.input_file = proc.input_file
                                        print('Running geo on file {0}.'.format(proc.input_file), flush=True)
                                        logging.debug('')
                                        log_msg = 'Processing for geo:'
                                        logging.debug(log_msg)
                                        proc.geo_file, used_existing = exe_processor(proc_geo, src_files, src_lvl)
                                        if not proc.geo_file:
                                            print('Skipping processing on file {0}.'.format(proc.input_file), flush=True)
                                            logging.debug('')
                                            log_msg = 'Skipping processing on file {0}.'.format(proc.input_file)
                                            logging.debug(log_msg)
                                            continue
                                        if cfg_data.deletefiles and not used_existing:
                                            if proc.geo_file:
                                                files_to_delete.append(proc.geo_file)
                                        if used_existing:
                                            print('Used existing geo file {0}.'.format(proc.geo_file))
                                            logging.debug('')
                                            log_msg = 'Used existing geo file {0}.'.format(proc.geo_file)
                                            logging.debug(log_msg)
                                if key == 'l2gen' and sensor.require_l1b_for_l2gen:
                                    proc_l1b = processor.Processor(sensor, rules, 'level 1b', {},
                                                                   cfg_data.hidden_dir)
                                    if sensor.require_geo:
                                        proc_l1b.input_file = proc.input_file
                                        proc_l1b.geo_file = proc.geo_file
                                    print('Running level 1b on file {0}.'.format(proc.input_file), flush=True)
                                    logging.debug('')
                                    log_msg = 'Processing for level 1b:'
                                    logging.debug(log_msg)
                                    file_input = proc.input_file
                                    proc.input_file, used_existing = exe_processor(proc_l1b, src_files, src_lvl)
                                    if not proc.input_file:
                                        print('Skipping processing on file {0}.'.format(file_input), flush=True)
                                        logging.debug('')
                                        log_msg = 'Skipping processing on file {0}.'.format(file_input)
                                        logging.debug(log_msg)
                                        continue
                                    if cfg_data.deletefiles and not used_existing:
                                        if proc.input_file:
                                            files_to_delete.append(proc.input_file)
                                    if used_existing:
                                        print('Used existing level 1b file {0}.'.format(proc_l1b.input_file))
                                        logging.debug('')
                                        log_msg = 'Used existing level 1b file {0}.'.format(proc_l1b.input_file)
                                        logging.debug(log_msg)
                                if proc.input_file:
                                    file_input = proc.input_file
                                    print('Running {0} on file {1}.'.format(proc.target_type, proc.input_file), flush=True)
                                    logging.debug('')
                                    log_msg = 'Processing for {0}:'.format(proc.target_type)
                                    logging.debug(log_msg)
                                    out_file, used_existing = exe_processor(proc, src_files, src_lvl)
                                    if out_file:
                                        src_files[key].append(out_file)
                                    else:
                                        print('Skipping processing on file {0}.'.format(file_input), flush=True)
                                        logging.debug('')
                                        log_msg = 'Skipping processing on file {0}.'.format(file_input)
                                        logging.debug(log_msg)
                                        continue
                                if cfg_data.deletefiles and not used_existing and key != last_key:
                                    if out_file and key.find('brsgen') == -1:
                                        files_to_delete.append(out_file)
                                if used_existing:
                                    print('Used existing file {0}.'.format(out_file))
                                    logging.debug('')
                                    log_msg = 'Used existing file {0}.'.format(out_file)
                                    logging.debug(log_msg)
                                if key.find('l1aextract') != -1:
                                    src_files['level 1a'] = src_files[key]
                                    del src_files['l1aextract']
                                    if 'geo' in src_files:
                                        del src_files['geo']
                                        src_lvls.remove('geo')
                                    # input_files[src_files[src_lvl][0]] = input_files[file]
                        # Drop consumed source levels once this level's files
                        # have all been processed.
                        if len(src_files) > 1 and key != 'geo':
                            if key.find('brsgen') != -1:
                                del src_files[key]
                            else:
                                del src_files[src_lvl]
                                if len(src_files) > 1:
                                    if 'geo' in src_lvls:
                                        del src_files['geo']
                                        src_lvls.remove('geo')
            else:
                # Target is past l2gen (l2extract/l2brsgen/l2bin/l3mapgen):
                # the general ruleset applies regardless of instrument.
                src_lvls = list(src_files.keys())
                rules = sensors_sets['general'].recipe
                sensor = sensors_sets['general']
                if not key in src_files:
                    src_files[key] = []
                for src_lvl in src_lvls:
                    if not order.index(src_lvl) < order.index('l2gen'):
                        for file in src_files[src_lvl]:
                            proc = processor.Processor(sensor, rules, key, section_contents,
                                                       cfg_data.hidden_dir)
                            proc.input_file = file
                            for program in proc.required_types:
                                if not program in src_files:
                                    # Required intermediate product is missing;
                                    # build it (and its own prerequisites) first.
                                    proc1 = processor.Processor(sensor, rules, program, {},
                                                                cfg_data.hidden_dir)
                                    proc1.input_file = file
                                    # proc1.deletefiles = cfg_data.deletefiles
                                    for program2 in proc1.required_types:
                                        if program2.find(src_lvl) == -1:
                                            proc2 = processor.Processor(sensor, rules, program2, {},
                                                                        cfg_data.hidden_dir)
                                            proc2.input_file = file
                                            # proc2.deletefiles = cfg_data.deletefiles
                                            print('Running {0} on file {1}.'.format(proc2.target_type, proc2.input_file), flush=True)
                                            logging.debug('')
                                            log_msg = 'Processing for {0}:'.format(proc2.target_type)
                                            logging.debug(log_msg)
                                            proc1.input_file, used_existing = exe_processor(proc2, src_files, src_lvl)
                                            if not proc1.input_file:
                                                print('Skipping processing on file {0}.'.format(file), flush=True)
                                                logging.debug('')
                                                log_msg = 'Skipping processing on file {0}.'.format(file)
                                                logging.debug(log_msg)
                                                continue
                                            if cfg_data.deletefiles and not used_existing:
                                                if proc1.input_file:
                                                    files_to_delete.append(proc1.input_file)
                                            if used_existing:
                                                print('Used existing file {0}.'.format(proc1.input_file))
                                                logging.debug('')
                                                log_msg = 'Used existing file {0}.'.format(proc1.input_file)
                                                logging.debug(log_msg)
                                    if proc1.input_file:
                                        file_input = proc1.input_file
                                        print('Running {0} on file {1}.'.format(proc1.target_type, proc1.input_file), flush=True)
                                        logging.debug('')
                                        log_msg = 'Processing for {0}:'.format(proc1.target_type)
                                        logging.debug(log_msg)
                                        proc.input_file, used_existing = exe_processor(proc1, src_files, src_lvl)
                                        if not proc.input_file:
                                            print('Skipping processing on file {0}.'.format(file_input), flush=True)
                                            logging.debug('')
                                            log_msg = 'Skipping processing on file {0}.'.format(file_input)
                                            logging.debug(log_msg)
                                            continue
                                        if cfg_data.deletefiles:
                                            if proc.input_file and not used_existing:
                                                files_to_delete.append(proc.input_file)
                                        if used_existing:
                                            print('Used existing file {0}.'.format(proc.input_file))
                                            logging.debug('')
                                            log_msg = 'Used existing file {0}.'.format(proc.input_file)
                                            logging.debug(log_msg)
                                        del src_files[src_lvl]
                                if proc.input_file:
                                    file_input = proc.input_file
                                    if len(src_files[src_lvl]) > 1 and cfg_data.combine_files:
                                        print('Running {0} on a list of files including {1}.'.format(proc.target_type, proc.input_file), flush=True)
                                        logging.debug('')
                                        log_msg = 'Processing for {0}:'.format(proc.target_type)
                                    else:
                                        print('Running {0} on file {1}.'.format(proc.target_type, proc.input_file), flush=True)
                                        logging.debug('')
                                        log_msg = 'Processing for {0}:'.format(proc.target_type)
                                    logging.debug(log_msg)
                                    out_file, used_existing = exe_processor(proc, src_files, program)
                                    if out_file:
                                        src_files[key].append(out_file)
                                    else:
                                        print('Skipping processing on file {0}.'.format(file_input), flush=True)
                                        logging.debug('')
                                        log_msg = 'Skipping processing on file {0}.'.format(file_input)
                                        logging.debug(log_msg)
                                        break
                                        # continue
                                    if cfg_data.deletefiles and not used_existing and key != last_key:
                                        if out_file and key.find('brsgen') == -1:
                                            files_to_delete.append(out_file)
                                    if used_existing:
                                        print('Used existing file {0}.'.format(out_file))
                                        logging.debug('')
                                        log_msg = 'Used existing file {0}.'.format(out_file)
                                        logging.debug(log_msg)
                                    if program in src_files:
                                        if proc.target_type.find('brsgen') != -1:
                                            del src_files[proc.target_type]
                                        else:
                                            if cfg_data.combine_files:
                                                del src_files[program]
                                            elif re.search("l2bin", proc.target_type) and len(src_files[program]) == len(src_files[proc.target_type]):
                                                del src_files[program]
                                            # else:
                                            #     src_files[program].remove(file_input)
                                            #     if len(src_files[program]) == 0:
                                            #         del src_files[program]
                                    if key.find('l2extract') != -1:
                                        # Extracted L2 output replaces the L2 input.
                                        src_files['l2gen'] = src_files[key]
                                        del src_files['l2extract']
                            if proc.requires_batch_processing() and cfg_data.combine_files:
                                # Batch programs (e.g. l2bin) consume the whole
                                # file list in one run; no per-file looping.
                                break

    return
2003 
def get_required_programs(target_program, ruleset, lowest_source_level):
    """
    Return the list of programs required to produce the desired final output.

    The list is built front-to-back by recursing through the ruleset from
    target_program down toward lowest_source_level.
    """
    rule = ruleset.rules[target_program]
    sources = rule.src_file_types
    if sources[0] == rule.target_type:
        # The program's input is its own output type: nothing upstream needed.
        return [target_program]
    programs = []
    for src in sources:
        if src in ruleset.rules:
            if ruleset.order.index(src) > ruleset.order.index(lowest_source_level):
                programs.insert(0, src)
                if len(sources) > 1:
                    programs.insert(0, sources[1])
            for prerequisite in get_required_programs(src, ruleset,
                                                      lowest_source_level):
                programs.insert(0, prerequisite)
    return programs
2026 
def get_source_geo_files(source_files, proc_src_types, proc_src_ndx):
    """
    Find the GEO file matching each input file of the selected source type.

    :param source_files: dict mapping source type -> list of source files
    :param proc_src_types: list of source types for the processor
    :param proc_src_ndx: index into the proc_src_types list pointing to the
                         source type to use to get the input files
    :return: list of GEO files that correspond to the files in source_files
    """
    inp_files = source_files[proc_src_types[proc_src_ndx]]
    geo_files = []
    for inp_file in inp_files:
        geo_file = find_geo_file(inp_file)
        if geo_file:
            geo_files.append(geo_file)
        else:
            # Bug fix: the message previously formatted geo_file, which is
            # always None/empty on this branch; report the data file whose
            # GEO companion could not be found.
            err_msg = 'Error! Cannot find GEO ' \
                      'file for {0}.'.format(inp_file)
            log_and_exit(err_msg)
    return geo_files
2046 
def get_source_file_sets(proc_src_types, source_files, src_key, requires_all_sources):
    """
    Returns the set of source files needed.

    :param proc_src_types: source file types the processor accepts
    :param source_files: dict mapping source type -> list of files
    :param src_key: key into source_files used when only one source type applies
    :param requires_all_sources: True when the processor needs every listed
                                 source type (e.g. an L1 file plus its GEO)
    """
    if len(proc_src_types) == 1:
        try:
            src_file_sets = source_files[src_key]
        except Exception:
            # print "Exception encountered: "
            # e_info = sys.exc_info()
            # err_msg = ''
            # for info in e_info:
            #     err_msg += "  " + str(info)
            if DEBUG:
                err_msg = get_traceback_message()
                log_and_exit(err_msg)
            else:
                err_msg = 'Error! Unable to determine what source files are required for the specified output files.'
                log_and_exit(err_msg)
    else:
        if requires_all_sources:
            if len(proc_src_types) == 2:
                # Pair the two source lists positionally when both are
                # present; otherwise try to derive the missing GEO files.
                if proc_src_types[0] in source_files \
                        and proc_src_types[1] in source_files:
                    src_file_sets = list(zip(source_files[proc_src_types[0]],
                                             source_files[proc_src_types[1]]))
                else:
                    if proc_src_types[0] in source_files:
                        if proc_src_types[1] == 'geo':
                            geo_files = get_source_geo_files(source_files, proc_src_types, 0)
                            src_file_sets = list(zip(source_files[proc_src_types[0]],
                                                     geo_files))
                        else:
                            err_msg = 'Error! Cannot find all {0} and' \
                                      ' {1} source files.'.format(proc_src_types[0],
                                                                  proc_src_types[1])
                            log_and_exit(err_msg)
                    elif proc_src_types[1] in source_files:
                        if proc_src_types[0] == 'geo':
                            geo_files = get_source_geo_files(source_files, proc_src_types, 1)
                            src_file_sets = list(zip(source_files[proc_src_types[1]],
                                                     geo_files))
                        else:
                            err_msg = 'Error! Cannot find all {0} and' \
                                      ' {1} source files.'.format(proc_src_types[0],
                                                                  proc_src_types[1])
                            log_and_exit(err_msg)
                    else:
                        err_msg = 'Error! Cannot find all source files.'
                        log_and_exit(err_msg)
            else:
                err_msg = 'Error! Encountered too many source file types.'
                log_and_exit(err_msg)
        else:
            # Any one matching source type will do; last match wins.
            # NOTE(review): if no proc_src_type is present in source_files,
            # src_file_sets is never bound and the return below raises
            # UnboundLocalError — confirm callers guarantee a match.
            for proc_src_type in proc_src_types:
                if proc_src_type in source_files:
                    src_file_sets = source_files[proc_src_type]
    return src_file_sets
2105 
def get_source_files(input_files):
    """
    Group the input files by their file type.

    Returns a dict whose keys are file types (the programs to be run) and
    whose values are the lists of files of that type, in input order.
    """
    grouped = {}
    for file_path in input_files:
        file_type = input_files[file_path][0]
        grouped.setdefault(file_type, []).append(file_path)
    return grouped
2119 
def get_source_products_types(targt_prod, ruleset):
    """
    Return the list of source product types needed to produce the final product.

    Scans the ruleset backwards from the target's position, collecting the
    source types of every rule whose output is the target product.
    """
    needed = [targt_prod]
    discovered = []
    targt_pos = ruleset.order.index(targt_prod)
    for pos in range(targt_pos, 1, -1):
        rule = ruleset.rules[ruleset.order[pos]]
        for prod_name in needed:
            if rule.target_type == prod_name:
                discovered.extend(rule.src_file_types)
    return needed + discovered
2134 
def get_traceback_message():
    """
    Returns an error message built from traceback data.

    Must be called from inside an ``except`` block so ``sys.exc_info()``
    describes the active exception.
    """
    exc_parts = [str(l) for l in sys.exc_info()]
    # The exception type renders like "<class 'pkg.SomeError'>"; keep only
    # the bare class name.
    err_type_parts = str(exc_parts[0]).strip().split('.')
    err_type = err_type_parts[-1].strip("'>")
    tb_data = traceback.format_exc()
    # Third-to-last line of the formatted traceback holds the
    # "File ..., line N, in ..." entry of the failure point.
    tb_line = tb_data.splitlines()[-3]
    line_num = tb_line.split(',')[1]
    st_data = traceback.extract_stack()
    err_file = os.path.basename(st_data[-1][0])
    msg = 'Error! The {0} program encountered an unrecoverable {1}, {2}, at {3} of {4}!'.\
          format(cfg_data.prog_name,
                 err_type, exc_parts[1], line_num.strip(), err_file)
    return msg
2151 
def initialze_sensors():
    """
    Initialize sensors.

    Returns a dict mapping instrument name -> Sensor instance; 'general' is
    the fallback used for instruments without a dedicated Sensor subclass.
    (The misspelled function name is kept: callers reference it as-is.)
    """
    sensors = dict(general=Sensor(),
                   goci=Sensor_goci(),
                   hawkeye=Sensor_hawkeye(),
                   meris=Sensor_meris(),
                   modis=Sensor_modis(),
                   seawifs=Sensor_seawifs(),
                   viirs=Sensor_viirs())
    return sensors
2164 
def log_and_exit(error_msg):
    """
    Record error_msg in the log, then terminate the program.

    The message is written to the info log and then raised via SystemExit,
    so it also goes to stderr and yields a non-zero exit status; see
    http://docs.python.org/library/sys.html#exit.
    """
    logging.info(error_msg)
    raise SystemExit(error_msg)
2173 
def main():
    """
    Main processing function.

    Parses the command line, builds the global configuration, then hands the
    par file to do_processing().  Returns 0; fatal errors exit via
    log_and_exit()/sys.exit().
    """
    global cfg_data
    global DEBUG
    global sensors_sets
    sensors_sets = initialze_sensors()
    cl_parser = optparse.OptionParser(usage=create_help_message(sensors_sets),
                                      version=' '.join(['%prog', __version__]))
    (options, args) = process_command_line(cl_parser)

    if len(args) < 1:
        print("\nError! No file specified for processing.\n")
        cl_parser.print_help()
    else:
        if options.debug:
            # Don't just set DEBUG = options.debug, as that would override the
            # in-program setting.
            DEBUG = True
        check_options(options)
        # Bug fix: deletefiles and combine_files were previously passed in
        # swapped order, silently exchanging the two settings
        # (ProcessorConfig.__init__ takes ..., use_existing, deletefiles,
        # combine_files, out_dir).
        cfg_data = ProcessorConfig('.seadas_data', os.getcwd(),
                                   options.verbose, options.overwrite,
                                   options.use_existing, options.deletefiles,
                                   options.combine_files, options.odir)
        if not os.access(cfg_data.hidden_dir, os.R_OK):
            log_and_exit("Error! The working directory is not readable!")
        if os.path.exists(args[0]):
            log_timestamp = datetime.datetime.today().strftime('%Y%m%d%H%M%S')
            start_logging(log_timestamp)
            try:
                if options.ifile:
                    do_processing(sensors_sets, args[0], options.ifile)
                else:
                    do_processing(sensors_sets, args[0])
            except Exception:
                if DEBUG:
                    err_msg = get_traceback_message()
                    log_and_exit(err_msg)
                else:
                    # todo: make a friendlier error message
                    err_msg = 'Unanticipated error encountered during processing!'
                    log_and_exit(err_msg)
        else:
            err_msg = 'Error! Parameter file {0} does not exist.'.\
                      format(args[0])
            sys.exit(err_msg)
    logging.shutdown()
    return 0
2238 
def process_command_line(cl_parser):
    """
    Get arguments and options from the calling command line.
    To be consistent with other OBPG programs, an underscore ('_') is used for
    multiword options, instead of a dash ('-').

    :param cl_parser: optparse.OptionParser to populate and run
    :return: (options, args) with any leading 'par=' stripped from each arg
    """
    cl_parser.add_option('--debug', action='store_true', dest='debug',
                         default=False, help=optparse.SUPPRESS_HELP)
    cl_parser.add_option('-d', '--deletefiles', action='store_true',
                         dest='deletefiles', default=False,
                         help='delete files created during processing')
    cl_parser.add_option('--ifile', action='store', type='string',
                         dest='ifile', help="input file")
    cl_parser.add_option('--output_dir', '--odir',
                         action='store', type='string', dest='odir',
                         help="user specified directory for output")
    cl_parser.add_option('--overwrite', action='store_true',
                         dest='overwrite', default=False,
                         help='overwrite files which already exist (default = stop processing if file already exists)')
    cl_parser.add_option('--use_existing', action='store_true',
                         dest='use_existing', default=False,
                         help='use files which already exist (default = stop processing if file already exists)')
    cl_parser.add_option('--combine_files', action='store_true',
                         dest='combine_files', default=True,
                         help='combine files for l2bin instead of binning each file separately')
    cl_parser.add_option('-v', '--verbose',
                         action='store_true', dest='verbose', default=False,
                         help='print status messages to stdout')

    (options, args) = cl_parser.parse_args()
    for ndx, cl_arg in enumerate(args):
        if cl_arg.startswith('par='):
            # Bug fix: str.lstrip('par=') strips any leading run of the
            # characters 'p', 'a', 'r', '=' (so 'par=ramp.par' became
            # 'mp.par'); slice off the literal 'par=' prefix instead.
            args[ndx] = cl_arg[len('par='):]
    if options.overwrite and options.use_existing:
        log_and_exit('Error! Options overwrite and use_existing cannot be ' + \
                     'used simultaneously.')
    return options, args
2281 
def read_file_list_file(flf_name):
    """
    Read *flf_name* and return the list of files to be processed.

    Each line may carry a trailing '#' comment; blank/comment-only lines are
    ignored.  If any listed file does not exist, processing is aborted with
    an error naming every missing file.
    """
    files_list = []
    bad_lines = []
    with open(flf_name, 'rt') as flf:
        for line in flf.readlines():
            fname = line.split('#')[0].strip()
            if not fname:
                continue
            target = files_list if os.path.exists(fname) else bad_lines
            target.append(fname)
    if bad_lines:
        err_msg = 'Error! File {0} specified the following input files which could not be located:\n {1}'.\
                  format(flf_name, ', '.join(bad_lines))
        log_and_exit(err_msg)
    return files_list
2302 
def run_batch_processor(processor, file_set):
    """
    Run a processor, e.g. l2bin, which processes batches of files.

    The batch is passed either as a tar file (used directly) or via a
    generated ``.lis`` file listing every input.  Returns the processor's
    output file path.
    """
    # logging.debug('in run_batch_processor, ndx = %d', ndx)
    if os.path.exists((file_set[0])) and tarfile.is_tarfile(file_set[0]):
        # A tar archive is handed to the program as-is.
        processor.input_file = file_set[0]
    else:
        # Write the batch members into a timestamped .lis file in the
        # hidden working directory and use that as the input.
        timestamp = time.strftime('%Y%m%d_%H%M%S', time.gmtime(time.time()))
        file_list_name = cfg_data.hidden_dir + os.sep + 'files_' + \
                         processor.target_type + '_' + timestamp + '.lis'
        with open(file_list_name, 'wt') as file_list:
            for fname in file_set:
                file_list.write(fname + '\n')
        processor.input_file = file_list_name
    # Gather naming options for constructing the output file name.
    data_file_list = []
    finder_opts = {}
    for fspec in file_set:
        dfile = get_obpg_data_file_object(fspec)
        data_file_list.append(dfile)
    if 'suite' in processor.par_data:
        finder_opts['suite'] = processor.par_data['suite']
    elif 'prod' in processor.par_data:
        finder_opts['suite'] = processor.par_data['prod']
    if 'resolution' in processor.par_data:
        finder_opts['resolution'] = processor.par_data['resolution']
    if 'oformat' in processor.par_data:
        finder_opts['oformat'] = processor.par_data['oformat']
    # name_finder = name_finder_utils.get_level_finder(data_file_list,
    #                                                  processors[ndx].target_type,
    #                                                  finder_opts)
    if processor.output_file:
        processor.output_file = os.path.join(processor.out_directory,
                                             processor.output_file)
    else:
        # NOTE(review): the get_output_name call line was lost in extraction
        # and is reconstructed by analogy with run_nonbatch_processor —
        # confirm against the original source.
        processor.output_file = os.path.join(processor.out_directory,
                                             mlp.get_output_name_utils.get_output_name(data_file_list,
                                                                                       processor.target_type,
                                                                                       finder_opts))
    if DEBUG:
        log_msg = "Running {0} with input file {1} to generate {2} ".\
                  format(processor.target_type,
                         processor.input_file,
                         processor.output_file)
        logging.debug(log_msg)
    processor.execute()
    return processor.output_file
2350 
def run_nonbatch_processor(processor):
    """
    Run a processor which deals with single input files (or pairs of files in
    the case of MODIS L1B processing in which GEO files are also needed).

    Returns (output_file, used_existing): output_file is None/'' when the
    underlying program failed, and used_existing is True when an existing
    target file was reused instead of reprocessing.
    """
    used_existing = False
    dfile = get_obpg_data_file_object(processor.input_file)

    # Collect the naming options used to construct the output file name.
    cl_opts = optparse.Values()
    if 'suite' in processor.par_data:
        cl_opts.suite = processor.par_data['suite']
    elif 'prod' in processor.par_data:
        cl_opts.suite = processor.par_data['prod']
    else:
        cl_opts.suite = None
    if 'resolution' in processor.par_data:
        cl_opts.resolution = processor.par_data['resolution']
    else:
        cl_opts.resolution = None
    if 'oformat' in processor.par_data:
        cl_opts.oformat = processor.par_data['oformat']
    else:
        cl_opts.oformat = None
    if processor.output_file:
        output_file = os.path.join(processor.out_directory, processor.output_file)
    else:
        output_file = os.path.join(processor.out_directory,
                                   mlp.get_output_name_utils.get_output_name([dfile], processor.target_type, cl_opts))
    if DEBUG:
        print ('in run_nonbatch_processor, output_file = ' + output_file)
    processor.output_file = output_file
    if (not os.path.exists(output_file)) or cfg_data.overwrite:
        if cfg_data.verbose:
            print ()
            print ('\nRunning ' + str(processor))
            sys.stdout.flush()
        proc_status = processor.execute()

        if proc_status:
            # Non-zero status: the program failed; record it but keep going.
            output_file = None
            msg = "Error! Status {0} was returned during {1} {2} processing.".\
                  format(proc_status, processor.sensor.name,
                         processor.target_type)
            # log_and_exit(msg)
            logging.debug(msg)
            # Todo: remove the failed file from future processing
            # NOTE(review): output_file is None here, so this branch is dead
            # code — confirm the intended condition.
            if output_file:
                output_file = ''
    elif not cfg_data.use_existing:
        log_and_exit('Error! Target file {0} already exists.'.\
                     format(output_file))
    else:
        # The target already exists and use_existing was requested.
        used_existing = True
    processor.input_file = ''
    processor.output_file = ''
    return output_file, used_existing
2426 
def run_script(proc, script_name):
    """
    Build and execute the command line for the processing script passed in.

    Returns the exit status from execute_command().
    """
    cmd = [build_executable_path(script_name),
           'ifile={}'.format(proc.input_file),
           'ofile={}'.format(proc.output_file),
           get_options(proc.par_data)]
    logging.debug('\nRunning: {}'.format(" ".join(str(x) for x in cmd)))
    return execute_command(cmd)
2440 
2441 
def start_logging(time_stamp):
    """
    Open the log file(s) used for debugging, named with *time_stamp*.

    An INFO-level handler is always attached to the root logger; a
    DEBUG-level handler is added as well when the program runs in debug mode.
    """
    root_logger = logging.getLogger()
    #root_logger.setLevel(logging.DEBUG)

    info_path = os.path.join(cfg_data.output_dir,
                             ''.join(['Processor_', time_stamp, '.log']))
    info_handler = logging.FileHandler(info_path)
    info_handler.setLevel(logging.INFO)
    root_logger.addHandler(info_handler)

    if DEBUG:
        debug_path = os.path.join(cfg_data.output_dir,
                                  ''.join(['multilevel_processor_debug_',
                                           time_stamp, '.log']))
        debug_handler = logging.FileHandler(debug_path)
        debug_handler.setLevel(logging.DEBUG)
        root_logger.addHandler(debug_handler)
        logging.debug('Starting ' + os.path.basename(sys.argv[0]) + ' at ' +
                      datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
2464 
def uniqify_list(orig_list):
    """
    Return the items of *orig_list* with duplicates removed, keeping the
    first occurrence of each item in its original order.  Somewhat borrowed
    from: http://www.peterbe.com/plog/uniqifiers-benchmark (example f5)
    """
    seen = set()
    unique_items = []
    for item in orig_list:
        if item not in seen:
            seen.add(item)
            unique_items.append(item)
    return unique_items
2477 
2478 
2479 
DEBUG = False
#DEBUG = True

cfg_data = None
FILE_USE_OPTS = ['deletefiles', 'overwrite', 'use_existing', 'combine_files']
# Default output-file suffix for each processing level / program.
SUFFIXES = {
    'geo': 'GEO',
    'l1brsgen': 'L1B_BRS',
    'l1aextract': 'L1A.sub',
    'l1aextract_viirs': 'L1A.sub',
    'l1aextract_seawifs': 'L1A.sub',
    'l1aextract_modis': 'L1A.sub',
    'l2bin': 'L3b',
    'l2brsgen': 'L2_BRS',
    'l2extract': 'L2.sub',
    'l2gen': 'L2',
    'l3mapgen': 'L3m',
    'level 1a': 'L1A',
    'level 1b': 'L1B_LAC',
}
input_file_data = {}

# Bug fix: os.environ['OCSSWROOT'] raised an uncaught KeyError when the
# variable was unset, making the friendly sys.exit() branch unreachable.
# Use .get() so a missing (or empty) variable takes the intended error path.
if os.environ.get('OCSSWROOT'):
    OCSSWROOT_DIR = os.environ['OCSSWROOT']
    logging.debug('OCSSWROOT -> %s', OCSSWROOT_DIR)
else:
    sys.exit('Error! Cannot find OCSSWROOT environment variable.')

if __name__ == "__main__":
    sys.exit(main())
def get_source_products_types(targt_prod, ruleset)
def run_script(proc, script_name)
def get_processors2(sensor, par_contents, rules, lowest_source_level)
def get_input_files_type_data(input_files_list)
def get_intermediate_products(existing_prod_names, ruleset, lowest_source_level)
def get_output_name3(input_name, input_files, suffix)
def get_intermediate_processors(sensor, existing_procs, rules, lowest_source_level)
def get_batch_output_name(file_set, suffix)
def find_geo_file2(inp_file, instrument, lvl)
def get_par_file_contents(par_file, acceptable_single_keys)
def get_output_name2(inp_files, targ_prog, suite=None, oformt=None, res=None)
def build_file_list_file(filename, file_list)
def build_executable_path(prog_name)
def create_help_message(rules_sets)
list(APPEND LIBS ${NETCDF_LIBRARIES}) find_package(GSL REQUIRED) include_directories($
Definition: CMakeLists.txt:8
def find_viirs_geo_file(proc, first_svm_file)
def clean_files(delete_list)
void print(std::ostream &stream, const char *format)
Definition: PrintDebug.hpp:38
def get_obpg_data_file_object(file_specification)
def get_source_file_sets(proc_src_types, source_files, src_key, requires_all_sources)
def do_processing(sensors_sets, par_file, cmd_line_ifile=None)
def __init__(self, hidden_dir, ori_dir, verbose, overwrite, use_existing, deletefiles, combine_files, out_dir=None)
def get_required_programs(target_program, ruleset, lowest_source_level)
def get_processors(src_files, input_files, par_contents, files_to_delete)
def run_nonbatch_processor(processor)
def get_output_name(data_files, target_program, clopts)
def get_data_file_option(par_contents, opt_text)
def create_levels_list(rules_sets)
def extract_par_section(par_contents, section)
def get_lowest_source_level(source_files)
def get_source_files(input_files)
def run_batch_processor(processor, file_set)
def exe_processor(proc, src_files, src_lvl)
def process_command_line(cl_parser)
Definition: aerosol.c:136
def get_file_handling_opts(par_contents)
def get_source_geo_files(source_files, proc_src_types, proc_src_ndx)
def build_l2gen_par_file(par_contents, input_file, geo_file, output_file)