C5T / Current

C++ framework for realtime machine learning.
https://medium.com/dima-korolev/current-for-realtime-machine-learning-4f04aa8ab81a
97 stars 29 forks source link

Streamed sockets larger test. #871

Closed dkorolev closed 5 years ago

dkorolev commented 5 years ago

[ NOTE: This PR contains Trivial latency measurements, #870. ]

Hi Max,

First of all, let me apologize up front: this code is not very clean by my standards. It has obvious flaws; in the past few dozen hours I even sacrificed restartability, so the modules should be spawned from scratch, in the right order, in order for the test to work (otherwise data packets broken and re-sent from the middle of the 32-byte block would ruin everything).

Anyway, to give you a better idea, here is the Kafka-esque picture of what I ultimately want to arrive at (pr-871-plan.dot), with business logic processing shards, and frontend load balancing nodes, monitoring, and high availability servicing logic omitted for clarity:

image

And here is what is actually implemented in this PR (pr-871.dot):

image


The above is what I just tested on five EC2 m4.16xlarge x64 machines (ami-04b762b4289fba92b (64-bit x86) / ami-03920bf5f903e90d4 (64-bit Arm)). The five machines are, in the reverse order of what's started by examples/streamed_sockets/latencytest/run_all_locally.sh:

  1. Generator.
  2. Indexer.
  3. Forwarder.
  4. Processor.
  5. Terminator.

The generator binary sends out some "fake" data, to simulate real traffic. It also injects the "real" requests, which are not really "handled" by the processor, but their latency is what's measured. This is the dotted line back.

The indexer binary does several things:

Basically, the indexer is the highest per-machine load I am imagining for now, business logic excluded. It has three network flows (two outbound, one inbound), and it also writes everything it receives to disk.

Obviously, in the indexing phase, the "receive from network" and "add sequence IDs" functions are synchronous, while the three immutable "outputs" -- to both network destinations and to disk -- are run in parallel.

The forward binary is simply sending the traffic forward and also keeps its local copy. I introduced it only to make sure the latency doesn't suffer much when another "save and send over" node is present. After all, the purpose of this experiment largely is to measure the end-to-end latency, and it is important to know we can both fan out the data streams (in the 1-to-2 manner) and save them to disk.

The processor binary ignores everything but the "real" requests, which it sends back to the generator, via the dotted line. This is the latency of which is measured, end to end, by the generator.

Finally, the terminator binary is, actually, a copy-paste of the receiver from the ../speedtest directory. I only copied it into here to keep the test self-contained in one directory. All it does, except just receiving the second half of the splitted-in-two data flow from the indexer is measuring the GB/s of the traffic, and printing it to the standard output.


The TL;DR of the result of my test is:

I measured the cross-instance latency (using the code from #870) as well, and it's about 4ms on EC2. Whether we could do better there is an open question, which I'd rather not invest extra time into.

The resulting binary files (saved by the indexer and the forwarder independently) match MD5-wise.

Two closing remarks:

  1. Disabling the Nagle algorithm has no effect at all, on both throughput and latency.
  2. Disabling writing to disk has no effect as well. It just works at the rate above 0.6GB/s, and multithreading does its job.
  3. There is a lot that can be done to improve the code. And I plan to put a stop to this for now. Got some real work to do.

Obviously, this is a very crude test. But it does show that it is indeed possible to show decent throughput and decent latency without employing zero-copy sockets or lock-free queues.

I wish we had access to (five) machines of 1GB/s++ network per connection (which Amazon advertises, but doesn't deliver as per my experiments so far). And I see no reason why would this code not utilize the whole 1GB/s++ throughput, while indexing every single 32-byte blob, keeping offline copies of it on several machines, and performing (asynchronous!) business logic on this data stream.

Thanks, Dima

mzhurovich commented 5 years ago

@dkorolev Thank you! This is a very nice example of using high-level primitives and abstractions to achieve superior stream processing performance!

dkorolev commented 5 years ago

Tested this just-merged-in code on five fresh m4.16xlarge EC2 instances.

Instance setup command:

sudo yum install -y git make clang
git clone https://github.com/dkorolev/current
cd current/examples/streamed_sockets/latencytest
NDEBUG=1 make -j

Then I spin up five binaries, in the following order, back to front.

 five$ ./.current/terminator
 four$ ./.current/processor --host  $ONE_PRIVATE_IP
three$ ./.current/forward   --host  $FOUR_PRIVATE_IP
  two$ ./.current/indexer   --host1 $THREE_PRIVATE_IP --host2 $FIVE_PRIVATE_IP
  one$ ./.current/generator --host  $TWO_PRIVATE_IP

The output of generator, on one:

Format: "$(average) + $(p01) / $(p10) / $(median) / $(p90) / $(p99) + N=$(sample size)".
18.6ms + 16.3ms / 17.2ms / 18.6ms / 20.0ms / 21.0ms + N=825

The output of terminator, on five: 0.624GB/s

The md5sum hashes of the saved files match.

594d2d5caed8c5a14addef44881456ee  .current/idx.0x00000000c4800000-0x00000000c4ffffff.bin
290b57f48da40345a18d49c796c34c60  .current/idx.0x00000000c5000000-0x00000000c57fffff.bin
6b43228027e95900bc3d55fdb8e814e8  .current/idx.0x00000000c5800000-0x00000000c5ffffff.bin
25e86c8a1209a69a764a61f6642670cf  .current/idx.0x00000000c6000000-0x00000000c67fffff.bin
594d2d5caed8c5a14addef44881456ee  .current/fwd.0x00000000c4800000-0x00000000c4ffffff.bin
290b57f48da40345a18d49c796c34c60  .current/fwd.0x00000000c5000000-0x00000000c57fffff.bin
6b43228027e95900bc3d55fdb8e814e8  .current/fwd.0x00000000c5800000-0x00000000c5ffffff.bin
25e86c8a1209a69a764a61f6642670cf  .current/fwd.0x00000000c6000000-0x00000000c67fffff.bin

Each of the above files is 256MB.


The locally-run test will be slower now, and I know why: because, when on the same machine, the receiver should have higher receive throughput than the sender -- i.e. the read block size of BlockingRead of the processor and forwarder have to be larger than the read block size of the indexer. Good thing we don't need to support both the local and the remote runs :-)


Thanks, Dima

dkorolev commented 5 years ago

Addendum: the blog post on which this code is loosely based.