nameko / nameko-amqp-retry


RPC retry entrypoints can't retry other RPC retry entrypoints #29

Open gchai87 opened 5 years ago

gchai87 commented 5 years ago

Describe the bug

Retrying an RPC entrypoint decorated with nameko-amqp-retry does not work if that entrypoint calls another RPC entrypoint that also uses the nameko-amqp-retry decorator.

To Reproduce

Set up two services, both with nameko_amqp_retry RPC entrypoints.

service1

import logging

from nameko.rpc import RpcProxy
from nameko_amqp_retry import entrypoint_retry
from nameko_amqp_retry.rpc import rpc

logger = logging.getLogger(__name__)

class Service1:
    name = "service1"

    service2_rpc = RpcProxy("service2")

    @rpc
    @entrypoint_retry(retry_for=KeyError, limit=2, schedule=[1000])
    def call_service2(self):
        # The nested RPC call succeeds on the first attempt only.
        logger.info(self.service2_rpc.do_bad_thing())
        a = {"bad": "thing"}
        return a["good"]  # raises KeyError, which triggers the retry

service2

import logging

from nameko_amqp_retry.rpc import rpc

logger = logging.getLogger(__name__)

class Service2:
    name = "service2"

    @rpc
    def do_bad_thing(self):
        return "Works the first time only."

Call service2 through service1, for example from the nameko shell:

n.rpc.service1.call_service2()

This raises KeyError on the first attempt, as expected, but the retry hits a MethodNotFound error instead of a second KeyError:

nameko run --config deploy/core/config.yaml service1.services.core.service:Service1
2019-03-28 16:42:15,484 [INFO] [studapart-service] [nameko.runners] starting services: service1
2019-03-28 16:42:15,545 [INFO] [studapart-service] [kombu.mixins] Connected to amqp://guest:**@127.0.0.1:5672//
2019-03-28 16:42:16,349 [INFO] [studapart-service] [service1.services.core.service] Works the first time only.
2019-03-28 16:42:16,349 [WARNING] [studapart-service] [nameko.containers] (expected) error handling worker <WorkerContext [service1.call_service2] at 0x1117c1b70>: Backoff(uninitialised)
Traceback (most recent call last):
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko_amqp_retry/decorators.py", line 64, in wrapper
    return wrapped(*args, **kwargs)
  File "/Users/gchai/project/service1/src/service1/services/core/service.py", line 21, in call_service2
    return a["good"]
KeyError: 'good'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko/containers.py", line 392, in _run_worker
    result = method(*worker_ctx.args, **worker_ctx.kwargs)
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko_amqp_retry/decorators.py", line 66, in wrapper
    six.raise_from(backoff_cls(), exc)
  File "<string>", line 3, in raise_from
nameko_amqp_retry.decorators.backoff_factory.<locals>.CustomBackoff: Backoff(uninitialised)
2019-03-28 16:42:17,288 [ERROR] [studapart-service] [nameko.containers] error handling worker <WorkerContext [service1.call_service2] at 0x111807710>: call_service2
Traceback (most recent call last):
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko/containers.py", line 392, in _run_worker
    result = method(*worker_ctx.args, **worker_ctx.kwargs)
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko_amqp_retry/decorators.py", line 64, in wrapper
    return wrapped(*args, **kwargs)
  File "/Users/gchai/project/service1/src/service1/services/core/service.py", line 19, in call_service2
    logger.info(self.service2_rpc.do_bad_thing())
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko/rpc.py", line 373, in __call__
    return reply.result()
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko/rpc.py", line 331, in result
    raise deserialize(error)
nameko.exceptions.MethodNotFound: call_service2

Expected behavior

No MethodNotFound exception. The example above should retry on KeyError twice and then give up.
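To make the expected sequence concrete, here is a small, library-free sketch of how limit=2 and schedule=[1000] are meant to play out. The helper planned_delays is hypothetical and only models the behaviour described above. It assumes the last schedule entry is reused once the schedule runs out, and it is not nameko-amqp-retry's implementation.

```python
def planned_delays(schedule, limit):
    """Return the delay (in ms) before each retry, for at most `limit` retries.

    Assumption: once the schedule is exhausted, its last entry is reused.
    """
    return [schedule[min(i, len(schedule) - 1)] for i in range(limit)]


# Settings from the reproduction: two retries, 1000 ms apart, then give up.
delays = planned_delays(schedule=[1000], limit=2)
print(delays)  # [1000, 1000]
```

Under that assumption the call should fail with KeyError three times in total (the initial attempt plus two retries) and never with MethodNotFound.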

Environment (please complete the following information):

ketgo commented 5 years ago

Is this issue because the dead letter exchange is the same for all services? https://github.com/nameko/nameko-amqp-retry/blob/0d36cbdbf25d23ab3cf2ff1ce3761861bcf28c47/nameko_amqp_retry/backoff.py#L102

ketgo commented 5 years ago

Never mind, that does not seem to be the problem here. I just tested the following code, which gives each service its own backoff exchange, and still see the same error.

Code:

import logging

from nameko.rpc import RpcProxy
from nameko_amqp_retry import Backoff, BackoffPublisher
from nameko_amqp_retry.rpc import Rpc
from kombu.messaging import Exchange

logger = logging.getLogger(__name__)

class CustomBackoffPublisherService1(BackoffPublisher):
    @property
    def exchange(self):
        backoff_exchange = Exchange(
            type="headers",
            name="service1-backoff"
        )
        return backoff_exchange

class CustomRpcService1(Rpc):
    backoff_publisher = CustomBackoffPublisherService1()

rpc1 = CustomRpcService1.decorator

class Service1:
    name = "service1"
    service2_rpc = RpcProxy("service2")

    @rpc1
    def call_service2(self):
        try:
            logger.info('service2: {}'.format(self.service2_rpc.do_bad_thing()))
            a = {"bad": "thing"}
            return a["good"]
        except KeyError:
            raise Backoff()

class CustomBackoffPublisherService2(BackoffPublisher):
    @property
    def exchange(self):
        backoff_exchange = Exchange(
            type="headers",
            name="service2-backoff"
        )
        return backoff_exchange

class CustomRpcService2(Rpc):
    backoff_publisher = CustomBackoffPublisherService2()

rpc2 = CustomRpcService2.decorator

class Service2:
    name = "service2"

    @rpc2
    def do_bad_thing(self):
        return "Works the first time only."

Logs:

(venv) ketan@Other:~/Projects/pyMSC/scheduler/profile/service$ nameko run --config config.yaml tests.integration.test_retry:Service1
starting <QueueConsumer at 0x7f45755585c0>
waiting for consumer ready <QueueConsumer at 0x7f45755585c0>
Connected to amqp://guest:**@127.0.0.1:5672//
setting up consumers <QueueConsumer at 0x7f45755585c0>
consumer started <QueueConsumer at 0x7f45755585c0>
started <QueueConsumer at 0x7f45755585c0>
invoking <proxy method: service2.do_bad_thing>
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f4575bcdc50>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f4575bcdc50> {'result': 'Works the first time only.', 'error': None}
service2: Works the first time only.
invoking <proxy method: service2.do_bad_thing>
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f4575c84dd8>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f4575c84dd8> {'result': None, 'error': {'exc_type': 'MethodNotFound', 'exc_path': 'nameko.exceptions.MethodNotFound', 'exc_args': ['call_service2'], 'value': 'call_service2'}}
ketgo commented 5 years ago

Using the nameko_tracer extension and a print statement added to the nameko.rpc.MethodProxy class (after line 433), I was finally able to debug the issue. MethodProxy is used internally by RpcProxy.

Problem

The dead-letter and backoff headers attached to service1's retried message are forwarded to service2 on the nested RPC call, which results in the observed exception. Compare the extra_headers printed for nameko.rpc.MethodProxy on the first attempt and on the retry in the logs below.

starting <QueueConsumer at 0x7f3466c713c8>
waiting for consumer ready <QueueConsumer at 0x7f3466c713c8>
Connected to amqp://guest:**@127.0.0.1:5672//
setting up consumers <QueueConsumer at 0x7f3466c713c8>
consumer started <QueueConsumer at 0x7f3466c713c8>
started <QueueConsumer at 0x7f3466c713c8>
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 12, 994320), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {}, 'call_id': 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'request', 'call_args': {}, 'call_args_redacted': False}
[service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792] entrypoint call trace
invoking <proxy method: service2.do_bad_thing>
<class 'nameko.rpc.MethodProxy'>:
         exchange: Exchange nameko-rpc(topic)
         routing_key: service2.do_bad_thing
         reply_to: c006b4e3-5125-46bb-a84a-0fa5c97f3c38
         correlation_id: 9451ad22-1956-42a6-8963-0cbc91ca4e35
         extra_headers: {'nameko.call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792']}
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f3466df4eb8>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f3466df4eb8> {'result': 'Works the first time only.', 'error': None}
service2: Works the first time only.
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 13, 72546), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {}, 'call_id': 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'response', 'call_args': {}, 'call_args_redacted': False, 'response_status': 'error', 'exception_type': 'CustomBackOff', 'exception_path': 'tests.integration.test_retry.CustomBackOff', 'exception_args': [], 'exception_value': 'Backoff(retry #1 in 1000ms)', 'exception_traceback': 'Traceback (most recent call last):\n  File "./tests/integration/test_retry.py", line 45, in call_service2\n    return a["good"]\nKeyError: \'good\'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/containers.py", line 392, in _run_worker\n    result = method(*worker_ctx.args, **worker_ctx.kwargs)\n  File "./tests/integration/test_retry.py", line 47, in call_service2\n    raise CustomBackOff()\ntests.integration.test_retry.CustomBackOff: Backoff(retry #1 in 1000ms)\n', 'exception_expected': True, 'response_time': 0.078226}
[service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792] entrypoint result trace
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 14, 68293), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {'backoff': 1000, 'rpc_method_id': 'service1.call_service2', 'x-death': [{'count': 1, 'reason': 'expired', 'queue': 'backoff--1000ms', 'time': '2019-09-09 06:28:14', 'exchange': 'backoff', 'routing-keys': ['rpc-service1'], 'original-expiration': '1000'}], 'x-first-death-exchange': 'backoff', 'x-first-death-queue': 'backoff--1000ms', 'x-first-death-reason': 'expired'}, 'call_id': 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792.backoff', 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'request', 'call_args': {}, 'call_args_redacted': False}
[service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945] entrypoint call trace
invoking <proxy method: service2.do_bad_thing>
<class 'nameko.rpc.MethodProxy'>:
         exchange: Exchange nameko-rpc(topic)
         routing_key: service2.do_bad_thing
         reply_to: c006b4e3-5125-46bb-a84a-0fa5c97f3c38
         correlation_id: 09d6f40e-bb68-4f8e-a98b-87d99373c56d
         extra_headers: {'nameko.backoff': 1000, 'nameko.rpc_method_id': 'service1.call_service2', 'nameko.x-death': [{'count': 1, 'reason': 'expired', 'queue': 'backoff--1000ms', 'time': datetime.datetime(2019, 9, 9, 6, 28, 14), 'exchange': 'backoff', 'routing-keys': ['rpc-service1'], 'original-expiration': '1000'}], 'nameko.x-first-death-exchange': 'backoff', 'nameko.x-first-death-queue': 'backoff--1000ms', 'nameko.x-first-death-reason': 'expired', 'nameko.call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792.backoff', 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945']}
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f3466d65160>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f3466d65160> {'result': None, 'error': {'exc_type': 'MethodNotFound', 'exc_path': 'nameko.exceptions.MethodNotFound', 'exc_args': ['call_service2'], 'value': 'call_service2'}}
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 14, 96395), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {'backoff': 1000, 'rpc_method_id': 'service1.call_service2', 'x-death': [{'count': 1, 'reason': 'expired', 'queue': 'backoff--1000ms', 'time': '2019-09-09 06:28:14', 'exchange': 'backoff', 'routing-keys': ['rpc-service1'], 'original-expiration': '1000'}], 'x-first-death-exchange': 'backoff', 'x-first-death-queue': 'backoff--1000ms', 'x-first-death-reason': 'expired'}, 'call_id': 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792.backoff', 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'response', 'call_args': {}, 'call_args_redacted': False, 'response_status': 'error', 'exception_type': 'MethodNotFound', 'exception_path': 'nameko.exceptions.MethodNotFound', 'exception_args': ['call_service2'], 'exception_value': 'call_service2', 'exception_traceback': 'Traceback (most recent call last):\n  File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/containers.py", line 392, in _run_worker\n    result = method(*worker_ctx.args, **worker_ctx.kwargs)\n  File "./tests/integration/test_retry.py", line 43, in call_service2\n    logger.info(\'service2: {}\'.format(self.service2_rpc.do_bad_thing()))\n  File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/rpc.py", line 373, in __call__\n    return reply.result()\n  File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/rpc.py", line 331, in result\n    raise deserialize(error)\nnameko.exceptions.MethodNotFound: call_service2\n', 'exception_expected': False, 'response_time': 0.028102}
[service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945] entrypoint result trace
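
The two extra_headers dumps above suggest the mechanism: whatever sits in the worker's context data is copied, with a nameko. prefix, onto every outgoing RPC message. Below is a minimal sketch of that propagation. The helper outgoing_headers is a hypothetical stand-in for what nameko does internally; the real code differs in detail (it also adds call_id_stack, for instance).

```python
HEADER_PREFIX = "nameko"  # prefix seen in the extra_headers output above


def outgoing_headers(context_data):
    """Hypothetical stand-in: prefix every context key for the outgoing message."""
    return {
        "{}.{}".format(HEADER_PREFIX, key): value
        for key, value in context_data.items()
    }


# First attempt: the worker context is empty, so nothing extra is sent.
first_attempt = {}

# Retry: the context now carries backoff and dead-letter bookkeeping
# (a subset of the keys from the trace above).
retry = {
    "backoff": 1000,
    "rpc_method_id": "service1.call_service2",
    "x-first-death-queue": "backoff--1000ms",
}

print(outgoing_headers(first_attempt))
print(sorted(outgoing_headers(retry)))
```

On the retry, service2 therefore receives nameko.rpc_method_id set to service1.call_service2, a value that was only ever meant for service1.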

Solution

Remove the extra headers added for dead letter queue and backoff. This can be achieved by using the following custom rpc proxy:

# utils.py module

from nameko.rpc import RpcProxy as NamekoRpcProxy, ServiceProxy

class CustomRpcProxy(NamekoRpcProxy):

    dead_letter_properties = ['x-death', 'x-first-death-exchange', 'x-first-death-queue', 'x-first-death-reason']
    backoff_properties = ['backoff', 'rpc_method_id']

    def get_dependency(self, worker_ctx):
        # Removing dead letter queue and backoff headers
        for key in self.dead_letter_properties + self.backoff_properties:
            if key in worker_ctx.data:
                worker_ctx.data.pop(key)
        return ServiceProxy(
            worker_ctx,
            self.target_service,
            self.rpc_reply_listener,
            **self.options
        )

The following code worked for me as desired:

# service.py module

import logging

from .utils import CustomRpcProxy

from nameko_amqp_retry import Backoff
from nameko_amqp_retry.rpc import rpc
from nameko_tracer import Tracer

logger = logging.getLogger(__name__)

class CustomBackOff(Backoff):
    limit = 2
    schedule = [1000]

class Service1:
    name = "service1"
    service2_rpc = CustomRpcProxy("service2")
    tracer = Tracer()

    @rpc
    def call_service2(self):
        try:
            logger.info('service2: {}'.format(self.service2_rpc.do_bad_thing()))
            a = {"bad": "thing"}
            return a["good"]
        except KeyError:
            raise CustomBackOff()

class Service2:
    name = "service2"
    tracer = Tracer()

    @rpc
    def do_bad_thing(self):
        return "Works the first time only."

Hope that helps! If there is a more elegant solution do please let me know. Thanks!

mattbennett commented 5 years ago

Thanks for doing this investigation @ketgo.

I think the only header that's causing a problem is rpc_method_id. It's caused by this conditional:

https://github.com/nameko/nameko-amqp-retry/blob/0d36cbdbf25d23ab3cf2ff1ce3761861bcf28c47/nameko_amqp_retry/rpc.py#L15-L18

The conditional is required because the message republisher has to use the routing_key property with a different value.
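
One plausible reading of that conditional, consistent with the MethodNotFound: call_service2 reply in the logs, is that when an incoming message carries the rpc_method_id header, the consumer looks the method up by that header rather than by the routing key. The sketch below models this with plain Python. resolve_method and SERVICE2_METHODS are hypothetical names, the lookup rule is an assumption, and none of this is the library's code.

```python
class MethodNotFound(Exception):
    """Local stand-in for nameko.exceptions.MethodNotFound."""


SERVICE2_METHODS = {"service2.do_bad_thing"}  # hypothetical method registry


def resolve_method(headers, routing_key, known_methods):
    # Assumed behaviour: prefer the retry header over the routing key.
    method_id = headers.get("nameko.rpc_method_id", routing_key)
    if method_id not in known_methods:
        raise MethodNotFound(method_id.split(".")[-1])
    return method_id


# First attempt: no retry header, so the routing key decides.
print(resolve_method({}, "service2.do_bad_thing", SERVICE2_METHODS))
# service2.do_bad_thing

# Retry: service1's header leaks into the nested call to service2.
leaked = {"nameko.rpc_method_id": "service1.call_service2"}
try:
    resolve_method(leaked, "service2.do_bad_thing", SERVICE2_METHODS)
except MethodNotFound as exc:
    print("MethodNotFound:", exc)
# MethodNotFound: call_service2
```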

If you unset just that header in a custom RpcProxy I think it'll work. I'd welcome a pull request with a fix for this if you're able to contribute one.
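
A narrower variant of the CustomRpcProxy workaround posted earlier, following this suggestion, would drop only rpc_method_id from the worker context before the nested call. The helper below is a hypothetical, library-free sketch. In a real service the same removal would sit in a get_dependency override on an RpcProxy subclass, as in ketgo's version, and it has not been tested against a broker.

```python
def drop_rpc_method_id(context_data):
    """Return a copy of the context data without the 'rpc_method_id' key."""
    cleaned = dict(context_data)
    cleaned.pop("rpc_method_id", None)  # no error if the key is absent
    return cleaned


# Context data as seen on the retry (a subset of the keys from the trace).
retry_context = {
    "backoff": 1000,
    "rpc_method_id": "service1.call_service2",
    "x-first-death-reason": "expired",
}

print(drop_rpc_method_id(retry_context))
# {'backoff': 1000, 'x-first-death-reason': 'expired'}
```

Returning a copy instead of mutating in place is a choice made for this sketch only; ketgo's version pops the keys from worker_ctx.data directly.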

ketgo commented 5 years ago

@mattbennett Sure, will send one.