minos-framework / minos-python

🐍 Minos is a framework which helps you create reactive microservices in Python
http://www.minos.run
MIT License
465 stars 39 forks source link

Saga Process return a timeout #216

Closed andrea-mucci closed 2 years ago

andrea-mucci commented 2 years ago

Describe the bug the Saga Process return the following error

eboutique-cart-1 | 2022-02-18 19:26:05 [T:MainThread] ERROR:minos.networks.rest.handlers: Raised a system exception: SagaFailedExecutionException(message='There was a failure while \'SagaStepExecution\' was executing: MinosHandlerNotFoundEnoughEntriesException(message="Timeout exceeded while trying to fetch 1 entries from \'04283213e2e04d6583bd835b335083da\'.")')

To Reproduce this is a Command Handler for CartItem POST

@enroute.rest.command("/cart/{uuid}/item", "POST")
    async def create_cart_item(self, request: Request) -> Response:
        """Create a new ``Cart`` instance.

        :param request: The ``Request`` instance.
        :return: A ``Response`` instance.
        """
        data = await request.content()
        params = await request.params()

        saga_execution = await self.saga_manager.run(
            ADD_CART_ITEM, context=SagaContext(cart_uid=params['uuid'], product_uid=data['product'],
                                               quantity=data['quantity'])
        )
        return Response({"saga_uid": saga_execution.uuid})

this is the SAGA process

def _raise_error():
    return ValueError("The Product uid does not exist")

ProductGet = ModelType.build("ProductGet", {"uid": str})

def _get_product(context: SagaContext):
    # check if the product exist
    return SagaRequest("GetProductById", ProductGet(context['product_uid']))

async def _get_product_success(context: SagaContext, response: SagaResponse) -> SagaContext:
    content = await response.content()
    return context

async def _add_item_to_cart(context: SagaContext):
    cart = context['cart_uid']
    product = context['product_uid']
    quantity = context['quantity']
    cart_obj = await CartAggregate.addCartItem(cart, product, quantity)
    return SagaContext(cart=cart_obj['uuid'])

ADD_CART_ITEM = (
    Saga().remote_step(_get_product).on_success(_get_product_success).commit(_add_item_to_cart)
)

this is the test


@pytest.fixture
def add_product():
    add_p_response = requests.post('http://localhost:5566/product', json={
        "title": "Product For Cart",
        "description": "Product used during Cart Test",
        "picture": "images/product.jpeg",
        "categories": [
            {
                "title": "tshirt"
            },
            {
                "title": "casual"
            }
        ],
        "price": {
            "currency": "EUR",
            "units": 59
        }
    })
    content = add_p_response.json()
    return content['uuid']

@pytest.fixture
def add_cart():
    add_c_response = requests.post('http://localhost:5566/cart', json={
        "user": "a6ef81f1-8145-46e3-bd54-52713958cae3",
    })
    content = add_c_response.json()
    return content['uuid']

def test_add_item_to_cart(add_product, add_cart):
    product_uid = add_product
    cart_id = add_cart
    add_c_i_response = requests.post('http://localhost:5566/cart/{}/item'.format(cart_id), json={
        "product": product_uid,
        "quantity": 2
    })
    assert add_c_i_response.status_code == 200

the docker logs return:

eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:minos.networks.rest.handlers: Dispatching '<Request POST /cart/c04e766e-b742-4744-8f85-6d316860975f/item >' from '172.19.0.9'...
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:minos.common.setup: Building a 'BrokerClient' instance from config...
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:minos.common.setup: Setting up a 'BrokerClient' instance...
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:minos.common.setup: Setting up a 'QueuedBrokerSubscriber' instance...
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:minos.common.setup: Setting up a 'InMemoryBrokerSubscriberQueue' instance...
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:minos.common.setup: Setting up a 'KafkaBrokerSubscriber' instance...
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:minos.plugins.kafka.subscriber: Creating {'04283213e2e04d6583bd835b335083da'} topics...
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('172.19.0.5', 9092)]>: connecting to kafka:9092 [('172.19.0.5', 9092) IPv4]
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: Probing node bootstrap-0 broker version
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('172.19.0.5', 9092)]>: Connection complete.
eboutique-product-1     | 2022-02-18 19:25:05 [T:MainThread] INFO:minos.networks.brokers.dispatchers.impl: Dispatching 'BrokerMessageV1(topic='ProductCreated', identifier=UUID('c1f045b8-eb39-4334-897f-42040267128b'), reply_topic=None, strategy=<BrokerMessageV1Strategy.MULTICAST: 'multicast'>, payload=BrokerMessageV1Payload(content=Event(uuid=UUID('f8c26802-cfdc-42f0-810a-6fa05896ca6a'), name='src.aggregates.Product', version=1, action=<Action.CREATE: 'create'>, created_at=datetime.datetime(2022, 2, 18, 19, 25, 5, 134637, tzinfo=datetime.timezone.utc), fields_diff=FieldDiffContainer(title=FieldDiff(name='title', value='Product For Cart'), description=FieldDiff(name='description', value='Product used during Cart Test'), picture=FieldDiff(name='picture', value='images/product.jpeg'), price=FieldDiff(name='price', value=Price(uuid=UUID('37aa637b-03ec-4075-ace7-ed295ea34825'), currency='EUR', units=59)), categories=FieldDiff(name='categories', value=ValueObjectSet(data={Category(title='casual'), Category(title='tshirt')})))), status=<BrokerMessageV1Status.SUCCESS: 200>, headers={}))'...
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: Broker version identified as 2.5.0
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: Probing node bootstrap-0 broker version
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: Broker version identified as 2.5.0
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: <BrokerConnection node_id=1001 host=kafka:9092 <connecting> [IPv4 ('172.19.0.5', 9092)]>: connecting to kafka:9092 [('172.19.0.5', 9092) IPv4]
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: Probing node 1001 broker version
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: <BrokerConnection node_id=1001 host=kafka:9092 <connecting> [IPv4 ('172.19.0.5', 9092)]>: Connection complete.
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connected> [IPv4 ('172.19.0.5', 9092)]>: Closing connection. 
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: Broker version identified as 2.5.0
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:kafka.conn: Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
eboutique-kafka-1       | [2022-02-18 19:25:05,786] INFO Creating topic 04283213e2e04d6583bd835b335083da with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
eboutique-zookeeper-1   | 2022-02-18 19:25:05,788 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when processing sessionid:0x17f0e4b133f0000 type:setData cxid:0xf6 zxid:0x168 txntype:-1 reqpath:n/a Error Path:/config/topics/04283213e2e04d6583bd835b335083da Error:KeeperErrorCode = NoNode for /config/topics/04283213e2e04d6583bd835b335083da
eboutique-kafka-1       | [2022-02-18 19:25:05,854] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(04283213e2e04d6583bd835b335083da-0) (kafka.server.ReplicaFetcherManager)
eboutique-kafka-1       | [2022-02-18 19:25:05,863] INFO [Log partition=04283213e2e04d6583bd835b335083da-0, dir=/kafka/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
eboutique-kafka-1       | [2022-02-18 19:25:05,868] INFO Created log for partition 04283213e2e04d6583bd835b335083da-0 in /kafka/kafka-logs/04283213e2e04d6583bd835b335083da-0 with properties {} (kafka.log.LogManager)
eboutique-kafka-1       | [2022-02-18 19:25:05,871] INFO [Partition 04283213e2e04d6583bd835b335083da-0 broker=1001] No checkpointed highwatermark is found for partition 04283213e2e04d6583bd835b335083da-0 (kafka.cluster.Partition)
eboutique-kafka-1       | [2022-02-18 19:25:05,872] INFO [Partition 04283213e2e04d6583bd835b335083da-0 broker=1001] Log loaded for partition 04283213e2e04d6583bd835b335083da-0 with initial high watermark 0 (kafka.cluster.Partition)
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'04283213e2e04d6583bd835b335083da'})
eboutique-cart-1        | 2022-02-18 19:25:05 [T:MainThread] INFO:aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'04283213e2e04d6583bd835b335083da': 1}. 
eboutique-product-1     | 2022-02-18 19:25:05 [T:MainThread] INFO:minos.networks.brokers.dispatchers.impl: Dispatching 'BrokerMessageV1(topic='GetProductById', identifier=UUID('4b49a0ab-8464-4fdd-8513-6c7f66737095'), reply_topic='04283213e2e04d6583bd835b335083da', strategy=<BrokerMessageV1Strategy.UNICAST: 'unicast'>, payload=BrokerMessageV1Payload(content=ProductGet[DTO](uid='f8c26802-cfdc-42f0-810a-6fa05896ca6a'), status=<BrokerMessageV1Status.SUCCESS: 200>, headers={'saga': 'c1a5ac6c-dbd0-4ab1-a0e8-9f79123a5021', 'transactions': 'c1a5ac6c-dbd0-4ab1-a0e8-9f79123a5021'}))'...
eboutique-product-1     | 2022-02-18 19:25:59 [T:MainThread] INFO:minos.aggregate.snapshots.services: Performing periodic Snapshot synchronization...
eboutique-cart-1        | 2022-02-18 19:25:59 [T:MainThread] INFO:minos.aggregate.snapshots.services: Performing periodic Snapshot synchronization...
eboutique-product-1     | 2022-02-18 19:26:00 [T:MainThread] INFO:minos.aggregate.snapshots.services: Performing periodic Snapshot synchronization...
eboutique-cart-1        | 2022-02-18 19:26:00 [T:MainThread] INFO:minos.aggregate.snapshots.services: Performing periodic Snapshot synchronization...
eboutique-cart-1        | 2022-02-18 19:26:05 [T:MainThread] INFO:minos.saga.executions.commit: Successfully rejected!
eboutique-cart-1        | 2022-02-18 19:26:05 [T:MainThread] ERROR:minos.networks.rest.handlers: Raised a system exception: SagaFailedExecutionException(message='There was a failure while \'SagaStepExecution\' was executing: MinosHandlerNotFoundEnoughEntriesException(message="Timeout exceeded while trying to fetch 1 entries from \'04283213e2e04d6583bd835b335083da\'.")')
eboutique-cart-1        | Traceback (most recent call last):
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/networks/brokers/clients.py", line 164, in _get_many
eboutique-cart-1        |     async for message in self.subscriber:
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/networks/brokers/subscribers/abc.py", line 53, in __anext__
eboutique-cart-1        |     return await self.receive()
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/networks/brokers/subscribers/abc.py", line 60, in receive
eboutique-cart-1        |     message = await self._receive()
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/networks/brokers/collections/queues/abc.py", line 47, in dequeue
eboutique-cart-1        |     message = await self._dequeue()
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/networks/brokers/collections/queues/memory.py", line 48, in _dequeue
eboutique-cart-1        |     message = await self._queue.get()
eboutique-cart-1        |   File "/usr/local/lib/python3.9/asyncio/queues.py", line 166, in get
eboutique-cart-1        |     await getter
eboutique-cart-1        | asyncio.exceptions.CancelledError
eboutique-cart-1        | 
eboutique-cart-1        | During handling of the above exception, another exception occurred:
eboutique-cart-1        | 
eboutique-cart-1        | Traceback (most recent call last):
eboutique-cart-1        |   File "/usr/local/lib/python3.9/asyncio/tasks.py", line 490, in wait_for
eboutique-cart-1        |     return fut.result()
eboutique-cart-1        | asyncio.exceptions.CancelledError
eboutique-cart-1        | 
eboutique-cart-1        | The above exception was the direct cause of the following exception:
eboutique-cart-1        | 
eboutique-cart-1        | Traceback (most recent call last):
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/networks/brokers/clients.py", line 152, in receive_many
eboutique-cart-1        |     messages = await wait_for(self._get_many(count, **kwargs), timeout=timeout)
eboutique-cart-1        |   File "/usr/local/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
eboutique-cart-1        |     raise exceptions.TimeoutError() from exc
eboutique-cart-1        | asyncio.exceptions.TimeoutError
eboutique-cart-1        | 
eboutique-cart-1        | During handling of the above exception, another exception occurred:
eboutique-cart-1        | 
eboutique-cart-1        | Traceback (most recent call last):
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/saga/manager.py", line 234, in _get_response
eboutique-cart-1        |     message = await broker.receive(**kwargs)
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/networks/brokers/clients.py", line 142, in receive
eboutique-cart-1        |     return await self.receive_many(*args, **(kwargs | {"count": 1})).__anext__()
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/networks/brokers/clients.py", line 154, in receive_many
eboutique-cart-1        |     raise MinosHandlerNotFoundEnoughEntriesException(
eboutique-cart-1        | minos.networks.exceptions.MinosHandlerNotFoundEnoughEntriesException: Timeout exceeded while trying to fetch 1 entries from '04283213e2e04d6583bd835b335083da'.
eboutique-cart-1        | 
eboutique-cart-1        | During handling of the above exception, another exception occurred:
eboutique-cart-1        | 
eboutique-cart-1        | Traceback (most recent call last):
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/networks/rest/handlers.py", line 137, in _wrapper
eboutique-cart-1        |     response = await response
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/saga/middleware.py", line 42, in transactional_command
eboutique-cart-1        |     return await inner(request)
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/networks/decorators/builders.py", line 170, in _wrapper
eboutique-cart-1        |     response = await response
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/networks/decorators/callables/handlers.py", line 112, in _wrapper
eboutique-cart-1        |     response = await response
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/src/commands/services.py", line 59, in create_cart_item
eboutique-cart-1        |     saga_execution = await self.saga_manager.run(
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/saga/manager.py", line 134, in run
eboutique-cart-1        |     return await self._run_new(
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/saga/manager.py", line 154, in _run_new
eboutique-cart-1        |     return await self._run(execution, **kwargs)
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/saga/manager.py", line 176, in _run
eboutique-cart-1        |     raise exc
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/saga/manager.py", line 172, in _run
eboutique-cart-1        |     await self._run_with_pause_on_memory(execution, **kwargs)
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/saga/manager.py", line 227, in _run_with_pause_on_memory
eboutique-cart-1        |     raise exc
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/saga/manager.py", line 220, in _run_with_pause_on_memory
eboutique-cart-1        |     response = await self._get_response(broker, execution, **kwargs)
eboutique-cart-1        |   File "/usr/local/lib/python3.9/site-packages/minos/saga/manager.py", line 237, in _get_response
eboutique-cart-1        |     raise SagaFailedExecutionException(exc)
eboutique-cart-1        | minos.saga.exceptions.SagaFailedExecutionException: There was a failure while 'SagaStepExecution' was executing: MinosHandlerNotFoundEnoughEntriesException(message="Timeout exceeded while trying to fetch 1 entries from '04283213e2e04d6583bd835b335083da'.")
eboutique-apigateway-1  | 172.19.0.1 [18/Feb/2022:19:25:05 +0000] "POST /cart/c04e766e-b742-4744-8f85-6d316860975f/item HTTP/1.1" 500 196 "-" "python-requests/2.27.1"
vladyslav-fenchak commented 2 years ago

Solved in #220

vladyslav-fenchak commented 2 years ago

Related to #229