Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.
MIT License

Event Hubs extension crashes with a segmentation fault, either SIGSEGV or SIGABRT, from send_batch() #14543

Closed. MR-KO closed this issue 3 years ago.

MR-KO commented 4 years ago

**Describe the bug**
The Python program crashes with a segmentation fault (and no other output) when uploading data to an event hub. After a fair amount of debugging with gdb and eliminating all other factors (it is not in our code: we did a complete dry run with everything except the actual Event Hub sends), we found the following error(s) in the Event Hubs extension. Output of gdb:

Error in `/usr/bin/python3': double free or corruption (fasttop): 0x0000555556ca03a0

Signals:

Program received signal SIGABRT, Aborted.
Program received signal SIGSEGV, Segmentation fault.

**Stack traces**

Stack trace from gdb (py-bt command) for the SIGABRT error:

(gdb) py-bt
Traceback (most recent call first):
  <built-in method send of uamqp.c_uamqp.cMessageSender object at remote 0x7fffef69dc08>
  File "/usr/local/lib/python3.5/dist-packages/uamqp/sender.py", line 246, in send
    return self._sender.send(c_message, timeout, message)
  File "/usr/local/lib/python3.5/dist-packages/uamqp/client.py", line 605, in _transfer_message
    sent = self.message_handler.send(message, self._on_message_sent, timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/uamqp/client.py", line 626, in _filter_pending
    self._transfer_message(message, timeout)
  File "/usr/local/lib/python3.5/dist-packages/uamqp/client.py", line 645, in _client_run
    self._pending_messages = self._filter_pending()
  File "/usr/local/lib/python3.5/dist-packages/uamqp/client.py", line 397, in do_work
    return self._client_run()
  File "/usr/local/lib/python3.5/dist-packages/uamqp/client.py", line 756, in wait
    running = self.do_work()
  File "/usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer.py", line 161, in _send_event_data
    self._handler.wait()  # type: ignore
  File "/usr/local/lib/python3.5/dist-packages/azure/eventhub/_client_base.py", line 454, in _do_retryable_operation
    **kwargs
  File "/usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer.py", line 171, in _send_event_data_with_retry
    return self._do_retryable_operation(self._send_event_data, timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer.py", line 262, in send
    self._send_event_data_with_retry(timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer_client.py", line 245, in send_batch
    to_send_batch, timeout=send_timeout
  File "/home/user/program/program.py", line 173, in send_batch_of_data
    producer.send_batch(event_data_batch)
  File "/home/user/program/program.py", line 300, in main
    print("Sending all new data...")
  File "program_script.py", line 4, in <module>
    program.main()

Stack trace from gdb (py-bt command) for the SIGSEGV error:

(gdb) py-bt
Traceback (most recent call first):
  <built-in method send of uamqp.c_uamqp.cMessageSender object at remote 0x7fffef6c7c48>
  File "/usr/local/lib/python3.5/dist-packages/uamqp/sender.py", line 246, in send
    return self._sender.send(c_message, timeout, message)
  File "/usr/local/lib/python3.5/dist-packages/uamqp/client.py", line 601, in _transfer_message
    sent = self.message_handler.send(message, self._on_message_sent, timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/uamqp/client.py", line 622, in _filter_pending
    self._transfer_message(message, timeout)
  File "/usr/local/lib/python3.5/dist-packages/uamqp/client.py", line 641, in _client_run
    self._pending_messages = self._filter_pending()
  File "/usr/local/lib/python3.5/dist-packages/uamqp/client.py", line 397, in do_work
    return self._client_run()
  File "/usr/local/lib/python3.5/dist-packages/uamqp/client.py", line 752, in wait
    running = self.do_work()
  File "/usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer.py", line 161, in _send_event_data
    self._handler.wait()  # type: ignore
  File "/usr/local/lib/python3.5/dist-packages/azure/eventhub/_client_base.py", line 454, in _do_retryable_operation
    **kwargs
  File "/usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer.py", line 171, in _send_event_data_with_retry
    return self._do_retryable_operation(self._send_event_data, timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer.py", line 262, in send
    self._send_event_data_with_retry(timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer_client.py", line 245, in send_batch
    to_send_batch, timeout=send_timeout
  File "/home/user/program/program.py", line 156, in send_batch_of_data
    producer.send_batch(event_data_batch)
  File "/home/user/program/program.py", line 263, in main
    latest_id = send_batch_of_data(producer,
  File "program_script.py", line 4, in <module>
    program.main()

**To Reproduce**
I cannot send you our entire codebase for retrieving the data, or the actual data, but in theory this should be sufficient. Steps to reproduce the behavior:

  1. Get a fair amount of data, around 10-15 million records/rows. In our case it comes from a database via SQLAlchemy; we use the query.yield_per(1000) method to avoid loading all rows into memory at once.
  2. Open an Event Hubs producer connection:
    
    producer = EventHubProducerClient.from_connection_string(
        conn_str="<connection string here, e.g.: Endpoint=sb://........>",
        eventhub_name="<name here>")
  3. Convert and upload the data in batches, in JSON form (trimmed down to the essentials):

    import json

    from azure.eventhub import EventData

    event_data_batch = producer.create_batch()
    for row in data:
        json_object = {
            "id": row.id,
            # And more data stuff here of course, in our case about 10 more basic values, nothing fancy
        }

        json_string = json.dumps(json_object, indent=4, sort_keys=True)
        event_data = EventData(json_string)

        try:
            event_data_batch.add(event_data)
        except Exception:
            # Reached max data batch size, send it and create a new one
            # segfault/SIGABRT will occur on the next line, but not consistently... :(
            producer.send_batch(event_data_batch)
            event_data_batch = producer.create_batch()
            event_data_batch.add(event_data)

    # A last send_batch() call uploads the final batch of data,
    # with code pretty much the same as above.
    producer.send_batch(event_data_batch)


  4. Observe a "segmentation fault" when running the program. It does not always happen at the same point in time, but it always happens at the same line of code, the one marked by the code comment in the previous step. So it is independent of the actual data being sent. Furthermore, even though the error and/or stack trace may suggest a network issue, the program runs on a dedicated VPS with a gigabit fibre internet connection, so it is not, for example, a flaky 4G connection; the network should be sufficiently stable.
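The batching loop in step 3 can be sketched as a standalone, testable function. This is a minimal sketch, assuming (as in azure-eventhub 5.x) that EventDataBatch.add() raises ValueError when the batch is full; catching ValueError instead of Exception avoids hiding unrelated errors. The function name send_in_batches and its make_event parameter are hypothetical, and restructuring the loop does not by itself avoid the crash.

```python
import json


def send_in_batches(producer, rows, make_event):
    """Pack rows into batches and send each full batch exactly once.

    Assumes `producer` behaves like azure.eventhub.EventHubProducerClient:
    create_batch() returns a batch whose add() raises ValueError when full.
    Returns the number of batches sent.
    """
    batch = producer.create_batch()
    events_in_batch = 0
    batches_sent = 0
    for row in rows:
        # Compact JSON (no indent=4) keeps each event, and therefore each batch, smaller.
        event = make_event(json.dumps(row, sort_keys=True))
        try:
            batch.add(event)
        except ValueError:
            # Batch is full: send it and start a new one holding the current event.
            producer.send_batch(batch)
            batches_sent += 1
            batch = producer.create_batch()
            batch.add(event)  # still raises if a single event exceeds the size limit
            events_in_batch = 0
        events_in_batch += 1
    # Flush the last, partially filled batch (skipped when there were no rows).
    if events_in_batch:
        producer.send_batch(batch)
        batches_sent += 1
    return batches_sent
```

With the real SDK this would be called as send_in_batches(producer, rows, EventData), where rows yields plain dicts built from the query results.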

**Expected behavior**
No segmentation fault and no hard crash; the data should simply be uploaded. A Python exception would also be fine if something went wrong, but not a hard crash like this. Even PDB is unable to handle it gracefully and crashes as well.
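A segfault inside a C extension cannot be caught as a Python exception, but two standard-library techniques get closer to the behaviour asked for here. This is a sketch, not an SDK feature: faulthandler (available since Python 3.3, so also on the Python 3.5 used in this report) dumps the Python traceback when SIGSEGV or SIGABRT arrives, without gdb, and running the risky work in a child process keeps the parent alive so it can see the non-zero exit status. The crashing one-liner and the helper name run_isolated are hypothetical stand-ins for the real send_batch() workload.

```python
import subprocess
import sys

# Hypothetical stand-in for the real send_batch() workload: dereferences NULL in C.
CRASHING_CHILD = "import ctypes; ctypes.string_at(0)"


def run_isolated(code):
    """Run `code` in a child interpreter with faulthandler enabled.

    Returns (returncode, stderr). On POSIX a negative return code -N means the
    child was killed by signal N (for example -11 for SIGSEGV, -6 for SIGABRT);
    the parent process keeps running either way.
    """
    proc = subprocess.run(
        [sys.executable, "-X", "faulthandler", "-c", code],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    return proc.returncode, proc.stderr


if __name__ == "__main__":
    rc, err = run_isolated(CRASHING_CHILD)
    if rc != 0:
        # faulthandler writes "Fatal Python error: ..." plus the Python traceback.
        print("child crashed with return code", rc)
        print(err)
```

Inside the real program, import faulthandler; faulthandler.enable() at the top of the script, or setting PYTHONFAULTHANDLER=1 in the environment, enables the same traceback dump in-process.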

**Screenshots**
I can add screenshots, but I think the stack traces and information provided should be sufficient. If not, let me know; I can run gdb etc. and/or provide more information if needed. However, I cannot share our entire codebase or the actual data being sent. The code above is exactly what happens, minus some details irrelevant to the bug.

**Additional context**
It does not happen consistently at the same time (e.g. after X amount of data has been sent), but it does eventually happen at the same line, with either of two signals: SIGSEGV or SIGABRT. Given the stack trace it could be a network issue, but as said, it runs on a dedicated VPS. The connection string should be correct (some data is sent and received before the crash); if it were wrong, I would have expected an error at the first send_batch() call or when connecting to Event Hubs, rather than after a random number of calls. Also, the Python program does not use multiple processes or threads: it is completely single-threaded. Frankly, I'm at a loss, and I hope this is fixable...
samuelkoppes commented 4 years ago

We are investigating this issue and will respond with an update by 11/4/2020.

vasudevan-vishal commented 4 years ago

Facing the same issue. Any updates on this issue would help.

MR-KO commented 4 years ago

Hi, is there any update on this issue? We are still having this problem with no way to get it working.

MR-KO commented 3 years ago

Hi, any chance of a status update?

yunhaoling commented 3 years ago

Hey @MR-KO, I'm sorry for missing this thread.

Let me give you a bit more context: azure-eventhub depends on uamqp, which is built on top of some C libraries. What you see may indicate an issue in the C code.

Your stack trace is super helpful, but could you also turn on AMQP network tracing and share the logs with me when the segmentation fault happens? You can enable it with EventHubProducerClient(..., logging_enable=True).
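logging_enable=True only hands trace records to Python's standard logging module. With the default configuration (root logger at WARNING, no handlers), DEBUG and INFO records are silently dropped, so nothing is printed. A minimal sketch for making them visible, assuming the client and uamqp log under the "azure.eventhub" and "uamqp" logger hierarchies; both helper names are hypothetical.

```python
import logging
import sys


def configure_trace_logging(level=logging.DEBUG):
    """Print SDK trace records to stderr; returns the configured loggers.

    Assumes the Event Hubs client and uamqp log under the "azure.eventhub" and
    "uamqp" logger hierarchies. Call once, before creating the client.
    """
    handler = logging.StreamHandler(stream=sys.stderr)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
    configured = []
    for name in ("azure.eventhub", "uamqp"):
        logger = logging.getLogger(name)
        logger.setLevel(level)
        logger.addHandler(handler)
        configured.append(logger)
    return configured


def make_traced_producer(conn_str, eventhub_name):
    # Imported here so configure_trace_logging() is usable without the SDK installed.
    from azure.eventhub import EventHubProducerClient
    return EventHubProducerClient.from_connection_string(
        conn_str=conn_str, eventhub_name=eventhub_name, logging_enable=True)
```

StreamHandler flushes after every record, so lines already written survive a later segfault; redirecting stderr to a file (python3 myscript.py 2> trace.log) keeps them for inspection.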

In the meantime I'll dig into the C layer to see what causes the double free or corruption. Thanks for your patience and help!

MR-KO commented 3 years ago

Hi yunhaoling,

Happy New Year, and sorry for my own late response; Christmas holidays and everything :)

Good to know. However, running the Event Hubs client with logging_enable=True did not seem to give me any extra output or logs. I created the producer like this:

producer = EventHubProducerClient.from_connection_string(
    conn_str="<connection string here, e.g.: Endpoint=sb://........>",
    eventhub_name="<name here>",
    logging_enable=True)

Then run like so:

gdb python3
run myscript.py

I did not get a different stack trace or any extra output, and I did not find any new log files anywhere on the system, including /var/log/. There was only something minimal in dmesg (old and generic, from the non-gdb runs). Stack trace:

Program received signal SIGSEGV, Segmentation fault.
0x00007ffff37b0e32 in messagesender_send_async () from /usr/local/lib/python3.5/dist-packages/uamqp/c_uamqp.cpython-35m-x86_64-linux-gnu.so
(gdb) bt
#0  0x00007ffff37b0e32 in messagesender_send_async () from /usr/local/lib/python3.5/dist-packages/uamqp/c_uamqp.cpython-35m-x86_64-linux-gnu.so
#1  0x00007ffff371aa10 in __pyx_f_5uamqp_7c_uamqp_14cMessageSender_send (__pyx_v_self=0x7fffef727c48, __pyx_v_message=0x7fffef1c2270,
    __pyx_v_timeout=60000,
    __pyx_v_callback_context=<Message(_header=None, _body=<DataBody(_encoding='UTF-8', _message=<uamqp.c_uamqp.cMessage at remote 0x7fffef1c2270>) at remote 0x7fffeeea4a90>, _properties=None, _response=None, delivery_tag=None, _settler=None, retries=1, _annotations=None, _encoding='UTF-8', idle_time=2289000, _application_properties=None, _delivery_annotations=None, _footer=None, _need_further_parse=False, _message=<uamqp.c_uamqp.cMessage at remote 0x7fffef1c2270>, delivery_no=None, state=<MessageState(__objclass__=<EnumMeta(WaitingForSendAck=<MessageState(__objclass__=<...>, _name_='WaitingForSendAck', _value_=1) at remote 0x7ffff356a828>, WaitingToBeSent=<...>, ReceivedUnsettled=<MessageState(__objclass__=<...>, _name_='ReceivedUnsettled', _value_=4) at remote 0x7ffff356a898>, ReceivedSettled=<MessageState(__objclass__=<...>, _name_='ReceivedSettled', _value_=5) at remote 0x7ffff356a8d0>, _member_type_=<type at remote 0x555555bd9cc0>, _member_map_={'WaitingForSendAck': <...>, 'WaitingToBeSent': <...>, 'ReceivedUnsettled': <...>, 'SendCo...(truncated), __pyx_skip_dispatch=1) at uamqp/c_uamqp.c:69854
#2  0x00007ffff371b5fd in __pyx_pf_5uamqp_7c_uamqp_14cMessageSender_14send (__pyx_v_self=0x7fffef727c48, __pyx_v_message=0x7fffef1c2270,
    __pyx_v_timeout=60000,
    __pyx_v_callback_context=<Message(_header=None, _body=<DataBody(_encoding='UTF-8', _message=<uamqp.c_uamqp.cMessage at remote 0x7fffef1c2270>) at remote 0x7fffeeea4a90>, _properties=None, _response=None, delivery_tag=None, _settler=None, retries=1, _annotations=None, _encoding='UTF-8', idle_time=2289000, _application_properties=None, _delivery_annotations=None, _footer=None, _need_further_parse=False, _message=<uamqp.c_uamqp.cMessage at remote 0x7fffef1c2270>, delivery_no=None, state=<MessageState(__objclass__=<EnumMeta(WaitingForSendAck=<MessageState(__objclass__=<...>, _name_='WaitingForSendAck', _value_=1) at remote 0x7ffff356a828>, WaitingToBeSent=<...>, ReceivedUnsettled=<MessageState(__objclass__=<...>, _name_='ReceivedUnsettled', _value_=4) at remote 0x7ffff356a898>, ReceivedSettled=<MessageState(__objclass__=<...>, _name_='ReceivedSettled', _value_=5) at remote 0x7ffff356a8d0>, _member_type_=<type at remote 0x555555bd9cc0>, _member_map_={'WaitingForSendAck': <...>, 'WaitingToBeSent': <...>, 'ReceivedUnsettled': <...>, 'SendCo...(truncated)) at uamqp/c_uamqp.c:70041
#3  0x00007ffff371b54c in __pyx_pw_5uamqp_7c_uamqp_14cMessageSender_15send (__pyx_v_self=<uamqp.c_uamqp.cMessageSender at remote 0x7fffef727c48>,
    __pyx_args=(<uamqp.c_uamqp.cMessage at remote 0x7fffef1c2270>, 60000, <Message(_header=None, _body=<DataBody(_encoding='UTF-8', _message=<uamqp.c_uamqp.cMessage at remote 0x7fffef1c2270>) at remote 0x7fffeeea4a90>, _properties=None, _response=None, delivery_tag=None, _settler=None, retries=1, _annotations=None, _encoding='UTF-8', idle_time=2289000, _application_properties=None, _delivery_annotations=None, _footer=None, _need_further_parse=False, _message=<uamqp.c_uamqp.cMessage at remote 0x7fffef1c2270>, delivery_no=None, state=<MessageState(__objclass__=<EnumMeta(WaitingForSendAck=<MessageState(__objclass__=<...>, _name_='WaitingForSendAck', _value_=1) at remote 0x7ffff356a828>, WaitingToBeSent=<...>, ReceivedUnsettled=<MessageState(__objclass__=<...>, _name_='ReceivedUnsettled', _value_=4) at remote 0x7ffff356a898>, ReceivedSettled=<MessageState(__objclass__=<...>, _name_='ReceivedSettled', _value_=5) at remote 0x7ffff356a8d0>, _member_type_=<type at remote 0x555555bd9cc0>, _member_map_={'WaitingForSendAck': <...>, 'W...(truncated),
    __pyx_kwds=0x0) at uamqp/c_uamqp.c:70021
#4  0x000055555571ffb7 in PyCFunction_Call () at ../Objects/methodobject.c:98
#5  0x00005555556f5219 in call_function (oparg=<optimized out>, pp_stack=0x7fffffffc570) at ../Python/ceval.c:4720
#6  PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#7  0x00005555556f9e56 in _PyEval_EvalCodeWithName.lto_priv.1929 () at ../Python/ceval.c:4033
#8  0x00005555556f5b4c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffc780, func=<optimized out>)
    at ../Python/ceval.c:4828
#9  call_function (oparg=<optimized out>, pp_stack=0x7fffffffc780) at ../Python/ceval.c:4745
#10 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#11 0x00005555556f546f in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffc8b0, func=<optimized out>)
    at ../Python/ceval.c:4818
#12 call_function (oparg=<optimized out>, pp_stack=0x7fffffffc8b0) at ../Python/ceval.c:4745
#13 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#14 0x00005555556f546f in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffc9e0, func=<optimized out>)
    at ../Python/ceval.c:4818
#15 call_function (oparg=<optimized out>, pp_stack=0x7fffffffc9e0) at ../Python/ceval.c:4745
#16 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#17 0x00005555556f546f in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffcb10, func=<optimized out>)
    at ../Python/ceval.c:4818
#18 call_function (oparg=<optimized out>, pp_stack=0x7fffffffcb10) at ../Python/ceval.c:4745
#19 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#20 0x00005555556f546f in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffcc40, func=<optimized out>)
    at ../Python/ceval.c:4818
#21 call_function (oparg=<optimized out>, pp_stack=0x7fffffffcc40) at ../Python/ceval.c:4745
#22 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#23 0x00005555556f546f in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffcd70, func=<optimized out>)
    at ../Python/ceval.c:4818
#24 call_function (oparg=<optimized out>, pp_stack=0x7fffffffcd70) at ../Python/ceval.c:4745
#25 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#26 0x00005555556c6c7f in _PyEval_EvalCodeWithName.lto_priv.1929 (qualname=0x0, name=<optimized out>, closure=0x0, kwdefs=0x0, defcount=2,
    defs=0x7ffff4d4c1a0, kwcount=2, kws=<optimized out>, argcount=<optimized out>, args=<optimized out>, locals=<optimized out>,
    globals=<optimized out>, _co=<optimized out>) at ../Python/ceval.c:4033
#27 PyEval_EvalCodeEx () at ../Python/ceval.c:4054
#28 0x0000555555721f5f in function_call.lto_priv () at ../Objects/funcobject.c:627
#29 0x0000555555764607 in PyObject_Call () at ../Objects/abstract.c:2166
#30 0x00005555556f2d56 in ext_do_call (nk=<optimized out>, na=<optimized out>, flags=<optimized out>, pp_stack=0x7fffffffd028,
    func=<function at remote 0x7ffff4d440d0>) at ../Python/ceval.c:5049
#31 PyEval_EvalFrameEx () at ../Python/ceval.c:3290
#32 0x00005555556f9e56 in _PyEval_EvalCodeWithName.lto_priv.1929 () at ../Python/ceval.c:4033
#33 0x00005555556f5b4c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffd230, func=<optimized out>)
    at ../Python/ceval.c:4828
#34 call_function (oparg=<optimized out>, pp_stack=0x7fffffffd230) at ../Python/ceval.c:4745
#35 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#36 0x00005555556f9e56 in _PyEval_EvalCodeWithName.lto_priv.1929 () at ../Python/ceval.c:4033
#37 0x00005555556f5b4c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffd440, func=<optimized out>)
    at ../Python/ceval.c:4828
#38 call_function (oparg=<optimized out>, pp_stack=0x7fffffffd440) at ../Python/ceval.c:4745
#39 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#40 0x00005555556f9e56 in _PyEval_EvalCodeWithName.lto_priv.1929 () at ../Python/ceval.c:4033
#41 0x00005555556f5b4c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffd650, func=<optimized out>)
    at ../Python/ceval.c:4828
#42 call_function (oparg=<optimized out>, pp_stack=0x7fffffffd650) at ../Python/ceval.c:4745
#43 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#44 0x00005555556f9e56 in _PyEval_EvalCodeWithName.lto_priv.1929 () at ../Python/ceval.c:4033
#45 0x00005555556f5f6c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffd860, func=<optimized out>)
    at ../Python/ceval.c:4828
#46 call_function (oparg=<optimized out>, pp_stack=0x7fffffffd860) at ../Python/ceval.c:4745
#47 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#48 0x00005555556f546f in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffd990, func=<optimized out>)
    at ../Python/ceval.c:4818
#49 call_function (oparg=<optimized out>, pp_stack=0x7fffffffd990) at ../Python/ceval.c:4745
#50 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#51 0x00005555556f546f in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, pp_stack=0x7fffffffdac0, func=<optimized out>)
    at ../Python/ceval.c:4818
#52 call_function (oparg=<optimized out>, pp_stack=0x7fffffffdac0) at ../Python/ceval.c:4745
#53 PyEval_EvalFrameEx () at ../Python/ceval.c:3251
#54 0x00005555556f9e56 in _PyEval_EvalCodeWithName.lto_priv.1929 () at ../Python/ceval.c:4033
#55 0x00005555556fab5f in PyEval_EvalCodeEx () at ../Python/ceval.c:4054
#56 PyEval_EvalCode (co=<optimized out>, globals=<optimized out>, locals=<optimized out>) at ../Python/ceval.c:777
#57 0x00005555557b1a32 in run_mod () at ../Python/pythonrun.c:976
#58 0x00005555557b3ebd in PyRun_FileExFlags () at ../Python/pythonrun.c:929
#59 0x00005555557b465e in PyRun_SimpleFileExFlags () at ../Python/pythonrun.c:396
#60 0x00005555557da027 in run_file (p_cf=0x7fffffffdd3c, filename=0x555555c0f280 L"main_script.py", fp=0x555555ccc0c0) at ../Modules/main.c:318
#61 Py_Main () at ../Modules/main.c:768
#62 0x0000555555669a41 in main () at ../Programs/python.c:65
#63 0x00007ffff6cee2e1 in __libc_start_main (main=0x555555669960 <main>, argc=2, argv=0x7fffffffdf48, init=<optimized out>, fini=<optimized out>,
    rtld_fini=<optimized out>, stack_end=0x7fffffffdf38) at ../csu/libc-start.c:291
#64 0x00005555557741ba in _start ()

And the Python backtrace:

(gdb) py-bt-full
#3 <built-in method send of uamqp.c_uamqp.cMessageSender object at remote 0x7fffef727c48>
#6 Frame 0x555556f865e8, for file /usr/local/lib/python3.5/dist-packages/uamqp/sender.py, line 246, in send (self=<MessageSender(_error=None, _conn=<uamqp.c_uamqp.Connection at remote 0x7fffef6f83a8>, _link=<uamqp.c_uamqp.cLink at remote 0x7fffef6f8510>, _sender=<uamqp.c_uamqp.cMessageSender at remote 0x7fffef727c48>, name=b'sender-link-4c643e72-8452-4840-85a2-131e82f063b0', _session=<Session(_mgmt_links={}, _on_attach=None, _conn=<uamqp.c_uamqp.Connection at remote 0x7fffef6f83a8>, _session=<uamqp.c_uamqp.cSession at remote 0x7fffef727c88>, _connection=<Connection(_error=<ConnectionClose(description=b'Connection in an unexpected error state.', info=None, _encoding='UTF-8', condition=<ErrorCodes(__objclass__=<EnumMeta(LinkMessageSizeExceeded=<ErrorCodes(__objclass__=<...>, _name_='LinkMessageSizeExceeded', _value_=b'amqp:link:message-size-exceeded') at remote 0x7ffff356ae80>, NotImplemented=<ErrorCodes(__objclass__=<...>, _name_='NotImplemented', _value_=b'amqp:not-implemented') at remote 0x7ffff356aa20>, Client...(truncated)
    return self._sender.send(c_message, timeout, message)
#10 Frame 0x7fffef7287c8, for file /usr/local/lib/python3.5/dist-packages/uamqp/client.py, line 605, in _transfer_message (self=<SendClient(_on_attach=None, _max_frame_size=65536, _properties={<AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff787f0>) at remote 0x7fffef721208>: 'Python/3.5.3', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff788b0>) at remote 0x7fffef721128>: 'azsdk-python-eventhubs', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffef704030>) at remote 0x7fffef70e748>: 'azsdk-python-eventhubs/5.2.0 Python/3.5.3 (Linux-4.9.0-9-amd64-x86_64-with-debian-9.13)', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78810>) at remote 0x7fffef7211d0>: '5.2.0', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78f30>) at remote 0x7fffef702cf8>: 'Linux-4.9.0-9-amd64-x86_64-with-debian-9.13'}, _shutdown=False, _idle_timeout=None, _auth=<JWTTokenAuth(consumed=True, sasl=<_SASL(_interface=None, mechanism=<uamqp.c_uamqp.SASLMechanism ...(truncated)
    sent = self.message_handler.send(message, self._on_message_sent, timeout=timeout)
#13 Frame 0x555556f86388, for file /usr/local/lib/python3.5/dist-packages/uamqp/client.py, line 626, in _filter_pending (self=<SendClient(_on_attach=None, _max_frame_size=65536, _properties={<AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff787f0>) at remote 0x7fffef721208>: 'Python/3.5.3', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff788b0>) at remote 0x7fffef721128>: 'azsdk-python-eventhubs', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffef704030>) at remote 0x7fffef70e748>: 'azsdk-python-eventhubs/5.2.0 Python/3.5.3 (Linux-4.9.0-9-amd64-x86_64-with-debian-9.13)', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78810>) at remote 0x7fffef7211d0>: '5.2.0', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78f30>) at remote 0x7fffef702cf8>: 'Linux-4.9.0-9-amd64-x86_64-with-debian-9.13'}, _shutdown=False, _idle_timeout=None, _auth=<JWTTokenAuth(consumed=True, sasl=<_SASL(_interface=None, mechanism=<uamqp.c_uamqp.SASLMechanism at...(truncated)
    self._transfer_message(message, timeout)
#16 Frame 0x7fffef1ae048, for file /usr/local/lib/python3.5/dist-packages/uamqp/client.py, line 645, in _client_run (self=<SendClient(_on_attach=None, _max_frame_size=65536, _properties={<AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff787f0>) at remote 0x7fffef721208>: 'Python/3.5.3', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff788b0>) at remote 0x7fffef721128>: 'azsdk-python-eventhubs', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffef704030>) at remote 0x7fffef70e748>: 'azsdk-python-eventhubs/5.2.0 Python/3.5.3 (Linux-4.9.0-9-amd64-x86_64-with-debian-9.13)', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78810>) at remote 0x7fffef7211d0>: '5.2.0', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78f30>) at remote 0x7fffef702cf8>: 'Linux-4.9.0-9-amd64-x86_64-with-debian-9.13'}, _shutdown=False, _idle_timeout=None, _auth=<JWTTokenAuth(consumed=True, sasl=<_SASL(_interface=None, mechanism=<uamqp.c_uamqp.SASLMechanism at rem...(truncated)
    self._pending_messages = self._filter_pending()
#19 Frame 0x7fffef1ac048, for file /usr/local/lib/python3.5/dist-packages/uamqp/client.py, line 397, in do_work (self=<SendClient(_on_attach=None, _max_frame_size=65536, _properties={<AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff787f0>) at remote 0x7fffef721208>: 'Python/3.5.3', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff788b0>) at remote 0x7fffef721128>: 'azsdk-python-eventhubs', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffef704030>) at remote 0x7fffef70e748>: 'azsdk-python-eventhubs/5.2.0 Python/3.5.3 (Linux-4.9.0-9-amd64-x86_64-with-debian-9.13)', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78810>) at remote 0x7fffef7211d0>: '5.2.0', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78f30>) at remote 0x7fffef702cf8>: 'Linux-4.9.0-9-amd64-x86_64-with-debian-9.13'}, _shutdown=False, _idle_timeout=None, _auth=<JWTTokenAuth(consumed=True, sasl=<_SASL(_interface=None, mechanism=<uamqp.c_uamqp.SASLMechanism at remote ...(truncated)
    return self._client_run()
#22 Frame 0x7fffef708dc8, for file /usr/local/lib/python3.5/dist-packages/uamqp/client.py, line 756, in wait (self=<SendClient(_on_attach=None, _max_frame_size=65536, _properties={<AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff787f0>) at remote 0x7fffef721208>: 'Python/3.5.3', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff788b0>) at remote 0x7fffef721128>: 'azsdk-python-eventhubs', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffef704030>) at remote 0x7fffef70e748>: 'azsdk-python-eventhubs/5.2.0 Python/3.5.3 (Linux-4.9.0-9-amd64-x86_64-with-debian-9.13)', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78810>) at remote 0x7fffef7211d0>: '5.2.0', <AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78f30>) at remote 0x7fffef702cf8>: 'Linux-4.9.0-9-amd64-x86_64-with-debian-9.13'}, _shutdown=False, _idle_timeout=None, _auth=<JWTTokenAuth(consumed=True, sasl=<_SASL(_interface=None, mechanism=<uamqp.c_uamqp.SASLMechanism at remote 0x7...(truncated)
    running = self.do_work()
#25 Frame 0x7fffef0c73d8, for file /usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer.py, line 161, in _send_event_data (self=<EventHubProducer(_error=None, _link_properties={<AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78ef0>) at remote 0x7fffef70ef60>: <AMQPLong(_c_type=<uamqp.c_uamqp.LongValue at remote 0x7fffeff78930>) at remote 0x7fffef70e780>}, _max_message_size_on_link=1048576, _idle_timeout=None, _outcome=None, _lock=<_thread.lock at remote 0x7fffeffb9bc0>, running=True, _reconnect_backoff=1, _partition=None, _condition=None, _unsent_events=[<BatchMessage(_header=None, _properties=None, _multi_messages=False, _need_further_parse=False, _application_properties=None, _encoding='UTF-8', _annotations=None, _body_gen=[<EventData(_last_enqueued_event_properties={}, _sys_properties=None, message=<Message(_header=None, _body=<DataBody(_encoding='UTF-8', _message=<uamqp.c_uamqp.cMessage at remote 0x7fffeffabef0>) at remote 0x7fffef0c0780>, _properties=None, _response=None, deliver...(truncated)
    self._handler.wait()  # type: ignore
#31 Frame 0x5555567e1da8, for file /usr/local/lib/python3.5/dist-packages/azure/eventhub/_client_base.py, line 454, in _do_retryable_operation (self=<EventHubProducer(_error=None, _link_properties={<AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78ef0>) at remote 0x7fffef70ef60>: <AMQPLong(_c_type=<uamqp.c_uamqp.LongValue at remote 0x7fffeff78930>) at remote 0x7fffef70e780>}, _max_message_size_on_link=1048576, _idle_timeout=None, _outcome=None, _lock=<_thread.lock at remote 0x7fffeffb9bc0>, running=True, _reconnect_backoff=1, _partition=None, _condition=None, _unsent_events=[<BatchMessage(_header=None, _properties=None, _multi_messages=False, _need_further_parse=False, _application_properties=None, _encoding='UTF-8', _annotations=None, _body_gen=[<EventData(_last_enqueued_event_properties={}, _sys_properties=None, message=<Message(_header=None, _body=<DataBody(_encoding='UTF-8', _message=<uamqp.c_uamqp.cMessage at remote 0x7fffeffabef0>) at remote 0x7fffef0c0780>, _properties=None, _response=Non...(truncated)
    **kwargs
#35 Frame 0x7fffef0c7210, for file /usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer.py, line 171, in _send_event_data_with_retry (self=<EventHubProducer(_error=None, _link_properties={<AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78ef0>) at remote 0x7fffef70ef60>: <AMQPLong(_c_type=<uamqp.c_uamqp.LongValue at remote 0x7fffeff78930>) at remote 0x7fffef70e780>}, _max_message_size_on_link=1048576, _idle_timeout=None, _outcome=None, _lock=<_thread.lock at remote 0x7fffeffb9bc0>, running=True, _reconnect_backoff=1, _partition=None, _condition=None, _unsent_events=[<BatchMessage(_header=None, _properties=None, _multi_messages=False, _need_further_parse=False, _application_properties=None, _encoding='UTF-8', _annotations=None, _body_gen=[<EventData(_last_enqueued_event_properties={}, _sys_properties=None, message=<Message(_header=None, _body=<DataBody(_encoding='UTF-8', _message=<uamqp.c_uamqp.cMessage at remote 0x7fffeffabef0>) at remote 0x7fffef0c0780>, _properties=None, _response=No...(truncated)
    return self._do_retryable_operation(self._send_event_data, timeout=timeout)
#39 Frame 0x555556ce8638, for file /usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer.py, line 262, in send (self=<EventHubProducer(_error=None, _link_properties={<AMQPSymbol(_c_type=<uamqp.c_uamqp.SymbolValue at remote 0x7fffeff78ef0>) at remote 0x7fffef70ef60>: <AMQPLong(_c_type=<uamqp.c_uamqp.LongValue at remote 0x7fffeff78930>) at remote 0x7fffef70e780>}, _max_message_size_on_link=1048576, _idle_timeout=None, _outcome=None, _lock=<_thread.lock at remote 0x7fffeffb9bc0>, running=True, _reconnect_backoff=1, _partition=None, _condition=None, _unsent_events=[<BatchMessage(_header=None, _properties=None, _multi_messages=False, _need_further_parse=False, _application_properties=None, _encoding='UTF-8', _annotations=None, _body_gen=[<EventData(_last_enqueued_event_properties={}, _sys_properties=None, message=<Message(_header=None, _body=<DataBody(_encoding='UTF-8', _message=<uamqp.c_uamqp.cMessage at remote 0x7fffeffabef0>) at remote 0x7fffef0c0780>, _properties=None, _response=None, delivery_tag=None, ...(truncated)
    self._send_event_data_with_retry(timeout=timeout)
#43 Frame 0x555556ce83f8, for file /usr/local/lib/python3.5/dist-packages/azure/eventhub/_producer_client.py, line 245, in send_batch (self=<EventHubProducerClient(_partition_ids=None, _auth_uri='sb://my-connection-string-here', _idle_timeout=None, _lock=<_thread.lock at remote 0x7fffeffb9be8>, _credential=<EventHubSharedKeyCredential(key='my-key-here=', token_type=b'servicebus.windows.net:sastoken', policy='policy-type-here') at remote 0x7fffef6fa0f0>, _conn_manager=<_SeparateConnectionManager at remote 0x7fffef70ef98>, _address=<Address at remote 0x7fffef6f8bd0>, _container_id='eventhub.pysdk-83d08320', _max_message_size_on_link=1048576, _auto_reconnect=True, _keep_alive=30, _config=<Configuration(receive_timeout=0, user_agent=None, prefetch=300, retry_total=3, max_batch_size=300, http_proxy=None, max_retries=3, transport_type=<TransportType(__objclass__=<EnumMeta(__new__=<function at remote 0x7ffff6891c80>, AmqpO...(truncated)
    to_send_batch, timeout=send_timeout
#47 Frame 0x5555567d6fc8, for file main.py, line 189, in send_batch_of_data (...(truncated)
    producer.send_batch(event_data_batch)
#50 Frame 0x555556695ee8, for file main.py, line 306, in main (f=<_io.TextIOWrapper at remote 0x7ffff4d39708>, ...(truncated)
    data, data_length)
#53 Frame 0x7ffff6cc6828, for file main_script.py, line 4, in <module> ()
    main.main()

dmesg (old, nothing changed, just a generic error from the non-gdb runs): python3[24681]: segfault at fffffffffffffff8 ip 00007f1b8a2a0572 sp 00007fffea54a0a0 error 5 in c_uamqp.cpython-35m-x86_64-linux-gnu.so[7f1b8a072000+507000]

Unfortunately it's still the same; the logging did not seem to produce anything. I have GDB running in a screen session, so I can investigate further if needed.
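The dmesg line above still shows where the crash happened. In the kernel's segfault report, the instruction pointer (`ip`) minus the load base in brackets gives the offset of the faulting instruction inside `c_uamqp.cpython-35m-x86_64-linux-gnu.so`. The `error` field is the x86 page-fault error code. The sketch below decodes both. `decode_segfault` is a hypothetical helper, not part of uamqp or the SDK, and the regex assumes the exact line layout shown above.

```python
import re

# Matches the kernel's segfault report as printed by dmesg, e.g.
# "python3[PID]: segfault at ADDR ip IP sp SP error N in LIB[BASE+SIZE]"
SEGFAULT_RE = re.compile(
    r"segfault at (?P<addr>[0-9a-fA-F]+) ip (?P<ip>[0-9a-fA-F]+) "
    r"sp (?P<sp>[0-9a-fA-F]+) error (?P<err>\d+) "
    r"in (?P<lib>\S+)\[(?P<base>[0-9a-fA-F]+)\+(?P<size>[0-9a-fA-F]+)\]"
)


def decode_segfault(line):
    """Return the faulting library, the instruction offset inside it, and decoded error bits."""
    m = SEGFAULT_RE.search(line)
    if m is None:
        raise ValueError("not a kernel segfault line")
    addr = int(m["addr"], 16)
    ip = int(m["ip"], 16)
    base = int(m["base"], 16)
    size = int(m["size"], 16)
    err = int(m["err"])
    offset = ip - base  # position of the crashing instruction within the mapping
    if not 0 <= offset < size:
        raise ValueError("instruction pointer lies outside the reported mapping")
    # Interpret the 64-bit fault address as signed so small negative values stand out.
    signed_addr = addr - (1 << 64) if addr >= (1 << 63) else addr
    return {
        "library": m["lib"],
        "offset": offset,
        "fault_addr_signed": signed_addr,
        # x86 page-fault error-code bits
        "protection_violation": bool(err & 0x1),  # 0 would mean "page not present"
        "write": bool(err & 0x2),                  # 0 means the access was a read
        "user_mode": bool(err & 0x4),
    }


line = ("python3[24681]: segfault at fffffffffffffff8 ip 00007f1b8a2a0572 "
        "sp 00007fffea54a0a0 error 5 in "
        "c_uamqp.cpython-35m-x86_64-linux-gnu.so[7f1b8a072000+507000]")
info = decode_segfault(line)
print(hex(info["offset"]), info["fault_addr_signed"])
```

For this line it yields offset 0x22e572 and a fault address of -8, read from user mode. An address just below zero is often a small negative offset applied to a NULL pointer, but that is only a hint. Assuming the extension was built with debug symbols and its first loadable segment starts at virtual address 0 (the usual case for shared objects), `addr2line -f -e <path to the c_uamqp .so> 0x22e572` could then map the offset to a C function and source line.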

yunhaoling commented 3 years ago

Hey @MR-KO, could you try updating uamqp to v1.2.14 via pip install uamqp --upgrade? We recently updated the whole C submodule of uamqp, including several fixes that might resolve the issue.

Let me know if it still crashes on your machine.

MR-KO commented 3 years ago

Hi, unfortunately the issue still persists, with the exact same stack trace (down to line numbers, where provided by gdb). Pip has bumped uamqp to version 1.2.14, and azure-eventhub to 5.2.1. Still no change.

yunhaoling commented 3 years ago

Hey @MR-KO, apologies for not getting back to you sooner.

I'm able to reproduce the segfault with the following pattern. Could you confirm that your application follows the same pattern?

The pattern is:

  1. The producer stays inactive for more than 300 s, which leads to the underlying socket I/O being completely lost.
  2. A large event is then sent (one that gets split into several AMQP frames).
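Until the fix is available, one possible workaround for this pattern (an assumption, not what the maintainers shipped) is to never send on a connection that has been idle for a long time. The minimal sketch below recreates the client when the time since the last send exceeds a threshold. `IdleAwareSender`, the `client_factory` callable and its `send`/`close` methods are hypothetical stand-ins rather than uamqp or azure-eventhub APIs, and the 240 s default is simply chosen to stay below the 300 s observed in the repro.

```python
import time


class IdleAwareSender:
    """Hypothetical wrapper: recreate the underlying client if it has been idle too long.

    `client_factory` must return an object with `send(payload)` and `close()`;
    both method names are assumptions made for this sketch.
    """

    def __init__(self, client_factory, max_idle_seconds=240.0, clock=time.monotonic):
        self._factory = client_factory
        self._max_idle = max_idle_seconds
        self._clock = clock
        self._client = None
        self._last_activity = None
        self.reconnects = 0

    def send(self, payload):
        now = self._clock()
        if self._client is not None and now - self._last_activity > self._max_idle:
            # Assume the socket may have been dropped while idle: start a fresh client.
            self._client.close()
            self._client = None
            self.reconnects += 1
        if self._client is None:
            self._client = self._factory()
        self._client.send(payload)
        self._last_activity = self._clock()

    def close(self):
        if self._client is not None:
            self._client.close()
            self._client = None
```

Injecting `clock` keeps the idle logic testable without actually sleeping for minutes.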

I have written some code with pure uamqp to better illustrate the issue:

import uamqp
from uamqp import authentication
from datetime import datetime
import time
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s')

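# Elided in the original; the code below reads the keys 'hostname', 'event_hub', 'key_name' and 'access_key'.
live_eventhub_config = {...}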

uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
    uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])

target = "amqps://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])

send_client = uamqp.SendClient(target, auth=sas_auth, debug=True)
send_client.open()
while not send_client.client_ready():
    send_client.do_work()
print(datetime.now(), "send client is opened")

print(datetime.now(), 'start sleep')
# Sleep until the underlying socket I/O is completely lost.
# On Windows, the socket I/O reports "Failure: sending socket failed 10054."
# On Linux, the socket I/O reports "sending socket failed. errno=104 (Connection reset by peer)."
time.sleep(350)
print(datetime.now(), 'end sleep')

# A big message is split into multiple AMQP frames, which takes an execution path
# different from that of a small message (which is composed of just one frame).
# See code: https://github.com/Azure/azure-uamqp-c/blob/master/src/session.c#L1532-L1676
message = uamqp.Message(
    b't'*1024*700
)

# The segfault happens here
send_client.send_message(message)

send_client.close()
print(datetime.now(), "send client is closed")
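To put the "big message" comment in numbers: the payload above is `b't'*1024*700`, i.e. 716,800 bytes. A rough lower bound on the number of transfer frames is a ceiling division by the usable frame size. The negotiated max frame size is not stated in this thread, so the 65,536 bytes used below is an assumption, and `min_transfer_frames` is a hypothetical helper that ignores the message's own AMQP encoding overhead.

```python
def min_transfer_frames(payload_size, max_frame_size, per_frame_overhead=0):
    """Lower bound on the AMQP transfer frames needed for one message body.

    Ignores message-section encoding; `per_frame_overhead` is a rough,
    caller-supplied allowance for the frame header and transfer performative.
    """
    usable = max_frame_size - per_frame_overhead
    if usable <= 0:
        raise ValueError("frame overhead leaves no room for payload")
    return -(-payload_size // usable)  # ceiling division on integers


print(min_transfer_frames(1024 * 700, 65536))  # the repro's message
print(min_transfer_frames(1024, 65536))        # a small message
```

Under that assumed frame size the repro message needs at least 11 frames, while a 1 KiB message fits in one, consistent with the comment that only the large message reaches the multi-frame path in session.c.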

I'm investigating the C code further to see how to resolve the issue, and will keep posting updates here.

MR-KO commented 3 years ago

Hi @yunhaoling, no problem, I know how the job goes as a fellow software engineer :). I'm already glad about the ongoing effort, and it's great to hear you can reproduce it. I took your code, added my config/login details, and also got a stack trace, as you'd expect. A few things are worth noting:

  1. Our application follows almost exactly the same pattern. First, the Event Hubs connection/login details are tested to ensure they actually work before doing heavy DB access etc. Then that Event Hubs connection is fully closed, data is gathered, and only when there is data to send is the Event Hubs connection made again. Immediately after that, data is sent using azure.eventhub.EventHubProducerClient, in batches of EventData (see code in the issue description). We (of course) do not call time.sleep(), and our DB access also does not seem to generate such huge delays between rows of data (batches of 1000 rows). I am currently verifying this explicitly to see if that is the case and will get back to you (it's running now). EDIT: I cannot verify this, as it crashes at the producer.send_batch(event_data_batch) call, so I got no more output/logs. However, up to that point there is no huge time gap between rows of data (it's on the order of microseconds), suggesting that the DB access is not the issue...
  2. That said, the stack trace from py-bt-full is almost exactly the same as the one from my previous comment for the first 5 functions; after that, the last function of this stack trace differs (uamqp/client.py, line 725). Of course, the remaining azure-eventhub frames are missing now.
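The send path described in point 1 (reconnect, then send batches of EventData with EventHubProducerClient) roughly corresponds to the batching loop below. It is a sketch, not the application's actual code: `send_in_batches` and `make_event` are hypothetical names, and the producer is duck-typed so the loop can be exercised without a live Event Hub. It relies on `create_batch()`, `send_batch()` and on `EventDataBatch.add` raising `ValueError` once a batch reaches its size limit.

```python
def send_in_batches(producer, payloads, make_event):
    """Pack payloads into as few batches as possible and send each one.

    `producer` needs `create_batch()` and `send_batch(batch)`; each batch needs
    `add(event)`, which is expected to raise ValueError when the batch is full.
    Returns the number of batches sent.
    """
    batches_sent = 0
    batch = producer.create_batch()
    in_batch = 0
    for payload in payloads:
        event = make_event(payload)
        try:
            batch.add(event)
            in_batch += 1
        except ValueError:
            if in_batch == 0:
                # Even an empty batch cannot hold this event: surface the error.
                raise
            producer.send_batch(batch)
            batches_sent += 1
            batch = producer.create_batch()
            batch.add(event)  # raises ValueError if this single event is too large
            in_batch = 1
    if in_batch:
        # Flush the final, partially filled batch.
        producer.send_batch(batch)
        batches_sent += 1
    return batches_sent
```

Against the real SDK this might be called as `with EventHubProducerClient.from_connection_string(conn_str, eventhub_name=name) as producer: send_in_batches(producer, rows, EventData)`, where `conn_str`, `name` and `rows` are placeholders. Creating the client right before sending keeps the connection fresh, although on its own it does not rule out long idle gaps between batches.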

stack trace:

(gdb) py-bt-full
#10 <built-in method send of uamqp.c_uamqp.cMessageSender object at remote 0x7ffff4303ac8>
#13 Frame 0x55555601f7c8, for file /usr/local/lib/python3.5/dist-packages/uamqp/sender.py, line 246, in send (self=<MessageSender(source=<uamqp.c_uamqp.CompositeValue at remote 0x7ffff430f090>, error_policy=<ErrorPolicy(max_retries=3, _on_error=None) at remote 0x7ffff42ff9b0>, _state=<MessageSenderState(_value_=1, __objclass__=<EnumMeta(Open=<MessageSenderState(_value_=3, __objclass__=<...>, _name_='Open') at remote 0x7ffff5797630>, Closing=<MessageSenderState(_value_=4, __objclass__=<...>, _name_='Closing') at remote 0x7ffff57976a0>, _member_map_={'Closing': <...>, 'Open': <...>, 'Idle': <...>, 'Opening': <MessageSenderState(_value_=2, __objclass__=<...>, _name_='Opening') at remote 0x7ffff5797668>, 'Error': <MessageSenderState(_value_=5, __objclass__=<...>, _name_='Error') at remote 0x7ffff57976d8>}, _member_names_=['Idle', 'Opening', 'Open', 'Closing', 'Error'], Opening=<...>, _value2member_map_={1: <...>, 2: <...>, 3: <...>, 4: <...>, 5: <...>}, Error=<...>, __doc__='An enumeration.', __module__='uamqp.consta...(truncated)
return self._sender.send(c_message, timeout, message)
#17 Frame 0x7ffff4317408, for file /usr/local/lib/python3.5/dist-packages/uamqp/client.py, line 605, in _transfer_message (self=<SendClient(_channel_max=None, _link_properties=None, _hostname='<hostname here>', _keep_alive_thread=None, message_handler=<MessageSender(source=<uamqp.c_uamqp.CompositeValue at remote 0x7ffff430f090>, error_policy=<ErrorPolicy(max_retries=3, _on_error=None) at remote 0x7ffff42ff9b0>, _state=<MessageSenderState(_value_=1, __objclass__=<EnumMeta(Open=<MessageSenderState(_value_=3, __objclass__=<...>, _name_='Open') at remote 0x7ffff5797630>, Closing=<MessageSenderState(_value_=4, __objclass__=<...>, _name_='Closing') at remote 0x7ffff57976a0>, _member_map_={'Closing': <...>, 'Open': <...>, 'Idle': <...>, 'Opening': <MessageSenderState(_value_=2, __objclass__=<...>, _name_='Opening') at remote 0x7ffff5797668>, 'Error': <MessageSenderState(_value_=5, __objclass__=<...>, _name_='Error') at remote 0x7ffff57976d8>}, _member_names_=['Idle', 'Opening', 'O...(truncated)
sent = self.message_handler.send(message, self._on_message_sent, timeout=timeout)
#20 Frame 0x55555601ecf8, for file /usr/local/lib/python3.5/dist-packages/uamqp/client.py, line 626, in _filter_pending (self=<SendClient(_channel_max=None, _link_properties=None, _hostname='<hostname here>', _keep_alive_thread=None, message_handler=<MessageSender(source=<uamqp.c_uamqp.CompositeValue at remote 0x7ffff430f090>, error_policy=<ErrorPolicy(max_retries=3, _on_error=None) at remote 0x7ffff42ff9b0>, _state=<MessageSenderState(_value_=1, __objclass__=<EnumMeta(Open=<MessageSenderState(_value_=3, __objclass__=<...>, _name_='Open') at remote 0x7ffff5797630>, Closing=<MessageSenderState(_value_=4, __objclass__=<...>, _name_='Closing') at remote 0x7ffff57976a0>, _member_map_={'Closing': <...>, 'Open': <...>, 'Idle': <...>, 'Opening': <MessageSenderState(_value_=2, __objclass__=<...>, _name_='Opening') at remote 0x7ffff5797668>, 'Error': <MessageSenderState(_value_=5, __objclass__=<...>, _name_='Error') at remote 0x7ffff57976d8>}, _member_names_=['Idle', 'Opening', 'Ope...(truncated)
self._transfer_message(message, timeout)
#23 Frame 0x7ffff4315c50, for file /usr/local/lib/python3.5/dist-packages/uamqp/client.py, line 645, in _client_run (self=<SendClient(_channel_max=None, _link_properties=None, _hostname='<hostname here>', _keep_alive_thread=None, message_handler=<MessageSender(source=<uamqp.c_uamqp.CompositeValue at remote 0x7ffff430f090>, error_policy=<ErrorPolicy(max_retries=3, _on_error=None) at remote 0x7ffff42ff9b0>, _state=<MessageSenderState(_value_=1, __objclass__=<EnumMeta(Open=<MessageSenderState(_value_=3, __objclass__=<...>, _name_='Open') at remote 0x7ffff5797630>, Closing=<MessageSenderState(_value_=4, __objclass__=<...>, _name_='Closing') at remote 0x7ffff57976a0>, _member_map_={'Closing': <...>, 'Open': <...>, 'Idle': <...>, 'Opening': <MessageSenderState(_value_=2, __objclass__=<...>, _name_='Opening') at remote 0x7ffff5797668>, 'Error': <MessageSenderState(_value_=5, __objclass__=<...>, _name_='Error') at remote 0x7ffff57976d8>}, _member_names_=['Idle', 'Opening', 'Open', ...(truncated)
self._pending_messages = self._filter_pending()
#26 Frame 0x7ffff43161f0, for file /usr/local/lib/python3.5/dist-packages/uamqp/client.py, line 397, in do_work (self=<SendClient(_channel_max=None, _link_properties=None, _hostname='<hostname here>', _keep_alive_thread=None, message_handler=<MessageSender(source=<uamqp.c_uamqp.CompositeValue at remote 0x7ffff430f090>, error_policy=<ErrorPolicy(max_retries=3, _on_error=None) at remote 0x7ffff42ff9b0>, _state=<MessageSenderState(_value_=1, __objclass__=<EnumMeta(Open=<MessageSenderState(_value_=3, __objclass__=<...>, _name_='Open') at remote 0x7ffff5797630>, Closing=<MessageSenderState(_value_=4, __objclass__=<...>, _name_='Closing') at remote 0x7ffff57976a0>, _member_map_={'Closing': <...>, 'Open': <...>, 'Idle': <...>, 'Opening': <MessageSenderState(_value_=2, __objclass__=<...>, _name_='Opening') at remote 0x7ffff5797668>, 'Error': <MessageSenderState(_value_=5, __objclass__=<...>, _name_='Error') at remote 0x7ffff57976d8>}, _member_names_=['Idle', 'Opening', 'Open', 'Clo...(truncated)
return self._client_run()
#29 Frame 0x55555601f548, for file /usr/local/lib/python3.5/dist-packages/uamqp/client.py, line 725, in send_message (self=<SendClient(_channel_max=None, _link_properties=None, _hostname='<hostname here>', _keep_alive_thread=None, message_handler=<MessageSender(source=<uamqp.c_uamqp.CompositeValue at remote 0x7ffff430f090>, error_policy=<ErrorPolicy(max_retries=3, _on_error=None) at remote 0x7ffff42ff9b0>, _state=<MessageSenderState(_value_=1, __objclass__=<EnumMeta(Open=<MessageSenderState(_value_=3, __objclass__=<...>, _name_='Open') at remote 0x7ffff5797630>, Closing=<MessageSenderState(_value_=4, __objclass__=<...>, _name_='Closing') at remote 0x7ffff57976a0>, _member_map_={'Closing': <...>, 'Open': <...>, 'Idle': <...>, 'Opening': <MessageSenderState(_value_=2, __objclass__=<...>, _name_='Opening') at remote 0x7ffff5797668>, 'Error': <MessageSenderState(_value_=5, __objclass__=<...>, _name_='Error') at remote 0x7ffff57976d8>}, _member_names_=['Idle', 'Opening', 'Open',...(truncated)
running = self.do_work()
#33 Frame 0x7ffff6cc6828, for file eh_debug_test.py, line 42, in <module> ()
send_client.send_message(message)

So it seems very likely that this is indeed the culprit!

yunhaoling commented 3 years ago

Hey @MR-KO, thanks for your patience! We have fixed the issue in azure-eventhub 5.4.0. Please update to the latest version via pip install azure-eventhub --upgrade. (If you're interested, the root cause lies in uamqp, and the analysis can be found here: https://github.com/Azure/azure-uamqp-python/pull/217#issue-595648009)
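Since the fix shipped in azure-eventhub 5.4.0, a quick runtime check can confirm that an environment actually picked it up. A minimal sketch, assuming Python 3.8+ for `importlib.metadata` (the Python 3.5 environment in this thread would need another way to read the version); `version_tuple`, `has_fix` and `installed_version` are hypothetical helpers. Comparing integer tuples rather than raw strings matters here: as strings, "5.10.0" sorts before "5.4.0".

```python
from importlib import metadata  # Python 3.8+


def version_tuple(version):
    """Turn '5.4.0' into (5, 4, 0); stops at the first non-numeric part (e.g. 'a3')."""
    parts = []
    for piece in version.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
        if len(digits) != len(piece):
            break  # a suffix such as 'a3' or 'rc1' ends the numeric part
    return tuple(parts)


def has_fix(installed, minimum="5.4.0"):
    """True if `installed` is at least the version that contains the fix."""
    return version_tuple(installed) >= version_tuple(minimum)


def installed_version(dist="azure-eventhub"):
    """Installed version string of `dist`, or None if it is not installed."""
    try:
        return metadata.version(dist)
    except metadata.PackageNotFoundError:
        return None
```

This simple parser drops pre-release suffixes, so 5.8.0a3 is treated like 5.8.0; for exact PEP 440 ordering, `packaging.version.Version` is the more robust choice.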

I'm closing this now, feel free to reopen if you're still encountering the issue, thanks!

yunhaoling commented 2 years ago

Hey @MR-KO, hope you are keeping well!

I'm excited to announce that we redesigned our Python EventHubs SDK to not rely on any C code anymore, it’s now entirely implemented in Python (5.8.0a3 - pypi). We’d love to get your feedback and perspective on this new version, as we believe you are using this SDK in an environment that would benefit from the changes.

Here are some highlights of the SDK:
  • Stability and robustness, eliminating the risk of C-related memory leaks or segmentation faults.
  • Full multi-platform support (including ARM processors such as the M1).
  • Improved sending and receiving performance.

To get started, just install the wheel with pip install azure-eventhub==5.8.0a3 in your Python environment and you're all set to go with your applications. This package is fully backward compatible, and no changes are needed in your application.

We’re eager to hear what you think. Please reply to this thread at any time with feedback or any questions you have for us! We’re all ears.

MR-KO commented 2 years ago

Hi @yunhaoling, thanks, I am; I hope you are well too.

Great to hear that, it looks promising. Unfortunately, the client in question stopped using our services about 5 months later, so this functionality has not really been used since. I have made a note to check it out whenever we need it again, thanks.

helen-linkedin commented 2 years ago

@yunhaoling hey Yunhao, I noticed the redesign happened in 5.8.0a3; I'm wondering whether the newest 5.9.0 version contains your change to not rely on C? Thank you!