oliyh / re-graph

A graphql client for clojurescript and clojure
460 stars 39 forks source link

Mutations and Subscriptions #64

Closed Folcon closed 4 years ago

Folcon commented 4 years ago

So I'm digging into hn-clj-pedestal-re-frame as I've been trying to understand how subscriptions work.

I've been making edits and fixes as I go here.

My primary experience with using websockets in clojure/script has been sente, however I've been wanting to try integrating lacinia into my application and have been looking for tutorials to get a clear understanding on how it works.

However it seems that some changes were made as hn-clj-pedestal-re-frame is using a fork of re-graph which basically removes it's ability to send mutations over a websocket. Diff is here: https://github.com/oliyh/re-graph/compare/master...promesante:hn-clj-pedestal-re-frame.

I personally would like to understand how this is all supposed to work and it's rather frustrating that an example doesn't really seem to exist.

I did find your historical reply, but unfortunately the example you linked seems to require that I also understand kafka which isn't a thing I want to jump into yet.

1) hn-clj-pedestal-re-frame is using a mutation to perform a login, by my currently limited understanding of things, this appears to be only advisable to perform over HTTP and not through re-graph as I'm not sure that the mutation being sent can be returned without setting up some sort of subscription, in which case how do you prevent the credentials token response from being sent to all the people who are listening to that subscription?

2) How complex is this source-stream? Could I just create an atom and have the source-stream be a watcher (using add-watch?) Is it a core-async go block? What would be a minimal example?

Sorry about dropping this here, but I've been spending the last few days trying to make heads or tails of this...

Folcon commented 4 years ago

I can't believe I didn't think to look for tests, you're a star!

However, now I'm really confused.

I replaced line 167 to line 172 with the code below:

(defn- stream-pets [context args source-stream]
  (let [{:keys [count]} args
        runnable ^Runnable (fn []
                             (dotimes [i count]
                               (source-stream {:i i})))
        streamer (Thread. runnable "stream-pets-thread")]
    (.start streamer)
    #(.stop streamer)))

(defn streamer-map
 [component]
  (let [db (:db component)]
    {:subscription/new-link stream-pets #_(new-link db)
     :subscription/new-vote stream-pets #_(new-vote db)}))

And now things appear to work? As far as I can tell the prior issue was as you mentioned due to the subscriptions disconnecting on the server (which interestingly doesn't close the websocket).

So does the streamer only exist to tell the clients to poll? After all as far as I can tell all your stream-pets function is doing is counting? It doesn't appear to be sending to the client the new data.

Additionally, I am now getting null pointer exceptions:

Exception in thread "stream-pets-thread" java.lang.NullPointerException
    at clojure.lang.RT.longCast(RT.java:1277)
    at hn_clj_pedestal_re_frame.schema$stream_pets$runnable__19332.invoke(schema.clj:172)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.lang.Thread.run(Thread.java:830)
Exception in thread "stream-pets-thread" java.lang.NullPointerException
    at clojure.lang.RT.longCast(RT.java:1277)
    at hn_clj_pedestal_re_frame.schema$stream_pets$runnable__19332.invoke(schema.clj:172)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.lang.Thread.run(Thread.java:830)
00:02:48.237 ERROR com.walmartlabs.lacinia.pedestal.subscriptions - {:event :com.walmartlabs.lacinia.pedestal.subscriptions/error, :line 499}
java.io.EOFException: Disconnected
    at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.disconnect(AbstractWebSocketConnection.java:323)
    at org.eclipse.jetty.websocket.common.io.DisconnectCallback.failed(DisconnectCallback.java:36)
    at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.close(AbstractWebSocketConnection.java:200)
    at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.remoteClose(AbstractWebSocketConnection.java:279)
    at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:109)
    at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:319)
    at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:147)
    at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:112)
    at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:71)
    at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:201)
    at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:226)
    at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:255)
    at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.readParse(AbstractWebSocketConnection.java:581)
    at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:434)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:388)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:830)

user=> Exception in thread "stream-pets-thread" java.lang.NullPointerException
    at clojure.lang.RT.longCast(RT.java:1277)
    at hn_clj_pedestal_re_frame.schema$stream_pets$runnable__19332.invoke(schema.clj:172)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.lang.Thread.run(Thread.java:830)
Exception in thread "stream-pets-thread" java.lang.NullPointerException
    at clojure.lang.RT.longCast(RT.java:1277)
    at hn_clj_pedestal_re_frame.schema$stream_pets$runnable__19332.invoke(schema.clj:172)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.lang.Thread.run(Thread.java:830)

Including when I attempted to submit a new link:

Exception in thread "async-dispatch-4" com.fasterxml.jackson.core.JsonGenerationException: Cannot JSON encode object of class: class java.lang.NullPointerException: java.lang.NullPointerException
    at cheshire.generate$generate.invokeStatic(generate.clj:152)
    at cheshire.generate$generate.invoke(generate.clj:116)
    at cheshire.generate$generate.invokeStatic(generate.clj:122)
    at cheshire.generate$generate.invoke(generate.clj:116)
    at cheshire.core$generate_string.invokeStatic(core.clj:74)
    at cheshire.core$generate_string.invoke(core.clj:49)
    at cheshire.core$generate_string.invokeStatic(core.clj:55)
    at cheshire.core$generate_string.invoke(core.clj:49)
    at com.walmartlabs.lacinia.pedestal.subscriptions$xform_channel$fn__23520$state_machine__13048__auto____23527$fn__23529.invoke(subscriptions.clj:47)
    at com.walmartlabs.lacinia.pedestal.subscriptions$xform_channel$fn__23520$state_machine__13048__auto____23527.invoke(subscriptions.clj:47)
    at clojure.core.async.impl.ioc_macros$run_state_machine.invokeStatic(ioc_macros.clj:978)
    at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:977)
    at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invokeStatic(ioc_macros.clj:982)
    at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:980)
    at clojure.core.async.impl.ioc_macros$take_BANG_$fn__13066.invoke(ioc_macros.clj:991)
    at clojure.core.async.impl.channels.ManyToManyChannel$fn__7885$fn__7886.invoke(channels.clj:95)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at clojure.core.async.impl.concurrent$counted_thread_factory$reify__7754$fn__7755.invoke(concurrent.clj:29)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.lang.Thread.run(Thread.java:830)

I'm assuming for the new link there's something wrong with the mutate as it's giving me a jackson null pointer exception. But not clear on the other two.

I've not deployed it as I still have no idea what's going on...

Folcon commented 4 years ago

Ok, one of these is from casting a nil to a long, as args can be nil.

(defn- stream-pets [context args source-stream]
  (let [{:keys [count]} args
        runnable ^Runnable (fn []
                             (when args
                               (dotimes [i count]
                                 (source-stream {:i i}))))
        streamer (Thread. runnable "stream-pets-thread")]
    (.start streamer)
    #(.stop streamer)))

Still not clear what this is doing, nor how to get rid of this error:

00:02:48.237 ERROR com.walmartlabs.lacinia.pedestal.subscriptions - {:event :com.walmartlabs.lacinia.pedestal.subscriptions/error, :line 499}
java.io.EOFException: Disconnected

I figured out where the jackson error is coming from as well, there's an assumption baked in that session state will be sent over, looking at switch to using hato, I was hoping that incorporating those changes might help give me access to the session data, but that appears to not be the case.

oliyh commented 4 years ago

That hn-clj-pedestal-re-frame project seemed to have some real issues, but I believed all of them to be on the server side and not re-graph. As I stated a few times, re-graph supports queries and mutations on both HTTP and websockets equally, there should be no need to only send mutations via HTTP. The integration server in this repo, or the test server in the lacinia-pedestal codebase, have much simpler examples and demonstrate that this is possible and supported.

Streamers are a way of watching data such that you can send a message to the subscribing client when they change. That's why they launch a thread. The source-stream function is provided by lacinia-pedestal, and you should call it with a non-nil value to indicate that the data has changed, the query should be re-run and the result sent to the client.

Starting from the top: The schema says that the pets subscription has a count argument. It has a streamer called stream-pets and a resolver called resolve-pets:

{:pets
     {:type '(list :Pet)
      :stream stream-pets
      :resolve resolve-pets
      :args {:count {:type 'Int :default 5}}}}

So when we call it, we specify the count as an argument as well as asking for the fields we are interested in:

(re-graph/subscribe :all-pets "MyPets($count: Int) { pets(count: $count) { id name } }" {:count 5} ...)

This invokes the streamer with args looking like this {:count 5}. This specifies that we want to call source-stream 5 times.

When source-stream is called with a non-nil argument, it runs the resolver resolve-pets. This runs the query and sends the result to the client down the websocket. Because we call it 5 times, 5 messages will be sent. The callback on the client side receives all 5 of them, as asserted here

(is (= 5 (count responses)))

Websockets and HTTP are just transports. GraphQL doesn't talk about transports, it's purely an abstract query language and pattern. Due to the semantics of subscriptions they don't work over HTTP, but otherwise both websockets and HTTP support queries and mutations with the same semantics, it's just a different way of communicating between the server and client. You will see this in the websocket mutation test.

When a streamer calls source-stream with nil, that is a signal that the data will no longer change and to terminate the subscription (a subscription "stop" message will be sent to the client, the same as if the client had unsubscribed). This does not terminate the websocket connection, just the subscription.

Your earlier error was because you were probably not providing the count argument and it was calling (dotimes [i nil]) and blowing up there. Replace count with a hardcoded 5 or something. The second error was jackson trying to encode that exception into a response for the client, and understandably failing.

I suggest you start playing with the integration server provided here to understand how to use streamers and websockets, rather than your more complicated project. As the integration test shows, the subscription can be started and send multiple messages to the client as desired.

I feel the documentation is lacking in lacinia-pedestal for how to work with subscriptions, so I hope this helps. I don't believe this is an issue with re-graph so I'm going to close this issue. Feel free to continue asking questions however and I will try to help.

gklijs commented 4 years ago

Maybe my implementation might be helpful. I use Kafka in between, so the subscriptions basically start with optionally sending something using Kafka, and have a filter on the messages coming in from Kafka, supplying the stream. https://github.com/openweb-nl/kafka-graphql-examples/blob/master/graphql-endpoint/src/nl/openweb/graphql_endpoint/money_transfer_service.clj

oliyh commented 4 years ago

I realise I may have been hasty after a conversation with @gklijs and have opened #65. However if you are using lacinia this should not affect you.

Folcon commented 4 years ago

Hi @oliyh,

Thanks for chiming in, to be clear, I'm only using pedestal because that's what it seems lacinia utilises and the example project hn-clj-pedestal-re-frame was using them.

Personally, I've gotten a reasonably good solution working in sente with http-kit. I use re-frame on the client-side, so re-graph seems like it's right up my alley. If I could figure out how to do graphql within that context I'd be doing that =)...

The tricky bit is that those pieces don't quite fit together and I'm only slowly getting a grip on what they do so that I can figure out if I need to ditch what I've been doing so far and start using pedestal or if I can make a few changes and get lacinia working in my existing project.

I'm going to give your longer form answer a careful re-read to see if I've understood what you're saying.

Folcon commented 4 years ago

Thanks @gklijs, a different perspective is useful =)...

One thing I've been wondering about is session data, should I manually be passing the a session token as part of the mutator? Or perhaps disconnecting and reconnecting with credentials somehow?

I'm assuming terminating the subscription closes it for just the client that called terminate, not all connected clients? This appears to be the case =)...

With regards to the streamer function, I suppose the expected use-case is something like start a thread and if the underlying data gets updated then call source-stream? So I'll have to do some bookkeeping on my end to notify streamers that the underlying data changed? So for example postgres notifications?

I'm surprised that we're spawning a Thread. for each connected client * each subscription? Shouldn't this be pooled? Or replaced with core.async?

gklijs commented 4 years ago

Not entirely sure a Thread. is spawned for every subscription. But it's heavy, from the performance tests I ran things start getting worse from about 300 concurrent subscriptions. There are several ways to deal with session tokens, but there is nothing GraphQL specific. In micronaut-graphql I store the initial Http request used to create the websocket. So with a custom resolver any token from that request can be used as part a subscription.

oliyh commented 4 years ago

The source-stream fn puts a value on a channel, so it's lightweight and you could call it from inside a go block. If you can check a data source without doing IO then you don't need a full thread, you could do that inside a go block too (if for example you had a callback, or a channel representing change in your data source). It was just a simple example without requiring something complicated like core.async. Knowing when and how the underlying data changes is up to you, you just need to call source-stream when it does.

With HTTP transport you have cookies and headers that you can use for authorisation. For websockets you have both the original upgrade request, which can contain headers and cookies which you can associate with the context, or you can send an initial payload to the server containing credentials (re-graph supports this) which you can then store on the context for authorisation of the commands that are sent subsequently.

Folcon commented 4 years ago

Right so it is my job to take a subscription request and setup something that tracks changes, preferably in as lightweight a way as possible until someone sends a corresponding unsubscribe. And thanks @gklijs for the numbers, I'll have to see if I can figure out a way to deal with a reasonable number of multiple subscribers then.

I noticed that you didn't mention anything about using the websocket for login and then propagating that session, so perhaps if I send a mutator over the websocket to handle login I'll have to write an interceptor that does something with that, perhaps setting a value in the local session store and also having re-graph set a token in the client session store.

I realised that one of the things that was really confusing me is that lacinia-pedestal doesn't run any interceptors for websocket connections by default, so I wasn't getting any app-context or anything else happening. I'll have to work out how to match on a websocket message so that I can put in an app-context into the websocket messages or at least some way of getting session state.