Rapptz / discord.py

An API wrapper for Discord written in Python.
http://discordpy.rtfd.org/en/latest
MIT License
14.72k stars 3.75k forks source link

AWS lambda pre warmed function and bot.run throwing RuntimeError("Session is closed") error after close command #9121

Open sebnapo opened 1 year ago

sebnapo commented 1 year ago

Summary

AWS lambda pre warmed function and bot.run throwing RuntimeError("Session is closed") error after close command

Reproduction Steps

Deploy a lambda with the provided code. I personnaly set it as a trigger for a gitlab MR webhook event. Make the function run once, everything will be great. Make it run twice, there will be an error.

Here is the stacktrace : Traceback (most recent call last): File "/var/task/src/handler/merge_request_webhook_handler.py", line 9, in handle MergeRequestWebhookHandler().handle(event) File "/var/task/src/handler/merge_request_webhook_handler.py", line 29, in handle self.new_merge_request_service.create_new_channel(application_related_to_merge_request, File "/var/task/src/service/new_merge_request_service.py", line 23, in create_new_channel bot.run(self.ssm_dao.get_parameter("DISCORD_BOT_TOKEN")) File "/var/task/discord/client.py", line 828, in run asyncio.run(runner()) File "/var/lang/lib/python3.9/asyncio/runners.py", line 44, in run return loop.run_until_complete(main) File "/var/lang/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete return future.result() File "/var/task/discord/client.py", line 817, in runner await self.start(token, reconnect=reconnect) File "/var/task/discord/client.py", line 745, in start await self.login(token) File "/var/task/discord/client.py", line 580, in login data = await self.http.static_login(token) File "/var/task/discord/http.py", line 801, in static_login data = await self.request(Route('GET', '/users/@me')) File "/var/task/discord/http.py", line 624, in request async with self.session.request(method, url, **kwargs) as response: File "/var/task/aiohttp/client.py", line 1141, in aenter__ self._resp = await self._coro File "/var/task/aiohttp/client.py", line 400, in _request raise RuntimeError("Session is closed")

Minimal Reproducible Code

import os
from typing import Optional

import discord
from discord.ext import commands  # type: ignore

from src.data_access.ssm_dao import SsmDao

bot = commands.Bot(command_prefix='!', intents=discord.Intents.all())

class OldMergeRequestService:

    def __init__(
            self,
            ssm_dao: Optional[SsmDao] = None
    ):
        self.ssm_dao = ssm_dao or SsmDao()

    def delete_channel(self, category_name: str, channel_name: str):
        bot.run(self.ssm_dao.get_parameter("DISCORD_BOT_TOKEN"))
        os.environ["channel_name"] = channel_name
        os.environ["category_name"] = category_name

    @staticmethod
    @bot.event
    async def on_ready():
        guild = bot.get_guild('MY_GUILD')
        category = discord.utils.get(guild.channels, name=f'{os.environ["category_name"]} Pending MR')
        channel = discord.utils.get(category.channels, name=os.environ["channel_name"])
        if channel is not None:
            await channel.delete()
        await bot.close()

Expected Results

Actual Results

First attemp my category and channel are created on my server but on the second attempt, an RuntimeError("Session is closed") is raised and make my lambda fail.

Intents

discord.Intents.all()

System Information

But do not take into account the system info as the function is run on a lambda

Checklist

Additional Context

No response

mikeshardmind commented 1 year ago

Your traceback includes code which isn't shown here, and your "minimal reproducable code" doesn't do anything on it's own as it has no entrypoint to where the bot is run.

That said, from what is here as well as the error, you're closing the bot and then trying to re-use the same bot object.

If the only thing this is meant to do is to do 1-off HTTP requests triggered by something else, you may want to consider changing your use.

For instance:

import asyncio
import discord

def delete_channel(category_name: str, channel_name: str):
    async def runner():
        client = discord.Client(intents=discord.intents.none())
        async with client:
            await client.login()
            guild = await client.fetch_guild(GUILD_ID_HERE)
            category = discord.utils.get(guild.categories, name=category_name)
            channel = discord.utils.get(category.channels, name=channel_name)
            await channel.delete()

    asyncio.run(runner())

Now this doesn't handle re-using the client with pre-warming and is merely an example of what can be done with http only use based on what you have shown here.

sebnapo commented 1 year ago

You are right, I did not include the handler of my lambda function because I thought it has nothing to do with the use of this library.

nonetheless, here is my lambda handler code :

import json
from typing import Dict, Optional

from src.service.delete_merge_request_service import OldMergeRequestService
from src.service.new_merge_request_service import NewMergeRequestService

def handle(event: Dict, _context: Dict):
    MergeRequestWebhookHandler().handle(event)

class MergeRequestWebhookHandler:

    def __init__(
            self,
            new_merge_request_service: Optional[NewMergeRequestService] = None,
            old_merge_request_serivce: Optional[OldMergeRequestService] = None
    ):
        self.new_merge_request_service = new_merge_request_service or NewMergeRequestService()
        self.old_merge_request_serivce = old_merge_request_serivce or OldMergeRequestService()

    def handle(self, event: Dict):
        body = json.loads(event["body"])
        merge_request_name = body["object_attributes"]["source_branch"]
        merge_request_action = body["object_attributes"]["action"]
        application_related_to_merge_request = body["project"]["name"]
        if merge_request_action == "open":
            self.new_merge_request_service.create_new_channel(application_related_to_merge_request, merge_request_name)
        elif merge_request_action == "merge" or "close":
            self.old_merge_request_serivce.delete_channel(application_related_to_merge_request, merge_request_name)

I am going to give it a try using what you shown here. But let me understand something. When you use bot.run(self.ssm_dao.get_parameter("DISCORD_BOT_TOKEN")) It open a new session of the bot ?

So when I'm doing this await bot.close() i am closing this session right ? And so, when we create a new instance of this class, a new session should be opened when we go past the bot.run method ?

I tried in the handler part to use a new instance of my service each time the lambda is called with this little tweak

def handle(self, event: Dict):
        body = json.loads(event["body"])
        merge_request_name = body["object_attributes"]["source_branch"]
        merge_request_action = body["object_attributes"]["action"]
        merge_request_url = body["object_attributes"]["url"]
        application_related_to_merge_request = body["project"]["name"]
        if merge_request_action == "open":
            self.new_merge_request_service = NewMergeRequestService()
            self.new_merge_request_service.create_new_channel(application_related_to_merge_request,
                                                              merge_request_name,
                                                              merge_request_url)
        elif merge_request_action == "merge" or "close":
            self.old_merge_request_serivce = OldMergeRequestService()
            self.old_merge_request_serivce.delete_channel(application_related_to_merge_request, merge_request_name)

But i am getting the same error.

CptDarkrex commented 1 month ago

To fix the issue at hand, you should avoid reusing the same bot instance after it has been closed. Instead, you can create a new instance of the discord.Client for each operation. Below is a revised version of the provided code, which ensures a new client instance is created each time:

import os
import json
import asyncio
import discord
from typing import Dict, Optional

from src.data_access.ssm_dao import SsmDao
from src.service.delete_merge_request_service import OldMergeRequestService
from src.service.new_merge_request_service import NewMergeRequestService

class MergeRequestWebhookHandler:
    def __init__(self,
                 new_merge_request_service: Optional[NewMergeRequestService] = None,
                 old_merge_request_service: Optional[OldMergeRequestService] = None):
        self.new_merge_request_service = new_merge_request_service or NewMergeRequestService()
        self.old_merge_request_service = old_merge_request_service or OldMergeRequestService()

    def handle(self, event: Dict):
        body = json.loads(event["body"])
        merge_request_name = body["object_attributes"]["source_branch"]
        merge_request_action = body["object_attributes"]["action"]
        merge_request_url = body["object_attributes"]["url"]
        application_related_to_merge_request = body["project"]["name"]

        if merge_request_action == "open":
            self.new_merge_request_service.create_new_channel(
                application_related_to_merge_request,
                merge_request_name,
                merge_request_url
            )
        elif merge_request_action in ("merge", "close"):
            self.old_merge_request_service.delete_channel(
                application_related_to_merge_request,
                merge_request_name
            )

class OldMergeRequestService:
    def __init__(self, ssm_dao: Optional[SsmDao] = None):
        self.ssm_dao = ssm_dao or SsmDao()

    def delete_channel(self, category_name: str, channel_name: str):
        async def runner():
            client = discord.Client(intents=discord.Intents.none())
            async with client:
                await client.login(self.ssm_dao.get_parameter("DISCORD_BOT_TOKEN"))
                guild = await client.fetch_guild(os.environ["MY_GUILD"])
                category = discord.utils.get(guild.categories, name=category_name)
                channel = discord.utils.get(category.channels, name=channel_name)
                if channel:
                    await channel.delete()

        asyncio.run(runner())

class NewMergeRequestService:
    def __init__(self, ssm_dao: Optional[SsmDao] = None):
        self.ssm_dao = ssm_dao or SsmDao()

    def create_new_channel(self, category_name: str, channel_name: str, channel_topic: str):
        async def runner():
            client = discord.Client(intents=discord.Intents.none())
            async with client:
                await client.login(self.ssm_dao.get_parameter("DISCORD_BOT_TOKEN"))
                guild = await client.fetch_guild(os.environ["MY_GUILD"])
                category = discord.utils.get(guild.categories, name=category_name)
                channel = await guild.create_text_channel(name=channel_name, category=category)
                await channel.edit(topic=channel_topic)

        asyncio.run(runner())

def handle(event: Dict, _context: Dict):
    MergeRequestWebhookHandler().handle(event)

Considerations:

  1. Separate Client Instances: Each service (OldMergeRequestService and NewMergeRequestService) creates a new discord.Client instance for each operation. This ensures that the bot instance is not reused after it has been closed.
  2. Async Client Handling: Used async with client to ensure proper handling of the client's context.
  3. Entrypoint: The handle function is now set as the entry point for the Lambda function, ensuring it processes incoming events correctly.

This should resolve the RuntimeError("Session is closed") by ensuring a fresh client instance is used for each interaction with Discord.