launchbadge / sqlx

🧰 The Rust SQL Toolkit. An async, pure Rust SQL crate featuring compile-time checked queries without a DSL. Supports PostgreSQL, MySQL, and SQLite.
Apache License 2.0
13.27k stars 1.26k forks source link

Query pipelining #408

Open farnoy opened 4 years ago

farnoy commented 4 years ago

Currently, query execution functions require either a const borrow of a connection pool, or a mutable borrow of a Transaction. This makes it impossible to prepare multiple queries and execute them in a pipelined fashion within a single transaction. tokio-postgres has this and it's an impressive feature.

mehcode commented 4 years ago

I'm wary of implicit query batching. In tokio-postgres, batching happens automatically if multiple futures from query executions are polled concurrently.

Supporting query pipelining is fairly simple at the protocol level, nearly all major sync and async clients (in other languages, in Rust, it seems only tokio-postgres does) support pipelining.

It's not the priority currently as we tend to write API servers and execute at most 2-3, complex queries per request.

With that said, I do want to see this supported in SQLx in an explicit form once we figure out the details of how that form looks. What follows is what I'm currently thinking so it's available for discussion.

// Batch or Pipeline or ?
let b = Batch::new(&mut conn);

let q1 = sqlx::query("SELECT 1").fetch_one(&b);
let q2 = sqlx::query("SELECT 2").fetch_all(&b);
let q3 = sqlx::query("SELECT 3").fetch_optional(&b);

// run all queries in a try_join_all
// this would entirely be optional sugar over using try_join_all on all the queries
b.join().await?;

// these now return immediately
let v1 = q1.await?;
let v2 = q2.await?;
let v3 = q3.now_or_never().unwrap()?;
farnoy commented 4 years ago

I'm also not looking for any kind of implicit solution, I think we're in complete agreement here. Instead of a new object Batch, could we make it work with the existing Transaction? Simultaneous queries against a pool are already executed concurrently by the pooling logic.

On Sun, Jun 14, 2020, 12:16 AM Ryan Leckey notifications@github.com wrote:

I'm wary of implicit query batching. In tokio-postgres, batching happens automatically if multiple futures from query executions are polled concurrently.

Supporting query pipelining is fairly simple at the protocol level, nearly all major sync and async clients (in other languages, in Rust, it seems only tokio-postgres does) support pipelining.

It's of no interest to us currently as we tend to write API servers and execute at most 2-3, complex queries per request.

With that said, I do want to see this supported in SQLx in an explicit form once we figure out the details of how that form looks. What follows is what I'm currently thinking so it's up for discussion.

let b = Batch::new(&mut conn); let q1 = sqlx::query("SELECT 1").fetch_one(&b);let q2 = sqlx::query("SELECT 2").fetch_all(&b);let q3 = sqlx::query("SELECT 3").fetch_optional(&b); // run all queries in a try_join_all// this would entirely be optional sugar over using try_join_all on all the queries b.join().await?; // these now return immediatelylet v1 = q1.await?;let v2 = q2.await?;let v3 = q3.now_or_never().unwrap()?;

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/launchbadge/sqlx/issues/408#issuecomment-643685102, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACUNUACMCRCT6YQZYKVYBDRWP3FBANCNFSM4N5EY65Q .

unrealhoang commented 4 years ago

@mehcode In your example, if q2.await? is called before q1.await?, you have to either allocate to store the result of q1 or skip/ignore the response of q1 in the response stream and return error on q1.await?. Also you need to be in charge of mapping between request / response which is also quite cumbersome.

My suggestion: As batch/pipelining is only make sense in case of sending a lot of query constantly, you push all those responsibility back to the user. Batch should be a Sink<Query> for sending side and a Stream<Item = Cursor> for receiving size. With that we will be able to:

// Batch insert
// futures::stream::StreamExt
let (s, r) = Batch::new(&mut conn).split();

let item_stream = stream::iter(items.into_iter().map(|i| sqlx::query("INSERT ... RETURNING").bind(i)));
let send_all = s.send_all(&mut item_stream);
let result = r.map(|cursor| cursor.next()).collect();

send_all.join(result).await

Or for infinite pipeline

let (s, r) = Batch::new(&mut conn);

// forward an infinite stream of input to DB
let stream_in = socket.map(|data| sqlx::query("...").bind(data)).forward(s);
// forward an infinite stream of output to socket
let stream_out = r.map(|cursor| rows_to_data(cursor)).forward(socket);

stream_in.join(stream_out).await
Cocalus commented 4 years ago

I tend to use futures::stream::StreamExt::for_each_concurrent a lot when I need to batch any async work. Would this Batch type work with that API. tokio-postgres's implicit approach should.

mehcode commented 4 years ago

Instead of a new object Batch, could we make it work with the existing Transaction?

We can make pipeline creation generic similar to how transaction creation is, so you can start a new pipeline with tx.pipeline() or conn.pipeline().


In your example, if q2.await? is called before q1.await?, you have to either allocate to store the result of q1 or skip/ignore the response of q1 in the response stream and return error on q1.await?

In my idea of how this would be implemented, there is no difference in behavior depending on the order you await the queries. The batch ran them all at once and they are all thin wrappers over oneshot channels.


Also you need to be in charge of mapping between request / response which is also quite cumbersome. Batch should be a Sink for sending side and a Stream for receiving size. With that we will be able to:

It may not seem like it but we actually need something similar to Sink<impl Execute> and Stream<?>. Its' not possible to express the stream side uniformly as how we execute the query needs to be chosen up front.

With the same API I proposed, this is possible:

// I'm now leaning towards `Pipeline` as a type name over `Batch`
let p = Pipeline::new(&mut conn);
let r = Vec::new();

for _ in 0..10 {
  r.push(query("INSERT INTO ...").bind(10).bind(20).fetch_one(&p));
}

p.join().await?;

for cursor in r {
  let row = cursor.await?; // result from fetch_one

  // do something with that row
}

Please note, that just like the spawn API, the returned futures from using Pipeline as an executor do not need to be awaited.

let p = Pipeline::new(&mut conn);

for _ in 0..10 {
  r.push(query("INSERT INTO ...").bind(10).bind(20).fetch_one(&p));
}

p.join().await?;

The examples I've shown so far are "prepare a ton of queries, then execute them all in one pipeline". There is another use case which is "setup a pipeline and let me shove queries at it and keep them executing".

// let p = conn.pipeline();
let p = Pipeline::new(&mut conn);

// spawns us in the background now so we can execute queries while more are added
p.spawn();

for _ in 0..10 {
  p.execute("INSERT INTO ..");
}

p.join().await?;

@Cocalus We could do something like:

let p = conn.pipeline(None); // no limit
let p = conn.pipeline(5); // max 5 queries at once
farnoy commented 4 years ago

@mehcode What would be the type safety story for SELECT queries? Your example seems to deal with a Vec of homogeneous queries and runs them in a pipelined fashion. What about heterogeneous read only queries?

Could something like this work?

let q1 = sqlx::query("SELECT 1").fetch_one(&b);
let q2 = sqlx::query("SELECT 2").fetch_all(&b);

let (res1, res2) = conn.pipeline((q1, q2)).await?;

I think that to have type safety, this pipeline needs to know the shape (how many queries) and the type of each.

Either way, this feels less ergonomic than tokio-postgres. I don't share your view that using let (res1, res2) = join!(q1, q2); is too implicit.

lacasaprivata2 commented 3 years ago

+1

BratSinot commented 2 years ago

Either way, this feels less ergonomic than tokio-postgres. I don't share your view that using let (res1, res2) = join!(q1, q2); is too implicit.

Imho, when I used tokio-postgress it was non-obvious.

DXist commented 2 years ago

I've made an implementation attempt and was invited to this discussion.

The implemented pipeline runs queries in a single transaction. The main motivation was not to run explicit transactions against CockroachDB and rely on its mechanism of automatic transaction retries. The mechanism handles serialization failures of concurrent implicit transactions.

My current needs to run several related INSERTs transactionally are covered just by pipeline.execute() but I also provided fetch_pipeline method.

The raw streaming API is definitely not easy and not user friendly but doesn't set any limits on how query results are processed.

DXist commented 2 years ago

Guys, I extended this discussion to explicit transaction pipelines and providing the API that doesn't return stale data - https://github.com/launchbadge/sqlx/issues/2082.

ewoolsey commented 8 months ago

I'd like to bump this! Would be an awesome feature.

spencerbart commented 6 months ago

I'm a huge fan of SQLx! Adding this would make it even more killer! I am happy to help out