databrickslabs / tempo

API for manipulating time series on top of Apache Spark: lagged time values, rolling statistics (mean, avg, sum, count, etc), AS OF joins, downsampling, and interpolation
https://pypi.org/project/dbl-tempo

Interpolate: Support "limit" #147

Open epa095 opened 2 years ago

epa095 commented 2 years ago

Pandas supports a parameter limit: "Maximum number of consecutive NaNs to fill. Must be greater than 0."

It would be useful if tempo supported something similar. The pandas version is a bit weird (imo) in that it still interpolates up to that number of NaNs and then just stops. So with a resampling frequency of 1 min, a gap of 1.5 hours (90 buckets), and a limit of 1 hour (60 buckets), it will interpolate 60 buckets into the gap and leave 30 NaNs. To me the reasonable implementation of "limit" would avoid the whole stretch and interpolate none of the NaNs, since the reasonable interpretation of the limit is something like "if the gap is this large, we have too little information".
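
A minimal pandas sketch of the difference (the series values are made up purely for illustration):

import numpy as np
import pandas as pd

# One anchor value, a gap of four NaNs, then another anchor value.
s = pd.Series([1.0, np.nan, np.nan, np.nan, np.nan, 6.0])

# pandas: fills the first `limit` NaNs of the gap and leaves the rest.
print(s.interpolate(limit=2).tolist())  # [1.0, 2.0, 3.0, nan, nan, 6.0]

# Proposed behaviour: the gap (4 NaNs) exceeds the limit (2), so none of it
# would be filled and all four NaNs would remain.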

epa095 commented 2 years ago

If someone else is looking for this feature, I can share the hacky way I implemented it so far (but it would definitely be better if it were included in tempo):


import random
import string
from typing import List, Optional, Union

from pyspark.sql import functions as F
from pyspark.sql.window import Window


def filter_on_consecutive_equal_values(
    df,
    search_column: str,
    order_column: str,
    max_consecutive_rows: int,
    search_value,
    partition: Optional[Union[str, List[str]]] = None,
    include_debug_columns=False,
):
    """This searches for consecutive occurrences of a value in a given column (
    possible partitioned on columns in "partition"), and if it finds streaks longer than
    max_consecutive_rows it removes *all* rows in that streak.

    Parameters
    ----------
    df
        PySpark dataframe to filter
    search_column
        Column to search for large streaks in
    order_column
        To find streaks we need an order, and this column describes that order.
    max_consecutive_rows
        Max allowed streak size. Any streak strictly larger than this will be removed.
    search_value
        Value to search for
    partition
        Optional column (or list of columns) to partition on.
    include_debug_columns : bool
        If True, keep the generated helper columns for debugging.
    """
    if partition is None:
        partition = []
    elif isinstance(partition, str):
        partition = [partition]

    has_value = generate_unique_column_name(df, "has_value")
    start_streak = generate_unique_column_name(df, "start_streak")
    streak_id = generate_unique_column_name(df, "streak_id")
    streak_count = generate_unique_column_name(df, "streak_count")
    df = (
        # 1 if this row has the value we are searching for, 0 otherwise.
        df.withColumn(
            has_value, F.when(F.col(search_column) == search_value, 1).otherwise(0)
        )
        # 1 if this is the start of a streak of the value we are searching for in
        # this partition, 0 otherwise.
        .withColumn(
            start_streak,
            F.when(
                (F.col(has_value) == 1)
                & (
                    # A streak starts when the previous row does not have the value.
                    # lag() is NULL on the first row of a partition, so coalesce to 0
                    # to treat "no previous row" as a streak start.
                    F.coalesce(
                        F.lag(has_value, 1).over(
                            Window.partitionBy(partition).orderBy(order_column)
                        ),
                        F.lit(0),
                    )
                    == 0
                ),
                1,
            ).otherwise(0),
        )
        # The cumulative sum of start_streak values (which are 1 or 0) inside this
        # partition gives each streak its streak_id (a running number from 1 and up).
        # All rows without the value get streak_id == 0.
        .withColumn(
            streak_id,
            F.when(
                (F.col(has_value) == 1),
                F.sum(start_streak).over(
                    Window.partitionBy(partition).orderBy(order_column)
                ),
            ).otherwise(0),
        )
        # Now we add the count of rows inside each partition-streak
        .withColumn(
            streak_count,
            F.when(
                (F.col(has_value) == 1),
                F.count("*").over(Window.partitionBy(partition + [streak_id])),
            ).otherwise(0),
        )
        # We filter away all rows which are part of an oversized streak.
        .where((F.col(has_value) == 0) | (F.col(streak_count) <= max_consecutive_rows))
    )

    if not include_debug_columns:
        df = df.drop(
            *[
                has_value,
                start_streak,
                streak_id,
                streak_count,
            ]
        )
    return df

def generate_unique_column_name(df, base_name):
    """Generates a column name based on base_name that does not already exist in
    df. If base_name is not in df.columns it is returned unchanged."""
    proposal = base_name
    while proposal in df.columns:
        random_suffix = "".join(
            random.choices(string.ascii_uppercase + string.digits, k=5)
        )
        proposal = base_name + "_" + random_suffix
    return proposal
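
As a small self-contained sanity check of the helper (column names and values are made up, and it assumes a local SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy data: one streak of two "interpolated" rows and one streak of three.
data = [
    (1, False), (2, True), (3, True), (4, False),
    (5, True), (6, True), (7, True), (8, False),
]
toy_df = spark.createDataFrame(data, ["ts", "is_interpolated"])

filtered = filter_on_consecutive_equal_values(
    df=toy_df,
    search_column="is_interpolated",
    order_column="ts",
    max_consecutive_rows=2,
    search_value=True,
)
filtered.orderBy("ts").show()
# The streak of three (ts = 5, 6, 7) is dropped entirely; the streak of two
# (ts = 2, 3) is kept.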

And then, after interpolating into resampled with show_interpolated=True, I filter like this:

df = filter_on_consecutive_equal_values(
    df=resampled.df,
    search_column=f"is_interpolated_{value_col_name}",
    order_column=ts_col,
    max_consecutive_rows=max_consecutive_interpolated_rows,
    search_value=True,
    partition=partition_cols,
    include_debug_columns=False,
)