Closed myamashita closed 3 years ago
Hi @myamashita! The aggregate config options were removed with the release of ioos_qc=2.0. As I was developing the changes to Config to support time and spatial windows, it didn't make much sense to me to include support for the aggregate functions. It wasn't clear what the aggregate functions would be aggregating, i.e., just the time window? Just a single module of checks? All checks on a stream?
The concept of an aggregate, to me, seems like something that should happen after all lazy evaluation has been completed (the checks have been run) and the results of each individual stream have been computed. Below is an example of how I aggregate checks from a PandasStore, and I'm looking for suggestions on how to make this configurable. For now you will have to compute the aggregates yourself.
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.stores import PandasStore
from ioos_qc.streams import PandasStream

def apply_qc(df: pd.DataFrame, config: Config) -> pd.DataFrame:
    # Setup the stream
    stream = PandasStream(df)
    # Run the tests
    results = stream.run(config)
    # Store the results in another DataFrame
    store = PandasStore(
        results,
        axes={
            't': 'time',
            'z': 'z',
            'y': 'lat',
            'x': 'lon'
        }
    )
    # Compute any aggregations
    store.compute_aggregate(name='rollup_qc')  # Appends to the results internally
    # Write only the test results to the store
    results_store = store.save(write_data=False, write_axes=False)
    # Append columns from qc results back into the data
    return pd.concat([df, results_store], axis=1)
Alternatively you can compute more complex aggregates directly:
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.qartod import aggregate
from ioos_qc.results import CollectedResult
from ioos_qc.stores import PandasStore
from ioos_qc.streams import PandasStream

def apply_qc(df: pd.DataFrame, config: Config) -> pd.DataFrame:
    # Setup the stream
    stream = PandasStream(df)
    # Run the tests
    results = stream.run(config)
    # Store the results in another DataFrame
    store = PandasStore(
        results,
        axes={
            't': 'time',
            'z': 'z',
            'y': 'lat',
            'x': 'lon'
        }
    )
    # Compute specific aggregations on a subset of results.
    # Placeholder: narrow this list to the CollectedResult objects you want to aggregate.
    subset = list(store.collected_results)
    agg = CollectedResult(
        stream_id='',
        package='qartod',
        test='my_custom_aggregate',
        function=aggregate,
        # https://github.com/ioos/ioos_qc/blob/c70565f91bb47d1375bef9b5325f1e8711386f8c/ioos_qc/results.py#L38-L39
        results=aggregate(subset)
    )
    store.collected_results.append(agg)
    # Write only the test results to the store
    results_store = store.save(write_data=False, write_axes=False)
    # Append columns from qc results back into the data
    return pd.concat([df, results_store], axis=1)
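For anyone computing the aggregates by hand, here is a minimal sketch of what a rollup flag does conceptually: for each observation, take the flags from every test and keep the one with the highest precedence. This is an illustration in plain Python and not the ioos_qc implementation. The function name rollup, the sample flag lists, and the precedence order (MISSING lowest, then UNKNOWN, GOOD, SUSPECT, FAIL highest) are assumptions made for the example; only the flag values themselves follow the QARTOD convention.

```python
from typing import List, Sequence

# QARTOD flag values.
GOOD, UNKNOWN, SUSPECT, FAIL, MISSING = 1, 2, 3, 4, 9

# Assumed precedence, lowest to highest: a flag later in this list
# overrides any flag earlier in the list for the same observation.
PRECEDENCE = [MISSING, UNKNOWN, GOOD, SUSPECT, FAIL]
RANK = {flag: position for position, flag in enumerate(PRECEDENCE)}


def rollup(flag_columns: Sequence[Sequence[int]]) -> List[int]:
    """Combine per-test flag columns into one flag per observation.

    Hypothetical helper: each inner sequence holds the flags produced by
    one test, with one entry per observation.
    """
    if not flag_columns:
        return []
    n_obs = len(flag_columns[0])
    if any(len(column) != n_obs for column in flag_columns):
        raise ValueError("all flag columns must have the same length")
    # zip(*...) walks the columns observation by observation; max() with
    # the rank lookup keeps the highest-precedence flag in each row.
    return [max(row, key=RANK.__getitem__) for row in zip(*flag_columns)]


# Made-up flags from three tests over five observations.
gross_range = [GOOD, GOOD, FAIL, GOOD, MISSING]
spike = [GOOD, SUSPECT, GOOD, UNKNOWN, MISSING]
flat_line = [GOOD, GOOD, GOOD, GOOD, MISSING]

print(rollup([gross_range, spike, flat_line]))  # [1, 3, 4, 1, 9]
```

Passing only some of the columns to rollup corresponds to aggregating a subset of tests, which is the kind of choice that is hard to express in a single config option.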
Thank you @kwilcox and @ocefpaf. It works for me!
Hi, my name is Márcio. I work for Fugro as a data processor in the Metocean consultancy team.
I'm trying to reproduce the PandasStream example, but I cannot figure out how to aggregate the flags.
Here is an example: https://nbviewer.jupyter.org/gist/ocefpaf/6c17cbee3f1474510368207066597a3d