scipp / essreduce

Common functionality for ESS data reduction
https://scipp.github.io/essreduce/
BSD 3-Clause "New" or "Revised" License
1 stars 1 forks source link

Add streaming module #85

Closed SimonHeybrock closed 3 months ago

SimonHeybrock commented 3 months ago

This adds ess.reduce.streaming, mainly providing StreamProcessor. This is intended for use with Beamlime, processing chunks of detector and monitor event-data form a stream.

For context, this is how this could be used with a workflow from ESSsans:

%matplotlib widget
dummy = sc.DataArray(
    sc.zeros(dims=('Q',), shape=(100,), with_variances=True),
    coords={'Q': sc.linspace('Q', 0.0, 1.0, 101, unit='1/angstrom')},
)
fig = dummy.plot(norm='log', scale={'Q': 'log'}, vmin=0.1, vmax=5)
artist = next(iter(fig.artists))
display(fig)
from ess.reduce import streaming

streaming_wf = streaming.StreamProcessor(
    base_workflow=workflow,
    dynamic_keys=(
        NeXusMonitorEventData[SampleRun, Incident],
        NeXusDetectorEventData[SampleRun],
    ),
    target_keys=(IofQ[SampleRun],),
    accumulators={
        ReducedQ[SampleRun, Numerator]:streaming.RollingAccumulator(window=5),
        ReducedQ[SampleRun, Denominator]:streaming.RollingAccumulator(window=5),
    },
)

# This Loki data has different det and mon time scales
det_stride = 60
mon_stride = 1
for i in range(100):
    # assume we have loaded events manually somehow, this is not using an actual stream of events
    det_chunk = det_events[det_stride * i : det_stride * (i + 1)].copy()
    mon_chunk = mon_events[mon_stride * i : mon_stride * (i + 1)].copy()
    # simulate temporary loss of detector data
    if 20 < i < 30:
        det_chunk *= 0.0
    results = streaming_wf.add_chunk(
        {
            NeXusDetectorEventData[SampleRun]: det_chunk,
            NeXusMonitorEventData[SampleRun, Incident]: mon_chunk,
        }
    )
    fig.update({artist: results[IofQ[SampleRun]]})
    fig.fig.canvas.draw()
    fig.fig.canvas.flush_events()