yepcord / server

Unofficial Discord backend implementation in Python.
GNU Affero General Public License v3.0
2 stars 1 forks source link

add test for nonexistent user #237

Open github-actions[bot] opened 1 month ago

github-actions[bot] commented 1 month ago

https://github.com/yepcord/server/blob/7dfe87e5c92d527d77109fc89663993144526033/yepcord/yepcord/models/user.py#L214


                "accent_color": data.accent_color
            }
        }
        if guild_id and (guild := await models.Guild.get_or_none(id=guild_id)):
            if member := await guild.get_member(self.id):
                data["guild_member_profile"] = {"guild_id": str(guild_id)}
                data["guild_member"] = await member.ds_json()
        if mutual_friends_count:
            data["mutual_friends_count"] = 0  # TODO: add mutual friends count
        if with_mutual_guilds:
            query = Q(user=self)
            if self != other_user:
                query &= Q(guild__id__in=Subquery(
                    models.GuildMember
                    .filter(user__id__in=[self.id, other_user.id])
                    .group_by("guild_id")
                    .annotate(user_count=Count("user__id", distinct=True))
                    .filter(user_count=2)
                    .values_list("guild_id", flat=True)
                ))

            data["mutual_guilds"] = [
                {"id": str(guild_id), "nick": nick}
                for nick, guild_id in await models.GuildMember.filter(query).values_list("nick", "guild_id")
            ]
        if self.is_bot:
            data["user"]["bot"] = True

        return data

    # noinspection PyMethodMayBeStatic
    async def get_another_user(self, user_id: int) -> User:
        """Look up another user by id.

        :param user_id: Id of the user to fetch.
        :return: The user returned by ``User.y.get``.
        :raises UnknownUser: When the lookup returns ``None``.
        """
        # TODO: check for relationship, mutual guilds or mutual friends
        found = await User.y.get(user_id, False)  # TODO: add test for nonexistent user
        if found is None:
            raise UnknownUser
        return found

    def check_password(self, password: str) -> bool:
        """Check ``password`` against the hash stored on this user.

        The candidate is first run through ``self.y.prepare_password`` together with
        the user id, then handed to ``checkpw`` with the UTF-8 encoded ``self.password``.

        :param password: Plain-text password to verify.
        :return: The result of ``checkpw``.
        """
        prepared = self.y.prepare_password(password, self.id)
        stored_hash = self.password.encode("utf8")
        return checkpw(prepared, stored_hash)

    def hash_new_password(self, password: str) -> str:
        """Hash ``password`` for this user via ``self.y.hash_password``.

        :param password: Plain-text password to hash.
        :return: Whatever ``hash_password`` produces for the password and this user's id.
        """
        hashed = self.y.hash_password(password, self.id)
        return hashed

    async def change_password(self, new_password: str) -> None:
        """Store a freshly hashed ``new_password`` on this user.

        Only the ``password`` field is written when saving.

        :param new_password: Plain-text password to hash and persist.
        """
        hashed = self.hash_new_password(new_password)
        self.password = hashed
        await self.save(update_fields=["password"])

    async def change_username(self, username: str) -> None:
        """Rename this user, picking a new discriminator if the current one is taken.

        The current discriminator is kept unless ``User.y.getByUsername`` finds a user
        with the new name and that discriminator; in that case a free discriminator is
        requested from ``self.y.get_free_discriminator``.

        :param username: The new username.
        :raises InvalidDataErr: 400 with code ``USERNAME_TOO_MANY_USERS`` when no free
            discriminator is returned.
        """
        profile = await self.data
        chosen_discriminator = profile.discriminator

        if await User.y.getByUsername(username, chosen_discriminator):
            # The name/discriminator pair is occupied: fall back to any free discriminator.
            chosen_discriminator = await self.y.get_free_discriminator(username)
            if chosen_discriminator is None:
                error_body = {"username": {
                    "code": "USERNAME_TOO_MANY_USERS",
                    "message": "This name is used by too many users. Please enter something else or try again."
                }}
                raise InvalidDataErr(400, Errors.make(50035, error_body))

        profile.username = username
        profile.discriminator = chosen_discriminator
        await profile.save(update_fields=["username", "discriminator"])

    async def change_discriminator(self, new_discriminator: int, username_changed: bool = False) -> bool:
        """Give this user a new discriminator while keeping the current username.

        :param new_discriminator: Discriminator to switch to.
        :param username_changed: When true, a conflict is reported by returning
            ``False`` instead of raising.
        :return: ``True`` once the discriminator was saved, ``False`` on a conflict
            with ``username_changed`` set.
        :raises InvalidDataErr: 400 when the username/discriminator pair is already
            in use and ``username_changed`` is false.
        """
        profile = await self.data
        conflict = await self.y.getByUsername(profile.username, new_discriminator)

        if not conflict:
            profile.discriminator = new_discriminator
            await profile.save(update_fields=["discriminator"])
            return True

        if username_changed:
            return False

        error_body = {"username": {
            "code": "USERNAME_TOO_MANY_USERS",
            "message": "This discriminator already used by someone. Please enter something else."
        }}
        raise InvalidDataErr(400, Errors.make(50035, error_body))

    async def change_email(self, new_email: str) -> None:
        """Switch this account to ``new_email`` and mark it as unverified.

        The address is lower-cased before any comparison. Nothing is changed when it
        equals the current email.

        :param new_email: Address to store; compared and saved in lower case.
        :raises InvalidDataErr: 400 with code ``EMAIL_ALREADY_REGISTERED`` when
            ``User.exists`` reports a user with that email.
        """
        new_email = new_email.lower()
        if self.email == new_email:
            return
        if await User.exists(email=new_email):
            raise InvalidDataErr(400, Errors.make(50035, {"email": {"code": "EMAIL_ALREADY_REGISTERED",
                                                                    "message": "Email address already registered."}}))
        self.email = new_email
        self.verified = False
        # Write only the two columns touched here (the same way change_password limits
        # its save), so a stale in-memory instance cannot overwrite unrelated fields.
        await self.save(update_fields=["email", "verified"])

    async def create_backup_codes(self) -> list[str]:
        """Replace this user's MFA backup codes with ten new ones and return them.

        Each code is 8 characters drawn from lowercase ASCII letters and digits.
        Existing codes are removed through ``clear_backup_codes`` before the new ones
        are bulk-inserted.

        :return: The newly generated codes.
        """
        # Backup codes are authentication secrets, so draw them from the OS CSPRNG
        # (``secrets``) rather than a general-purpose pseudo-random generator.
        from secrets import choice as secure_choice

        alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
        codes = ["".join(secure_choice(alphabet) for _ in range(8)) for _ in range(10)]

        await self.clear_backup_codes()
        await models.MfaCode.bulk_create([
            models.MfaCode(user=self, code=code) for code in codes
        ])

        return codes

    async def clear_backup_codes(self) -> None:
        """Delete every ``MfaCode`` row that belongs to this user."""
        owned_codes = models.MfaCode.filter(user=self)
        await owned_codes.delete()

    async def get_backup_codes(self) -> list[str]:
        """Return the code strings of up to ten ``MfaCode`` rows of this user."""
        rows = await models.MfaCode.filter(user=self).limit(10)
        return [row.code for row in rows]

    async def use_backup_code(self, code: str) -> bool:
        """Consume one of this user's unused backup codes.

        :param code: The backup code entered by the user.
        :return: ``False`` when no unused ``MfaCode`` with that value exists for this
            user; ``True`` after the matching row was marked as used.
        """
        entry = await models.MfaCode.get_or_none(user=self, code=code, used=False)
        if entry is None:
            return False

        entry.used = True
        await entry.save(update_fields=["used"])
        return True

    async def y_delete(self) -> None:
        """Soft-delete this account: anonymise it and remove its dependent rows.

        The user is flagged ``deleted`` with a placeholder email and an empty password,
        the profile data is reset to a "Deleted User" identity, and the rows selected
        by the querysets below are deleted.
        """
        placeholder_email = f"deleted_{self.id}@yepcord.ml"
        await self.update(deleted=True, email=placeholder_email, password="")

        profile = await self.data
        await profile.update(
            discriminator=0,
            username=f"Deleted User {hex(self.id)[2:]}",
            avatar=None,
            public_flags=0,
            avatar_decoration=None,
        )

        # NOTE(review): assumes ``filter`` only builds a lazy queryset (Tortoise ORM);
        # the deletes themselves run one by one in the order listed.
        doomed = (
            models.Session.filter(user=self),
            models.Relationship.filter(Q(from_user=self) | Q(to_user=self)),
            models.MfaCode.filter(user=self),
            models.GuildMember.filter(user=self),
            models.UserSettings.filter(user=self),
            models.FrecencySettings.filter(user=self),
            models.Invite.filter(inviter=self),
            models.ReadState.filter(user=self),
        )
        for queryset in doomed:
            await queryset.delete()

    async def get_guilds(self) -> list[models.Guild]:
        """Return the guild of every ``GuildMember`` row belonging to this user.

        The ``guild`` and ``guild__owner`` relations are fetched together with the
        membership rows.
        """
        memberships = await models.GuildMember.filter(user=self).select_related("guild", "guild__owner")
        return [membership.guild for membership in memberships]

    async def get_private_channels(self) -> list[models.Channel]:
        """Return the channels this user is a recipient of, minus hidden ones.

        A channel is left out when ``channel.dm_is_hidden(self)`` is truthy.
        """
        visible = []
        for channel in await models.Channel.filter(recipients__id=self.id).select_related("owner"):
            if await channel.dm_is_hidden(self):
                continue
            visible.append(channel)
        return visible

    async def get_relationships(self) -> list[models.Relationship]:
        """Return relationships that involve this user on either side.

        ``BLOCK`` relationships whose ``from_user`` is not this user are left out of
        the result.
        """
        involving_me = Q(from_user=self) | Q(to_user=self)
        rows = await models.Relationship.filter(involving_me).select_related("from_user", "to_user")

        visible = []
        for row in rows:
            blocked_by_other = row.type == RelationshipType.BLOCK and row.from_user.id != self.id
            if not blocked_by_other:
                visible.append(row)
        return visible

    async def get_related_users(self) -> list[models.User]:
        """Collect the users connected to this one through relationships or channels.

        Users come first from ``get_relationships`` (via ``relationship.other_user``),
        then from the recipients of every channel that lists this user as a recipient.
        A channel recipient is skipped when a user with the same id was already
        collected or when it is this user.

        :return: The collected users, one per user id.
        """
        related = {}
        for relationship in await self.get_relationships():
            # Resolve the counterpart once instead of once for the key and once for the value.
            counterpart = relationship.other_user(self)
            related[counterpart.id] = counterpart

        for channel in await models.Channel.filter(recipients__id=self.id):
            for recipient in await channel.recipients.all():
                if recipient.id in related or recipient == self:
                    continue
                related[recipient.id] = recipient

        return list(related.values())

    async def get_mfa_key(self) -> str | None:
        """Return the ``mfa`` value stored in this user's ``UserSettings`` row."""
        mfa_query = models.UserSettings.get(user=self).values_list("mfa", flat=True)
        # ``cast`` only informs the type checker; the fetched value is returned as-is.
        return cast(str, await mfa_query)

    async def generate_mfa_nonce(self) -> tuple[str, str]:
        """Create a pair of MFA nonces for this user.

        Both tokens carry the same random code ("c") and this user's id ("u"); they
        differ only in the type field ("t"): ``MfaNonceType.NORMAL`` for the first,
        ``MfaNonceType.REGENERATE`` for the second.

        :return: ``(normal_nonce, regenerate_nonce)``.
        """
        signing_key = b64decode(Config.KEY)
        # Passed as the third positional argument of ``JWT.encode``.
        # NOTE(review): presumably an absolute expiry timestamp 600 s ahead - confirm.
        expires_at = time() + 600
        shared_code = b64encode(urandom(16))

        normal_nonce, regenerate_nonce = (
            JWT.encode({"t": kind, "c": shared_code, "u": self.id}, signing_key, expires_at)
            for kind in (MfaNonceType.NORMAL, MfaNonceType.REGENERATE)
        )
        return normal_nonce, regenerate_nonce

    async def verify_mfa_nonce(self, nonce: str, nonce_type: MfaNonceType) -> None:
        """Validate an MFA nonce for this user and the expected nonce type.

        Three conditions are handed to ``assert_`` together with ``InvalidKey``: the
        decoded payload is truthy, its "u" entry equals this user's id, and its "t"
        entry equals ``nonce_type``.
        NOTE(review): ``assert_`` presumably raises ``InvalidKey`` on a falsy
        condition - confirm against its definition.

        :param nonce: Token to decode with ``JWT.decode``.
        :param nonce_type: Type the nonce is required to have.
        """
        signing_key = b64decode(Config.KEY)
        payload = JWT.decode(nonce, signing_key)
        assert_(payload, InvalidKey)
        assert_(payload["u"] == self.id, InvalidKey)
        assert_(nonce_type == payload["t"], InvalidKey)

    async def update_read_state(self, channel: models.Channel, count: int, last: int) -> None:
        """Create or update this user's ``ReadState`` for ``channel``.

        :param channel: Channel the read state belongs to.
        :param count: Value written to the ``count`` field.
        :param last: Value written to the ``last_read_id`` field.
        """
        new_values = {"last_read_id": last, "count": count}
        await models.ReadState.update_or_create(user=self, channel=channel, defaults=new_values)

    async def send_verification_email(self) -> None:
        """Send a verification email containing a token for this user's address.

        The token encodes the user's id and email and is created with
        ``expires_after=600``.
        """
        claims = {"id": self.id, "email": self.email}
        signing_key = b64decode(Config.KEY)
        token = JWT.encode(claims, signing_key, expires_after=600)
        await EmailMsg.send_verification(self.email, token)

    async def get_read_states(self) -> list[models.ReadState]:
        """Return this user's read states with their channels loaded.

        Bots always get an empty list; no query is made for them.
        """
        if not self.is_bot:
            return await models.ReadState.filter(user=self).select_related("channel")
        return []