Ocean Color Science Software

ocssw V2022
__init__.py
from ..metrics import performance
from ..utils import find_wavelength, ignore_warnings
from ..meta import get_sensor_bands
from ..transformers import TransformerPipeline, LogTransformer
from .utils import get_benchmark_models, GlobalRandomManager

from collections import defaultdict as dd
from functools import partial, update_wrapper

import numpy as np
import traceback

def get_models(wavelengths, sensor, product, debug=False, allow_opt=False, method=None, **kwargs):
    ''' Retrieve all benchmark functions from the appropriate product
        directory. Import each function with "model" in the function
        name, ensure any necessary parameters have a default value
        available, and test whether the function can be run with the
        given wavelengths. A template folder for new algorithms is
        available in the Benchmarks directory.
    '''
    valid  = {}
    sensor = sensor.split('-')[0] # remove any extra data which is used by the MDN
    models = get_benchmark_models(product, allow_opt, debug, method)
    kwargs.update({
        'sensor'      : sensor,
        'wavelengths' : wavelengths,
        'product'     : product,
    })

    # Gather all models which return an output with the available wavelengths
    for name, model in models.items():
        sample_input = np.ones((1, len(wavelengths)))
        model_kwargs = dict(kwargs)

        # Add a set of optimization dummy parameters to check if wavelengths are valid for the method
        if getattr(model, 'has_default', False):
            model_kwargs.update( dict(zip(model.opt_vars, [1]*len(model.opt_vars))) )

        try:
            output = model(sample_input, **model_kwargs)
            assert(output is not None), f'Output for {name} is None'
            assert(not isinstance(output, dict)), f'"{product}" not found in the outputs of {name}'
            valid[name] = update_wrapper(partial(model, **kwargs), model)
        except Exception as e:
            if debug: print(f'Exception for function {name}: {e}\n{traceback.format_exc()}')
    return valid

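Each entry returned by get_models is a partial of the original benchmark function with sensor, wavelengths, and product already bound, so it can be called on reflectance arrays directly. The sketch below is illustrative only; the sensor label, band centers, and random input are placeholder assumptions, not values defined in this module.

import numpy as np

wavelengths = [443, 490, 560, 665, 709]            # hypothetical band centers
chl_models  = get_models(wavelengths, 'OLCI', 'chl')

Rrs       = np.random.rand(10, len(wavelengths))   # placeholder remote-sensing reflectance
estimates = {name: model(Rrs) for name, model in chl_models.items()}
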
@ignore_warnings
def run_benchmarks(sensor, x_test, y_test=None, x_train=None, y_train=None, slices=None, args=None,
                   *, product='chl', bands=None, verbose=False,
                   return_rs=True, return_ml=False, return_opt=False,
                   kwargs_rs={}, kwargs_ml={}, kwargs_opt={}):

    def assert_same_features(a, b, label):
        assert(a is None or b is None or a.shape[1] == b.shape[1]), \
            f'Differing number of {label} features: {a.shape[1]} vs {b.shape[1]}'

    slices = slices or {p: slice(None) for p in np.atleast_1d(product)}
    bands  = np.array(get_sensor_bands(sensor, args) if bands is None else bands)
    bench  = dd(dict)

    # Ensure training / testing data have the same number of features, and the appropriate number of bands
    assert_same_features(x_test, x_train, 'x')
    assert_same_features(y_test, y_train, 'y')
    assert_same_features(x_test, np.atleast_2d(bands), f'{sensor} band')

    if (return_ml or return_opt) and (x_train is None or y_train is None):
        raise Exception('Training data must be passed to use ML/Opt models')

    # Set the available products for each set of benchmarking methods
    products_rs  = ['chl', 'tss', 'cdom', 'a', 'aph', 'ap', 'ag', 'adg', 'b', 'bbp']
    products_ml  = ['chl', 'tss', 'cdom']
    products_opt = ['chl', 'tss', 'cdom']

    # Get the benchmark estimates for each product individually
    for product in slices:

        kwargs_default = {
            'bands'   : bands,
            'args'    : args,
            'sensor'  : sensor,
            'product' : product,
            'verbose' : verbose,
            'x_train' : x_train,
            'x_test'  : x_test,
            'y_train' : y_train[:, slices[product]] if y_train is not None else None,
            'y_test'  : y_test[:, slices[product]] if y_test is not None else None,
        }

        for bench_return, bench_products, bench_kwargs, bench_function in [
            (return_rs,  products_rs,  dict(kwargs_rs),  _bench_rs ),
            (return_ml,  products_ml,  dict(kwargs_ml),  _bench_ml ),
            (return_opt, products_opt, dict(kwargs_opt), _bench_opt),
        ]:
            if bench_return and product in bench_products:
                bench_kwargs.update(kwargs_default)
                bench[product].update( bench_function(**bench_kwargs) )
    return dict(bench)

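A hedged usage sketch of this entry point follows. The sensor label, band list, and array shapes are illustrative assumptions; slices maps each product name to the columns of y_train/y_test that contain it, and training data is supplied because return_ml is enabled.

import numpy as np

bands   = [443, 490, 560, 665, 709]          # hypothetical band centers
x_test  = np.random.rand(100, len(bands))    # Rrs at those bands
x_train = np.random.rand(500, len(bands))
y_train = np.random.rand(500, 2) + 0.1       # columns: chl, tss
y_test  = np.random.rand(100, 2) + 0.1

bench = run_benchmarks('OLCI', x_test, y_test, x_train, y_train,
                       slices={'chl': slice(0, 1), 'tss': slice(1, 2)},
                       bands=bands, return_rs=True, return_ml=True)
# bench['chl'] maps each benchmark method name to its estimates for x_test.
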
def _create_estimates(model, inputs, postprocess=None, preprocess=None, **kwargs):
    if postprocess is None: postprocess = lambda x: x
    if preprocess  is None: preprocess  = lambda x: x

    model     = preprocess(model) or model
    outputs   = getattr(model, 'predict', model)(inputs.copy())
    estimates = postprocess(outputs.flatten()[:, None])

    if kwargs.get('verbose', False) and kwargs.get('y_test', None) is not None:
        print( performance(model.__name__, kwargs['y_test'], estimates) )
    return estimates

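Because of the getattr(model, 'predict', model) lookup above, this helper accepts either a fitted estimator exposing predict or a plain callable. A minimal sketch with a hypothetical band-ratio function (the function name and input values are illustrative only):

import numpy as np

def ratio_model(Rrs, **kwargs):                  # hypothetical benchmark-style callable
    return Rrs[:, 1] / Rrs[:, 0]

Rrs       = np.random.rand(4, 2) + 0.01
estimates = _create_estimates(ratio_model, Rrs)  # called directly; returns a (4, 1) array
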
def _bench_rs(sensor, bands, x_test, product='chl', method=None, tol=15, allow_opt=False, **kwargs):
    postps = lambda x: (np.copyto(x, np.nan, where=x < 0) or x) if product == 'chl' else x
    create = lambda f: _create_estimates(f, x_test, postps, **kwargs)
    models = get_models(bands, sensor, product, method=method, tol=tol, allow_opt=allow_opt)
    return {name: create(model) for name, model in models.items()}

def _bench_opt(sensor, bands, x_train, y_train, *args, **kwargs):
    preproc = lambda m: m.fit(x_train, y_train, bands)
    estims  = _bench_rs(sensor, bands, *args, allow_opt=True, preprocess=preproc, **kwargs)
    return {f'{k}_opt': v for k, v in estims.items()}

def _bench_ml(sensor, x_train, y_train, x_test, *, x_other=None, verbose=False,
              seed=42, bagging=True, gridsearch=False, scale=True, methods=None,
              **kwargs):

    from sklearn.preprocessing import RobustScaler, MinMaxScaler
    from sklearn.model_selection import GridSearchCV
    from sklearn.multioutput import MultiOutputRegressor
    from sklearn.ensemble import BaggingRegressor

    from .ML import models

    args = kwargs.get('args', None)
    seed = getattr(args, 'seed', seed)

    gridsearch_kwargs = {'refit': False, 'scoring': 'neg_median_absolute_error'}
    bagging_kwargs    = {
        'n_estimators' : getattr(args, 'n_rounds', 10),
        'max_samples'  : 0.75,
        'bootstrap'    : False,
        'random_state' : seed,
    }

    if len(y_train.shape) == 1: y_train = y_train[:, None]
    valid   = np.isfinite(x_train).all(-1) & np.isfinite(y_train).all(-1)
    x_train = x_train[valid]
    y_train = y_train[valid]

    if scale:
        # x_scaler = TransformerPipeline([AUCTransformer(list(get_sensor_bands(sensor))), RobustScaler()])
        x_scaler = TransformerPipeline([RobustScaler()])
        y_scaler = TransformerPipeline([LogTransformer(), MinMaxScaler((-1, 1))])
        x_scaler.fit(x_train)
        y_scaler.fit(y_train)
        x_test  = x_scaler.transform(x_test)
        x_train = x_scaler.transform(x_train)
        y_train = y_scaler.transform(y_train)

    preprocess  = lambda m: m.fit(x_train.copy(), y_train.copy())
    postprocess = None if not scale else y_scaler.inverse_transform

    if verbose and gridsearch:
        print('\nPerforming gridsearch...')

    if methods is None:
        methods = list(models.keys())

    other = {}
    estim = {}
    for method, params in models.items():
        if method not in methods: continue
        methods.remove(method)

        params['grid']['random_state'] = params['default']['random_state'] = seed
        model_kwargs = params['default']
        model_class  = params['class']
        n_jobs       = 1 if method == 'MDN' else 3

        if y_train.shape[1] > 1:
            model_class = lambda *args, **kwargs: MultiOutputRegressor(params['class'](*args, **kwargs))

        with GlobalRandomManager(seed):
            if gridsearch and method != 'SVM':
                model = GridSearchCV(model_class(), params['grid'], n_jobs=n_jobs, **gridsearch_kwargs)
                model.fit(x_train.copy(), y_train.copy())

                model_kwargs = model.best_params_
                if verbose: print(f'Best {method} params: {model_kwargs}')

            model = model_class(**model_kwargs)
            if bagging: model = BaggingRegressor(model, **bagging_kwargs)

            model.__name__ = method
            estim[method]  = _create_estimates(model, x_test, postprocess, preprocess, verbose=verbose, **kwargs)

            if x_other is not None:
                other[method] = _create_estimates(model, x_other, postprocess)

    if len(methods):
        print(f'Unknown ML benchmark methods requested: {methods}')

    if len(other):
        return estim, other
    return estim
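For context, the loop above only assumes that each entry of the models registry imported from .ML provides a 'class' constructor, a 'default' keyword set, and a 'grid' search space, with 'random_state' injected into both dictionaries at runtime. A hypothetical entry, not the actual contents of ML.py, might look like:

from sklearn.ensemble import ExtraTreesRegressor

models_sketch = {
    'ETR': {
        'class'   : ExtraTreesRegressor,                 # estimator constructor
        'default' : {'n_estimators': 100},               # kwargs used when not gridsearching
        'grid'    : {'n_estimators': [50, 100, 200]},    # parameter grid for GridSearchCV
    },
}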