Closed MrNaif2018 closed 2 years ago
Thanks @MrNaif2018 , you're right we should make the add
method a coroutine.
In this case I guess I should update that in default channel layer and in readme, right?
@eliclement I have changed create_redis
to create_redis_pool.
Also added flush method which was missing and I made all public apis async(and updated README).
@taoufik07 Is it ready to merge?
Actually why does name.isindentifier()(validate_group
) check is added? Does it break something if channel name is just numeric?
@taoufik07 I have fixed all the reviewed issues.
But hgetall
doesn't support async for
interface.
@taoufik07 can this pull request be merged?
@taoufik07 Could this pr get merged, more than two months already passed? If you can't maintain this project, I can probably maintain it. What do you think?
@taoufik07 are there any code review or suggestions needed or to add to get this merged?
This was long time ago, but if someone needs just something like redis for websocket sync, you can use the following utility functions:
async def make_subscriber(name):
subscriber = await aioredis.create_redis_pool(settings.REDIS_HOST)
res = await subscriber.subscribe(f"channel:{name}")
channel = res[0]
return subscriber, channel
async def publish_message(channel, message):
return await settings.redis_pool.publish_json(f"channel:{channel}", message)
Where settings.redis_pool can be created like so:
loop = asyncio.get_event_loop()
redis_pool = None
async def init_redis():
global redis_pool
redis_pool = await aioredis.create_redis_pool(REDIS_HOST)
loop.create_task(init_redis())
And in websocket route:
@router.websocket_route("/ws/some/endpoint")
class WebsocketTestEndpoint(WebSocketEndpoint):
subscriber = None
async def on_connect(self, websocket, **kwargs):
await websocket.accept()
self.channel_name = secrets.token_urlsafe(32) # or other id
self.subscriber, self.channel = await utils.make_subscriber(self.wallet_id)
settings.loop.create_task(self.poll_subs(websocket))
async def poll_subs(self, websocket):
while await self.channel.wait_message():
msg = await self.channel.get_json()
await websocket.send_json(msg)
async def on_disconnect(self, websocket, close_code):
if self.subscriber:
await self.subscriber.unsubscribe(f"channel:{self.channel_name}")
See example in my repo
I doubt this PR would get merged, as the project seems unmaintained, so here's a custom solution I use.
@MrNaif2018 thank you for this custom solution! In the meanwhile Encode is also working on a nice simple broadcast API (called broadcaster) that works with a number of different backend services such as Redis PUB/SUB, Apache Kafka, and Postgres LISTEN/NOTIFY. It is still in devolpment but seems very promising.
Thanks for suggesting broadcaster! When it will get out of alpha and when I drop python 3.6 support I will definitely consider it in my fastapi app! Even if this doesn't get merged this page can serve good for providing different implementation examples.
Thanks @MrNaif2018 for the amazing work and I'm sorry for not following this PR.
I had some thoughts to redesign nejma and decoupled it from the web framework so it can be easy to maintain and fit nicely with the ASGI ecosystem, but I was overwhelmed with my final year report and startup work.
I think that what broadcaster did represents partly what I wanted to do in my redesign and all the efforts should go there, even tho I'll try to update my draft and discuss it here just for fun.
This pull requests adds RedisLayer class. As starlette endpoint was hardcoded to use default one, now it is possible to select one by setting class attribute. To install redis layer, it is needed to install optional dependency i.e.:
pip install nejma[redis]
I think that all functions(except for utility ones) should become asyncronous. What's the point in having most functions async and only add function sync for example? This pull request is probably not yet ready to merge as some compatibility changes should be decided.