Azure / azure-uamqp-python

AMQP 1.0 client library for Python
MIT License

Azure Function Service Bus ImportError: cannot import name 'c_uamqp' from partially initialized module 'uamqp' #281

Closed ElAouaneHamza closed 1 year ago

ElAouaneHamza commented 2 years ago

Describe the bug
I have an Azure Function app that is triggered by a Service Bus message. When the function starts, it fails with the following error:

Result: Failure
Exception: ImportError: cannot import name 'c_uamqp' from partially initialized module 'uamqp' (most likely due to a circular import) (/home/site/wwwroot/.python_packages/lib/site-packages/uamqp/__init__.py)
Stack:
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 402, in _handle__invocation_request
    call_result = await self._loop.run_in_executor(
  File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 606, in _run_sync_func
    return ExtensionManager.get_sync_invocation_wrapper(context,
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/extension.py", line 215, in _raw_invocation_wrapper
    result = function(**args)
  File "/home/site/wwwroot/SentimentAnalysis/testServiceBus.py", line 10, in main
    from azure.servicebus import ServiceBusClient
  File "/home/site/wwwroot/.python_packages/lib/site-packages/azure/servicebus/__init__.py", line 6, in <module>
    from uamqp import constants
  File "/home/site/wwwroot/.python_packages/lib/site-packages/uamqp/__init__.py", line 12, in <module>
    from uamqp import c_uamqp # pylint: disable=import-self

My Azure function looks like this:

import logging
import json
import boto3
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
import azure.functions as func

def main(message: func.ServiceBusMessage):
    # Log the Service Bus Message as plaintext
    message_content_type = message.content_type
    message_body = message.get_body().decode("utf-8")

    logging.info("Python ServiceBus topic trigger processed message.")
    logging.info("Message Content Type: " + message_content_type)
    logging.info("Message Body: " + message_body)

    #KeyVault Configuration
    KeyVault_Url = 'keyvault_url'
    Keyvault_Name = 'keyvault_name'
    credential = DefaultAzureCredential()
    client_keyvault = SecretClient(vault_url=KeyVault_Url, credential=credential)

    # Service Bus Connection string
    CONNECTION_STR = client_keyvault.get_secret("Service-CONN").value
    # For receiving the feedback from campaigns
    TOPIC_NAME_A = "topicAname" 
    SUBSCRIPTION_NAME = "subscriptionName"

    # For sending feedback and results of sentiment analysis and language detection
    TOPIC_NAME_B = "topicBname" 

    comprehend = boto3.client(service_name='comprehend', region_name='eu-west-1', aws_access_key_id=client_keyvault.get_secret("ID").value, aws_secret_access_key=client_keyvault.get_secret("SECRET").value)

    servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
    with servicebus_client:
        receiver = servicebus_client.get_subscription_receiver(
            topic_name=TOPIC_NAME_A,
            subscription_name=SUBSCRIPTION_NAME
        )
        with receiver:
            received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)

            for msg in received_msgs:

                message= str(msg)
                res = json.loads(message) 
                text = res['Text']
                result_json= json.dumps(comprehend.detect_sentiment(Text=text, LanguageCode='en'), sort_keys=True, indent=4)
                result = json.loads(result_json) # converting json to python dictionary

                #extracting the sentiment value 
                sentiment = result["Sentiment"]

                #extracting the sentiment score
                if sentiment == "POSITIVE":
                    value = round(result["SentimentScore"]["Positive"] * 100,2)

                elif sentiment == "NEGATIVE":
                    value = round(result["SentimentScore"]["Negative"] * 100,2)

                elif sentiment == "NEUTRAL":
                    value = round(result["SentimentScore"]["Neutral"] * 100,2)

                elif sentiment == "MIXED":
                    value = round(result["SentimentScore"]["Mixed"] * 100,2)

                lang_result=json.dumps(comprehend.detect_dominant_language(Text = text), sort_keys=True, indent=4)

                #converting languages detection results into a dictionary
                lang_result_json=json.loads(lang_result)

                #Formatting the score from the results
                for line in lang_result_json["Languages"]:
                    line['Score'] = round(line['Score']* 100, 2)

                #storing the output of sentiment analysis, language detection and ids in a dictionary and converting it to JSON
                output={
                'ID':res['ID'],
                'userId':res['userId'],
                'defID':res['defID']
                }

                output_json = json.dumps(output, ensure_ascii=False)

                #-------------------------------------------------------------------------------------------------------
                # Sending the processed output (output_json) in json format to another service bus

                def send_output(sender):
                    message = ServiceBusMessage(
                    output_json,
                    content_type="Analysis", #setting the content type so that the service bus can route it.
                    application_properties={b'CODE':msg.application_properties[b'CODE']} #setting the tenant code
                    )
                    sender.send_messages(message)

                servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)

                with servicebus_client:
                    sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME_B)
                    with sender:
                        send_output(sender)

                receiver.complete_message(msg)

My requirements.txt:

azure-functions
azure-servicebus==7.0.0
boto3
azure-keyvault
azure-identity
uamqp

function.json

{
  "scriptFile": "testServiceBus.py",
  "entryPoint": "main",
  "bindings": [
    {
      "name": "message",
      "type": "serviceBusTrigger",
      "direction": "in",
      "topicName": "XXX",
      "subscriptionName": "XXX",
      "connection": "XXX"
    }
  ]
}

and host.json:

{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "autoComplete": false
      }
    }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  }
}

To Reproduce
Steps to reproduce the behavior:

  1. Create a Python function on Azure and set the configuration. You will need one Service Bus with two topics: topic_name_A and topic_name_B.

Expected behavior
The Azure function reads the message from topic A, processes it, and sends it to topic B, where I expect to see MESSAGE 1. What actually happens is that when I send a message to topic A, the function triggers and fails with the error above, which causes the message to end up in the dead-letter queue.

yunhaoling commented 2 years ago

Hey @ElAouaneHamza, sorry for the late response.

I was unable to reproduce the issue on local Ubuntu (WSL2) or on Azure. Since this is an import error, could you verify locally whether the failure also occurs with just the simplest import?

That is, the Azure function would look like the following:

import azure.functions as func
from azure.servicebus import ServiceBusClient, ServiceBusMessage

def main(message: func.ServiceBusMessage):
    sb_client = ServiceBusClient.from_connection_string(...)

Also, are you developing locally, or is the function deployed to Azure when the error occurs? I have usually seen this type of error when there is a local copy of the uamqp project whose path has been appended to PYTHONPATH.
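One way to check for a shadowing copy is to ask Python where it would load uamqp from, without actually importing it (importing is the step that fails). The following is only a minimal sketch: the helper name describe_package is made up for illustration, and it assumes the compiled extension sits inside the package directory as a file whose name starts with c_uamqp and ends in .so or .pyd.

```python
import importlib.util
import os


def describe_package(name, extension_prefix):
    """Report where `name` would be imported from, without importing it.

    `describe_package` is a hypothetical helper for this thread, not part of
    uamqp or azure-servicebus.
    """
    spec = importlib.util.find_spec(name)
    if spec is None or spec.origin is None:
        # Not installed, or a namespace package with no __init__.py.
        return None

    package_dir = os.path.dirname(spec.origin)
    binaries = []
    if os.path.isdir(package_dir):
        # Assumption: a compiled extension appears as <prefix>*.so or <prefix>*.pyd.
        binaries = sorted(
            entry
            for entry in os.listdir(package_dir)
            if entry.startswith(extension_prefix) and entry.endswith((".so", ".pyd"))
        )
    return {"origin": spec.origin, "package_dir": package_dir, "binaries": binaries}


if __name__ == "__main__":
    print(describe_package("uamqp", "c_uamqp"))
```

If the reported origin points at a source checkout rather than site-packages, or the binaries list is empty, the package being picked up is probably not a complete installed build.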

djong1 commented 2 years ago

I observe the same problem: my local development environment works, but running in Azure Functions fails with the message "Failure Exception: ImportError: cannot import name 'c_uamqp' from partially initialized module 'uamqp' (most likely due to a circular import)". When I remove the part that sends a message to Service Bus, it works fine. I cannot use an output binding instead, because the output only supports a string and has no user_properties support for the message. So the issue appears when using a Service Bus trigger and also sending a Service Bus message in code. Older SDK versions do work.

kashifkhan commented 1 year ago

We have now released the latest azure-servicebus, which uses the new Python-based AMQP library. It would be great if you could give that a try. If there are issues with the client library, please open an issue on the SDK repo.
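When comparing SDK releases, it may help to confirm which package versions are actually installed in the environment where the function runs. This is a minimal sketch using the standard library's importlib.metadata; the helper name installed_versions is made up for illustration, and the distribution names are the ones listed in the requirements.txt above.

```python
from importlib import metadata


def installed_versions(distributions):
    """Map each distribution name to its installed version, or None if absent.

    `installed_versions` is a hypothetical helper for this thread.
    """
    versions = {}
    for dist in distributions:
        try:
            versions[dist] = metadata.version(dist)
        except metadata.PackageNotFoundError:
            versions[dist] = None
    return versions


if __name__ == "__main__":
    names = ["azure-servicebus", "uamqp", "azure-functions"]
    for name, version in installed_versions(names).items():
        print(f"{name}: {version or 'not installed'}")
```

Logging this output once at function startup, or running it locally in the same virtual environment, gives concrete version numbers to include in a follow-up report.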