Baltic-RCC / EMF

Repository for Open Source EMF implementation
Mozilla Public License 2.0

Refactor Model Retriever to use RabbitMQ #36

Closed Haigutus closed 6 months ago

m-karo commented 8 months ago

The existing issue is that the RMQ consumer cannot be started when the queue is not empty: it aborts automatically. With the current implementation, the consumer can only be started if the queue is empty and has no messages.

@makkes Maybe you could look into the consumer code to see what could be wrong?

m-karo commented 8 months ago

Another problem is that the pika module logs cannot be shipped to ELK because they are not JSON-compatible. The problem is the args attribute of the log message, which is a tuple.

One option is to discard the 'args' key from log messages, since this information still ends up in the message key. Another option is to add an additional log message handler that converts it to a supported format.

m-karo commented 8 months ago

Sorry, I think it can handle a tuple. What it cannot handle is a tuple that contains a class instance.
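For illustration, here is a minimal sketch of the second option using only the standard library. The class name `JsonSafeFormatter` and the chosen output keys are hypothetical and are not taken from the EMF code base; the idea is simply that `json.dumps(..., default=str)` falls back to `str()` for any value it cannot serialize, such as a class instance inside `args`.

```python
import json
import logging


class JsonSafeFormatter(logging.Formatter):
    """Hypothetical formatter: emit one JSON object per record,
    stringifying anything json cannot serialize natively."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            # getMessage() interpolates args into the message text
            "message": record.getMessage(),
        }
        if record.args:
            # args is normally a tuple, but may be a single mapping
            args = record.args if isinstance(record.args, tuple) else (record.args,)
            payload["args"] = list(args)
        # default=str converts class instances (e.g. sockets) to strings
        return json.dumps(payload, default=str)


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.setFormatter(JsonSafeFormatter())
    logging.getLogger("pika").addHandler(handler)
```

Dropping the `args` key entirely (the first option) would amount to removing the `if record.args:` block, since the interpolated text is already in `message`.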

makkes commented 7 months ago

@m-karo, I'll take a look at this later today.

makkes commented 7 months ago

> The existing issue is that the RMQ consumer cannot be started when the queue is not empty: it aborts automatically. With the current implementation, the consumer can only be started if the queue is empty and has no messages.
>
> @makkes Maybe you could look into the consumer code to see what could be wrong?

I wasn't able to reproduce the connection abort locally. The steps I tried were:

  1. Spin up RabbitMQ locally: podman run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 -v $(pwd)/rabbitmq_data:/var/lib/rabbitmq/ docker.io/rabbitmq:3.12-management
  2. Create queue object-storage.schedules.iec
  3. Navigate to http://localhost:15672/#/queues/%2F/object-storage.schedules.iec
  4. Scroll down to "Publish message"
  5. In the Payload field, type foobar
  6. Click the Publish message button
  7. Start RMQ consumer: python -m emf.common.integrations.rabbit

Logs from RMQ consumer:

INFO       2024-02-06 22:39:19,667 __main__                            connect                         87  : Connecting to localhost:5672 @ / as guest                                                               
INFO       2024-02-06 22:39:19,670 pika.adapters.utils.connection_workflow start                           179 : Pika version 1.3.2 connecting to ('::1', 5672, 0, 0)                                                
INFO       2024-02-06 22:39:19,671 pika.adapters.utils.io_services_utils _on_writable                    345 : Socket connected: <socket.socket fd=6, family=10, type=1, proto=6, laddr=('::1', 49072, 0, 0), raddr=(
'::1', 5672, 0, 0)>                                                                                                                                                                                                  
INFO       2024-02-06 22:39:19,671 pika.adapters.utils.connection_workflow _on_transport_establishment_done  428 : Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport ob
ject at 0x737786f86790>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x737786f86790> params=<ConnectionParameters host=loc
alhost port=5672 virtual_host=/ ssl=False>>).                                                                                                                                                                        
INFO       2024-02-06 22:39:19,673 __main__                            on_connection_open              109 : Connection opened                                                                                       
INFO       2024-02-06 22:39:19,673 __main__                            open_channel                    149 : Creating a new channel                                                                                  
INFO       2024-02-06 22:39:19,673 pika.adapters.utils.connection_workflow _report_completion_and_cleanup  293 : AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services
_utils._AsyncPlaintextTransport object at 0x737786f86790> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>                           
INFO       2024-02-06 22:39:19,673 pika.adapters.utils.connection_workflow _report_completion_and_cleanup  725 : AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io
_services_utils._AsyncPlaintextTransport object at 0x737786f86790> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>                                                                  
INFO       2024-02-06 22:39:19,674 __main__                            on_channel_open                 158 : Channel opened                                                                                          
INFO       2024-02-06 22:39:19,674 __main__                            add_on_channel_close_callback   167 : Adding channel close callback                                                                           
INFO       2024-02-06 22:39:19,674 __main__                            on_basic_qos_ok                 197 : QOS set to: 1                                                                                           
INFO       2024-02-06 22:39:19,674 __main__                            start_consuming                 209 : Issuing consumer related RPC commands
INFO       2024-02-06 22:39:19,674 __main__                            add_on_cancel_callback          220 : Adding consumer cancellation callback                                    
INFO       2024-02-06 22:39:19,675 __main__                            on_message                      246 : Received message # 1 from None meta:{}                                                                  
INFO       2024-02-06 22:39:19,675 __main__                            acknowledge_message             277 : Acknowledging message 1

@m-karo can you please try to compile a list of steps leading to the abort you see?

makkes commented 7 months ago

Another way to get a handle on the root cause would be to look at the RMQ logs, which should contain at least some more information about the internal 541 error.
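As background for reading those logs: 541 is the AMQP 0-9-1 reply code "internal-error", a connection-level error, which would be consistent with the whole consumer connection going away. The helper below is a small illustrative lookup, not part of pika or the EMF code; the function name is hypothetical and the table is only a subset of the reply codes defined in the AMQP 0-9-1 specification.

```python
# Subset of AMQP 0-9-1 reply codes (names as given in the specification).
AMQP_REPLY_CODES = {
    311: "content-too-large",
    320: "connection-forced",
    403: "access-refused",
    404: "not-found",
    406: "precondition-failed",
    501: "frame-error",
    503: "command-invalid",
    504: "channel-error",
    505: "unexpected-frame",
    506: "resource-error",
    530: "not-allowed",
    540: "not-implemented",
    541: "internal-error",
}

# Codes the specification treats as connection-level ("hard") errors.
CONNECTION_LEVEL = {320, 501, 503, 504, 505, 506, 530, 540, 541}


def describe_reply_code(code: int) -> str:
    """Hypothetical helper: render a reply code for a log line."""
    name = AMQP_REPLY_CODES.get(code)
    if name is None:
        return f"{code} unknown"
    scope = "connection" if code in CONNECTION_LEVEL else "channel"
    return f"{code} {name} (closes the {scope})"


if __name__ == "__main__":
    print(describe_reply_code(541))
```

A consumer's close callback could pass the reply code it receives through such a helper so that the application log and the broker log are easier to correlate.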

m-karo commented 7 months ago

It looks like the issue is not related to whether the queue is empty. I reproduced your steps with a sample message from the UI instead of the original PEVF message, and the consumer connects and works even if there is a message in the queue before it connects. It seems the issue could be related to the PEVF message itself.

Haigutus commented 7 months ago

Due to the RabbitMQ issues, this will go to the backlog.