Open ErSauravAdhikari opened 1 month ago
We can then modify the login code from client.py as:
async def _login(self) -> bool:
    """Run the login flow and report whether the backup code was used.

    Steps, in order: request a guest token, open the login flow, submit the
    user identifier and the password, pass the account duplication check,
    then answer the optional ``LoginAcid`` (email) and
    ``LoginTwoFactorAuthChallenge`` (TOTP / backup code) subtasks, and
    finally complete the flow with an empty subtask list.

    Returns:
        ``True`` when the TOTP code was rejected with API code 399 and the
        backup code was accepted instead; ``False`` otherwise.

    Raises:
        TwitterException: a value required by a subtask is missing
            (username, email, TOTP secret, backup code), or the email,
            or the backup code was rejected with API code 399.
        HTTPException: any HTTP error without API code 399 is re-raised
            unchanged.
    """
    update_backup_code = False

    guest_token = await self._request_guest_token()
    self._session.headers["X-Guest-Token"] = guest_token

    flow_token, subtasks = await self._request_login_tasks()
    # NOTE(review): indentation was lost in the pasted source; the loop body
    # is assumed to be this single call (submitted twice) — confirm.
    for _ in range(2):
        flow_token, subtasks = await self._login_enter_user_identifier(flow_token)

    subtask_ids = {subtask.id for subtask in subtasks}
    if "LoginEnterAlternateIdentifierSubtask" in subtask_ids:
        if not self.account.username:
            raise TwitterException("Failed to login: no username to relogin")

    flow_token, subtasks = await self._login_enter_password(flow_token)
    flow_token, subtasks = await self._account_duplication_check(flow_token)

    # ``subtasks`` is rebound inside the loop; iteration continues over the
    # list returned by the duplication check, exactly as before.
    for subtask in subtasks:
        if subtask.id != "LoginAcid":
            continue

        if not self.account.email:
            raise TwitterException("Failed to login. Task id: LoginAcid. No email!")

        if subtask.primary_text == "Check your email":
            # A confirmation code was mailed: read it from the mailbox.
            flow_token, subtasks = await self._login_email_auth_challenge(
                flow_token, self.account.email, self.account.email_password
            )
            continue

        # Otherwise the challenge is answered with the email address itself.
        try:
            flow_token, subtasks = await self._login_acid(flow_token, self.account.email)
        except HTTPException as exc:
            if 399 not in exc.api_codes:
                raise
            logger.warning(
                f"(auth_token={self.account.hidden_auth_token}, id={self.account.id}, username={self.account.username})"
                " Bad email!"
            )
            raise TwitterException(
                "Failed to login. Task id: LoginAcid. Bad email!"
            ) from exc

    subtask_ids = {subtask.id for subtask in subtasks}

    if "LoginTwoFactorAuthChallenge" in subtask_ids:
        if not self.account.totp_secret:
            raise TwitterException(
                "Failed to login. Task id: LoginTwoFactorAuthChallenge. No totp_secret!"
            )

        try:
            flow_token, subtasks = await self._login_two_factor_auth_challenge(
                flow_token, self.account.get_totp_code()
            )
        except HTTPException as totp_exc:
            if 399 not in totp_exc.api_codes:
                raise
            logger.warning(
                f"(auth_token={self.account.hidden_auth_token}, id={self.account.id}, username={self.account.username})"
                " Bad TOTP secret!"
            )

            if not self.account.backup_code:
                raise TwitterException(
                    "Failed to login. Task id: LoginTwoFactorAuthChallenge. No backup code!"
                ) from totp_exc

            # Fall back to the backup code: switch the 2FA method, then
            # answer the challenge again.
            flow_token, subtasks = await self._login_two_factor_auth_choose_method(flow_token)
            try:
                flow_token, subtasks = await self._login_two_factor_auth_challenge(
                    flow_token, self.account.backup_code
                )
            except HTTPException as backup_exc:
                if 399 not in backup_exc.api_codes:
                    raise
                logger.warning(
                    f"(auth_token={self.account.hidden_auth_token}, id={self.account.id}, username={self.account.username})"
                    " Bad backup code!"
                )
                raise TwitterException(
                    "Failed to login. Task id: LoginTwoFactorAuthChallenge. Bad backup_code!"
                ) from backup_exc

            update_backup_code = True

    await self._complete_subtask(flow_token, [])
    return update_backup_code
This uses another method, defined as:
async def _login_email_auth_challenge(self, flow_token, email, password, wait_seconds: int = 30):
    """Answer the ``LoginAcid`` subtask with a code read from the mailbox.

    Args:
        flow_token: Token of the login flow the answer is submitted to.
        email: Mailbox address passed to ``login_and_get_token``.
        password: Mailbox password passed to ``login_and_get_token``.
        wait_seconds: Third positional argument of ``login_and_get_token``
            (presumably how long to wait for the email, in seconds —
            confirm against that helper). Defaults to the previously
            hard-coded ``30``.

    Returns:
        Whatever ``self._complete_subtask`` returns for the submitted code.
    """
    logger.info("Waiting for the email auth challenge code")
    code = await login_and_get_token(email, password, wait_seconds)
    inputs = [
        {
            "subtask_id": "LoginAcid",
            "enter_text": {"text": code, "link": "next_link"},
        }
    ]
    # The code itself is deliberately not logged: it is a one-time secret.
    logger.info("Email auth challenge code received, submitting it")
    return await self._complete_subtask(flow_token, inputs, auth=False)
This logs in to accounts that have not been banned.
But this process fails when the account has been banned, even when the CapSolver API key is set up: the unlock is not handled properly, and I can't seem to figure out why.
That's why I did not submit this as a pull request 😄
I can see you've done a lot of work and that's really cool. I'll check it out when I have time, thanks❤️
My pleasure,
Please do tell me if I can help in any way or form.
I can also submit this as a PR, but as I mentioned, some more work is needed to refine it. The above setup works for most cases: I am currently using the modified code locally with automatic email OTP auth, and it works perfectly most of the time. But when a captcha has to be solved, it fails. That might also be due to how the library handles things with regard to CapSolver, which is where I am a bit lost.
We can get the auth code automatically using the following logic
import asyncio import email as emaillib import imaplib import os import time from datetime import datetime, timezone, timedelta import logging logger = logging.getLogger(__name__) class EmailLoginError(Exception): def __init__(self, message="Email login error"): self.message = message super().__init__(self.message) class EmailCodeTimeoutError(Exception): def __init__(self, message="Email code timeout"): self.message = message super().__init__(self.message) class EmailClient: IMAP_MAPPING = { "gmail.com": "imap.gmail.com", "yahoo.com": "imap.mail.yahoo.com", "icloud.com": "imap.mail.me.com", "outlook.com": "imap-mail.outlook.com", "hotmail.com": "imap-mail.outlook.com", "aol.com": "imap.aol.com", "gmx.com": "imap.gmx.com", "zoho.com": "imap.zoho.com", "yandex.com": "imap.yandex.com", "protonmail.com": "imap.protonmail.com", "mail.com": "imap.mail.com", "rambler.ru": "imap.rambler.ru", "qq.com": "imap.qq.com", "163.com": "imap.163.com", "126.com": "imap.126.com", "sina.com": "imap.sina.com", "comcast.net": "imap.comcast.net", "verizon.net": "incoming.verizon.net", "mail.ru": "imap.mail.ru", } @classmethod def add_imap_mapping(cls, email_domain: str, imap_domain: str): cls.IMAP_MAPPING[email_domain] = imap_domain @classmethod def _get_imap_domain(cls, email: str) -> str: email_domain = email.split("@")[1] if email_domain in cls.IMAP_MAPPING: return cls.IMAP_MAPPING[email_domain] return f"imap.{email_domain}" def __init__(self, email: str, password: str, wait_email_code: int): self.email = email self.password = password self.wait_email_code = wait_email_code self.domain = self._get_imap_domain(email) self.imap = imaplib.IMAP4_SSL(self.domain) async def login(self): try: self.imap.login(self.email, self.password) self.imap.select("INBOX", readonly=True) logger.info(f"Logged into {self.email} on {self.domain}") except imaplib.IMAP4.error as e: logger.error(f"Error logging into {self.email} on {self.domain}: {e}") raise EmailLoginError() from e async def 
_wait_email_code(self, count: int, min_t: datetime | None) -> str | None: for i in range(count, 0, -1): _, rep = self.imap.fetch(str(i), "(RFC822)") for x in rep: if isinstance(x, tuple): msg = emaillib.message_from_bytes(x[1]) msg_time = msg.get("Date", "").split("(")[0].strip() msg_time = datetime.strptime(msg_time, "%a, %d %b %Y %H:%M:%S %z") msg_from = str(msg.get("From", "")).lower() msg_subj = str(msg.get("Subject", "")).lower() logger.info(f"({i} of {count}) {msg_from} - {msg_time} - {msg_subj}") if "info@x.com" in msg_from and "confirmation code is" in msg_subj: return msg_subj.split(" ")[-1].strip() return None async def get_email_code(self, min_t: datetime | None = None) -> str: try: logger.info(f"Waiting for confirmation code for {self.email}...") start_time = time.time() while True: _, rep = self.imap.select("INBOX") msg_count = int(rep[0].decode("utf-8")) if rep and rep[0] else 0 code = await self._wait_email_code(msg_count, min_t) if code: return code if self.wait_email_code < time.time() - start_time: raise EmailCodeTimeoutError(f"Email code timeout ({self.wait_email_code} sec)") await asyncio.sleep(5) except Exception as e: self.imap.select("INBOX") self.imap.close() raise e async def test(): now_time = datetime.now(timezone.utc) - timedelta(seconds=30) # Look from 30 seconds ago print("Searching from: ", now_time) wait_email_code = int(os.getenv("TWS_WAIT_EMAIL_CODE", os.getenv("LOGIN_CODE_TIMEOUT", 30))) client = EmailClient("....", "...", wait_email_code) await client.login() print("Logged in") code = await client.get_email_code(now_time) print("Code is", code) return code if __name__ == "__main__": import asyncio asyncio.run(test())
Credits: Code inspired by twscrape (not all of the above code originally belongs to me)
Nice job! I guess you need to add rep.reverse() to get the last email with the code; otherwise you'll always get the first email.
We can get the auth code automatically using the following logic
Credits: Code inspired by twscrape (not all of the above code originally belongs to me)