slackapi / python-slack-sdk

Slack Developer Kit for Python
https://tools.slack.dev/python-slack-sdk/
MIT License

Intermittent socket mode connectivity #1280

Closed BrianGallew closed 1 year ago

BrianGallew commented 1 year ago

(Filling out the following details about bugs will help us solve your issue sooner.)

Reproducible in:

The Slack SDK version

slack-sdk==3.19.0

Python runtime version

Python 3.8.14

OS info

ProductName: Mac OS X ProductVersion: 10.15.7 BuildVersion: 19H2026 Darwin Kernel Version 19.6.0: Tue Jun 21 21:18:39 PDT 2022; root:xnu-6153.141.66~1/RELEASE_X86_64

Steps to reproduce:

[excerpt of the new socket mode slack driver for our internal fork of the skoczen/will project]

    @property
    def client(self):
        "References/initializes our Slack client"
        if self._client is None:
            self._client = SocketModeClient(
                app_token=settings.SLACK_APP_TOKEN,
                web_client=WebClient(token=settings.SLACK_BOT_TOKEN),
                ping_pong_trace_enabled=True,
                on_message_listeners=[self._sm_event_handler],
                on_error_listeners=[self._sm_error_handler],
            )
        return self._client

    def _load_channels(self):
        "Updates our internal list of channels.  Kind of expensive."
        channels = {}
        for page in self.client.web_client.conversations_list(
            limit=self.PAGE_LIMIT,
            exclude_archived=True,
            types="public_channel,private_channel,mpim,im",
        ):
            for channel in page["channels"]:
                channels[channel["id"]] = Channel(
                    id=channel["id"],
                    name=channel.get("name", channel["id"]),
                    source=clean_for_pickling(channel),
                    members=dict(),
                )
        if channels:
            self._channels = channels
            self.save("slack_channel_cache", channels)
            logging.info("Loaded/saved %s channels in the cache", len(channels))

    def _update_channels(self, channel: dict = None):
        "Updates our internal list of channels.  Kind of expensive."
        if channel is None:
            # Nothing to merge; at most restore the cache if we have no channels yet.
            if not self._channels:
                channels = self.load("slack_channel_cache", None)
                if channels is not None:
                    self._channels = channels
            return
        channels = self.channels
        name = channel.get("name", channel["id"])
        logging.debug("Updated channel %s", str(name))
        channels[channel["id"]] = Channel(
            id=channel["id"],
            name=name,
            source=clean_for_pickling(channel),
            members=dict(),
        )
        if channels:
            self._channels = channels
            self.save("slack_channel_cache", channels)
            logging.debug("saved %s channels to the cache", len(channels))

    def _load_people(self):
        "Loads all Slack users, kind of expensive."
        people = dict()
        for page in self.client.web_client.users_list(limit=self.PAGE_LIMIT):
            for member in page["members"]:
                if member["deleted"]:
                    continue
                people[member["id"]] = Person.from_slack(member)
                if member["name"] == self.handle:
                    self.me = people[member["id"]]

        if people:
            self._people = people
            self.save("slack_people_cache", people)
            logging.info("Loaded/saved %s people in the cache", len(people))

    def _update_people(self, person: dict = None):
        "Updates our internal list of Slack users.  Kind of expensive."
        if person is None:
            # Nothing to merge; at most restore the cache if we have no people yet.
            if not self._people:
                people = self.load("slack_people_cache", None)
                if people:
                    self._people = people
            return
        people = self.people
        member_id = person["id"]
        if person.get("deleted", False):
            if member_id in people:
                del people[member_id]
        else:
            people[member_id] = Person.from_slack(person)
        self._people = people
        if self._people:
            self.save("slack_people_cache", people)
            logging.debug(
                "saved %s people to the cache because of %s",
                len(people),
                person["name"],
            )

    def _sm_error_handler(self, error: Exception):
        # on_error_listeners are called with the Exception itself, not a JSON
        # message, so there is nothing to parse or acknowledge here.
        logging.error("Got an error: %s", error)

    def _ack(self, data: dict):
        """All Slack events need to be acked ... except for the ones that don't (currently only `hello`)
        :param data: The original Slack event
        """
        if "envelope_id" in data:
            self.client.send_socket_mode_response({"envelope_id": data["envelope_id"]})
            # .get() avoids a KeyError if an envelope ever arrives without a payload
            logging.warning("%s got message: %s", os.getpid(), data.get("payload"))
        elif data.get("type", None) == "hello":
            return
        else:
            logging.warning("Not acknowledging %s", data)

    def _sm_event_handler(self, message: str):
        """All Slack events are handled/routed by this function.  If it raises an Exception
        the current Slack connection will be closed and re-opened, so maybe don't do that.
        """
        try:
            data = json.loads(message)
            self._ack(data)
            event_type = data.get("type", None)
            if data.get("retry_attempt", 0) > 0:
                # return
                logging.warning("Retry %d", data["retry_attempt"])
            if event_type == "events_api":
                event_type = data["payload"].get("event", {}).get("type", None)

            if event_type is None:
                logging.error("Got an event without an event type: %s", message)
            elif event_type == "hello":
                logging.info(message)
            elif event_type == "channel_created":
                self._update_channels(data["payload"]["event"]["channel"])
            elif event_type == "user_change":
                self._update_people(data["payload"]["event"]["user"])
            elif event_type == "message":
                if not self.handle:
                    id = data["payload"].get("authorizations", [{"user_id": None}])[0][
                        "user_id"
                    ]
                    if id and id in self.people:
                        self.__class__.handle = id
                        self.__class__.me = self.people[id]

                self.handle_incoming_slack_event(data=data["payload"]["event"])
            else:
                logging.warning("No handler for a %s event: %s", event_type, data)
        except Exception:  # do not swallow KeyboardInterrupt/SystemExit
            logging.exception("Exception processing %s", message)

    def _watch_slack_ws(self):
        "This is our main loop."
        metadata_loaded = False  # Only load metadata once (expensive)
        while not self.exiting:
            try:
                if not self.client.is_connected():
                    self.client.connect()
                if not metadata_loaded:
                    Thread(target=self._load_people, daemon=True).start()
                    Thread(target=self._load_channels, daemon=True).start()
                    metadata_loaded = True
                while self.client.is_connected():
                    time.sleep(5)
                    if self.exiting:
                        self.client.close()
            except Exception as the_exception:
                logging.exception("Error in websocket loop")
                print(the_exception)
                time.sleep(2)

Expected result:

I expect to see messages come in within a second or two of sending them via DM. I cannot seem to find a way to expose the claimed intermittent availability issue.

Actual result:

Messages take from approximately zero seconds to 8 minutes to be received by the client.
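
One way to put numbers on that delay, as a minimal sketch: compare the `event_time` field carried by Events API payloads with the local clock at the moment the message listener fires. The helper name `delivery_delay_seconds` and the sample envelope below are made up for illustration, and the result is only as good as the local clock (`event_time` is in whole seconds).

```python
import json
import time
from typing import Optional


def delivery_delay_seconds(raw_message: str, now: Optional[float] = None) -> Optional[float]:
    """Seconds between Slack's event_time and local receipt, or None if unknown."""
    envelope = json.loads(raw_message)
    # Messages such as `hello` carry no payload at all.
    payload = envelope.get("payload") or {}
    event_time = payload.get("event_time")
    if event_time is None:
        return None
    if now is None:
        now = time.time()
    return now - float(event_time)


# Hypothetical envelope, shaped like the ones the listener above receives.
sample = json.dumps({
    "envelope_id": "hypothetical-envelope-id",
    "type": "events_api",
    "retry_attempt": 1,
    "payload": {"event_time": 1_660_000_000, "event": {"type": "message"}},
})
print(delivery_delay_seconds(sample, now=1_660_000_045.0))  # 45.0
```

Logging this value next to `retry_attempt` would show whether the slow messages are first deliveries or retries.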

Requirements

I originally submitted my issue to https://my.slack.com/help/requests/new but they directed me here. From their response:

Hi Brian,

Thanks for your patience. I'm a Platform specialist, and I reviewed the notes that Paige provided about your earlier chat.

It appears that the socket mode connection generated for your app seemed to be intermittently unavailable so we don't get the expected `envelope_id` within 3 seconds, and end up retrying your requests.

* The 3-second requirement is mentioned here: https://api.slack.com/apis/connections/socket-implement#acknowledge
* The event callback retry mechanism is mentioned here: https://api.slack.com/apis/connections/events-api#the-events-api__field-guide__error-handling__failure-conditions

Given this, you may want to consider creating multiple socket mode connections (as mentioned here: https://api.slack.com/apis/connections/socket-implement#connections). If you do that, each Slack callback will be sent to only one of the connections; the others serve as a backup in case the main connection is not available for any reason. **Additionally, it should be noted that you're allowed up to 10 simultaneous websocket connections.**

Sorry for the trouble here, and hopefully, this helps. Thanks for checking in about this.
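
For reference, the 3-second acknowledgement requirement suggests a listener that acks first and defers the real work to another thread. The following is only a sketch of that idea, not the driver above: `make_listener` and `start_worker` are hypothetical helpers, and `send_ack` stands in for `self.client.send_socket_mode_response`.

```python
import json
import queue
import threading
from typing import Callable


def make_listener(send_ack: Callable[[dict], None], work: queue.Queue) -> Callable[[str], None]:
    """Build a message listener that acks immediately and queues the envelope."""
    def on_message(raw_message: str) -> None:
        envelope = json.loads(raw_message)
        envelope_id = envelope.get("envelope_id")
        if envelope_id is not None:
            # Ack before doing anything slow, to stay inside the 3-second window.
            send_ack({"envelope_id": envelope_id})
        if "payload" in envelope:
            work.put(envelope)  # processed later, off the receiving thread
    return on_message


def start_worker(work: queue.Queue, handle: Callable[[dict], None]) -> threading.Thread:
    """Process queued envelopes until a None sentinel arrives."""
    def run() -> None:
        while True:
            envelope = work.get()
            try:
                if envelope is None:
                    return
                handle(envelope)
            finally:
                work.task_done()
    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    return worker


# Tiny demonstration with stand-ins for the real client and handler.
acks, handled = [], []
work = queue.Queue()
worker = start_worker(work, handled.append)
listener = make_listener(acks.append, work)
listener(json.dumps({"envelope_id": "e-1", "type": "events_api",
                     "payload": {"event": {"type": "message"}}}))
listener(json.dumps({"type": "hello"}))  # no envelope_id: neither acked nor queued
work.put(None)
worker.join(timeout=5)
```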

seratch commented 1 year ago

Hi @BrianGallew, thanks for writing in!

As long as your bolt-python app does not display any error-level logging, the app process that you're aware of should be alive and working properly.

The only possible cause of your situation is that you may have outdated Socket Mode client processes, which would cause the issue you've observed. Could you check my comment at https://github.com/slackapi/java-slack-sdk/issues/803#issuecomment-895768180? I am not sure whether this is your case, but I hope it is helpful to you.

BrianGallew commented 1 year ago

So, not bolt-python, it's "raw" slack-sdk.

Checking out your other response, I see this: "In this case, the Slack server-side can send payloads to active hosts in a round-robin manner." The docs at https://api.slack.com/apis/connections/socket-implement#connections say that payloads can be sent to any connection. Am I then to take it that, if I have 5 instances of the application open, I need to assume that events will not be delivered to all of them, but only to some "random" subset?

BrianGallew commented 1 year ago

OK, I've done some testing and this seems to be the case: Slack seems to do at-least-once message delivery. Now that I know this I can update my app to handle multiple backends queuing messages for handling with deduplication.
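
A rough sketch of the deduplication side, assuming the `event_id` field of the Events API payload is used as the key. The `SeenEvents` class, its 10-minute TTL, and the size cap are all made up for illustration. This in-memory version only covers a single process; several backends would need a shared store instead.

```python
import threading
import time
from collections import OrderedDict
from typing import Optional


class SeenEvents:
    """Remembers recently seen event IDs for a limited time (hypothetical helper)."""

    def __init__(self, ttl_seconds: float = 600.0, max_size: int = 10_000) -> None:
        self._ttl = ttl_seconds
        self._max_size = max_size
        self._seen: "OrderedDict[str, float]" = OrderedDict()
        self._lock = threading.Lock()

    def first_time(self, event_id: str, now: Optional[float] = None) -> bool:
        """Return True only the first time event_id is seen within the TTL."""
        if now is None:
            now = time.monotonic()
        with self._lock:
            # Entries are kept in insertion order, so the oldest come first.
            while self._seen:
                oldest_id = next(iter(self._seen))
                if now - self._seen[oldest_id] < self._ttl:
                    break
                del self._seen[oldest_id]
            if event_id in self._seen:
                return False
            self._seen[event_id] = now
            if len(self._seen) > self._max_size:
                self._seen.popitem(last=False)  # bound memory use
            return True


seen = SeenEvents(ttl_seconds=600)
deliveries = ["Ev111", "Ev222", "Ev111"]  # the third one simulates a retry
print([seen.first_time(e, now=0.0) for e in deliveries])  # [True, True, False]
```

In the handler, the check would go right after the ack: acknowledge every delivery, but only queue the payload when `first_time(...)` returns True.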