Another approach is to introduce two new WCT DFP node categories which can allow some amount of asynchronicity to develop over some subgraph. The two categories are described below.
A branch node category would be a hybrid of the fanout and queuedout categories. An IBranchNode would define:
using queuedany = std::deque<boost::any>;
using multiqueuedany = std::vector<queuedany>;
bool operator()(const boost::any& anyin, multiqueuedany& outqs);
A mid-level branch iface may define specific types (instead of any) and may fix the cardinality of the queues via a tuple (or leave them in a vector).
A concrete branch node must follow this protocol:
1) Accept the anyin passed on each call.
2) Document the semantic meaning of each output port (eg, port 0 is for "good" data, port 1 is for "bad" data).
3) Send zero or more data out as needed.
4) When anyin is EOS (nullptr), flush any waiting data if applicable, and send the EOS to every output queue.
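To make this concrete, below is a minimal sketch of a branch node following this protocol. The Frame payload, its "goodness" flag and the EOS representation (a null payload held in the any) are assumptions for illustration only; a real node would carry actual WCT data interfaces and whatever EOS representation the engine settles on.

```cpp
#include <boost/any.hpp>
#include <deque>
#include <memory>
#include <vector>

using queuedany = std::deque<boost::any>;
using multiqueuedany = std::vector<queuedany>;

// Hypothetical payload type; a real node would carry a WCT data interface.
struct Frame { bool good{true}; };
using frame_ptr = std::shared_ptr<const Frame>;

// Branch sketch: port 0 carries "good" data, port 1 carries "bad" data.
struct GoodBadBranch {
    bool operator()(const boost::any& anyin, multiqueuedany& outqs)
    {
        outqs.resize(2);  // assume two output ports

        // Assumption: EOS arrives as a null frame_ptr (or an empty any).
        const bool eos = anyin.empty() || !boost::any_cast<frame_ptr>(anyin);
        if (eos) {
            // Nothing is cached in this node, so just send EOS on every port.
            outqs[0].push_back(anyin);
            outqs[1].push_back(anyin);
            return true;
        }

        auto frame = boost::any_cast<frame_ptr>(anyin);
        const size_t port = frame->good ? 0 : 1;  // the routing policy
        outqs[port].push_back(anyin);             // zero or more outputs allowed
        return true;
    }
};
```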
A branch node in isolation would cause great chaos. A countering node is needed, which brings us to:
A merge node category would be a hybrid of the fanin, the non-existing queuedin and the queuedout categories. An IMergeNode would define:
bool operator()(const multiqueuedany& inqs, queuedany& outq);
A mid-level merge iface may define specific types and may or may not fix the cardinality of the input queues.
A concrete merge node must follow this protocol:
1) Accept (pop) input from each of inqs only when its head object is ready to consume (ie, it is okay to keep input objects in the queue).
2) Document any semantic differences in how data from each input queue is interpreted.
3) When an EOS is seen on any input queue, no further objects may be read from that input queue until an EOS has been found on all other input queues. At that point, the EOS should be popped from each, any internal flushing done and a single EOS added to the end of the output queue.
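And a correspondingly minimal merge sketch. This one trivially forwards data as it arrives rather than reordering it, takes the input queues by non-const reference so the node can pop what it consumes (a small deviation from the signature above), and again assumes EOS is carried as a null payload.

```cpp
#include <boost/any.hpp>
#include <deque>
#include <memory>
#include <vector>

using queuedany = std::deque<boost::any>;
using multiqueuedany = std::vector<queuedany>;

struct Frame { };
using frame_ptr = std::shared_ptr<const Frame>;

// Merge sketch: forward non-EOS objects, hold each EOS in place until
// all inputs have reached EOS, then emit a single EOS.
struct SimpleMerge {
    static bool is_eos(const boost::any& a)
    {
        // Assumption: EOS arrives as a null frame_ptr (or an empty any).
        return a.empty() || !boost::any_cast<frame_ptr>(a);
    }

    bool operator()(multiqueuedany& inqs, queuedany& outq)
    {
        size_t neos = 0;
        for (auto& inq : inqs) {
            while (!inq.empty()) {
                if (is_eos(inq.front())) { ++neos; break; }  // leave EOS queued
                outq.push_back(inq.front());                 // forward ready data
                inq.pop_front();
            }
        }
        if (neos == inqs.size()) {                   // EOS seen on every input:
            for (auto& inq : inqs) inq.pop_front();  // pop each EOS and
            outq.push_back(frame_ptr(nullptr));      // emit exactly one EOS.
        }
        return true;
    }
};
```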
The primary intention of a merge node is to undo the chaos created by a branch node. Besides enforcing synchronicity at EOS, a merge node will likely need to buffer and reorder its inputs according to some policy, as in the scenario described below.
Beside "good" and "bad" data we also must support "sparse processing".
For example, in the simulation some of the APAs may see no depos, or may see depos only from "uninteresting" sources (say, Ar39 but no neutrinos). We wish to define a job that spends time on noise simulation, NF+SP+... for a given APA only if it is "interesting". When the APA is "uninteresting" we wish to just pass some "placeholder" (as in the first part of this ticket) or, better, simply form a "shortcut" past all the expensive downstream components.
Another certain example: the FD DAQ will produce very sparse trigger records. Easily 90% of the APAs will have no data. We'd like to simply "shortcut" around NF+SP+IMG+... for these "empty" frames and only have data pass through "real" components when the frame is not empty.
For both scenarios we would write a special branch node that enacts some policy. It would, say, have output port 0 producing "interesting" or "full" frames and port 1 producing essentially empty "uninteresting" frames which still contain metadata like frame ID and time.
Some long chain of processing would intervene between the branch port 0 and a merge input port 0 while branch port 1 would directly connect to merge port 1.
If we run in /art/ or otherwise process one trigger record per graph execution, then only one of these branched pipelines ever sees data at any given time. The branch sends output to whichever port applies, the merge receives it, and the EOS flushes everything through.
When run with multiple TRs in flight things get more complicated. Say TR0 is "interesting" while the next 9 are "empty" and then an EOS is sent. The branch will send TR0 to the long, slow pipeline, then quickly send TRs 1-9 down the "shortcut" and then EOS down both. The merge will see TRs 1-9 + EOS almost instantly on port 1. In order to not prematurely output TR1 it must be configured with an initial TR number (0). Since it has not yet seen TR0 it must buffer TRs 1-9 + EOS. It then gets TR0 on port 0 and knows it can output TRs 0-9. And it sees EOS on both inputs so it knows it can also output EOS.
This puts a large burden on the node to follow proper policy. But, policy is inherently variant and there is no way to bake it in at the graph execution engine level.
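A rough sketch of that buffering policy, using a hypothetical TR type carrying an ordering number and with the initial TR number and port count taken as configuration:

```cpp
#include <cstdint>
#include <deque>
#include <map>
#include <memory>

// Hypothetical trigger-record payload with an ordering number.
struct TR { uint64_t ident{0}; };
using tr_ptr = std::shared_ptr<const TR>;

// Buffer TRs arriving on any port and release them strictly in TR-number
// order, starting from a configured initial number.  Emit EOS only after
// EOS has been seen on every input port.
struct OrderedMergePolicy {
    uint64_t next{0};                    // configured initial TR number
    size_t nports{2}, neos{0};
    std::map<uint64_t, tr_ptr> buffer;   // out-of-order TRs wait here

    // Feed one object (null == EOS) from some port, appending any releases.
    void feed(tr_ptr obj, std::deque<tr_ptr>& out)
    {
        if (!obj) { ++neos; }
        else      { buffer[obj->ident] = obj; }

        while (!buffer.empty() && buffer.begin()->first == next) {
            out.push_back(buffer.begin()->second);   // release in order
            buffer.erase(buffer.begin());
            ++next;
        }
        if (neos == nports) {                        // all inputs ended:
            for (const auto& kv : buffer) out.push_back(kv.second);
            buffer.clear();
            out.push_back(nullptr);                  // a single trailing EOS
        }
    }
};
```

In the scenario above, TRs 1-9 and the port-1 EOS simply accumulate in the buffer and EOS counter; the arrival of TR0 on port 0 releases TRs 0-9 in order, and the port-0 EOS then releases the single output EOS.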
[figure: a cartoon flow graph of the above scenario]
We have for a long time had the partly implemented hydra node category.
Hydra actually provides a super-set of the functionality needed for both branch and merge categories as described. A merge is exactly a hydra with two inputs and one output. A branch is exactly a hydra with two outputs and one input, with the only difference being that a branch input as described would be a scalar, not a queue. Giving branches a queued input is perhaps a small conceptual wart.
I will look at completing the hydra category implementations.
The problem
As a practical matter, output from a node may be garbage or otherwise exceptional. It may be impossible for any nodes downstream of the garbage to ever produce reasonable results and attempts to do so can lead to consumption of large computing resources and may even crash unwary code.
However, such garbage can often be detected and we want:
A way to avoid sending garbage to the nominal downstream subgraph.
A way to apply some exceptional downstream subgraph to do something with the garbage.
Keeping sync
At the same time we must allow the downstream subgraph to maintain synchronicity with possible sibling subgraphs that share a common upstream node. For example, if one APA's worth of data is found to be garbage by a node in a per-APA pipeline but the data from the other N-1 APAs are fine, and the graph joins all N pipelines, then each pipeline must output objects in sync across the pipelines.
This sync must be maintained even in light of the fact that QueuedOut nodes exist. These typically cache data internally until some feature in the data indicates that it has become "time" to produce output.

The WCT data flow graph protocol allows for an end-of-stream (EOS) marker (nullptr) to flow across any edge. An EOS is not itself an object place holder but can be thought of like an EOF marker for file I/O or the end-of-string \0 marker for C strings. All nodes are required to honor EOS by flushing any internally held data and forwarding the EOS downstream.

As EOS is itself not considered an object in the protocol we must devise a way to indicate "I was going to produce some garbage but instead I produce something else". The simplest choice is to produce an "empty" object. The "emptiness" is in whatever sense is allowed by the data type. For example: an IFrame may be produced with no traces, an ISlice may be produced with no activity, an ICluster may be produced with no nodes (or perhaps none of a given type).

Note that the "empty" object is not part of the WCT graph protocol but rather a contract between node implementations. It places a burden on any downstream node to do something meaningful with an "empty" object.
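As a sketch of that contract, with a hypothetical Frame type standing in for IFrame, an "empty" placeholder which still carries identifying metadata might be made and recognized like this:

```cpp
#include <memory>
#include <vector>

// Hypothetical frame type standing in for IFrame.
struct Trace { };
struct Frame {
    int ident{0};
    double time{0.0};
    std::vector<Trace> traces;
};
using frame_ptr = std::shared_ptr<const Frame>;

// Make an "empty" placeholder which preserves the identifying metadata.
inline frame_ptr make_empty_like(const frame_ptr& in)
{
    auto out = std::make_shared<Frame>();
    out->ident = in->ident;
    out->time = in->time;
    return out;                      // no traces: "empty" in the IFrame sense
}

// The downstream contract: an empty frame is a placeholder, not an EOS.
inline bool is_empty(const frame_ptr& f)
{
    return f && f->traces.empty();
}
```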
Removing garbage
Adopting the "empty is a place holder for garbage" convention we may develop an
isgood?
filter which either outputs its input or outputs an "empty" object based on some criteria on the input.or
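A sketch of such a filter, again using a hypothetical Frame type and a stand-in criterion for "goodness":

```cpp
#include <memory>
#include <vector>

// Hypothetical frame type standing in for IFrame.
struct Trace { };
struct Frame {
    int ident{0};
    double time{0.0};
    std::vector<Trace> traces;
};
using frame_ptr = std::shared_ptr<const Frame>;

// isgood? sketch: pass the input through when it meets the criterion,
// otherwise emit an "empty" placeholder keeping the metadata so that
// downstream synchronization is preserved.
struct IsGoodFilter {
    bool operator()(const frame_ptr& in, frame_ptr& out)
    {
        if (!in) { out = nullptr; return true; }   // EOS passes through
        const bool good = !in->traces.empty();     // stand-in criterion
        if (good) {
            out = in;                              // forward the good frame
            return true;
        }
        auto empty = std::make_shared<Frame>();
        empty->ident = in->ident;                  // keep ID and time metadata
        empty->time = in->time;
        out = empty;                               // placeholder for garbage
        return true;
    }
};
```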
Garbage dispatch
Next, we need a way to construct an exception handling subgraph which can receive the exceptional data. It may also be required to maintain synchronization. Effectively it needs the logical NOT of the output of isgood?. That is, when isgood? outputs an "empty" object, the exception subgraph should receive the original garbage, and when isgood? passes the original input then the exception subgraph should receive an "empty" marker object. One man's garbage is another man's treasure.

In order to reuse isgood? we may construct a subgraph like the following. Here isgood? passes through the frame when good, else sends an empty frame. The handle exception node has two input ports, each taking a frame in synchrony. When port 0 has an empty frame, handle exception knows the frame on port 1 is garbage and may dispatch it as such. Otherwise it receives the same (good) frame on both ports. Dispatch can be to provide the logical NOT as described above or to do something like save the garbage to a special file.

Detection can itself produce summary information which also wants to be dispatched. In the above subgraph the dispatch would have to repeat the summary calculation. If that is expensive then a tighter composition can be made.
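A sketch of that two-port logic, with the same hypothetical Frame type and the dispatch left as a stub:

```cpp
#include <memory>
#include <vector>

// Hypothetical frame type standing in for IFrame.
struct Trace { };
struct Frame {
    int ident{0};
    std::vector<Trace> traces;
};
using frame_ptr = std::shared_ptr<const Frame>;

inline bool is_empty(const frame_ptr& f) { return f && f->traces.empty(); }

// handle exception sketch: port 0 carries the isgood? output and port 1
// carries the original frame, in sync.  An empty frame on port 0 means
// the frame on port 1 is garbage.
struct HandleException {
    void operator()(const frame_ptr& filtered, const frame_ptr& original)
    {
        if (!filtered || !original) return;   // EOS: nothing to dispatch
        if (is_empty(filtered)) {
            dispatch_garbage(original);       // eg, save to a special file
        }
        // Otherwise both ports carry the same good frame; nothing to do.
    }

    void dispatch_garbage(const frame_ptr& /*bad*/) { /* policy goes here */ }
};
```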
Here a compound goodbad node with two output ports forwards "good" input to its output or, when detecting "bad" input, sends an empty frame to port 0. On port 1 it sends an empty frame if the input is "good" (to keep sync) or a frame which includes the summary info. If IFrame is not a suitable type to hold the summary then a new type must be used or invented.

The most tightly coupled composition would be:
Here, the complex node goodbaddump would output its input if it is "good", else it would output an empty frame. As a side effect it would dispatch the "badness" information (eg, log file, hidden local file I/O).

Steps to fixing this
For this issue:
1) isgood? and handle exception with an example job (dispatch can be a frame dumper).
2) goodbad (maybe in terms of isgood?) and an example job.