Open bgamari opened 7 years ago
It looks like what you need is the "hasql-cursor-transaction" package, which provides a lower level of abstraction.
Also can you please provide details on your use-case?
Nikita Volkov notifications@github.com writes:
It looks like what you need is the "hasql-cursor-transaction" package, which provides a lower level of abstraction.
That looks like a reasonable solution, although a FoldM approach would be nice.
In my case I am reading a large volume of records from a database and slurping them into a compact, transiently-mutable data structure.
If it's mutable I guess it implies that the monad that you need is either IO or ST and that makes me suspicious about how you plan to reach out to that monad from either of the context monads that you get with the current API (Session or Transaction). Keep in mind that the Transaction monad prohibits lifting of IO for valid reasons.
It would help the discussion if you provided an end-to-end example of how you would use the FoldM implementation you have in mind.
I'm also quite keen to see this happen. I'm looking into the upcoming streaming result work in servant, whose interface asks you to provide a function that accepts a callback for passing in the data to be streamed to the client (something along the lines of (ByteString -> IO b) -> IO a), where the user is supposed to pass in byte strings as they are produced. As it stands, there is no way to use cursors to send data streamed from the database to the client as it's produced, so the whole result has to be held in memory before being serialised and sent out.
I can't really see the point of the hasql cursor work if this sort of workflow isn't possible. I would've thought that streaming data from the database would be one of its primary uses, but I can't see how to do it, since none of the relevant monads implement MonadIO or anything else that would allow for interleaved reading and writing.
The reason for the existing limitations is that the Transaction abstraction from the "hasql-transaction" package doesn't allow IO. This is because it automates conflict resolution by re-executing the whole transaction when a concurrency conflict occurs. This means that if it allowed side effects, they could be executed multiple times and you would never know how many.
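To illustrate the point about re-execution, here is a minimal sketch that uses only base and no database. The names retryUntilCommitted and sideEffectCount are hypothetical, and the "transaction" is just an IO action that reports whether it committed; this is not how hasql-transaction is implemented, only a model of the retry behaviour described above.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')

-- Hypothetical stand-in for a retrying transaction runner: it re-runs the
-- body until the body reports success (True = committed, False = conflict).
retryUntilCommitted :: IO Bool -> IO ()
retryUntilCommitted body = do
  committed <- body
  if committed then pure () else retryUntilCommitted body

-- Simulates a body that conflicts twice before committing and counts how
-- many times its side effect actually ran.
sideEffectCount :: IO Int
sideEffectCount = do
  effects  <- newIORef (0 :: Int)
  attempts <- newIORef (0 :: Int)
  retryUntilCommitted $ do
    modifyIORef' effects (+ 1)   -- the "side effect", e.g. writing a row out
    modifyIORef' attempts (+ 1)
    n <- readIORef attempts
    pure (n >= 3)                -- the first two attempts "conflict"
  readIORef effects
```

Evaluating sideEffectCount in GHCi shows the effect ran once per attempt rather than once per committed transaction, which is why a retrying abstraction cannot safely expose IO.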
However I see your point. This means that such an abstraction doesn't fit your case and what you need is manual control over the concurrency errors. No problem, just execute your transaction manually. After all, it's just about wrapping your session in the "BEGIN" and "COMMIT" statements.
Now that I have your use case, I have some rough ideas for updating the "hasql-transaction" design. I wouldn't bet on getting them released any time soon though, so I suggest you go on solving your problem manually.
Thanks Nikita, that's good news. Our current app is performing well enough at the moment, but I'm sure it would perform better if we could stream the data. In case it helps you at all, the PR for the servant streaming support is https://github.com/haskell-servant/servant/pull/836, with the particular part of the interface that's relevant here being the StreamGenerator defined at https://github.com/haskell-servant/servant/pull/836/files#diff-b9ed6ea5569e0ebb1cdcf2f89e536e6cR32
I believe the plan is to rewrite/add functionality to packages such as servant-cassava to support this interface.
I am also running into similar difficulties. I am fetching O(3 million) rows from the database to generate a report that I am writing to the file-system. I don't need retriable transactions, I just need to run in I/O and be able to output rows as they are fetched in constant memory.
A foldM API (inside a non-retriable transaction) would be ideal. When you say "just execute your transaction manually", what do you mean? I know how to begin and end transactions, but what I don't know is how to fetch and stream rows in constant space with Hasql.
In other words, what goes in the "session"? Manual calls to "DECLARE CURSOR", "FETCH", ... bypassing this API?
FWIW, in my case the row decoder generates an "IO ()" action that outputs the row. My foldM would just be (>>), but what I'm missing is a clean way to get the fold to happen as the query results are streaming in.
Indeed a manual loop using "FETCH FORWARD" runs in constant space. Something along the lines of:
flip HS.run conn $ do
    HS.sql "BEGIN"
    openReportCursor querySql params
    fetchLoop
    HS.sql "END"
  where
    fetchSql = "FETCH FORWARD 1000 FROM report_cursor"
    -- Fetch batches until the cursor is exhausted; each decoded row is an
    -- IO action that is run as soon as its batch arrives.
    fetchLoop = do
        rows <- prepared fetchSql () def decoder
        if V.null rows
            then return ()
            else forM_ rows liftIO >> fetchLoop
    decoder = HD.rowVector rowDecoder
    prepared sql a enc dec = HS.statement a $ HQ.Statement sql enc dec True
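The shape of that loop can be separated from Hasql entirely. Below is a minimal sketch using only base: foldBatches, fakeCursor and demoSum are hypothetical names, and the in-memory fakeCursor merely stands in for a "FETCH FORWARD n" call. In a real session the fetch action would be the prepared FETCH statement and the monad would be Session.

```haskell
import Control.Monad (foldM)
import Data.IORef (newIORef, atomicModifyIORef')

-- Repeatedly run `fetchBatch` and feed every row to `step`, stopping at the
-- first empty batch. Only one batch is held in memory at a time.
foldBatches :: Monad m => m [row] -> (acc -> row -> m acc) -> acc -> m acc
foldBatches fetchBatch step = go
  where
    go acc = do
      rows <- fetchBatch
      if null rows
        then pure acc
        else do
          acc' <- foldM step acc rows
          go $! acc'   -- force the accumulator so thunks don't pile up

-- Hypothetical in-memory "cursor": hands out at most `n` items per call.
fakeCursor :: Int -> [a] -> IO (IO [a])
fakeCursor n items = do
  ref <- newIORef items
  pure (atomicModifyIORef' ref (\xs -> let (batch, rest) = splitAt n xs in (rest, batch)))

-- Sums 2500 "rows" fetched in batches of 1000.
demoSum :: IO Int
demoSum = do
  fetch <- fakeCursor 1000 [1 .. 2500]
  foldBatches fetch (\acc x -> pure (acc + x)) 0
```

With (>>)-style output as the step (for example \() row -> writeRow row, where writeRow is whatever emits a row), this becomes the constant-space report writer described above.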
In case it's helpful, I also needed a way to stream the results via IO, since I'm using a cursor not to load millions of
cursorQuery :: Int -> ByteString -> HasqlE.Params a -> HasqlD.Row b -> a -> ([b] -> App ()) -> App ()
cursorQuery batchSize query params decoder input action = do
  env <- ask
  -- Random cursor name so that concurrent calls don't collide.
  cursor_name <- ("cursor_" <>) <<$>> replicateM 10 $ randomRIO ('a', 'z')
  Env.sqlSession $ do
    Hasql.sql "BEGIN READ ONLY"
    catchError (do
        cursor <- Hasql.statement input (declareCursorQuery cursor_name)
        loop env cursor cursor_name
      ) (const $ do
        -- No quotes around the statement: they would be sent to the server as part of the SQL.
        Hasql.sql [i|CLOSE ${cursor_name}|]
        Hasql.sql "ROLLBACK"
      )
    cleanup cursor_name
  where
    cleanup cursor_name = do
      Hasql.sql [i|CLOSE ${cursor_name}|]
      Hasql.sql "COMMIT"
    declareCursorQuery cursor_name =
      Hasql.Statement
        [i|DECLARE ${cursor_name} CURSOR FOR ${query}|]
        params Hasql.noResult True
    fetchQuery cursor_name = Hasql.Statement
      [i|FETCH FORWARD ${batchSize} FROM ${cursor_name}|]
      HasqlE.noParams (HasqlD.rowList decoder) True
    -- Fetch one batch, hand it to the caller's action, and repeat until a batch comes back empty.
    loop env cursor cursor_name = do
      rows <- Hasql.statement () (fetchQuery cursor_name)
      liftIO $ Env.runApp env $ action rows
      unless (null rows) $ loop env cursor cursor_name
Sometimes there is just no avoiding effects. The foldl package's FoldM type captures this variety of fold nicely.
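For readers who haven't used it, here is a rough sketch of what such a fold looks like. To keep it dependency-free, FoldM' below is a local stand-in that mirrors the shape of the foldl package's FoldM (a monadic step, a monadic initial state, and a monadic extraction); runFoldM', emitAndCount and demoEmit are hypothetical names, and the list fed to the fold stands in for rows arriving from the database.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
import Control.Monad (foldM)
import Data.IORef (newIORef, readIORef, modifyIORef')

-- Local stand-in mirroring the shape of Control.Foldl's FoldM:
-- step function, initial state, and final extraction, all in the monad m.
data FoldM' m a b = forall x. FoldM' (x -> a -> m x) (m x) (x -> m b)

-- Drive a fold over a list of inputs.
runFoldM' :: Monad m => FoldM' m a b -> [a] -> m b
runFoldM' (FoldM' step begin done) xs =
  begin >>= \x0 -> foldM step x0 xs >>= done

-- Emits each row through `emit` and returns how many rows were seen.
emitAndCount :: Monad m => (a -> m ()) -> FoldM' m a Int
emitAndCount emit = FoldM' (\n a -> emit a >> (pure $! n + 1)) (pure 0) pure

-- Collects emitted rows in an IORef so the effect order can be inspected.
demoEmit :: IO (Int, [String])
demoEmit = do
  out <- newIORef []
  n <- runFoldM' (emitAndCount (\row -> modifyIORef' out (row :))) ["a", "b", "c"]
  rows <- readIORef out
  pure (n, reverse rows)
```

The appeal for this issue is that the consumer is described once as a value, and whatever drives it (a list here, a cursor fetch loop in a database session) decides when rows are pulled.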