AxFoundation / strax

Stream analysis for xenon TPCs
BSD 3-Clause "New" or "Revised" License

Replace mailbox system #81

Open tunnell opened 6 years ago

tunnell commented 6 years ago

Mailbox should be factored out into an independent repository rather than using something else. Closes #62.

JelleAalbers commented 5 years ago

By now I've come to think the mailbox approach is fundamentally flawed. The original idea was to build a simple class where one function could push and another could iterate. But it has turned into a complicated piece of custom threading code, with lots of special cases, and always the specter of a hang or timeout if things go really wrong.
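For reference, the original idea described above (one function pushes, another iterates) can be sketched with the standard library alone. This is only an illustration, not strax's actual mailbox code: the class name `SimpleMailbox` and the sentinel-based `close()` are assumptions made for the example.

```python
import queue
import threading

_CLOSED = object()  # sentinel marking the end of the stream


class SimpleMailbox:
    """Hypothetical single-producer, single-consumer mailbox."""

    def __init__(self, maxsize=4):
        # A bounded queue makes send() block when the consumer falls behind
        self._queue = queue.Queue(maxsize=maxsize)

    def send(self, item):
        self._queue.put(item)

    def close(self):
        self._queue.put(_CLOSED)

    def __iter__(self):
        while True:
            item = self._queue.get()
            if item is _CLOSED:
                return
            yield item


def producer(mailbox, n):
    for i in range(n):
        mailbox.send(i)
    mailbox.close()


mailbox = SimpleMailbox()
thread = threading.Thread(target=producer, args=(mailbox, 10))
thread.start()
received = list(mailbox)  # iterates until the producer closes the mailbox
thread.join()
```

The sketch deliberately leaves out multiple consumers, error propagation and timeouts.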

I'm now working on replacing the mailbox system with a single scheduler / work distributor. This will be a medium-term project of a few weeks or months most likely, since I can only work on it off and on. There is a possibility it won't work out. The mailbox system has worked well for us so far fortunately, but it does make debugging complicated problems (e.g. Joran's comment in #212), well, complicated.

tunnell commented 5 years ago

This might be a good project for a CS student, you think? At very least, can speak with some CS systems people who might have ideas on how to replace. Pivarski might also be a resource.

JelleAalbers commented 5 years ago

Indeed this would be an interesting project, especially to generalize it to a multi-machine system, perhaps using zeromq or something similar.

I've put what I have now here https://github.com/JelleAalbers/plarx

jmosbacher commented 4 years ago

Is this issue being worked on? If not, I can take a look at it.

tunnell commented 4 years ago

Doubt it... It's a bit of a black hole that is ripe for overengineering, so be careful :) Be aware that the DAQ use case drives the performance requirements

jmosbacher commented 4 years ago

The Mailboxectomy

I made a quick review of the options we have for using third-party tooling for processing instead of the ThreadedMailboxProcessor. Here are my findings.

Frameworks looked at

Details

Apache Storm and its successor Heron

The topology of spouts and bolts is conceptually very similar to strax plugins, and there are extremely battle-tested implementations in Python. However, they are mostly suited to distributed processing on large clusters and would not be suitable for a "one guy with a laptop" processing situation.

Apache Pulsar

If strax moved to a pub-sub configuration this would be suitable for the online processing mode of running, on a cluster of high performance machines. It boasts great throughput and reliability. Probably overkill for what xenonnt needs.

Wallaroo

Very interesting project, but it is implemented in Pony, a language that has not yet reached version 1, and has a limited Python API. May be worth revisiting for DARWIN.

Faust

Really nice project, built as a pure-Python alternative to the legendary Java-based Kafka Streams. It's a framework for distributed stream processing based on Kafka topics and KTables (distributed dictionaries backed by a changelog topic on Kafka, used for preserving state during stream processing). Kafka is known for its high throughput, eventual consistency guarantees, reliability and rich ecosystem. Its throughput is limited only by disk write speed, since it's essentially just writing a stream of bytes to disk and recording offsets. Using Faust can be done in two ways:

1) Implement generic "workers" that grab processing jobs off a Kafka job topic and submit the results to another Kafka topic. If the dependencies for a job are not yet met, the job is simply returned to the pile of jobs to be processed later. Other workers grab processed data off the result topic and store the results in a distributed dictionary for consumption by jobs that have dependencies. Such an implementation would look something like this:


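import asyncio
import itertools
from typing import AsyncIterable

import faust
from faust import StreamT
import numpy as np
import strax
import straxen

class DataKey(faust.Record, serializer='json'):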
    '''
    The data uid should be unique to a single processing result in the pipeline
    '''
    # non-unique key used for sharding
    run_id: str  
    target: str

    # single unique key accross all datastore instances
    uid: str 

class DataChunk(faust.Record, serializer='pickle'):
    uid: str
    run_id: str
    data: np.ndarray

class ProcessRequest(faust.Record, serializer='pickle'):
    run_id: str
    inputs: dict
    outputs: dict
    plugin: strax.Plugin

app = faust.App(
    "strax_workers",
    broker='kafka://192.168.99.104',
    store='memory://',
    version=2,
    topic_partitions=1,
    table_partitions=1,
    producer_max_request_size=int(1e7),
    consumer_max_fetch_size=int(1e7),
    stream_buffer_maxsize=int(1e7), #16368,
    )

data_cache = app.Table("data_cache",
                        key_type=str, 
                        value_type=DataChunk,
                        partitions=1,
                        value_serializer='pickle',
    )

# data_requests = app.topic('data_requests', value_type=DataRequest)
process_requests = app.topic('process_requests', value_type=ProcessRequest)
processed_data = app.topic('processed_data', value_type=DataChunk)

@app.agent(processed_data, concurrency=2)
async def save(chunks: StreamT[DataChunk]) -> None:
    async for chunk in chunks:

        print(f"Saving chunk {chunk.uid}")
        data_cache[chunk.uid] = chunk
        print(data_cache.as_ansitable())

@app.agent(process_requests, concurrency=10)
async def processor(requests: StreamT[ProcessRequest]) -> AsyncIterable[DataChunk]:
    async for request in requests.group_by(ProcessRequest.run_id):
        #FIXME: add check if all outputs exist in cache 
        if any([any([uid not in data_cache for uid in deps])
                             for deps in request.inputs.values()]):
            # print("Delaying request for ",request.plugin.__class__.__name__, "requirements not met.")
            await process_requests.send(value=request)
            continue

        print("Processing request for ", request.plugin.__class__.__name__)
        kwargs = {kind: np.concatenate([data_cache[uid].data for uid in deps]) 
                                        for kind, deps in request.inputs.items()}
        plugin = request.plugin
        output = plugin.compute(**kwargs)
        if output is None:
            # Nothing to publish for this request
            yield output
            continue
        if isinstance(output, dict):
            for k, v in output.items():
                chunk = DataChunk(
                    uid = request.outputs[k],
                    run_id = request.run_id,
                    data = v
                )
                await processed_data.send(value=chunk)
        else:
            chunk = DataChunk(
                    uid = list(request.outputs.values())[0],
                    run_id = request.run_id,
                    data = output,
                )
            await processed_data.send(value=chunk)
        print("Done processing request: ", request)
        yield output

async def generate_requests(ctx, run_ids, targets):
    for run_id, target in itertools.product(run_ids, targets):
        cmps = ctx.get_components(run_id,(target,))

        # Publish already stored data so dependent jobs can find it in the cache
        for name, loader in cmps.loaders.items():
            data = np.concatenate(list(loader()))
            uid = ctx.key_for(run_id, name)
            chunk = DataChunk(
                    uid = uid,
                    run_id = run_id,
                    data = data,
                )
            await processed_data.send(value=chunk)

        # Submit one request per unique plugin of this run
        # (assumes cmps.plugins is a dict mapping data type name -> plugin)
        for plugin in set(cmps.plugins.values()):
            inputs = {
                    kind: [ ctx.key_for(run_id, dep) for dep in deps]
                                for kind, deps in plugin.dependencies_by_kind().items()}
            outputs ={ name: ctx.key_for(run_id, name) for name in plugin.provides }

            request = ProcessRequest(
                    run_id = run_id,
                    inputs = inputs,
                    outputs = outputs,
                    plugin = plugin,
                )
            await process_requests.send(value=request)

async def get_data(context_name, run_ids, targets, timeout: int = 100) -> np.ndarray:
    run_ids, targets = strax.to_str_tuple(run_ids), strax.to_str_tuple(targets)
    context = getattr(straxen.contexts, context_name)()

    await generate_requests(context, run_ids, targets)
    waiting_for = list(itertools.product(run_ids, targets))
    for _ in range(timeout):
        # Iterate over a copy so finished entries can be removed safely
        for run_id, target in list(waiting_for):
            uid = context.key_for(run_id, target)
            if uid in data_cache:
                print(f"{uid} finished")
                result = data_cache[uid]
                waiting_for.remove((run_id, target))
                yield result
        if waiting_for:
            print(data_cache.as_ansitable())
            print(f"Still waiting for {waiting_for}")

            await asyncio.sleep(1)
        else:
            break
    else:
        raise TimeoutError("Timeout was reached without receiving results.")

The main change required for such an implementation would be to enforce serializability of plugins even after initialization. It would be trivial to implement priority topics. Rechunking can be done at the job level. Sharding could be done based on the run_id key or some other key that ensures independence of each shard.
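The scheduling idea behind this first option (return a job to the pile until its inputs are available) can be illustrated in a single process without Kafka or Faust. The sketch below is hypothetical: the `(output_uid, input_uids, func)` job layout, the `run_jobs` name and the `max_passes` guard are assumptions for the example, and a plain dict stands in for the distributed table.

```python
from collections import deque


def run_jobs(jobs, cache, max_passes=1000):
    """Process jobs whose inputs are cached; requeue the others.

    jobs: iterable of (output_uid, input_uids, func) tuples (hypothetical layout)
    cache: dict mapping uid -> data, pre-filled with the loaded inputs
    """
    pending = deque(jobs)
    passes = 0
    while pending:
        passes += 1
        if passes > max_passes:
            # Guard against jobs whose dependencies can never be met
            raise RuntimeError(f"Jobs never became runnable: {[j[0] for j in pending]}")
        output_uid, input_uids, func = pending.popleft()
        if any(uid not in cache for uid in input_uids):
            # Dependencies not met yet: put the job back on the pile
            pending.append((output_uid, input_uids, func))
            continue
        cache[output_uid] = func(*(cache[uid] for uid in input_uids))
    return cache


cache = {"raw": [1, 2, 3]}
jobs = [
    # Deliberately listed out of dependency order
    ("sum_of_doubles", ["doubled"], sum),
    ("doubled", ["raw"], lambda xs: [2 * x for x in xs]),
]
result = run_jobs(jobs, cache)
```

In the Faust version the deque corresponds to the `process_requests` topic and the dict to the `data_cache` table.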

2) Go all in and use Faust as a stream processing framework, as it was intended to be used. This would require a serious change in the topology of our processing. We would just dump all the raw acquisitions from the DAQ directly into Kafka topics. All data streams would be aligned by timestamp and processed independently by an army of workers managed by Faust (including worker replacement policy etc.). Such an implementation would look something like this:


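from datetime import datetime, timedelta
from typing import AsyncIterable

import faust
from faust import StreamT
import numpy as np

class DigitizerReadRequest(faust.Record):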
    uid: str
    timestamp: datetime
    digitizers: list 
    metadata: dict
    digitizer: int

class ReadDAQRequest(faust.Record):
    run_id: str
    uid: str
    timestamp: datetime
    digitizers: list 
    metadata: dict

class Waveform(faust.Record):
    run_id: str
    uid: str
    timestamp: datetime
    pmts: list 
    metadata: dict
    data: np.ndarray

class Peaklet(faust.Record, serializer='pickle'):
    run_id: str
    uid: str
    timestamp: datetime
    metadata: dict

class Peak(faust.Record, serializer='pickle'):
    run_id: str
    uid: str
    timestamp: datetime
    metadata: dict

class Event(faust.Record, serializer='pickle'):
    run_id: str
    uid: str
    timestamp: datetime
    metadata: dict

app = faust.App(
    "xenon_processor",
    broker='kafka://192.168.99.104',
    store='memory://',
    version=1,
    producer_max_request_size=int(1e6),
    consumer_max_fetch_size=int(1e6),
    stream_buffer_maxsize=16368,
    )

@app.agent(concurrency=100, )
async def read_digitizer(requests: StreamT[DigitizerReadRequest]) -> AsyncIterable[Waveform]:
    # read the digitizer
    return Waveform()

read_request_topic = app.topic('read_requests', value_type=ReadDAQRequest)

# the waveform topic is just a kafka topic which is a stream 
# of waveforms that need to be sharded and then processed
# I would probably have these 
waveform_topic = app.topic('waveforms', value_type=Waveform)

# a table allows the waveform processing agents to keep state information within a window
waveform_data = app.Table("waveform_data",
                        key_type=str, 
                        value_type=Waveform) \
                            .hopping(timedelta(microseconds=10), expires=timedelta(seconds=1),) \
                            .relative_to_field(Waveform.timestamp)

peaklet_topic = app.topic('peaklets', value_type=Peaklet)
peaklet_data = app.Table("peaklet_data", key_type=str, value_type=float, default=float)

peak_topic = app.topic('peaks', value_type=Peak)
peak_data = app.Table("peak_data", key_type=str, value_type=Peak) \
                            .tumbling(timedelta(seconds=1),expires=timedelta(hours=1))

event_topic = app.topic('events', value_type=Event)
event_data = app.Table("event_data", key_type=str, value_type=Event) \
                            .hopping(size=100,step=10,)

@app.agent(read_request_topic, concurrency=1, )
async def read_daq(requests: StreamT[ReadDAQRequest]) -> None:
    async for request in requests:
        # read daq async since most likely io bound
        async for wf in read_digitizer.map(request.digitizers):
            await waveform_topic.send(key=request.uid, value=wf)

def add_metdata(wf):
    #do some smoothing here
    return wf

@app.agent(waveform_topic, concurrency=100, processors=[add_metdata])
async def make_peaklets(waveforms: StreamT[Waveform]) -> AsyncIterable[Peaklet]:
    async for wf in waveforms.group_by(Waveform.run_id):
        # do some processing and yield peaklets

        peaklets = []
        for peaklet in peaklets:
            await peaklet_topic.send(key=wf.run_id, value=peaklet)
            yield peaklet

@app.agent(peaklet_topic, concurrency=1, processors=[add_metdata])
async def count_peaks(peaklets: StreamT[Peaklet]) -> AsyncIterable[Peak]:
    async for peaklet in peaklets.group_by(Peaklet.run_id):
        # do some processing and yield peaklets
        peaklet_data["peaklet_count"] += 1
        peaklet_data["peaklets_per_ms"] = (peaklet_data["peaklet_count"].now()-peaklet_data["peaklet_count"].delta(timedelta(milliseconds=5)))/5
        if peaklet_data["peaklets_per_ms"].delta(timedelta(milliseconds=5)) > peaklet_data["peaklets_per_ms"].now():
            print("nearing end of peaklet train")

@app.agent(peaklet_topic, concurrency=100, processors=[add_metdata])
async def make_peaks(peaklets: StreamT[Peaklet]) -> AsyncIterable[Peak]:
    async for peaklet_batch in peaklets.group_by(Peaklet.run_id).take(max=100, within=0.05):
        peaks = []
        if len(peaklet_batch)>10:
            peak = Peak()
            await peak_topic.send(key=peak.run_id, value=peak)
            yield peak

@app.agent(peak_topic, concurrency=100, processors=[add_metdata])
async def make_events(peaks: StreamT[Peak]) -> AsyncIterable[Event]:
    async for peak_batch in peaks.group_by(Peak.run_id).take(100, within=1.0):
        # do some processing and yield event or not
        if len(peak_batch)>10:
            event = Event()
            await event_topic.send(key=event.run_id, value=event)
            yield event

This would require a complete shift in the way we think about runs and processing. It would require anyone working on this to get intimately familiar with async programming, eventually consistent distributed systems and stream processing. There are huge benefits to be gained by working this way, but it also takes a great amount of effort. The stream processors here act on sharded streams and tables, which makes the code a bit more complex; it is best to go over it and explain it in a telecon.
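The tables in the second example use tumbling and hopping windows. As a rough illustration of the difference, here is a plain Python sketch with integer timestamps. It is not Faust's implementation; the function names and the half-open `[start, end)` convention are assumptions for the example.

```python
def tumbling_window(timestamp, size):
    """Return the (start, end) of the single tumbling window containing timestamp."""
    start = (timestamp // size) * size
    return (start, start + size)


def hopping_windows(timestamp, size, step):
    """Return all (start, end) hopping windows containing timestamp.

    Windows start at multiples of `step` and are `size` long,
    so they overlap when step < size.
    """
    windows = []
    # Latest window start that is <= timestamp
    start = (timestamp // step) * step
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= step
    return sorted(windows)


single = tumbling_window(25, size=10)
overlapping = hopping_windows(25, size=10, step=5)
```

A timestamp falls into exactly one tumbling window but into several hopping windows, which is why state kept in a hopping table is updated more than once per record.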

Ray

This is a fascinating project and in my opinion the best fit for strax. It essentially combines an efficient shared object storage (for interprocess communication) with a pretty smart centralized worker manager. It can easily scale to as many processors on as many machines as you can throw at it (I think the maximum tested was 800, but it should theoretically be able to control more) and has excellent tooling, including a web-based dashboard and deterministic replay for debugging. It leans heavily on the actor model, and you have two building blocks to define your computation: remote functions (stateless) and remote actors (stateful) that you submit work to and receive a future back. You then use those futures as arguments to other remote functions and actors. Ray will only execute a remote function once all the futures passed to it as arguments are ready, and then it will pass the results seamlessly to the remote function in one of its workers. Ray also provides a multiprocessing.Pool API and a stream API that are wrappers around this core functionality. There are three possible implementations using Ray:

1) Use the Ray Pool API to implement the concurrent.futures Executor API and simply plug the executor into strax as is. This is technically not a mailboxectomy, but it would solve the scaling limitation of using threads and processes locally. Such an implementation of the Executor API is almost trivial; it would look something like this:

from concurrent.futures import Executor, Future
import ray

class FutureRay(Future):
    def __init__(self, result=None):
        self._result = result

    def result(self, timeout=None):
        if self._result is not None:
            return self._result.get()

    def cancel(self):
        pass

    def cancelled(self):
        pass

    def running(self):
        pass

    def done(self):
        pass

    def exception(self, timeout=None):
        pass

    def add_done_callback(self, fn):
        pass

class Exray(Executor):
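    def __init__(self):
        # Imported here so it is explicit where the Pool class lives
        from ray.util.multiprocessing import Pool
        if not ray.is_initialized():
            ray.init()
        self._pool = Pool()

    def submit(self, fn, *args, **kwargs):
        # apply_async takes the positional arguments as a tuple and the keyword
        # arguments as a dict; FutureRay.result() then waits on the AsyncResult
        fut = FutureRay()
        fut._result = self._pool.apply_async(fn, args, kwargs)
        return fut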

    def map(self, func, *iterables, timeout=None, chunksize=1):
        # executor API callback guarentees may be a bit tricky
        # translation probably a bit more complex but doable
        for res in self._pool.imap_unordered(func, *iterables):
            yield res

    def shutdown(self):
        self._pool = None

Where each of these methods would have to be implemented in a robust manner.
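One way to make such an adapter more robust is to rely on the standard `concurrent.futures.Future` instead of reimplementing its methods. The sketch below uses the standard library `ThreadPool` as a stand-in so it runs without Ray; it assumes the Ray pool follows the same `apply_async(func, args, kwds, callback, error_callback)` interface as `multiprocessing.Pool`. The class name `PoolExecutor` is hypothetical.

```python
from concurrent.futures import Executor, Future
from multiprocessing.pool import ThreadPool


class PoolExecutor(Executor):
    """Hypothetical adapter exposing a multiprocessing-style pool as an Executor."""

    def __init__(self, pool):
        self._pool = pool

    def submit(self, fn, *args, **kwargs):
        future = Future()
        # Mark as running so the future can no longer be cancelled
        future.set_running_or_notify_cancel()
        self._pool.apply_async(
            fn, args, kwargs,
            callback=future.set_result,
            error_callback=future.set_exception,
        )
        return future

    def shutdown(self, wait=True, **kwargs):
        self._pool.close()
        if wait:
            self._pool.join()


def divide(a, b):
    return a / b


executor = PoolExecutor(ThreadPool(2))
ok = executor.submit(divide, 6, b=3)
failed = executor.submit(divide, 1, 0)
ok_result = ok.result(timeout=10)
error = failed.exception(timeout=10)  # the ZeroDivisionError raised in the worker
executor.shutdown()
```

Because the base `Executor.map` is written in terms of `submit`, it does not need to be overridden in this variant.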

2) Add a method to the context that sets up a ray processing topology on request for a given plugin and run_id. This would look like:

def get_array(self, run_id: ty.Union[str, tuple, list],
                targets, save=tuple(), max_workers=None,
                **kwargs) -> np.ndarray:
    targets = strax.to_str_tuple(targets)
    run_ids = strax.to_str_tuple(run_id)
    if not ray.is_initialized():
        ray.init()

    cmps = self.get_components(run_id, targets)
    futures = {}

    # just for now preload all cached stuff
    for name, loader in cmps.loaders.items():
        futures[name] = concatenate.remote(*list(loader()))

    class FakePlugin(strax.Plugin):
        __name__ = "Fake"
        def __init__(self, **kwargs):
            for k, v in kwargs.items():
                setattr(self, k, v)
    to_add = ["run_id", "lineage", "config", "dtype", "data_kind", "provides", "depends_on"]
    actors = {}

    for name, plugin in cmps.plugins.items():
        # generate a single actor for each plugin
        # Here i create one but we can create multiple actors
        # if multiprocessing enabled for plugin and do a round robin on chunks
        actors[name] = ray.remote(plugin.__class__).remote()
        #--------------------------------------------------------------------------------#
        # Do some workarounds for plugins being heavily dependent on existing processor

        # set values on actor that are set by the get_components method.
        actors[name].set_values.remote({
            "deps": {k:FakePlugin(**{attr:getattr(v,attr) for attr in to_add}) for k,v in plugin.deps.items()},
            "provides": strax.to_str_tuple(plugin.provides),
            "depends_on": strax.to_str_tuple(plugin.depends_on),
        })
        actors[name].set_values.remote(
            {attr:getattr(plugin,attr) for attr in to_add}
        )

        # Run the setup method before we submit the compute method to the actor
        actors[name].setup.remote()

    start = time.time()
    remaining = list(cmps.plugins)
    for _ in range(len(cmps.plugins)**2): 
        if not remaining:
            break
        for name in remaining:
            plugin = cmps.plugins[name]
            plugin.provides = strax.to_str_tuple(plugin.provides)

            # if futures exist for all dependencies,
            # submit job to appropriate actor and add to future dict.
            if all([dep in futures for dep in strax.to_str_tuple(plugin.depends_on)]):
                print(f"can compute {name}")

                # collect dependency futures 
                kwargs = {}
                for kind, ps in plugin.dependencies_by_kind().items():
                    print(f"Dependencies: {kind}: {ps} met")
                    if len(ps)==1:
                        kwargs[kind] = futures[ps[0]]
                    else:
                        kwargs[kind] = merge.remote(*[futures[p] for p in ps])
                print(f"Provides: {plugin.provides}\n\n")

                # call the do_compute method on the actor with its dependencies as futures
                future = actors[name].do_compute.remote(**kwargs)
                if plugin.multi_output:
                    for p in plugin.provides:
                        if p not in futures:
                            futures[p] = extract.remote(future, p)
                else:
                    if name not in futures:
                        futures[name] = future
                print(f"submitted {name}")
                remaining.remove(name)

    result = ray.get(futures[targets[0]])
    futures = None
    actors = None
    print(f"Took {time.time()-start}, No mailboxes were harmed in this experiment.")
    return result

This is actually a working example. It can be generalized quite easily to scale to any number of processors/machines.
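The core of the example is the loop that submits a plugin as soon as futures exist for all of its dependencies. That loop can be tried out without Ray or strax using the sketch below. It is a stand-in under stated assumptions: `ThreadPoolExecutor` replaces the Ray actors, the `name -> (depends_on, compute)` layout and the `schedule` function are hypothetical, and the helper `run` waits for its inputs explicitly, whereas Ray resolves futures passed as arguments by itself.

```python
from concurrent.futures import ThreadPoolExecutor


def schedule(plugins, loaded, max_workers=4):
    """Submit each plugin once futures exist for all its dependencies.

    plugins: dict name -> (depends_on tuple, compute function)  (hypothetical layout)
    loaded: dict name -> data that is already available
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Already loaded data is wrapped in futures so everything looks alike
        futures = {name: pool.submit(lambda d=data: d) for name, data in loaded.items()}

        def run(compute, dep_futures):
            # Block until all inputs are ready, then compute
            return compute(*(f.result() for f in dep_futures))

        remaining = list(plugins)
        while remaining:
            submitted = False
            for name in list(remaining):
                depends_on, compute = plugins[name]
                if all(dep in futures for dep in depends_on):
                    futures[name] = pool.submit(run, compute, [futures[d] for d in depends_on])
                    remaining.remove(name)
                    submitted = True
            if not submitted:
                raise ValueError(f"Unresolvable dependencies for {remaining}")
        return {name: f.result() for name, f in futures.items()}


plugins = {
    # Deliberately listed out of dependency order
    "events": (("peaks",), lambda peaks: len(peaks)),
    "peaks": (("records",), lambda records: [r for r in records if r > 1]),
}
results = schedule(plugins, {"records": [1, 2, 3]})
```

Tasks are only submitted after their dependencies, so a waiting task always depends on work that entered the pool's queue before it.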

3) Use the Ray streaming API. This can be an elegant implementation of the "get_iter" topology in strax. It would look something like this:


from ray.streaming.streaming import Environment

env = Environment()
env.set_parallelism(2) # Each operator will be executed by two actors
source = LoaderSource([])

# Build a processing pipeline and apply it to a loader or a DAQ reader source

streams = {
    # add a source stream that reads from the DAQ in a round robin
    "raw_records": env.source(source) \
                        .round_robin() \
                        .key_by("run_id")
}

# setup the dataflow topology, assuming plugins are already ordered by dependencies
for plugin in plugins:
    # in practice we would merge multiple streams here with a gather operation
    stream = streams[plugin.depends_on] \
                        .set_parallelism(plugin.parallelism) \
                        .flat_map(plugin.compute) # etc... 
    if plugin.multi_output:
        # apply a filter to the output for multi output streams
        for p in plugin.provides:                 
            streams[p] = stream.filter(by_name(p))
    else:
        streams[plugin.output] =  stream

start = time.time()
env_handle = env.execute()  # Deploys and executes the dataflow
ray.get(env_handle)  # Stay alive until execution finishes
env.wait_finish()
end = time.time()

This approach is very suitable for the online processing use case.

tunnell commented 4 years ago

The Faust approach looks like a big change, though I was always surprised that we didn't leverage async more, so maybe we have a conceptual problem on our side. I agree that Ray looks more promising given current code. @JelleAalbers is the expert though.

I'm curious if @jpivarski has any experience with any of these, using frameworks to pass data to plugins used for signal processing and event reconstruction. We can ask other experiments and Python experts for advice, if you can formulate the question. Or to sanity check your implementation if you write it out as a proposal? I did a review long ago for pax or the event builder (forget), and things have clearly changed a lot since those days.

jpivarski commented 4 years ago

I haven't tried any of these projects directly. (For mailbox-like systems, akka is something I've used once or twice.)

@nsmith- has experience with a largish project using async everywhere (an async version of Uproot). Uproot itself uses functions as poor-man's futures (because I didn't want to glue to any particular framework or give up on Python 2 support).
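To illustrate what "functions as poor-man's futures" can mean, here is a minimal sketch: the background work is started immediately, and the caller gets back a zero-argument function that blocks until the result is ready. This is an illustration of the general pattern only, not Uproot's code; `start_in_background` is a hypothetical name.

```python
import threading


def start_in_background(func, *args):
    """Run func(*args) in a thread; return a zero-argument function that waits for the result."""
    outcome = {}

    def worker():
        try:
            outcome["value"] = func(*args)
        except Exception as exc:  # stored and re-raised in the caller's thread
            outcome["error"] = exc

    thread = threading.Thread(target=worker)
    thread.start()

    def wait():
        thread.join()
        if "error" in outcome:
            raise outcome["error"]
        return outcome["value"]

    return wait


get_total = start_in_background(sum, range(1000))
total = get_total()  # blocks until the background thread is done
```

The returned function depends on nothing beyond the standard library, so callers are not tied to a particular futures framework.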

tunnell commented 4 years ago

@nsmith mind if @jmosbacher contacts you for your expertise before we redo parts of XENONnT core software?

jpivarski commented 4 years ago

Okay, I can give some ideas if it's something that I know about.

JelleAalbers commented 4 years ago

Thanks for your research @jmosbacher! Let me first add a bit of background on mailboxes and strax generally. If we end up keeping mailboxes, I will add it to what we already have in the docs.

Strax is made of multiple actors (plugins, loaders and savers) that produce or consume numpy arrays with some metadata.

Mailboxes receive the outputs of plugins and loaders and pass them to other plugins and savers. They do the following tasks:

Mailboxes have a couple of unit tests, are integrated with strax and we've used them for a while. However, there are several problems, arising mostly from the fact that each mailbox operates independently of the others:

jmosbacher commented 4 years ago

@JelleAalbers @tunnell I didn't mean to force a discussion here. I know everyone is super busy right now; I just put this up as a reference. Jelle and I were planning on going over the options next week so he can decide whether it's worth making changes at this late stage. In any case, I think it's fair to assume that any attempt at this would be done in parallel to current strax development and only merged if a working framework was produced that gets the same results with better performance/stability. I see no reason to replace a working system; I just want to be prepared for a scenario where the complexity starts getting out of hand and debugging without Jelle's help becomes impossible.

nsmith- commented 4 years ago

I'm happy to comment where I can, but I may already be behind you after your thorough survey. From a far-away view, here are some random ideas:

jpivarski commented 4 years ago

This is function color: https://journal.stuffwithstuff.com/2015/02/01/what-color-is-your-function/

(At least, it's the article I think of when referring to "function color.")
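A small sketch of the issue in Python terms: an `async` function can await other `async` functions directly, while a plain function has to go through an event loop to call them. The function names here are made up for the example.

```python
import asyncio


async def fetch_chunk(i):
    # Stand-in for an I/O-bound operation
    await asyncio.sleep(0)
    return i * 2


async def process_all(n):
    # An async function can await other async functions directly
    return [await fetch_chunk(i) for i in range(n)]


def process_all_sync(n):
    # A plain function cannot use `await`; it has to hand control to an event loop
    return asyncio.run(process_all(n))


chunks = process_all_sync(3)
```

Once one function in a call chain becomes `async`, its callers either become `async` too or need a bridge like `process_all_sync`.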