Could you explain one specific use case in more detail? I think this is a case where it's better to push the new data into a processing pipeline than to "pull" new data by monitoring a status. On Mon, Sep 28, 2015 at 5:58 PM xiaojinghuang notifications@github.com wrote:
When a new scan starts, it would be great for that to be reflected in a status value (a PV?).
On-line data analysis tools need access to the running dataset during a scan. Could you please provide a tool that can automatically check for and read newly saved files?
@xiaojinghuang I am reposting your reply here, which went directly to my email instead of posting to this GitHub issue
Hi Dan,
Thank you very much for your prompt reply.
The status is like a triggering signal. For instance, a differential-phase-contrast visualization program sits idle; whenever a new scan starts, it grabs the new data and processes it to produce an image on-line. The same scheme will apply to on-line ptychography reconstruction as well: the reconstruction program will automatically load newly saved data and feed it into the phase-retrieval algorithm.
Xiaojing
Yes, in that case, this is something that we should implement using a subscription. Take a look at this section of the documentation and then we can discuss more.
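For concreteness, here is a minimal sketch of the subscription model, assuming the bluesky API in which RE.subscribe(callback) registers a function that is called as callback(name, doc) for every Document; process_new_frame() is a hypothetical analysis hook, not anything that exists today:

```python
# Minimal sketch: react to Documents as they stream out of the RunEngine.
from bluesky.callbacks import CallbackBase


class DPCProcessor(CallbackBase):
    """Kick off on-line processing as Documents arrive."""

    def start(self, doc):
        # A new scan has begun; doc carries the scan metadata and uid.
        print("new scan started:", doc["uid"])

    def event(self, doc):
        # Each Event carries one step's readings in doc['data'].
        process_new_frame(doc["data"])   # hypothetical analysis hook

    def stop(self, doc):
        print("scan finished:", doc["run_start"])


RE.subscribe(DPCProcessor())  # RE is the session's RunEngine instance
```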
Correct me if I'm wrong, but I don't believe we have a standardized way to push data from runs to an external processing pipeline (i.e., one residing in a separate process).
@arkilic today mentioned to me the possibility of a caching layer / a way to monitor for new scans and events directly from mongodb using "capped collections." (Admittedly, I don't know much about the details required to implement this; perhaps @arkilic can comment himself.) I think such a layer could support either simply notifying when scans have been written to the db or pushing data into a processing pipeline, as @danielballan seems to prefer, with no changes necessary on the collection side.
Interesting. Let's hear more about "capped collections." There is more than one urgent application for pushing data to external processes. And, happily, there are some external resources helping on this part of the project, so if we decide it's the right model we should plan on it (and build it!).
@arkilic I have a mostly-abandoned PR that pushes Documents to an external process. I rebased it on current master and left it for you at NSLS-II/bluesky#211.
My model is that the Document "server" would be a callback that subscribes to the RunEngine on the main process. The Document "client" would have an instance of Dispatcher (see bluesky/run_engine.py). That way, if a user wants to switch from running callbacks on the local process to running them on a different process, they merely subscribe to the client's Dispatcher instead of the RunEngine's Dispatcher.
That PR of mine does not solve the hard part of the problem -- how to manage a potentially overflowing volume of Events. A lossy LIFO queue, or a ring buffer (as "capped collections" seem to be), is the missing piece.
I optimistically put my code in a new subpackage, bluesky.dispatchers, because it would be cool to cook up several experiments for pushing Documents.
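To make the server/client split concrete, here is a rough pyzmq sketch of the idea (not the code from that PR; the port number is arbitrary). Server side, a callback publishes each (name, doc) pair over a PUB socket:

```python
import pickle
import zmq

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind("tcp://*:5578")  # port number is arbitrary


def publish_document(name, doc):
    # One multipart message per Document: [name, pickled payload].
    pub.send_multipart([name.encode(), pickle.dumps(doc)])


# On the collection process:
# RE.subscribe(publish_document)
```

Client side, a separate process receives Documents and hands them to local callbacks; a bounded deque stands in for the missing lossy buffer, so if the consumer falls behind the oldest entries are silently dropped:

```python
import pickle
from collections import deque

import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://localhost:5578")
sub.setsockopt(zmq.SUBSCRIBE, b"")   # receive everything

buffer = deque(maxlen=10000)         # lossy: oldest entries fall off the end

while True:
    name, payload = sub.recv_multipart()
    buffer.append((name.decode(), pickle.loads(payload)))
    # ... a local Dispatcher (or plain callbacks) would drain the buffer here
```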
hrmmm... a standardized way of handling streaming...?
Could we handle the Eiger streaming interface with the same layer? That is, control and metadata would travel through the regular DAQ stack, but "live" data could potentially be multicast to go straight from the detector to stream-processing clients (multiple parallel pipelines in some cases) as well as to the regular DAQ stack. Or could we distribute stream processing across a cluster?
This stuff keeps coming up for AMX/FMX.
I think @klauer has some relevant code, but it needs to be integrated.
Currently, our "streaming" processing code (bluesky/callbacks.py, bluesky/broker_callbacks.py, etc.) gets a stream of Event Documents and requests external data from filestore. That second step should be made modular so that the data could come directly from a PV instead. The downstream should not know/care whether the arrays are coming from filestore, PVs, or whatever.
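As a sketch of what "modular" could look like (none of this exists yet; the fetcher and class names are made up, the filestore import path is an assumption, and real code would consult the Event Descriptor's data_keys to know which values are external references rather than guessing by type):

```python
import epics  # pyepics, only needed for the PV-backed fetcher


def fetch_from_filestore(datum_id):
    # Assumes filestore exposes a retrieve()-style lookup by datum id.
    from filestore.api import retrieve
    return retrieve(datum_id)


def fetch_from_pv(pv_name):
    # Read the array straight from an EPICS PV instead.
    return epics.caget(pv_name)


class FillExternalData:
    """Fill external references in Events before passing them downstream.

    For brevity, any string value in doc['data'] is treated as a reference;
    real code would check the descriptor's data_keys.
    """

    def __init__(self, downstream, fetch=fetch_from_filestore):
        self.downstream = downstream
        self.fetch = fetch

    def __call__(self, name, doc):
        if name == 'event':
            doc = dict(doc)
            doc['data'] = {k: self.fetch(v) if isinstance(v, str) else v
                           for k, v in doc['data'].items()}
        self.downstream(name, doc)
```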
@tacaswell and I are meeting in about half an hour to go over some stuff, and this is at the top of my list. The capped collection stuff addresses some of the issues you mentioned. The purpose of using a capped collection is its ability to provide us with a "tailable" cursor (essentially a cursor that never disconnects after the first query; new data is pushed from the server side, which makes it effectively a monitor_my_query() routine).
You're absolutely right: capped collections are fixed-size, lossy circular buffers. We would have to decide the size of the collection given the buffer size we need (this can be annoying because when a collection is created with, say, size M, the mongo daemon creates M empty documents, which can take a very long time depending on M). In terms of caching, because of their fixed size, capped collections are used for high-throughput operations, which is what we were looking for when testing out memcached etc. They are also not as complicated as other caching solutions.
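For reference, a minimal pymongo sketch of the tailable-cursor pattern (database/collection names and sizes here are made up, and handle_new_event() is a hypothetical processing hook):

```python
import time

import pymongo

client = pymongo.MongoClient('localhost', 27017)
db = client['streaming_test']

# Create the ring buffer once; 'size' is the total byte budget in bytes.
try:
    db.create_collection('live_events', capped=True, size=100 * 2**20)
except pymongo.errors.CollectionInvalid:
    pass  # collection already exists

# Tailable cursors only work on capped collections; the cursor stays open
# and yields new documents as the server inserts them.
cursor = db['live_events'].find(
    cursor_type=pymongo.CursorType.TAILABLE_AWAIT)

while cursor.alive:
    for doc in cursor:
        handle_new_event(doc)  # hypothetical processing hook
    time.sleep(0.1)            # cursor exhausted for now; wait for more inserts
```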
So that second step becomes "... and gets external data from [pluggable datasource]"? Where "gets" could be via generating requests, or through receiving some sort of messaging (the Dectris zmq stream), or a broadcast, or whatever?
See https://github.com/synchbot/metadataclient/blob/master/metadataclient/commands.py#L835
@tacaswell suggests that for a live stream of events one should subscribe to bluesky.
There is an option built into ophyd to publish the current scan UID to an EPICS PV; coupled with databroker, that is sufficient to get all of the scan data. A Document-forwarding mechanism based on zmq is currently in the works for a streaming model. Closing for now.
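For anyone picking this up, here is a sketch of how the UID-PV option could be used, assuming the databroker API of this era (DataBroker / get_events) and pyepics; the PV name and process() hook are made up:

```python
import epics                                    # pyepics
from databroker import DataBroker as db, get_events


def on_new_scan(pvname=None, char_value=None, **kwargs):
    # Called whenever the scan-UID PV updates.
    uid = char_value
    if not uid:
        return
    header = db[uid]                            # look the run up by its uid
    # Note: at scan start the run is still filling in; re-query as needed.
    for event in get_events(header, fill=True):
        process(event['data'])                  # hypothetical analysis hook


# PV name is made up; use whatever ophyd is configured to publish to.
epics.camonitor('XF:31ID{Scan}UID-I', callback=on_new_scan)
```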