kaburia / filter-stations

Making it easier to navigate and clean TAHMO weather station data for ML development
https://pypi.org/project/filter-stations/

Generators #2

Open kaburia opened 1 year ago

kaburia commented 1 year ago

Use generators in the `multiple_measurements` function to reduce memory usage. Alternatively, find a more optimal approach than plain for loops.
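A minimal sketch of the generator idea: since `pd.concat` accepts any iterable, per-station frames can be yielded one at a time instead of being accumulated in a list first. Here `fetch_station` is a hypothetical stand-in for the real per-station retrieval in filter-stations.

```python
import pandas as pd

def fetch_station(station):
    # Placeholder for the actual per-station API call.
    return pd.DataFrame({station: [1.0, 2.0, 3.0]})

def station_frames(stations):
    """Yield one DataFrame at a time instead of building a list up front."""
    for station in stations:
        yield fetch_station(station)

# pd.concat consumes the generator lazily, so at most one extra
# frame is held in Python-level storage at a time before concatenation.
df = pd.concat(station_frames(['TA00001', 'TA00002']), axis=1)
```

This keeps peak memory closer to one station's frame plus the growing result, rather than all frames at once.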

kaburia commented 1 year ago

Resolved by switching to multiprocessing:

    # requires: import multiprocessing as mp, import pandas as pd, from tqdm import tqdm
    def multiple_measurements(self, stations_list, csv_file, startDate, endDate, variables, dataset='controlled', aggregate=True):
        """
        Retrieves measurements for multiple stations and saves the aggregated data to a CSV file.

        Parameters:
        -----------
        - stations_list (list): A list of strings containing the names of the stations to retrieve data from.
        - csv_file (str): The name of the CSV file to save the data to.
        - startDate (str): The start date for the measurements, in the format 'yyyy-mm-dd'.
        - endDate (str): The end date for the measurements, in the format 'yyyy-mm-dd'.
        - variables (list): A list of strings containing the names of the variables to retrieve.
        - dataset (str): The name of the dataset to retrieve the data from. Default is 'controlled'.
        - aggregate (bool): Whether to aggregate the retrieved data. Default is True.

        Returns:
        -----------
        - df (pandas.DataFrame): A DataFrame containing the aggregated data for all stations.

        Raises:
        -----------
        - ValueError: If stations_list is not a list.
        """
        if not isinstance(stations_list, list):
            raise ValueError('Pass in a list')

        pool = mp.Pool(processes=mp.cpu_count())  # use all available CPU cores

        try:
            results = []
            with tqdm(total=len(stations_list), desc='Retrieving data for stations') as pbar:
                for station in stations_list:
                    results.append(pool.apply_async(
                        self.retrieve_data,
                        args=(station, startDate, endDate, variables, dataset, aggregate),
                        callback=lambda _: pbar.update(1)))

                pool.close()
                pool.join()

            # call .get() once per result, then keep only successful DataFrames
            station_frames = [result.get() for result in results]
            df_stats = [frame for frame in station_frames if isinstance(frame, pd.DataFrame)]

            if len(df_stats) > 0:
                df = pd.concat(df_stats, axis=1)
                df.to_csv(f'{csv_file}.csv')
                return df
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            pool.terminate()
kaburia commented 4 months ago

The method is well optimized for requesting a single variable across a list of stations, but may not perform as well when multiple variables are requested together with a list of multiple stations.
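One hedged workaround for the multi-variable case is to issue one request per variable and join the results column-wise at the end. This is only a sketch of the pattern; `fetch` is a hypothetical stand-in for a per-variable call into filter-stations, not the library's actual API.

```python
import pandas as pd

def fetch(stations, variable):
    # Stand-in for a per-variable retrieval; returns one column
    # per station for that variable.
    return pd.DataFrame({f'{s}_{variable}': [0.0, 1.0] for s in stations})

stations = ['TA00001', 'TA00002']
variables = ['pr', 'te']

# One call per variable; column-wise concat merges them into one frame.
df = pd.concat((fetch(stations, v) for v in variables), axis=1)
```

Splitting by variable keeps each request small and lets the existing single-variable path do the heavy lifting.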