4 Program to perform multilevel processing (previously known as the
5 seadas_processor and sometimes referred to as the 'uber' processor).
11 import ConfigParser
as configparser
39 __author__ =
'melliott'
43 Configuration data for the program which needs to be widely available.
46 def __init__(self, hidden_dir, ori_dir, verbose, overwrite, use_existing,
47 deletefiles, combine_files, out_dir=None):
48 self.
prog_name = os.path.basename(sys.argv[0])
50 if not os.path.exists(hidden_dir):
54 if sys.exc_info()[1].find(
'Permission denied:') != -1:
74 cfg_file_path = os.path.join(self.
hidden_dir,
'seadas_ocssw.cfg')
75 if os.path.exists(cfg_file_path):
80 ProcessorConfig._instance = self
82 def _read_saved_options(self, cfg_path):
84 Gets options stored in the program's configuration file.
87 cfg_parser = configparser.ConfigParser()
88 cfg_parser.read(cfg_path)
91 int(cfg_parser.get(
'main',
94 except configparser.NoSectionError
as nse:
95 print (
'nse: ' +
str(nse))
96 print (
'sys.exc_info(): ')
97 for msg
in sys.exc_info():
98 print (
' ' +
str(msg))
99 log_and_exit(
'Error! Configuration file has no "main" ' +
101 except configparser.NoOptionError:
102 log_and_exit(
'Error! The "main" section of the configuration ' +
103 'file does not specify a "par_file_age".')
104 except configparser.MissingSectionHeaderError:
105 log_and_exit(
'Error! Bad configuration file, no section headers ' +
108 def _set_temp_dir(self):
110 Sets the value of the temporary directory.
112 if os.path.exists(
'/tmp')
and os.path.isdir(
'/tmp')
and \
113 os.access(
'/tmp', os.W_OK):
117 if os.path.exists(cwd)
and os.path.isdir(cwd)
and \
118 os.access(cwd, os.W_OK):
121 log_and_exit(
'Error! Unable to establish a temporary ' +
124 def _write_default_cfg_file(self, cfg_path):
126 Writes out a configuration file using default values.
128 with open(cfg_path,
'wt')
as cfg_file:
129 cfg_file.write(
'[main]\n')
130 cfg_file.write(
'par_file_age=30 # units are days\n')
134 Sensor contains the recipe and procssing method for general sensors.
139 'level 1a': processing_rules.build_rule(
'level 1a', [
'level 0'],
141 'l1brsgen': processing_rules.build_rule(
'l1brsgen', [
'l1'],
143 'l2brsgen': processing_rules.build_rule(
'l2brsgen', [
'l2gen'],
149 'level 1b': processing_rules.build_rule(
'level 1b', [
'level 1a'],
151 'l2gen': processing_rules.build_rule(
'l2gen', [
'l1'], self.
run_l2gen,
153 'l2extract': processing_rules.build_rule(
'l2extract', [
'l2gen'],
155 'l2bin': processing_rules.build_rule(
'l2bin', [
'l2gen'], self.
run_l2bin,
159 'l3mapgen': processing_rules.build_rule(
'l3mapgen', [
'l2bin'],
165 'l2extract',
'l2brsgen',
'l2bin',
'l3mapgen']
166 self.
name =
'general'
173 Exits with an error message when there is an attempt to process a source
174 file at the lowest level of a rule chain.
176 err_msg =
'Error! Attempting to create {0} product, but no creation program is known.'.format(proc.target_type)
181 Sets up and runs an executable program.
184 prog = os.path.join(proc.ocssw_bin,
'l1bgen_generic')
185 args = [
'ifile={}'.format(proc.input_file),
'ofile={}'.format(proc.output_file)]
186 if not proc.geo_file
is None:
187 args.append(
'geofile={}'.format(proc.geo_file))
196 Runs the l1brsgen executable.
202 prog = os.path.join(proc.ocssw_bin,
'l1brsgen')
206 cmd.extend([
'ifile={}'.format(proc.input_file),
'ofile={}'.format(proc.output_file)])
208 cmd.append(
'geofile={}'.format(proc.geo_file))
211 logging.debug(
'Executing: "%s"',
" ".join(
str(x)
for x
in cmd))
217 Set up for and perform L2 binning.
219 prog = os.path.join(proc.ocssw_bin,
'l2bin')
220 if not os.path.exists(prog):
221 print (
"Error! Cannot find executable needed for {0}".\
222 format(proc.rule_set.rules[proc.target_type].action))
223 args = [
'infile={}'.format(proc.input_file),
224 'ofile={}'.format(proc.output_file)]
231 print (
'l2bin cmd: {}'.format(
" ".join(
str(x)
for x
in cmd)))
234 if os.path.exists(proc.output_file):
235 msg =
'-I- The l2bin program returned a status value of {0}. Proceeding with processing, using the output l2 bin file {1}'.format(ret_val, proc.output_file)
239 msg =
'-I- The l2bin program produced a bin file with no data. No further processing will be done.'
245 Runs the l2brsgen executable.
247 logging.debug(
"In run_l2brsgen")
248 prog = os.path.join(proc.ocssw_bin,
'l2brsgen')
252 cmd.extend([
'ifile={}'.format(proc.input_file),
'ofile={}'.format(proc.output_file)])
253 logging.debug(
'Executing: "%s"',
" ".join(
str(x)
for x
in cmd))
259 Set up and run l2extract.
261 if 'SWlon' in proc.par_data
and 'SWlat' in proc.par_data
and \
262 'NElon' in proc.par_data
and 'NElat' in proc.par_data:
264 if (start_line
is None)
or (end_line
is None)
or (start_pixel
is None) \
265 or (end_pixel
is None):
266 err_msg =
'Error! Could not compute coordinates for l2extract.'
268 l2extract_prog = os.path.join(proc.ocssw_bin,
'l2extract')
269 l2extract_cmd = [l2extract_prog, proc.input_file,
270 str(start_pixel),
str(end_pixel),
271 str(start_line),
str(end_line),
'1',
'1',
273 logging.debug(
'Executing l2extract command: {}'.format(
" ".join(
str(x)
for x
in l2extract_cmd)))
277 err_msg =
'Error! Geographical coordinates not specified for l2extract.'
282 Set up for and perform L2 processing.
286 getanc_cmd = [getanc_prog, proc.input_file]
287 logging.debug(
'running getanc command: {}'.format(
" ".join(
str(x)
for x
in getanc_cmd)))
289 l2gen_prog = os.path.join(proc.ocssw_bin,
'l2gen')
290 if not os.path.exists(l2gen_prog):
291 print (
"Error! Cannot find executable needed for {0}".\
292 format(proc.rule_set.rules[proc.target_type].action))
294 proc.geo_file, proc.output_file)
295 logging.debug(
'L2GEN_FILE=' + proc.output_file)
297 args =
'par={}/{}'.format(cfg_data.original_dir, par_name)
298 l2gen_cmd = [l2gen_prog, args]
299 if cfg_data.verbose
or DEBUG:
300 logging.debug(
'l2gen cmd: {}'.format(
" ".join(
str(x)
for x
in l2gen_cmd)))
305 Set up and run the l3Bin program
307 prog = os.path.join(proc.ocssw_bin,
'l3bin')
308 if not os.path.exists(prog):
309 print (
"Error! Cannot find executable needed for {0}".\
310 format(proc.rule_set.rules[proc.target_type].action))
311 args = [
'ifile={}'.format(proc.input_file)]
312 for key
in proc.par_data:
313 if (key !=
'odir')
and (key !=
'ofile')
and not key.lower()
in FILE_USE_OPTS:
314 args.append(
"{}={}".format(key,proc.par_data[key]))
315 args.append(
"in={}".format(proc.input_file))
316 args.append(
"out={}".format(proc.output_file))
320 logging.debug(
'Executing l3bin command: {}'.format(
" ".join(
str(x)
for x
in cmd)))
323 if os.path.exists(proc.output_file):
324 print(msg =
'-I- The l3bin program returned a status value of {0}. Proceeding with processing, using the output l2 bin file {1}'.format(
325 ret_val, proc.output_file))
326 msg =
'-I- The l3bin program returned a status value of {0}. Proceeding with processing, using the output l2 bin file {1}'.format(
327 ret_val, proc.output_file)
337 Set up and run the l3mapgen program.
339 prog = os.path.join(proc.ocssw_bin,
'l3mapgen')
340 if not os.path.exists(prog):
341 print (
"Error! Cannot find executable needed for {0}".\
342 format(proc.rule_set.rules[proc.target_type].action))
343 args = [
'ifile={}'.format(proc.input_file)]
344 for key
in proc.par_data:
345 if (key !=
'odir')
and (key !=
'ofile')
and not key.lower()
in FILE_USE_OPTS:
346 args.append(
'{}={}'.format(key, proc.par_data[key]))
347 args.append(
'ofile={}'.format(proc.output_file))
351 logging.debug(
'Executing l3mapgen command: "%s"',
" ".join(
str(x)
for x
in cmd))
379 Sensor GOCI contains GOCI specific recipe and procssing methods.
384 'level 1a': processing_rules.build_rule(
'level 1a', [
'level 0'],
386 'l1brsgen': processing_rules.build_rule(
'l1brsgen', [
'l1'],
388 'l2brsgen': processing_rules.build_rule(
'l2brsgen', [
'l2gen'],
394 'level 1b': processing_rules.build_rule(
'level 1b', [
'level 1a'],
396 'l2gen': processing_rules.build_rule(
'l2gen', [
'level 1b'], self.
run_l2gen,
398 'l2extract': processing_rules.build_rule(
'l2extract', [
'l2gen'],
400 'l2bin': processing_rules.build_rule(
'l2bin', [
'l2gen'], self.
run_l2bin,
404 'l3mapgen': processing_rules.build_rule(
'l3mapgen', [
'l2bin'],
410 'l2extract',
'l2brsgen',
'l2bin',
'l3mapgen']
417 Sensor HAWKEYE contains HAWKEYE specific recipe and procssing methods.
422 'level 1a': processing_rules.build_rule(
'level 1a', [
'nothing lower'],
424 'l1brsgen': processing_rules.build_rule(
'l1brsgen', [
'level 1a',
'geo'],
428 'geo': processing_rules.build_rule(
'geo', [
'level 1a'],
430 'l2gen': processing_rules.build_rule(
'l2gen', [
'level 1a',
'geo'],
432 'l2extract': processing_rules.build_rule(
'l2extract', [
'l2gen'],
434 'l2brsgen': processing_rules.build_rule(
'l2brsgen', [
'l2gen'],
438 'l2bin': processing_rules.build_rule(
'l2bin', [
'l2gen'], self.
run_l2bin,
442 'l3mapgen': processing_rules.build_rule(
'l3mapgen', [
'l2bin'],
448 'l2gen',
'l2extract',
'l2bin',
449 'l2brsgen',
'l3mapgen']
456 Set up and run the geolocate_hawkeye program, returning the exit status of the run.
458 logging.debug(
'In run_geolocate_hawkeye')
464 err_msg =
'Error! Cannot find program geolocate_hawkeye.'
465 logging.info(err_msg)
467 args = [proc.input_file, proc.output_file]
472 logging.debug(
'\nRunning: {}'.format(
" ".join(
str(x)
for x
in cmd)))
477 Sensor MERIS contains MERIS specific recipe and processing methods.
480 target type (string), source types (list of strings), batch processing
481 flag (Boolean), action to take (function name)
486 'level 1a': processing_rules.build_rule(
'level 1a', [
'level 0'],
488 'l1brsgen': processing_rules.build_rule(
'l1brsgen', [
'l1'],
490 'l2brsgen': processing_rules.build_rule(
'l2brsgen', [
'l2gen'],
496 'level 1b': processing_rules.build_rule(
'level 1b', [
'level 1a'],
498 'l2gen': processing_rules.build_rule(
'l2gen', [
'level 1b'], self.
run_l2gen,
500 'l2extract': processing_rules.build_rule(
'l2extract', [
'l2gen'],
502 'l2bin': processing_rules.build_rule(
'l2bin', [
'l2gen'], self.
run_l2bin,
506 'l3mapgen': processing_rules.build_rule(
'l3mapgen', [
'l2bin'],
512 'l2extract',
'l2brsgen',
'l2bin',
'l3mapgen']
519 Sensor MODIS contains MODIS specific recipe and processing methods.
524 'level 0': processing_rules.build_rule(
'level 0', [
'nothing lower'],
526 'level 1a': processing_rules.build_rule(
'level 1a', [
'level 0'],
528 'l1brsgen': processing_rules.build_rule(
'l1brsgen', [
'level 1b',
'geo'],
532 'geo': processing_rules.build_rule(
'geo', [
'level 1a'], self.
run_geo,
534 'l1aextract': processing_rules.build_rule(
'l1aextract',
538 'level 1b': processing_rules.build_rule(
'level 1b',
541 'l2gen': processing_rules.build_rule(
'l2gen', [
'level 1b',
'geo'],
543 'l2extract': processing_rules.build_rule(
'l2extract', [
'l2gen'],
545 'l2brsgen': processing_rules.build_rule(
'l2brsgen', [
'l2gen'],
549 'l2bin': processing_rules.build_rule(
'l2bin', [
'l2gen'], self.
run_l2bin,
553 'l3mapgen': processing_rules.build_rule(
'l3mapgen', [
'l2bin'],
559 'level 1b',
'l1brsgen',
'l2gen',
'l2extract',
560 'l2bin',
'l2brsgen',
'l3mapgen']
567 Set up and run l1aextract_modis.
569 if 'SWlon' in proc.par_data
and 'SWlat' in proc.par_data
and\
570 'NElon' in proc.par_data
and 'NElat' in proc.par_data:
572 if (start_line
is None)
or (end_line
is None)
or (start_pixel
is None)\
573 or (end_pixel
is None):
574 err_msg =
'Error! Cannot find l1aextract_modis coordinates.'
576 l1aextract_prog = os.path.join(proc.ocssw_bin,
'l1aextract_modis')
577 l1aextract_cmd = [l1aextract_prog, proc.input_file,
578 str(start_pixel),
str(end_pixel),
579 str(start_line),
str(end_line),
581 logging.debug(
'Executing l1aextract_modis command: "%s"',
582 " ".join(
str(x)
for x
in l1aextract_cmd))
588 Sets up and runs the MODIS GEO script.
592 args = [proc.input_file]
593 args.append(
'--output={}'.format(proc.output_file))
598 logging.debug(
'\nRunning: {}'.format(
" ".join(
str(x)
for x
in cmd)))
603 Sets up and runs the MODIS L1A script.
606 args = [proc.input_file]
607 args.append(
'--output={}'.format(proc.output_file))
612 logging.debug(
'\nRunning: {}'.format(
" ".join(
str(x)
for x
in cmd)))
620 args = [
'-o', proc.output_file]
624 args.append(proc.input_file)
625 if not proc.geo_file
is None:
626 args.append(proc.geo_file)
629 logging.debug(
'\nRunning: {}'.format(
" ".join(
str(x)
for x
in cmd)))
634 Sensor SeaWiFS contains SeaWiFS sepcific recipe and processing method.
639 'level 1a': processing_rules.build_rule(
'level 1a', [
'level 0'],
641 'l1aextract': processing_rules.build_rule(
'l1aextract',
645 'l1brsgen': processing_rules.build_rule(
'l1brsgen', [
'l1'],
649 'level 1b': processing_rules.build_rule(
'level 1b', [
'level 1a'],
651 'l2gen': processing_rules.build_rule(
'l2gen', [
'l1'], self.
run_l2gen,
653 'l2extract': processing_rules.build_rule(
'l2extract', [
'l2gen'],
655 'l2brsgen': processing_rules.build_rule(
'l2brsgen', [
'l2gen'],
659 'l2bin': processing_rules.build_rule(
'l2bin', [
'l2gen'], self.
run_l2bin,
663 'l3mapgen': processing_rules.build_rule(
'l3mapgen', [
'l2bin'],
669 'level 1b',
'l2gen',
'l2extract',
670 'l2brsgen',
'l2bin',
'l3mapgen']
677 Set up and run l1aextract_seawifs.
679 if 'SWlon' in proc.par_data
and 'SWlat' in proc.par_data
and\
680 'NElon' in proc.par_data
and 'NElat' in proc.par_data:
682 if (start_line
is None)
or (end_line
is None)
or (start_pixel
is None)\
683 or (end_pixel
is None):
684 err_msg =
'Error! Cannot compute l1aextract_seawifs coordinates.'
686 l1aextract_prog = os.path.join(proc.ocssw_bin,
'l1aextract_seawifs')
687 l1aextract_cmd = [l1aextract_prog, proc.input_file,
688 str(start_pixel),
str(end_pixel),
689 str(start_line),
str(end_line),
'1',
'1',
691 logging.debug(
'Executing l1aextract_seawifs command: "%s"',
692 " ".join(
str(x)
for x
in l1aextract_cmd))
698 Sensor VIIRS contains VIIRS sepcific recipe and processing method..
703 'level 1a': processing_rules.build_rule(
'level 1a', [
'nothing lower'],
705 'l1brsgen': processing_rules.build_rule(
'l1brsgen', [
'l1',
'geo'],
709 'geo': processing_rules.build_rule(
'geo', [
'level 1a'],
711 'l1aextract': processing_rules.build_rule(
'l1aextract',
715 'level 1b': processing_rules.build_rule(
'level 1b', [
'level 1a',
'geo'],
717 'l2gen': processing_rules.build_rule(
'l2gen', [
'l1',
'geo'],
719 'l2extract': processing_rules.build_rule(
'l2extract', [
'l2gen'],
721 'l2brsgen': processing_rules.build_rule(
'l2brsgen', [
'l2gen'],
725 'l2bin': processing_rules.build_rule(
'l2bin', [
'l2gen'], self.
run_l2bin,
729 'l3mapgen': processing_rules.build_rule(
'l3mapgen', [
'l2bin'],
734 self.
rules_order = [
'level 1a',
'geo',
'l1aextract',
'level 1b',
'l1brsgen',
735 'l2gen',
'l2extract',
'l2bin',
736 'l2brsgen',
'l3mapgen']
743 Set up and run the geolocate_viirs program, returning the exit status of the run.
745 logging.debug(
'In run_geolocate_viirs')
751 err_msg =
'Error! Cannot find program geolocate_viirs.'
752 logging.info(err_msg)
754 args = [
'-ifile={}'.format(proc.input_file),
'-geofile_mod={}'.format(proc.output_file)]
759 logging.debug(
'\nRunning: {}'.format(
" ".join(
str(x)
for x
in cmd)))
763 logging.debug(
'In run_viirs_l1b')
767 args = [
'ifile={}'.format(proc.input_file),
'l1bfile_mod={}'.format(proc.output_file)]
778 logging.debug(
'\nRunning: {}'.format(
" ".join(
str(x)
for x
in cmd)))
783 Set up and run l1aextract_viirs.
785 if 'SWlon' in proc.par_data
and 'SWlat' in proc.par_data
and\
786 'NElon' in proc.par_data
and 'NElat' in proc.par_data:
788 elif 'sline' in proc.par_data
and 'eline' in proc.par_data
and\
789 'spixl' in proc.par_data
and 'epixl' in proc.par_data:
790 start_line = proc.par_data[
'sline']
791 end_line = proc.par_data[
'eline']
792 start_pixel = proc.par_data[
'spixl']
793 end_pixel = proc.par_data[
'epixl']
795 if (start_line
is None)
or (end_line
is None)
or (start_pixel
is None)\
796 or (end_pixel
is None):
797 err_msg =
'Error! Cannot find l1aextract_viirs coordinates.'
799 l1aextract_prog = os.path.join(proc.ocssw_bin,
'l1aextract_viirs')
800 l1aextract_cmd = [l1aextract_prog, proc.input_file,
801 str(start_pixel),
str(end_pixel),
802 str(start_line),
str(end_line),
804 logging.debug(
'Executing l1aextract_viirs command: "%s"',
805 " ".join(
str(x)
for x
in l1aextract_cmd))
811 Returns an obpg_data_file object for the file named in file_specification.
814 (ftype, sensor) = ftyper.get_file_type()
815 (stime, etime) = ftyper.get_file_times()
816 obpg_data_file_obj = obpg_data_file.ObpgDataFile(file_specification, ftype,
817 sensor, stime, etime,
819 return obpg_data_file_obj
823 Returns the directory in which the program named in prog_name is found.
824 None is returned if the program is not found.
827 candidate_subdirs = [
'bin',
'scripts']
828 for subdir
in candidate_subdirs:
829 cand_path = os.path.join(OCSSWROOT_DIR, subdir, prog_name)
830 if os.path.exists(cand_path):
837 Create a file listing the names of the files to be processed.
839 with open(filename,
'wt')
as file_list_file:
840 for fname
in file_list:
841 file_list_file.write(fname +
'\n')
845 Build the parameter file for L2 processing.
847 dt_stamp = datetime.datetime.today()
848 par_name =
''.join([
'L2_', dt_stamp.strftime(
'%Y%m%d%H%M%S'),
'.par'])
849 par_path = os.path.join(cfg_data.hidden_dir, par_name)
850 with open(par_path,
'wt')
as par_file:
851 par_file.write(
'# Automatically generated par file for l2gen\n')
852 par_file.write(
'ifile=' + input_file +
'\n')
853 if not geo_file
is None:
854 par_file.write(
'geofile=' + geo_file +
'\n')
855 par_file.write(
'ofile=' + output_file +
'\n')
856 for l2_opt
in par_contents:
857 if l2_opt !=
'ifile' and l2_opt !=
'geofile' \
858 and l2_opt !=
'ofile' and l2_opt !=
'odir' \
859 and not l2_opt
in FILE_USE_OPTS:
860 par_file.write(l2_opt +
'=' + par_contents[l2_opt] +
'\n')
865 Check command line options
873 if not os.path.exists(options.ifile):
874 err_msg =
'Error! The specified input file, {0}, does not exist.'. \
875 format(options.ifile)
880 Delete unwanted files created during processing.
883 print (
"Cleaning up files")
889 for filepath
in delete_list:
891 print (
'Deleting {0}'.format(filepath))
896 hidden_files = os.listdir(cfg_data.hidden_dir)
897 par_files = [f
for f
in hidden_files
if f.endswith(
'.par')]
898 for par_file
in par_files:
899 par_path = os.path.join(cfg_data.hidden_dir, par_file)
900 file_age = round(time.time()) - os.path.getmtime(par_path)
901 if file_age > cfg_data.max_file_age:
903 print (
'Deleting {0}'.format(par_path))
908 if not files_deleted:
909 print (
'No files were found for deletion.')
911 elif files_deleted == 1:
912 print (
'One file was deleted.')
915 print (
'A total of {0} files were deleted.'.format(files_deleted))
920 Returns a list containing all the levels from all the rules sets.
922 set_key =
list(rules_sets.keys())[0]
923 logging.debug(
'set_key = %s', (set_key))
924 lvls_lst = [(lvl, [set_key])
for lvl
in rules_sets[set_key].rules_order[1:]]
925 for rules_set_name
in list(rules_sets.keys())[1:]:
926 for lvl_name
in rules_sets[rules_set_name].rules_order[1:]:
927 names_list = [lst_item[0]
for lst_item
in lvls_lst]
928 if lvl_name
in names_list:
929 lvls_lst[names_list.index(lvl_name)][1].append(rules_set_name)
931 prev_ndx = rules_sets[rules_set_name].rules_order.index(lvl_name) - 1
932 if rules_sets[rules_set_name].rules_order[prev_ndx]
in names_list:
933 ins_ndx = names_list.index(rules_sets[rules_set_name].rules_order[prev_ndx]) + 1
936 lvls_lst.insert(ins_ndx, (lvl_name, [rules_set_name]))
942 Creates the message to be displayed when help is provided.
946 %prog [options] parameter_file
948 The parameter_file is similar to, but not exactly like, parameter
949 files for OCSSW processing programs:
950 - It has sections separated by headers which are denoted by "["
952 The section named "main" is required. Its allowed options are:
953 ifile - Required entry naming the input file(s) to be processed.
954 use_ancillary - use near real time ancillary data
955 deletefiles - delete all the intermediate data files genereated
956 overwrite - overwrite any data files which already exist
957 use_existing - use any data files which already exist
958 combine_files - l2bin bin all the L2 files together instead of bin each L2 file separately
960 Simultaneous use of both the overwrite and use_existing options
963 The names for other sections are the programs for which that section's
964 entries are to be applied. Intermediate sections which are required for the
965 final level of processing do not need to be defined if their default options
966 are acceptable. A section can be empty. The final level of processing
967 must have a section header, even if no entries appear within that section.
968 - Entries within a section appear as key=value. Comma separated lists of
969 values can be used when appropriate.
970 - Comments are marked by "#"; anything appearing on a line after that
971 character is ignored. A line beginning with a "#" is completely ignored.
973 In addition to the main section, the following sections are allowed:
974 Section name: Applicable Instrument(s):
975 ------------- -------------------------\n"""
978 for lname
in level_names:
979 lvl_name_help +=
' {0:24s}{1}\n'.\
980 format(lname[0] +
':',
', '.join(lname[1]))
982 message += lvl_name_help
986 # Sample par file for %prog.
988 ifile=2010345034027.L1A_LAC
991 # final processing level
997 Perform the processing for each step (element of processor_list) needed.
999 global input_file_data
1001 files_to_delete = []
1002 input_files_list = []
1006 skip_par_ifile =
True
1007 if os.path.exists(cmd_line_ifile):
1008 input_files_list = [cmd_line_ifile]
1010 msg =
'Error! Specified ifile {0} does not exist.'.\
1011 format(cmd_line_ifile)
1014 skip_par_ifile =
False
1015 if par_contnts[
'main']:
1016 if (
not skip_par_ifile)
and (
not 'ifile' in par_contnts[
'main']):
1017 msg =
'Error! No ifile specified in the main section of {0}.'.\
1024 cfg_data.deletefiles =
True
1026 cfg_data.use_existing =
True
1028 cfg_data.overwrite =
True
1030 cfg_data.combine_files =
True
1031 if 'use_ancillary' in par_contnts[
'main']
and \
1032 int(par_contnts[
'main'][
'use_ancillary']) == 0:
1033 cfg_data.get_anc =
False
1034 if 'odir' in par_contnts[
'main']:
1035 dname = par_contnts[
'main'][
'odir']
1036 if os.path.exists(dname):
1037 if os.path.isdir(dname):
1038 if cfg_data.output_dir_is_settable:
1039 cfg_data.output_dir = os.path.realpath(dname)
1041 log_msg =
'Ignoring par file specification for output directory, {0}; using command line value, {1}.'.format(par_contnts[
'main'][
'odir'], cfg_data.output_dir)
1042 logging.info(log_msg)
1044 msg =
'Error! {0} is not a directory.'.format(dname)
1047 msg =
'Error! {0} does not exist.'.format(dname)
1050 logging.debug(
'cfg_data.overwrite: ' +
str(cfg_data.overwrite))
1051 logging.debug(
'cfg_data.use_existing: ' +
str(cfg_data.use_existing))
1052 logging.debug(
'cfg_data.deletefiles: ' +
str(cfg_data.deletefiles))
1053 logging.debug(
'cfg_data.combine_files: ' +
str(cfg_data.combine_files))
1054 if cfg_data.overwrite
and cfg_data.use_existing:
1055 err_msg =
'Error! Incompatible options overwrite and use_existing were found in {0}.'.format(par_file)
1057 if len(input_files_list) == 1:
1058 if MetaUtils.is_ascii_file(input_files_list[0])
and not MetaUtils.is_metadata_file(input_files_list[0]):
1061 if not input_file_data:
1062 log_and_exit(
'No valid data files were specified for processing.')
1063 logging.debug(
"input_file_data: " +
str(input_file_data))
1067 get_processors(src_files, input_file_data, par_contnts, files_to_delete)
1073 err_msg =
"Unrecoverable error encountered in processing."
1077 if cfg_data.verbose:
1078 print (
"Processing complete.")
1080 logging.debug(
"Processing complete.")
1085 Execute what is contained in command and then output the results to log
1086 files and the console, as appropriate.
1089 print (
"Entering execute_command, cfg_data.verbose =",
1091 log_msg =
'Executing command:\n {0}'.format(command)
1092 logging.debug(log_msg)
1095 subproc = subprocess.run(command, capture_output=
True, text=
True, shell=
False)
1096 std_out, err_out = subproc.stdout, subproc.stderr
1097 status = subproc.returncode
1098 logging.info(std_out)
1099 logging.info(err_out)
1100 if cfg_data.verbose:
1106 Returns a single section (e.g. L1a, GEO, L1B, L2, etc.) from the "par" file.
1109 for key
in list(par_contents[section].keys()):
1110 sect_dict[key] = par_contents[section][key]
1115 Searches for a GEO file corresponding to inp_file. If that GEO file exists,
1116 returns that file name; otherwise, returns None.
1118 src_dir = os.path.dirname(inp_file)
1119 src_base = os.path.basename(inp_file)
1120 src_base_tmp = src_base.replace(
"L1B",
"GEO")
1121 geo_base = src_base_tmp.replace(
"_LAC",
"")
1123 geo_file = os.path.join(src_dir, geo_base)
1124 if not os.path.exists(geo_file):
1130 Searches for a GEO file corresponding to inp_file. If that GEO file exists,
1131 returns that file name; otherwise, returns None.
1133 src_dir = os.path.dirname(inp_file)
1134 src_base = os.path.basename(inp_file)
1135 if instrument.find(
'hawkeye') != -1:
1136 src_base_tmp = src_base.replace(
"L1A",
"GEO")
1137 geo_base = src_base_tmp.replace(
"nc",
"hdf")
1138 elif instrument.find(
'modis') != -1:
1139 if lvl.find(
'level 1a') != -1:
1140 src_base_tmp = src_base.replace(
"L1A",
"GEO")
1141 geo_base_tmp = src_base_tmp.replace(
"_LAC",
"")
1142 if geo_base_tmp.find(
'MODIS') != -1:
1143 geo_base = geo_base_tmp.replace(
"nc",
"hdf")
1145 geo_base = geo_base_tmp
1146 elif lvl.find(
'level 1b') != -1:
1147 src_base_tmp = src_base.replace(
"L1B",
"GEO")
1148 geo_base = src_base_tmp.replace(
"_LAC",
"")
1149 elif instrument.find(
'viirs') != -1:
1150 if lvl.find(
'level 1a') != -1:
1151 geo_base_tmp = src_base.replace(
"L1A",
"GEO-M")
1152 elif lvl.find(
'level 1b') != -1:
1153 geo_base_tmp = src_base.replace(
"L1B",
"GEO")
1154 if geo_base_tmp.find(
'VIIRS') != -1:
1155 geo_base_tmp2 = geo_base_tmp.replace(
"nc",
"hdf")
1156 geo_base = geo_base_tmp2.replace(
"GEO-M",
"GEO_M")
1158 geo_base = geo_base_tmp
1160 geo_file = os.path.join(src_dir, geo_base)
1161 if not os.path.exists(geo_file):
1167 Searches for a GEO file corresponding to first_svm_file. If that GEO file
1168 exists, returns that file name; otherwise, returns None.
1170 fname = first_svm_file.replace(
'SVM01',
'GMTCO').rstrip()
1171 if not os.path.exists(fname):
1177 Returns the output file for a "batch" run, i.e. a process that can accept
1178 multiple inputs, such as l2bin or l3bin.
1180 mission_prefixes = [
'A',
'C',
'O',
'S',
'T']
1182 if not len(file_set):
1183 err_msg =
"Error! An output file name could not be determined."
1185 elif len(file_set) == 1:
1186 stem = os.path.splitext(file_set[0])[0]
1188 earliest_file = file_set[0]
1189 latest_file = file_set[0]
1192 for cur_file
in file_set[1:]:
1194 if file_date < earliest_file_date:
1195 earliest_file = cur_file
1196 earliest_file_date = file_date
1197 elif file_date > latest_file_date:
1198 latest_file = cur_file
1199 latest_file_date = file_date
1200 if (earliest_file[0] == latest_file[0])
and \
1201 (earliest_file[0]
in mission_prefixes):
1202 stem = earliest_file[0]
1205 earliest_file_date_stamp = earliest_file_date.strftime(
'%Y%j')
1206 latest_file_date_stamp = latest_file_date.strftime(
'%Y%j')
1207 if earliest_file_date_stamp == latest_file_date_stamp:
1208 stem += earliest_file_date_stamp
1210 stem += earliest_file_date_stamp + latest_file_date_stamp
1211 return ''.join([stem,
'.', suffix])
1215 If found in par_contents, the value for the option specified by opt_text
1216 is returned; otherwise, False is returned.
1219 if re.search(
'combine_files', opt_text):
1223 if opt_text
in par_contents[
'main']:
1224 opt_str = par_contents[
'main'][opt_text].upper()
1225 opt_found = mlp_utils.is_option_value_true(opt_str)
1230 Run the lonlat2pixline program and return the parameters found.
1234 in_file = proc.geo_file
1237 in_file = proc.input_file
1239 proc.par_data[
'SWlon'],
1240 proc.par_data[
'SWlat'],
1241 proc.par_data[
'NElon'],
1242 proc.par_data[
'NElat']]
1243 lonlat_prog = os.path.join(proc.ocssw_bin,
'lonlat2pixline')
1244 lonlat_cmd = [lonlat_prog]
1245 lonlat_cmd.extend(args)
1246 logging.debug(
'Executing lonlat2pixline command: "%s"',
" ".join(
str(x)
for x
in lonlat_cmd))
1247 process_output = subprocess.run(lonlat_cmd, capture_output=
True, text=
True, shell=
False)
1248 lonlat_output = process_output.stdout.splitlines()
1253 for line
in lonlat_output:
1254 line_text =
str(line).strip(
"'")
1255 if 'sline' in line_text:
1256 start_line =
int(line_text.split(
'=')[1])
1257 if 'eline' in line_text:
1258 end_line =
int(line_text.split(
'=')[1])
1259 if 'spixl' in line_text:
1260 start_pixel =
int(line_text.split(
'=')[1])
1261 if 'epixl' in line_text:
1262 end_pixel =
int(line_text.split(
'=')[1])
1263 return start_line, end_line, start_pixel, end_pixel
1267 Get a Python Date object from a recognized file name's year and day of year.
1269 base_filename = os.path.basename(filename)
1270 if re.match(
r'[ACMOQSTV]\d\d\d\d\d\d\d.*', base_filename):
1271 year =
int(base_filename[1:5])
1272 doy =
int(base_filename[5:8])
1273 elif re.match(
r'\d\d\d\d\d\d\d.*', base_filename):
1275 year =
int(base_filename[0:4])
1276 doy =
int(base_filename[4:7])
1277 elif re.match(
r'\w*_npp_d\d\d\d\d\d\d\d_.*', base_filename):
1279 prefix_removed_name = re.sub(
r'\w*_npp_d',
'', base_filename)
1280 year =
int(prefix_removed_name[0:4])
1281 doy =
int(prefix_removed_name[5:7])
1283 err_msg =
'Unable to determine date for {0}'.format(filename)
1285 file_date = datetime.datetime(year, 1, 1) + datetime.timedelta(doy - 1)
1290 Returns the values of the file handling options in par_contents.
1296 return deletefiles, use_existing, overwrite, combine_files
1300 Get input files found in the uber par file's ifile line, a file list file,
1301 or both. Ensure that the list contains no duplicates.
1305 from_infilelist = []
1306 if 'ifile' in par_data[
'main']:
1307 inp_file_str = par_data[
'main'][
'ifile'].split(
'#', 2)[0]
1308 cleaned_str = re.sub(
r'[\t,:\[\]()"\']',
' ', inp_file_str)
1309 from_ifiles = cleaned_str.split()
1310 if 'infilelist' in par_data[
'main']:
1311 infilelist_name = par_data[
'main'][
'infilelist']
1312 if os.path.exists(infilelist_name):
1313 if os.path.isfile(infilelist_name)
and \
1314 os.access(infilelist_name, os.R_OK):
1315 with open(infilelist_name,
'rt')
as in_file_list_file:
1316 inp_lines = in_file_list_file.readlines()
1317 from_infilelist = [fn.rstrip()
for fn
in inp_lines
1318 if not re.match(
r'^\s*#', fn)]
1319 if len(from_ifiles) == 0
and len(from_infilelist) == 0:
1323 return list(
set(from_ifiles + from_infilelist))
1327 Returns a dictionary with the the file_type (L0, L1A, L2, etc) and
1328 instrument for each file in the input list.
1332 'level 0':
'level 0',
1333 'level 1 browse data':
'l1brsgen',
1334 'level 1a':
'level 1a',
1335 'level 1b':
'level 1b',
1336 'level 1c':
'level 1c',
1339 'level 3 binned':
'l3bin'
1342 input_file_type_data = {}
1343 for inp_file
in input_files_list:
1349 file_type, file_instr = file_typer.get_file_type()
1356 if file_type.lower()
in converter:
1357 file_type = converter[file_type.lower()]
1358 input_file_type_data[inp_file] = (file_type, file_instr.lower())
1362 warn_msg =
"Warning: Unable to determine a type for file {0}. It will not be processed.".format(inp_file)
1364 logging.info(warn_msg)
1365 return input_file_type_data
1369 Create processor objects for products which are needed, but not explicitly
1370 specified in the par file.
1372 existing_products = [proc.target_type
for proc
in existing_procs]
1374 lowest_source_level)
1375 intermediate_processors = []
1376 for prod
in intermediate_products:
1379 if not prod
in existing_products:
1380 new_proc = processor.Processor(sensor, rules, prod, {},
1381 cfg_data.hidden_dir)
1382 intermediate_processors.append(new_proc)
1383 return intermediate_processors
1386 lowest_source_level):
1388 Find products which are needed, but not explicitly specified by the
1392 for prog
in existing_prod_names:
1394 lowest_source_level)
1395 if not isinstance(candidate_progs,
type(
None)):
1396 for candidate_prog
in candidate_progs:
1397 required_progs.append(candidate_prog)
1399 required_progs.sort()
1400 return required_progs
1404 Returns the extension for an L2 file. For the time being, this is
1405 just '.L2'; however, different extensions may be wanted in the future, thus
1406 this function is in place.
1412 Returns the extension for an L3 Binned file. For the time being, this is
1413 just '.L3bin'; however, different extensions may be wanted in the future,
1414 thus this function is in place.
1420 Find the level of the lowest level source file to be processed.
1422 order = [
'level 1a',
'geo',
'level 1b',
'l2gen',
1423 'l2bin',
'l3mapgen']
1424 if len(source_files) == 1:
1425 return list(source_files.keys())[0]
1427 lowest =
list(source_files.keys())[0]
1428 for key
in list(source_files.keys())[1:]:
1430 if order.index(key) < order.index(lowest):
1436 Extract the options for a program to be run from the corresponding data in
1440 for key
in par_data:
1441 if key !=
'ofile' and key !=
'odir' and not key.lower()
in FILE_USE_OPTS:
1443 options.append(
'{}={}'.format(key,par_data[key]))
1450 Determine what the output name would be if targ_prog is run on input_files.
1452 cl_opts = optparse.Values()
1453 cl_opts.suite = suite
1454 cl_opts.oformat = oformt
1455 cl_opts.resolution = res
1456 if not isinstance(inp_files, list):
1467 Determine the output name for a program to be run.
1471 if input_name
in input_files:
1472 if input_files[input_name][0] ==
'level 0' and \
1473 input_files[input_name][1].find(
'modis') != -1:
1474 if input_files[input_name][1].find(
'aqua') != -1:
1479 if os.path.exists(input_name +
'.const'):
1480 with open(input_name +
'.const')
as constructor_file:
1481 constructor_data = constructor_file.readlines()
1482 for line
in constructor_data:
1483 if line.find(
'starttime=') != -1:
1484 start_time = line[line.find(
'=') + 1].strip()
1486 time_stamp = ProcUtils.date_convert(start_time,
't',
'j')
1488 if re.match(
r'MOD00.P\d\d\d\d\d\d\d\.\d\d\d\d', input_name):
1489 time_stamp = input_name[7:14] + input_name[15:19] +
'00'
1491 err_msg =
"Cannot determine time stamp for input file {0}".\
1494 output_name = first_char + time_stamp +
'.L1A'
1497 (dirname, basename) = os.path.split(input_name)
1498 basename_parts = basename.rsplit(
'.', 2)
1499 output_name = os.path.join(dirname, basename_parts[0] +
'.' +
1502 (dirname, basename) = os.path.split(input_name)
1503 basename_parts = basename.rsplit(
'.', 2)
1504 output_name = os.path.join(dirname, basename_parts[0] +
'.' + suffix)
1509 Return the contents of the input "par" file.
1511 acceptable_par_keys = {
1512 'level 0' :
'level 0',
'l0' :
'level 0',
1513 'level 1a' :
'level 1a',
'l1a' :
'level 1a',
'l1agen':
'level 1a',
1514 'modis_L1A':
'level 1a',
1515 'l1brsgen':
'l1brsgen',
1517 'l1aextract':
'l1aextract',
1518 'l1aextract_modis':
'l1aextract_modis',
1519 'l1aextract_seawifs' :
'l1aextract_seawifs',
1520 'l1aextract_viirs' :
'l1aextract_viirs',
1521 'l1brsgen' :
'l1brsgen',
1522 'geo' :
'geo',
'modis_GEO':
'geo',
'geolocate_viirs':
'geo',
1523 'geolocate_hawkeye':
'geo',
1524 'level 1b' :
'level 1b',
'l1b' :
'level 1b',
'l1bgen' :
'level 1b',
1525 'modis_L1B':
'level 1b',
'calibrate_viirs':
'level 1b',
1526 'level 2' :
'l2gen',
1529 'l2brsgen' :
'l2brsgen',
1530 'l2extract' :
'l2extract',
1533 'l3mapgen' :
'l3mapgen',
1537 if cfg_data.verbose:
1538 print (
"Processing %s" % par_file)
1539 par_reader = uber_par_file_reader.ParReader(par_file,
1540 acceptable_single_keys,
1541 acceptable_par_keys)
1542 par_contents = par_reader.read_par_file()
1543 ori_keys =
list(par_contents.keys())
1544 for key
in ori_keys:
1545 if key
in acceptable_par_keys:
1546 if key != acceptable_par_keys[key]:
1547 par_contents[acceptable_par_keys[key]] = par_contents[key]
1548 del par_contents[key]
1550 acc_key_str =
', '.join(
list(acceptable_par_keys.keys()))
1551 err_msg =
"""Error! Parameter file {0} contains a section titled "{1}", which is not a recognized program.
1552 The recognized programs are: {2}""".format(par_file, key, acc_key_str)
1555 if 'main' in par_contents:
1558 err_msg =
'Error! Could not find section "main" in {0}'.format(par_file)
1560 return par_contents, input_files_list
1564 Determine the processors which are needed.
1567 for key
in list(par_contents.keys()):
1570 proc = processor.Processor(sensor, rules, key, section_contents,
1571 cfg_data.hidden_dir)
1572 processors.append(proc)
1576 lowest_source_level)
1582 Execute the processor.
1584 if proc.out_directory == cfg_data.hidden_dir:
1585 proc.out_directory = cfg_data.output_dir
1586 if proc.requires_batch_processing()
and cfg_data.combine_files:
1587 logging.debug(
'Performing batch processing for ' +
str(proc))
1590 return out_file,
False
1592 if proc.rule_set.rules[proc.target_type].action:
1593 logging.debug(
'Performing nonbatch processing for ' +
str(proc))
1602 if success_count == 0:
1603 print(
'The {0} processor produced no output files.'.format(proc.target_type), flush=
True)
1604 msg =
'The {0} processor produced no output files.'.format(proc.target_type)
1607 return out_file, used_exsiting
1609 msg =
'-I- There is no way to create {0} files for {1}.'.format(proc.target_type, proc.instrument)
1614 Determine how to chain the processors together.
1616 order = [
'level 0',
'level 1a',
'level 1c',
'geo',
'l1aextract',
1617 'level 1b',
'l1brsgen',
'l2gen',
'l2extract',
'l2brsgen',
1618 'l2bin',
'l3mapgen']
1619 key_list =
list(par_contents.keys())
1620 last_key = key_list[-1]
1621 if 'l2extract' in key_list:
1622 if not (
'l2gen' in key_list):
1623 pos = key_list.index(
'l2extract')
1624 key_list.insert(pos,
'l2gen')
1625 items =
list(par_contents.items())
1626 items.insert(pos, (
'l2gen', {}))
1627 par_contents = dict(items)
1628 elif 'l2brsgen' in key_list:
1629 if not (
'l2gen' in key_list):
1630 pos = key_list.index(
'l2brsgen')
1631 key_list.insert(pos,
'l2gen')
1632 items =
list(par_contents.items())
1633 items.insert(pos, (
'l2gen', {}))
1634 par_contents = dict(items)
1635 elif 'l2bin' in key_list:
1637 if not (
'l2gen' in key_list):
1638 pos = key_list.index(
'l2bin')
1639 key_list.insert(pos,
'l2gen')
1640 items =
list(par_contents.items())
1641 items.insert(pos, (
'l2gen', {}))
1642 par_contents = dict(items)
1643 elif 'l3mapgen' in key_list:
1644 if not (
'l2bin' in key_list):
1645 pos = key_list.index(
'l3mapgen')
1646 key_list.insert(pos,
'l2bin')
1647 items =
list(par_contents.items())
1648 items.insert(pos, (
'l2bin', {}))
1649 par_contents = dict(items)
1651 if not (
'l2gen' in key_list):
1652 pos = key_list.index(
'l2bin')
1653 key_list.insert(pos,
'l2gen')
1654 items =
list(par_contents.items())
1655 items.insert(pos, (
'l2gen', {}))
1656 par_contents = dict(items)
1657 for key
in key_list:
1660 if not order.index(key) > order.index(
'l2gen'):
1661 src_lvls =
list(src_files.keys())
1662 if not key
in src_files:
1664 for src_lvl
in src_lvls:
1665 if order.index(src_lvl) < order.index(
'l2gen'):
1666 for file
in src_files[src_lvl]:
1668 instrument = file_typer.get_file_type()[1].lower().split()[0]
1670 logging.debug(
"instrument: " + instrument)
1671 if instrument
in sensors_sets:
1672 rules = sensors_sets[instrument].recipe
1673 sensor = sensors_sets[instrument]
1675 rules = sensors_sets[
'general'].recipe
1676 sensor = sensors_sets[
'general']
1677 proc = processor.Processor(sensor, rules, key, section_contents,
1678 cfg_data.hidden_dir)
1679 proc.input_file = file
1680 if file_typer.get_file_type()[0].lower().find(
'level 0') == -1:
1681 if sensor.require_geo
and key !=
'geo':
1682 if 'geo' in src_lvls:
1683 proc.geo_file = src_files[
'geo'][0]
1685 proc.geo_file =
find_geo_file2(proc.input_file, instrument, src_lvl)
1687 if not proc.geo_file:
1688 if src_lvl.find(
'level 1b') != -1:
1689 err_msg =
'Error! Cannot produce GEO file for {0}, Need level 1a file'.format(file)
1691 logging.debug(err_msg , flush=
True)
1693 print (
'Skipping processing on file {0}.'.format(file), flush=
True)
1695 log_msg =
'Skipping processing on file {0}.'.format(file)
1696 logging.debug(log_msg)
1699 proc_geo = processor.Processor(sensor, rules,
'geo', {},
1700 cfg_data.hidden_dir)
1701 proc_geo.input_file = file
1702 print (
'Running geo on file {0}.'.format(file), flush=
True)
1704 log_msg =
'Processing for geo:'
1705 logging.debug(log_msg)
1706 proc.geo_file, used_existing =
exe_processor(proc_geo, src_files, src_lvl)
1707 if not proc.geo_file:
1708 print (
'Skipping processing on file {0}.'.format(file), flush=
True)
1710 log_msg =
'Skipping processing on file {0}.'.format(file)
1711 logging.debug(log_msg)
1713 if cfg_data.deletefiles
and not used_existing:
1715 files_to_delete.append(proc.geo_file)
1717 print (
'Used existing geo file {0}.'.format(proc.geo_file))
1719 log_msg =
'Used existing geo file {0}.'.format(proc.geo_file)
1720 logging.debug(log_msg)
1721 if key ==
'l2gen' and sensor.require_l1b_for_l2gen
and src_lvl.find(
'level 1b') == -1:
1722 proc_l1b = processor.Processor(sensor, rules,
'level 1b', {},
1723 cfg_data.hidden_dir)
1724 if sensor.require_geo:
1725 proc_l1b.input_file = file
1726 proc_l1b.geo_file = proc.geo_file
1727 print (
'Running level 1b on file {0}.'.format(file), flush=
True)
1729 log_msg =
'Processing for level 1b:'
1730 logging.debug(log_msg)
1731 proc.input_file, used_existing =
exe_processor(proc_l1b, src_files, src_lvl)
1732 if not proc.input_file:
1733 print (
'Skipping processing on file {0}.'.format(file), flush=
True)
1735 log_msg =
'Skipping processing on file {0}.'.format(file)
1736 logging.debug(log_msg)
1738 if cfg_data.deletefiles
and not used_existing:
1740 files_to_delete.append(proc.input_file)
1742 print (
'Used existing level 1b file {0}.'.format(proc.input_file))
1744 log_msg =
'Used existing level 1b file {0}.'.format(proc.input_file)
1745 logging.debug(log_msg)
1747 file_input = proc.input_file
1748 print (
'Running {0} on file {1}.'.format(proc.target_type, proc.input_file), flush=
True)
1750 log_msg =
'Processing for {0}:'.format(proc.target_type)
1751 logging.debug(log_msg)
1752 out_file, used_existing =
exe_processor(proc, src_files, src_lvl)
1754 src_files[key].append(out_file)
1756 print (
'Skipping processing on file {0}.'.format(file_input, flush=
True))
1758 log_msg =
'Skipping processing on file {0}.'.format(file_input)
1759 logging.debug(log_msg)
1761 if cfg_data.deletefiles
and not used_existing
and key != last_key:
1762 if out_file
and key.find(
'brsgen') == -1:
1763 files_to_delete.append(out_file)
1765 print (
'Used existing file {0}.'.format(out_file))
1767 log_msg =
'Used existing file {0}.'.format(out_file)
1768 logging.debug(log_msg)
1769 if key.find(
'l1aextract') != -1:
1770 src_files[
'level 1a'] = src_files[key]
1771 del src_files[
'l1aextract']
1772 if 'geo' in src_files:
1773 del src_files[
'geo']
1774 src_lvls.remove(
'geo')
1777 if key !=
'level 1a':
1778 proc_l1a = processor.Processor(sensor, rules,
'level 1a', {},
1779 cfg_data.hidden_dir)
1780 proc_l1a.input_file = file
1781 print (
'Running level 1a on file {0}.'.format(file), flush=
True)
1783 log_msg =
'Processing for level 1a:'
1784 logging.debug(log_msg)
1785 proc.input_file, used_existing =
exe_processor(proc_l1a, src_files, src_lvl)
1786 if not proc.input_file:
1787 print (
'Skipping processing on file {0}.'.format(file), flush=
True)
1789 log_msg =
'Skipping processing on file {0}.'.format(file)
1790 logging.debug(log_msg)
1792 if cfg_data.deletefiles
and not used_existing:
1794 files_to_delete.append(proc.input_file)
1796 print (
'Used existing level 1a file {0}.'.format(proc.input_file))
1798 log_msg =
'Used existing level 1a file {0}.'.format(proc.input_file)
1799 logging.debug(log_msg)
1800 if proc.input_file
and sensor.require_geo
and key !=
'geo' and key !=
'level 1a':
1801 proc.geo_file =
find_geo_file2(proc.input_file, instrument,
'level 1a')
1802 if not proc.geo_file:
1803 proc_geo = processor.Processor(sensor, rules,
'geo', {},
1804 cfg_data.hidden_dir)
1805 proc_geo.input_file = proc.input_file
1806 print (
'Running geo on file {0}.'.format(proc.input_file), flush=
True)
1808 log_msg =
'Processing for geo:'
1809 logging.debug(log_msg)
1810 proc.geo_file, used_existing =
exe_processor(proc_geo, src_files, src_lvl)
1811 if not proc.geo_file:
1812 print (
'Skipping processing on file {0}.'.format(proc.input_file), flush=
True)
1814 log_msg =
'Skipping processing on file {0}.'.format(proc.input_file)
1815 logging.debug(log_msg)
1817 if cfg_data.deletefiles
and not used_existing:
1819 files_to_delete.append(proc.geo_file)
1821 print (
'Used existing geo file {0}.'.format(proc.geo_file))
1823 log_msg =
'Used existing geo file {0}.'.format(proc.geo_file)
1824 logging.debug(log_msg)
1825 if key ==
'l2gen' and sensor.require_l1b_for_l2gen:
1826 proc_l1b = processor.Processor(sensor, rules,
'level 1b', {},
1827 cfg_data.hidden_dir)
1828 if sensor.require_geo:
1829 proc_l1b.input_file = proc.input_file
1830 proc_l1b.geo_file = proc.geo_file
1831 print (
'Running level 1b on file {0}.'.format(proc.input_file), flush=
True)
1833 log_msg =
'Processing for level 1b:'
1834 logging.debug(log_msg)
1835 file_input = proc.input_file
1836 proc.input_file, used_existing =
exe_processor(proc_l1b, src_files, src_lvl)
1837 if not proc.input_file:
1838 print (
'Skipping processing on file {0}.'.format(file_input), flush=
True)
1840 log_msg =
'Skipping processing on file {0}.'.format(file_input)
1841 logging.debug(log_msg)
1843 if cfg_data.deletefiles
and not used_existing:
1845 files_to_delete.append(proc.input_file)
1847 print (
'Used existing level 1b file {0}.'.format(proc_l1b.input_file))
1849 log_msg =
'Used existing level 1b file {0}.'.format(proc_l1b.input_file)
1850 logging.debug(log_msg)
1852 file_input = proc.input_file
1853 print (
'Running {0} on file {1}.'.format(proc.target_type, proc.input_file), flush=
True)
1855 log_msg =
'Processing for {0}:'.format(proc.target_type)
1856 logging.debug(log_msg)
1857 out_file, used_existing =
exe_processor(proc, src_files, src_lvl)
1859 src_files[key].append(out_file)
1861 print (
'Skipping processing on file {0}.'.format(file_input), flush=
True)
1863 log_msg =
'Skipping processing on file {0}.'.format(file_input)
1864 logging.debug(log_msg)
1866 if cfg_data.deletefiles
and not used_existing
and key != last_key:
1867 if out_file
and key.find(
'brsgen') == -1:
1868 files_to_delete.append(out_file)
1870 print (
'Used existing file {0}.'.format(out_file))
1872 log_msg =
'Used existing file {0}.'.format(out_file)
1873 logging.debug(log_msg)
1874 if key.find(
'l1aextract') != -1:
1875 src_files[
'level 1a'] = src_files[key]
1876 del src_files[
'l1aextract']
1877 if 'geo' in src_files:
1878 del src_files[
'geo']
1879 src_lvls.remove(
'geo')
1881 if len(src_files) > 1
and key !=
'geo' :
1882 if key.find(
'brsgen') != -1:
1885 del src_files[src_lvl]
1886 if len(src_files) > 1:
1887 if 'geo' in src_lvls:
1888 del src_files[
'geo']
1889 src_lvls.remove(
'geo')
1891 src_lvls =
list(src_files.keys())
1892 rules = sensors_sets[
'general'].recipe
1893 sensor = sensors_sets[
'general']
1894 if not key
in src_files:
1896 for src_lvl
in src_lvls:
1897 if not order.index(src_lvl) < order.index(
'l2gen'):
1898 for file
in src_files[src_lvl]:
1899 proc = processor.Processor(sensor, rules, key, section_contents,
1900 cfg_data.hidden_dir)
1901 proc.input_file = file
1902 for program
in proc.required_types:
1903 if not program
in src_files:
1904 proc1 = processor.Processor(sensor, rules, program, {},
1905 cfg_data.hidden_dir)
1906 proc1.input_file = file
1908 for program2
in proc1.required_types:
1909 if program2.find(src_lvl) == -1:
1910 proc2 = processor.Processor(sensor, rules, program2, {},
1911 cfg_data.hidden_dir)
1912 proc2.input_file = file
1914 print (
'Running {0} on file {1}.'.format(proc2.target_type, proc2.input_file), flush=
True)
1916 log_msg =
'Processing for {0}:'.format(proc2.target_type)
1917 logging.debug(log_msg)
1918 proc1.input_file, used_existing =
exe_processor(proc2, src_files, src_lvl)
1919 if not proc1.input_file:
1920 print (
'Skipping processing on file {0}.'.format(file), flush=
True)
1922 log_msg =
'Skipping processing on file {0}.'.format(file)
1923 logging.debug(log_msg)
1925 if cfg_data.deletefiles
and not used_existing:
1926 if proc1.input_file:
1927 files_to_delete.append(proc1.input_file)
1929 print (
'Used existing file {0}.'.format(proc1.input_file))
1931 log_msg =
'Used existing file {0}.'.format(proc1.input_file)
1932 logging.debug(log_msg)
1933 if proc1.input_file:
1934 file_input = proc1.input_file
1935 print (
'Running {0} on file {1}.'.format(proc1.target_type, proc1.input_file), flush=
True)
1937 log_msg =
'Processing for {0}:'.format(proc1.target_type)
1938 logging.debug(log_msg)
1939 proc.input_file, used_existing =
exe_processor(proc1, src_files, src_lvl)
1940 if not proc.input_file:
1941 print (
'Skipping processing on file {0}.'.format(file_input), flush=
True)
1943 log_msg =
'Skipping processing on file {0}.'.format(file_input)
1944 logging.debug(log_msg)
1946 if cfg_data.deletefiles:
1947 if proc.input_file
and not used_existing:
1948 files_to_delete.append(proc.input_file)
1950 print (
'Used existing file {0}.'.format(proc.input_file))
1952 log_msg =
'Used existing file {0}.'.format(proc.input_file)
1953 logging.debug(log_msg)
1954 del src_files[src_lvl]
1956 file_input = proc.input_file
1957 if len(src_files[src_lvl]) > 1
and cfg_data.combine_files:
1958 print (
'Running {0} on a list of files including {1}.'.format(proc.target_type, proc.input_file), flush=
True)
1960 log_msg =
'Processing for {0}:'.format(proc.target_type)
1962 print (
'Running {0} on file {1}.'.format(proc.target_type, proc.input_file), flush=
True)
1964 log_msg =
'Processing for {0}:'.format(proc.target_type)
1965 logging.debug(log_msg)
1966 out_file, used_existing =
exe_processor(proc, src_files, program)
1968 src_files[key].append(out_file)
1970 print (
'Skipping processing on file {0}.'.format(file_input), flush=
True)
1972 log_msg =
'Skipping processing on file {0}.'.format(file_input)
1973 logging.debug(log_msg)
1976 if cfg_data.deletefiles
and not used_existing
and key != last_key:
1977 if out_file
and key.find(
'brsgen') == -1:
1978 files_to_delete.append(out_file)
1980 print (
'Used existing file {0}.'.format(out_file))
1982 log_msg =
'Used existing file {0}.'.format(out_file)
1983 logging.debug(log_msg)
1984 if program
in src_files:
1985 if proc.target_type.find(
'brsgen') != -1:
1986 del src_files[proc.target_type]
1988 if cfg_data.combine_files:
1989 del src_files[program]
1990 elif re.search(
"l2bin", proc.target_type)
and len(src_files[program]) == len(src_files[proc.target_type]):
1991 del src_files[program]
1996 if key.find(
'l2extract') != -1:
1997 src_files[
'l2gen'] = src_files[key]
1998 del src_files[
'l2extract']
1999 if proc.requires_batch_processing()
and cfg_data.combine_files:
2006 Returns the programs required too produce the desired final output.
2008 programs_to_run = []
2009 cur_rule = ruleset.rules[target_program]
2010 src_types = cur_rule.src_file_types
2011 if src_types[0] == cur_rule.target_type:
2012 programs_to_run = [target_program]
2014 for src_type
in src_types:
2015 if src_type
in ruleset.rules:
2016 if ruleset.order.index(src_type) > \
2017 ruleset.order.index(lowest_source_level):
2018 programs_to_run.insert(0, src_type)
2019 if len(src_types) > 1:
2020 programs_to_run.insert(0, src_types[1])
2022 lowest_source_level)
2023 for prog
in programs_to_add:
2024 programs_to_run.insert(0, prog)
2025 return programs_to_run
2029 :param source_files: list of source files
2030 :param proc_src_types: list of source types for the processor
2031 :param proc_src_ndx: index into the proc_src_types list pointing to the
2032 source type to use to get the input files
2033 :return: list of GEO files that correspond to the files in source_files
2035 inp_files = source_files[proc_src_types[proc_src_ndx]]
2037 for inp_file
in inp_files:
2040 geo_files.append(geo_file)
2042 err_msg =
'Error! Cannot find GEO ' \
2043 'file {0}.'.format(geo_file)
2049 Returns the set of source files needed.
2051 if len(proc_src_types) == 1:
2053 src_file_sets = source_files[src_key]
2064 err_msg =
'Error! Unable to determine what source files are required for the specified output files.'
2067 if requires_all_sources:
2068 if len(proc_src_types) == 2:
2069 if proc_src_types[0]
in source_files \
2070 and proc_src_types[1]
in source_files:
2071 src_file_sets =
list(zip(source_files[proc_src_types[0]],
2072 source_files[proc_src_types[1]]))
2074 if proc_src_types[0]
in source_files:
2075 if proc_src_types[1] ==
'geo':
2077 src_file_sets =
list(zip(source_files[proc_src_types[0]],
2080 err_msg =
'Error! Cannot find all {0} and' \
2081 ' {1} source files.'.format(proc_src_types[0],
2084 elif proc_src_types[1]
in source_files:
2085 if proc_src_types[0] ==
'geo':
2087 src_file_sets =
list(zip(source_files[proc_src_types[1]],
2090 err_msg =
'Error! Cannot find all {0} and' \
2091 ' {1} source files.'.format(proc_src_types[0],
2095 err_msg =
'Error! Cannot find all source files.'
2098 err_msg =
'Error! Encountered too many source file types.'
2101 for proc_src_type
in proc_src_types:
2102 if proc_src_type
in source_files:
2103 src_file_sets = source_files[proc_src_type]
2104 return src_file_sets
2108 Returns a dictionary containing the programs to be run (as keys) and the
2109 a list of files on which that program should be run.
2112 for file_path
in input_files:
2113 ftype = input_files[file_path][0]
2114 if ftype
in source_files:
2115 source_files[ftype].append(file_path)
2117 source_files[ftype] = [file_path]
2122 Return the list of source product types needed to produce the final product.
2124 src_prod_names = [targt_prod]
2125 targt_pos = ruleset.order.index(targt_prod)
2127 for pos
in range(targt_pos, 1, -1):
2128 for prod_name
in src_prod_names:
2129 if ruleset.rules[ruleset.order[pos]].target_type == prod_name:
2130 for src_typ
in ruleset.rules[ruleset.order[pos]].src_file_types:
2131 new_prod_names.append(src_typ)
2132 src_prod_names += new_prod_names
2133 return src_prod_names
2137 Returns an error message built from traceback data.
2139 exc_parts = [
str(l)
for l
in sys.exc_info()]
2140 err_type_parts =
str(exc_parts[0]).strip().split(
'.')
2141 err_type = err_type_parts[-1].strip(
"'>")
2142 tb_data = traceback.format_exc()
2143 tb_line = tb_data.splitlines()[-3]
2144 line_num = tb_line.split(
',')[1]
2145 st_data = traceback.extract_stack()
2146 err_file = os.path.basename(st_data[-1][0])
2147 msg =
'Error! The {0} program encountered an unrecoverable {1}, {2}, at {3} of {4}!'.\
2148 format(cfg_data.prog_name,
2149 err_type, exc_parts[1], line_num.strip(), err_file)
2156 sensors = dict(general=
Sensor(),
2167 Record error_msg in the debug log, then exit with error_msg going to stderr
2168 and an exit code of 1; see:
2169 http://docs.python.org/library/sys.html#exit.
2171 logging.info(error_msg)
2176 main processing function.
2184 version=
' '.join([
'%prog', __version__]))
2188 print (
"\nError! No file specified for processing.\n")
2189 cl_parser.print_help()
2201 options.verbose, options.overwrite,
2202 options.use_existing, options.combine_files,
2203 options.deletefiles, options.odir)
2204 if not os.access(cfg_data.hidden_dir, os.R_OK):
2205 log_and_exit(
"Error! The working directory is not readable!")
2206 if os.path.exists(args[0]):
2207 log_timestamp = datetime.datetime.today().strftime(
'%Y%m%d%H%M%S')
2230 err_msg =
'Unanticipated error encountered during processing!'
2233 err_msg =
'Error! Parameter file {0} does not exist.'.\
2241 Get arguments and options from the calling command line.
2242 To be consistent with other OBPG programs, an underscore ('_') is used for
2243 multiword options, instead of a dash ('-').
2245 cl_parser.add_option(
'--debug', action=
'store_true', dest=
'debug',
2246 default=
False, help=optparse.SUPPRESS_HELP)
2247 cl_parser.add_option(
'-d',
'--deletefiles', action=
'store_true',
2248 dest=
'deletefiles', default=
False,
2249 help=
'delete files created during processing')
2250 cl_parser.add_option(
'--ifile', action=
'store', type=
'string',
2251 dest=
'ifile', help=
"input file")
2252 cl_parser.add_option(
'--output_dir',
'--odir',
2253 action=
'store', type=
'string', dest=
'odir',
2254 help=
"user specified directory for output")
2255 cl_parser.add_option(
'--overwrite', action=
'store_true',
2256 dest=
'overwrite', default=
False,
2257 help=
'overwrite files which already exist (default = stop processing if file already exists)')
2263 cl_parser.add_option(
'--use_existing', action=
'store_true',
2264 dest=
'use_existing', default=
False,
2265 help=
'use files which already exist (default = stop processing if file already exists)')
2266 cl_parser.add_option(
'--combine_files', action=
'store_true',
2267 dest=
'combine_files', default=
True,
2268 help=
'combine files for l2bin instead of binning each file separately')
2269 cl_parser.add_option(
'-v',
'--verbose',
2270 action=
'store_true', dest=
'verbose', default=
False,
2271 help=
'print status messages to stdout')
2273 (options, args) = cl_parser.parse_args()
2274 for ndx, cl_arg
in enumerate(args):
2275 if cl_arg.startswith(
'par='):
2276 args[ndx] = cl_arg.lstrip(
'par=')
2277 if options.overwrite
and options.use_existing:
2278 log_and_exit(
'Error! Options overwrite and use_existing cannot be ' + \
2279 'used simultaneously.')
2280 return options, args
2284 Reads flf_name and returns the list of files to be processed.
2288 with open(flf_name,
'rt')
as flf:
2289 inp_lines = flf.readlines()
2290 for line
in inp_lines:
2291 fname = line.split(
'#')[0].strip()
2293 if os.path.exists(fname):
2294 files_list.append(fname)
2296 bad_lines.append(fname)
2297 if len(bad_lines) > 0:
2298 err_msg =
'Error! File {0} specified the following input files which could not be located:\n {1}'.\
2299 format(flf_name,
', '.join([bl
for bl
in bad_lines]))
2305 Run a processor, e.g. l2bin, which processes batches of files.
2308 if os.path.exists((file_set[0]))
and tarfile.is_tarfile(file_set[0]):
2309 processor.input_file = file_set[0]
2311 timestamp = time.strftime(
'%Y%m%d_%H%M%S', time.gmtime(time.time()))
2312 file_list_name = cfg_data.hidden_dir + os.sep +
'files_' + \
2313 processor.target_type +
'_' + timestamp +
'.lis'
2314 with open(file_list_name,
'wt')
as file_list:
2315 for fname
in file_set:
2316 file_list.write(fname +
'\n')
2317 processor.input_file = file_list_name
2320 for fspec
in file_set:
2322 data_file_list.append(dfile)
2323 if 'suite' in processor.par_data:
2324 finder_opts[
'suite'] = processor.par_data[
'suite']
2325 elif 'prod' in processor.par_data:
2326 finder_opts[
'suite'] = processor.par_data[
'prod']
2327 if 'resolution' in processor.par_data:
2328 finder_opts[
'resolution'] = processor.par_data[
'resolution']
2329 if 'oformat' in processor.par_data:
2330 finder_opts[
'oformat'] = processor.par_data[
'oformat']
2334 if processor.output_file:
2335 processor.output_file = os.path.join(processor.out_directory,
2336 processor.output_file )
2338 processor.output_file = os.path.join(processor.out_directory,
2340 processor.target_type,
2343 log_msg =
"Running {0} with input file {1} to generate {2} ".\
2344 format(processor.target_type,
2345 processor.input_file,
2346 processor.output_file)
2347 logging.debug(log_msg)
2349 return processor.output_file
2353 Run a processor which deals with single input files (or pairs of files in
2354 the case of MODIS L1B processing in which GEO files are also needed).
2362 used_existing =
False
2365 cl_opts = optparse.Values()
2366 if 'suite' in processor.par_data:
2367 cl_opts.suite = processor.par_data[
'suite']
2368 elif 'prod' in processor.par_data:
2369 cl_opts.suite = processor.par_data[
'prod']
2371 cl_opts.suite =
None
2372 if 'resolution' in processor.par_data:
2373 cl_opts.resolution = processor.par_data[
'resolution']
2375 cl_opts.resolution =
None
2376 if 'oformat' in processor.par_data:
2377 cl_opts.oformat = processor.par_data[
'oformat']
2379 cl_opts.oformat =
None
2383 if processor.output_file:
2384 output_file = os.path.join(processor.out_directory, processor.output_file)
2386 output_file = os.path.join(processor.out_directory,
2389 print (
'in run_nonbatch_processor, output_file = ' + output_file)
2391 processor.output_file = output_file
2399 if (
not os.path.exists(output_file))
or cfg_data.overwrite:
2400 if cfg_data.verbose:
2402 print (
'\nRunning ' +
str(processor))
2404 proc_status = processor.execute()
2410 msg =
"Error! Status {0} was returned during {1} {2} processing.".\
2411 format(proc_status, processor.sensor.name,
2412 processor.target_type)
2418 elif not cfg_data.use_existing:
2419 log_and_exit(
'Error! Target file {0} already exists.'.\
2420 format(output_file))
2422 used_existing =
True
2423 processor.input_file =
''
2424 processor.output_file =
''
2425 return output_file, used_existing
2429 Build the command to run the processing script which is passed in.
2432 args = [
'ifile={}'.format(proc.input_file),
'ofile={}'.format(proc.output_file)]
2437 logging.debug(
'\nRunning: {}'.format(
" ".join(
str(x)
for x
in cmd)))
2444 Opens log file(s) for debugging.
2446 info_log_name =
''.join([
'Processor_', time_stamp,
'.log'])
2447 debug_log_name =
''.join([
'multilevel_processor_debug_', time_stamp,
2449 info_log_path = os.path.join(cfg_data.output_dir, info_log_name)
2450 debug_log_path = os.path.join(cfg_data.output_dir, debug_log_name)
2451 mlp_logger = logging.getLogger()
2454 info_hndl = logging.FileHandler(info_log_path)
2455 info_hndl.setLevel(logging.INFO)
2456 mlp_logger.addHandler(info_hndl)
2459 debug_hndl = logging.FileHandler(debug_log_path)
2460 debug_hndl.setLevel(logging.DEBUG)
2461 mlp_logger.addHandler(debug_hndl)
2462 logging.debug(
'Starting ' + os.path.basename(sys.argv[0]) +
' at ' +
2463 datetime.datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"))
2467 Returns a list with no duplicates. Somewhat borrowed from:
2468 http://www.peterbe.com/plog/uniqifiers-benchmark (example f5)
2472 for item
in orig_list:
2473 if item
not in seen_items:
2474 seen_items[item] = 1
2475 uniqified_list.append(item)
2476 return uniqified_list
2484 FILE_USE_OPTS = [
'deletefiles',
'overwrite',
'use_existing',
'combine_files']
2487 'l1brsgen':
'L1B_BRS',
2488 'l1aextract':
'L1A.sub',
2489 'l1aextract_viirs':
'L1A.sub',
2490 'l1aextract_seawifs':
'L1A.sub',
2491 'l1aextract_modis':
'L1A.sub',
2494 'l2brsgen':
'L2_BRS',
2495 'l2extract':
'L2.sub',
2501 'level 1b':
'L1B_LAC',
2504 input_file_data = {}
2507 if os.environ[
'OCSSWROOT']:
2508 OCSSWROOT_DIR = os.environ[
'OCSSWROOT']
2509 logging.debug(
'OCSSWROOT -> %s', OCSSWROOT_DIR)
2511 sys.exit(
'Error! Cannot find OCSSWROOT environment variable.')
2513 if __name__ ==
"__main__":