bytewax / bytewax

Python Stream Processing
https://docs.bytewax.io/
Apache License 2.0

Bytewax does not scale in case of single process and multiple workers #405

Closed szhem closed 8 months ago

szhem commented 8 months ago

Bug Description

It seems that Bytewax does not scale when running a single process with multiple workers. With a fairly large batch_size, adding threads gives no speedup at all. With batch_size=1, adding workers slows processing down several times over (up to 4x compared to single-threaded mode).

Python version (python -V)

Python 3.11.2

Bytewax version (pip list | grep bytewax)

bytewax 0.18.2

Operating System version (uname -morp)

Darwin 23.3.0 arm64 arm

Relevant log output

No response

Steps to Reproduce

Here is a code snippet to reproduce the issue:

main_bytewax.py

from bytewax.testing import _IterSourcePartition
from bytewax.inputs import FixedPartitionedSource
from bytewax.outputs import DynamicSink, StatelessSinkPartition
from bytewax.dataflow import Dataflow
import bytewax.operators as op
import threading

class _NullSink(StatelessSinkPartition):

    def write_batch(self, items):
        pass

class NullOutput(DynamicSink):

    def __init__(self, **config):
        print(f"Created sink for output: {threading.get_ident()}")
        self._config = config

    def build(self, worker_index, worker_count):
        print(f"Created part for output: [{threading.get_ident()}, worker: {worker_index}, num_workers: {worker_count}]")
        return _NullSink()

class TestingSource(FixedPartitionedSource):

    def __init__(self, num_records, num_workers=1, batch_size = 1):
        print(f"Created source for input: {threading.get_ident()}")
        self._num_workers = num_workers
        self._num_records = num_records
        self._batch_size = batch_size

    def list_parts(self):
        return [ f"part:{str(p)}:{self._num_workers}" for p in range(self._num_workers) ]

    def build_part(self, now, for_part, resume_state):
        part_info = for_part.split(':')
        records_per_part = self._num_records // self._num_workers
        print(f"Created {for_part} for input: [thread: {threading.get_ident()}, part_id: {part_info[1]}, parts: {part_info[2]}, part_records: {records_per_part}]")

        return _IterSourcePartition(range(records_per_part), self._batch_size, resume_state)

def run(records=1000000, batch=1, num_workers=1):
    flow = Dataflow('dataflow')
    stream = op.input('in', flow, TestingSource(records, num_workers, batch))
    op.output('out', stream, NullOutput())
    return flow

And here is how I ran it:

1 process, 1 worker, 100M of elements, 10K batch size

✗ time python -m bytewax.run -w 1 "main_bytewax:run(100000000, 10000, 1)"
Created source for input: 8048286784
Created sink for output: 8048286784
Created part for output: [8048286784, worker: 0, num_workers: 1]
Created part:0:1 for input: [thread: 8048286784, part_id: 0, parts: 1, part_records: 100000000]
python -m bytewax.run -w 1 "main_bytewax:run(100000000, 10000, 1)"  14.35s user 0.03s system 99% cpu 14.505 total

1 process, 2 workers, 100M of elements, 10K batch size

✗ time python -m bytewax.run -w 2 "main_bytewax:run(100000000, 10000, 2)"
Created source for input: 8048286784
Created sink for output: 8048286784
Created part for output: [6108033024, worker: 1, num_workers: 2]
Created part for output: [6105886720, worker: 0, num_workers: 2]
Created part:1:2 for input: [thread: 6108033024, part_id: 1, parts: 2, part_records: 50000000]
Created part:0:2 for input: [thread: 6105886720, part_id: 0, parts: 2, part_records: 50000000]
python -m bytewax.run -w 2 "main_bytewax:run(100000000, 10000, 2)"  14.32s user 0.17s system 100% cpu 14.468 total

1 process, 4 workers, 100M of elements, 10K batch size

✗ time python -m bytewax.run -w 4 "main_bytewax:run(100000000, 10000, 4)"
Created source for input: 8048286784
Created sink for output: 8048286784
Created part for output: [6133886976, worker: 3, num_workers: 4]
Created part for output: [6129594368, worker: 1, num_workers: 4]
Created part for output: [6131740672, worker: 2, num_workers: 4]
Created part for output: [6127448064, worker: 0, num_workers: 4]
Created part:2:4 for input: [thread: 6131740672, part_id: 2, parts: 4, part_records: 25000000]
Created part:0:4 for input: [thread: 6127448064, part_id: 0, parts: 4, part_records: 25000000]
Created part:3:4 for input: [thread: 6133886976, part_id: 3, parts: 4, part_records: 25000000]
Created part:1:4 for input: [thread: 6129594368, part_id: 1, parts: 4, part_records: 25000000]
python -m bytewax.run -w 4 "main_bytewax:run(100000000, 10000, 4)"  14.45s user 0.17s system 100% cpu 14.564 total

1 process, 1 worker, 50M of elements, 10K batch size

✗ time python -m bytewax.run -w 1 "main_bytewax:run(50000000, 10000, 1)"
Created source for input: 8048286784
Created sink for output: 8048286784
Created part for output: [8048286784, worker: 0, num_workers: 1]
Created part:0:1 for input: [thread: 8048286784, part_id: 0, parts: 1, part_records: 50000000]
python -m bytewax.run -w 1 "main_bytewax:run(50000000, 10000, 1)"  7.19s user 0.02s system 99% cpu 7.254 total

1 process, 1 worker, 25M of elements, 10K batch size

✗ time python -m bytewax.run -w 1 "main_bytewax:run(25000000, 10000, 1)"
Created source for input: 8048286784
Created sink for output: 8048286784
Created part for output: [8048286784, worker: 0, num_workers: 1]
Created part:0:1 for input: [thread: 8048286784, part_id: 0, parts: 1, part_records: 25000000]
python -m bytewax.run -w 1 "main_bytewax:run(25000000, 10000, 1)"  3.63s user 0.02s system 98% cpu 3.692 total

1 process, 1 worker, 10M of elements, 1 element batch size

✗ time python -m bytewax.run -w 1 "main_bytewax:run(10000000, 1, 1)"
Created source for input: 8048286784
Created sink for output: 8048286784
Created part for output: [8048286784, worker: 0, num_workers: 1]
Created part:0:1 for input: [thread: 8048286784, part_id: 0, parts: 1, part_records: 10000000]
python -m bytewax.run -w 1 "main_bytewax:run(10000000, 1, 1)"  18.58s user 0.02s system 99% cpu 18.633 total

1 process, 2 workers, 10M of elements, 1 element batch size

✗ time python -m bytewax.run -w 2 "main_bytewax:run(10000000, 1, 2)"
Created source for input: 8048286784
Created sink for output: 8048286784
Created part for output: [6163050496, worker: 0, num_workers: 2]
Created part for output: [6165196800, worker: 1, num_workers: 2]
Created part:0:2 for input: [thread: 6163050496, part_id: 0, parts: 2, part_records: 5000000]
Created part:1:2 for input: [thread: 6165196800, part_id: 1, parts: 2, part_records: 5000000]
python -m bytewax.run -w 2 "main_bytewax:run(10000000, 1, 2)"  38.24s user 48.19s system 110% cpu 1:18.15 total

1 process, 4 workers, 10M of elements, 1 element batch size

✗ time python -m bytewax.run -w 4 "main_bytewax:run(10000000, 1, 4)"
Created source for input: 8048286784
Created sink for output: 8048286784
Created part for output: [6162903040, worker: 1, num_workers: 4]
Created part for output: [6167195648, worker: 3, num_workers: 4]
Created part for output: [6165049344, worker: 2, num_workers: 4]
Created part for output: [6160756736, worker: 0, num_workers: 4]
Created part:3:4 for input: [thread: 6167195648, part_id: 3, parts: 4, part_records: 2500000]
Created part:0:4 for input: [thread: 6160756736, part_id: 0, parts: 4, part_records: 2500000]
Created part:1:4 for input: [thread: 6162903040, part_id: 1, parts: 4, part_records: 2500000]
Created part:2:4 for input: [thread: 6165049344, part_id: 2, parts: 4, part_records: 2500000]
python -m bytewax.run -w 4 "main_bytewax:run(10000000, 1, 4)"  52.75s user 88.69s system 132% cpu 1:46.51 total

1 process, 1 worker, 5M of elements, 1 element batch size

✗ time python -m bytewax.run -w 1 "main_bytewax:run(5000000, 1, 1)"
Created source for input: 8048286784
Created sink for output: 8048286784
Created part for output: [8048286784, worker: 0, num_workers: 1]
Created part:0:1 for input: [thread: 8048286784, part_id: 0, parts: 1, part_records: 5000000]
python -m bytewax.run -w 1 "main_bytewax:run(5000000, 1, 1)"  9.47s user 0.03s system 99% cpu 9.592 total

1 process, 1 worker, 2.5M of elements, 1 element batch size

✗ time python -m bytewax.run -w 1 "main_bytewax:run(2500000, 1, 1)"
Created source for input: 8048286784
Created sink for output: 8048286784
Created part for output: [8048286784, worker: 0, num_workers: 1]
Created part:0:1 for input: [thread: 8048286784, part_id: 0, parts: 1, part_records: 2500000]
python -m bytewax.run -w 1 "main_bytewax:run(2500000, 1, 1)"  4.69s user 0.02s system 99% cpu 4.752 total

So it seems there is some bug in bytewax internals and it does not scale with the number of workers per single process, as results of the test above show something pretty strange and unexpected:

processing 10M of elements by 1 element:
- 1 thread - 18.5s
- 2 threads - 38.2s (expected result: 9.5s)
- 4 threads - 52.7s (expected result: 4.7s)

processing 100M of elements by 10K batches:
- 1 thread - 14.3s
- 2 threads - 14.3s (expected result: 7.2s)
- 4 threads - 14.4s (expected result: 3.6s)

Psykopear commented 8 months ago

Hi, that's a good observation, and in the specific case you posted, it's also the expected behavior.

Bytewax can parallelize your dataflow, but multiple python threads in the same process will still share the same GIL.

There are a couple of things I'd correct in the input source as well, but they won't change the result much. As a piece of advice: if you want the number of partitions to match the dynamic number of workers, you should probably use DynamicSource as the base class. Something like this:


import threading

from bytewax.inputs import DynamicSource, StatelessSourcePartition

class TestingPartition(StatelessSourcePartition):
    def __init__(self, it, batch_size):
        self.it = it
        self.batch_size = batch_size

    def next_batch(self):
        return [next(self.it) for i in range(self.batch_size)]

class TestingSource(DynamicSource):
    def __init__(self, num_records, batch_size = 1):
        print(f"Created source for input: {threading.get_ident()}")
        self._num_records = num_records
        self._batch_size = batch_size

    def build(self, worker_index, worker_count):
        records_per_part = self._num_records // worker_count
        print(f"Created part for input: [thread: {threading.get_ident()},worker_index: {worker_index}, part_records: {records_per_part}]")
        return TestingPartition(iter(range(records_per_part)), self._batch_size)
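A side note on `next_batch` in the snippet above: when the iterator runs out in the middle of the list comprehension, `StopIteration` propagates and the items already collected for that batch are discarded. That does not matter for this benchmark, but a variant that keeps the trailing partial batch could look roughly like the sketch below. The helper name `take_batch` is hypothetical and not part of Bytewax; it is plain Python around `itertools.islice`.

```python
from itertools import islice


def take_batch(it, batch_size):
    """Return up to batch_size items from it, keeping a trailing partial batch.

    Raises StopIteration only when the iterator is fully exhausted, which is
    the signal the snippets in this thread use to end a partition.
    """
    batch = list(islice(it, batch_size))
    if not batch:
        raise StopIteration()
    return batch


# Five records in batches of two: the last batch holds a single item.
records = iter(range(5))
batches = []
while True:
    try:
        batches.append(take_batch(records, 2))
    except StopIteration:
        break
print(batches)
```

Inside a partition, `next_batch` would then return `take_batch(self.it, self.batch_size)`.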

Back to the GIL: if each partition runs on a different thread in the same Python process, the next_batch calls can't run concurrently, because the threads contend for the same lock.
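The same effect can be reproduced without Bytewax. The sketch below uses only the standard library and assumes a regular CPython build with the GIL enabled (a free-threaded build would behave differently). The names `count_up`, `N`, and `WORKERS` are made up for illustration. It runs the same pure-Python loop four times serially and then in four threads; on a GIL build the two timings usually come out close to each other.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def count_up(n):
    """Pure-Python CPU-bound loop; the running thread holds the GIL."""
    total = 0
    for i in range(n):
        total += i
    return total


N = 2_000_000  # iterations per task (arbitrary, just large enough to time)
WORKERS = 4    # number of tasks and threads (arbitrary)

# Run the tasks one after another on the main thread.
start = time.perf_counter()
serial = [count_up(N) for _ in range(WORKERS)]
serial_s = time.perf_counter() - start

# Run the same tasks on a thread pool inside the same process.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=WORKERS) as pool:
    threaded = list(pool.map(count_up, [N] * WORKERS))
threaded_s = time.perf_counter() - start

print(f"serial:   {serial_s:.2f}s")
print(f"threaded: {threaded_s:.2f}s")
```

The results are identical in both modes; only the scheduling differs, and with CPU-bound Python code the threads mostly take turns rather than run in parallel.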

Now, if the work instead releases the GIL while it is in progress, like a web request does, you would see a real increase in performance. Try something like this:

import bytewax.operators as op
import requests
from bytewax.dataflow import Dataflow
from bytewax.inputs import DynamicSource, StatelessSourcePartition
from bytewax.outputs import DynamicSink, StatelessSinkPartition

class _NullSink(StatelessSinkPartition):
    def write_batch(self, items):
        pass

class NullOutput(DynamicSink):
    def build(self, worker_index, worker_count):
        return _NullSink()

class TestingPartition(StatelessSourcePartition):
    def __init__(self, num):
        self.num = num
        self.counter = 0

    def next_batch(self):
        self.counter += 1
        if self.counter > self.num:
            raise StopIteration()
        # Add your url here, beware of the number of calls you make
        return [requests.get("<an-url>")]

class TestingSource(DynamicSource):
    def __init__(self, num_records):
        self._num_records = num_records

    def build(self, worker_index, worker_count):
        records_per_part = self._num_records // worker_count
        return TestingPartition(records_per_part)

def run(records=10):
    flow = Dataflow('dataflow')
    stream = op.input('in', flow, TestingSource(records))
    op.output('out', stream, NullOutput())
    return flow

Running it with a single worker I get ~3s, running with 8 workers it takes ~0.8s and with 16 workers only ~0.1s.
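If you don't want to hit a real URL, the I/O-bound case can be approximated with `time.sleep`, which releases the GIL while waiting, much like a blocking network call. This is a standard-library sketch rather than a Bytewax dataflow, and `fake_request`, `DELAY`, and `CALLS` are hypothetical names chosen for the example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(delay):
    """Stand-in for a web request: sleeping releases the GIL while waiting."""
    time.sleep(delay)
    return delay


DELAY = 0.05  # simulated latency per call, in seconds (arbitrary)
CALLS = 8     # number of simulated requests (arbitrary)

# One call after another: total time is roughly CALLS * DELAY.
start = time.perf_counter()
serial = [fake_request(DELAY) for _ in range(CALLS)]
serial_s = time.perf_counter() - start

# One thread per call: the waits overlap, so total time is roughly DELAY.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=CALLS) as pool:
    threaded = list(pool.map(fake_request, [DELAY] * CALLS))
threaded_s = time.perf_counter() - start

print(f"serial:   {serial_s:.2f}s")
print(f"threaded: {threaded_s:.2f}s")
```

Here the threaded run finishes well ahead of the serial one, because the threads spend their time waiting instead of competing for the GIL.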

But as you noticed, not all work can run free of the GIL most of the time. When that's the case, you should use Bytewax's ability to run separate processes instead of multiple workers in the same process. We have a testing runner that makes this easy to try, but follow the guide in the documentation once you want a production setup.

Running multiple processes can incur an initial delay while all workers try to connect to each other, so a short-running task might take longer than it does in a single process. You can still measure the improvement by timing, for example, the interval between the first and last message processed. Let's modify your code to show that:

import time

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.inputs import DynamicSource, StatelessSourcePartition
from bytewax.outputs import DynamicSink, StatelessSinkPartition

class _NullSink(StatelessSinkPartition):
    def write_batch(self, items):
        pass

class NullOutput(DynamicSink):
    def __init__(self, **config):
        self._config = config

    def build(self, worker_index, worker_count):
        return _NullSink()

class TestingPartition(StatelessSourcePartition):
    def __init__(self, it, batch_size, worker_index):
        self.it = it
        self.batch_size = batch_size
        self.worker_index = worker_index
        self.start = None

    def next_batch(self):
        # First time this is called we set self.start
        if self.start is None:
            self.start = time.time()
        # If `next(self.it)` raises `StopIteration`, it
        # means we consumed our iterator. Print elapsed
        # time and reraise so the dataflow can stop.
        try:
            return [next(self.it) for i in range(self.batch_size)]
        except StopIteration:
            # Label the timing with the worker index, as in the output below
            print(f"Worker {self.worker_index}: {time.time() - self.start:.2f}s")
            raise

class TestingSource(DynamicSource):
    def __init__(self, num_records, batch_size = 1):
        self._num_records = num_records
        self._batch_size = batch_size

    def build(self, worker_index, worker_count):
        records_per_part = self._num_records // worker_count
        return TestingPartition(
            iter(range(records_per_part)), self._batch_size, worker_index
        )

def run(records=1000000, batch=1):
    flow = Dataflow('dataflow')
    stream = op.input('in', flow, TestingSource(records, batch))
    op.output('out', stream, NullOutput())
    return flow

And now try to run it.

This is the baseline, running with a single worker and single process, 100M records, 10k batch:

❯ time python -m bytewax.run "examples.scale:run(records=100_000_000, batch=10_000)"
Worker 0: 3.21s

________________________________________________________
Executed in    3.28 secs    fish           external
   usr time    3.26 secs    0.00 millis    3.26 secs
   sys time    0.02 secs    4.86 millis    0.02 secs

And here is running it with 8 workers. You can see that some workers manage to finish early, but overall the total time spent is very similar:

❯ time python -m bytewax.run "examples.scale:run(records=100_000_000, batch=10000)" -w8
Worker 7: 1.53s
Worker 5: 1.87s
Worker 2: 2.44s
Worker 0: 2.29s
Worker 3: 2.93s
Worker 6: 2.95s
Worker 1: 3.18s
Worker 4: 3.39s

________________________________________________________
Executed in    3.47 secs    fish           external
   usr time    3.39 secs  263.00 micros    3.39 secs
   sys time    0.23 secs  246.00 micros    0.23 secs

Finally, try the testing runner with 1 worker per process, but 8 separate processes:

❯ time python -m bytewax.testing "examples.scale:run(records=100_000_000, batch=10_000)" -w1 -p8
Worker 0: 0.54s
Worker 6: 0.55s
Worker 3: 0.55s
Worker 1: 0.55s
Worker 2: 0.56s
Worker 7: 0.58s
Worker 5: 0.59s
Worker 4: 0.60s

________________________________________________________
Executed in  798.50 millis    fish           external
   usr time    5.19 secs    568.00 micros    5.19 secs
   sys time    0.35 secs    530.00 micros    0.35 secs

This last one was a lucky run, because all workers could connect to each other immediately. In other cases you might see some delay introduced by the connection retries:

❯ time python -m bytewax.testing "examples.scale:run(records=100_000_000, batch=10_000)" -w1 -p8
worker 4:   error connecting to worker 0: Connection refused (os error 111); retrying
worker 3:   error connecting to worker 0: Connection refused (os error 111); retrying
worker 5:   error connecting to worker 0: Connection refused (os error 111); retrying
worker 2:   error connecting to worker 0: Connection refused (os error 111); retrying
worker 1:   error connecting to worker 0: Connection refused (os error 111); retrying
worker 7:   error connecting to worker 6: Connection refused (os error 111); retrying
Worker 0: 0.53s
Worker 1: 0.54s
Worker 6: 0.56s
Worker 4: 0.56s
Worker 2: 0.57s
Worker 5: 0.57s
Worker 7: 0.58s
Worker 3: 0.61s

________________________________________________________
Executed in    1.80 secs    fish           external
   usr time    5.19 secs    0.00 micros    5.19 secs
   sys time    0.39 secs  716.00 micros    0.39 secs

But you can see that the running time of each worker is consistently lower than in the original one.
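To put a number on that, the per-worker timings printed above can be turned into a rough speedup figure. The sketch below copies the logged values from the three runs and compares the baseline to the slowest worker of each run. The `speedup` helper is a hypothetical name, and the calculation ignores process startup and connection time, so treat it as an estimate of processing time only.

```python
# Per-worker elapsed times in seconds, copied from the runs above.
baseline = 3.21  # 1 process, 1 worker
threads_8 = [1.53, 1.87, 2.44, 2.29, 2.93, 2.95, 3.18, 3.39]  # 1 process, 8 workers
procs_8 = [0.54, 0.55, 0.55, 0.55, 0.56, 0.58, 0.59, 0.60]    # 8 processes, 1 worker each


def speedup(baseline_s, worker_times):
    """Baseline time divided by the slowest worker's time in a run."""
    return baseline_s / max(worker_times)


for label, times in [("8 threads", threads_8), ("8 processes", procs_8)]:
    s = speedup(baseline, times)
    # Efficiency is the speedup divided by the number of workers.
    print(f"{label}: speedup {s:.2f}x, efficiency {s / len(times):.0%}")
```

By this measure the threaded run is no faster than the baseline, while the 8-process run is several times faster, though still short of an ideal 8x.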

I hope this answers your questions, but let me know if you want me to expand on anything here.

szhem commented 8 months ago

Hi @Psykopear!

Thanks a lot! It is much clearer now.

Psykopear commented 8 months ago

Happy to know it was useful. I'll close this issue then, but feel free to comment here if you need anything else.