Introduced Buffer (bytearray) and BufferQueues for data buffering. Different buffering policies are supported: a buffer per message, or periodic flush (fill the buffer until it is full or until a timeout expires - the default behaviour). Large messages (spanning multiple buffers) are not supported.
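A minimal sketch of the two buffering policies, with illustrative names (BufferQueue, flush_timeout_s, buffer_per_message) rather than the actual Volga API:

```python
import time
from typing import List, Optional


class BufferQueue:
    def __init__(self, max_buffer_size: int = 32 * 1024, flush_timeout_s: float = 0.1,
                 buffer_per_message: bool = False):
        self._max_buffer_size = max_buffer_size
        self._flush_timeout_s = flush_timeout_s
        self._buffer_per_message = buffer_per_message
        self._buffer = bytearray()
        self._last_flush_ts = time.time()
        self._ready: List[bytes] = []  # filled buffers ready to be handed to the writer

    def append(self, msg: bytes):
        # large messages spanning multiple buffers are not supported
        assert len(msg) <= self._max_buffer_size
        if not self._buffer_per_message and len(self._buffer) + len(msg) > self._max_buffer_size:
            self._flush()
        self._buffer.extend(msg)
        if self._buffer_per_message:
            self._flush()  # policy 1: one buffer per message

    def poll(self) -> Optional[bytes]:
        # policy 2 (default): flush when the buffer is full or the timeout expired
        if self._buffer and time.time() - self._last_flush_ts >= self._flush_timeout_s:
            self._flush()
        return self._ready.pop(0) if self._ready else None

    def _flush(self):
        if self._buffer:
            self._ready.append(bytes(self._buffer))
            self._buffer = bytearray()
        self._last_flush_ts = time.time()
```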
BufferMemoryTracker keeps track of the (shared) memory used by DataReader and DataWriter queues and blocks when the limit is exceeded, to inflict backpressure.
Backpressure via shared channel works
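A hedged sketch of how such a tracker can inflict backpressure: acquire() blocks the writer once the budget is exhausted, release() frees it when buffers are consumed. The real tracker accounts for memory shared between processes; the single-process threading primitive here is only a stand-in.

```python
import threading


class BufferMemoryTracker:
    def __init__(self, capacity_bytes: int):
        self._capacity = capacity_bytes
        self._used = 0
        self._cond = threading.Condition()

    def acquire(self, num_bytes: int, timeout_s: float = 5.0) -> bool:
        # called before enqueueing a buffer; blocks while usage is above the limit
        with self._cond:
            ok = self._cond.wait_for(
                lambda: self._used + num_bytes <= self._capacity, timeout=timeout_s)
            if ok:
                self._used += num_bytes
            return ok

    def release(self, num_bytes: int):
        # called once a buffer has been consumed on the reader side
        with self._cond:
            self._used = max(0, self._used - num_bytes)
            self._cond.notify_all()
```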
Introduced TransferHandler (runs on TransferActor), which is responsible for passing messages back and forth between peer nodes in a cluster (via a single bidirectional node<->node TCP connection): 1 actor per node, 1 IOLoop per actor/node, 2 TransferHandler instances per loop/actor/node - one for forward data (Direction.SENDER), one for backward data (Direction.RECEIVER). TransferActor orchestration is WIP.
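A structural sketch of this layout, assuming a Ray actor and illustrative class shapes (not the actual Volga classes); the IOLoop here is a stub, a fuller polling sketch appears further below.

```python
from enum import Enum

import ray


class Direction(Enum):
    SENDER = 1    # forward data: local writers -> peer node
    RECEIVER = 2  # backward data (acks): peer node -> local writers


class IOLoop:  # stub, see the polling sketch further below
    def __init__(self):
        self._handlers = []

    def register(self, handler):
        self._handlers.append(handler)

    def run(self):
        pass  # poll sockets and dispatch events to the registered handlers


class TransferHandler:
    def __init__(self, direction: Direction, peer_addr: str):
        self.direction = direction
        self.peer_addr = peer_addr  # the single bidirectional node<->node TCP connection


@ray.remote
class TransferActor:  # one per node
    def __init__(self, peer_addr: str):
        self._io_loop = IOLoop()                                         # 1 IOLoop per actor/node
        self._sender = TransferHandler(Direction.SENDER, peer_addr)      # forward direction
        self._receiver = TransferHandler(Direction.RECEIVER, peer_addr)  # backward direction
        self._io_loop.register(self._sender)
        self._io_loop.register(self._receiver)

    def start(self):
        self._io_loop.run()
```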
High-level flow: each operator actor (process) has a DataReader and a DataWriter (source operators have only a DataWriter, sink operators only a DataReader).
2 scenarios:
If operators are on the same node, their DataReader/DataWriter(s) connect directly (each has one socket per channel, and the sockets connect 1-to-1).
If operators are on different nodes, each node runs a single TransferActor (which runs the 2 TransferHandlers described above). DataReader/DataWriter connect to the corresponding TransferHandlers on the local TransferActor; these 2 TransferHandlers pass data to (and receive acks from) the 2 TransferHandlers on the peer node, which then deliver the data to the corresponding operator's DataReader/DataWriter on that node. A sketch of this wiring is shown below.
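An illustrative sketch of the two wiring scenarios; LocalChannel/RemoteChannel and the address fields are assumptions for the example, not necessarily the actual Volga channel types:

```python
from dataclasses import dataclass


@dataclass
class LocalChannel:
    channel_id: str
    ipc_addr: str  # DataWriter and DataReader sockets connect 1-to-1 over this address


@dataclass
class RemoteChannel:
    channel_id: str
    source_local_ipc_addr: str  # DataWriter -> local TransferActor (SENDER handler)
    source_node_ip: str
    target_local_ipc_addr: str  # remote TransferActor -> DataReader on the target node
    target_node_ip: str
    port: int                   # the single node<->node TCP connection


def make_channel(channel_id: str, source_node_ip: str, target_node_ip: str, port: int = 4321):
    if source_node_ip == target_node_ip:
        # same node: connect directly, one socket per channel
        return LocalChannel(channel_id, ipc_addr=f'ipc:///tmp/volga_{channel_id}')
    # different nodes: hop through the TransferActor on each side
    return RemoteChannel(
        channel_id=channel_id,
        source_local_ipc_addr=f'ipc:///tmp/volga_out_{channel_id}',
        source_node_ip=source_node_ip,
        target_local_ipc_addr=f'ipc:///tmp/volga_in_{channel_id}',
        target_node_ip=target_node_ip,
        port=port,
    )
```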
[TODO] Credit-based Flow Control for backpressure
[TODO] Networking metrics
All the necessary logic is in volga.streaming.runtime.network, with some orchestration in volga.streaming.runtime.master.worker_lifecycle_controller.py and volga.streaming.runtime.worker.task.stream_task.py.
volga.streaming.runtime.network.experimental and volga.streaming.runtime.network.deprecated can be skipped.
Introduced IOLoop class which asynchronously polls sockets and triggers handlers (IOHandler) on each event.
Everything is a zmq.PAIR socket - bidirectional, it can both send and receive.
DataReader and DataWriter implement IOHandler.
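A minimal sketch of such a polling loop on top of zmq.Poller; the IOHandler interface shown is illustrative, not the exact Volga one.

```python
from abc import ABC, abstractmethod
from typing import Dict, List

import zmq


class IOHandler(ABC):
    # implemented by DataReader, DataWriter and TransferHandler

    @abstractmethod
    def sockets(self) -> List[zmq.Socket]:
        ...

    @abstractmethod
    def on_readable(self, socket: zmq.Socket):
        ...

    @abstractmethod
    def on_writable(self, socket: zmq.Socket):
        ...


class IOLoop:
    def __init__(self):
        self._poller = zmq.Poller()
        self._socket_to_handler: Dict[zmq.Socket, IOHandler] = {}
        self.running = False

    def register(self, handler: IOHandler):
        # handlers own their zmq.PAIR sockets; the loop only polls them
        for sock in handler.sockets():
            self._socket_to_handler[sock] = handler
            self._poller.register(sock, zmq.POLLIN | zmq.POLLOUT)

    def run(self):
        self.running = True
        while self.running:
            # trigger the owning handler on each socket event
            for sock, event in self._poller.poll(timeout=100):
                handler = self._socket_to_handler[sock]
                if event & zmq.POLLIN:
                    handler.on_readable(sock)
                if event & zmq.POLLOUT:
                    handler.on_writable(sock)
```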
ACKing mechanism (with ACK batching and retries) plus tests make sure messages are delivered over unreliable channels.
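A hedged sketch of both sides of this mechanism, with illustrative names (retry_unacked, ack_batch_size): the writer keeps every buffer in flight until it is acked and re-sends on timeout, while the reader batches acks instead of acking each buffer.

```python
import time
from typing import Callable, Dict, List, Tuple


class AckingWriter:
    def __init__(self, send: Callable[[int, bytes], None], resend_timeout_s: float = 1.0):
        self._send = send
        self._resend_timeout_s = resend_timeout_s
        self._in_flight: Dict[int, Tuple[bytes, float]] = {}  # buffer_id -> (payload, last_sent_ts)

    def write(self, buffer_id: int, payload: bytes):
        self._send(buffer_id, payload)
        self._in_flight[buffer_id] = (payload, time.time())

    def on_ack_batch(self, acked_ids: List[int]):
        for buffer_id in acked_ids:
            self._in_flight.pop(buffer_id, None)

    def retry_unacked(self):
        # called periodically from the IO loop; re-sends anything not acked in time
        now = time.time()
        for buffer_id, (payload, sent_ts) in list(self._in_flight.items()):
            if now - sent_ts > self._resend_timeout_s:
                self._send(buffer_id, payload)
                self._in_flight[buffer_id] = (payload, now)


class AckingReader:
    def __init__(self, send_acks: Callable[[List[int]], None], ack_batch_size: int = 10):
        self._send_acks = send_acks
        self._ack_batch_size = ack_batch_size
        self._pending_acks: List[int] = []

    def on_buffer(self, buffer_id: int):
        self._pending_acks.append(buffer_id)
        if len(self._pending_acks) >= self._ack_batch_size:  # ACK batching
            self._send_acks(self._pending_acks)
            self._pending_acks = []
```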
Low watermarks + buffering ensure queues stay FIFO and duplicates are discarded.
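One plausible reading of this behaviour, as a minimal sketch (the actual Volga semantics may differ): since retries can redeliver the same buffer, anything at or below the watermark is discarded as a duplicate, and out-of-order buffers are parked until the gap is filled so delivery stays FIFO.

```python
from typing import Dict, List


class WatermarkedQueue:
    def __init__(self):
        self._low_watermark = -1              # highest contiguously delivered buffer_id
        self._out_of_order: Dict[int, bytes] = {}

    def push(self, buffer_id: int, payload: bytes) -> List[bytes]:
        if buffer_id <= self._low_watermark:
            return []                         # already delivered (e.g. a retried buffer) - discard
        self._out_of_order[buffer_id] = payload
        delivered: List[bytes] = []
        # advance the watermark over every contiguous buffer, delivering in FIFO order
        while self._low_watermark + 1 in self._out_of_order:
            self._low_watermark += 1
            delivered.append(self._out_of_order.pop(self._low_watermark))
        return delivered
```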