awslabs / aws-crt-python

Python bindings for the AWS Common Runtime
Apache License 2.0

Consecutive publish() is clobbered or mixed up somehow #501

Closed by hoIIer 1 year ago

hoIIer commented 1 year ago

Describe the bug

I am trying to publish the same payload to two separate topics sequentially, from within a serverless Python API request handler.

        session.mqtt.publish(f'groups/{message.group_id}/messages', {
            'payload': <payload>,
        })
        session.mqtt.publish(f'groups/{message.group_id}/preview', {
            'payload': <payload>,
        })

The client is subscribed to groups/<group_id>/preview only.

Instead, the client receives an event from topic groups/<group_id>/messages and receives nothing from the second topic, groups/<group_id>/preview.

I create the MQTT connection via a global session object, as shown below. I wonder whether this is part of my problem:

class AWSSession:
    ...

    @cached_property
    def _mqtt(self):
        """
        Creates an mqtt connection to the AWS IoT Core service using
        the iot domain endpoint passed in from the environment.

        Additionally overrides the mqtt.publish function to set global
        QoS and enable parsing the payload dict in one place. This is
        to make the ergonomics of working with mqtt slightly nicer.

        Example: session.mqtt.publish('my/topic', {'message': 'hello'})
        """
        ...
        mqtt_conn = (
            mqtt_connection_builder
            .websockets_with_default_aws_signing(
                ...,
            )
        )

        # connect.
        conn = mqtt_conn.connect()

        # resolve the future.
        conn.result()

        return mqtt_conn

    @property
    def mqtt(self):
        conn = self._mqtt

        # wrap publish to apply defaults (JSON encoding, QoS).
        pub_orig = conn.publish

        def publish(topic, payload):
            pub_orig(topic, json.dumps(payload), mqtt.QoS.AT_LEAST_ONCE)

        conn.publish = publish

        return conn
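
One thing worth checking in the snippet above: the `mqtt` property replaces `conn.publish` on every access, so a second `session.mqtt` access wraps the wrapper installed by the first, and the inner two-argument function would then be called with three arguments. A minimal sketch of an alternative is below. It leaves the connection untouched and forwards to it from a small wrapper object. `JsonPublisher` and `FakeConnection` are hypothetical names used only for illustration; with awscrt, the `qos` argument would be `mqtt.QoS.AT_LEAST_ONCE` as in the original code.

```python
import json


class JsonPublisher:
    """Forwards to a connection's publish() without modifying the connection."""

    def __init__(self, connection, qos):
        self._connection = connection
        self._qos = qos

    def publish(self, topic, payload):
        # Serialize the dict once and pass through whatever the underlying
        # publish() returns, so callers can still wait on the result.
        return self._connection.publish(topic, json.dumps(payload), self._qos)


class FakeConnection:
    """Hypothetical stand-in that records calls, so the sketch runs offline."""

    def __init__(self):
        self.calls = []

    def publish(self, topic, payload, qos):
        self.calls.append((topic, payload, qos))
        return (None, len(self.calls))


if __name__ == "__main__":
    conn = FakeConnection()
    publisher = JsonPublisher(conn, qos=1)
    publisher.publish("groups/1/messages", {"payload": "hello"})
    publisher.publish("groups/1/preview", {"payload": "hello"})
    for call in conn.calls:
        print(call)
```

Because the wrapper is a separate object, it can be created once (for example inside the cached property) and reused across calls without stacking wrappers.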

Expected Behavior

I expect sequential mqtt publish() calls to deliver each message to its own topic, without one clobbering the other.

Current Behavior

The client receives the event from the first publish() despite not being subscribed to that topic; it is actually subscribed to the topic used by the second publish().

Reproduction Steps

Included my code above.

Possible Solution

No response

Additional Information/Context

No response

aws-crt-python version used

awscrt==0.16.21

Python version used

3.9.17

Operating System and version

amazon linux 4

bretambrose commented 1 year ago

Rather than indirectly invoke publish through intermediate, dynamically-created function objects, what happens if you just, in one function, invoke connect -> subscribe -> publish1 -> publish2 -> wait-on-publish-received directly?

My first-glance reaction is that there is a scope or function object capture issue which is causing the actual publish call to be invoked with the wrong topic.
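
The sequence suggested above (subscribe, publish twice, wait for delivery) could be sketched in a single function like the one below. This is a sketch under stated assumptions, not code from the thread: `publish_and_wait` and `LoopbackConnection` are hypothetical names, and the connection is assumed to be already connected and to behave like an awscrt MQTT connection, where `subscribe()` and `publish()` each return a `(future, packet_id)` pair and the message callback is invoked with keyword arguments.

```python
import json
import threading
from concurrent.futures import Future


def publish_and_wait(connection, qos, subscribe_topic, publishes, timeout=5.0):
    """Subscribe, publish each (topic, dict) pair in order, return what arrived."""
    received = []
    got_message = threading.Event()

    def on_message(topic, payload, **kwargs):
        # Record the topic and raw payload exactly as delivered.
        received.append((topic, payload))
        got_message.set()

    subscribe_future, _ = connection.subscribe(subscribe_topic, qos, on_message)
    subscribe_future.result(timeout)  # wait until the subscription is acknowledged

    for topic, body in publishes:
        publish_future, _ = connection.publish(topic, json.dumps(body), qos)
        publish_future.result(timeout)  # wait for each publish to complete

    got_message.wait(timeout)
    return received


class LoopbackConnection:
    """Hypothetical in-memory stand-in (exact-match topics) to run this offline."""

    def __init__(self):
        self._callbacks = {}
        self._next_id = 0

    def _done(self):
        future = Future()
        future.set_result({})
        self._next_id += 1
        return future, self._next_id

    def subscribe(self, topic, qos, callback):
        self._callbacks[topic] = callback
        return self._done()

    def publish(self, topic, payload, qos):
        callback = self._callbacks.get(topic)
        if callback is not None:
            callback(topic=topic, payload=payload.encode(), dup=False,
                     qos=qos, retain=False)
        return self._done()
```

With no wrapper functions or shared session object involved, any message that arrives on an unexpected topic cannot be explained by closure capture in the publishing code.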

hoIIer commented 1 year ago

OK, I tried this directly at the point where the calls to publish() happen, as part of a regular HTTP request:

        mqtt_conn = (
            mqtt_connection_builder
            .websockets_with_default_aws_signing(
                ...,
            )
        )

        # connect.
        conn = mqtt_conn.connect()

        # resolve the future.
        conn.result()

        # publish the message to message topics.
        mqtt_conn.publish(f'groups/{message.group_id}/messages', json.dumps({
            'payload': <payload>,
        }), mqtt.QoS.AT_LEAST_ONCE)
        mqtt_conn.publish(f'groups/{message.group_id}/preview', json.dumps({
            'payload': <payload>,
        }), mqtt.QoS.AT_LEAST_ONCE)

It sends both events. However, it's weird that the first one is received by the client, because the client is only subscribed to the second topic, groups/<group_id>/preview. Also, the second event, sent to the correct topic, arrives with an empty payload.

I've verified this to be sure!

I wonder what causes the session object to clobber it. I use Starlette (ASGI) served via Lambda.

hoIIer commented 1 year ago

I just tried logging out of the user I was testing with, and it seems to work fine for an unauthenticated user or a different user.

So I think the user I was testing with may have a stale subscription to the other topic. I'm unsure how to wipe that session on IoT, but I think it works in other cases. :X
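
On wiping a stale session: under MQTT 3.1.1, a broker keeps a client's subscriptions across reconnects when the client reuses its client ID with clean session disabled, and reconnecting once with the same client ID and clean session enabled discards that stored state. The thread does not say what library the subscribing client uses. Assuming it were also built on aws-crt-python, the relevant connection options could be assembled as below. `clean_session_kwargs` is a hypothetical helper, and the endpoint and client ID are placeholders.

```python
def clean_session_kwargs(endpoint, client_id):
    """Options for a connection that discards stored session state."""
    return {
        "endpoint": endpoint,
        # Must match the client ID that holds the stale subscriptions.
        "client_id": client_id,
        # Ask the broker to drop any persisted subscriptions for this ID.
        "clean_session": True,
    }


if __name__ == "__main__":
    # Placeholder values, for illustration only.
    print(clean_session_kwargs("example-endpoint", "stale-client-id"))
```

These keys correspond to the `endpoint`, `client_id`, and `clean_session` keyword arguments accepted by the `mqtt_connection_builder` functions, so the dict could be passed with `**` alongside the other arguments elided in the snippets above.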