zatosource / zato

ESB, SOA, REST, APIs and Cloud Integrations in Python
https://zato.io
GNU Affero General Public License v3.0

AMQP outgoing connection: how does it work? #559

viatoriche closed 8 years ago

viatoriche commented 8 years ago

I have a RabbitMQ instance.

RabbitMQ:
queue - TEST
exchange - TEST
   where: routing - TEST -> TEST
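The topology above (queue TEST bound to exchange TEST with key TEST) can be sketched with a toy in-memory model of direct-exchange routing. This is only an illustration: `ToyBroker` and its methods are hypothetical names, not RabbitMQ or Zato APIs, and the exchange is assumed to be of type direct, which the post does not state.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker with direct exchanges (illustration only)."""

    def __init__(self):
        self.queues = defaultdict(list)    # queue name -> list of message bodies
        self.bindings = defaultdict(list)  # exchange name -> [(binding key, queue name)]

    def declare_queue(self, name):
        self.queues[name]  # touching the defaultdict creates the empty queue
        # In AMQP 0-9-1 every queue is implicitly bound to the default
        # exchange ('') with a binding key equal to the queue name.
        self.bindings[''].append((name, name))

    def bind(self, exchange, binding_key, queue):
        self.bindings[exchange].append((binding_key, queue))

    def publish(self, body, exchange, routing_key):
        # Direct routing: deliver to every queue whose binding key
        # equals the routing key; anything else is unroutable.
        targets = [q for key, q in self.bindings[exchange] if key == routing_key]
        for q in targets:
            self.queues[q].append(body)
        return targets


broker = ToyBroker()
broker.declare_queue('TEST')
broker.bind('TEST', 'TEST', 'TEST')  # exchange TEST, key TEST -> queue TEST

delivered = broker.publish('TEST MESSAGE', exchange='TEST', routing_key='TEST')
unrouted = broker.publish('TEST MESSAGE', exchange='TEST', routing_key='OTHER')
print(delivered, unrouted)
```

Under these assumptions, a message published to exchange TEST with routing key TEST should end up in queue TEST, while any other routing key matches no binding.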

In Zato:

outgoing:
       name: TEST
       def: connection for my RabbitMQ [tested, works]

In the service:

from zato.server.service import Service

class TestAMQPService(Service):

    def handle(self):
        s = 'TEST MESSAGE'
        self.logger.info('SEND: {}'.format(s))
        self.outgoing.amqp.send(s, 'TEST', 'TEST', 'TEST')

Log:

2016-02-11 15:45:05,810 - INFO - 14262:Dummy-64188 - json-rpc-client.test-amqp-service:22 - SEND: TEST MESSAGE

But nothing arrives. The queue is empty after publishing.

What am I doing wrong?

RabbitMQ has a connection from Zato:

Client properties
product py-amqp
product_version 1.0.11
zato-component  out-amqp/workstation/workstation/29163/Thread-3
capabilities    
consumer_cancel_notify: true

dsuch commented 8 years ago

Hi @viatoriche - here is the documentation:

https://zato.io/docs/web-admin/outgoing/amqp.html
https://zato.io/docs/progguide/examples/amqp.html
https://zato.io/docs/progguide/outconn/amqp.html

Please follow it to the letter and make sure that everything is configured correctly on the AMQP end.

It's impossible to provide support on a "what am I doing wrong" basis. It is up to you to make sure that you are doing everything in accordance with the documentation.

Alternatively, feel free to contact us at info@zato.io to discuss terms of commercial support and then it will be possible to diagnose everything in detail.

Thanks.

viatoriche commented 8 years ago

"Please follow it to the letter and make sure that everything is configured correctly on the AMQP end."

Correctly.

I did everything in accordance with the documentation.

And I still have a problem with an empty queue.

From kvdb log:

2016-02-11 18:09:50,914 - DEBUG - 6878:Dummy-9 - zato_kvdb:22 - Cmd: `localhost:6379 (db:0)`, `('PUBLISH', '/zato/connector/amqp/publishing/all', '{"body": "TEST MESSAGE", "msg_type": "0003", "exchange": "TEST", "args": [], "routing_key": "TEST", "headers": {}, "out_name": "TEST", "kwargs": {}, "action": "100803", "properties": {}}')`

dsuch commented 8 years ago

Does it work if you prepare a standalone Python program to connect to this queue?

viatoriche commented 8 years ago

@dsuch Yes

>>> from kombu import Connection
>>> connection = Connection('amqp://viator:XXXXXXXX@localhost:5672//')
>>> connection.connect()
<kombu.transport.pyamqp.Connection object at 0x7fdef191df10>
>>> connection.connected
True
>>> connection
<Connection: amqp://viator@127.0.0.1:5672// at 0x7fdef1505090>
>>> from kombu import Producer
>>> p = Producer(connection)
>>> p.publish('TTT', 'TEST', 'persistent', 'TEST')

RabbitMQ:

Message 1

The server reported 0 messages remaining.

Exchange    (AMQP default)
Routing Key TEST
Redelivered ○
Properties  
priority:   0
delivery_mode:  2
headers:    
content_encoding:   binary
content_type:   application/data
Payload
3 bytes
Encoding: string
TTT
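The RabbitMQ output above shows the message arriving through the default exchange with delivery_mode 2, which suggests how the four positional arguments in the `publish` call were interpreted. A minimal sketch of that binding follows. The stub below is hypothetical: its parameter order is an assumption meant to mirror kombu 2.5.x `Producer.publish` and should be checked against the installed version.

```python
import inspect


def publish(body, routing_key=None, delivery_mode=None, mandatory=False,
            immediate=False, priority=0, exchange=None):
    """Stub standing in for kombu's Producer.publish.

    The parameter order is an assumption, not copied from kombu itself.
    """


# Bind the same positional arguments that were used in the session above.
bound = inspect.signature(publish).bind('TTT', 'TEST', 'persistent', 'TEST')
bound.apply_defaults()
args = dict(bound.arguments)

for name, value in args.items():
    print('{:15} {!r}'.format(name, value))
```

If the assumed order is right, the second argument is the routing key, the third is the delivery mode, and the fourth lands on `mandatory` rather than on `exchange`, so the standalone test reaches the queue via the default exchange and does not exercise the TEST exchange. Passing `exchange='TEST'` and `routing_key='TEST'` as keyword arguments would make the comparison with the Zato call closer.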

Where can I find the cause of the problem? Logs? Options?

2016-02-11 18:08:14,673 - INFO - 6927:MainThread - zato_connector:22 - Started an AMQP publisher for [localhost:5672/ (TEST)]

dsuch commented 8 years ago

What version of kombu is it?

viatoriche commented 8 years ago

@dsuch Hi. I am using the kombu that ships in Zato's environment.

>>> import kombu
>>> kombu.__version__
'2.5.10'

I created a new cluster:

zato quickstart create . sqlite localhost 6379 --verbose

I added an AMQP definition:

TEST    localhost   5672    /   viator

And changed the password.

I created a new outgoing connection:

TEST    Yes TEST    Persistent

I deployed a service that sends through the AMQP outgoing connection:

from zato.server.service import Service

class TestAMQPService(Service):

    name = 'test amqp'

    def handle(self):
        self.logger.info('SEND: {}'.format(self.request.payload))
        self.outgoing.amqp.send(self.request.payload, 'TEST', 'TEST', 'TEST')

I invoked this service from the Zato web admin.

Result: the queue is empty.

:C

I subscribed to "/zato/connector/amqp/publishing/all" in redis-cli:

viator@workstation:~$ redis-cli 
127.0.0.1:6379> subscribe '/zato/connector/amqp/publishing/all'
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "/zato/connector/amqp/publishing/all"
3) (integer) 1
1) "message"
2) "/zato/connector/amqp/publishing/all"
3) "{\"body\": \"123123123\", \"msg_type\": \"0003\", \"exchange\": \"TEST\", \"args\": [], \"routing_key\": \"TEST\", \"headers\": {}, \"out_name\": \"TEST\", \"kwargs\": {}, \"action\": \"100803\", \"properties\": {}}"
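To check that the payload Zato publishes on the Redis channel carries the intended routing details, the JSON body can be decoded and the relevant fields pulled out. A small sketch, using the message from the redis-cli transcript above with the shell escaping removed:

```python
import json

# Third element of the redis-cli "message" reply, unescaped.
raw = (
    '{"body": "123123123", "msg_type": "0003", "exchange": "TEST", '
    '"args": [], "routing_key": "TEST", "headers": {}, "out_name": "TEST", '
    '"kwargs": {}, "action": "100803", "properties": {}}'
)

msg = json.loads(raw)

# Fields that decide where the AMQP connector should publish the message.
routing = {key: msg[key] for key in ('out_name', 'exchange', 'routing_key')}
print(routing)
```

All three fields come out as TEST, so the request appears to leave the server with the expected connection name, exchange and routing key. Whatever goes wrong seems to happen after this point, on the connector side.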

In server.log at DEBUG level I found:

2016-02-12 14:24:49,547 - DEBUG - 987:Dummy-99 - zato.pubsub.invoke-callbacks:22 - Callback consumers found `[]`

Consumers: [], why?

viatoriche commented 8 years ago

With my fix (https://github.com/viatoriche/zato/commit/78d2dd663358b92fbabcde86606b6074267c2774), all messages are published and AMQP works correctly.