apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[FlightRPC] Async API overhaul (tracking issue) #34607

Open lidavidm opened 1 year ago

lidavidm commented 1 year ago

Describe the enhancement requested

Flight is mostly based around synchronous APIs. This is convenient, but doesn't scale as well. There have been various issues filed and ML questions asked over the years about asynchronous APIs. We should consider adding a set of asynchronous APIs (and, to reduce maintenance, hopefully rewriting the synchronous APIs as a layer on top of the asynchronous ones). This will also fit better with recommended usage of gRPC (in Java) and UCX (in C++).

Also, this would be a chance to tackle some API improvements:

Related issues:

Implementation

Component(s)

FlightRPC

pvardanis commented 6 months ago

@lidavidm I'm wondering if I can use asyncio to run concurrent requests for do_exchange in an arrow flight rpc server. From the docs I don't see any reference around it, I assume it's not supported yet?

lidavidm commented 6 months ago

It's not. I think this ticket should be considered cancelled, fwiw

pvardanis commented 6 months ago

@lidavidm so there are no plans in the short term to support this I guess?

lidavidm commented 6 months ago

No.

My preference going forward would be to expose the Arrow Flight serializer (which is where all the performance improvements happen anyways) and let you directly use grpcio to make calls (so all the caveats listed in the issue above would automatically fall away).

lidavidm commented 6 months ago

But I'm not sure if I can convince the other maintainers.

pvardanis commented 6 months ago

That's unfortunate... please keep us updated if anything changes

pitrou commented 6 months ago

My preference going forward would be to expose the Arrow Flight serializer (which is where all the performance improvements happen anyways) and let you directly use grpcio to make calls (so all the caveats listed in the issue above would automatically fall away).

You mean grpcio the Python library?

pvardanis commented 6 months ago

@lidavidm from this thread: https://www.mail-archive.com/user@arrow.apache.org/msg01414.html I understand that concurrency is handled by gRPC itself and not by Arrow Flight. Could you give some more insights on how Arrow Flight handles multiple requests coming simultaneously from external users?

lidavidm commented 6 months ago

You mean grpcio the Python library?

Yes, that would naturally resolve all problems about asyncio support, fork support, middleware/interceptors that actually work, etc.

Could you give some more insights on how Arrow Flight handles multiple requests coming simultaneously from external users?

It's handled by gRPC, which uses a thread pool internally. In Python there is the GIL so there is no true concurrency. That's about all there is to say unless you have a specific question.

pitrou commented 6 months ago

I'm wondering if I can use asyncio to run concurrent requests for do_exchange in an arrow flight rpc server. From the docs I don't see any reference around it, I assume it's not supported yet?

Note: if you do not need any streaming results, you can trivially do so by executing each Flight request in an external thread pool using https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools

If you do need to async-iterate over Flight streaming results, though, then I'm afraid there is no easy and performant way to do this.
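The thread-pool approach mentioned above can be sketched as follows. This is only an illustration under stated assumptions: blocking_request is a hypothetical stand-in for a synchronous, non-streaming Flight call (it just sleeps), and the pool size of 8 is arbitrary.

```python
import asyncio
import concurrent.futures
import time
from functools import partial


def blocking_request(request_id, delay=0.2):
    """Hypothetical stand-in for a blocking, non-streaming Flight call."""
    time.sleep(delay)  # simulates waiting on the server; sleep releases the GIL
    return f"result-{request_id}"


async def run_requests(num_requests, executor):
    loop = asyncio.get_running_loop()
    # run_in_executor returns an awaitable future for each offloaded call.
    futures = [
        loop.run_in_executor(executor, partial(blocking_request, i))
        for i in range(num_requests)
    ]
    # gather preserves the order in which the futures were passed in.
    return await asyncio.gather(*futures)


def main(num_requests=4):
    # One executor shared by all requests; 8 workers is an arbitrary choice.
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        start = time.perf_counter()
        results = asyncio.run(run_requests(num_requests, executor))
        elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = main()
    print(results, f"{elapsed:.2f}s")
```

With four requests of 0.2 s each, the elapsed time should be close to a single request's duration rather than the sum, provided the blocking call releases the GIL while it waits.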

pvardanis commented 6 months ago

This is what I'm trying to do.

I have this function that runs a request using a FlightClient:

def send_request(client, table, descriptor):
    writer, reader = client.do_exchange(descriptor)
    writer.begin(table.schema)
    for batch in table.to_batches():
        writer.write_batch(batch)
    # Signal the end of writes once, after all batches have been sent.
    writer.done_writing()

    return reader.read_chunk().data.to_pydict()

then running concurrent requests using:

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    futures = [
        executor.submit(send_request, client, table, descriptor)
        for client in clients
    ]
    # Wait for all futures to complete
    concurrent.futures.wait(futures)
    # Retrieve results from the completed futures
    results = [future.result() for future in futures]

for e.g. 10 clients. The code on the server does run, but I'm seeing the RPCs handled sequentially, so the total execution time is the same as just running:

for _ in range(10):
    send_request(client, table, descriptor)

I would expect the former to run in parallel and execute faster than the latter. Am I getting something wrong here?

lidavidm commented 6 months ago

Python has the GIL

pitrou commented 6 months ago

This is what I'm trying to do.

Well, no, it is not. Your snippet does not use asyncio. Please take a look at the example below: https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools

@lidavidm

Python has the GIL

I don't understand what the GIL has to do with this...

lidavidm commented 6 months ago

10 concurrent requests will get serialized on the server due to the GIL

lidavidm commented 6 months ago

(Also, without seeing the server itself, it's possible the request isn't long enough for concurrency to matter anyways. We might just be measuring request/thread overhead.)

pitrou commented 6 months ago

I don't know who "we" is here. Is there some context I'm lacking?

lidavidm commented 6 months ago

"We" being the poster.

pitrou commented 6 months ago

Well, I do not see any measurements, so I wouldn't conclude anything here.

To focus a bit:
1) @pvardanis is trying to run concurrent client requests (see https://github.com/apache/arrow/issues/34607#issuecomment-1966635680)
2) they want to do so from a Python async function (presumably), hence using asyncio
3) asyncio allows delegating blocking requests to worker threads so as to wait on their results asynchronously (see https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools)
4) the GIL does not prevent IO parallelism in Python, since any well-behaved IO function would release the GIL (implicitly or explicitly) before diving into IO
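Point 4 can be checked with a small experiment. The sketch below uses time.sleep as a stand-in for a blocking I/O call that releases the GIL; the thread count and delay are arbitrary assumptions, and nothing here involves Flight itself.

```python
import threading
import time


def blocking_io(delay):
    # time.sleep releases the GIL while waiting, as a well-behaved
    # blocking I/O function would.
    time.sleep(delay)


def timed_in_threads(num_threads=4, delay=0.2):
    """Run blocking_io in num_threads threads and return the wall-clock time."""
    threads = [
        threading.Thread(target=blocking_io, args=(delay,))
        for _ in range(num_threads)
    ]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"4 threads x 0.2s took {timed_in_threads():.2f}s")
```

If the waits overlapped, the total is close to one delay instead of four. CPU-bound pure-Python work would not show the same overlap, which is the case the GIL argument applies to.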

lidavidm commented 6 months ago

If you make 10 concurrent requests to a Python service, the service can only handle one at a time due to the GIL. So the concurrency will not get you any wall-clock-time speedup. @pvardanis notes that they got the same time whether they used a thread pool or ran the requests sequentially. That is the question I am answering. They've simply hijacked this thread to ask whether asyncio would make a difference: I contend it would not.

lidavidm commented 6 months ago

(Of course, maybe the service does some I/O or something in the background. Then the GIL would matter less. But again, we don't have the service they tested, so I'm answering the most obvious scenario of what happened.)

pitrou commented 6 months ago

Well, if one assumes that the GIL gets in the way, then asyncio won't change anything.

If, on the other hand, the GIL doesn't get in the way and the server only sees requests serially, then either 1) there is something very suboptimal in our Flight server implementation, or 2) the client itself is emitting requests serially.

pvardanis commented 6 months ago

@lidavidm I've also tried with asyncio:

async def send_concurrent_request(client, table, descriptor):
    try:
        writer, reader = client.do_exchange(descriptor)
        writer.begin(table.schema)
        for batch in table.to_batches():
            writer.write_batch(batch)
        writer.done_writing()

        result = reader.read_chunk().data.to_pydict()
        return result
    except StopIteration as e:
        # print(f"Error: {e}")
        return None

async def gather_concurrent_requests(clients, table, descriptor):
    tasks = []
    for client in clients:
        tasks.append(send_concurrent_request(client, table, descriptor))
    return await asyncio.gather(*tasks, return_exceptions=True)

async def run_concurrent(clients):
    start = time.time()
    results = await gather_concurrent_requests(clients, table, descriptor)
    elapsed = time.time() - start
    print(f"Elapsed: {elapsed}")
    print(results)

clients = [flight.FlightClient("grpc://localhost:8080") for _ in range(2)]
asyncio.run(run_concurrent(clients))

and I get the same performance. Also, on the server side I've put time.sleep(20) for testing purposes.

lidavidm commented 6 months ago

You'll have to profile or otherwise debug then. It's possible we missed a place where we need to release the GIL on the client when calling into native code.

pvardanis commented 6 months ago

Ok posting actual times here so we're all on the same page, thanks for the follow-up in any case.

Summary

I have a do_exchange endpoint on my flight server with a deliberate 20s sleep for benchmarking purposes. I'm sending 10 requests in two ways and comparing:

I've set GRPC_VERBOSITY=DEBUG, and after the server is initialized I see the following:

D0227 15:58:33.382416000 7916605440 config.cc:113]                     gRPC EXPERIMENT tcp_frame_size_tuning               OFF (default:OFF)
D0227 15:58:33.382428000 7916605440 config.cc:113]                     gRPC EXPERIMENT tcp_read_chunks                     OFF (default:OFF)
D0227 15:58:33.382430000 7916605440 config.cc:113]                     gRPC EXPERIMENT tcp_rcv_lowat                       OFF (default:OFF)
D0227 15:58:33.382433000 7916605440 config.cc:113]                     gRPC EXPERIMENT peer_state_based_framing            OFF (default:OFF)
D0227 15:58:33.386899000 7916605440 config.cc:113]                     gRPC EXPERIMENT flow_control_fixes                  OFF (default:OFF)
D0227 15:58:33.386981000 7916605440 config.cc:113]                     gRPC EXPERIMENT memory_pressure_controller          OFF (default:OFF)
D0227 15:58:33.386989000 7916605440 config.cc:113]                     gRPC EXPERIMENT periodic_resource_quota_reclamation OFF (default:OFF)
D0227 15:58:33.386995000 7916605440 config.cc:113]                     gRPC EXPERIMENT unconstrained_max_quota_buffer_size OFF (default:OFF)
D0227 15:58:33.387001000 7916605440 config.cc:113]                     gRPC EXPERIMENT new_hpack_huffman_decoder           OFF (default:OFF)
D0227 15:58:33.387006000 7916605440 config.cc:113]                     gRPC EXPERIMENT event_engine_client                 OFF (default:OFF)
D0227 15:58:33.387147000 7916605440 ev_posix.cc:141]                   Using polling engine: poll
D0227 15:58:33.387170000 7916605440 dns_resolver_ares.cc:831]          Using ares dns resolver
I0227 15:58:33.390538000 7916605440 server_builder.cc:348]             Synchronous server. Num CQs: 1, Min pollers: 1, Max Pollers: 2, CQ timeout (msec): 10000
D0227 15:58:33.390578000 7916605440 lb_policy_registry.cc:45]          registering LB policy factory for "priority_experimental"
D0227 15:58:33.390601000 7916605440 lb_policy_registry.cc:45]          registering LB policy factory for "weighted_target_experimental"
D0227 15:58:33.390610000 7916605440 lb_policy_registry.cc:45]          registering LB policy factory for "pick_first"
D0227 15:58:33.390616000 7916605440 lb_policy_registry.cc:45]          registering LB policy factory for "round_robin"
D0227 15:58:33.390622000 7916605440 lb_policy_registry.cc:45]          registering LB policy factory for "ring_hash_experimental"
D0227 15:58:33.390643000 7916605440 lb_policy_registry.cc:45]          registering LB policy factory for "grpclb"
D0227 15:58:33.390674000 7916605440 lb_policy_registry.cc:45]          registering LB policy factory for "rls_experimental"
D0227 15:58:33.394581000 7916605440 lb_policy_registry.cc:45]          registering LB policy factory for "xds_cluster_manager_experimental"
D0227 15:58:33.394607000 7916605440 lb_policy_registry.cc:45]          registering LB policy factory for "xds_cluster_impl_experimental"
D0227 15:58:33.394619000 7916605440 lb_policy_registry.cc:45]          registering LB policy factory for "cds_experimental"
D0227 15:58:33.394635000 7916605440 lb_policy_registry.cc:45]          registering LB policy factory for "xds_cluster_resolver_experimental"
D0227 15:58:33.394661000 7916605440 certificate_provider_registry.cc:35] registering certificate provider factory for "file_watcher"

You can also let me know of other env vars to set to debug even further, I'd be happy to do that.

pitrou commented 6 months ago

@pvardanis Your asyncio snippet is not right. Again, please see https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools for how to offload blocking requests using asyncio.
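To illustrate why a coroutine that makes blocking calls gains nothing from asyncio.gather, here is a minimal comparison. It is a sketch under stated assumptions: time.sleep stands in for the blocking Flight calls, and DELAY and the request count are arbitrary.

```python
import asyncio
import time

DELAY = 0.2  # arbitrary stand-in for the duration of one request


async def blocking_coroutine():
    # A blocking call inside a coroutine stalls the event loop, so no
    # other coroutine can make progress until it returns.
    time.sleep(DELAY)


def blocking_function():
    time.sleep(DELAY)


async def offloaded():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    await loop.run_in_executor(None, blocking_function)


async def timed(make_coro, n):
    start = time.perf_counter()
    await asyncio.gather(*(make_coro() for _ in range(n)))
    return time.perf_counter() - start


def compare(n=3):
    sequential = asyncio.run(timed(blocking_coroutine, n))
    concurrent = asyncio.run(timed(offloaded, n))
    return sequential, concurrent


if __name__ == "__main__":
    sequential, concurrent = compare()
    print(f"blocking in coroutine: {sequential:.2f}s, offloaded: {concurrent:.2f}s")
```

The first timing grows with the number of requests because the coroutines run one after another; the second stays near a single delay because the blocking work runs in worker threads.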

pvardanis commented 6 months ago

@pitrou Following your suggestion I modified my code as follows:

async def run_concurrent(client, num_requests):
    loop = asyncio.get_running_loop()
    tasks = []
    start = time.time()
    for _ in range(num_requests):
        func = partial(send_concurrent_request, client, table, descriptor)
        with concurrent.futures.ThreadPoolExecutor() as pool:
            tasks.append(loop.run_in_executor(pool, func))

    await asyncio.gather(*tasks)
    elapsed = time.time() - start
    print(f"Elapsed: {elapsed}")

if __name__ == "__main__":
    client = flight.FlightClient("grpc://localhost:8080")
    asyncio.run(run_concurrent(client, num_requests=2))

Testing with 2 concurrent requests still takes the same time as running them sequentially. Sorry if I'm missing something again, happy to revisit.

pvardanis commented 6 months ago

Also, is this expected when the server spins up:

I0227 15:58:33.390538000 7916605440 server_builder.cc:348]             Synchronous server. Num CQs: 1, Min pollers: 1, Max Pollers: 2, CQ timeout (msec): 10000

pitrou commented 6 months ago

Well, the "with ... as pool" block waits for all of the pool's tasks to finish before it exits, so each request completes before the next one is submitted. This is probably not right. Perhaps try:

flight_executor = concurrent.futures.ThreadPoolExecutor(max_workers=16)

async def run_concurrent(client, num_requests):
    loop = asyncio.get_running_loop()
    tasks = []
    start = time.time()
    for _ in range(num_requests):
        func = partial(send_concurrent_request, client, table, descriptor)
        tasks.append(loop.run_in_executor(flight_executor, func))

    await asyncio.gather(*tasks)
    ...
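The effect of creating the pool inside the loop can be reproduced without Flight or asyncio. In this sketch, task is a hypothetical stand-in that only sleeps, and plain submit is used instead of run_in_executor to keep the example small; the delay is an arbitrary assumption.

```python
import concurrent.futures
import time


def task(delay=0.2):
    time.sleep(delay)


def pool_per_task(n):
    """Create a new pool for every task, as in the snippet being discussed."""
    start = time.perf_counter()
    futures = []
    for _ in range(n):
        # Leaving the with block calls shutdown(wait=True), which blocks
        # here until the task just submitted has finished.
        with concurrent.futures.ThreadPoolExecutor() as pool:
            futures.append(pool.submit(task))
    return time.perf_counter() - start


def shared_pool(n):
    """Submit every task to one pool, then wait for all of them."""
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(task) for _ in range(n)]
        concurrent.futures.wait(futures)
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"pool per task: {pool_per_task(3):.2f}s")
    print(f"shared pool:   {shared_pool(3):.2f}s")
```

The per-task version takes roughly n times the delay, while the shared pool takes roughly one delay.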
pvardanis commented 6 months ago

@pitrou that still makes no difference. I even tried with 10 requests and I'm getting the same time as with sequential calls. Is there anything else I could do to help you both figure out what the issue is?

Here's my do_exchange() implementation:

def do_exchange(
        self,
        context: flight.ServerCallContext,
        descriptor: flight.FlightDescriptor,
        reader: flight.FlightStreamReader,
        writer: flight.FlightStreamWriter,
    ) -> None:
        """This method implements the `do_exchange` method of the FlightServerBase
        class.

        :param context: A ServerCallContext object.
        :param descriptor: A FlightDescriptor object.
        :param reader: A FlightStreamReader object.
        :param writer: A FlightStreamWriter object.
        """
        is_first_batch = True
        while True:
            logger.info("Processing data...")
            (
                writer,
                reader,
                is_first_batch,
            ) = self._foo(writer, reader, is_first_batch)
            logger.info("Output data ready to be consumed.")

def _foo(
        self,
        writer: flight.FlightStreamWriter,
        reader: flight.FlightStreamReader,
        is_first_batch: bool,
    ) -> Tuple[flight.FlightStreamWriter, flight.FlightStreamReader, bool]:
        logger.debug("Starting batch processing...")
        for batch in reader.read_chunk():
            if batch is None:
                break
            writer, is_first_batch = self._bar(  # type: ignore[no-redef]
                batch, writer, is_first_batch
            )
        logger.debug("Batch processing finished.")

        writer.close()

        return (writer, reader, is_first_batch)

def _bar(
        self,
        batch: pa.RecordBatch,
        writer: flight.MetadataRecordBatchWriter,
        is_first_batch: bool,
    ) -> Tuple[flight.FlightStreamWriter, bool]:
        result = self._run_inference_for_batch(batch)
        return self._write_result(writer, result, is_first_batch)  # type: ignore

Can't provide more code unfortunately for confidentiality reasons.

UPDATE: I've realized I've been using a threading.Lock() for a method inside do_exchange. After removing it, everything seems to be processed in parallel. Will double check with more benchmarks and let you know.

pitrou commented 6 months ago

Could you tell us roughly what _run_inference_for_batch is doing, or at least what kind of libraries it spends most of its time in?

pitrou commented 6 months ago

UPDATE: I've realized I've been using a threading.Lock() for a method inside do_exchange. After removing it, everything seems to be processed in parallel. Will double check with more benchmarks and let you know.

Ha, ok. I'm glad that you've found the culprit :-)

lidavidm commented 6 months ago

That would explain it, thanks for the followup.

pvardanis commented 6 months ago

Thank you both. The server is used to serve ML models for inference, and the lock was put there to avoid OOM issues. So I'll have to find a way to take advantage of concurrent processing without overdoing resource allocation, to prevent errors.

pvardanis commented 6 months ago

I have one last question: assuming no lock is used, do you know which configuration setting bounds the number of requests accessing do_exchange simultaneously? Or is there none?

lidavidm commented 6 months ago

Flight does not impose any bound. I don't believe gRPC does either (or is capable of doing so), unless you use gRPC yourself and manage the completion queue directly.

pitrou commented 6 months ago

I suppose we could expose gRPC options: https://grpc.github.io/grpc/cpp/classgrpc_1_1_server_builder.html

pvardanis commented 6 months ago

So there's a high chance of getting OOM issues when a lock is not present, especially in the case of serving ML models

lidavidm commented 6 months ago

That's simply the tradeoff you have to make. Flight doesn't and isn't meant to monitor your memory usage for you.

lidavidm commented 6 months ago

You are better off accepting the request, checking available memory, and deciding whether to continue, wait, or reject the request in the application itself instead of trying to cobble together options in gRPC that aren't meant for that in the first place.
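One way to apply this in the application is sketched below. It is an assumption-laden illustration, not something Flight provides: InferenceGate and ServerBusyError are hypothetical names, and a fixed number of slots is used as a simple proxy for available memory. A real service might check actual memory usage instead.

```python
import threading


class ServerBusyError(Exception):
    """Hypothetical error raised when no inference slot is free."""


class InferenceGate:
    """Allow at most max_concurrent calls into the guarded section."""

    def __init__(self, max_concurrent, wait_timeout=0.0):
        self._slots = threading.BoundedSemaphore(max_concurrent)
        self._wait_timeout = wait_timeout

    def run(self, func, *args, **kwargs):
        # Wait up to wait_timeout seconds for a free slot;
        # a timeout of 0 rejects immediately when all slots are taken.
        if self._wait_timeout > 0:
            acquired = self._slots.acquire(timeout=self._wait_timeout)
        else:
            acquired = self._slots.acquire(blocking=False)
        if not acquired:
            raise ServerBusyError("too many concurrent inference requests")
        try:
            return func(*args, **kwargs)
        finally:
            # Always free the slot, even if func raised.
            self._slots.release()


if __name__ == "__main__":
    gate = InferenceGate(max_concurrent=2, wait_timeout=1.0)
    print(gate.run(sum, [1, 2, 3]))
```

Unlike a single threading.Lock, this lets a bounded number of requests proceed in parallel and leaves the wait-or-reject decision to the service. Inside a Flight handler, the hypothetical ServerBusyError could presumably be translated into a Flight error (for example pyarrow.flight.FlightUnavailableError) so that clients can retry later.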

pvardanis commented 6 months ago

Right, thanks for the input everyone!