aws-amplify / amplify-js

A declarative JavaScript library for application development using cloud services.
https://docs.amplify.aws/lib/q/platform/js
Apache License 2.0
9.43k stars 2.13k forks source link

PubSub unsubscribe breaks subscribe to topics again #4064

Closed mevert closed 4 years ago

mevert commented 5 years ago

Describe the bug Updating aws-amplify version to "1.1.40" broke our PubSub subscribe function when calling it again after unsubscribe.

For example, if I subscribe to ['myTopic1','myTopic2'] and then call unsubscribe and subscribe again to those topics later I can no longer receive any IoT messages to related topics. I do not receive any errors when subscribing again. it just does not work.

If I manually use "@aws-amplify/pubsub": "1.1.0", I do not have the same problem. However, with the latest "@aws-amplify/pubsub": "1.1.2" this problem exists.

To Reproduce

  1. install aws-amplify "1.1.40" or "@aws-amplify/pubsub": "1.1.2"
  2. subscribe to some topics let subscriptions = PubSub.subscribe(['myTopic1','myTopic2']).subscribe({...
  3. Send IoT messages to those topics and test that client receives messages correctly.
  4. call subscriptions.unsubscribe()
  5. Subscribe again to same topics subscriptions = PubSub.subscribe(['myTopic1','myTopic2']).subscribe({...
  6. Send IoT messages to those topics.
  7. No messages are received.

We use the following Amplify configurations:

Amplify.configure({Auth: { mandatorySignIn: false, region: ..., identityPoolId: ..., userPoolWebClientId: ... }...

Amplify.addPluggable(new AWSIoTProvider({ aws_pubsub_region: ..., aws_pubsub_endpoint: ... }))

Expected behavior I expect AWS PubSub to work again after unsubscribe.

Related discussion: https://github.com/aws-amplify/amplify-js/issues/3039 https://github.com/aws-amplify/amplify-js/issues/2692

Aws-amplify PubSub seems to be very unreliable solution for Websockets. We have been using it for 1-2 years now and had a lot of different connection problems. For us it is really important that our clients have successful websocket connection that is easy to reinitialise when connection breaks or fails. Our clients are connected to multiple different topics and topics are sometimes unsubscribed and subscribed again. If someone has more reliable solutions and better libraries to recommend related to websockets in AWS environment, I would appreciate it.

elorzafe commented 5 years ago

@mevert I am sorry you are experiencing this problems.

Can you give more information about your IoT endpoints (in which region are they), are you trying to subscribe with authenticated or guest users?, How are you configuring permissions?. Is it a Web App (which framework are you using?) or React-Native App (iOS or Android or both)?

elorzafe commented 5 years ago

@mevert I tried to reproduce your problem by unsubscribing and resubscribing having multiple topics, still no issue found. If you can share the code snippet that you are using will it can be useful.

import React from 'react';
import logo from './logo.svg';
import './App.css';

import Amplify, { PubSub } from 'aws-amplify';
import { AWSIoTProvider } from "@aws-amplify/pubsub/lib/Providers";

Amplify.configure({
  Auth: {
    identityPoolId: "us-west-2:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx",
    region: 'us-west-2'
  }
})

Amplify.addPluggable(new AWSIoTProvider({
  aws_pubsub_endpoint: 'wss://xxxxxxxxxxxxx-ats.iot.us-west-2.amazonaws.com/mqtt',
  aws_pubsub_region: 'us-west-2'
}));

Amplify.Logger.LOG_LEVEL = 'DEBUG';

function subscriptionWrapper() {
  let subscription;
  function subscribe() {
    if (!subscription) {
      console.log('subscribing');
      subscription = PubSub.subscribe(['myTopic1', 'myTopic2']).subscribe({
        next: data => console.log(JSON.stringify(data, null, 2)),
        error: error => console.log(JSON.stringify(error, null, 2)),
        close: () => console.log('Done'),
      });
    }
  }

  function unsubscribe() {
    if (subscription) {
      console.log('unsubscribing');
      subscription.unsubscribe();
      subscription = null;
    }
  }
  return {
    subscribe,
    unsubscribe
  }
}

function App() {
  const subscriptionHandler = subscriptionWrapper();

  function publish() {
    console.log('publishing to myTopic')
    PubSub.publish('myTopic1', 'Hello to all subscribers!');
  }
  return (
    <div className="App">
      <header className="App-header">
        <img src={logo} className="App-logo" alt="logo" />
        <p>
          Edit <code>src/App.js</code> and save to reload.
        </p>
        <a
          className="App-link"
          href="https://reactjs.org"
          target="_blank"
          rel="noopener noreferrer"
        >
          Learn React
        </a>
      </header>
      <button onClick={publish}>publish</button>
      <button onClick={subscriptionHandler.subscribe}>subscribe</button>
      <button onClick={subscriptionHandler.unsubscribe}>unsubscribe</button>
    </div>
  );
}

export default App;
elorzafe commented 5 years ago

@mevert do you have multiple subscriptions?. I could reproduce that problem when unsubscribing one. I will submit a PR to fix this soon.

Thanks for pointing this issue

mevert commented 5 years ago

@mevert do you have multiple subscriptions?. I could reproduce that problem when unsubscribing one. I will submit a PR to fix this soon.

Thanks for pointing this issue

Thanks for the reply. This is for guest (unauthenticated) users. So we do not need to set specific IoT policies in this case.

I created you the simplest possible example code for this problem:

import React from 'react';
import './App.css';

import Amplify, { PubSub } from 'aws-amplify';
import { AWSIoTProvider } from "@aws-amplify/pubsub/lib/Providers";

Amplify.configure({
  Auth: {
    mandatorySignIn: false,
    region: 'eu-central-1',
    identityPoolId: "eu-central-1:xxx",
    userPoolWebClientId: 'xxx'
  }
})

Amplify.addPluggable(new AWSIoTProvider({
  aws_pubsub_endpoint: 'wss://xxx-ats.iot.eu-central-1.amazonaws.com/mqtt',
  aws_pubsub_region: 'eu-central-1'
}));

Amplify.Logger.LOG_LEVEL = 'DEBUG';

let subscription

function testSubscribe() {
  if (subscription) {
    subscription.unsubscribe()
    console.log('unsubscribed')
  }

  subscription = PubSub.subscribe(['myTopic1', 'myTopic2']).subscribe({
    next: data => console.log(JSON.stringify(data, null, 2)),
    error: error => console.log(JSON.stringify(error, null, 2)),
    close: () => console.log('Done'),
  });
}

function App() { 
  function publish() {
    console.log('publishing to myTopic')
    PubSub.publish('myTopic1', 'Hello to all subscribers!');
  }
  return (
    <div className="App">
      <button onClick={publish}>publish</button>
      <button onClick={testSubscribe}>subscribe</button>
    </div>
  );
}

export default App;

In this example:

  1. click subscribe button
  2. click publish button
  3. you will receive the message
  4. click subscribe button again
  5. click publish button
  6. No message is received

So before each subscribe the code will unsubscribe existing topics. We have similar need in one of our real use cases.

elorzafe commented 5 years ago

@mevert thanks for your code snippet I have a PR in place to fix this issue.

Thanks again

elorzafe commented 5 years ago

@mevert it was published the unstable version for pubsub @aws-amplify/pubsub@1.1.3-unstable.18+61ec13f2 this version contains the fix for the problem you have. Let me know if you have any issues.

mevert commented 5 years ago

Thanks. I tried with following packages in package.json:

    "@aws-amplify/pubsub": "1.1.3-unstable.18+61ec13f2",
    "aws-amplify": "^1.1.40",

Removed package-lock.json and node_modules and reinstalled everything but I still have the same problem. Tried importing PubSub with import PubSub from '@aws-amplify/pubsub' and import Amplify, { PubSub} from 'aws-amplify'; Tested with same code example as I already provided. Any idea why it won't work? @elorzafe

elorzafe commented 5 years ago

@mevert can you removing both @aws-amplify/pubsub and aws-amplify and then add the latest unstable for aws-amplify@unstable.

Another option is to install separately each package. With other packages are you using?

Thanks

mevert commented 5 years ago

@elorzafe see my example codes from https://github.com/mevert/aws-iot-pubsub-test and the full package.json. I just tried the latest "aws-amplify": "1.1.41-unstable.20". If you try my code, remember to fill your aws configs to App.js.

  1. subscribe
  2. publish (works)
  3. subscribe
  4. publish (message not received)

Logs:

[DEBUG] 58:21.102 PubSub - subscribe options undefined
ConsoleLogger.js:111 [DEBUG] 58:21.102 MqttOverWSProvider - Subscribing to topic(s) myTopic1,myTopic2
ConsoleLogger.js:101 [DEBUG] 58:21.103 Credentials - getting credentials
ConsoleLogger.js:101 [DEBUG] 58:21.104 Credentials - picking up credentials
ConsoleLogger.js:101 [DEBUG] 58:21.105 Credentials - getting new cred promise
ConsoleLogger.js:101 [DEBUG] 58:21.105 Credentials - checking if credentials exists and not expired
ConsoleLogger.js:101 [DEBUG] 58:21.106 Credentials - need to get a new credential or refresh the existing one
ConsoleLogger.js:101 [DEBUG] 58:21.106 AuthClass - Getting current user credentials
ConsoleLogger.js:101 [DEBUG] 58:21.108 AuthClass - Getting current session
ConsoleLogger.js:111 [DEBUG] 58:21.108 AuthClass - getting session failed undefined
ConsoleLogger.js:101 [DEBUG] 58:21.109 Credentials - setting credentials for guest
ConsoleLogger.js:111 [DEBUG] 58:21.267 Credentials - Load credentials successfully CognitoIdentityCredentials ...
ConsoleLogger.js:103 [DEBUG] 58:21.269 Signer {region: "eu-central-1", service: "iotdevicegateway"}
ConsoleLogger.js:111 [DEBUG] 58:21.273 MqttOverWSProvider - Creating new MQTT client ...
App.js:40 publishing to myTopic
ConsoleLogger.js:101 [DEBUG] 58:49.80 Credentials - getting credentials
ConsoleLogger.js:101 [DEBUG] 58:49.81 Credentials - picking up credentials
ConsoleLogger.js:101 [DEBUG] 58:49.82 Credentials - getting new cred promise
ConsoleLogger.js:101 [DEBUG] 58:49.82 Credentials - checking if credentials exists and not expired
ConsoleLogger.js:111 [DEBUG] 58:49.83 Credentials - is this credentials expired? CognitoIdentityCredentials {expired: false, ...
ConsoleLogger.js:101 [DEBUG] 58:49.84 Credentials - credentials not changed and not expired, directly return
ConsoleLogger.js:103 [DEBUG] 58:49.86 Signer {region: "eu-central-1", service: "iotdevicegateway"}
ConsoleLogger.js:111 [DEBUG] 58:49.88 MqttOverWSProvider - Publishing to topic(s) (2) ["myTopic1", ""Hello to all subscribers!""]
**App.js:32 message received Hello to all subscribers!**
ConsoleLogger.js:111 [DEBUG] 58:54.737 MqttOverWSProvider - Unsubscribing from topic(s) myTopic1,myTopic2
App.js:28 unsubscribed
ConsoleLogger.js:111 [DEBUG] 58:54.739 PubSub - subscribe options undefined
ConsoleLogger.js:111 [DEBUG] 58:54.739 MqttOverWSProvider - Subscribing to topic(s) myTopic1,myTopic2
ConsoleLogger.js:101 [DEBUG] 58:54.740 Credentials - getting credentials
ConsoleLogger.js:101 [DEBUG] 58:54.740 Credentials - picking up credentials
ConsoleLogger.js:101 [DEBUG] 58:54.741 Credentials - getting new cred promise
ConsoleLogger.js:101 [DEBUG] 58:54.741 Credentials - checking if credentials exists and not expired
ConsoleLogger.js:111 [DEBUG] 58:54.742 Credentials - is this credentials expired? CognitoIdentityCredentials {expired: false, ...
ConsoleLogger.js:101 [DEBUG] 58:54.742 Credentials - credentials not changed and not expired, directly return
ConsoleLogger.js:103 [DEBUG] 58:54.744 Signer {region: "eu-central-1", service: "iotdevicegateway"}
ConsoleLogger.js:111 [DEBUG] 58:54.746 MqttOverWSProvider - Creating new MQTT client ...
App.js:40 publishing to myTopic
ConsoleLogger.js:101 [DEBUG] 58:56.492 Credentials - getting credentials
ConsoleLogger.js:101 [DEBUG] 58:56.492 Credentials - picking up credentials
ConsoleLogger.js:101 [DEBUG] 58:56.493 Credentials - getting new cred promise
ConsoleLogger.js:101 [DEBUG] 58:56.494 Credentials - checking if credentials exists and not expired
ConsoleLogger.js:111 [DEBUG] 58:56.495 Credentials - is this credentials expired? CognitoIdentityCredentials {expired: false, ...
ConsoleLogger.js:101 [DEBUG] 58:56.495 Credentials - credentials not changed and not expired, directly return
ConsoleLogger.js:103 [DEBUG] 58:56.497 Signer {region: "eu-central-1", service: "iotdevicegateway"}
ConsoleLogger.js:111 [DEBUG] 58:56.498 MqttOverWSProvider - Publishing to topic(s) (2) ["myTopic1", ""Hello to all subscribers!""]

Message received only the first time.

elorzafe commented 5 years ago

Thanks @mevert I will try this now

elorzafe commented 5 years ago

@mevert I found the problem, there is a race condition that is trigger when you do unsubscribe and subscribe as you are doing. The problem is on this line onDisconnect callback is invoked after the new subscription.

From your code sample: If you add an unsubscribe button to separate that it wont be a problem. I will reopen this issue to fix that race condition.

mevert commented 5 years ago

@elorzafe Thanks for the answer. In the real use case, I have the need to unsubscibe all the existing topics in the same function execution before the new PubSub.subscribe. So what would be the preffered way to fix this issue? So I need to know when unsubscibe function is ready. If async/await would be supported in unsubscibe there would not be this race condition?

nayan27 commented 5 years ago

Encountering exact same situation as @mevert. Also I would add the following : I don't believe this problem is dependant on the platform but here is the issue i have:

Effectively, in the error function, if i unsubscribe, wait a second and then subscribe again ... i receive published messages on those topics. Waiting a second before subscribing again is not very optimal and more of a patch then solving the problem really. I agree with @mevert that having a way to know when subscription is available would be necessary for those special cases.

Thank you @elorzafe !! :)

mevert commented 4 years ago

@elorzafe and other amplify-js guys: What is the status of this issue and when you will release the fix? It's been almost two month now since you reopened this case. Would be really useful to get this fixed asap.

Amplifiyer commented 4 years ago

@mevert I just merged the fix and it will reach an unstable release in few minutes. Please test using aws-amplify@unstable and let us know if the problem is resolved.

mevert commented 4 years ago

Hello @Amplifiyer , I got my test application working and I was not able reproduce this problem anymore with it ("aws-amplify": "2.2.1-unstable.20").

However, when I tried the same with one of our production application I got: "Error: AMQJS0011E Invalid state not connected. at ClientImpl.subscribe" always when unsubscribing and then subscribing again. When I had a little delay after calling unsubscribe this problem did not exist anymore. So it would be nice to be able to know when unsubscribe() is ready so we do not need to hardcode some X amount of delay before calling subscribe in the same function execution.

Amplifiyer commented 4 years ago

@mevert thanks again for testing and letting us know your results. Is this problem happening with a new app as well or only with an existing app while upgrading aws-amplify?

mevert commented 4 years ago

@Amplifiye it is happening with the existing app that I tried to upgrade to aws-amplify "2.2.1-unstable.20". Previously specific "@aws-amplify/pubsub": "1.1.0" version was used without same error.

elorzafe commented 4 years ago

@mevert can you share the production code, what is the difference between that and the test app?

madebythose commented 4 years ago

@elorzafe - I just noticed this was pushed into the Amplify master. Its great you've made some changes to fix these issues.

One thing I'm still not sure about is how to handle network disconnect/reconnects. Does this now reconnect automatically if the app goes into the background or network is lost?

dingdongping commented 4 years ago

@elorzafe Under normal conditions, it is OK to subscribe and unsubscribe, but if you subscribe several times after the network is disconnected, and then you can't subscribe again after connecting to the network

{"error": {"errorCode": 7, "errorMessage": "AMQJS0007E Socket error:undefined."}} WARN Possible Unhandled Promise Rejection (id: 0): Object { "errorCode": 7, "errorMessage": "AMQJS0007E Socket error:undefined.", "invocationContext": undefined, }

tigerjiang commented 3 years ago

@elorzafe Under normal conditions, it is OK to subscribe and unsubscribe, but if you subscribe several times after the network is disconnected, and then you can't subscribe again after connecting to the network

{"error": {"errorCode": 7, "errorMessage": "AMQJS0007E Socket error:undefined."}} WARN Possible Unhandled Promise Rejection (id: 0): Object { "errorCode": 7, "errorMessage": "AMQJS0007E Socket error:undefined.", "invocationContext": undefined, }

I meet this issue, Do you have resolve it?

github-actions[bot] commented 2 years ago

This issue has been automatically locked since there hasn't been any recent activity after it was closed. Please open a new issue for related bugs.

Looking for a help forum? We recommend joining the Amplify Community Discord server *-help channels or Discussions for those types of questions.