Ocean Color Science Software

ocssw V2022
MakeUnc.py
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Nov 2 15:09:44 2015
Script to run analytic uncertainty analysis of Rrs data
@author: ekarakoy
"""
import numpy as np
import netCDF4 as nc
import glob
import sys
import os
import re
import argparse
import logging
import multiprocessing as mp
import pickle
import shutil
from datetime import datetime as DT

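# Overview: a baseline ("silent") L2 granule is read alongside an ensemble of
# perturbed ("noisy") L2 granules of the same scene; the squared differences
# of each Rrs band against the baseline are accumulated and their root mean
# square is written back into the silent granule as Rrs_unc_<band> (optionally
# also Lt_unc_<band>, chlor_a_unc and nflh_unc). An illustrative sketch of the
# per-band aggregation follows the MakeUnc class, and example invocations are
# given at the end of the file.
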
class MakeUnc(object):
    """
    Class to get Rrs uncertainties for a given L2 granule. Includes methods to:
        * get the baseline from the unperturbed L2 granule
        * calculate uncertainties (as RMSE) from the corresponding perturbed
          L2 files
        * save the uncertainty variables in the original unperturbed granule
    Inputs:
        *args:
            1- baselineFile
            2- noisyDir -- directory where the noisy files are located
    """
    def __init__(self, pArgs, parent_logger_name):
        self.logger = logging.getLogger('%s.MakeUnc' % parent_logger_name)
        self.silFile = pArgs.ifile
        self.noisyDir = pArgs.npath
        # Process options
        self.doChla = pArgs.dochl
        self.doNflh = pArgs.doflh
        self.pSafe = pArgs.psafe
        self.doSaniCheck = pArgs.sanity
        self.rrsSilDict = dict.fromkeys(self.bands)
        self.attrRrsUncDict = dict.fromkeys(self.bands)
        self.dimsDict = dict.fromkeys(['rrs_unc_%s' % band for band in self.bands])
        self.dTypeDict = dict.fromkeys(['rrs_unc_%s' % band for band in self.bands])
        self.rrsUncArrDict = dict.fromkeys(self.bands)
        if self.pSafe:
            self._PlaySafe()
        if self.doSaniCheck:
            self.dimsDict.update(dict.fromkeys(['lt_unc_%s' % band for band in self.bands]))
            self.dTypeDict.update(dict.fromkeys(['lt_unc_%s' % band for band in self.bands]))
            self.ltUncArrDict = dict.fromkeys(self.bands)
            self.ltSilDict = dict.fromkeys(self.bands)
            self.attrLtUncDict = dict.fromkeys(self.bands)
        otherProdkeys = []
        if self.doChla:
            self.logger.info('chl_a_unc will be included')
            otherProdkeys.append('chlor_a')
            otherProdkeys.append('chlor_a_unc')
        if self.doNflh:
            self.logger.info('nflh_unc will be included')
            otherProdkeys.append('nflh')
            otherProdkeys.append('nflh_unc')
        if otherProdkeys:
            self.otherProdsDict = dict.fromkeys(otherProdkeys)
            attrUncKeys = [x for x in otherProdkeys if re.search('_unc', x)]
            self.attrOtherProdUncDict = dict.fromkeys(attrUncKeys)
            self.dTypeDict.update(dict.fromkeys(attrUncKeys))
            self.dimsDict.update(dict.fromkeys(attrUncKeys))

    def _PlaySafe(self):
        '''
        Method to back up the unprocessed silent L2 so that the entire
        processing chain need not be redone if a problem arises.
        If a copy already exists, it is assumed this is not the first
        processing attempt and the silent L2 is now tainted. It is removed and
        a clean copy is restored from the backup.
        '''
        orig = self.silFile
        cpy = self.silFile + '.cpy'
        if os.path.exists(cpy):  # not the first time - something's wrong
            os.remove(orig)  # remove "original" - tainted file
            self.logger.info('%s already exists. Removing original %s' % (cpy, orig))
            shutil.copy2(cpy, orig)
            self.logger.info('Copying silent from %s' % cpy)
        else:
            shutil.copy2(orig, cpy)
            self.logger.info('No copy for %s. Generating copy' % self.silFile)

    def WriteToSilent(self):
        # first create the NC variables if necessary,
        # then save the uncertainties in the corresponding NC variables
        with nc.Dataset(self.silFile, 'r+') as dsSil:
            geoGr = dsSil.groups['geophysical_data']
            geoVar = geoGr.variables
            for band in self.bands:
                rrsUncProdName = 'Rrs_unc_%s' % band
                if rrsUncProdName not in geoVar:
                    try:
                        varRrsUnc = geoGr.createVariable(rrsUncProdName,
                                                         self.dTypeDict['rrs_unc_%s' % band],
                                                         self.dimsDict['rrs_unc_%s' % band])
                        varRrsUnc.setncatts(self.attrRrsUncDict[band])
                    except KeyError:  # skip nir bands
                        pass
                else:
                    varRrsUnc = geoVar[rrsUncProdName]
                varRrsUnc[:] = self.rrsUncArrDict[band]
                if self.doSaniCheck:
                    ltUncProdName = 'Lt_unc_%s' % band
                    if ltUncProdName not in geoVar:
                        varLtUnc = geoGr.createVariable(ltUncProdName,
                                                        self.dTypeDict['lt_unc_%s' % band],
                                                        self.dimsDict['lt_unc_%s' % band])
                        varLtUnc.setncatts(self.attrLtUncDict[band])
                    else:
                        varLtUnc = geoVar[ltUncProdName]
                    varLtUnc[:] = self.ltUncArrDict[band]
            if self.doChla:
                if 'chlor_a_unc' not in geoVar:
                    varChloraUnc = geoGr.createVariable('chlor_a_unc',
                                                        self.dTypeDict['chlor_a_unc'],
                                                        self.dimsDict['chlor_a_unc'])
                    varChloraUnc.setncatts(self.attrOtherProdUncDict['chlor_a_unc'])
                else:
                    varChloraUnc = geoVar['chlor_a_unc']
                varChloraUnc[:] = self.otherProdsDict['chlor_a_unc']

            if self.doNflh:
                if 'nflh_unc' not in geoVar:
                    self.logger.info('nflh_unc not present; creating variable...')
                    varNflhUnc = geoGr.createVariable('nflh_unc',
                                                      self.otherProdsDict['nflh_unc'].dtype,
                                                      self.dimsDict['nflh_unc'])
                    varNflhUnc.setncatts(self.attrOtherProdUncDict['nflh_unc'])
                else:
                    self.logger.info('nflh_unc available, using existing variable...')
                    varNflhUnc = geoVar['nflh_unc']
                varNflhUnc[:] = self.otherProdsDict['nflh_unc']
            # self.logger.info("%s Processing Complete" % baseLineFname)
        return None

    def BuildUncs(self, noisySfx):
        """
        Calculates Rrs uncertainty as the standard deviation of Rrs. Note that,
        to save time, the unperturbed Rrs is used as the Rrs baseline for the
        simulation. (An illustrative sketch of the aggregation follows the
        class definition.)
        """
        fBaseName = self.silFile.split('/')[-1].split('.')[0].split('_')[0]
        matchFilPatt = os.path.join(self.noisyDir, '%s*%s*' % (fBaseName, noisySfx))
        self.logger.info("searching for %s..." % matchFilPatt)
        flis = glob.glob(matchFilPatt)
        lflis = len(flis)
        if lflis == 0:
            self.logger.error('No files to process with pattern %s' % matchFilPatt)
            sys.exit(1)
        else:
            self.logger.info("%d files to be processed" % lflis)
        rrsAggDataDict = {band: np.zeros_like(self.rrsSilDict[band])
                          for band in self.rrsSilDict.keys()}
        if self.doSaniCheck:
            ltAggDataDict = {band: np.zeros_like(self.ltSilDict[band])
                             for band in self.ltSilDict.keys()}
        if self.doChla:
            chlAggDataArr = np.zeros_like(self.otherProdsDict['chlor_a'])
        if self.doNflh:
            nflhAggDataArr = np.zeros_like(self.otherProdsDict['nflh'])
        # process noisy data
        for fcount, fname in enumerate(flis):
            prcDone = 100 * fcount / (lflis - 1)
            self.logger.info("Loading and reading %s -- %.1f%%" %
                             (fname, prcDone))
            with nc.Dataset(fname) as nds:
                nGeoGr = nds.groups['geophysical_data']
                nGeoVar = nGeoGr.variables
                for band in self.bands:
                    try:
                        noisyRrs = nGeoVar['Rrs_%s' % band][:]
                        rrsAggDataDict[band] += (noisyRrs -
                                                 self.rrsSilDict[band]) ** 2
                    except KeyError as e:
                        self.logger.error(e)
                    if self.doSaniCheck:
                        noisyLt = nGeoVar['Lt_%s' % band][:]
                        ltAggDataDict[band] += (noisyLt -
                                                self.ltSilDict[band]) ** 2
                if self.doChla:
                    noisyChl = nGeoVar['chlor_a'][:]
                    chlAggDataArr += (noisyChl - self.otherProdsDict['chlor_a']) ** 2
                if self.doNflh:
                    noisyNflh = nGeoVar['nflh'][:]
                    nflhAggDataArr += (noisyNflh - self.otherProdsDict['nflh']) ** 2

        for band in self.bands:
            self.logger.debug("computing deviation for band %s" % band)
            try:
                self.rrsUncArrDict[band] = np.ma.sqrt(rrsAggDataDict[band] / lflis)
            except KeyError:  # band not here, just skipping
                pass
            if self.doSaniCheck:
                self.ltUncArrDict[band] = np.sqrt(ltAggDataDict[band] / lflis)
                self.logger.debug('running sanity check for band %s' % band)
        if self.doChla:
            self.otherProdsDict['chlor_a_unc'] = np.ma.sqrt(chlAggDataArr / lflis)
            self.logger.debug('computing deviation for chlor a')
        if self.doNflh:
            self.otherProdsDict['nflh_unc'] = np.ma.sqrt(nflhAggDataArr / lflis)
            self.logger.debug('computing deviation for nflh')
        self.logger.info("\nProcessed %d files " % lflis)
        return None

    def ReadFromSilent(self):
        '''Reads the baseline file.
        Flags: l2bin default flags, namely ATMFAIL(1), LAND(2), HIGLINT(8),
            HILT(16), HISATZEN(32), STRAYLIGHT(256), CLDICE(512),
            COCCOLITH(1024), HISOLZEN(4096), LOWLW(16384), CHLFAIL(32768),
            NAVWARN(65536), MAXAERITER(524288), CHLWARN(2097152),
            ATMWARN(4194304), NAVFAIL(33554432), FILTER(67108864)
        flagBits = 1 + 2 + 8 + 16 + 32 + 256 + 512 + 1024 + 4096 + 16384 +
            32768 + 65536 + 524288 + 2097152 + 4194304 + 33554432 + 67108864
        l2flags = geoVar['l2_flags'][:]
        flagMaskArr = (l2flags & flagBits > 0)
        '''
        self.logger.debug('attempting to open silent file %s' % self.silFile)
        with nc.Dataset(self.silFile, 'r') as dsSil:
            geoGr = dsSil.groups['geophysical_data']
            geoVar = geoGr.variables
            for band in self.bands:
                try:
                    rrs = geoVar['Rrs_%s' % band]
                    self.rrsSilDict[band] = rrs[:]
                    self.attrRrsUncDict[band] = {'long_name': 'Uncertainty in %s' % rrs.long_name,
                                                 '_FillValue': rrs._FillValue,
                                                 'units': rrs.units,
                                                 'scale_factor': rrs.scale_factor,
                                                 'add_offset': rrs.add_offset}
                    self.dimsDict['rrs_unc_%s' % band] = rrs.dimensions
                    self.dTypeDict['rrs_unc_%s' % band] = rrs.dtype
                except KeyError:
                    self.logger.info("Rrs_%s not available, skipping" % band)
                    self.rrsSilDict.pop(band)
                    self.attrRrsUncDict.pop(band)
                    self.dimsDict.pop('rrs_unc_%s' % band)
                    self.dTypeDict.pop('rrs_unc_%s' % band)
                if self.doSaniCheck:
                    self.logger.debug('setting up to run sanity check for band %s' % band)
                    lt = geoVar['Lt_%s' % band]
                    self.ltSilDict[band] = lt[:]
                    self.attrLtUncDict[band] = {'long_name': 'Uncertainty in %s' % lt.long_name,
                                                '_FillValue': lt._FillValue,
                                                'units': lt.units}
                    self.dimsDict['lt_unc_%s' % band] = lt.dimensions
                    self.dTypeDict['lt_unc_%s' % band] = lt.dtype
            if self.doChla:
                self.logger.debug('setting up to compute chla uncertainty')
                chla = geoVar['chlor_a']
                self.otherProdsDict['chlor_a'] = chla[:]
                self.attrOtherProdUncDict['chlor_a_unc'] = {'long_name':
                                                            'Uncertainty in %s' % chla.long_name,
                                                            '_FillValue': chla._FillValue,
                                                            'units': chla.units,
                                                            'valid_min': 0}
                self.dTypeDict['chlor_a_unc'] = chla.dtype
                self.dimsDict['chlor_a_unc'] = chla.dimensions
            if self.doNflh:
                self.logger.debug('setting up to compute nflh uncertainty')
                nflh = geoVar['nflh']
                self.otherProdsDict['nflh'] = nflh[:]
                self.attrOtherProdUncDict['nflh_unc'] = {'long_name':
                                                         'Uncertainty in %s' % nflh.long_name,
                                                         '_FillValue': nflh._FillValue,
                                                         'units': nflh.units,
                                                         'scale_factor': nflh.scale_factor,
                                                         'add_offset': nflh.add_offset}
                self.dimsDict['nflh_unc'] = nflh.dimensions
                self.dTypeDict['nflh_unc'] = nflh.dtype
        return None


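# The helper below is an illustrative, self-contained sketch (not called
# anywhere in this module) of the per-band aggregation that MakeUnc.BuildUncs
# performs: the reported uncertainty is the root-mean-square deviation of the
# noisy realizations about the silent baseline. The function name and the
# stacked-array input are assumptions made for the example.
def _example_band_rmse(noisy_stack, baseline):
    """Per-pixel RMSE of a stack of noisy Rrs arrays (N, ...) about baseline."""
    noisy_stack = np.ma.asarray(noisy_stack)
    sq_dev_sum = ((noisy_stack - baseline) ** 2).sum(axis=0)
    return np.ma.sqrt(sq_dev_sum / noisy_stack.shape[0])

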
class MakeSwfUnc(MakeUnc):
    """Uncertainty subclass for SeaWiFS"""
    def __init__(self, *args, **kwargs):
        self.sensor = 'SeaWiFS'
        self.bands = kwargs.pop("bands",
                                ['412', '443', '490', '510', '555', '670',
                                 '765', '865'])
        self.colDict = {'412': '#001166', '443': '#004488', '490': '#116688',
                        '510': '#228844', '555': '#667722', '670': '#aa2211',
                        '765': '#770500', '865': '#440000'}
        super(MakeSwfUnc, self).__init__(*args)
        return None


class MakeHMA(MakeUnc):
    """Uncertainty engine for HMODISA"""
    def __init__(self, *args, **kwargs):
        self.sensor = 'HMODISA'
        self.bands = kwargs.pop("bands",
                                ['412', '443', '488', '531', '547', '555',
                                 '645', '667', '678', '748', '859', '869',
                                 '1240', '1640', '2130'])
        super(MakeHMA, self).__init__(*args)
        self.colDict = {'412': '#001166', '443': '#004488', '488': '#1166FF',
                        '531': '#337722', '547': '#557733', '555': '#669922',
                        '645': '#883311', '667': '#aa2211', '678': '#dd3300'}
        return None


def PathsGen(matchPattern, l2MainPath):
    # create generator of l2 directory paths
    l2PathsGen = glob.iglob(matchPattern)
    spatt = re.compile('(S[0-9]+)')
    for l2path in l2PathsGen:
        if os.path.isdir(l2path):
            basename = spatt.findall(l2path)[0]
            l2Pa = os.path.join(l2MainPath, basename)
            silFiPa = os.path.join(l2Pa, basename) + '_silent.L2'
            noiDiPa = os.path.join(l2Pa, 'Noisy/')
        else:
            continue
        yield [silFiPa, noiDiPa]

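# PathsGen assumes a per-granule directory layout like the following
# (the granule ID shown is illustrative):
#     <l2MainPath>/S2003001123500/S2003001123500_silent.L2
#     <l2MainPath>/S2003001123500/Noisy/S2003001123500*_noisy_*
# i.e. one directory per granule holding the silent L2 and a Noisy/
# subdirectory containing the perturbed copies.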

class CBatchManager(object):
    '''
    Class to manage complete uncertainty generation: from processing of L1As
    to creation of uncertainty from noisy L2 files, to the final packing of
    new uncertainty products into the baseline L2.
    '''

    def __init__(self, pArgs):
        '''
        Takes a directory containing L2 directories
        '''
        self.pArgs = pArgs
        self.l2MainPath = pArgs.ipath
        if self.pArgs.sensor == 'SeaWiFS':
            self.matchPattern = os.path.join(self.l2MainPath, 'S*/')
        return None

    def _BatchRun(self, sArgs):
        ifile, npath = sArgs
        # Note: these positional args are forwarded to MakeUnc.__init__,
        # which expects (pArgs, parent_logger_name) rather than (ifile, npath).
        uncObj = MakeSwfUnc(ifile, npath)
        uncObj.ReadFromSilent()
        uncObj.BuildUncs(self.pArgs.nsfx)
        uncObj.WriteToSilent()
        return uncObj.silFile

    def ProcessL2s(self):
        paramGen = (params for params in PathsGen(self.matchPattern,
                                                  self.l2MainPath))
        with mp.Pool() as pool:
            results = pool.map(self._BatchRun, paramGen)
        return results  # temporary: should be replaced by log entry


def ParseCommandLine(args):
    parser = argparse.ArgumentParser(prog="MakeUnc")
    parser.add_argument('-i', '--ifile', help='Initial L2 file path.',
                        type=str)
    parser.add_argument('-j', '--ipath',
                        help='Main L2 path for batch processing.', type=str)
    parser.add_argument('-n', '--npath', help='Path to noisy data directory.',
                        type=str)
    parser.add_argument('-s', '--nsfx',
                        help='Noisy file suffix for pattern matching.',
                        type=str, default='_noisy_')
    parser.add_argument('-c', '--dochl',
                        help='Compute chlorophyll uncertainty. Default is False.',
                        action='store_true', default=False)
    parser.add_argument('-f', '--doflh',
                        help='Compute normalized fluorescence line height uncertainty. Default is False.',
                        action='store_true', default=False)
    parser.add_argument('-p', '--psafe', help='Back the source file up. Default is False.',
                        action='store_true', default=False)
    parser.add_argument('-a', '--sanity', help='Do sanity check. Default is False.',
                        action='store_true', default=False)
    parser.add_argument('-b', '--batch', help='Batch processing option. Default is False.',
                        action='store_true', default=False)
    parser.add_argument('-w', '--workers',
                        help='Number of concurrent processes. Default is 1.',
                        type=int, default=1)
    parser.add_argument('-z', '--sensor',
                        help='Specify sensor the data originates from. Default is SeaWiFS.',
                        type=str, default='SeaWiFS')
    parser.add_argument('-e', '--debug', help='Increase output verbosity.',
                        action='store_true', default=False)
    parsedArgs = parser.parse_args(args)

    if ((not parsedArgs.ifile and not parsedArgs.batch) or
            (parsedArgs.batch and not parsedArgs.ipath)):
        parser.print_help()
        sys.exit(1)

    return parsedArgs

def SetLogger(dbg=False):
    '''
    Sets up a logger with a more verbose format if dbg=True.
    '''
    logger_name = 'MakeUNC_%s_T_%s' % (DT.date(DT.now()), DT.time(DT.now()))
    logfn = '%s.log' % logger_name
    logger = logging.getLogger(logger_name)
    if dbg:
        level = logging.DEBUG
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s -'
                                      + ' [%(module)s..%(funcName)s..%(lineno)d]'
                                      + ' - %(message)s')
    else:
        level = logging.INFO
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger.setLevel(level)
    fh = logging.FileHandler(logfn)
    fh.setLevel(level)
    fh.setFormatter(formatter)
    ch = logging.StreamHandler()
    ch.setLevel(logging.WARNING)
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.addHandler(fh)
    logger.debug('logging')
    return logger


def Main(argv):

    pArgs = ParseCommandLine(argv)
    mainLogger = SetLogger(dbg=pArgs.debug)
    if pArgs.batch:
        # min. cmd line is ipath for the main L2 path (all L2s should be in a
        # common directory) and -b
        mainLogger.info('Initializing batch processor')
        bRunner = CBatchManager(pArgs)
        res = bRunner.ProcessL2s()
        pickle.dump(res, open('L2BatchList.pkl', 'wb'))
    else:
        baseLineFile = pArgs.ifile
        noisyDataDir = pArgs.npath
        noisySfx = pArgs.nsfx
        baseLineFname = baseLineFile.split('/')[-1]
        if noisyDataDir[-1] != '/':
            noisyDataDir += '/'
        if baseLineFname[0] == 'S':
            mainLogger.info('processing SeaWiFS data')
            uncObj = MakeSwfUnc(pArgs, mainLogger.name)
        elif baseLineFname[0] == 'A':
            mainLogger.info('processing MODISA data')
            uncObj = MakeHMA(pArgs, mainLogger.name)
        uncObj.ReadFromSilent()
        uncObj.BuildUncs(noisySfx)
        uncObj.WriteToSilent()


if __name__ == "__main__":

    Main(sys.argv[1:])
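
# Example invocations (paths and file names below are illustrative):
#
#   Single granule, with chlorophyll uncertainty and a backup of the source:
#       MakeUnc.py -i /data/S2003001123500_silent.L2 -n /data/Noisy/ -s _noisy_ -c -p
#
#   Batch mode over a directory of per-granule subdirectories:
#       MakeUnc.py -b -j /data/L2granules/ -z SeaWiFS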