replikativ / konserve

A clojuresque key-value/document store protocol with core.async.
Eclipse Public License 1.0

Always getting a closed channel with bget #18

MrEbbinghaus closed this issue 6 years ago

MrEbbinghaus commented 6 years ago

Hey, I am using the filestore backend (fsync on) and I am trying to store and retrieve files. My version is [io.replikativ/konserve "0.5.0-beta3"]. Storing works, but retrieving does not. Here is my minimal example:

(<!! (k/bassoc storage "test" file))
(<!! (k/exists? storage "test"))
(<!! (k/bget storage "test" :input-stream))

bget always returns nil, while exists? returns true. I can find the file in my filesystem.

whilo commented 6 years ago

Hey, thanks for opening this issue. The idea of bget is to provide locked binary access to a value for the specified key. That means you should try:

(<!! (k/bget storage "test" (fn [{is :input-stream}] (go (your-read-does-all-work-here is)))))

Right now all data is in memory, so the go-block in there will not be blocked by synchronous IO. I am not sure yet how to provide an asynchronous buffered interface to the value as this does not seem to be popular on the JVM. It might be necessary to use async/thread there in the future, see https://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io for background information.

(I have just released beta4 which handles the go-block in the callback function properly).

MrEbbinghaus commented 6 years ago

Thank you for your reply. I am a step closer to understanding this, but it doesn't feel right.

There is a callback, but you have to return a channel from this callback function, and you gain nothing from it because the result is always nil.

(def my-prem (promise))
(k/bget storage "test" (fn [{is :input-stream}] (deliver my-prem is)))
@my-prem

This will "work", but it will throw an exception, because in filestore.clj#L234 there is an attempt to take from the promise.

Shouldn't this be working? Why should it return nil? Shouldn't it be a channel with the result of the function?

(<!! (k/bget store "test" (fn [{is :input-stream}] (go is))))
=> nil

This is the same result as before with (<!! (k/bget store "test" :input-stream)).

Now I use this:

(def c (async/chan))
(k/bget store -142631906 (fn [{is :input-stream}] (go (>! c is))))
=> nil
(<!! c)
=> #object[java.io.ByteArrayInputStream 0x7e5241d6 "java.io.ByteArrayInputStream@7e5241d6"]

I just want the InputStream.

whilo commented 6 years ago

Ok, I see where your confusion is. The point is that the protocol guarantees consistent read access inside of the go-block, but after (go (>! c is)) is executed another writer could change the underlying value. bget gives you an explicitly locked context. So the right thing to do is to do all critical IO in the go-block of the callback. Does this make sense?
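A minimal sketch of that pattern, assuming the bget behaviour described in this thread: the callback receives a map with :input-stream, must return a channel, and the channel returned by bget yields only after the callback's go-block has finished (as in beta4). The helper name read-bytes-locked and the 1000 ms timeout are hypothetical and not part of konserve. The stream is copied to a byte array inside the locked context, so only an immutable snapshot leaves the callback:

```clojure
(require '[clojure.core.async :refer [go <!!]]
         '[clojure.java.io :as io]
         '[konserve.core :as k])
(import '[java.io ByteArrayOutputStream])

;; Hypothetical helper, not part of konserve.
(defn read-bytes-locked
  "Copies the binary value under `key` into a byte array while the
  store holds the lock, then returns the array (or nil on timeout)."
  [store key]
  (let [result (promise)]
    ;; Block until bget, and with it the callback's go-block, is done.
    (<!! (k/bget store key
                 (fn [{is :input-stream}]
                   (go
                     ;; All critical IO happens here, inside the locked context.
                     (let [out (ByteArrayOutputStream.)]
                       (io/copy is out)
                       (deliver result (.toByteArray out)))))))
    ;; Assumption: if the callback never ran, give up instead of hanging.
    (deref result 1000 nil)))
```

Copying inside a go-block is acceptable here only because the data is currently held in memory; if the store ever streams from disk, the copy would be blocking IO and would belong in async/thread instead.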

MrEbbinghaus commented 6 years ago

Yes! It does make sense!

If anyone ever wants to send data with http-kit as their server, this is the way to go:

(with-channel request channel
  (k/bget storage "test" (fn [{is :input-stream}] (go (send! channel is)))))

(Maybe you want to update this part of the README.md as well, since it caused my initial confusion: (<!! (k/bget store "banana" :input-stream)))

whilo commented 6 years ago

Thanks, yes I have updated the README. If there are any further issues, feel free to open another issue.