Open awb99 opened 1 month ago
The interesting thing is that the reader did work, and that subsequent transactions worked again afterwards.
Also, the exception occurs frequently.
Thank you for reporting! If you happen to have a piece of code that reproduces the issue that would be helpful.
https://github.com/clojure-quant/quanta-market
I have managed to create the errors consistently in this repo when I opened the ticket.
https://github.com/clojure-quant/quanta-market/blob/main/src/quanta/market/trade/db.clj
I call trade-db-start once to create/connect, and then call (store-message! conn account direction data) from 5 threads at a rate of roughly 1-20 messages a minute.
So all I do is add new data.
I got the exceptions very consistently. Do you need a more reproducible setup?
I thought about the issue some more: Datahike seems to have a thread-locking issue. I am using missionary, a functional reactive dataflow library that comes with its own solution for threads, fibers, etc. Missionary contains a fair amount of Java code, so the Datahike lock issue may be related to that: internally, Datahike may believe it does not need to lock something because it does not see the caller as a separate thread.
Or the Datahike issue is simpler: it was not tested with multiple threads and there is a logic error in the locking.
Interesting, I was looking into missionary for https://github.com/whilo/simmie as well, but have not started using it yet. You can see where the deadlock is happening with VisualVM, for instance. I can try to build code to reproduce the issue, but it would speed things up considerably if you could provide a small test case.
@awb99 Have you tried using transact! asynchronously? That should avoid the blocking-threads problem altogether. Using transact should still not jam missionary, though.
You get a throwable-promise (https://github.com/replikativ/datahike/blob/main/src/datahike/tools.cljc#L56) returned, which does not have an async interface yet. We could probably turn this into a promise-chan and teach it deref; I need to look into this more. Alternatively, you can also just register a listener on the connection and update whenever it gets invoked. This might not be convenient for you, though. Let me know what you think.
What version of Datahike are you using?
0.6.1568
What version of Java are you using?
(build 19.0.2+-adhoc..source)
What operating system are you using?
linux - guix
What database EDN configuration are you using?
{:store {:backend :file ; backends: in-memory, file-based, LevelDB, PostgreSQL
         :path path}
 :keep-history? false
 :schema-flexibility :write ; default - strict value types need to be defined in advance.
 ;:schema-flexibility :read ; to transact any kind of data into the database, set :schema-flexibility to :read
 :initial-tx schema ; commit a schema
 }
Describe the bug
I am using Datahike to log messages in/out from websocket connections, so that I have a queryable log database which I use for debugging. When I had just one websocket connection, transacting to the db worked fine. When I started 4 websocket connections, transacting produced the following exception:
error {
:cause nil
:via [{:type java.lang.InterruptedException
       :message nil
       :at [java.util.concurrent.locks.AbstractQueuedSynchronizer acquireSharedInterruptibly "AbstractQueuedSynchronizer.java" 1048]}]
:trace [[java.util.concurrent.locks.AbstractQueuedSynchronizer acquireSharedInterruptibly "AbstractQueuedSynchronizer.java" 1048]
        [java.util.concurrent.CountDownLatch await "CountDownLatch.java" 230]
        [datahike.tools$throwable_promise$reify__31116 deref "tools.cljc" 67]
        [clojure.core$deref invokeStatic "core.clj" 2337]
        [clojure.core$deref invoke "core.clj" 2323]
        [datahike.api.impl$transact invokeStatic "impl.cljc" 38]
        [datahike.api.impl$transact invoke "impl.cljc" 26]
        [quanta.market.trade.db$store_messageBANG invokeStatic "db.clj" 111]
        [quanta.market.trade.db$store_messageBANG invoke "db.clj" 105]
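A note on reading the trace: the top frames are CountDownLatch.await and AbstractQueuedSynchronizer.acquireSharedInterruptibly, and the exception type is java.lang.InterruptedException. On the JVM, that is what a thread receives when it is interrupted while (or just before) it blocks in await(). The following is only a minimal Java sketch of that mechanism. The class name InterruptDemo is hypothetical, the code uses neither Datahike nor missionary, and it does not establish what actually interrupts the thread in the setup reported here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: shows that interrupting a thread waiting on a
// CountDownLatch makes await() throw InterruptedException.
class InterruptDemo {

    static String run() {
        // The latch is never counted down, so await() can only end via interrupt.
        CountDownLatch neverReleased = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("pending");

        Thread waiter = new Thread(() -> {
            try {
                neverReleased.await();      // same call as in the reported trace
                outcome.set("completed");
            } catch (InterruptedException e) {
                outcome.set("interrupted"); // raised because of waiter.interrupt()
            }
        });

        waiter.start();
        // await() checks the interrupt flag on entry as well as while parked,
        // so this works whether or not the waiter has reached await() yet.
        waiter.interrupt();

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If something in the calling code interrupts the thread that is blocked in the synchronous transact call (for example as part of task cancellation; this is an assumption, not something confirmed in this thread), the same exception would surface at the deref frame shown above.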
What is the expected behaviour?
There should not be an exception.
How can the behaviour be reproduced?
I can provide a link to the GitHub project if this is of any interest.