Closed github-actions[bot] closed 1 year ago
https://api.github.com/yepcord/server/blob/4e0cb962580c7ea8f5a0261f2c5605a0ca44f72f/src/yepcord/core.py#L414
settings = await user.settings return MFA(settings.mfa, uid) async def generateUserMfaNonce(self, user: mUser) -> tuple[str, str]: exp = time() + 600 code = b64encode(urandom(16)) nonce = JWT.encode({"type": "normal", "code": code, "user_id": user.id}, self.key, exp) rnonce = JWT.encode({"type": "regenerate", "code": code, "user_id": user.id}, self.key, exp) return nonce, rnonce async def verifyUserMfaNonce(self, user: mUser, nonce: str, regenerate: bool) -> None: if not (payload := JWT.decode(nonce, self.key)) or payload["user_id"] != user.id: raise InvalidDataErr(400, Errors.make(60011)) nonce_type = "normal" if not regenerate else "regenerate" if nonce_type != payload["type"]: raise InvalidDataErr(400, Errors.make(60011)) async def useMfaCode(self, user: mUser, code: str) -> bool: if (code := await MfaCode.objects.get_or_none(user=user, code=code, used=False)) is None: return False await code.update(used=True) return True async def getChannel(self, channel_id: int) -> Optional[mChannel]: if (channel := await mChannel.objects.select_related(["guild", "owner", "parent"]) .get_or_none(id=channel_id)) is None: return return await self.setLastMessageIdForChannel(channel) async def getDmChannel(self, user1: mUser, user2: mUser) -> Optional[mChannel]: channel_row = await mChannel.Meta.database.fetch_one( query="SELECT * FROM `channels` WHERE `id` = (" "SELECT `channel` FROM `channels_users` WHERE `user` IN (:user1, :user2) GROUP BY `channel` " "HAVING COUNT(DISTINCT `user`) = 2);", values={"user1": user1.id, "user2": user2.id} ) if channel_row is None: return return await mChannel.from_row(channel_row, mChannel).load_all() async def getDMChannelOrCreate(self, user1: mUser, user2: mUser) -> mChannel: channel = await self.getDmChannel(user1, user2) if channel is None: channel = await mChannel.objects.create(id=Snowflake.makeId(), type=ChannelType.DM) await channel.recipients.add(user1) await channel.recipients.add(user2) return await mChannel.objects.get(id=channel.id) if await self.isDmChannelHidden(user1, channel): await self.unhideDmChannel(user1, channel) await c.getGw().dispatch(DMChannelCreateEvent(channel), users=[user1.id]) return await self.setLastMessageIdForChannel(channel) async def getLastMessageId(self, channel: mChannel) -> Optional[int]: if (last_message_id := await mMessage.objects.filter(channel=channel).max("id")) is not None: return last_message_id async def setLastMessageIdForChannel(self, channel: mChannel) -> mChannel: channel.last_message_id = await self.getLastMessageId(channel) return channel async def getLastMessage(self, channel: mChannel) -> mMessage: return await mMessage.objects.filter(channel=channel).max("id") async def getChannelMessagesCount(self, channel: mChannel) -> int: return await mMessage.objects.filter(channel=channel).count() async def getPrivateChannels(self, user: mUser, with_hidden: bool=False) -> list[mChannel]: channels = await mChannel.objects.select_related("recipients").filter(recipients__id__in=[user.id]).all() return [await self.setLastMessageIdForChannel(channel) for channel in channels] async def getChannelMessages(self, channel, limit: int, before: int=0, after: int=0) -> list[mMessage]: id_filter = {} if after: id_filter["id__gt"] = after if before: id_filter["id__lt"] = before return await mMessage.objects.filter(channel=channel, **id_filter).limit(limit).all() async def getMessage(self, channel: mChannel, message_id: int) -> Optional[mMessage]: if not message_id: return return await mMessage.objects.select_related(["author", "channel", "thread", "guild"])\ .get_or_none(channel=channel, id=message_id) async def sendMessage(self, message: mMessage) -> mMessage: async def _addToReadStates(): users = await self.getRelatedUsersToChannel(message.channel) if message.author.id in users: users.remove(message.author.id) for user in users: read_state, _ = await mReadState.objecs.get_or_create( user=user, channel=message.channel, _defaults={"last_read_id": message.id, "count": 0} ) await read_state.update(count=read_state.count+1) Context().run(get_event_loop().create_task, _addToReadStates()) return message async def deleteMessage(self, message: mMessage) -> None: await message.delete() async def getRelatedUsersToChannel(self, channel: mChannel, ids: bool=True) -> list[Union[int, mUser]]: if channel.type in [ChannelType.DM, ChannelType.GROUP_DM]: await channel.load_all() if ids: return [recipient.id for recipient in channel.recipients] return channel.recipients elif channel.type in GUILD_CHANNELS: return [member.user_id for member in await self.getGuildMembers(channel.guild)] elif channel.type in (ChannelType.GUILD_PUBLIC_THREAD, ChannelType.GUILD_PRIVATE_THREAD): return [member.user_id for member in await self.getThreadMembers(channel)] async def setReadState(self, user: mUser, channel: mChannel, count: int, last: int) -> None: read_state, _ = await mReadState.objects.get_or_create( user=user, channel=channel, _defaults={"last_read_id": last, "count": count} ) await read_state.update(last_read_id=last, count=count) async def getReadStatesJ(self, user: mUser) -> list: states = [] for st in await mReadState.objects.filter(user=user).all: states.append({ # TODO: replace with st.ds_json "mention_count": st.count, "last_pin_timestamp": await self.getLastPinTimestamp(st.channel), "last_message_id": str(st.last_read_id), "id": str(st.channel_id), }) return states async def getUserNote(self, user: mUser, target: mUser) -> Optional[mUserNote]: return await mUserNote.objects.get_or_none(user=user, target=target) async def getAttachment(self, attachment_id: int) -> Optional[mAttachment]: return await mAttachment.objects.get_or_none(id=attachment_id) async def getUserByChannelId(self, channel_id: int, user_id: int) -> Optional[mUser]: if not (channel := await self.getChannel(channel_id)): return None return await self.getUserByChannel(channel, user_id) async def getUserByChannel(self, channel: mChannel, user_id: int) -> Optional[mUser]: if channel.type in (ChannelType.DM, ChannelType.GROUP_DM): if await mChannel.objects.select_related("recipients").filter(id=channel.id, recipients__id__in=[user_id])\ .exists(): return await self.getUser(user_id) elif channel.type in GUILD_CHANNELS: return await self.getGuildMember(channel.guild, user_id) elif channel.type in (ChannelType.GUILD_PUBLIC_THREAD, ChannelType.GUILD_PRIVATE_THREAD): return await self.getThreadMember(channel, user_id) async def setFrecencySettings(self, user: mUser, proto: str) -> None: await mFrecencySettings.objects.get_or_create(user=user, _defaults={"settings": proto}) async def getFrecencySettings(self, user: mUser) -> str: return (await mFrecencySettings.objects.get(user=user)).settings async def sendVerificationEmail(self, user: mUser) -> None: key = new(self.key, str(user.id).encode('utf-8'), sha256).digest() t = int(time()) sig = b64encode(new(key, f"{user.id}:{user.email}:{t}".encode('utf-8'), sha256).digest())
Closed in 0afcb236b7a3ff0df9161eb63cc0e361cb5c82ec
https://api.github.com/yepcord/server/blob/4e0cb962580c7ea8f5a0261f2c5605a0ca44f72f/src/yepcord/core.py#L414