influxdata / influxdb-client-python

InfluxDB 2.0 python client
https://influxdb-client.readthedocs.io/en/stable/
MIT License

WriteApi should use ThreadPoolScheduler for batching #561

Closed: goznauk closed this issue 11 months ago

goznauk commented 1 year ago

When creating a WriteApi instance, window_with_time_or_count is called, which uses TimeoutScheduler by default. This creates a new thread for every flush interval.


    def __init__(self,
                 influxdb_client,
                 write_options: WriteOptions = WriteOptions(),
                 point_settings: PointSettings = PointSettings(),
                 **kwargs) -> None:

        ...

        if self._write_options.write_type is WriteType.batching:
            # Define a Subject that listens to incoming data and produces writes into InfluxDB
            self._subject = Subject()

            self._disposable = self._subject.pipe(
                # Split incoming data to windows by batch_size or flush_interval
                ops.window_with_time_or_count(count=write_options.batch_size,
                                              timespan=timedelta(milliseconds=write_options.flush_interval)),
                ...
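
To see the churn directly, here is a minimal sketch (not from the issue, assuming reactivex 4.x) showing that each action scheduled on TimeoutScheduler gets its own threading.Timer thread, while ThreadPoolScheduler(1) reuses one pooled worker:

    import threading

    from reactivex.scheduler import ThreadPoolScheduler, TimeoutScheduler

    before = threading.active_count()

    # Each schedule_relative() on TimeoutScheduler starts its own threading.Timer,
    # so every pending flush interval holds a live thread until it fires.
    timeout = TimeoutScheduler()
    for _ in range(100):
        timeout.schedule_relative(10.0, lambda scheduler, state: None)
    print(threading.active_count() - before)  # ~100 extra live threads

    # ThreadPoolScheduler routes all actions to a fixed pool of workers, so with
    # max_workers=1 the thread count stays flat however many windows are pending.
    pool = ThreadPoolScheduler(max_workers=1)
    pool.schedule(lambda scheduler, state: None)  # runs on the single reused worker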

Instead of letting TimeoutScheduler create millions of threads, we can simply use ThreadPoolScheduler(1) to handle the windowing. TimeoutScheduler might not make sense for Python anyway. It can easily be changed with just one line of code:

    ops.window_with_time_or_count(count=write_options.batch_size,
                                  timespan=timedelta(milliseconds=write_options.flush_interval),
                                  scheduler=ThreadPoolScheduler(1)),
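
ThreadPoolScheduler is backed by a concurrent.futures.ThreadPoolExecutor, so with a single worker every window boundary is driven by one long-lived thread instead of a fresh timer thread per flush interval. Because the scheduler is hard-coded in WriteApi.__init__ rather than exposed through WriteOptions, applying the change today means patching the instance, as in the workaround below.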
goznauk commented 1 year ago

Quick Fix

# Imports assumed for a runnable patch (reactivex 4.x):
from datetime import timedelta

from reactivex import operators as ops
from reactivex.scheduler import ThreadPoolScheduler
from reactivex.subject import Subject

from influxdb_client.client.write_api import (WriteOptions, WriteType,
                                              _BatchItem, _body_reduce)


# noinspection PyProtectedMember
def monkey_patch_no_timeout_scheduler(write_api):
    # https://github.com/influxdata/influxdb-client-python/pull/562
    if write_api._write_options.write_type is WriteType.batching:
        write_api._subject.on_completed()
        # Define a Subject that listens to incoming data and produces writes into InfluxDB
        write_api._subject = Subject()
        write_api._disposable = write_api._subject.pipe(
            # Split incoming data to windows by batch_size or flush_interval
            ops.window_with_time_or_count(count=write_api._write_options.batch_size,
                                          timespan=timedelta(milliseconds=write_api._write_options.flush_interval),
                                          scheduler=ThreadPoolScheduler(1)),
            # Map window into groups defined by 'organization', 'bucket' and 'precision'
            ops.flat_map(lambda window: window.pipe(
                # Group window by 'organization', 'bucket' and 'precision'
                ops.group_by(lambda batch_item: batch_item.key),
                # Create batch (concatenate line protocols with \n)
                ops.map(lambda group: group.pipe(
                    ops.to_iterable(),
                    ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs)))
                )),
                ops.merge_all()
            )),
            # Write data into InfluxDB (with the possibility to retry if it fails)
            ops.filter(lambda batch: batch.size > 0),
            ops.map(mapper=lambda batch: write_api._to_response(data=batch, delay=write_api._jitter_delay())),
            ops.merge_all()
        ).subscribe(write_api._on_next, write_api._on_error, write_api._on_complete)

write_api = client.write_api(write_options=WriteOptions(...))
monkey_patch_no_timeout_scheduler(write_api)
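
Teardown is unchanged: assuming the standard WriteApi lifecycle, close() completes the replaced Subject, which flushes any pending window before disposal:

write_api.close()  # completes the patched subject and flushes pending batches
client.close()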