iterative / PyDrive2

Google Drive API Python wrapper library. Maintained fork of PyDrive.
https://docs.iterative.ai/PyDrive2
Other
581 stars 69 forks source link

access_token refresh causes threads to hang #163

Closed nice-shot closed 2 years ago

nice-shot commented 2 years ago

I've wrote a script to download a google folder and all of its subfolders. I'm using this script to create a backup for a large folder hierarchy and running it with multiple threads. The script uses a queue that stores drive folder ids and the corresponding local folder path to download them to. Each thread takes one of these items, starts downloading the files inside (it also checks if a file exists and skips it if the MD5 matches) and inserts any folders it finds into the queue.

Once the script runs for more than an hour the following log message appears in several threads: access_token is expired. Now: 2022-03-25 17:02:54.541287, token_expiry: 2022-03-25 17:02:54.164462 Following it, this log message appears in one of the threads: Refreshing access_token

The problem is that after this message appears, one or more of the threads starts hanging and will not continue execution. I'm not sure if it's an issue on my side or something about the way the token is renewed that causes this issue.

What can be causing these threads the hang? From my logs It doesn't seem like they're throwing an error or something...

This is the script's code:

import os
import sys
import time
import queue
import shutil
import logging
import hashlib
import argparse
import threading
from dataclasses import dataclass
from unicodedata import name

from httplib2.error import ServerNotFoundError
from socket import gaierror
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from pydrive2.files import FileNotDownloadableError

FOLDER_MIMETYPE = "application/vnd.google-apps.folder"

SKIP_MIMETYPES = [
    "application/vnd.google-apps.document",
    "application/vnd.google-apps.presentation",
    "application/vnd.google-apps.spreadsheet",
]

@dataclass
class Stats:
    downloaded = 0
    skipped = 0
    deleted_files = 0
    deleted_folders = 0
    folders = 0

    def join(self, other):
        self.downloaded += other.downloaded
        self.skipped += other.skipped
        self.deleted_files += other.deleted_files
        self.deleted_folders += other.deleted_folders
        self.folders += other.folders

    def __str__(self):
        return f"""
        Downloaded: {self.downloaded}
        Skipped: {self.skipped}
        Folders: {self.folders}
        Deleted Files: {self.deleted_files}
        Deleted Folders: {self.deleted_folders}
        """

q = queue.Queue()
stats_lock = threading.Lock()
total_stats = Stats()

def create_drive():
    gauth = GoogleAuth()
    gauth.LocalWebserverAuth()

    return GoogleDrive(gauth)

def md5_file(file_path):
    md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            md5.update(chunk)
    return md5.hexdigest()

def download_files_recursive(drive, folder_id, output_folder, folder_name, log: logging.Logger):
    log.info(f"Reading folder: {folder_name} into {output_folder}")
    os.makedirs(output_folder, exist_ok=True)

    files = drive.ListFile({
        "q": f"'{folder_id}' in parents and trashed=false and mimeType != '{FOLDER_MIMETYPE}'"
    })
    valid_paths = []
    stats = Stats()

    # Downloading Files:
    for f in files.GetList():
        file_name = f["title"]
        if f["mimeType"] in SKIP_MIMETYPES:
            log.debug(f"Skipping file: {file_name} - irrelevant mimetype: {f['mimeType']}")
            continue
        output_path = os.path.join(output_folder, file_name)
        log.debug(f"Downloading file: {file_name} to {output_path}")

        if os.path.exists(output_path):
            log.debug("File exists - checking hash")
            output_md5 = md5_file(output_path)
            if f["md5Checksum"] == output_md5:
                log.debug("Checksum not changed - skipping")
                valid_paths.append(file_name)
                stats.skipped += 1
                continue
            else:
                log.debug("Checksum changed - downloading")

        f.GetContentFile(output_path)
        stats.downloaded += 1
        valid_paths.append(file_name)

    # Downloading folders:
    folders = drive.ListFile({
        "q": f"'{folder_id}' in parents and trashed=false and mimeType = '{FOLDER_MIMETYPE}'"
    })
    for f in folders.GetList():
        sub_folder_name = f["title"]
        q.put((f["id"], os.path.join(output_folder, sub_folder_name), sub_folder_name))
        valid_paths.append(sub_folder_name)
        stats.folders += 1

    # Removing non-existing files and folders
    for local_path in os.listdir(output_folder):
        if local_path not in valid_paths:
            full_path = os.path.join(output_folder, local_path)
            log.info(f"Local path: {full_path} - not in drive anymore - deleting")
            if os.path.isdir(full_path):
                stats.deleted_files += 1
                shutil.rmtree(full_path)
            elif os.path.isfile(full_path):
                stats.deleted_folders += 1
                os.remove(full_path)
            else:
                log.warning("WARNING - Path {full_path} is not a file or directory...")

    log.info(f"Finished going over folder: {folder_name} downloaded: {stats.downloaded} skipped: {stats.skipped}")
    return stats

def worker(drive):
    max_retrys = 10
    retrys = max_retrys
    sleep_time = 10
    log = logging.getLogger("drive-backup")
    while True:
        id, output, folder_name = q.get(10)
        retrys = 10
        succeeded = False
        while not succeeded and retrys > 0:
            try:
                stats = download_files_recursive(drive, id, output, folder_name, log)
                succeeded = True
            except Exception as e:
                retrys -= 1
                if retrys > 0:
                    log.error(f"Error downloading folder: {folder_name} - retry: {10 - retrys}")
                    log.error(e)
                    time.sleep(sleep_time)
                else:
                    log.error(f"Failed multiple retries")
                    q.task_done()
                    return

        with stats_lock:
            total_stats.join(stats)

        q.task_done()

if __name__ == "__main__":
    p = argparse.ArgumentParser()
    p.add_argument("id", help="Google drive folder id")
    p.add_argument("output", help="Directory to download the files to")
    p.add_argument("folder", help="Google drive folder name")
    p.add_argument("--threads", "-t", type=int, default=5)
    p.add_argument("--debug", "-d", help="Debug log file", )

    args = p.parse_args()

    ## Logging:

    log_handlers = []
    log_level = logging.INFO

    log_simple_format = "%(asctime)s\t%(threadName)s\t%(levelname)s\t%(message)s"
    log_debug_format = "%(asctime)s\t%(threadName)s\t%(levelname)s\t%(module)s\t%(message)s"
    log_date_format = "%Y-%m-%d %H:%M:%S"

    log_simple_handler = logging.StreamHandler(sys.stdout)
    log_simple_handler.setLevel(logging.INFO)
    log_simple_handler.setFormatter(logging.Formatter(log_simple_format, datefmt=log_date_format))

    log_handlers.append(log_simple_handler)

    if args.debug:
        log_debug_handler = logging.FileHandler(args.debug, encoding="utf-8")
        log_debug_handler.setLevel(logging.DEBUG)
        log_debug_handler.setFormatter(logging.Formatter(log_debug_format, datefmt=log_date_format))
        log_handlers.append(log_debug_handler)
        log_level = logging.DEBUG

    logging.basicConfig(
        handlers=log_handlers,
        level=log_level,
    )

    drive = create_drive()
    start_time = time.time()
    q.put((args.id, args.output, args.folder))

    threads = []
    for i in range(args.threads):
        t = threading.Thread(target=worker, name=f"T{i}", args=[drive], daemon=True)
        threads.append(t)
        t.start()

    q.join()
    end_time = time.time()
    logging.info("Total time: %d seconds", int(end_time - start_time))
    logging.info("Final stats: %s", total_stats)
shcheklein commented 2 years ago

@nice-shot what happens if you terminate it? does it print a stack trace, could you please share it?

shcheklein commented 2 years ago

@nice-shot one more question - do you provide a custom settings.yaml file? what settings does it have? specifically, do you use save_credentials option or not? If not, could you please try to enable it as described here https://docs.iterative.ai/PyDrive2/oauth/#automatic-and-custom-authentication-with-settings-yaml ?

nice-shot commented 2 years ago

This is the settings.yaml file:

client_config_backend: file

save_credentials: True
save_credentials_backend: file
save_credentials_file: credentials.json

get_refresh_token: True

Unfortunately, because of the threads I can't seem to terminate the script using Ctrl-C. I have to close the terminal window so I don't get a stack trace. Based on the log I can see that the hanging threads reach one of the following lines:

f.GetContentFile(output_path)

or

for f in files.GetList():

Before emitting the "access_token is expired" log message and then stop writing any further logs.

shcheklein commented 2 years ago

Hmm, I can't reproduce this, that's what I see:

2022-04-03 19:35:36 T2  INFO    access_token is expired. Now: 2022-04-04 02:35:36.493511, token_expiry: 2022-04-04 02:35:36.464825
2022-04-03 19:35:36 T2  INFO    access_token is expired. Now: 2022-04-04 02:35:36.494126, token_expiry: 2022-04-04 02:35:36.464825
2022-04-03 19:35:36 T1  INFO    access_token is expired. Now: 2022-04-04 02:35:36.494502, token_expiry: 2022-04-04 02:35:36.464825
2022-04-03 19:35:36 T1  INFO    access_token is expired. Now: 2022-04-04 02:35:36.494678, token_expiry: 2022-04-04 02:35:36.464825
2022-04-03 19:35:36 T2  INFO    Refreshing access_token
2022-04-03 19:35:36 T1  INFO    Refreshing access_token
2022-04-03 19:35:36 T1  ERROR   Error downloading folder: 46 - retry: 1
2022-04-03 19:35:36 T1  ERROR   release unlocked lock
2022-04-03 19:35:46 T1  INFO    Reading folder: 46 into 22/46
....

Do you also have this release unlocked lock error? What OS and filesystem do you use? What is your Python version?

nice-shot commented 2 years ago

Yes I do get the release unlocked lock error, but the threads that show it keep working afterwards. I have windows 10, python version 3.71 and I’m running the script on an external exFat drive.

shcheklein commented 2 years ago

@nice-shot should be fixed in #164 I hope, give us some time to test it. Thanks for the report, it's indeed a critical non thread safety issue.

shcheklein commented 2 years ago

@nice-shot for the record - 1.10.1 was released on PyPi and conda

nice-shot commented 2 years ago

I've tested the new version and it seems to be working now! Thanks! I do sometimes get HttpError 500 but it's solved after one retry and I suppose that's more to do on Google's side.