frequenz-floss / frequenz-channels-python

Channel implementations for Python
https://frequenz-floss.github.io/frequenz-channels-python/
MIT License
7 stars 8 forks source link

Support filtering the messages on a receiver #303

Closed shsms closed 3 months ago

shsms commented 3 months ago

A new filter method is added to the Receiver interface, which allows the application of a filter function on the messages on a receiver.

Example:

async for message in receiver.filter(lambda num: num % 2):
    print(f"An even number: {message}")
llucax commented 3 months ago

What about adding an async_filter to the new core repo instead? This would be useful with any async iterator. I would also remove map() and add an async_map() to core.

llucax commented 3 months ago

There is also https://snyk.io/advisor/python/asyncstdlib. Maybe we should start using that? It is strange that this is not part of Python stdlib already.

llucax commented 3 months ago

https://asyncstdlib.readthedocs.io/en/latest/source/api/builtins.html#asyncstdlib.builtins.filter

llucax commented 3 months ago

If we agree it is a good idea we could add it to repo-config so it is used by default, like we do with typing_extensions.

shsms commented 3 months ago

What about adding an async_filter to the new core repo instead? This would be useful with any async iterator. I would also remove map() and add an async_map() to core.

This gives out Receivers, we can do all the things we can do with receivers with them. Maybe we can have an additional async_filter or use asyncstd, but having it here is still valuable.

shsms commented 3 months ago

If we agree it is a good idea we could add it to repo-config so it is used by default, like we do with typing_extensions.

I do agree, it would be very useful, but I don't think it can replace this PR, because the filter in this PR returns receivers, which we can put in select or any of the many possibilities.

llucax commented 3 months ago

Right. OK, and I guess it is useful to get a receiver because it can be used in select(). It came to my mind a couple of times that maybe we should have a utility receiver that converts any async iterator into a receiver. So we could potentially do:

filtered_receiver = as_receiver(a.filter(lambda num: num % 2), original_receiver)
async for selected in select(filtered_receiver, ...):
    print(f"An even number: {message}")

But if we want to move in that direction, we'll eventually have to remove map() too, and that will be a breaking change, so I'm happy to leave that for 2.0 and add filter now if it is useful.

Will start the review now then...

llucax commented 3 months ago

What about adding an async_filter to the new core repo instead? This would be useful with any async iterator. I would also remove map() and add an async_map() to core.

This gives out Receivers, we can do all the things we can do with receivers with them. Maybe we can have an additional async_filter or use asyncstd, but having it here is still valuable.

shsms commented 3 months ago

Based on #301

llucax commented 3 months ago

Also tests are failing:

src/frequenz/channels/_receiver.py:451:101: E501 line too long (138 > 100 characters)
shsms commented 3 months ago

Fixed the long line