microsoft / garnet

Garnet is a remote cache-store from Microsoft Research that offers strong performance (throughput and latency), scalability, storage, recovery, cluster sharding, key migration, and replication features. Garnet can work with existing Redis clients.
https://microsoft.github.io/garnet/
MIT License
10.18k stars 508 forks source link

API - PUBSUB CHANNELS #425

Open rnz opened 4 months ago

rnz commented 4 months ago

Feature request type

sample request

Is your feature request related to a problem? Please describe

Often used

Describe the solution you'd like

Often used

Describe alternatives you've considered

No response

Additional context

No response

badrishc commented 4 months ago

Garnet supports pub/sub already. If you search online for examples of pub/sub in Redis, there should be a lot of hits. Let us know if you run into any bugs or issues.

rnz commented 4 months ago

@badrishc This request about command "PUBSUB CHANNELS"

$ keydb-cli PUBSUB CHANNELS | wc -l
1453015

and

$ keydb-cli INFO | grep -i channel
pubsub_channels:1454151
mardukbp commented 1 month ago

Running GarnetServer from the main branch (with the new --lua option set), the first example in the Celery documentation fails.

To reproduce the error you will need Python and Celery. Install it with:

pip install celery[redis]

Create the file tasks.py:

from celery import Celery

app = Celery('tasks', backend='redis://localhost', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

In the directory where the file is saved open a terminal and execute:

celery -A tasks worker --pool=solo

In the same directory start a new terminal, execute python.exe and enter the following:

> from tasks import add
> add.delay(4,4)

In the first terminal you will get an exception from the Python Redis client:

redis.exceptions.ResponseError: Command # 2 (PUBLISH celery-task-meta-11b19d75-2bbc-496b-8a2f-fe520bcbf8ad {"status": "SUCCESS", "result": 8, "traceback": null, "children": [], "date_done": "2024-08-18T10:39:27.316382+00:00", "task_id": "11b19d75-2bbc-496b-8a2f-fe520bcbf8ad"}) of pipeline caused error: unknown command
mardukbp commented 1 month ago

Using Carmine, a Redis client for Clojure that auto-generates Clojure functions from the official Redis command spec, the following example causes GarnetServer to terminate:

(ns my-app (:require [taoensso.carmine :as car]))

(defonce my-conn-pool (car/connection-pool {}))
(def     my-conn-spec {:uri "redis://localhost/"})
(def     my-conn-opts {:pool my-conn-pool, :spec my-conn-spec})

(defmacro wcar* [& body] `(car/wcar my-conn-opts ~@body))

(def my-listener
  (car/with-new-pubsub-listener (:spec my-conn-spec)
    {"channel*" (fn f2 [msg] (println "f2:" msg))}
    (car/psubscribe "channel*")))

(car/with-open-listener my-listener
  (car/unsubscribe))

This is the command line output of GarnetServer:

Process terminated. Assertion Failed
   at Garnet.server.RespServerSession.NetworkUNSUBSCRIBE(Int32 count) in C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262
   at Garnet.server.RespServerSession.ProcessArrayCommands[TGarnetApi](RespCommand cmd, TGarnetApi& storageApi) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 570
   at Garnet.server.RespServerSession.ProcessMessages() in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 419
   at Garnet.server.RespServerSession.TryConsumeMessages(Byte* reqBuffer, Int32 bytesReceived) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 298
   at Garnet.networking.NetworkHandler`2.TryProcessRequest() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 481
   at Garnet.networking.NetworkHandler`2.Process() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 337
   at Garnet.networking.NetworkHandler`2.OnNetworkReceive(Int32 bytesTransferred) in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 290
   at Garnet.common.TcpNetworkHandlerBase`2.RecvEventArg_Completed(Object sender, SocketAsyncEventArgs e) in C:\Users\mardu\git\garnet\libs\common\Networking\TcpNetworkHandlerBase.cs:line 120
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Net.Sockets.SocketAsyncEventArgs.<>c.<.cctor>b__173_0(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* nativeOverlapped)
   at System.Threading.PortableThreadPool.IOCompletionPoller.Callback.Invoke(Event e)
   at System.Threading.ThreadPoolTypedWorkItemQueue`2.System.Threading.IThreadPoolWorkItem.Execute()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
badrishc commented 1 month ago
celery -A tasks worker --pool=solo

Hi @mardukbp, can you try this PR to see if Celery works after this small fix? https://github.com/microsoft/garnet/pull/604

badrishc commented 1 month ago

Using Carmine, a Redis client for Clojure that auto-generates Clojure functions from the official Redis command spec, the following example causes GarnetServer to terminate:

(ns my-app (:require [taoensso.carmine :as car]))

(defonce my-conn-pool (car/connection-pool {}))
(def     my-conn-spec {:uri "redis://localhost/"})
(def     my-conn-opts {:pool my-conn-pool, :spec my-conn-spec})

(defmacro wcar* [& body] `(car/wcar my-conn-opts ~@body))

(def my-listener
  (car/with-new-pubsub-listener (:spec my-conn-spec)
    {"channel*" (fn f2 [msg] (println "f2:" msg))}
    (car/psubscribe "channel*")))

(car/with-open-listener my-listener
  (car/unsubscribe))

This is the command line output of GarnetServer:

Process terminated. Assertion Failed
   at Garnet.server.RespServerSession.NetworkUNSUBSCRIBE(Int32 count) in C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262
   at Garnet.server.RespServerSession.ProcessArrayCommands[TGarnetApi](RespCommand cmd, TGarnetApi& storageApi) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 570
   at Garnet.server.RespServerSession.ProcessMessages() in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 419
   at Garnet.server.RespServerSession.TryConsumeMessages(Byte* reqBuffer, Int32 bytesReceived) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 298
   at Garnet.networking.NetworkHandler`2.TryProcessRequest() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 481
   at Garnet.networking.NetworkHandler`2.Process() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 337
   at Garnet.networking.NetworkHandler`2.OnNetworkReceive(Int32 bytesTransferred) in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 290
   at Garnet.common.TcpNetworkHandlerBase`2.RecvEventArg_Completed(Object sender, SocketAsyncEventArgs e) in C:\Users\mardu\git\garnet\libs\common\Networking\TcpNetworkHandlerBase.cs:line 120
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Net.Sockets.SocketAsyncEventArgs.<>c.<.cctor>b__173_0(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* nativeOverlapped)
   at System.Threading.PortableThreadPool.IOCompletionPoller.Callback.Invoke(Event e)
   at System.Threading.ThreadPoolTypedWorkItemQueue`2.System.Threading.IThreadPoolWorkItem.Execute()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()

I cannot get carmine to work on WSL, but try commenting out the debug assert on that line (C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262)

mardukbp commented 1 month ago
celery -A tasks worker --pool=solo

Hi @mardukbp, can you try this PR to see if Celery works after this small fix? #604

It works!

mardukbp commented 1 month ago

I cannot get carmine to work on WSL, but try commenting out the debug assert on that line (C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262)

That works!

Actually, I tried out Carmine on Windows. I installed Clojure using the Windows installer. Then I created the file deps.edn with the contents

{:deps
 {com.taoensso/carmine {:mvn/version "3.4.1"}}
}

And then I started the Clojure REPL (clj.exe) on the same directory. I also used the VS Code Extension Calva to execute code directly in the REPL.

badrishc commented 1 month ago

I cannot get carmine to work on WSL, but try commenting out the debug assert on that line (C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262)

That works!

Actually, I tried out Carmine on Windows. I installed Clojure using the Windows installer. Then I created the file deps.edn with the contents

{:deps
 {com.taoensso/carmine {:mvn/version "3.4.1"}}
}

And then I started the Clojure REPL (clj.exe) on the same directory. I also used the VS Code Extension Calva to execute code directly in the REPL

Got it, will try this later. Thanks!