frequenz-floss / frequenz-sdk-python

Frequenz Python Software Development Kit (SDK)
https://frequenz-floss.github.io/frequenz-sdk-python/
MIT License

Simple SDK app does not recover after networking service restart #1036

Closed daniel-zullo-frequenz closed 2 weeks ago

daniel-zullo-frequenz commented 1 month ago

What happened?

The resampler stops receiving power metrics after the networking service is restarted on the host where a simple SDK application is running.

__main__ INFO:Latest battery consumption: Sample(timestamp=datetime.datetime(2024, 8, 12, 12, 49, 57, tzinfo=datetime.timezone.utc), value=Power(value=0.0, exponent=0))
__main__ INFO:Latest net consumption: Sample(timestamp=datetime.datetime(2024, 8, 12, 12, 49, 57, tzinfo=datetime.timezone.utc), value=None)
frequenz.sdk.timeseries._resampling WARNING:No relevant samples found for: component_metric_request<namespace=grid-c01a4328-2e56-4017-b3df-594723abe827,component_id=1001,metric_id=ACTIVE_POWER,start=None>
frequenz.sdk.timeseries._resampling WARNING:No relevant samples found for: component_metric_request<namespace=battery-pool-frozenset({2009, 2003, 2006})-80a2a06b-34db-4545-8d92-01eeb4ef53da,component_id=2002,metric_id=ACTIVE_POWER,start=None>
frequenz.sdk.timeseries._resampling WARNING:No relevant samples found for: component_metric_request<namespace=battery-pool-frozenset({2009, 2003, 2006})-80a2a06b-34db-4545-8d92-01eeb4ef53da,component_id=2005,metric_id=ACTIVE_POWER,start=None>
frequenz.sdk.timeseries._resampling WARNING:No relevant samples found for: component_metric_request<namespace=battery-pool-frozenset({2009, 2003, 2006})-80a2a06b-34db-4545-8d92-01eeb4ef53da,component_id=2008,metric_id=ACTIVE_POWER,start=None>

Also, please note that similar behaviour is observed when pausing the application with Ctrl-Z and resuming it more than 20 minutes later.

What did you expect instead?

The resampler recovers and receives new power values after a networking service restart.

Affected version(s)

Observed in v0.25.2, v1.0.0-rc601 and higher versions

Affected part(s)

Core components (data structures, etc.) (part:core), Microgrid (API, component graph, etc.) (part:microgrid)

Extra information

Simple application to reproduce the issue:

# License: All rights reserved
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Application to read net and battery pool power consumption data."""

import asyncio
import logging
import os
from datetime import timedelta

from frequenz.channels import Receiver, select, selected_from
from frequenz.sdk.actor import Actor
from frequenz.sdk import actor, microgrid
from frequenz.sdk.timeseries import Power, Sample
from frequenz.sdk.timeseries._resampling import ResamplerConfig

_logger = logging.getLogger(__name__)

class PowerReadingActor(Actor):
    """Actor to read net and battery pool power consumption data."""

    def __init__(
        self,
        recv_net_consumption: Receiver[Sample[Power]],
        recv_bat_consumption: Receiver[Sample[Power]],
    ):
        """Initialize the actor.

        Args:
            recv_net_consumption: Receiver channel for grid power consumption
                without battery pool power consumption data.
            recv_bat_consumption: Receiver channel for battery pool power
                consumption data.
        """
        super().__init__()
        self._recv_net_consumption = recv_net_consumption
        self._recv_bat_consumption = recv_bat_consumption

    async def _run(self) -> None:
        """Run the actor."""
        async for selected in select(
            self._recv_net_consumption,
            self._recv_bat_consumption,
        ):
            if selected_from(selected, self._recv_net_consumption):
                net_consumption_sample = selected.message
                if not net_consumption_sample:
                    raise ValueError("Net consumption data channel closed.")
                _logger.info("Latest net consumption: %s", net_consumption_sample)

            elif selected_from(selected, self._recv_bat_consumption):
                bat_consumption_sample = selected.message
                if not bat_consumption_sample:
                    raise ValueError("Battery pool consumption data channel closed.")
                _logger.info("Latest battery consumption: %s", bat_consumption_sample)

async def run() -> None:
    """Run the actor."""
    logging.basicConfig(
        level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s:%(message)s"
    )

    microgrid_url = os.getenv(
        key="MICROGRID_API_URL", default="grpc://[2a01:4f8:c010:1adf::1]:62060"
    )

    await microgrid.initialize(
        server_url=microgrid_url,
        resampler_config=ResamplerConfig(
            resampling_period=timedelta(seconds=1.0),
        ),
    )

    grid = microgrid.grid()
    battery_pool = microgrid.new_battery_pool(priority=1)

    net_power_formula = (grid.power - battery_pool.power).build("net_power")

    power_reading_actor = PowerReadingActor(
        recv_net_consumption=net_power_formula.new_receiver(),
        recv_bat_consumption=battery_pool.power.new_receiver(),
    )

    await actor.run(power_reading_actor)

def main() -> None:
    """Run the application."""
    asyncio.run(run())

if __name__ == "__main__":
    main()

Commands to restart the networking service:

sudo nmcli networking off
sleep 5
sudo nmcli networking on

llucax commented 1 month ago

Something to look into: using keepalive: https://grpc.io/docs/guides/keepalive/

We probably should add keepalive options (with some sane defaults) to the base client URL parsing, and my feeling is we need to re-create the gRPC channel if the connection dropped.
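As a rough illustration of the idea above, here is a minimal sketch of URL parsing that yields keepalive channel options. The function name `parse_grpc_url`, the query parameters `keepalive_time_s` and `keepalive_timeout_s`, and the default values are all hypothetical and not part of the base client; only the `grpc.*` channel option names are real gRPC channel arguments.

```python
from urllib.parse import parse_qs, urlsplit

# Hypothetical defaults; the thread does not settle on concrete values.
DEFAULT_KEEPALIVE_TIME_S = 10
DEFAULT_KEEPALIVE_TIMEOUT_S = 5


def parse_grpc_url(url: str) -> tuple[str, list[tuple[str, int]]]:
    """Return the gRPC target and keepalive channel options for a URL.

    The `keepalive_time_s` and `keepalive_timeout_s` query parameters are
    invented for this sketch.
    """
    parts = urlsplit(url)
    if parts.scheme != "grpc":
        raise ValueError(f"Unsupported scheme: {parts.scheme!r}")
    query = parse_qs(parts.query)

    def seconds(name: str, default: int) -> int:
        # Use the last occurrence of the parameter, or the default.
        return int(query.get(name, [default])[-1])

    options = [
        # Send a keepalive ping after this much time without activity.
        (
            "grpc.keepalive_time_ms",
            seconds("keepalive_time_s", DEFAULT_KEEPALIVE_TIME_S) * 1000,
        ),
        # Consider the connection dead if the ping is not acknowledged in time.
        (
            "grpc.keepalive_timeout_ms",
            seconds("keepalive_timeout_s", DEFAULT_KEEPALIVE_TIMEOUT_S) * 1000,
        ),
        # Allow pings even when there are no active calls.
        ("grpc.keepalive_permit_without_calls", 1),
        # Do not limit the number of pings sent without data.
        ("grpc.http2.max_pings_without_data", 0),
    ]
    return parts.netloc, options
```

The returned pair could then be passed to `grpc.aio.insecure_channel(target, options=options)`. Whether the server accepts pings at this rate depends on its own keepalive enforcement settings, which this sketch does not cover.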

daniel-zullo-frequenz commented 1 month ago

Nothing is received or raised when calling the stream method after restarting the networking service, and the retry strategy never kicks in. I'm referring to this line of code: https://github.com/frequenz-floss/frequenz-client-base-python/blob/v0.x.x/src/frequenz/client/base/streaming.py#L89

The first problem is a missing timeout for streaming component data: the timeout is always set to None. However, adding a timeout only lets the retry strategy take effect; the gRPC stream still seems dead.

The stream method https://github.com/frequenz-floss/frequenz-client-base-python/blob/v0.x.x/src/frequenz/client/base/streaming.py#L40-L41 is then called on each retry, but it has no effect if the gRPC stream is already dead.

The client might need to recreate https://github.com/frequenz-floss/frequenz-client-microgrid-python/blob/v0.x.x/src/frequenz/client/microgrid/_client.py#L242-L244 when the gRPC stream is dead
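To make the combination of a timeout and re-creation concrete, here is a minimal sketch using only asyncio. It is not the client-base implementation: `stream_with_reconnect` and the `open_stream` factory are hypothetical names, and in a real client the factory would have to build a fresh gRPC channel and stub before starting the streaming call, since retrying on a dead channel has no effect.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from typing import TypeVar

T = TypeVar("T")


async def stream_with_reconnect(
    open_stream: Callable[[], AsyncIterator[T]],
    stall_timeout: float,
    max_reconnects: int,
) -> AsyncIterator[T]:
    """Yield messages, reopening the stream when no message arrives in time.

    `open_stream` is a hypothetical factory. In a real client it would
    create a new gRPC channel and stub, then start the streaming call.
    """
    reconnects = 0
    while True:
        stream = open_stream().__aiter__()
        while True:
            try:
                # Treat a silent stream as dead after `stall_timeout` seconds.
                message = await asyncio.wait_for(stream.__anext__(), stall_timeout)
            except StopAsyncIteration:
                return  # The stream ended normally.
            except asyncio.TimeoutError:
                break  # The stream stalled: drop it and open a new one.
            yield message
        reconnects += 1
        if reconnects > max_reconnects:
            raise ConnectionError("stream did not recover")
```

A per-message timeout like this is only a heuristic: a stream that is legitimately quiet for longer than `stall_timeout` would be reopened too, so the value has to be chosen with the expected data rate in mind.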

daniel-zullo-frequenz commented 1 month ago

The only working solution I have found so far (without considering keepalive) is re-creating the gRPC channel.

daniel-zullo-frequenz commented 1 month ago

I tested using keepalive options and it works. This should probably be the long-term solution.

daniel-zullo-frequenz commented 1 month ago

The fix needs to be addressed for https://github.com/frequenz-floss/frequenz-client-base-python/issues/68

llucax commented 4 weeks ago

Do you have a minimal test case to reproduce this that doesn't depend on the SDK (i.e. it uses only the client)? We also need to figure out at which level we should re-create the channel. Or did you observe that the channel automatically recovers (re-connects) when using keep-alive?

daniel-zullo-frequenz commented 4 weeks ago

I observed that the gRPC channel recovers when using keep-alive. I only have a small program to reproduce the problem without the SDK. You can set a timeout and/or configure keep-alive to see the different behavior.

import asyncio
from collections.abc import AsyncIterator
from typing import Any, cast

import grpc.aio as grpcaio
from frequenz.api.microgrid.microgrid_pb2 import ComponentData as PbComponentData
from frequenz.api.microgrid.microgrid_pb2 import ComponentIdParam as PbComponentIdParam
from frequenz.api.microgrid.microgrid_pb2_grpc import MicrogridStub
from frequenz.client.base import streaming

from frequenz.client.microgrid._component_data import MeterData

HOST = "[2a01:4f8:c010:1adf::1]"
PORT = 62060
COMPONENT_ID = 1001

TIMEOUT: float | None = None  # 10.0

keepalive_options = [
    # ("grpc.keepalive_time_ms", 10000),  # 10 seconds
    # ("grpc.keepalive_timeout_ms", 5000),  # 5 seconds
    # ("grpc.keepalive_permit_without_calls", 1),  # Allow pings even without active calls
    # ("grpc.http2.max_pings_without_data", 0),  # Unlimited pings without data
]

async def run() -> None:
    """Run the main function."""
    target = f"{HOST}:{PORT}"
    grpc_channel = grpcaio.insecure_channel(target, options=keepalive_options)
    microgrid_grpc_stub = MicrogridStub(grpc_channel)

    broadcaster = streaming.GrpcStreamBroadcaster(
        f"raw-component-data-{COMPONENT_ID}",
        lambda: cast(
            AsyncIterator[PbComponentData],
            microgrid_grpc_stub.StreamComponentData(PbComponentIdParam(id=COMPONENT_ID),
                                                    timeout=TIMEOUT),
        ),
        transform=MeterData.from_proto,
        retry_strategy=None,
    )

    broadcaster_recv = broadcaster.new_receiver()

    async for data in broadcaster_recv:
        print(data)

asyncio.run(run())

daniel-zullo-frequenz commented 4 weeks ago

Please note that there is no timeout currently set for the microgrid API client when it calls StreamComponentData to construct GrpcStreamBroadcaster. In the example above I was using it just for testing purposes but it is ~probably~ NOT needed ~if gRPC channel keep-alive is configured~ for gRPC stream.

llucax commented 4 weeks ago

Thanks @daniel-zullo-frequenz, I will have a look, run some tests soon, and create issues where appropriate (most likely in client-base).

llucax commented 2 weeks ago

I think we have already figured out what the problem is and that it should be fixed at a lower level than the SDK (API client/server). So I will close this for now; we can reopen it if we feel we need to gather more info.