dapr / components-contrib

Community driven, reusable components for distributed apps
Apache License 2.0

MQTT pubsub component delivers pending messages multiple times on startup when multiple subscriptions are configured #2306

Open bwojdyla opened 1 year ago

bwojdyla commented 1 year ago

Expected Behavior

Pending messages should be delivered once when the consumer starts with qos=2. The same issue occurs with qos=1. One might argue that qos=1 means "at least once", but redelivery would be expected only after some failure. In the scenario below, it looks like regular behavior on startup.

Actual Behavior

If there are pending messages when the consumer starts, they may be delivered more than once.

Steps to Reproduce the Problem

Configure 2 subscriptions for the MQTT component:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: testapp-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
  - name: url
    value: "abc"
  - name: qos
    value: 2
  - name: retain
    value: "false"
  - name: cleanSession
    value: "false"
  - name: backOffMaxRetries
    value: "5"

Subscription 1

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: subscription1 
spec:
  pubsubname: testapp-pubsub
  topic: A
  route: /api/route1

Subscription 2

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: subscription2
spec:
  pubsubname: testapp-pubsub
  topic: B
  route: /api/route2

  1. Start Dapr and the application so that the queue is created
  2. Stop Dapr
  3. Publish one message to each of topics A and B
  4. Start Dapr and the application
  5. You should see the following behavior:

time="2022-11-22T07:43:51.388742594Z" level=info msg="app is subscribed to the following topics: [A B] through pubsub=testapp-pubsub" app_id=app.api instance=1d8b7a879e39 scope=dapr.runtime type=log ver=edge
time="2022-11-22T07:43:51.388814521Z" level=debug msg="subscribing to topic='A' on pubsub='testapp-pubsub'" app_id=app.api instance=1d8b7a879e39 scope=dapr.runtime type=log ver=edge
time="2022-11-22T07:43:51.388846082Z" level=info msg="initializing the subscriber" app_id=app.api instance=1d8b7a879e39 scope=dapr.contrib type=log ver=edge
time="2022-11-22T07:43:51.390399924Z" level=debug msg="subscribing to topic='B' on pubsub='testapp-pubsub'" app_id=app.api instance=1d8b7a879e39 scope=dapr.runtime type=log ver=edge
time="2022-11-22T07:43:51.390475906Z" level=info msg="re-initializing the subscriber" app_id=app.api instance=1d8b7a879e39 scope=dapr.contrib type=log ver=edge
time="2022-11-22T07:43:51.390476618Z" level=debug msg="Processing MQTT message A#38 (retained=false)" app_id=app.api instance=1d8b7a879e39 scope=dapr.contrib type=log ver=edge
time="2022-11-22T07:43:51.397443966Z" level=info msg="dapr initialized. Status: Running. Init Elapsed 172ms" app_id=app.api instance=1d8b7a879e39 scope=dapr.runtime type=log ver=edge
time="2022-11-22T07:43:51.397464075Z" level=debug msg="Processing MQTT message B#37 (retained=false)" app_id=app.api instance=1d8b7a879e39 scope=dapr.contrib type=log ver=edge
time="2022-11-22T07:43:51.59255471Z" level=debug msg="Done processing MQTT message B#37; sending ACK" app_id=app.api instance=1d8b7a879e39 scope=dapr.contrib type=log ver=edge
time="2022-11-22T07:43:51.592660606Z" level=debug msg="Processing MQTT message A#38 (retained=false)" app_id=app.api instance=1d8b7a879e39 scope=dapr.contrib type=log ver=edge
time="2022-11-22T07:43:51.592784253Z" level=debug msg="Done processing MQTT message A#38; sending ACK" app_id=app.api instance=1d8b7a879e39 scope=dapr.contrib type=log ver=edge
time="2022-11-22T07:43:51.601196029Z" level=debug msg="Done processing MQTT message A#38; sending ACK" app_id=app.api instance=1d8b7a879e39 scope=dapr.contrib type=log ver=edge
time="2022-11-22T07:44:21.227996205Z" level=debug msg="Refreshing all mDNS addresses." app_id=app.api instance=1d8b7a879e39 scope=dapr.contrib type=log ver=edge
time="2022-11-22T07:44:21.228190037Z" level=debug msg="no mDNS apps to refresh." app_id=app.api instance=1d8b7a879e39 scope=dapr.contrib type=log ver=edge

  1. Dapr subscribes to topic A
  2. Dapr subscribes to topic B
  3. Pending messages for topic A are processed twice.
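
To check a longer log for the same symptom, the "Processing MQTT message" debug lines can be counted per message. This is only a rough sketch: the regular expression is an assumption based on the log format shown above, the sample lines are trimmed copies of lines from that log, and the helper names `count_processing` and `duplicates` are made up for illustration.

```python
import re
from collections import Counter

# Assumed format, taken from the debug lines in the log above, e.g.
#   msg="Processing MQTT message A#38 (retained=false)"
PROCESSING_RE = re.compile(r'msg="Processing MQTT message (\S+)#(\d+) ')


def count_processing(log_lines):
    """Count how many times processing started for each (topic, message id)."""
    counts = Counter()
    for line in log_lines:
        match = PROCESSING_RE.search(line)
        if match:
            counts[(match.group(1), int(match.group(2)))] += 1
    return counts


def duplicates(log_lines):
    """Keep only the messages whose processing started more than once."""
    return {key: n for key, n in count_processing(log_lines).items() if n > 1}


# Trimmed copies of lines from the Dapr log above.
sample = [
    'time="2022-11-22T07:43:51.390476618Z" level=debug msg="Processing MQTT message A#38 (retained=false)" app_id=app.api',
    'time="2022-11-22T07:43:51.397464075Z" level=debug msg="Processing MQTT message B#37 (retained=false)" app_id=app.api',
    'time="2022-11-22T07:43:51.59255471Z" level=debug msg="Done processing MQTT message B#37; sending ACK" app_id=app.api',
    'time="2022-11-22T07:43:51.592660606Z" level=debug msg="Processing MQTT message A#38 (retained=false)" app_id=app.api',
]

print(duplicates(sample))  # {('A', 38): 2}
```

The "Done processing" lines are deliberately not matched, so only the start of each processing attempt is counted.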

Notice that:

  1. If there were more subscriptions for different topics, messages might be processed even more times.
  2. Which messages are processed multiple times is determined by the subscription order. If there were 3 topics and the subscription order was A => B => C, then messages for topic A would be processed 3 times, B 2 times, and C once.
  3. The subscription order is not deterministic.
  4. Publishing messages only to topic A is enough to reproduce the issue, but the subscription order still matters.
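
The counts in the list above can be reproduced with a small model. This is a sketch of the observed pattern only, not of the component's actual code: it assumes that the subscriber reconnects once per subscription and that every (re)connect gets the pending messages for all topics subscribed so far delivered again. The function name `simulate_startup` is hypothetical.

```python
from collections import Counter


def simulate_startup(subscription_order, pending):
    """Model the reported pattern: one (re)connect per subscription.

    Assumption: each (re)connect delivers every pending message again for
    the topics subscribed so far.
    """
    deliveries = Counter()
    subscribed = []
    for topic in subscription_order:
        subscribed.append(topic)  # the subscriber is (re)initialized here
        for t in subscribed:
            deliveries[t] += pending.get(t, 0)
    return dict(deliveries)


# One pending message per topic, subscription order A => B => C.
print(simulate_startup(["A", "B", "C"], {"A": 1, "B": 1, "C": 1}))
# {'A': 3, 'B': 2, 'C': 1}

# Only topic A has a pending message: the result depends on the order.
print(simulate_startup(["A", "B"], {"A": 1}))  # {'A': 2, 'B': 0}
print(simulate_startup(["B", "A"], {"A": 1}))  # {'B': 0, 'A': 1}
```

Under this model a topic subscribed at position i out of n gets each pending message n - i + 1 times, which matches the 3 / 2 / 1 pattern described above.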

Mosquitto logs:

1669103031: New connection from 172.18.0.4 on port 1883.
1669103031: New client connected from 172.18.0.4 as app.api-producer (p2, c0, k30).
1669103031: New connection from 172.18.0.4 on port 1883.
1669103031: New client connected from 172.18.0.4 as app.api-consumer (p2, c0, k30).
1669103031: Client app.api-consumer disconnected.
1669103031: New connection from 172.18.0.4 on port 1883.
1669103031: New client connected from 172.18.0.4 as app.api-consumer (p2, c0, k30).

A disconnect occurs for each topic except the first. Maybe this is the source of the problem?

DeepanshuA commented 1 year ago

I am able to reproduce it. Issue: start subscribers S1, S2 and S3 in the order S1 -> S2 -> S3.

The topics are created.

Stop the subscribers.

Publish messages on these topics (S1, S2 and S3).

Start the subscribers again. The pending messages will be received multiple times.

Now, there are 2 questions:

Q.1. Why does the Dapr runtime subscribe to topics one by one? Why can't it subscribe to all topics in one go? I have created https://github.com/dapr/dapr/issues/5571 to discuss and conclude on that.

Q.2. Even if the Dapr runtime subscribes to topics one by one, is there a way out for now on the component side? Changing the Dapr runtime would be a big task: it needs consensus first, and then the actual work will take time (even being very optimistic). So what can be the way out right now?

Can MQTT delay receiving messages after the subscription starts, so that all topics are subscribed and only then are messages received? That would ensure that messages are not repeated in the pattern for which this issue was filed. But how would this delay be determined? This sounds like an error-prone hack.

Rather, can MQTT or a similar component start the subscription only once it knows all topics to be subscribed, and until then just add topics to a list? But how would the component know that? This is doable if the Dapr runtime can tell the component which topic is the last one to be subscribed to, for example by sending a flag like lastSubscriptionTopic set to true along with the topic metadata.
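
A rough sketch of the second idea, assuming the runtime passed such a flag. Everything here is hypothetical: `DeferredSubscriber` and its `connect` callback only stand in for the component's connect-and-subscribe step, and `lastSubscriptionTopic` is the flag proposed in this comment, treated here as a string metadata value.

```python
class DeferredSubscriber:
    """Collects topics and connects once, when the last topic is known."""

    def __init__(self, connect):
        # connect: callable taking the full topic list; a stand-in for the
        # component's real connect-and-subscribe step (hypothetical).
        self._connect = connect
        self._topics = []
        self._started = False

    def subscribe(self, topic, metadata=None):
        metadata = metadata or {}
        if topic not in self._topics:
            self._topics.append(topic)
        # "lastSubscriptionTopic" is the flag proposed above (hypothetical).
        if metadata.get("lastSubscriptionTopic") == "true" and not self._started:
            self._started = True
            self._connect(list(self._topics))


# Record each connection instead of talking to a real broker.
connections = []
subscriber = DeferredSubscriber(connections.append)
subscriber.subscribe("A")
subscriber.subscribe("B")
subscriber.subscribe("C", {"lastSubscriptionTopic": "true"})

print(connections)  # [['A', 'B', 'C']]
```

With a single connection that already covers all topics, there would be no intermediate disconnects like the ones in the Mosquitto log. Topics added after the flagged one are not handled in this sketch.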

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had activity in the last 30 days. It will be closed in the next 7 days unless it is tagged (pinned, good first issue, help wanted or triaged/resolved) or other activity occurs. Thank you for your contributions.

github-actions[bot] commented 1 year ago

This issue has been automatically closed because it has not had activity in the last 37 days. If this issue is still valid, please ping a maintainer and ask them to label it as pinned, good first issue, help wanted or triaged/resolved. Thank you for your contributions.