BHoM / LadybugTools_Toolkit

GNU Lesser General Public License v3.0
2 stars 2 forks source link

Compare EPWs scripts #218

Open jamesramsden-bh opened 1 week ago

jamesramsden-bh commented 1 week ago

Tristan to provide his scripts to Louise for comparing EPW files. Louise to incorporate useful bits of code into LBT_TK.

jamesramsden-bh commented 1 week ago

Also see #86 for an example Tristan provided before

tg359 commented 23 hours ago

@louiseholwayjpeg, I've got a rough attempt at creating an object which compares EPWs. It works, but it's not perfect and could be enhanced with a few more features.

What is missing are a few other ways of visualising the data (how do their wind profiles compare for example) and ways of creating automatic comparison summary text to help the user figure out which is the "best" dataset to use for different purposes (e.g. the hottest, the coldest, the windiest, etc.).

The plot methods already on the object are self-contained here too, built eventually will be moved into some more generic location too.

import calendar
import functools
import warnings
from pathlib import Path

import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import pandas as pd
from ladybug.epw import EPW, Location
from ladybugtools_toolkit.ladybug_extension.datacollection import \
    summarise_collection
from ladybugtools_toolkit.ladybug_extension.epw import (
    HourlyContinuousCollection, average_collection, average_epw,
    average_location, degree_time, epw_to_dataframe)
from ladybugtools_toolkit.ladybug_extension.header import header_to_string
from matplotlib.axes import Axes

MAX_DISTANCE = 25  # km, between EPWs before warning raised
VARIABLES = [
            "aerosol_optical_depth",
            "albedo",
            "atmospheric_station_pressure",
            "ceiling_height",
            "days_since_last_snowfall",
            "dew_point_temperature",
            "diffuse_horizontal_illuminance",
            "diffuse_horizontal_radiation",
            "direct_normal_illuminance",
            "direct_normal_radiation",
            "dry_bulb_temperature",
            "extraterrestrial_direct_normal_radiation",
            "extraterrestrial_horizontal_radiation",
            "global_horizontal_illuminance",
            "global_horizontal_radiation",
            "horizontal_infrared_radiation_intensity",
            "liquid_precipitation_depth",
            "liquid_precipitation_quantity",
            "opaque_sky_cover",
            "precipitable_water",
            "present_weather_codes",
            "present_weather_observation",
            "relative_humidity",
            "snow_depth",
            "total_sky_cover",
            "visibility",
            "wind_direction",
            "wind_speed",
            "years",
            "zenith_luminance",
        ]

def location_distance(location1: Location, location2: Location) -> float:
    """
    Calculate the great circle distance between two points on the earth (specified in decimal degrees)

    Args:
        location1: Location object of the first location
        location2: Location object of the second location

    Returns:
        distance: The distance between the two locations in km
    """

    r = 6373.0  # approximate radius of earth in km
    lat1 = np.radians(location1.latitude)
    lon1 = np.radians(location1.longitude)
    lat2 = np.radians(location2.latitude)
    lon2 = np.radians(location2.longitude)
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2)**2
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
    distance = r * c
    return distance

class EPWComparison:
    """A class for the holding of EPW objects and comparison of their data.

    Args:
        epws: A list of EPW objects to compare

    """
    def __init__(self, epws: list[EPW]):
        self.epws = epws

        self._assign_properties()

    def __repr__(self) -> str:
        return f"EPWComparison({self.epw_ids})"

    @property
    def epws(self):
        """The list of EPW objects to compare."""
        return self._epws

    @epws.setter
    def epws(self, d):
        if not isinstance(d, (list, tuple)):
            raise ValueError("epws must be a list")
        for epw in d:
            if not isinstance(epw, EPW):
                raise ValueError("epws must be a 1d list of EPW objects")
        if len(set(d)) != len(d):
            raise ValueError("epws must contain unique EPW objects")
        if len(d) < 2:
            raise ValueError("epws must contain at least 2 EPW objects")
        if len(d) > 24:
            raise ValueError("epws must contain 24 or fewer EPW objects")

        # check that EPWs are within a certain distance of each other and raise a warning if not
        locations = [i.location for i in d]
        for loc in locations:
            distance = location_distance(locations[0], loc)
            if distance > MAX_DISTANCE:
                warnings.warn(f"EPWs are not within {MAX_DISTANCE} km of each other", UserWarning)

        self._epws = d

    @functools.cached_property
    def average_epw(self) -> EPW:
        """Get the average EPW object of the EPWs."""
        return average_epw(self.epws)

    @property
    def epw_ids(self) -> list[str]:
        """The IDs of the EPWs."""
        return [Path(i.file_path).stem for i in self.epws]

    @property
    def locations(self) -> list[Location]:
        """The locations of the EPWs."""
        return [i.location for i in self.epws]

    @property
    def average_location(self) -> Location:
        """The average location of the EPWs."""
        return average_location([i.location for i in self.epws])

    @functools.cached_property
    def df(self) -> pd.DataFrame:
        """A DataFrame of the EPW data."""
        dfs = []
        for i in self.epws:
            dfs.append(epw_to_dataframe(i, include_additional=True))
        return pd.concat(dfs, axis=1, keys=self.epw_ids)

    def _assign_properties(self) -> None:
        """Assign properties of the EPW objects to @property objects of this object."""
        # run the df method to ensure that the data is cached
        df = self.df.swaplevel(axis=1)

        # get the attribute from the EPW objects
        for prop in VARIABLES:
            setattr(self, prop, df[header_to_string(getattr(self.epws[0], prop).header)])

    def _get_collections(self, variable: str) -> list[HourlyContinuousCollection]:
        """Get a list of collections of a variable from the EPWs."""
        return [getattr(epw, variable) for epw in self.epws]

    def _get_series(self, variable: str) -> list[pd.Series]:
        """Get a list of series of a variable from the EPWs."""
        if variable not in VARIABLES:
            raise ValueError(f"{variable} is not a valid variable. Must be one of {VARIABLES}")
        df = self.df.swaplevel(axis=1)
        return df[header_to_string(getattr(self.epws[0], variable).header)]

    def average_collection(self, variable: str) -> HourlyContinuousCollection:
        """Get the average collection of a variable from the EPWs."""
        return average_collection(self._get_collections(variable=variable))

    def statistics(self, variable: str) -> pd.DataFrame:
        """Get statistics for a variable from the EPW data.

        Args:
            variable: The variable to get statistics for.

        Returns:
            DataFrame: A DataFrame of statistics for the variable.

        """
        return self._get_series(variable=variable).describe().T

    def summary(self, variable: str) -> pd.DataFrame:
        """Get a summary of a variable from the EPW data.

        Args:
            variable: The variable to get a summary for.

        Returns:
            DataFrame: A DataFrame of the summary for the variable.

        """
        if variable not in VARIABLES:
            raise ValueError(f"{variable} is not a valid variable. Must be one of {VARIABLES}")

        summaries = []
        for collection, epw_id in zip(*[self._get_collections(variable=variable), self.epw_ids]):
            summaries.append([epw_id] + summarise_collection(collection))

        return summaries

    def degree_time(self, heat_base: float = 18, cool_base: float = 23, return_type: str = "days") -> pd.DataFrame:
        """Create a summary of degree time for the EPW data."""
        return degree_time(self.epws, heat_base=heat_base, cool_base=cool_base, return_type=return_type)

    def histogram(self, variable: str, overlap: bool = True, ax: Axes = None, **kwargs) -> Axes:
        """Plot a histogram of a variable from the EPW data.

        Args:
            variable: The variable to plot.
            overlap: If True, the histograms are overlapped. If False, they are arranged next to each other.
            ax: The matplotlib axes to plot on. If None, the current axes are used.
            **kwargs: Additional keyword arguments to pass to the matplotlib hist method.

        Returns:
            Axes: The matplotlib axes object.

        """

        if ax is None:
            ax = plt.gca()

        data = self._get_series(variable=variable)

        color = kwargs.pop("color", None)
        bins = kwargs.pop("bins", np.linspace(data.values.min(), data.values.max(), 10))
        if isinstance(bins, int):
            bins = np.linspace(data.values.min(), data.values.max(), bins)
        density = kwargs.pop("density", False)

        if color is None:
            color = plt.rcParams['axes.prop_cycle'].by_key()['color'][:len(self.epws)]
        if len(color) < len(self.epws):
            raise ValueError("color must have a color for each EPW")

        if isinstance(bins, (list, tuple, np.ndarray)):
            xlim = min(bins), max(bins)
        else:
            xlim = min(data.min()), max(data.max())

        if overlap:
            for n, _ in enumerate(self.epws):
                ax.hist(data.T.values[n], label=self.epw_ids, color=color[n], bins=bins, density=density, **kwargs)
        else:
            _, edges, _ = ax.hist(data.values, label=self.epw_ids, color=color, bins=bins, density=density, **kwargs)
            # ax.set_xticks((edges[1:] + edges[:-1]) / 2)
            ax.set_xticks(edges)
        # return _

        ax.set_xlim(xlim)
        ax.set_xlabel(header_to_string(getattr(self.epws[0], variable).header))
        ax.set_ylabel("Frequency" if density else "Count")
        if density:
            ax.yaxis.set_major_formatter(mtick.PercentFormatter(1))
        ax.legend(
            bbox_to_anchor=(1, 1), loc="upper left", ncol=1
        )
        return ax

    def line(self, variable: str, ax: Axes = None, **kwargs) -> Axes:
        """Plot a line plot of a variable from the EPW data.

        Args:
            variable: The variable to plot.
            ax: The matplotlib axes to plot on. If None, the current axes are used.
            **kwargs: Additional keyword arguments to pass to the matplotlib plot method.

        Returns:
            Axes: The matplotlib axes object.

        """

        if ax is None:
            ax = plt.gca()

        data = self._get_series(variable=variable)

        color = kwargs.pop("color", None)
        if color is None:
            color = plt.rcParams['axes.prop_cycle'].by_key()['color'][:len(self.epws)]
        if len(color) < len(self.epws):
            raise ValueError("color must have a color for each EPW")

        data.plot(ax=ax, **kwargs)
        ax.set_xlabel("Hour of Year")
        ax.set_ylabel(header_to_string(getattr(self.epws[0], variable).header))
        ax.legend(
            bbox_to_anchor=(1, 1), loc="upper left", ncol=1
        )
        return ax

    def diurnal(self, variable: str, month: int, ax: Axes = None, agg: str = "mean", **kwargs) -> Axes:
        """Plot a diurnal plot of a variable from the EPW data."""

        if month < 1 or month > 12:
            raise ValueError("month must be between 1 and 12")

        if ax is None:
            ax = plt.gca()

        data = self._get_series(variable=variable)
        data = data.loc[data.index.month == month]
        data = data.groupby(data.index.hour).agg(agg)
        data.loc[len(data)] = data.loc[0].values

        color = kwargs.pop("colors", None)
        if color is None:
            color = plt.rcParams['axes.prop_cycle'].by_key()['color'][:len(self.epws)]
        if len(color) < len(self.epws):
            raise ValueError("color must have a color for each EPW")

        data.plot(ax=ax, color=color, **kwargs)
        ax.set_xlabel("Hour of Day")
        ax.set_ylabel(header_to_string(getattr(self.epws[0], variable).header))
        ax.set_title(f"{calendar.month_name[month]} ({agg})")
        ax.legend(
            bbox_to_anchor=(1, 1), loc="upper left", ncol=1
        )
        ax.set_xlim(0, 24)
        ax.set_xticks([0, 3, 6, 9, 12, 15, 18, 21])
        return ax