paurkedal / ocaml-caqti

Cooperative-threaded access to relational data
https://paurkedal.github.io/ocaml-caqti/index.html
GNU Lesser General Public License v3.0

Response's streams #21

Closed: NathanReb closed this issue 5 years ago

NathanReb commented 5 years ago

First I'd like to thank you for releasing Caqti as it's proven to be a great library so far and saved me quite a lot of time and effort!

Currently it seems the only way to consume a response without allocating it all is to use fold, fold_s or iter_s.

While the current API seems to be enough for simple operations, it can become limiting as your response-handling logic grows more complex.

I mostly use Lwt and am not too familiar with Async but they both have streams that come with a whole bunch of helper functions to filter, map and iterate through them. Being able to get a row stream from a response would definitely make things easier.

Would you consider adding such functions to the API?

Exposing functions that directly return such streams might not be ideal, as that could turn out to be quite error-prone with regard to deadlocks and other connection issues. In that case, I guess we could expose some sort of wrapper in the CONNECTION modules, such as:

```ocaml
val with_response_stream :
  ('a, 'b, [< `Zero | `One | `Many ]) Caqti_request.t ->
  ('b stream -> ('c, 'e) result future) ->
  'a ->
  ('c, [> Caqti_error.call_or_retrieve ] as 'e) result future
```

That way, users could still benefit from the stream helpers without being able to mess things up too much.
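To make the scoping idea concrete, here is a minimal sketch in the blocking style. It uses the stdlib Seq.t in place of an Lwt or Async stream and needs OCaml >= 4.08 for Fun.protect and Result.ok. The cursor type, seq_of_cursor, cursor_of_list and this with_response_stream are hypothetical stand-ins, not Caqti API. The stream is handed to the callback, the underlying cursor is marked as released when the callback returns or raises, and any later attempt to read a leaked stream fails instead of touching a connection that may already be serving another request.

```ocaml
(* Hypothetical model of a response cursor: yields rows one at a time and
   must not be read after the response has been released. *)
type 'b cursor = {
  mutable released : bool;
  next_row : unit -> 'b option;
}

(* Expose the cursor as a Seq.t that fails if forced after release. *)
let seq_of_cursor (c : 'b cursor) : 'b Seq.t =
  let rec go () =
    if c.released then invalid_arg "stream used after response was released"
    else
      match c.next_row () with
      | None -> Seq.Nil
      | Some row -> Seq.Cons (row, go)
  in
  go

(* Hypothetical blocking analogue of the proposed with_response_stream:
   the stream is handed to [f], and the cursor is released when [f]
   returns or raises. *)
let with_response_stream
    (open_cursor : unit -> 'b cursor)
    (f : 'b Seq.t -> ('c, 'e) result) : ('c, 'e) result =
  let c = open_cursor () in
  Fun.protect ~finally:(fun () -> c.released <- true)
    (fun () -> f (seq_of_cursor c))

(* Test double standing in for a database response. *)
let cursor_of_list rows () =
  let remaining = ref rows in
  { released = false;
    next_row = (fun () ->
      match !remaining with
      | [] -> None
      | x :: xs -> remaining := xs; Some x) }

(* Usage: filter, map and fold with the ordinary Seq helpers. *)
let total : (int, string) result =
  with_response_stream (cursor_of_list (List.init 100 Fun.id)) (fun rows ->
    rows
    |> Seq.filter (fun x -> x mod 2 = 0)
    |> Seq.map (fun x -> x * 10)
    |> Seq.fold_left ( + ) 0
    |> Result.ok)

(* Leaking the stream out of the callback is caught on first use. *)
let leaked : int Seq.t =
  match with_response_stream (cursor_of_list [1; 2; 3]) (fun rows -> Ok rows) with
  | Ok rows -> rows
  | Error (_ : string) -> Seq.empty

let leak_rejected =
  match leaked () with
  | exception Invalid_argument _ -> true
  | _ -> false
```

Releasing in a finally handler, rather than trusting the callback to stop early, is what keeps a misbehaving callback from holding on to the connection.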

From a quick look at the code, it seems this should be fairly easy to implement, although it requires some changes to Caqti_driver_sig.System_unix and a bit of plumbing to make it work with both Lwt and Async.

I'm happy to provide a PR if you feel like this would be a worthy addition to caqti. Please let me know what you think!

paurkedal commented 5 years ago

Good to hear.

Yes, I think adding sequences would be a good addition. I can see that Async has a stream type similar to Lwt's. For the blocking instance, Seq might be a good option; with the seq compatibility package we should be able to retain support for OCaml >= 4.04.

A PR will be appreciated. We need to guard the usage of the stream, but the call could do that job, so it seems sufficient to add a function for acquiring the stream to Caqti_response_sig.S. Feel free to also add a convenience function to Caqti_connection_sig.S if using call seems too verbose. As for the System_unix extension, I think one mainly needs a stream type and a function to construct it; picking the right signature for the latter and implementing it may need some consideration and plumbing. Feel free to discuss any details here.
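One possible shape for that System_unix extension, sketched under assumptions: STREAM_SYSTEM, stream_of_fetch and Blocking are hypothetical names, and the constructor is assumed to take a row-fetching function that returns an option in the system's future type. For the blocking case the stream can simply be Seq.t. For Lwt, the same signature could presumably be satisfied with `type 'a stream = 'a Lwt_stream.t` and `Lwt_stream.from`, which has type `(unit -> 'a option Lwt.t) -> 'a Lwt_stream.t`.

```ocaml
(* Hypothetical extension of a System_unix-style signature: a stream type
   plus one constructor built from a "fetch next row" function. *)
module type STREAM_SYSTEM = sig
  type 'a future
  type 'a stream

  (* [stream_of_fetch fetch] calls [fetch] on demand until it yields None. *)
  val stream_of_fetch : (unit -> 'a option future) -> 'a stream
end

(* Blocking instance: a future is just the value and a stream is a Seq.t.
   The resulting sequence is single-use, since [fetch] is stateful. *)
module Blocking :
  STREAM_SYSTEM with type 'a future = 'a and type 'a stream = 'a Seq.t =
struct
  type 'a future = 'a
  type 'a stream = 'a Seq.t

  let stream_of_fetch fetch =
    let rec go () =
      match fetch () with
      | None -> Seq.Nil
      | Some x -> Seq.Cons (x, go)
    in
    go
end

(* Test double for a row fetcher: yields n-1, n-2, ..., 0, then None. *)
let countdown n =
  let i = ref n in
  fun () ->
    if !i <= 0 then None
    else begin
      decr i;
      Some !i
    end

let rows = List.of_seq (Blocking.stream_of_fetch (countdown 3))
```

Keeping the constructor pull-based (one fetch per element) is what lets the blocking instance use Seq directly; a push-based constructor would need threads or a buffer to feed a Seq.t.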

(Note that I have not implemented single-row mode for PostgreSQL yet, so for now one needs to use LIMIT and OFFSET to process big data sets. For MariaDB the row-fetching function returns a future, so that may be okay, though I haven't checked what happens under the hood.)
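Until single-row mode is available, the LIMIT/OFFSET workaround can still be hidden behind a lazy sequence. This is a minimal sketch. It assumes a hypothetical fetch_page function that runs the paged query and returns one page as a list, and a 10,000-row array stands in for the database. Note that OFFSET paging needs a deterministic ORDER BY; otherwise rows can be skipped or repeated between pages.

```ocaml
(* Hypothetical paging helper: [fetch_page ~limit ~offset] stands in for a
   query such as "SELECT ... ORDER BY id LIMIT ? OFFSET ?". Pages are only
   requested as the sequence is consumed. *)
let rows_by_pages ~page_size
    (fetch_page : limit:int -> offset:int -> 'a list) : 'a Seq.t =
  (* Emit the rows of one page, then continue with [rest]. *)
  let rec emit rows rest () =
    match rows with
    | [] -> rest ()
    | x :: xs -> Seq.Cons (x, emit xs rest)
  in
  let rec from_offset offset () =
    match fetch_page ~limit:page_size ~offset with
    | [] -> Seq.Nil
    | page ->
      let n = List.length page in
      (* A short page means the result set is exhausted. *)
      let rest = if n < page_size then Seq.empty else from_offset (offset + n) in
      emit page rest ()
  in
  from_offset 0

(* Test double: a 10_000-row table, like generate_series(0, 9999). *)
let table = Array.init 10_000 Fun.id

let calls = ref 0

let fake_fetch_page ~limit ~offset =
  incr calls;
  let len = max 0 (min limit (Array.length table - offset)) in
  Array.to_list (Array.sub table offset len)

let all_rows = List.of_seq (rows_by_pages ~page_size:1000 fake_fetch_page)
let calls_for_all = !calls

(* Only the first page is fetched when just the first row is read. *)
let () = calls := 0
let first_row =
  match (rows_by_pages ~page_size:16 fake_fetch_page) () with
  | Seq.Cons (x, _) -> Some x
  | Seq.Nil -> None
let calls_for_first = !calls
```

Stopping on a short page saves one query in the common case. A result whose size is an exact multiple of page_size still costs one final, empty query (11 fetches for 10,000 rows in pages of 1,000).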

NathanReb commented 5 years ago

Great!

Indeed I agree Seq is a much better candidate for Caqti_blocking than the stdlib's Stream.

I'll start working on that!

paurkedal commented 5 years ago

There is a way to get an Lwt stream (at least) from the current interface, though it has a bit of overhead. Here is a complete example:

```ocaml
open Lwt.Infix

let q =
  Caqti_request.collect Caqti_type.unit Caqti_type.int
  "SELECT generate_series(0, 9999)"

let create_caqti_lwt_stream () =
  (* Bounded stream: the producer waits once 16 rows are buffered. *)
  let stream, bp = Lwt_stream.create_bounded 16 in
  (* Push a row; if the consumer has already closed the stream, report
     success anyway so that iter_s carries on (reading all remaining rows). *)
  let push x =
    try%lwt bp#push x >|= fun () -> Ok ()
    with Lwt_stream.Closed -> Lwt.return_ok () in
  let close () = bp#close; Lwt.return_unit in
  (stream, push, close)

let main = Lwt_main.run begin
  let%lwt db =
    Caqti_lwt.connect (Uri.of_string "postgresql://") >>= Caqti_lwt.or_fail in
  let module Db = (val db) in

  let stream, push, close = create_caqti_lwt_stream () in
  (* Producer (iter_s) and consumer run concurrently. *)
  let%lwt () = Db.iter_s q push () >>= Caqti_lwt.or_fail
  and ys =
    (* Skip 20 rows, take the next 4, then close the stream. *)
    begin
      let%lwt () = Lwt_stream.njunk 20 stream in
      Lwt_stream.nget 4 stream
    end
    [%lwt.finally close ()]
  in
  assert (ys = [20; 21; 22; 23]);
  Lwt.return_unit
end
```

Something similar should be possible for Async, but not for the blocking API. It would be better for push to fail, to avoid iterating over all the remaining rows, though that adds a bit of extra overhead in handling the Db.iter_s result.
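The early-exit variant suggested above can be modeled without Lwt or a database. In this sketch, iter_rows is a hypothetical stand-in for Db.iter_s, assumed to stop at the first Error returned by the callback. `Consumer_closed is an illustrative error tag, not something Caqti defines. Because the callback's error type ends up in the same result as the call's own errors, the caller has to separate the two, which is the extra handling mentioned above.

```ocaml
(* Stand-in for Db.iter_s: feeds rows to [f] one by one and stops at the
   first Error, returning it. *)
let iter_rows (f : 'a -> (unit, 'e) result) (rows : 'a list) : (unit, 'e) result =
  let rec loop = function
    | [] -> Ok ()
    | x :: xs ->
      (match f x with
       | Ok () -> loop xs
       | Error _ as e -> e)
  in
  loop rows

(* Consumer state: it wants only 4 rows. *)
let wanted = 4
let taken : int list ref = ref []
let visited = ref 0

(* Hypothetical push that fails once the consumer is done, instead of
   silently accepting (and discarding) every remaining row. *)
let push x =
  incr visited;
  if List.length !taken >= wanted then Error `Consumer_closed
  else begin
    taken := x :: !taken;
    Ok ()
  end

(* The extra cost: the caller must tell the "consumer closed" signal apart
   from real database errors. *)
let outcome : (int list, string) result =
  match iter_rows push (List.init 10_000 Fun.id) with
  | Ok () | Error `Consumer_closed -> Ok (List.rev !taken)
  | Error (`Db_error msg) -> Error msg
```

Here only 5 of the 10,000 rows are visited (4 accepted, 1 rejected), compared with all 10,000 when push swallows the closed-stream case.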