awslabs / aws-athena-query-federation

The Amazon Athena Query Federation SDK allows you to customize Amazon Athena with your own data sources and code.
Apache License 2.0

[FEATURE] Parallel row processing in UserDefinedFunctionHandler #377

Open lmichelbacher opened 3 years ago

lmichelbacher commented 3 years ago

Is your feature request related to a problem? If yes, please describe.

I've prototyped a UDF-based solution for custom permissions applied in Athena queries. The source of truth for permissions is a REST API.

I found that iterative batch processing in UDFs is a performance bottleneck for our use case.

We're considering dropping the UDF approach entirely because we can't get the performance our customers need, even though a UDF-based solution has many technical advantages for us.

The handler currently processes rows sequentially (processRows:177):

            for (int rowNum = 0; rowNum < rowCount; ++rowNum) {
                outputRowWriter.writeRow(outputRecords, rowNum, rowNum);
            }

Queries involving hundreds of thousands of rows take too long because of sequential processing.

Describe the solution you'd like

I'd like to be able to optionally set parallelism for row processing. Since Lambdas can have (currently) between one and six cores, fine-grained controls over the cores would be helpful.

A simple MAX setting would also be a start, but I'm not clear on how processing speed affects subsequent batch sizes. There's a risk that processing will be too fast for its own good, triggering batch sizes that Lambda can no longer handle within its 15-minute limit.
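As a rough illustration of the request above, the following is a minimal sketch of bounded parallel row processing. It does not use the SDK: `ParallelRowSketch`, `processRows`, the `parallelism` parameter, and `rowFunction` are all hypothetical names introduced here. It assumes the per-row work (for example, a REST call) is independent across rows, and it collects results in row order so that writing to the output block can stay sequential, since it is unclear whether concurrent writes would be safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

// Hypothetical sketch, not part of the Athena Query Federation SDK.
final class ParallelRowSketch {

    /**
     * Applies rowFunction to rows 0..rowCount-1 using at most `parallelism`
     * threads and returns the results in row order.
     */
    static <T> List<T> processRows(int rowCount, int parallelism, IntFunction<T> rowFunction)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Callable<T>> tasks = new ArrayList<>(rowCount);
            for (int rowNum = 0; rowNum < rowCount; ++rowNum) {
                final int row = rowNum;
                tasks.add(() -> rowFunction.apply(row));
            }
            // invokeAll blocks until every task is done and returns the
            // futures in the same order as the submitted tasks.
            List<T> results = new ArrayList<>(rowCount);
            for (Future<T> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the real per-row work, such as a permission lookup.
        List<String> out = processRows(8, 4, row -> "row-" + row);
        System.out.println(out);
    }
}
```

In an overridden handler, the sequential loop would then only copy the precomputed results into the output, one row at a time. Whether this helps depends on how much of the per-row time is spent waiting on the remote service rather than on CPU.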

Describe alternatives you've considered

The only alternative I have is to optimize the service that the UDF calls. That can only go so far, and it won't be enough to finish queries in times that are acceptable for customers.

rstrahan commented 3 years ago

+1 on the feature request for row processing parallelism.

I had a similar issue when implementing the TextAnalyticsUDFHandler. I ended up implementing an override for processRows(); see TextAnalyticsUDFHandler.java#L943. Would a similar (tactical) approach work for you?

avirtuos commented 3 years ago

Can you share more info about the performance you are seeing vs. your expectation? What size Lambda function are you using, and how much work are you doing for each row?

Also, I believe we do call your UDF in parallel (not serially), so you are not bound by the throughput of a single invocation.

lmichelbacher commented 3 years ago

At the top level, we're seeing throughput of about 1,700 rows per minute but we'd like to see at least twice that number.

There are multiple factors that determine the throughput:

  1. Latency of the service we're calling for each row (an internal service)
  2. Batching vs individual calls
  3. Whether calls are parallelized in the UDF Lambda
  4. Batch size
  5. Lambda invocation pattern (determined by Athena engine)

Regarding 1. and 2., we can work on these internally. There's a limit to what we can squeeze out in terms of service latency, which is currently about 100 ms P99 per individual call.

I'm going to switch to batch calls (without parallelizing anything) like in @rstrahan's example (thanks for that!) and compare performance. Batching is currently the lowest-hanging fruit in my mind.
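For illustration, the batching idea can be sketched as follows. This is a generic sketch under assumptions, not the approach used in TextAnalyticsUDFHandler: `BatchCallSketch`, `processInBatches`, and `batchCall` are hypothetical names, and it assumes the backing service offers a batch endpoint that returns exactly one result per input, in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not part of the Athena Query Federation SDK.
final class BatchCallSketch {

    /**
     * Splits inputs into chunks of at most batchSize and makes one call per
     * chunk instead of one call per row. batchCall stands in for a batch
     * request to the remote service (an assumption of this sketch).
     */
    static <I, O> List<O> processInBatches(
            List<I> inputs, int batchSize, Function<List<I>, List<O>> batchCall) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<O> results = new ArrayList<>(inputs.size());
        for (int start = 0; start < inputs.size(); start += batchSize) {
            int end = Math.min(start + batchSize, inputs.size());
            List<O> chunkResults = batchCall.apply(inputs.subList(start, end));
            // Guard against a service that drops or adds results, which
            // would misalign results with rows.
            if (chunkResults.size() != end - start) {
                throw new IllegalStateException("batch call returned wrong number of results");
            }
            results.addAll(chunkResults);
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> users = List.of("a", "b", "c", "d", "e");
        // Stand-in batch call: upper-cases each input.
        List<String> out = processInBatches(users, 2, chunk -> {
            List<String> r = new ArrayList<>();
            for (String s : chunk) {
                r.add(s.toUpperCase());
            }
            return r;
        });
        System.out.println(out);
    }
}
```

With this shape, the number of round trips drops from one per row to roughly the row count divided by `batchSize`, so the fixed per-call latency is paid far less often.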

Regarding 3., calls from processRows are not currently parallelized in the standard implementation.

Regarding 4. and 5., in a test query with 400K rows, we saw the following batch sizes and invocation pattern. Maximum concurrent executions were three.

| start of lambda invocation | batch size |
|----------------------------+------------|
| 09:34:14                   |        255 |
| 09:34:41                   |        256 |
| 09:35:06                   |        512 |
| 09:35:49                   |       1024 |
| 09:37:16                   |       2048 |
| 09:40:05                   |       4096 |
| 09:45:26                   |       8192 |
| 09:55:23                   |       8192 |

The query (not the Lambda) timed out after 30 minutes. I know query timeout is a soft limit. I'm keen to optimize UDF integration instead of making customers wait longer for results though.

In our experiments, we maxed out at 8,192. I'm not sure if this is a hard limit. Even if it is, I don't see it documented anywhere, so I would assume it's subject to change without warning. We're interested in a predictable upper limit for batch size. Without such a limit, batches could eventually grow large enough that a Lambda times out no matter how efficient it is.
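The timeout concern can be made concrete with a back-of-the-envelope estimate. The sketch below assumes each row costs a fixed latency (using the roughly 100 ms P99 figure mentioned above as a pessimistic per-row cost), ignores all other overhead, and uses Lambda's 15-minute maximum timeout. `BatchTimeEstimate` and its methods are hypothetical names introduced for this illustration.

```java
// Hypothetical sketch: rough timing estimate, not a measurement.
final class BatchTimeEstimate {

    // Lambda's maximum timeout of 15 minutes, in milliseconds.
    static final long LAMBDA_TIMEOUT_MS = 15 * 60 * 1000L;

    /** Time to process a batch one row at a time. */
    static long sequentialMillis(int batchSize, long perRowLatencyMs) {
        return (long) batchSize * perRowLatencyMs;
    }

    /** Time when rows are spread evenly over `parallelism` workers. */
    static long parallelMillis(int batchSize, long perRowLatencyMs, int parallelism) {
        long rowsPerWorker = (batchSize + parallelism - 1) / parallelism; // ceiling division
        return rowsPerWorker * perRowLatencyMs;
    }

    static boolean fitsInTimeout(long millis) {
        return millis <= LAMBDA_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        long perRowMs = 100; // assumed per-row cost, taken from the P99 figure
        for (int batch : new int[] {8192, 16384}) {
            long seq = sequentialMillis(batch, perRowMs);
            long par = parallelMillis(batch, perRowMs, 4);
            System.out.println(batch + " rows: sequential " + seq + " ms (fits: "
                    + fitsInTimeout(seq) + "), 4 workers " + par + " ms (fits: "
                    + fitsInTimeout(par) + ")");
        }
    }
}
```

Under these assumptions, a batch of 8,192 rows takes about 819 seconds sequentially, which is just inside the 900-second limit, while a batch of 16,384 rows would take about 1,638 seconds and could not finish without batching or parallelism inside the Lambda.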

lmichelbacher commented 3 years ago

To summarize my questions

  1. Is there a predictable maximum batch size?
  2. Can we specify a target for the (maximum) number of concurrent Lambda executions?
    • We're currently seeing a maximum of 3.

dmarkey commented 1 year ago

I'm also interested in controlling the concurrency of the Lambda invocations. I'm seeing it top out at about 6 concurrent instances for a million rows. We would like a lot more, as there are many rows and our processing task is heavy.

mignaulo commented 1 year ago

We also tested a UDF that calls a REST API, and the performance was inadequate. Being able to both parallelize inside the Lambda and increase the number of Lambdas invoked concurrently would be valuable.