I JUST HAD A REALIZATION
In the spirit of choreography vs direction: the sous chef should spin up line cooks to wait for certain kinds of queue messages, but THOSE MESSAGES SHOULD OFTEN COME FROM OTHER LINE COOKS. The Sous Chef should not have to issue the order to the next line cook, but rather the line cook should be watching the same queue as the sous chef and know when to run with newly released data directly.
For instance: the sous chef spins up a "word extraction" line cook that listens for "caption character" ingredients and emits "caption word" ingredients.
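To make that concrete, here's a minimal sketch of a choreographed line cook using kafkajs (the topic names and message shapes are assumptions, not decided):

```js
// Hypothetical sketch of a choreographed "word extraction" line cook.
// It watches the caption character topic directly; no sous chef dispatch needed.
const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'word-extraction-cook', brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'word-extraction' })
const producer = kafka.producer()

let buffer = ''

const run = async () => {
  await consumer.connect()
  await producer.connect()
  await consumer.subscribe({ topic: 'ingredient.caption.character' })
  await consumer.run({
    eachMessage: async ({ message }) => {
      const character = message.value.toString()
      if (character === ' ') {
        // A word boundary: emit the assembled word as a new ingredient.
        await producer.send({
          topic: 'ingredient.caption.word',
          messages: [{ value: buffer }],
        })
        buffer = ''
      } else {
        buffer += character
      }
    },
  })
}

run()
```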
Justin and I were talking through the overall data flow, which I think really helped me imagine the implementation of this thing.
A video stream comes in and is turned into a (name and nature TBD) ingredient. The first line cooks to work with raw streams will probably just be responsible for creating useful atomic outputs from streams, e.g. letters, snapshots, video clips, audio clips.
Other line cooks would take those slightly more refined ingredients and further refine them (e.g. letters -> words, snapshots -> OCR words, snapshots -> object detection, video clips -> lip movement detection, etc.)
Done a bit of reading; not going to be able to capture all of it here, but first:
kafkajs is a potential alternative to kafka-node, so we will want to pick one.
Kafka is going to make a lot of sense for the countertop, with one topic per ingredient (though we need to recognize various "flavors" of ingredient -- e.g. words from captions vs words from OCR).
I'm trying to better understand when Kafka Streams should be used, as opposed to the traditional producer / consumer API. For instance, we may have the sous chef immediately convert the video stream created by the content ingestor into a kafka stream on a video topic. It may be that the caption extraction also produces a text (caption) stream (which might be consumed to create individual letter events).
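If we end up on the plain producer API, piping a raw stream into a topic could be as simple as this sketch (the chunking strategy and topic name are assumptions):

```js
// Hypothetical: push chunks of an ingested video stream onto a Kafka topic.
const pipeStreamToKafka = async (videoStream, producer, topic) => {
  for await (const chunk of videoStream) {
    await producer.send({
      topic,
      messages: [{ value: chunk }],
    })
  }
}
```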
Ok, back to reading.
I wonder if we should have appliance packages hot-installed by the countertop, or if that should be a manual yarn command.
It feels more convenient / powerful to have the countertop load in appliance modules from npm that are needed, but it also feels potentially less safe.
I wanted to document some thinking here that goes beyond the scope of #82 so it makes sense to post in this thread.
Some concepts I want to float / document here:
Note: when I say "source" I'm talking about the output of an ingestion engine.
When an appliance is registered with the countertop, the relationship of that appliance to various sources should be provided. Specifically, the countertop coordinator would be told to either:
This would be used for things like:
This would be used for most appliances that are actually transforming data.
If appliances can emit payloads with new `sourceID`s at runtime, the countertop coordinator would need to monitor the payloads in search of new stream IDs. This is because it would need to trigger the creation of new appliances in any station that is expected to process that new source.
If, however, sources are registered as part of the appliance / topology definition then the countertop coordinator has a much easier job (and therefore much less overhead). We also don't couple event emissions with the task of topology definition (e.g. a new payload event is not being used for both the output of the data pipeline as well as the CREATION of that data pipeline).
For this reason, I think sources should be explicitly registered as part of a given TVK implementation.
Registration could happen in two ways:

1. Via a dedicated `registerSource` method or something along those lines, which would be run separately from `registerAppliance`.
2. Via the `registerAppliance` configuration (e.g. when registering an appliance the TVK implementation would indicate whether that appliance will produce data to a new source).

My concern with leaning on `registerAppliance` is that burying this inside of a configuration makes it feel optional, when in reality a topology is not complete without the definition of sources.
So, given that, why don't we borrow from the previous section and make these the two methods of registering appliances:

- `registerSource`, which takes an IAppliance + configuration + source name. This triggers the registration of a new appliance + station (which will serve as the origin of that source), as well as registering the source with other stations so they can react accordingly.
- `registerAppliance`, which takes an IAppliance + configuration. This triggers the registration of a new appliance + station.
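A rough sketch of what those two entry points might look like (everything beyond the two method names is hypothetical):

```js
// Hypothetical CountertopCoordinator registration API.
// Assumes a CountertopStation class that spawns workers / appliance instances.
class CountertopCoordinator {
  constructor() {
    this.stations = []
    this.sources = new Set()
  }

  // Registers an origin appliance; its station becomes the source of a named stream.
  registerSource(Appliance, configuration, sourceName) {
    this.sources.add(sourceName)
    const station = new CountertopStation(Appliance, configuration, { source: sourceName })
    this.stations.push(station)
    // Tell existing stations a new source exists so they can react accordingly.
    this.stations.forEach((s) => s.handleNewSource(sourceName))
    return station
  }

  // Registers a transforming appliance; workers get created per relevant stream.
  registerAppliance(Appliance, configuration) {
    const station = new CountertopStation(Appliance, configuration)
    this.stations.push(station)
    return station
  }
}
```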
Payloads need a stream ID, so where will that come from?
The IAppliance itself won't be assumed to apply the stream ID, since a given appliance may be invoked in lots of different stream contexts. New stream IDs can only be created via `registerSource`.
This means the worker should apply stream IDs to any payloads it emits, since the worker is what is talking to Kafka.
How does the worker know the stream ID? It should be assigned by the `CountertopStation` when the worker is created.
Countertop Workers will read and write directly from kafka.
Countertop Workers will emit events.
Sous Chef will subscribe to the countertop worker events (and most likely simply forward them). It will NOT subscribe to kafka.
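In Node terms that flow might look something like this (the class shapes are assumptions):

```js
const { EventEmitter } = require('events')

// Workers are the only layer talking to Kafka; they surface payloads as events.
class CountertopWorker extends EventEmitter {
  constructor(streamId) {
    super()
    this.streamId = streamId // assigned by the station at creation time
  }

  handleKafkaMessage(payload) {
    // Stamp the payload with the worker's stream ID before emitting.
    this.emit('payload', { ...payload, streamId: this.streamId })
  }
}

// Stations never touch Kafka; they just forward worker events upward,
// and the coordinator / sous chef does the same with station events.
class CountertopStation extends EventEmitter {
  addWorker(worker) {
    worker.on('payload', (payload) => this.emit('payload', payload))
  }
}
```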
TL;DR
- `CountertopWorkers` are the only aspect of the countertop to work with Kafka directly.
- `CountertopWorkers` emit events to `CountertopStations`.
- `CountertopStations` emit to the `CountertopCoordinator`.
- `CountertopStations` are created when a new `IAppliance` + `configuration` pair are registered with the `CountertopCoordinator`.
- `CountertopStations` manage the creation of `CountertopWorkers` and `IAppliance` instances.
- New sources are registered via `registerSource`.
- Sources are tracked by the `CountertopCoordinator`. The data associated with a source is created by `IAppliances` using the same model as any other `IAppliance` (e.g. with Stations and Workers). The only distinction is that an appliance registered via `registerSource` is always a singleton appliance (e.g. their station does not create multiple copies of the appliance in response to new streams).
- `CountertopWorkers` will take `Payloads` emitted by `IAppliances` and assign them to their registered `source`.
- A `CountertopStation` can either route "all" (quotes because we may allow source filtering) `Payloads` through a single instance of its `IAppliance` (i.e. singleton station), or create one `IAppliance` per specified `source` (i.e. factory station). This behavior is specified at the point of appliance registration.

Spoke with @chriszs about this a bit; he mentioned the model that Pachyderm uses involves naming inputs and outputs, and connecting explicitly as things are named (I may be summarizing this incorrectly).
The point is, that we should think about whether it is better to treat sources as a special case or treat multi-input as a special case. For instance, the countertop coordinator could just ensure that every distinct appliance with an output has a distinct appliance to accept the input. Food for thought.
Some example topologies may help here:
This will involve the registration of 5 separate appliances:
C-SPAN and CNN ingestion appliances are both "sources" here since they originate content.
Two copies of the caption extraction appliance would be created -- one per source (which also means one per input stream). Two copies of frame extraction would be created -- one per source (which also means one per input stream).
The challenge is in matching the correct output stream from Station C with the correct output stream from Station D. If the sources are named then the countertop coordinator can do that matching automatically, with no configuration. If the sources are not named then the TVK implementation would need to specify that in configuration. The problem is that the appliances (and streams) within a station are not managed by the TVK implementation...
For this reason, I think we do need to have named sources.
That being said, this exercise has exposed something: sources can be automatically detected IN SOME CASES (specifically for appliances with no input types).
Some thoughts:
One more topology (wrote all this before Chris' note)
How does station E know to create four appliances? Frankly, because it knows there are four distinct appliances producing `TEXT.ATOM` payloads. In other words, it isn't just about the source, it is about the source AND the station.
So this means that the countertop coordinator will need to keep track of distinct station + source pairs. This is achievable -- the coordinator could definitely define a topology in terms of the stations and data types since both of those items are known.
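A minimal sketch of that bookkeeping (the data structure is an assumption):

```js
// Hypothetical: track distinct (station, source) pairs so the coordinator
// can spin up a new appliance instance the first time a pair appears.
const seenPairs = new Set()

const trackPayloadOrigin = (stationId, sourceId, onNewPair) => {
  const key = `${stationId}::${sourceId}`
  if (!seenPairs.has(key)) {
    seenPairs.add(key)
    onNewPair(stationId, sourceId) // e.g. create a new appliance for this pair
  }
}
```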
Replying to @chriszs's excellent points:

> I wonder if it's the streams we're naming, just doing it at the source because that's where the stream originates.

Totally agree!

> Do the names always have to be human readable to work? Or can they be automatically generated identifiers?

Automatically generated, I think (see below).

> Do we want the configuration to have to refer to the stream name? Would it be handy if it could in some cases?

I think that some config settings when registering a new appliance would benefit from being able to use the IDs we have in mind (see below).
I'm thinking maybe the "name" (or even ID) of the stream isn't the right way to think about it so much as "station IDs we care about" -- so for instance, we could say when defining Station E that we don't care about Station D or Station B.
Basically, a Payload's stream is defined by the stations / appliance types it has passed through.
When registering an appliance, you would be able to specify if there are any stations whose payloads should be ignored, otherwise the countertop coordinator will create a new appliance instance for EVERY POSSIBLE station combination that could have led to it.
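"Every possible station combination" boils down to a cartesian product across input types; a naive sketch (before any ignore-list or origin filtering):

```js
// Given the candidate streams for each input type, produce every
// combination an appliance instance could be created for.
const streamCombinations = (streamsByType) =>
  streamsByType.reduce(
    (combos, streams) => combos.flatMap((combo) => streams.map((s) => [...combo, s])),
    [[]],
  )

// e.g. streamCombinations([['cnnText', 'cspanText'], ['cnnFrames', 'cspanFrames']])
// yields four combinations; later logic would filter out the cross-source ones.
```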
Re: the prior message. Station E might not have to know that if all it knew was there were four distinct and active text atom streams (source unknown). The name/IDs bit to me really is about solving the problem where appliances need to merge multiple types of output from the same stream.
But there may also be cases where you want to process some streams with an appliance configured one way, but others a different way. CNN and Fox have different chyrons, for instance.
Word -- you're right; in that case indeed there would be two stations (which would have to be configurable to only follow their pairs using config and station IDs).
Here's another topology to add to the mix:
Note that this topology does NOT cross A/B streams -- it would rely on logic to say that origin streams are indeed special.
So basically the logic of station F is to look at the first and last stream IDs for EACH DATA TYPE it consumes.
For instance, we wouldn't have an `{A,D + B,E}` pairing.
The algorithm here is implied (it's not trivial to implement, and I'm not trying to type out the pseudo code, but hopefully the pattern makes sense).
Not sure I follow why the combination of station pairs is necessary.
In case this helps: the gif generation appliance is a strawman and stupid but bear with me: it generates a gif and burns on captions (think CATS 2019 april fools)
So in this case the topology would create four sets of gifs:
1) CNN Frames + CNN OCR text
2) CNN Frames + CNN caption text
3) CSPAN Frames + CSPAN OCR text
4) CSPAN Frames + CSPAN caption text
But it would NOT create things like:
CNN Frames + CSPAN OCR Text (etc.)
Regarding the need for pairs, I'm not sure how the system would know to pair CNN frames with CNN text without keeping track of the station chain (e.g. knowing that a packet ultimately came from a transformation of Station A data).
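A sketch of that check, assuming each payload carries the chain of station IDs it has passed through:

```js
// Hypothetical: only combine streams whose station chains share an origin.
const shareOrigin = (streamA, streamB) =>
  streamA.stationChain[0] === streamB.stationChain[0]

// e.g. { stationChain: ['A', 'C'] } (CNN frames) pairs with
//      { stationChain: ['A', 'D'] } (CNN OCR text), but not with
//      { stationChain: ['B', 'D'] } (CSPAN OCR text).
```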
To your point earlier around the non-multi-input appliance case: yes, I think that a new appliance would ultimately look at the streams that exist for its inputs, and create one appliance per stream.
For appliances that take in multiple types of input, the coordinator would need to create matched pairs of streams (and create sub-pairs for each input station).
So, "worst case" so far is that a new appliance will need to care about:
When there is only one type of input expected, the new station will not care about the source appliances since it's not trying to form sets of common data.
On configuration of a new appliance the TVK Implementation should be able to specify:
This way you could say "I wanna make gifs for all sources, but only using OCR text" or maybe "I only want to make gifs for CNN, but I wanna make both types". Or even for single input appliances: "I want this OCR to only run on Fox".
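Configuration could then end up looking something like this sketch (the keys and appliance names are hypothetical):

```js
// "I wanna make gifs for all sources, but only using OCR text"
coordinator.registerAppliance(GifGenerationAppliance, {
  ignoreStations: ['captionExtraction'],
})

// "I only want to make gifs for CNN, but I wanna make both types"
coordinator.registerAppliance(GifGenerationAppliance, {
  sources: ['cnn'],
})

// "I want this OCR to only run on Fox"
coordinator.registerAppliance(OcrAppliance, {
  sources: ['fox'],
})
```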
To play devil's advocate: That example makes sense, but it was defined in terms of named source streams (CNN and C-SPAN) and payload types (text). Station E didn't really need to know the text came from OCR or captions (or the specific station IDs), just that there were four text streams from two sources. Further, if there'd been another hop in the topology (let's say you wanted to process the text first), it wouldn't have needed to know about that distinction either (or that there's another combination of station IDs).
Though let's say someone wanted to configure it to work on processed text and only processed text, how would they specify that? One could define a new type, I suppose.
I guess my thought is still, the less E has to know about every step before it, the less complicated it becomes. To my mind, based on the examples, the two things E has to know are: what type of data am I accepting and what stream is it part of? There could be more complicated topologies where that assumption breaks down (almost certainly there are), but I haven't seen that demonstrated yet.
Solid point -- and here is another example that I think might be helpful:
(where the cartoon filter would output manipulated frames, but ultimately they would still be frames)
Specifically, the appliances watching for "cartoon frames" don't care that those frames had touched station C.
I think maybe we're saying similar things? That an appliance only cares about what stream (aka source / initial station?) a payload is a part of, and what type of payload it is. The catch here is that filtered frames and non-filtered frames are the same type, so I suppose I'm proposing that we're actually using `type` for station-level routing (e.g. station E takes payloads of type `IMAGE.FRAME` or whatever) and we're using the station ID that generated the payload for routing within the station.
(But maybe station combinations is a reasonable generalization of those two things it has to know in these topologies to multi-tiered topologies and my fears about over-complication won't turn out to be founded.)
Ty for beating the do-not-overcomplicate drum by the way -- totally valid fear 😂.
I wanna elevate this framing in case it's helpful (this is more a way of thinking, not necessarily an implementation approach)
Within a given station, there will be one appliance per stream ID.
In cases where streams have diverged (e.g. the raw source was turned into two types of data, and this station is combining streams), it would use the origin portion of the stream ID to make sure to only create one appliance per "origin-coherent stream combination" -- e.g. one appliance per combination of streams that ultimately came from the same source / origin.
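One way to encode that (the ID format is purely an assumption): embed the origin in the stream ID, e.g. `origin::stationPath`, so a combining station can group candidate streams by origin:

```js
// Hypothetical stream ID format: 'origin::station1.station2'
const getOrigin = (streamId) => streamId.split('::')[0]

// Group candidate streams by origin so that one appliance is created per
// origin-coherent combination of streams.
const groupByOrigin = (streamIds) =>
  streamIds.reduce((groups, id) => {
    const origin = getOrigin(id)
    groups[origin] = groups[origin] || []
    groups[origin].push(id)
    return groups
  }, {})
```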
Note: animated WebP > animated GIF, better colour and compression. YT video previews are animated WebP.
@chriszs I think I'm finally grokking what you're saying. "A" Text from OCR and "A" text from captions combined with "A" frames really could be simplified to "two sets of A text, one set of A frames" and it's the same result from an appliance count perspective.
I do think for routing it's going to be necessary to know that the payload came from "Station C", simply because "Station C" text payloads do all need to wind up in the same appliance.
I also think that being able to identify that the resulting GIF was OCR based text will be important from a use case perspective, so knowing the processing history within a payload will still be useful...
I'm gonna try to barrel down a first pass at #82 based on some of this.
@slifty Yeah, agree it could be useful.
Here's another layer (useful as I'm writing the logic to create these topologies)
That's certainly what every workflow with DAGs depends on.
My bad -- I deleted my comment, which was something like "I think that we can assume there are no cycles in our topologies for now".
For the sake of sanity, I'm also going to make a rule for now that:
A given stream can only have one source.
Some day we may allow this to change (e.g. CNN TV + idk, CNN twitter stream), but that is a far away use case.
This is complete! There are future improvements to be made to the countertop, particularly to allow added appliances to specify stations they want to ignore (or to combine multiple sources). We'll create issues for those improvements if / when they hit the roadmap.
Task
See the full architecture here
The countertop is responsible for coordinating (and ultimately executing) the transformation of TV Kitchen data.
It is overseen by the sous chef, which accepts new sources of data from the input coordinator and kicks off the steps of various configured recipes.
Line cooks are responsible for taking orders from the sous chef, maintaining state, doing the required transformation, and sending the results back to the sous chef.
Binary byproducts (e.g. snapshots or segments of video) generated by line cooks should be stored in the local filesystem, rather than being stored in memory.
The sous chef will spin up line cooks, but will ultimately communicate with them through the coordination engine (Kafka).
Description
We need to create (or set up) the following system components:
Relevant Resources / Research
We'll be setting up Kafka. We may want to consider using kafka-node.
Related Issues
None yet, though we may want to consider creating sub-issues for the individual countertop components.