Monitoring-Mtl / PyTransit-MicroServices

Service AWS Lambda Python pour analyser et filtrer les données
0 stars 0 forks source link

STM_Merge_Daily_GTFS_VehiclePositions Revoir la notion de MultiThread #39

Open abouillon-ets opened 11 months ago

abouillon-ets commented 11 months ago

Revoir la notion de MultiThread pour la récupération des fichiers dans S3, la récupération des "key" devraient aussi être fait en utilisant le MultiThread.

Mesurer le temps pour chaque processus pour déterminer d'où la lenteur provient.

# Paginate through files in the S3 bucket
    paginator = s3.get_paginator('list_objects_v2')
    file_keys = []
    for page in paginator.paginate(Bucket=input_bucket, Prefix=prefix):
        for content in page['Contents']:
            file_key = content['Key']
            if file_key.endswith('.parquet'):
                file_keys.append(file_key)
                try:
                    df = download_from_s3(input_bucket, file_key)
                    all_columns.update(df.columns)
                except Exception as e:
                    print(f"Error processing file: {file_key}")
                    print(e)
                    continue

Cette fonction devrait faire usage de

concurrent.futures.ThreadPoolExecutor
abouillon-ets commented 10 months ago

Pour les opérations de I/O la limitation provient de la vitesse de transfert entre S3 et Lambda. Pour les opérations sur la concaténation des DataFrame la limitation est sur Lambda (10 workers est le nombre que Lambda nous donnent un avertissement). Ceci réduit considérablement le temps d'exécution des fonctions (testé sur un autre compte)