Closed izmailoff closed 6 years ago
Hi,
Is this from python or from some other language? For non-nameko python code, nameko ships a "standalone rpc proxy" which may be used to make rpc calls to nameko services.
For non-python (or if re-implementing the protocol), the exclusive, guid-named queue is, as you guessed, for replies. The caller is responsible for managing its own reply queue (nameko creates one per service on startup), and should pass the name of its reply queue in the reply_to header.
Hi @davidszotten, thanks for the reply. My "client" is written in Python and uses the rpc proxy to call the "server", which has a method annotated with nameko rpc. This works well as they both use nameko. Now I want to replace the Python "server" with something else (Scala) which has no support for nameko-style rpc. So I want to implement something similar to the rpc annotation on the server side. Sorry I was not very clear about that. I'm trying to figure out how to reply back to the "client"/rpc caller side.
Here is what I see being sent to the server (Scala):
bytes:
{"kwargs": {}, "args": ["encoded string"]}
consumerTag:
amq.ctag-qpCtJA_afw_fvbXlT_yi7w
envelope:
Envelope(deliveryTag=1, redeliver=false, exchange=nameko-rpc, routingKey=cv_parsing_service.parse)
properties:
#contentHeader<basic>(content-type=application/json, content-encoding=utf-8, headers={
nameko.call_id_stack=[standalone_rpc_proxy.call.f70038cf-0f85-49fc-8f9f-e86c6f87ea69]
},
delivery-mode=2, priority=0, correlation-id=d7e9065b-d700-4593-b7f4-5c7251c3d375, reply-to=706a0408-5166-4c72-a108-4d4c3bba9ed1, expiration=null, message-id=null,
timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
I also noticed that this queue was created: rpc.reply-standalone_rpc_proxy-706a0408-5166-4c72-a108-4d4c3bba9ed1
I guess if I figure out how to construct this queue name above from the received headers/props and reply back with message (need to set any other props?) I should be done. Are there any other considerations? How do you serialize exceptions?
Thanks a lot
If i recall correctly:
the request and response bodies are json encoded data. for requests (this is all a bit python centric for historical reasons), the body is a dict with two keys, args and kwargs, meaning what they typically do in a python context (args is a list of unnamed params and kwargs is a dict of named params; i'm pretty sure nameko rpc is kwarg only, which makes it easier to evolve apis)
services have queues bound on the nameko-rpc exchange, and should use the routing key <service name>.<method name>
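As a rough sketch of the request format described above, the helper below builds the exchange, routing key and JSON body for a call. The name build_request is hypothetical (it is not a nameko API); the exchange name, routing key shape and body keys are the ones mentioned in this thread.

```python
import json

RPC_EXCHANGE = "nameko-rpc"  # exchange name as seen in this thread


def build_request(service, method, *args, **kwargs):
    """Return (exchange, routing_key, body) for an RPC call.

    Hypothetical helper for illustration only; not part of nameko.
    """
    # Routing key follows the <service name>.<method name> convention.
    routing_key = "{}.{}".format(service, method)
    # Body is a JSON dict with exactly two keys: args and kwargs.
    body = json.dumps({"args": list(args), "kwargs": kwargs})
    return RPC_EXCHANGE, routing_key, body


exchange, routing_key, body = build_request(
    "cv_parsing_service", "parse", "encoded string"
)
```

Here routing_key comes out as cv_parsing_service.parse, matching the envelope shown earlier in the thread.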
nameko uses a custom header call_id_stack to track the "lineage" of rpc calls (for debugging and other tracing). any nameko hosted service will generate a call_id when handling calls (<service>.<method>.<uuid>) and push that onto the call ids for any subsequent calls it makes. you can leave this out entirely, though we found it pretty handy. similarly the built-in standalone proxy generates a call id to start this list
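A minimal sketch of how a non-nameko service might extend the call id stack before making its own downstream calls. The function name extend_call_id_stack is made up for illustration; the header key nameko.call_id_stack is the one visible in the captured message properties in this thread, and headers are modelled as a plain dict.

```python
import uuid

# Header name as it appears on the wire in the captured message.
CALL_ID_STACK_HEADER = "nameko.call_id_stack"


def extend_call_id_stack(incoming_headers, service, method):
    """Return a new stack: the caller's stack plus an id for this call.

    Hypothetical helper; the incoming headers dict is left unmodified.
    """
    # Call id has the shape <service>.<method>.<uuid>.
    call_id = "{}.{}.{}".format(service, method, uuid.uuid4())
    parent_stack = list(incoming_headers.get(CALL_ID_STACK_HEADER, []))
    return parent_stack + [call_id]


incoming = {
    CALL_ID_STACK_HEADER: [
        "standalone_rpc_proxy.call.f70038cf-0f85-49fc-8f9f-e86c6f87ea69"
    ]
}
stack = extend_call_id_stack(incoming, "cv_parsing_service", "parse")
```

The resulting list would be sent as the same header on any calls the service makes while handling this request.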
the correlation_id is used by the requesting service to multiplex replies on the same queue, to tie replies back to requests. a service should read this and include it with its reply, which it should post to the queue provided as reply_to.
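To make the reply routing concrete, here is a small sketch that derives the publish parameters for a reply from the request's properties. It assumes, based on the Wireshark trace in this thread, that the reply goes to the nameko-rpc exchange with the reply_to value as routing key. The function name and the use of a plain dict for AMQP properties are illustrative choices, not any client library's API.

```python
RPC_EXCHANGE = "nameko-rpc"  # replies in the trace go to this exchange


def reply_publish_params(request_properties):
    """Work out where and how to publish the reply for one request.

    request_properties is a plain dict standing in for the AMQP
    message properties (hypothetical representation).
    """
    return {
        "exchange": RPC_EXCHANGE,
        # Route the reply using the value the caller sent as reply_to.
        "routing_key": request_properties["reply_to"],
        # Echo the correlation id so the caller can match the reply.
        "correlation_id": request_properties["correlation_id"],
        "content_type": "application/json",
        "content_encoding": "utf-8",
    }


params = reply_publish_params({
    "reply_to": "53879dd9-6fd4-46a3-bf1e-4eaa7609b3ea",
    "correlation_id": "ec23013d-1aeb-4a27-8b62-cce6d5b0943b",
})
```

These values would then be handed to whatever AMQP client the service uses for its basic.publish call.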
Replies are a dict with two keys, result and error. error is null for successful calls and contains a dict for errors (result may be null for successful calls that return nothing). error dict is as per https://github.com/onefinestay/nameko/blob/master/nameko/exceptions.py#L86
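A hedged sketch of building the reply body. The result/error envelope is as described above. The field names inside the error dict (exc_type, exc_path, exc_args, value) are an assumption based on my recollection of the linked exceptions.py and should be checked against that file. build_reply is a hypothetical name, and converting exception args with str() is a simplification.

```python
import json


def build_reply(result=None, exc=None):
    """Return the JSON reply body for a successful or failed call.

    Hypothetical helper; error field names are assumed, not confirmed
    by this thread. Verify them against nameko's exceptions.py.
    """
    if exc is None:
        error = None
    else:
        exc_type = type(exc)
        error = {
            "exc_type": exc_type.__name__,
            "exc_path": "{}.{}".format(exc_type.__module__, exc_type.__name__),
            # Simplification: stringify args so they are always JSON-safe.
            "exc_args": [str(arg) for arg in exc.args],
            "value": str(exc),
        }
        result = None
    return json.dumps({"result": result, "error": error})


ok_body = build_reply(result="Hello, Matt!")
failed_body = build_reply(exc=ValueError("bad input"))
```

A caller on the nameko side uses the error dict to rebuild an exception, so getting these field names right matters more than the exact values.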
nameko services ack their messages only after successfully replying, for an "at least once" delivery model
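The ordering that gives "at least once" delivery can be sketched like this: publish the reply first, and ack the request only if that publish did not raise. RecordingChannel is a made-up stand-in for a real AMQP channel so the example runs without a broker; its publish and ack signatures are not those of any particular client library, and the error dict is deliberately simplified.

```python
import json


class RecordingChannel:
    """Hypothetical stand-in for an AMQP channel; records calls in order."""

    def __init__(self):
        self.calls = []

    def publish(self, exchange, routing_key, body, correlation_id):
        self.calls.append(("publish", routing_key, correlation_id, body))

    def ack(self, delivery_tag):
        self.calls.append(("ack", delivery_tag))


def handle_delivery(channel, delivery_tag, properties, body, method):
    """Run the method, publish the reply, then ack the request."""
    request = json.loads(body)
    try:
        result = method(*request["args"], **request["kwargs"])
        reply = {"result": result, "error": None}
    except Exception as exc:
        # Simplified error dict; the real one carries more fields.
        reply = {"result": None, "error": {"value": str(exc)}}
    channel.publish(
        "nameko-rpc",
        properties["reply_to"],
        json.dumps(reply),
        properties["correlation_id"],
    )
    # Only reached if publish did not raise, so an unanswered request
    # stays unacked and the broker can redeliver it.
    channel.ack(delivery_tag)


channel = RecordingChannel()
handle_delivery(
    channel,
    1,
    {"reply_to": "reply-queue-id", "correlation_id": "corr-1"},
    '{"args": ["Matt"], "kwargs": {}}',
    lambda name: "Hello, {}!".format(name),
)
```

The trade-off is that a crash between the publish and the ack leads to a redelivery, so handlers may occasionally run twice.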
will try to write this up somewhere at some point
Thanks a lot @davidszotten . This is very useful and clear. I'll try to implement it quickly and will update here.
@davidszotten I've got it working, thanks for your help. Here is a Wireshark AMQP trace for the part where the nameko subscriber/server is already connected and the nameko publisher/client is calling the RPC method. This is based on the hello world example from the README:
No. Time Source Destination Protocol Length Info
8 1.663536620 127.0.0.1 127.0.0.1 AMQP 117 Basic.Publish x=nameko-rpc rk=greeting_service.hello
Frame 8: 117 bytes on wire (936 bits), 117 bytes captured (936 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 44088 (44088), Dst Port: 5672 (5672), Seq: 1, Ack: 1, Len: 49
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 1
Length: 41
Class: Basic (60)
Method: Publish (40)
Arguments
[Publish-Number: 1]
Ticket: 0
Exchange: nameko-rpc
Routing-Key: greeting_service.hello
.... ...1 = Mandatory: True
.... ..0. = Immediate: False
No. Time Source Destination Protocol Length Info
9 1.663607823 127.0.0.1 127.0.0.1 AMQP 286 Content-Header type=application/json
Frame 9: 286 bytes on wire (2288 bits), 286 bytes captured (2288 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 44088 (44088), Dst Port: 5672 (5672), Seq: 50, Ack: 1, Len: 218
Advanced Message Queueing Protocol
Type: Content header (2)
Channel: 1
Length: 210
Class ID: Basic (60)
Weight: 0
Body size: 40
Property flags: 0xfe00
Properties
Content-Type: application/json
Content-Encoding: utf-8
Headers
nameko.call_id_stack (array)
[0] (string): standalone_rpc_proxy.call.789dbe44-9e63-4405-ba06-4f180b9c4f6c
Delivery-Mode: 2
Priority: 0
Correlation-Id: ec23013d-1aeb-4a27-8b62-cce6d5b0943b
Reply-To: 53879dd9-6fd4-46a3-bf1e-4eaa7609b3ea
No. Time Source Destination Protocol Length Info
10 1.663625618 127.0.0.1 127.0.0.1 AMQP 116 Content-Body
Frame 10: 116 bytes on wire (928 bits), 116 bytes captured (928 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 44088 (44088), Dst Port: 5672 (5672), Seq: 268, Ack: 1, Len: 48
Advanced Message Queueing Protocol
Type: Content body (3)
Channel: 1
Length: 40
Payload: 7b2261726773223a205b5d2c20226b7761726773223a207b...
No. Time Source Destination Protocol Length Info
12 1.664114170 127.0.0.1 127.0.0.1 AMQP 395
Frame 12: 395 bytes on wire (3160 bits), 395 bytes captured (3160 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 5672 (5672), Dst Port: 44104 (44104), Seq: 1, Ack: 1, Len: 327
Advanced Message Queueing Protocol
00.. .... = Format: 0
.... 0001 = Position: ---e (1)
Type: Control (0)
Length: 256
Track: Control (0)
Channel: 13568
Class: Unknown (78)
[Expert Info (Error/Protocol): Unknown command/control class 78]
[Unknown command/control class 78]
[Severity level: Error]
[Group: Protocol]
No. Time Source Destination Protocol Length Info
14 1.669652288 127.0.0.1 127.0.0.1 AMQP 89 Basic.Ack
Frame 14: 89 bytes on wire (712 bits), 89 bytes captured (712 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 5672 (5672), Dst Port: 44088 (44088), Seq: 1, Ack: 316, Len: 21
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 1
Length: 13
Class: Basic (60)
Method: Ack (80)
Arguments
Delivery-Tag: 2
.... ...0 = Multiple: False
No. Time Source Destination Protocol Length Info
19 1.671130325 127.0.0.1 127.0.0.1 AMQP 76 Protocol-Header 1-0-9
Frame 19: 76 bytes on wire (608 bits), 76 bytes captured (608 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 44106 (44106), Dst Port: 5672 (5672), Seq: 1, Ack: 1, Len: 8
Advanced Message Queueing Protocol
Protocol: AMQP
Protocol ID Major: 1
Protocol ID Minor: 1
Version Major: 0
Version Minor: 9
No. Time Source Destination Protocol Length Info
23 1.694175637 127.0.0.1 127.0.0.1 AMQP 556 Connection.Start
Frame 23: 556 bytes on wire (4448 bits), 556 bytes captured (4448 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 5672 (5672), Dst Port: 44106 (44106), Seq: 1, Ack: 9, Len: 488
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 0
Length: 480
Class: Connection (10)
Method: Start (10)
Arguments
Version-Major: 0
Version-Minor: 9
Server-Properties
capabilities (field table)
publisher_confirms (boolean): true
exchange_exchange_bindings (boolean): true
basic.nack (boolean): true
consumer_cancel_notify (boolean): true
connection.blocked (boolean): true
consumer_priorities (boolean): true
authentication_failure_close (boolean): true
per_consumer_qos (boolean): true
direct_reply_to (boolean): true
cluster_name (string): rabbit@gazella
copyright (string): Copyright (C) 2007-2016 Pivotal Software, Inc.
information (string): Licensed under the MPL. See http://www.rabbitmq.com/
platform (string): Erlang/OTP
product (string): RabbitMQ
version (string): 3.6.3
Mechanisms: 414d51504c41494e20504c41494e
Locales: 656e5f5553
No. Time Source Destination Protocol Length Info
25 1.694549493 127.0.0.1 127.0.0.1 AMQP 248 Connection.Start-Ok
Frame 25: 248 bytes on wire (1984 bits), 248 bytes captured (1984 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 44106 (44106), Dst Port: 5672 (5672), Seq: 9, Ack: 489, Len: 180
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 0
Length: 172
Class: Connection (10)
Method: Start-Ok (11)
Arguments
Client-Properties
product (string): py-amqp
product_version (string): 1.4.9
capabilities (field table)
connection.blocked (boolean): true
consumer_cancel_notify (boolean): true
Mechanism: AMQPLAIN
Response: 054c4f47494e530000000567756573740850415353574f52...
Locale: en_US
No. Time Source Destination Protocol Length Info
26 1.694693554 127.0.0.1 127.0.0.1 AMQP 88 Connection.Tune
Frame 26: 88 bytes on wire (704 bits), 88 bytes captured (704 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 5672 (5672), Dst Port: 44106 (44106), Seq: 489, Ack: 189, Len: 20
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 0
Length: 12
Class: Connection (10)
Method: Tune (30)
Arguments
Channel-Max: 0
Frame-Max: 131072
Heartbeat: 60
No. Time Source Destination Protocol Length Info
27 1.694813859 127.0.0.1 127.0.0.1 AMQP 88 Connection.Tune-Ok
Frame 27: 88 bytes on wire (704 bits), 88 bytes captured (704 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 44106 (44106), Dst Port: 5672 (5672), Seq: 189, Ack: 509, Len: 20
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 0
Length: 12
Class: Connection (10)
Method: Tune-Ok (31)
Arguments
Channel-Max: 65535
Frame-Max: 131072
Heartbeat: 0
No. Time Source Destination Protocol Length Info
28 1.694856112 127.0.0.1 127.0.0.1 AMQP 84 Connection.Open vhost=/
Frame 28: 84 bytes on wire (672 bits), 84 bytes captured (672 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 44106 (44106), Dst Port: 5672 (5672), Seq: 209, Ack: 509, Len: 16
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 0
Length: 8
Class: Connection (10)
Method: Open (40)
Arguments
Virtual-Host: /
Capabilities:
.... ...0 = Insist: False
No. Time Source Destination Protocol Length Info
30 1.694942196 127.0.0.1 127.0.0.1 AMQP 81 Connection.Open-Ok
Frame 30: 81 bytes on wire (648 bits), 81 bytes captured (648 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 5672 (5672), Dst Port: 44106 (44106), Seq: 509, Ack: 225, Len: 13
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 0
Length: 5
Class: Connection (10)
Method: Open-Ok (41)
Arguments
Known-Hosts:
No. Time Source Destination Protocol Length Info
31 1.695115508 127.0.0.1 127.0.0.1 AMQP 81 Channel.Open
Frame 31: 81 bytes on wire (648 bits), 81 bytes captured (648 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 44106 (44106), Dst Port: 5672 (5672), Seq: 225, Ack: 522, Len: 13
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 1
Length: 5
Class: Channel (20)
Method: Open (10)
Arguments
Out-Of-Band:
No. Time Source Destination Protocol Length Info
32 1.695472629 127.0.0.1 127.0.0.1 AMQP 84 Channel.Open-Ok
Frame 32: 84 bytes on wire (672 bits), 84 bytes captured (672 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 5672 (5672), Dst Port: 44106 (44106), Seq: 522, Ack: 238, Len: 16
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 1
Length: 8
Class: Channel (20)
Method: Open-Ok (11)
Arguments
Channel-Id: <MISSING>
No. Time Source Destination Protocol Length Info
33 1.695706890 127.0.0.1 127.0.0.1 AMQP 131 Basic.Publish x=nameko-rpc rk=53879dd9-6fd4-46a3-bf1e-4eaa7609b3ea
Frame 33: 131 bytes on wire (1048 bits), 131 bytes captured (1048 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 44106 (44106), Dst Port: 5672 (5672), Seq: 238, Ack: 538, Len: 63
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 1
Length: 55
Class: Basic (60)
Method: Publish (40)
Arguments
[Publish-Number: 1]
Ticket: 0
Exchange: nameko-rpc
Routing-Key: 53879dd9-6fd4-46a3-bf1e-4eaa7609b3ea
.... ...0 = Mandatory: False
.... ..0. = Immediate: False
No. Time Source Destination Protocol Length Info
34 1.695740864 127.0.0.1 127.0.0.1 AMQP 156 Content-Header type=application/json
Frame 34: 156 bytes on wire (1248 bits), 156 bytes captured (1248 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 44106 (44106), Dst Port: 5672 (5672), Seq: 301, Ack: 538, Len: 88
Advanced Message Queueing Protocol
Type: Content header (2)
Channel: 1
Length: 80
Class ID: Basic (60)
Weight: 0
Body size: 41
Property flags: 0xfc00
Properties
Content-Type: application/json
Content-Encoding: utf-8
Headers
Delivery-Mode: 2
Priority: 0
Correlation-Id: ec23013d-1aeb-4a27-8b62-cce6d5b0943b
No. Time Source Destination Protocol Length Info
36 1.695769214 127.0.0.1 127.0.0.1 AMQP 117 Content-Body
Frame 36: 117 bytes on wire (936 bits), 117 bytes captured (936 bits) on interface 0
Linux cooked capture
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 44106 (44106), Dst Port: 5672 (5672), Seq: 389, Ack: 538, Len: 49
Advanced Message Queueing Protocol
Type: Content body (3)
Channel: 1
Length: 41
Payload: 7b22726573756c74223a202248656c6c6f2c204d61747421...
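The Payload fields in the trace are hex-encoded and truncated by Wireshark. As a quick sketch, the visible prefixes can be decoded with the Python standard library. Only the bytes shown in the trace are decoded here; the elided remainder is not reconstructed. decode_payload_prefix is just an illustrative name.

```python
def decode_payload_prefix(hex_prefix):
    """Decode the visible hex prefix of a Content-Body payload as UTF-8."""
    return bytes.fromhex(hex_prefix).decode("utf-8")


# Visible prefix of the request body (frame 10).
request_prefix = decode_payload_prefix(
    "7b2261726773223a205b5d2c20226b7761726773223a207b"
)
# Visible prefix of the reply body (frame 36).
reply_prefix = decode_payload_prefix(
    "7b22726573756c74223a202248656c6c6f2c204d61747421"
)

print(request_prefix)  # {"args": [], "kwargs": {
print(reply_prefix)    # {"result": "Hello, Matt!
```

So the request carries the args/kwargs dict and the reply starts with the result key, consistent with the format described earlier in the thread.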
I'll try to put some code example for whoever might need the same thing. Thanks.
JFYI: I've made an integration lib for Node.JS (although I'm not a huge Node fan), just for PoC. You can check it out here: https://github.com/and3rson/node-nameko-client
@izmailoff, on which platform did you capture this tcpdump trace?
I'm surprised by the frame that couldn't be dissected (Unknown command/control class 78). I've seen this before too, but am not able to reproduce it myself.
@mattbennett I used latest Wireshark with latest Fedora at that time (few months old now). If you need the dump files I might still have them.
@izmailoff if you can, that would be great. Presumably you tested it with nameko 2.4.2 and the most recent versions of all the downstream dependencies at the time? Was it a local rabbit broker?
I figured this out. The dissector is confused by consumers that are already running when the trace starts. In a longer example, you will see Unknown command/control class messages early in the trace followed by Unknown frame type messages for every basic-deliver to those consumers.
If you start the trace before the service(s) everything looks normal.
I wrote a short article on the RPC implementation in Nameko. It's not as detailed as technical docs would be, but it explains enough to know how it works: http://izmailoff.github.io/architecture/rpc-revived/. Hopefully it will be useful for someone. If you read it, please let me know your feedback and whether I missed anything. Cheers.
This is awesome, thanks @izmailoff. Looks good. Only thing to note is that we're actually in the process of removing the exclusivity of reply queues. More detail here: https://github.com/nameko/nameko/issues/359
thanks @mattbennett for the note on #359 , I'll update the post soon
Hi, I'm trying to implement support for RPC in my non-nameko service that connects to RabbitMQ (AMQP). I wasn't able to find any description of how RPC exchange should happen.
I can see that Nameko is serializing the RPC call to JSON - this is what I receive on the subscriber side: {"kwargs": {}, "args": ["test string"]}. I've figured that the queue will be named as 'rpc-' + my_service_name. It seems that there is another exclusive queue created with a GUID appended to its name, which I suppose is used for the response. Do you have any docs or quick info on how exactly the RPC exchange happens? It would save lots of reverse-engineering effort for anyone trying to integrate with nameko.
Thanks