sfackler / rust-postgres

Native PostgreSQL driver for the Rust programming language
Apache License 2.0

Example using FromSql with tokio_postgres query_raw stream? #1140

Open rodya-mirov opened 6 months ago

rodya-mirov commented 6 months ago

Hello, I'm currently a beginner with tokio_postgres so I may have missed something, but the documentation on what to do with the FromSql trait (as opposed to implementing it, which has a derive macro and is fine) is a little unclear.

I am reading a very large amount of data from a Postgres database. There is a smallish number of columns, some of which are JSON blobs containing a large number of fields I don't need. If I were receiving this as a POST body in a web server, I would just use serde, which would ignore those fields during deserialization and keep only the ones I explicitly asked for, which would be great from a performance perspective.

The analogue here seems to be the FromSql trait, but I'm having trouble using it. Instead, the query_raw function (which seems to be the only way to stream results in) returns Row objects that have already split the incoming data into Jsonb values, which I then have to manually re-parse, clone from, and pay the drop cost for.

This just doesn't feel right: the FromSql trait is already "out there" and it seems like I should be able to pass an implementor in as a generic parameter, but I'm at a loss as to where to actually use it. I was not able to find any explicit mention of the trait in the documentation except for how to implement it (which is straightforward), plus one note on one function indicating it could not be used there (implying that it could be used elsewhere and that this was the exception).

Am I missing something? Is there a missing feature, or am I holding it wrong?

For the record, this is how I'm reading data:

    // No parameters for this query, but query_raw still expects an iterator of them.
    let params: Vec<String> = vec![];
    let results_stream: RowStream = pg_conn.query_raw(READ_MCN_REPORTS, params.as_slice()).await?;

    // The stream has to be pinned before it can be polled.
    pin_mut!(results_stream);

    let mut results: Vec<MyType> = Vec::new();

    while let Some(result) = results_stream.next().await {
        let row = result?;
        // Hand-rolled conversion from the generic Row into the target struct.
        let report: MyType = manually_convert(row);
        results.push(report);

        // etc.
    }

Appreciate any assistance. Thank you!

sfackler commented 6 months ago

You can deserialize directly through Serde with the Json type:

let value = row.get::<_, Json<MyType>>(0).0;

query_raw is not the best way to pull very large row counts though. You could use a portal which would let you early-exit the query more cleanly: https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/main.rs#L771-L812. If you know you do want all results, a COPY query is another option: https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/binary_copy.rs#L142-L169
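
For illustration, a minimal sketch of that pattern dropped into the streaming loop from the issue (this assumes the with-serde_json-1 feature so that tokio_postgres::types::Json is available, and that MyType derives serde's Deserialize):

    use tokio_postgres::types::Json;

    while let Some(result) = results_stream.next().await {
        let row = result?;
        // Json<MyType> hands the raw jsonb bytes of column 0 straight to serde,
        // which skips fields MyType doesn't declare instead of materializing them.
        let report: MyType = row.get::<_, Json<MyType>>(0).0;
        results.push(report);
    }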

rodya-mirov commented 6 months ago

~The serde solution is causing some issues, I think due to double deserialization. I've got a few fields which are strings (I have verified this in the database directly! It really is a string!) but are occasionally parseable as numbers, so the JSON parser is helpfully turning them into floats, which then causes a process-killing panic (not a Result::Err for some reason) when the intended struct wants a string. I can convert back to a string with a custom serde deserializer, but if the value has a decimal point, in some cases that causes issues ("25.1" turning into "25.09999" or whatever).~

Edit -- the aggressive string-to-float issue is a data issue, not a tokio_postgres issue, so ignore that for now. Some older data incorrectly codes it as a float, not a string, which is my problem to deal with.

I'll give the copy query a shot. Thanks.

sfackler commented 6 months ago

Use try_get instead of get and you'll get an Err back on parse errors rather than a panic.
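
For example, with the same hypothetical column layout as above:

    // try_get returns a Result instead of panicking when the conversion fails.
    let report: MyType = row.try_get::<_, Json<MyType>>(0)?.0;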

sfackler commented 6 months ago

I think that's just down to how Postgres normalizes JSON values in its backend.

rodya-mirov commented 6 months ago

> You can deserialize directly through Serde with the Json type:
>
> let value = row.get::<_, Json<MyType>>(0).0;
>
> query_raw is not the best way to pull very large row counts though. You could use a portal which would let you early-exit the query more cleanly: https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/main.rs#L771-L812. If you know you do want all results, a COPY query is another option: https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/binary_copy.rs#L142-L169

Is the copy query useful simply because you can supply types? Or does it perform better than a RowStream? I don't actually want all rows, but I do want all rows that match the search query (which is a substantial number of individually large rows).

sfackler commented 6 months ago

COPY queries use a different mode of the communication protocol optimized for very large bulk transfers: https://www.postgresql.org/docs/current/sql-copy.html.

I'm not sure where the perf cutoff between normal queries and copy queries would be - you'd probably need to do some testing.
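
For reference, the statement handed to copy_out is an ordinary COPY query. A hypothetical example (table, columns, and constant name are all placeholders); the binary format is what the binary_copy helpers in the linked test expect:

    const COPY_QUERY: &str =
        "COPY (SELECT name, payload FROM reports) TO STDOUT (FORMAT binary)";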

rodya-mirov commented 6 months ago

Appreciate it. I'll give it a shot. For the type arguments, is there a way to supply custom types, or does it need to be a PostgreSQL primitive?

sfackler commented 6 months ago

The types are the Postgres types.

rodya-mirov commented 6 months ago

So there's no way to avoid the double deserialization then, is there? If it's going to deserialize these values into PostgreSQL's jsonb type and then pass that to serde, I've still got to clone and clean up all those strings?

sfackler commented 6 months ago

You can use https://serde.rs/lifetimes.html.
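
Concretely, the target struct can borrow from the row's buffer instead of allocating owned Strings. A rough sketch (field names are invented; this relies on the Json wrapper deserializing with the row's lifetime, which is what the linked page is about):

    use std::borrow::Cow;

    use serde::Deserialize;

    // Borrowed fields point into the jsonb bytes held by the Row, so a view is
    // only usable while that Row is alive. Cow falls back to an owned String
    // when the JSON string contains escapes and can't be borrowed directly.
    #[derive(Deserialize)]
    struct ReportView<'a> {
        #[serde(borrow)]
        name: Cow<'a, str>,
        count: u64,
    }

    let view: ReportView = row.get::<_, Json<ReportView>>(0).0;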

rodya-mirov commented 6 months ago

That doesn't work if I've already deserialized the incoming cell data as a Jsonb object though, right? Or is Jsonb somehow delaying deserialization for this purpose?

sfackler commented 6 months ago

What do you mean by that? There isn't a type called Jsonb in the repo as far as I'm aware.

rodya-mirov commented 6 months ago

No, there's no "type", but there is a Type. I'm trying to follow the sample code in the test and I need to supply an array of Type instances. I don't see how to pass in my own custom type here; if I put in Type::JSONB then I think I'm doing eager json deserialization, which I then have to pay twice for, as opposed to deserializing directly from the incoming stream. My code attempt looks like this:

    let stream = pg_conn.copy_out(MY_QUERY_HERE).await?;

    // TODO: how to pass custom types here? Or is it even important to do that?
    let row_stream = BinaryCopyOutStream::new(stream, &[Type::VARCHAR, Type::JSONB]);

    pin_mut!(row_stream);

    while let Some(row) = row_stream.next().await {
        let row = row.context("Handle error here")?;
        // do processing here
    }

(Aside: the test code is for some reason copying to stdin, but the Postgres docs say to copy to stdout. I don't see how it could just be a typo, but I find it confusing.)

sfackler commented 6 months ago

> if I put in Type::JSONB then I think I'm doing eager json deserialization

No, you aren't. You're telling the server the type of the value you want it to ship over the wire.
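
Concretely, nothing is parsed as JSON until the value is pulled out of the decoded row. A short sketch against the snippet above, with the column order matching the &[Type::VARCHAR, Type::JSONB] slice:

    // Column 1 arrives as raw jsonb bytes; serde runs exactly once, right here.
    let name: &str = row.try_get(0)?;
    let report: MyType = row.try_get::<Json<MyType>>(1)?.0;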

rodya-mirov commented 6 months ago

@sfackler Okay, great! It sounds like it's all working as intended. I appreciate the detailed attention you've given this issue as well as your sample code.

With love I say: is there a way this could have been a bit more discoverable? I was completely lost throughout the process, but clearly people are really using this crate (almost ten million downloads). I wonder if some of this information could find its way into the official documentation, or a curated examples folder, or the like; I know these things take time.

Anyway, for anyone who wanders into this issue later, here is what I think I've learned:
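
Stream bulk results with copy_out plus BinaryCopyOutStream, tell the decoder the Postgres wire types, and deserialize each jsonb cell exactly once through Json<MyType> (using try_get to get an Err instead of a panic). A rough consolidated sketch of the whole thing, with placeholder names, the with-serde_json-1 feature assumed, and anyhow's Context used for errors as in the earlier snippet:

    use anyhow::Context;
    use futures::{pin_mut, StreamExt};
    use tokio_postgres::binary_copy::BinaryCopyOutStream;
    use tokio_postgres::types::{Json, Type};

    // Placeholder query; any COPY (...) TO STDOUT (FORMAT binary) works here.
    const COPY_QUERY: &str =
        "COPY (SELECT name, payload FROM reports) TO STDOUT (FORMAT binary)";

    let stream = pg_conn.copy_out(COPY_QUERY).await?;
    // The Type slice describes the wire format of each column for the decoder.
    let rows = BinaryCopyOutStream::new(stream, &[Type::VARCHAR, Type::JSONB]);
    pin_mut!(rows);

    let mut results: Vec<MyType> = Vec::new();
    while let Some(row) = rows.next().await {
        let row = row.context("decoding COPY row")?;
        // One pass of serde deserialization per row; unknown fields are skipped.
        results.push(row.try_get::<Json<MyType>>(1)?.0);
    }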