Azure / azure-sdk-for-net

This repository is for active development of the Azure SDK for .NET. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/dotnet/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-net.
MIT License
5.47k stars 4.8k forks source link

[QUERY]Eventhub trigger function maxBatchSize clarification #28634

Closed dxynnez closed 2 years ago

dxynnez commented 2 years ago

Library name and version

Microsoft.Azure.WebJobs.Extensions.EventHubs 5.0.0

Query/Question

Hi team,

We have a question regarding the maxEventBatchSize - Is it guaranteed that batches would be processed in serial?

Basically we want to know on a single function worker instance, would the worker start multiple threads to process multiple batches at the same time? We don't need to enforce any order on how the events are processed, but for us processing each event consumes a huge amount of memory so a high degree of parallelism on a single instance could cause memory spike which is something we are trying to avoid (we are on dedicated plan).

Base on my understanding, each event batch that got passed to each function invocation would not exceed the maxBatchSize, but what if we have less function worker instances than the eventhub partition? In that case each worker would be in charge of more than one eventhub partitions. Would function host spin up multiple threads to handle all the partitions at the same time and hence results in more than the maxEventBatchSize events to be processed at the same time? Any input would be greatly appreciated!

Environment

No response

jsquire commented 2 years ago

Hi @dxynnez. Thank you for reaching out. The maxEventBatchSize is the maximum number of events that will be passed to your Function for a single invocation. Your function may be invoked concurrently - either on a single instance or across multiple instances.

To address your questions explicitly:

Is it guaranteed that batches would be processed in serial?

No; assuming that events exist in all partitions, you would have at least one active invocation of your Function per partition.

In that case each worker would be in charge of more than one eventhub partitions. Would function host spin up multiple threads to handle all the partitions at the same time and hence results in more than the maxEventBatchSize events to be processed at the same time?

Yes; the processor that is used by the trigger to read from Event Hubs will ensure that all partitions are processed. If there are multiple instances available, they'll share the work evenly between them.

When can more invocations than than the number of partitions occur?

In the scenario where instances are scaling up or down, ownership of a partition may change while an invocation is in progress. When this happens, the new owner will rewind to the most recent checkpoint. During this time, it is possible that there will be two active invocations for the same partition. Once the previous owner completes the inovocation that was in-progresss, it will be notified of the ownership change and the new owner has full control.

jsquire commented 2 years ago

I'm going to mark this as addressed, but please feel free to unresolve if you'd like to continue the discussion.

ghost commented 2 years ago

Hi @dxynnez. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.

dxynnez commented 2 years ago

Thanks @jsquire ! That explained a few weird behavior (memory spikes) we were seeing when there was a spike of the unprocessed events.

We are on a dedicated plan and hence auto-scaling isn't something that we really expect to happen for reasons: 1) With our own dedicated ASE, the scaling usually takes a long time (~30min) 2) The number of workers can handle the events volume for most of the time, the spike of events happen rarely

It's totally fine that there could be more invocations than the number of partitions - I take it as a different form of 'at least once delivery' and our system is already idempotent.

The maxEventBatchSize doesn't sound like something we could use to rate limit the number of events being processed on a single worker concurrently. Is there anything else might help with our use case?

Also, how does the function host know whether there are enough resource to invoke another function? We have seen cases where the memory reached 80% and eventually the worker crashed - we host multiple apps on the same worker (Same ASP to be more specific), we surely don't want a temporary noisy neighbor causes all other apps to crash (there seems to be no memory limit for each app's process as we have seen one app taking over more than 50% available memory of a worker). I know this might be a better question for the azure function team, but maybe you could shed some lights here?

Thanks again!

dxynnez commented 2 years ago

/unresolve

jsquire commented 2 years ago

One interesting property of the processor used by the trigger is that it will wait for the Function code to complete before dispatching another batch for that partition. In theory, it would be possible to use a global mutex in the Function to ensure that a single execution was active on a given host. That said, you'd be potentially running up against the Function execution timeout and may lead to dropping events if it takes too long for an invocation to acquire the mutex. That wouldn't prevent concurrent invocations on other hosts.

Unfortunately, there's not much that I can offer on the scaling questions. What you're asking is specific to the Functions host and not the Event Hubs bindings, so my insight is mostly limited to what you'll find in the documentation. In this case, using the Scaling guidance. As you mentioned, opening an issue in the Aure Functions repository or creating an Azure support request may help find better assistance for controlling scale.

dxynnez commented 2 years ago

One interesting property of the processor used by the trigger is that it will wait for the Function code to complete before dispatching another batch for that partition. In theory, it would be possible to use a global mutex in the Function to ensure that a single execution was active on a given host. That said, you'd be potentially running up against the Function execution timeout and may lead to dropping events if it takes too long for an invocation to acquire the mutex. That wouldn't prevent concurrent invocations on other hosts.

Yeah I thought about that but it all comes down to how the function host decides whether there are enough resource to invoke another function. Assuming there are millions of unprocessed events and their partitions are all locked by a single worker and the host decides to invoke multiple functions to process them in parallel, then those events would just sit in memory and cannot be GC'ed - as you mentioned already, we might end up with multiple long running threads which might cause more harm than good in that case.

Just to clarify that we are ok with concurrent invocations by different hosts. The only case that we don't want parallel processing is when it's on the same worker - as again since there is no way to limit the degree of parallelism on a single worker we are worried that in some extreme cases, a spike of events would just crash the worker.

I will talk to the azure function team to see what the expectation should be as far as function host's concerned. Still I have some questions hope you can help to clarify:

  1. Are all sort of host.json configurations listed HERE (e.g. maxEventBatchSize, batchCheckpointFrequency, prefetchCount etc.) only apply to a single partition?

  2. On a single instance, the event processor would fetch events in the partitions that it's in charge of desperately (meaning that potential unbounded size of events from multiple batches) and it's up to the function host to decide whether to trigger multiple function invocations to process those event batches in-parallel or not.

  3. Would it be possible for a batch of events to be processed by several function invocations? - e.g. Function host invokes the function twice to process a single batch concurrently, one in charge of part of the events. (This is unrelated to the original query but I am interested to know as it would affect the ordering of how the events in a batch are processed)

  4. Could a single batch that get passed to a single function invocation contains events from different partitions?

dxynnez commented 2 years ago

Hi @jsquire , is there anything you could share regarding my previous comment? Thanks!

jsquire commented 2 years ago

Are all sort of host.json configurations listed HERE (e.g. maxEventBatchSize, batchCheckpointFrequency, prefetchCount etc.) only apply to a single partition?

These apply to every partition, but the scope is a single partition. To illuatrate - maxEventBatchSize says "read a batch of 0 to this number of events for a given partition". That batch will consist of events only from one partition. However, that same setting is used for all partitions.

On a single instance, the event processor would fetch events in the partitions that it's in charge of desperately (meaning that potential unbounded size of events from multiple batches) and it's up to the function host to decide whether to trigger multiple function invocations to process those event batches in-parallel or not.

I'm not sure that I'm exactly following this one, but what I think that you're asking is "are events read in one processor and then dispatched to different function instances to process?" Assuming that's correct, the answer is no. Each instance of a Function spawns with its own event processor. That processor coordinates with the other active processors to share work and will take ownership of some number of partitions.

The processor in each Function instance will only read from its assigned partitions and will dispatch those to the associated Function instance only. All the work is local;F there's no distributed coordination between Function instances.

The scale controller's job is to say "these X instances are not keeping up with the work, let me deploy Y more to try and help." It has no awareness of events, partitions or any similar concept.

Would it be possible for a batch of events to be processed by several function invocations? - e.g. Function host invokes the function twice to process a single batch concurrently, one in charge of part of the events. (This is unrelated to the original query but I am interested to know as it would affect the ordering of how the events in a batch are processed)

No. The batch is read by the local processor and dispatched to the function. There's no way to have multiple Function instances own the same partition nor chop up batches.

Could a single batch that get passed to a single function invocation contains events from different partitions?

No. Every partition is treated as an isolated unit of work. The batch passed in was read from a single partition. Binding to the PartitionContext would give you the ability to understand which partition those events belong to, if that's interesting to your application.

dxynnez commented 2 years ago

Thanks @jsquire ! These are indeed very helpful information, although some of them are implementation details, but they are something that would affect how the eventhub trigger function should be implemented IMO.

Regarding

I'm not sure that I'm exactly following this one, but what I think that you're asking is "are events read in one processor and then dispatched to different function instances to process?" Assuming that's correct, the answer is no. Each instance of a Function spawns with its own event processor. That processor coordinates with the other active processors to share work and will take ownership of some number of partitions. The processor in each Function instance will only read from its assigned partitions and will dispatch those to the associated Function instance only. All the work is local;F there's no distributed coordination between Function instances.

If I understand you right, then basically you were saying that on a single worker, there could be multiple processors running in their own thread; each processor is in charge of its own partition and as long as there are unprocessed events in the partition, the events would be pumped to the processor. These processors make no effort in determine whether the events across multiple partitions could be processed or not. It's function host's job to make sure that there are enough resource to trigger the functions which is basically to invoke the callback (the function we wrote) to actually process the events and hence progress the checkpoint.

Sorry if the above question sounds dumb to you... I just don't have a clear understanding on how function works underneath.

jsquire commented 2 years ago

I just don't have a clear understanding on how function works underneath.

That makes two of us. 😄

I can speak to how the bindings interact with the Event Hubs client and the Functions runtime, but I don't have a clear understanding of how the host infrastructure works. Please keep that tucked in the back of your head.

If I understand you right, then basically you were saying that on a single worker, there could be multiple processors running in their own thread

That, I cannot answer definitively. I don't know how the Functions hosts spawn/manage instances, but I would expect that the physical placement isn't relevant to our discussion. The important part is that there's a logical instance of the function. That comes with its own instance of the Event Hubs bindings, which uses its own event processor to read from Event Hubs.

each processor is in charge of its own partition

That's not quite correct. The event processors configured to use the same Storage container and Event Hubs consumer group will coordinate with each other. Each event processor owns some number of partitions in the Event Hub; each partition is owned by one, and only one, processor. If there are fewer event processors running than there are partitions, the processors will own more than one partition. If there are more, some processors will own nothing. If there are exactly the same number of processors as partitions, each processor will own a single partition.

These processors make no effort in determine whether the events across multiple partitions could be processed or not.

The processor treats each partition that it owns as an isolated unit of work. There's a background task that reads from the partition, invokes the handler, and then waits for that handler call to complete. Then, it starts again.

This means that a processor will read from each partition that it owns, and each partition calls your handler without regard for what the others are doing. That's why your handler can be called concurrently, but each call will contain a batch from only one partition. Because the task waits for the handler to complete before it dispatches again for that partition, your handler will never be called concurrently for the same partition. Effectively, this means that if your processor owns a single partition, your handler will never be called concurrently.

For those times when you may have multiple partitions owned, that's where I suggested creating a global primitive to synchronize with. As a practical example for that scenario, let's assume that a processor owns partitions 0, 1, and 2. Let's also assume that we have a global mutex for that host machine to use as synchronization. Since each partition is running independently, the flow looks something like:

.... and that process continues.

There are a few considerations to be aware of:

Sorry if the above question sounds dumb to you... I just don't have a clear understanding on how function works underneath.

Not at all; there's a lot of magic black box taking place with three different constructs working together. There's complexity here. 😄

ghost commented 2 years ago

Hi, we're sending this friendly reminder because we haven't heard back from you in 7 days. We need more information about this issue to help address it. Please be sure to give us your input. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you!

dxynnez commented 2 years ago

@jsquire Thank so much for the detailed walkthrough! Apologize for the late response - for some reasons I missed all the notifications.

Yes I think we are on the same page - for processor I was referring to what you described as 'handler'.

Since we are on a dedicated plan, physical placement does play a big role here as the function app are 'always-on' on all of our dedicated worker machines. We lost the ability for elastic scale and although we have auto-scaling (adding more workers), it's not something that's really going to help as auto-scaling normally takes ~30mins. That's also why we want to limit the max degree of parallelism on a single worker - if it's just a temporary burst of events, then chances are when it finally scales out, the events number already went back to normal and it would just scale back in.

Anyway I already submitted a question to the azure function host github and am waiting for their response.

Thank you so much again for being so patient on explaining all the details! That really helps me in understanding how eventhub trigger function works!