WallarooLabs / wally

Distributed Stream Processing
https://www.wallaroolabs.com
Apache License 2.0

Source & sink parallelism does not yield TCP sink output diversity #2986

Open slfritchie opened 5 years ago

slfritchie commented 5 years ago

Is this a bug, feature request, or feedback?

Bug

What is the current behavior?

When parallelism parameters are set on both a TCP source and a TCP sink, the work from multiple input TCP streams is all routed to a single sink actor, so Wallaroo's output is sent over only one TCP connection.

What is the expected behavior?

When receiving input from multiple simultaneous TCP source connections, this simple app should send its output over multiple TCP connections.

What OS and version of Wallaroo are you using?

Ubuntu Xenial/16.04 + Linux; see the Wallaroo commit IDs below.

Steps to reproduce?

git checkout 5d38e7a4250 ; echo position of origin/newschool-perf2
git merge 480e4d2243 ; echo position of origin/multi-sink-per-worker
patch -p1 <<EOF
diff --git a/examples/pony/passthrough/passthrough.pony b/examples/pony/passthrough/passthrough.pony
index 66c647adb..21aa6bfbd 100644
--- a/examples/pony/passthrough/passthrough.pony
+++ b/examples/pony/passthrough/passthrough.pony
@@ -30,16 +30,19 @@ type InputBlob is Array[U8] val

 actor Main
   new create(env: Env) =>
+    let par_factor: USize = 64
+
     try
       let pipeline = recover val
           let inputs = Wallaroo.source[InputBlob]("Input",
                 TCPSourceConfig[InputBlob].from_options(InputBlobDecoder,
                   TCPSourceConfigCLIParser("InputBlobs", env.args)?
-                  where parallelism' = 64))
+                  where parallelism' = par_factor))

           inputs
             .to_sink(TCPSinkConfig[InputBlob].from_options(
-              InputBlobEncoder, TCPSinkConfigCLIParser(env.args)?(0)?))
+              InputBlobEncoder, TCPSinkConfigCLIParser(env.args)?(0)?)
+              where parallelism = par_factor)
         end
       Wallaroo.build_application(env, "Passthrough", pipeline)
     else
EOF
make PONYCFLAGS="--verbose=1" build-examples-pony-passthrough build-testing-tools-fixed_length_message_blaster build-utils-data_receiver
./bin/data_receiver --listen 0.0.0.0:8081 --no-write --ponynoblock --ponythreads=2 --ponyminthreads=2 > ./receiver.out 2>&1 &
./examples/pony/passthrough/passthrough --in InputBlobs@0.0.0.0:8080 --out 127.0.0.1:8081 --cluster-initializer --control 127.0.0.1:12500 --data 127.0.0.1:12501 --worker-count 1 --metrics 127.0.0.1:9999 --ponynoblock --ponypinasio --ponythreads=35 --ponyminthreads=999 > /tmp/wal 2>&1 &

The contents of test-test.bin can be found at http://wallaroolabs-dev.s3.amazonaws.com/logs/test-test.bin (16 KBytes). Run the load generator procs using:

for i in `seq 1 24`; do ./testing/tools/fixed_length_message_blaster/fixed_length_message_blaster --host 127.0.0.1:8080 --file ./test-test.bin --msg-size 16432 --batch-size 1 --report-interval 999999999999 --time-limit 60000000 --msec-interval 500 --catch-up --throttled-messages  --ponynoblock --ponypinasio --ponythreads=1 --ponyminthreads=1 |& egrep '^f' & done; wait

Then use tcpdump -i lo port 8081 to find the one local (ephemeral) TCP port that all sink traffic is being sent from, e.g., port 40994 in this packet:

15:41:01.089052 IP 127.0.0.1.40994 > 127.0.0.1.8081: Flags [P.], seq 82160:82212, ack 1, win 342, options [nop,nop,TS val 147491792 ecr 147491668], length 52

Then refine the tcpdump command to filter out port 40994.

tcpdump -i lo port 8081 and not port 40994

If Wallaroo is sending data on only one TCP connection, then that tcpdump proc will see 0 packets.
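
As an alternative to finding the port by eye and then filtering it out, the same check can be done in one pass by counting payload-carrying packets per source port. The following is only a sketch: the function name sink_port_summary is made up for illustration, and it assumes tcpdump's default one-line text output for IPv4 TCP packets (as in the sample packet above), where the last field is the payload length.

```shell
#!/bin/sh
# Hypothetical helper (not part of Wallaroo or tcpdump).
# Reads tcpdump text output on stdin and prints "<source port> <packet count>"
# for every source port that sent payload to the sink port (default 8081).
#
# Possible usage, capturing a fixed number of packets so that tcpdump exits:
#   tcpdump -i lo -n -l -c 1000 port 8081 | sink_port_summary 8081
sink_port_summary() {
  awk -v port="${1:-8081}" '
    # Keep only IPv4 packets whose destination is the sink port.
    $2 == "IP" && $5 ~ ("\\." port ":$") {
      # Assumes the last field is the TCP payload length; skip empty packets
      # (handshake and pure ACKs) so only real sink output is counted.
      if ($NF + 0 == 0) next
      # The source port is the last dot-separated component of field 3.
      n = split($3, parts, ".")
      count[parts[n]]++
    }
    END { for (p in count) print p, count[p] }
  ' | sort -n
}
```

If the output has exactly one line, all sink data is going over a single TCP connection, which is the behavior reported here. More than one line would mean the output is being spread across connections.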

slfritchie commented 5 years ago

All instances of an empty pipeline, or of a pipeline with a single stateless or state computation, exhibit the same behavior: only a single TCP connection sends data to the sink.

jtfmumm commented 5 years ago

I've created a distinct issue (#2988) for the case of state computations. That's because we actually have 3 scenarios here, all related to current work on multiple sinks per worker per pipeline on the multi-sink-per-worker branch:

1) Steps in state stage step groups should each be connected to one of multiple sinks on a worker as is. If this is not true, that's a bug in the current code on that branch.

2) Sources will not, given the code on that branch, be connected to one of multiple sinks per worker, for reasons related to how we currently assign unique ids to source actors. This needs to be treated as a separate unit of work since the solution is not straightforward, and the intended results on the current branch are an improvement over master even without this functionality.

3) Computations in a stateless stage immediately following a source will be coalesced onto the source, meaning that, given (2), they will not be plugged into multiple sinks given the current code.