Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.

Async `EventHubProducerClient` + `AzureFunctions` #21849

Closed Stael closed 2 years ago

Stael commented 2 years ago

Hello! I am posting this ticket here, but please let me know if you think I should close it and post it on Azure/azure-functions-python-worker instead.

We are currently using several Azure Functions in production.

Use case:

The Azure Function has a timeout of X seconds, meaning that after X seconds the function is "aborted" (we have been experimenting with 15/30/60/120 sec).


We have 2 versions of the code:

"Threaded Version": the one currently in production which doesn't use async/await but uses threading (See here). This version was created to mitigate issue 2 (see below).

import base64
import logging
from threading import Thread

import bson  # bson from pymongo
from azure.eventhub import EventData, EventHubProducerClient


def threaded_execution(f):
    def wrapper(*args, **kwargs):
        # fire-and-forget: start the send on a thread and return immediately
        thread = Thread(target=f, args=args, kwargs=kwargs)
        thread.start()

    return wrapper

class EventProducer:
    def __init__(self, password: str):
        self._producer: EventHubProducerClient = EventHubProducerClient.from_connection_string(password)

    @threaded_execution
    def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        try:
            self._producer.send_batch([EventData(b64_encoded_data)])
        except Exception as e:
            logging.error(...)

The function code looks like this:

import azure.functions as func

event_producer: EventProducer = EventProducer(...)

def main(req: func.HttpRequest) -> func.HttpResponse:
    data: dict = build_event_data(req)
    event_producer.produce(data)
    return func.HttpResponse(status_code=200)

"Async/Await Version": Another one which we created and released in "staging" because we were unable to debug issue 1 while using the "threading version" of the code.

Our producer code looks like this:

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient


class EventProducer:
    def __init__(self, password: str):
        self._password: str = password

    async def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        # a new client is created for every request, then closed on exit
        client = EventHubProducerClient.from_connection_string(self._password)
        async with client:
            await client.send_batch([EventData(b64_encoded_data)])

The function code looks like this:

import azure.functions as func

event_producer: EventProducer = EventProducer(...)

async def main(req: func.HttpRequest) -> func.HttpResponse:
    data: dict = build_event_data(req)
    await event_producer.produce(data)
    return func.HttpResponse(status_code=200)

We face two issues:

  1. With the "Threaded Version" of the code, several times per day we experience timeouts. We were unable to pinpoint the issue, so we developed the "Async/Await Version" of the code and managed to trace the issue back to (from what I understand) send_batch, which requires more than X seconds to execute. See here an example with a 120 sec timeout.

    • Do you have any idea what could cause the issue?
    • Should we add a timeout to send_batch with a retry mechanism? (But there is already a retry mechanism in the library, right? See the sketch after this list.)
  2. As you can see, on each HTTP request we create a new EventHubProducerClient. But this process takes more than 500 ms each time (see here and here) and sometimes even more, which is a big deal for us.

    • Originally we developed the "Threaded Version" to mitigate this issue, but I would like your opinion on it: should we stick to threading, or is there a way to get great performance with async/await?
    • I have tried to use a single/global EventHubProducerClient (by instantiating the EventHubProducerClient in the EventProducer constructor, basically); unfortunately this approach doesn't seem to work (see here).
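For reference on the retry question above, a minimal sketch of the knobs the client constructor already exposes (the values shown are illustrative, not recommendations; check the docs of your installed version):

producer = EventHubProducerClient.from_connection_string(
    password,
    retry_total=3,             # number of built-in retry attempts
    retry_backoff_factor=0.8,  # base delay, in seconds, between attempts
)
# send_batch also accepts a per-call timeout, in seconds:
# producer.send_batch([EventData(b64_encoded_data)], timeout=10)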

Thanks!

yunhaoling commented 2 years ago

hey @Stael, thanks for reaching out, we'll investigate the issue ASAP

Stael commented 2 years ago

Hello @yunhaoling, perfect, thanks!

yunhaoling commented 2 years ago

hey @Stael, I got some time to read through the issue. So first of all, it is expected that the EventHubProducerClient takes some time to establish the connection to the service: multiple AMQP layers must be initialized, and authentication performed, as per the AMQP spec.

In general, I would recommend reusing the same client if your application is time sensitive.

to answer your questions:

1.1 Do you have any idea what could cause the issue?

  • I'm unfamiliar with the Azure function 120s timeout, but if the EventHubProducerClient fails to send events, errors will be raised. But depending on your logging for the "sync version", there is no error raised from EventHubProducerClient.
  • I have one question about this "sync version", will the decorated method be blocking and returns until send is complete? from the code, I think a thread is started for sending to event hub, but not waiting for the send to finish, the azure function immediately returns. So I'm not sure whether this 120s indicate the event fails to be sent or there's something else happening.
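To make the fire-and-forget point concrete, here is a hypothetical variant of the decorator (illustrative only, not code from this thread) that waits briefly for the send and logs when it is still stuck:

import logging
from threading import Thread

def traced_execution(f):
    def wrapper(*args, **kwargs):
        thread = Thread(target=f, args=args, kwargs=kwargs)
        thread.start()
        thread.join(timeout=5)    # wait up to 5 s for the send to finish
        if thread.is_alive():     # still running: send_batch is stuck
            logging.error("send thread still alive after 5 s")

    return wrapper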

1.2 Should we add a timeout to send_batch with a retry mechanism? (but there is already a retry mechanism in the library, right?)

I would say yes; it would help narrow down whether the problem is within Event Hubs or Azure Functions.

2.1 Originally we developed the "Threaded Version" to mitigate this issue, but I would like your opinion on it: should we stick to threading, or is there a way to get great performance with async/await?

I have read your original issue; it seems that you want to reduce the time spent reconnecting. Async/await is helpful when you're working with I/O-bound tasks (say you have 10 coroutine tasks that perform sending). But yeah, you can keep the async one as it does no harm.
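As a toy illustration of that I/O-bound point (assuming the async EventProducer.produce shown earlier), many sends can overlap on a single event loop:

import asyncio

async def fan_out(producer: EventProducer, payloads: list):
    # the coroutines share the loop, so their network waits overlap
    await asyncio.gather(*(producer.produce(p) for p in payloads))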

2.2 I have tried to use a single/global EventHubProducerClient (by instantiating the EventHubProducerClient in the EventProducer constructor, basically); unfortunately this approach doesn't seem to work (see here).

I'm sorry this is actually a bug in our SDK that reusing (open-close-open) the same client to send events without specifying partition id would lead to failure. (I'll make a PR to fix it)

but I think it's better to remove async with in the produce because it's of the same effect as creating a new client each time, async with will shut down the connection completely and re-open the connection.

I'll probably just go with:

class EventProducer:
    def __init__(self, password: str):
        self._password: str = password
        self.client = EventHubProducerClient.from_connection_string(self._password)

    async def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        await self.client.send_batch([EventData(b64_encoded_data)])

and for more context: the service side enforces a 10-minute idle timeout, which means that if there is no activity for 10 minutes, the service force-closes the connection between the service and the client.
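A minimal sketch of how a caller could absorb that idle-timeout drop (assuming the async client raises an EventHubError subclass on a dead connection; a single retry lets it rebuild the AMQP link):

from azure.eventhub.exceptions import EventHubError

async def send_with_reconnect(client: EventHubProducerClient, batch: list):
    try:
        await client.send_batch(batch)
    except EventHubError:
        # the idle timeout may have silently closed the connection;
        # retry once so the client re-establishes the link
        await client.send_batch(batch)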

Please let me know if you have any questions!

Stael commented 2 years ago

Hello @yunhaoling, thanks a lot for your comprehensive reply!

> So first of all, it is expected that the EventHubProducerClient takes some time to establish the connection to the service: multiple AMQP layers must be initialized, and authentication performed, as per the AMQP spec. In general, I would recommend reusing the same client if your application is time sensitive.

Ok, got it. This will be doable once your PR fixing the reuse of EventHubProducerClient is merged.

  1. I'm unfamiliar with the Azure function 120s timeout, but if the EventHubProducerClient fails to send events, errors will be raised. But depending on your logging for the "sync version", there is no error raised from EventHubProducerClient.

To be clear, the timeout is adjustable on our side. There is indeed no error raised, but from my tests I believe that client.send_batch never finishes, which triggers a timeout after 120 sec. Indeed, when I add a log right after EventHubProducerClient.send_batch, it never shows up when a timeout occurs.

> I have one question about this "sync version": is the decorated method blocking, returning only once the send is complete? From the code, I think a thread is started for sending to Event Hubs, but the Azure Function returns immediately without waiting for the send to finish. So I'm not sure whether this 120 s indicates the event failed to be sent or something else is happening.

From my understanding (I asked the question previously here):

However, from time to time the function needs more than 120 sec to execute, which triggers a timeout. The tricky part is that I don't believe the code executed before EventHubProducerClient.send_batch has any chance of requiring that much time. I can share it with you privately, but it's basically a chain of dict.get and dict.set. That's how I deduced that the issue might come from the EventHubProducerClient, however ... I am unable to explain how a send_batch executed in a thread manages to "lock" the main process. And when testing without the thread ... I still end up with the same timeout.

IMHO there are 3 possibilities:

> I would say yes; it would help narrow down whether the problem is within Event Hubs or Azure Functions.

Ok. I will implement a version with a timeout in send_batch and a retry mechanism and keep you posted ASAP.

> I have read your original issue; it seems that you want to reduce the time spent reconnecting. Async/await is helpful when you're working with I/O-bound tasks (say you have 10 coroutine tasks that perform sending). But yeah, you can keep the async one as it does no harm.

Alright, once the bug is fixed on your side, I will implement a version that reuses EventHubProducerClient and keep you posted regarding the performance.

> but I think it's better to remove the async with in produce, because it has the same effect as creating a new client each time: async with shuts down the connection completely and re-opens it.

I have tried your simplified version (which is different from the one in the docs). Unfortunately, it frequently (but not always) crashes with this message: python exited with code 139. Do you have any idea why?

Thanks again.

Stael commented 2 years ago

Update on the send_batch + timeout + retry mechanism: I have added a lot of logging to help narrow down the issue. From my understanding, the timeout arg expects a value in seconds.

async def produce(self, data: dict):
    logging.info("Start BSON encode")
    bson_encoded_data = bson.BSON.encode(data)
    logging.info("Done BSON encode")

    logging.info("Start B64 encode")
    b64_encoded_data = base64.b64encode(bson_encoded_data)
    logging.info("Done B64 encode")

    logging.info("Start creating client from connection string")
    client = EventHubProducerClient.from_connection_string(self._password)
    logging.info("Done creating client from connection string")

    for i in range(1, 4):
        try:
            logging.info("Start async with")
            async with client:
                logging.info("In async with")

                logging.info("Start send batch")
                await client.send_batch([EventData(b64_encoded_data)], timeout=2)
                logging.info("Done send batch")
            logging.info("Done async with")
            return
        except Exception as e:
            logging.error(f"Try: {i}: failed to send batch")
            logging.exception(e)

This code still triggered a timeout after send_batch was called. It looks like timeout=2 has no impact. This doesn't happen every time, and I am unable to find the correlation.

See logs: example 1 | example 2 with debug logs

Stael commented 2 years ago

Hello @yunhaoling, any update on this?

yunhaoling commented 2 years ago

hey @Stael, apologies for the late response, and I appreciate you trying out different approaches and sharing your latest debugging information.

I think there're two issues here:

How about let's solving one issue at a time? I would like to work with you on getting a workaround first. Let's focus on reusing a single async client because I feel this is the ultimate approach we want to go with, just one client, no new client/new connection each time the Function is triggered. Let's see why the async client would crash with python exited with code 139.

Are you aware of any pattern that your application (Azure Function) is undergoing when the single async client program crashes? Like a burst of HTTP requests coming in, or a long period of inactivity?

Usually I see crashes like 139 (SIGSEGV) when our underlying networking lib uAMQP crashes, as it's not thread safe. A burst of messages plus long inactivity leads to the client shutting down and recreating the underlying connection, which is error prone. I would go with adding a lock to see if it helps mitigate the crash issue.

Also, one more thing: could you help turn on the network logging by setting logging_enable=True when creating the client? Via this we can tell what the last frame put onto the wire was before the world stopped. Please see my code snippet below:

import asyncio
import base64

import bson
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

class EventProducer:
    def __init__(self, password: str):
        self._password: str = password
        self.client = EventHubProducerClient.from_connection_string(self._password, logging_enable=True)
        self.lock = asyncio.Lock()

    async def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        async with self.lock:
            # serialize sends: uAMQP is not thread/task safe
            await self.client.send_batch([EventData(b64_encoded_data)])

Stael commented 2 years ago

Hello @yunhaoling, thank you for your help!

Yes, I do agree with you, it looks like there are several issues mixed up.

I have tried your snippet on our staging environment with success (no 139 crash / timeout). So afterwards I deployed it to production, and again most of the issues seem to be fixed: no 139 crash, and only 2 waves of timeouts in 4 days instead of 2/3/4 waves per day.

So it looks like the lock does indeed solve the issue (or part of it). However, I am unsure why.

Moreover, unlike previously, where the logs would abruptly stop right after entering send_batch, now when there is a timeout ... there is no log at all. Which, in my opinion, might be an error related to Azure Functions and not to the EventHubProducerClient.

> Are you aware of any pattern that your application (Azure Function) is undergoing when the single async client program crashes? Like a burst of HTTP requests coming in, or a long period of inactivity?

Unfortunately no, I have tried several times to find a pattern, but I did not find anything conclusive.

If it helps, I can re-deploy the previous version of the code with logging_enable=True on our staging environment to help you identify the last sent frame.

I will now open a ticket on the Azure Functions for Python repo. I want them to have a look at what can freeze the Function App like that.

Stael commented 2 years ago

Hello @yunhaoling, do you have any update on this?

yunhaoling commented 2 years ago

hey @Stael, apologies for not getting back to you sooner, and thanks for trying out the snippet.

I do have an update for you: the issue you posed earlier, where the producer client was unable to reopen after close due to the partition id bug, has been fixed in the latest stable release.

"now when there is a timeout ... there is no log at all."

This is interesting.. I'm going to verify that if the send timeout is hit, the send_batch method raises the error properly and logs the necessary error information.

Yeah, I would also like to know why the logs freeze..

I would appreciate it if you could help turn on the logging with logging_enable=True. As far as I know, if the producer blocks in sending, it should keep sending empty frames to keep the connection alive while waiting for acknowledgment of the send operation.
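For completeness, a minimal sketch of surfacing that network trace (standard Python logging; with logging_enable=True the SDK emits frame-level detail at DEBUG level on its loggers):

import logging
import sys

handler = logging.StreamHandler(stream=sys.stdout)
for name in ("azure.eventhub", "uamqp"):
    log = logging.getLogger(name)
    log.setLevel(logging.DEBUG)   # frame-level trace from the SDK
    log.addHandler(handler)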

ghost commented 2 years ago

Hi, we're sending this friendly reminder because we haven't heard back from you in a while. We need more information about this issue to help address it. Please be sure to give us your input within the next 7 days. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you!

Stael commented 2 years ago

Hello @yunhaoling

Thank you for your reply.

I have noted that the issue with the producer client has been fixed, thank you. Does it mean that I can go back from:

import asyncio

class EventProducer:
    def __init__(self, password: str):
        self._password: str = password
        self.client = EventHubProducerClient.from_connection_string(self._password, logging_enable=True)
        self.lock = asyncio.Lock()

    async def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        async with self.lock:
            await self.client.send_batch([EventData(b64_encoded_data)])

to:

class EventProducer:
    def __init__(self, password: str):
        self._password: str = password
        self.client = EventHubProducerClient.from_connection_string(self._password)

    async def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        await self.client.send_batch([EventData(b64_encoded_data)])

Have you been able to look at why the timeout parameter has no effect? See: https://github.com/Azure/azure-sdk-for-python/issues/21849#issuecomment-983489177


I turned on the logging using logging_enable=True quite some time ago. Unfortunately, no luck on that side.


I am also waiting for an investigation on the other issue that I opened: https://github.com/Azure/azure-functions-python-worker/issues/931

Stael commented 2 years ago

Hello again, @yunhaoling!

Out of sheer luck ... I managed to trigger the bug and capture the logs. Here are the logs.

So it is definitely related to the EventHubProducerClient or azure-uamqp.

Could you please have a look at it and get back to me as soon as possible? Thanks.

yunhaoling commented 2 years ago

hey @Stael , apologize for the delayed response and thanks for sharing the logs!

from the logs we could conclude that:

  1. when the producer tries to send out the event for the first time, it hits a Connection reset by peer error, which indicates the previous connection is down.
  2. the producer then tries to re-establish the connection and succeeds in authentication.
  3. however, the producer gets stuck after authentication succeeds, and the timeout doesn't work either.

I have a lead on debugging the issue: the next thing to expect in the flow is creating a new uamqp sender and getting the sender ready. The fact that this isn't happening means the uamqp sender preparation process got stuck, probably due to an inconsistent internal state.

I will dig deep into that part.

one question for you to confirm: does the error occur with or without the asyncio lock, or do both trigger this hanging issue?

Stael commented 2 years ago

Hello @yunhaoling,

> when the producer tries to send out the event for the first time

I don't think that it is during "the first time". Indeed this error happens wether if we have other calls to the function before it or not. Moreover as you said, there is a Connection reset by peer which implies that there was a previous connection. But maybe you don't mean "the first message once the EventHubProducerClient is created", but the "first try" to send the message ?

> one question for you to confirm: does the error occur with or without the asyncio lock, or do both trigger this hanging issue?

I am currently using the code that you suggested with the asyncio lock in production.

Thanks,

yunhaoling commented 2 years ago

Apologies for not making it clear: it's the "first try to send the message".

Thanks for the extra info on which script you're using; I will dig deep into it!

btw, I'm thinking of a workaround for you to unblock your deployment/application, in case fixing/locating the issue takes longer than I expect.

I think asyncio.wait_for could be used here as a temporary workaround, implementing a manual retry; please see my code snippet below:

async def produce(self, data: dict):
    bson_encoded_data = bson.BSON.encode(data)
    b64_encoded_data = base64.b64encode(bson_encoded_data)

    retry = 0
    while retry < 4:
        try:
            async with self.lock:
                # if the coroutine doesn't finish in 30 s, asyncio.TimeoutError is raised
                await asyncio.wait_for(self.client.send_batch([EventData(b64_encoded_data)]), timeout=30)
            return
        except asyncio.TimeoutError:
            retry += 1
    logging.warning("retry exhausted")

Stael commented 2 years ago

Hello @yunhaoling.

Thank you for this snippet, I will try to implement and test it ASAP.

Stael commented 2 years ago

Hello @yunhaoling

Here is the implementation I went for (azure-eventhub==5.7.0):

import asyncio
import base64
import logging

import bson
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient


class EventProducer:
    _MAX_NUM_RETRIES: int = 4
    _TIMEOUT_SEC: int = 5

    def __init__(self, password: str):
        self.client = EventHubProducerClient.from_connection_string(
            password, logging_enable=True, retry_total=0, auth_timeout=EventProducer._TIMEOUT_SEC
        )
        self.lock = asyncio.Lock()

    async def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        logging.info('Done encoding data')

        for i in range(EventProducer._MAX_NUM_RETRIES):
            try:
                async with self.lock:
                    logging.info(f'Start sending batch, try : {i + 1}')
                    await asyncio.wait_for(
                        self.client.send_batch([EventData(b64_encoded_data)], timeout=EventProducer._TIMEOUT_SEC),
                        timeout=EventProducer._TIMEOUT_SEC
                    )
                    logging.info(f'Done sending batch, try : {i + 1}')
                return
            except asyncio.TimeoutError:
                logging.error(
                    f'Timeout while sending data, try : {i + 1} / {EventProducer._MAX_NUM_RETRIES}'
                )
            except Exception as e:
                logging.error(f'Unexpected exception while sending data: {e}')
                logging.exception(e)

        logging.error('Unable to send the data: retries exhausted')

Unfortunately no impact.

Here are two examples:

To be honest with you, at this point, seeing how many issues we are facing, we will probably stop using EventHub/AzureFunctions/BlobStorage in the next few months. We are left with no other option, as we have simply had too many issues and spent too much time debugging them over the years. It prevents us from focusing on our core product and makes us unable to provide a great quality of service to our clients.

yunhaoling commented 2 years ago

hey @Stael, thanks for the updates and for sharing your complete code with me.

I sincerely feel sorry about your bad experience, and I admit it's our fault that we didn't run enough tests against scenarios where the Event Hubs SDK is integrated with Azure Functions.

I have prioritized this issue and am working on it now, making changes to the underlying dependency library uamqp, where I believe the hang took place.

To save time, I will probably give you a manually built patch first to try out the fix.

Again, I understand the pain here and I really appreciate your collaboration and patience with us. I will do my best to support.

Stael commented 2 years ago

Hello @yunhaoling.

Thank you for the update.

Alright, you can send me a patch and I will try my best to test it ASAP. But as you know, I am unable to trigger this bug manually; I can apply the patch, but we will have to wait ~a week to see if it works (less if it does not).

yunhaoling commented 2 years ago

hey @Stael , thanks for your patience and I have got some updates and potential workarounds for you.

Background knowledge: connection idle timeout

The two issues related to idle timeout:

Attempts:

- approach 1: try manually keeping the connection alive?

The workaround that jumped right into my mind is whether we could keep the connection alive to avoid the idle timeout, so I tried using a background thread that keeps sending heartbeats (a feature supported by the dependency uamqp library).

However, this doesn't work out nicely as the azure function is triggered by incoming request (event driven), the program gets suspended once the main function returns. And I see azure function switching workers executing the function.

Then I realized that what I'm trying to achieve is to turn the Azure Function into a stateful/long-running service (the long-lived AMQP connection being the state), while Azure Functions itself is designed to be stateless.

I did some searching on Stack Overflow and found this: https://stackoverflow.com/questions/63490224/is-azure-functions-a-right-fit-for-long-running-tasks. Maybe Azure Functions is not the best fit for this scenario, or you could alternatively use EventGrid, which is an HTTP-based service, instead of EventHubs.

- approach 2: use the Event Hubs output binding integration provided by Azure Functions

I was wondering, instead of doing it myself, is there a feature Azure Functions provides? I read further into the Azure Functions documentation and found that it already provides an integration with Event Hubs as an output binding (sending events to EventHubs).

I tried it and it feels promising; I haven't seen any issues so far.

the documentation could be found here: https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-event-hubs-output?tabs=python

My code:

I have put all my code in the following repo; approach 2 is fairly straightforward, it's more about configuration: https://github.com/yunhaoling/python-azure-function-eventhubs-integration

I feel approach 2 is the right one here. Could you check whether it satisfies your requirements? I would be glad to help you out!

Stael commented 2 years ago

Hello @yunhaoling,

Thank you for the update. I understand the situation, there is probably something wrong but it's very hard to debug.

I looked at the two approaches that you offer :

  1. I did try it, for the sake of simplicity. I cannot say whether it prevents the "hanging issue", but I can say that the case where it succeeds in sending the data but requires ~1,500 ms (which is unacceptable) still happens.
  2. I honestly did not know that this option existed. I tried it, and tweaked your version a bit to be able to return an HTTP response and send a message to EventHub. Here is the code that I ended up with. It is so much simpler 🤦. I deployed it on our staging platform and it is running fine so far. I don't know how it works underneath, but sending the data to EventHub that way makes the function much faster (up to 3 times: ~60 ms -> ~20 ms) AND I did not encounter the timeout issue where the re-connection takes ~1,500 ms.

I will let it run for a while and do some more tests before releasing to our production env. I do agree with you, this is the right approach.

Thank you for your help & the code that you provided. I will keep you updated on how my tests go.

function.json:

...
    {
      "authLevel": "anonymous",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": ["get"],
      "route": "my-route"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    },
    {
      "type": "eventHub",
      "name": "queue",
      "eventHubName": "__MUST_EXISTS_BUT_WILL_BE_OVERWRITTEN_BY_CONNECTION__",
      "connection": "<connection>",
      "direction": "out"
    }

controller:

import azure.functions as func

def main(req: func.HttpRequest, queue: func.Out[bytes]) -> func.HttpResponse:
    response = ...
    encoded_data = ...
    queue.set(encoded_data)
    return response

yunhaoling commented 2 years ago

awesome, looking forward to your good news.

The Azure Functions Event Hubs output binding internally integrates with the .NET SDK's EventHubProducerClient, and I assume it's kept alive by the Azure Functions host for sending events.

ghost commented 2 years ago

Hi @Stael. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.

ghost commented 2 years ago

Hi @Stael, since you haven’t asked that we “/unresolve” the issue, we’ll close this out. If you believe further discussion is needed, please add a comment “/unresolve” to reopen the issue.

Stael commented 2 years ago

Hello @yunhaoling,

I am happy to report that after a full week using the solution mentioned previously ... I encountered 0 issues 👍

I will also update my other issue to let them know that the issue seems related to how the EventHubProducerClient and Azure Functions interact (AKA: be very careful when using this combo), and that we found a workaround.

Indeed, I was advised to use this combo, and it is actually described and covered by tests on their side.

Thank you for your help.