apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Any plan to support JSON or JSONB? #7845

Open dojiong opened 1 year ago

dojiong commented 1 year ago

Is your feature request related to a problem or challenge?

DataFusion does not support a JSON/JSONB data type. Is there a plan to support it in the future?

Describe the solution you'd like

support JSON/JSONB data

Describe alternatives you've considered

Treat string/binary data as json/jsonb in json functions or json operators.

Additional context

json functions in postgres: functions-json

alamb commented 1 year ago

DataFusion supports reading JSON in and some basic things like field access (like json['field_name']).

However, it doesn't have the range of operators that postgres does.

This is similar to https://github.com/apache/arrow-datafusion/issues/5493

It would be great if someone wanted to start working on adding more native JSON support (starting by spec'ing it out, writing the appropriate tickets), but at the moment I don't know of anyone doing so.

dojiong commented 1 year ago

DataFusion supports reading JSON in and some basic things like field access (like json['field_name']).

Indexed field access is only valid for List or Struct types.

DataFusion/Arrow has neither a JSON/JSONB data type nor the related JSON functions.


We could solve it in two ways:

  1. add Json/Jsonb type to Arrow, then support it in DataFusion natively
  2. only add json functions to DataFusion, treat utf8/binary as json/jsonb

The first one is more natural: the type is more precise, and it could integrate seamlessly with Parquet's JSON/BSON types.

The second one could land faster, but there would be some type issues.

alamb commented 1 year ago

A third way could be to parse JSON data into Arrow Structs (which is what the json reader does now) and then improve the Struct support in DataFusion with the various JSON operators.

One limitation of this approach is that it requires all the JSON records to have the same schema, which is often, but not always, the case. The more dynamic "JSON/BSON" type approach would allow varying schemas, but would not be as fast.

alamb commented 1 year ago

I also think there is a solution that is part way between what @dojiong proposes in https://github.com/apache/arrow-datafusion/issues/7845#issuecomment-1767459328: store BSON as Binary in datafusion and then implement all datafusion functions that take binary arguments

dojiong commented 1 year ago

A third way could be to parse JSON data into Arrow Structs One limitation of this approach is that it requires all the JSON records to have the same schema

Yeah, JSON is schemaless, so it's hard to fix a schema for an Arrow Struct.


store BSON as Binary in datafusion

That could land quickly, without modifying the types of Arrow. But there are some limitations:

json_extract_path in postgres returns the json data type, so it's easy to process the returned data (extract inner values or convert the type). DataFusion does not support converting binary to JSON element types (e.g. int, boolean). We would then have to add json_extract_path_int/json_extract_path_str/json_extract_path_bool..., which is annoying.

If we use binary as BSON, we should consider a pleasant way to convert BSON data to other data types.

wjones127 commented 1 year ago

We could solve it in two ways:

  1. add Json/Jsonb type to Arrow, then support it in DataFusion natively
  2. only add json functions to DataFusion, treat utf8/binary as json/jsonb

A third option (or maybe you meant this by either 1 or 2): have JSON and BSON be extension types, so DataFusion could recognize them via field metadata. I think the Arrow community would be open to eventually have canonical extension types for these, given their prevalence in various formats, including Parquet.

If we use binary as BSON, we should consider a pleasant way to convert BSON data to other data types.

I wonder if json_extract_path could return a sparse union type of [Utf8, Int64, Float64, Boolean, List, JSON]. Then this type could be implicitly or explicitly cast to the expected type.

dojiong commented 1 year ago

have JSON and BSON be extension types, so DataFusion could recognize them via field metadata.

I didn't find a way to use extension types in DataFusion. Do you mean adding extension type support to DataFusion? But DataFusion uses Arrow's DataType, so a JSON type would have to be added to Arrow, which may be harder.


I wonder if json_extract_path could return a sparse union type of [Utf8, Int64, Float64, Boolean, List, JSON].

That's one way to achieve it. Then json_extract_path should also accept a union type as input (json_extract_path may receive the result of another json_extract_path in a subquery).

dojiong commented 1 year ago

Maybe adding an ExtensionType to Arrow's DataType would be more natural:

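use std::sync::Arc;

/// A user-defined logical type layered on top of another Arrow type.
trait ExtensionType {
    /// The type this extension wraps (may itself be an extension).
    fn inner_type(&self) -> &DataType;
    // ....
}

enum DataType {
    Int8,
    Utf8,
    // ....
    Extension(Arc<dyn ExtensionType>),
}

impl DataType {
    /// Unwrap nested extension types until a built-in type is reached.
    fn raw_type(&self) -> &DataType {
        let mut ty = self;
        while let Self::Extension(ext) = ty {
            ty = ext.inner_type();
        }
        ty
    }
}

yukkit commented 1 year ago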

@dojiong Discussion of support for extension type can be found at https://github.com/apache/arrow-rs/issues/4472

alamb commented 1 year ago

Related proposal for user defined types: https://github.com/apache/arrow-datafusion/issues/7923

thinkharderdev commented 1 year ago

We (Coralogix) built our own binary jsonb format (we call it jsona for json arrow) that we are planning on open-sourcing in the next couple months (hopefully Jan/Feb time frame, need to fill in some details) that has some nifty features for vectorized processing.

In broad strokes we took the Tape representation and tweaked that a bit. So a JsonaArray is encoded as

        DataType::Struct(Fields::from(vec![
            Field::new(
                "nodes",
                DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
                false,
            ),
            Field::new(
                "keys",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
                    true,
                ))),
                false,
            ),
            Field::new(
                "values",
                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
                false,
            ),
        ]))

So you have three child arrays:

  1. nodes - Basically TapeElement encoded as a u32 where the top 4 bits encode the type (StartObject, EndObject, StartArray, EndArray, Key, String, Number) and the bottom 28 bits store an offset
  2. keys - The JSON keys stored in a dict array
  3. values - The JSON leaf values
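The nodes encoding described above can be sketched in plain Rust. This is only an illustration under assumptions: the numeric tag values and the names NodeType, pack_node and unpack_node are hypothetical and are not taken from the jsona implementation; only the 4-bit type / 28-bit offset split comes from the description.

```rust
/// Node kinds listed in the description above. The numeric tag values are
/// an assumption made for this sketch, not the real jsona layout.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NodeType {
    StartObject = 0,
    EndObject = 1,
    StartArray = 2,
    EndArray = 3,
    Key = 4,
    String = 5,
    Number = 6,
}

const OFFSET_BITS: u32 = 28;
const OFFSET_MASK: u32 = (1 << OFFSET_BITS) - 1;

/// Pack a node type into the top 4 bits and an offset into the bottom 28.
/// Returns None if the offset does not fit in 28 bits.
fn pack_node(ty: NodeType, offset: u32) -> Option<u32> {
    if offset > OFFSET_MASK {
        return None;
    }
    Some(((ty as u32) << OFFSET_BITS) | offset)
}

/// Split a packed node back into (type, offset). Returns None for tags
/// this sketch does not define.
fn unpack_node(node: u32) -> Option<(NodeType, u32)> {
    let ty = match node >> OFFSET_BITS {
        0 => NodeType::StartObject,
        1 => NodeType::EndObject,
        2 => NodeType::StartArray,
        3 => NodeType::EndArray,
        4 => NodeType::Key,
        5 => NodeType::String,
        6 => NodeType::Number,
        _ => return None,
    };
    Some((ty, node & OFFSET_MASK))
}
```

Because type and offset live in one u32, a path-existence check only needs to scan the nodes and keys arrays, which is the property benefit 1 below relies on.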

If this is something others would be interested in, we would be happy to upstream it into arrow-rs proper as a native array type (not in the sense of adding it to the arrow spec but as something with "native" APIs in arrow-rs to avoid the ceremony around dealing with struct arrays) and add support in DataFusion.

The benefits of this over just using JSONB in a regular binary array (and the reason we built it) are roughly:

  1. You can test for existence/non-existence/nullity of a json path using just the nodes/keys arrays which are generally quite compact and cache friendly. This is especially helpful in cases where you are doing predicate pushdown into parquet since you can potentially prune significant IO from reading the values array
  2. Manipulating the "structure" (removing paths, inserting paths, etc) is quite efficient as it mostly manipulates the nodes array.
  3. It's very efficient to serialize back to a JSON string

philippemnoel commented 9 months ago

Definitely interested!

alamb commented 9 months ago

Cross posting. There are some interesting ideas in https://github.com/apache/arrow-datafusion/discussions/9103#discussion-6168066

alamb commented 9 months ago

I think there are two major themes here:

Theme 1

How to query such semi-structured data.

DataFusion today supports the Arrow type system, which while powerful can not describe arbitrary json schemas. For example you have to know up front what all the possible fields are for a StructArray

Systems like Postgres handle this by adding a new type and operators (see the Postgres docs), for example the containment operators:

-- This array contains the primitive string value:
SELECT '["foo", "bar"]'::jsonb @> '"bar"'::jsonb;

-- String exists as array element:
SELECT '["foo", "bar", "baz"]'::jsonb ? 'bar';

-- String exists as object key:
SELECT '{"foo": "bar"}'::jsonb ? 'foo';

-- Object values are not considered:
SELECT '{"foo": "bar"}'::jsonb ? 'bar';  -- yields false

Theme 2: how to store such data efficiently

Once we have some sort of idea how to query the data then there is the question about how to store / implement those queries efficiently. I think that is where https://github.com/apache/arrow-datafusion/issues/7845#issuecomment-1779821351 from @thinkharderdev and https://github.com/ArroyoSystems/arroyo/pull/477 from @mwylde are coming from

Suggestions on paths forward

I personally think it may be tough to reach a consensus on storing JSON vs BSON vs JSONA as they seem to have different usecases and pros/cons (interop vs performance for example)

However, I could easily see this supported as "extension types" (basically views over StringArray/BinaryArray).

I think we could implement such support today using functions, albeit with a poor UX. Like why can't we do something like

-- create jsonb constant from a json string, like `make_json`
select make_jsonb('{"foo": "bar"}');

-- create json string from jsonb
select json_from_jsonb(<binary json>);

--  test if data in binary_column contains the structure {foo: bar}
select jsonb_contains(binary_column, make_jsonb('{"foo": "bar"}'));

🤔

mwylde commented 9 months ago

Our immediate concern (which motivated our json extension type and the changes in https://github.com/ArroyoSystems/arrow-rs/tree/49.0.0/json) is being able to support partial deserialization/serialization of json.

The arrow typesystem is necessarily limited compared to the schemas enabled by, for example, json-schema, and for a system like ours it's important to be able to handle real-world json with arbitrarily complex schemas.

We do that by taking json schema and attempting to convert it into arrow as far as that's possible; when we encounter a subschema that isn't representable as arrow, we use utf8 with the json extension type. Then, with those arrow-json changes we're able to partially deserialize/serialize the arrow-schema fields, while leaving the unsupported fields as string-encoded json.

Similarly, for tables defined via SQL DDL, we support a JSON type that has the same behavior.

We would be interested in more native json support, particularly if we could parse the json once and store it in some form that enables more efficient json functions (which seems to be the direction that @thinkharderdev is going in with the tape representation).

alamb commented 9 months ago

Similarly, for tables defined via SQL DDL, we support a JSON type that has the same behavior.

How do people query such types? Do you have native operator support (like postgres @>)? I looked briefly in The Arroyo Docs but I didn't see any mention of JSON yet

BTW Adding something new to the arrow spec is possible, but it is quite a process.

There is a slightly less intense way here: https://arrow.apache.org/docs/format/CanonicalExtensions.html

However, I think it might be good to figure out how to support this initially and then consider trying to standardize. It isn't clear to me there is a consensus on the actual storage / representation yet.

It seems there is a consensus on the need to be able to access / query types with an arbitrary structure (JSON, JSONB, etc)

mwylde commented 9 months ago

We support a few JSON functions (https://doc.arroyo.dev/sql/scalar-functions#json-functions) for querying JSON data, and for more complex needs users can write Rust UDFs with serde_json. In practice both approaches seem pretty widely used.

rtyler commented 9 months ago

I've now run into this as a user of DataFusion SQL. As a SQL user I am hurt mostly by the lack of a get_json_object() function for extracting keys from within strings.

For example, Delta Lake has no native JSON type, so my applications rely on String columns which are known a priori to contain JSON data, and then a common query pattern might be:

SELECT id, get_json_object(ctx_data, '$.name') as name FROM sometable WHERE ds = current_date()

Where the schema might be:

+-------------------+
|     id   |  Int64 |
| ctx_data |  Utf8  | 
+-------------------+

In the meantime I might just shove a UDF into place.

abuisman commented 8 months ago

I'd very much like it if there were jsonb or json support. I want to use the pg_analytics extension and they use datafusion.

alamb commented 8 months ago

I think we are now pretty close to being able to support JSON / JSONB via scalar functions

The basic idea might be:

  1. Implement ScalarUDFs for the relevant JSON/BSON functionality
  2. Implement a rewriter that rewrites whatever syntax you want on those columns into the corresponding function calls

This is basically how Array function support is now implemented

Here is the library of array functions https://github.com/apache/arrow-datafusion/tree/main/datafusion/functions-array/src

Here is the rewrite

https://github.com/apache/arrow-datafusion/blob/1e4ddb6d86328cb6596bb50da9ccc654f19a83ea/datafusion/functions-array/src/rewrite.rs#L40-L180
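The rewriter idea can be illustrated on a toy expression tree. This is a sketch under assumptions, not DataFusion's rewrite API: the Expr enum here is a hypothetical stand-in for DataFusion's much richer expression type, and the choice of the -> operator and the json_get function name is only for illustration.

```rust
/// A toy expression tree (hypothetical, far simpler than DataFusion's).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(String),
    /// A binary operator such as `->` or `=`.
    BinaryOp { left: Box<Expr>, op: String, right: Box<Expr> },
    /// A call to a named scalar function.
    ScalarFunction { name: String, args: Vec<Expr> },
}

/// Recursively replace `left -> right` with `json_get(left, right)`.
/// All other expressions are rebuilt unchanged.
fn rewrite_json_ops(expr: Expr) -> Expr {
    match expr {
        Expr::BinaryOp { left, op, right } => {
            // Rewrite children first so nested operators are handled too.
            let left = rewrite_json_ops(*left);
            let right = rewrite_json_ops(*right);
            if op == "->" {
                Expr::ScalarFunction {
                    name: "json_get".to_string(),
                    args: vec![left, right],
                }
            } else {
                Expr::BinaryOp { left: Box::new(left), op, right: Box::new(right) }
            }
        }
        Expr::ScalarFunction { name, args } => Expr::ScalarFunction {
            name,
            args: args.into_iter().map(rewrite_json_ops).collect(),
        },
        other => other,
    }
}
```

With this shape, the functions do all the work and the syntax is only sugar, so the function library can ship and be tested before any operator support exists.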

alamb commented 8 months ago

FYI this topic came up in our first meetup https://github.com/apache/arrow-datafusion/discussions/8522

samuelcolvin commented 7 months ago

@alamb if you're interested in JSON parsing support I might be interested in contributing.

We (Pydantic) maintain a very fast Rust JSON parser (generally significantly faster than serde-json), jiter, which I think would be perfect for this case as it allows you to iterate over the JSON, rather than materialize the data before querying it.

(Jiter is pretty stable and included in pydantic-core, meaning it's the most downloaded non-std JSON parser for Python)

I did a brief experiment with using Jiter to implement a json_contains method (equivalent of the ? operator in postgres) as a scalar UDF, which looked like this:

fn json_contains(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    // first argument is the array of json values
    let json_values = match &args[0] {
        // ColumnarValue::Array(array) => as_generic_binary_array::<i32>(array),
        ColumnarValue::Array(array) => as_string_array(array),
        _ => panic!("first argument: unexpected argument type"),
    };

    // second argument is the key name
    let key_name = match &args[1] {
        ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => s.clone(),
        _ => panic!("second argument: unexpected argument type"),
    };

    let array = json_values
        .iter()
        .map(|json| {
            if let Some(json) = json {
                Some(jiter_json_contains(json.as_bytes(), &key_name))
            } else {
                None
            }
        })
        .collect::<BooleanArray>();

    Ok(ColumnarValue::from(Arc::new(array) as ArrayRef))
}

fn jiter_json_contains(json_data: &[u8], expected_key: &str) -> bool {
    let mut jiter = Jiter::new(json_data, false);
    let first_key = match jiter.next_object() {
        Ok(Some(key)) => key,
        _ => return false,
    };
    if first_key == expected_key {
        return true;
    }
    // TODO we should skip over the next value, rather than fully parse it, needs a small change to jiter
    if jiter.next_value().is_err() {
        return false;
    }
    while let Ok(Some(key)) = jiter.next_key() {
        if key == expected_key {
            return true;
        }
        // TODO we should skip over the next value, rather than fully parse it, needs a small change to jiter
        if jiter.next_value().is_err() {
            return false;
        }
    }
    false
}

Using that and querying a set of parquet files (28gb), even without the next_skip optimisation I mentioned above, the performance is comparable with duckDB:

--------------------- count(*) ---------------------
-- duckdb:
  SELECT count(*) FROM records_parquet_local;
  -- 102275349 in 99ms

-- datafusion:
  SELECT count(*) FROM records;
  -- 102275349 in 58ms

--------------------- ilike span_name ---------------------
-- duckdb:
  SELECT count(*) FROM records_parquet_local where span_name ilike 'converting traces to database%';
  -- 3151751 in 1.805s

-- datafusion:
  SELECT count(*) FROM records where span_name ilike 'converting traces to database%';
  -- 3151751 in 1068ms

--------------------- ilike span_name, list attributes ---------------------
-- duckdb:
  SELECT attributes FROM records_parquet_local where span_name ilike 'converting traces to database%';
  -- 7.6s

-- datafusion:
  SELECT attributes FROM records where span_name ilike 'converting traces to database%';
  -- 13.9s

--------------------- json_contains attributes ---------------------
-- duckdb
  SELECT count(*) FROM records_parquet_local where list_contains(json_keys(attributes), 'size');
  -- 6165747 in 11.5s

-- datafusion
  SELECT count(*) FROM records where json_contains(attributes, 'size');
  -- 6165747 in 18.0s

The slowdown relative to duckdb of using json_contains is about the same as just accessing the attributes column.

of course, if you want to use an alternative storage structure for a custom JSONB type similar to postgres or msgpack, then jiter won't help, but that would be much more work unless there's an existing rust library that provides serialization and iterable deserialization?

adriangb commented 7 months ago

For what it’s worth I think having the ability to performantly parse JSON stored as a String or Binary is valuable in and of itself. You don’t always control how the data gets stored. If a binary format improves performance that’s a great option for when you can control the storage format.

samuelcolvin commented 7 months ago

tiny update to my example above, I realised there’s a much better comparison query:

-- datafusion
  SELECT count(*) FROM records where json_contains(attributes, 'size');
  -- 6165747 in 18.0s
-- datafusion
  SELECT count(*) FROM records where attributes like '%"size":%';
  -- 6165747 in 15.4s
-- datafusion
  SELECT count(*) FROM records where attributes ilike '%"size":%';
  -- 6165747 in 22.6s

so even unoptimised, jiter/json_contains is only 16% slower than like and already faster than ilike.

alamb commented 7 months ago

Nice -- thank you for the offer and information @samuelcolvin and @adriangb

High level proposal

I think it would initially be possible to implement JSON support using the existing datafusion APIs (and implement it in a separate crate). I think @philippemnoel is interested in such an endeavor as well.

Using the existing extension APIs would both

  1. Allow initial iteration to go faster (without the ASF governance)
  2. Ensure that the datafusion APIs are sufficient for the usecase

While I don't think I personally have the bandwidth to help implement the JSON functionality I think the API design is critical to the success of DataFusion and would be very interested in helping make it happen.

Specific proposal

So, in this case the solution might look like

  1. Create a new repo and crate like datafusion-functions-json
  2. Follow the model of the functions-array that are built into DataFusion (kudos to @jayzhan211 for making most of that happen): https://github.com/apache/arrow-datafusion/blob/19356b26f515149f96f9b6296975a77ac7260149/datafusion/functions-array/src/lib.rs#L102-L149
  3. Add the appropriate functions (like json_contains) as ScalarUDFs
  4. Add a rewrite pass to rewrite json operators like --> into the appropriate function calls (see ArrayFunctionRewriter for an example: https://github.com/apache/arrow-datafusion/blob/19356b26f515149f96f9b6296975a77ac7260149/datafusion/functions-array/src/rewrite.rs#L41-L64)

Then using the JSON functionality would look something like

let ctx = SessionContext::new();
datafusion_json_functions::register_all(&ctx)?;

let results = ctx
    .sql("SELECT count(*) FROM records where json_contains(attributes, 'size')")
    .await?
    .collect()
    .await?;

Next Steps

Once we have that working I think we could then discuss with the community if we should incorporate this set of functions into the main datafusion repo for maintenance (given the demand I suspect this would be a popular idea)

New Repo

If anyone is interested, I created https://github.com/datafusion-contrib/datafuison-functions-json in the datafusion-contrib organization. I can add you as admin if you are interested.

samuelcolvin commented 7 months ago

This is great news @alamb.

Just to confirm, are you happy to use jiter and the approach I described above in datafuison-functions-json?

If so, I'm happy to help both implementing the functions and making some alterations in jiter to make things faster. Feel free to invite me, or I can just submit PRs.

alamb commented 7 months ago

This is great news @alamb.

Just to confirm, are you happy to use jiter and the approach I described above in datafuison-functions-json?

If you are going to write the code I think you should decide how to do so :)

If we get to the point where we want to propose moving the code from datafuison-functions-json into datafusion (for maintenance / apache governance) we would likely evaluate benefits of using the arrow-json reader instead of a new dependency, but there is no reason not to start with jiter (and maybe we'll find areas it is better than arrow-json)

If so, I'm happy to help both implementing the functions and making some alterations in jiter to make things faster.

That is a great idea. I suspect that will make both DataFusion and jiter better

Feel free to invite me, or I can just submit PRs.

I invited you -- I don't think I will likely have time to work on / review PRs at this time but I suspect others on this ticket may. Please chime in if you do

WenyXu commented 7 months ago

Hi @alamb, We (GreptimeDB) are also planning to support JSON. We are participating in the OSPP (Open Source Promotion Plan), similar to Google Summer of Code, and are currently recruiting students to implement this feature. We aim to find an efficient and elegant solution for storing JSON data. As part of this effort, we also plan to evaluate various storage implementations of JSON formats in the OLAP scenario.

Thank you for @thinkharderdev's work. JSONA greatly inspired me. I propose a JSONA variant, which may work well in our scenario.

A naive proposal of JSONA variant

The JSON [false, 10, {"k":"v"}, null] can be stored as the following struct.

Struct {
    Nodes: [StartArray, FALSE, Number, StartObject, Key, String, EndObject, NULL, EndArray]
    Offsets: [NULL, NULL, 0, NULL, 0, 0, NULL,…]
    Keys: ["k"]
    Strings: ["v"]
    Numbers: [10]
}

The Struct data can be efficiently encoded into compact files using the underlying file format. In our scenario, we use Parquet as the underlying file format. For instance, the Nodes field can be represented as UINT8 and efficiently encoded using the default dictionary encoding.
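A minimal sketch of how a value could be flattened into the five fields above. Everything here is an assumption made for illustration: the Json, Node and Flattened types are hypothetical, numbers are stored as f64, and keys are appended without de-duplication, none of which is specified by the proposal.

```rust
/// In-memory JSON value used as input for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Null,
    Bool(bool),
    Number(f64),
    String(String),
    Array(Vec<Json>),
    Object(Vec<(String, Json)>),
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Node { StartArray, EndArray, StartObject, EndObject, Key, String, Number, True, False, Null }

/// The five parallel fields from the struct layout above.
#[derive(Debug, Default, PartialEq)]
struct Flattened {
    nodes: Vec<Node>,
    offsets: Vec<Option<usize>>, // index into keys/strings/numbers, if any
    keys: Vec<String>,
    strings: Vec<String>,
    numbers: Vec<f64>,
}

impl Flattened {
    fn push(&mut self, node: Node, offset: Option<usize>) {
        self.nodes.push(node);
        self.offsets.push(offset);
    }

    /// Depth-first walk that appends one node per structural token or leaf.
    fn add(&mut self, value: &Json) {
        match value {
            Json::Null => self.push(Node::Null, None),
            Json::Bool(true) => self.push(Node::True, None),
            Json::Bool(false) => self.push(Node::False, None),
            Json::Number(n) => {
                let i = self.numbers.len();
                self.numbers.push(*n);
                self.push(Node::Number, Some(i));
            }
            Json::String(s) => {
                let i = self.strings.len();
                self.strings.push(s.clone());
                self.push(Node::String, Some(i));
            }
            Json::Array(items) => {
                self.push(Node::StartArray, None);
                for item in items {
                    self.add(item);
                }
                self.push(Node::EndArray, None);
            }
            Json::Object(fields) => {
                self.push(Node::StartObject, None);
                for (key, item) in fields {
                    let i = self.keys.len();
                    self.keys.push(key.clone());
                    self.push(Node::Key, Some(i));
                    self.add(item);
                }
                self.push(Node::EndObject, None);
            }
        }
    }
}

fn flatten(value: &Json) -> Flattened {
    let mut out = Flattened::default();
    out.add(value);
    out
}
```

Flattening the example document this way produces the same Nodes, Keys, Strings and Numbers as listed above, with an offset only on the Number, Key and String nodes.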

alamb commented 7 months ago

Thanks @WenyXu -- sounds very neat. FYI I think @samuelcolvin is also thinking about the representation in https://github.com/datafusion-contrib/datafusion-functions-json/issues/2

samuelcolvin commented 7 months ago

https://github.com/datafusion-contrib/datafusion-functions-json now provides the following methods, I think we're nearly ready for a first release, see https://github.com/datafusion-contrib/datafusion-functions-json/issues/5.

Cast expressions with json_get are rewritten to the appropriate method, e.g.

select * from foo where json_get(attributes, 'bar')::string='ham'

Will be rewritten to:

select * from foo where json_get_str(attributes, 'bar')='ham'
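The cast rewrite can be sketched on a toy expression tree. This is an illustration under assumptions rather than the crate's actual code: the Expr enum and the helper names are hypothetical, and only the string to json_get_str mapping shown above is included.

```rust
/// A toy expression tree (hypothetical stand-in for DataFusion's Expr).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(String),
    Call { name: String, args: Vec<Expr> },
    Cast { expr: Box<Expr>, to: String },
}

/// Typed variant of json_get for a cast target, if this sketch knows one.
/// Only the mapping shown in the thread is listed here.
fn typed_json_get(target: &str) -> Option<&'static str> {
    match target {
        "string" => Some("json_get_str"),
        _ => None,
    }
}

/// Replace `json_get(..)::<type>` with the typed function call, so the
/// value is extracted directly in the requested type.
fn rewrite_cast(expr: Expr) -> Expr {
    match expr {
        Expr::Cast { expr: inner, to } => {
            let inner = rewrite_cast(*inner);
            match (inner, typed_json_get(&to)) {
                (Expr::Call { name, args }, Some(typed)) if name == "json_get" => {
                    Expr::Call { name: typed.to_string(), args }
                }
                // Not a json_get call, or no typed variant: keep the cast.
                (inner, _) => Expr::Cast { expr: Box::new(inner), to },
            }
        }
        Expr::Call { name, args } => Expr::Call {
            name,
            args: args.into_iter().map(rewrite_cast).collect(),
        },
        other => other,
    }
}
```

Casts that the sketch does not recognise are left in place, so the rewrite never changes the meaning of a query it cannot improve.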

Oh, and as per the micro-benchmarks on https://github.com/pydantic/jiter/pull/84, the performance of these methods should now be on a par with duckdb.

@alamb, you might want to take a look at the serde-json (which arrow-json uses) case I added - ignoring the overhead of running a query it's 4x slower than jiter :)

mwylde commented 7 months ago

@samuelcolvin this is awesome! I'm very excited to try it in Arroyo.

samuelcolvin commented 2 months ago

I've done some further digging into fast querying of semi-structured data; in particular I have a prototype library "batson" (binary alternative to (J)SON), https://github.com/pydantic/jiter/pull/136.

Batson is heavily inspired by Postgres's JSONB type and Snowflake's Variant type (see https://github.com/datafusion-contrib/datafusion-functions-variant/issues/11), and the Apache initiative Open Variant. The main advantages of batson are that it maintains the order of items in objects, and that ints > i64::MAX are supported via Rust's BigInt.

Performance:

(for reference, and for those wondering why datafusion-functions-json doesn't use serde-json, batson is 106x and 588x faster than serde-json in the same cases respectively)

So far so good.


The problem is that batson isn't actually that much faster than datafusion-functions-json using jiter (DFJ) in real-world use cases:

The improvement would be even worse in cases (like ours) where the data is actually being read from an object store.

The main difference is that decompression takes up the lion’s share of time when reading parquet files locally.

Running a query like select count(*) from records where json_contains(attributes, 'needle') over 1m rows:

In memory:

+-----------------+
| count(Int64(1)) |
+-----------------+
| 105461          |
+-----------------+
mode: FilterJson zstd(1), query took 600.49625ms
+-----------------+
| count(Int64(1)) |
+-----------------+
| 105461          |
+-----------------+
mode: FilterBatson zstd(1), query took 75.818625ms

From parquet:

running queries...
+-----------------+
| count(Int64(1)) |
+-----------------+
| 105461          |
+-----------------+
mode: FilterJson zstd(1), query took 1.143363958s
+-----------------+
| count(Int64(1)) |
+-----------------+
| 105461          |
+-----------------+
mode: FilterBatson zstd(1), query took 604.522375ms

(happy to share the code I used, if it helps anyone)

We can see why that is using samply:

[images: samply flamegraphs]

So making batson faster, or implementing it at all, isn't immediately that useful.

Simple data shredding (as described in Open Variant here) won’t help, I believe, since datafusion will fetch and decompress the body column if it’s included in the query, even if no rows in the body column are accessed.

ways forward:

Any pointers from those who understand datafusion better than me (👀 @alamb @andygrove @jayzhan211) on the best way forward would be much appreciated.

jayzhan211 commented 2 months ago

select count(*) from records where json_contains(attributes, 'needle')

What is the explain plan for this query? Maybe we could push down the filter json_contains(attributes, 'needle') to parquet so that only the filtered data is decompressed? 🤔

alamb commented 2 months ago

somehow get filter pushdown / late materialization to work based on the result of a UDF so some columns aren't decompressed (or even aren't fetched) unless they're needed

This seems like the right idea to pursue to me

The ParquetExec already can push filtering down into the scan via

datafusion.execution.parquet.pushdown_filters true

(we have it on in InfluxData)

But it is not turned on by default as it isn't faster for all cases yet: https://github.com/apache/datafusion/pull/12524

samuelcolvin commented 2 months ago

I've shared the code I'm using to run these tests: https://github.com/samuelcolvin/batson-perf

Unfortunately setting datafusion.execution.parquet.pushdown_filters to true makes things slightly worse:

code/batson-perf 130 ➤ cargo run --release                                             
   Compiling jiter v0.5.0 (https://github.com/pydantic/jiter.git?branch=batson#6513df59)
   Compiling batson v0.5.0 (https://github.com/pydantic/jiter.git?branch=batson#6513df59)
   Compiling batson-perf v0.1.0 (/Users/samuel/code/batson-perf)
    Finished `release` profile [optimized] target(s) in 2.47s
     Running `target/release/batson-perf`
loading data from records_1000000_zstd(1).parquet
running queries...
+-----------------+
| count(Int64(1)) |
+-----------------+
| 105461          |
+-----------------+
mode: FilterJson zstd(1), pushdown_filters: false, query took 1.123133083s
+-----------------+
| count(Int64(1)) |
+-----------------+
| 105461          |
+-----------------+
mode: FilterBatson zstd(1), pushdown_filters: false, query took 544.314375ms
code/batson-perf 0 ➤ cargo run --release
   Compiling batson-perf v0.1.0 (/Users/samuel/code/batson-perf)
    Finished `release` profile [optimized] target(s) in 1.53s
     Running `target/release/batson-perf`
loading data from records_1000000_zstd(1).parquet
running queries...
+-----------------+
| count(Int64(1)) |
+-----------------+
| 105461          |
+-----------------+
mode: FilterJson zstd(1), pushdown_filters: true, query took 1.426721083s
+-----------------+
| count(Int64(1)) |
+-----------------+
| 105461          |
+-----------------+
mode: FilterBatson zstd(1), pushdown_filters: true, query took 957.441ms

With pushdown_filters enabled, the flamegraph looks completely different (there are two blocks of decompression, with the batson parsing in the middle) (Update, actually three blocks of decompression, one fairly small)

However, overall decompression still dominates:

[image: flamegraph with pushdown_filters enabled]

You can view this flamegraph properly here.

Since these flamegraphs run right to left (you can see when functions were actually running by selecting them and viewing the timeline), the chronology seems to be:

My guess is that pushdown_filters isn't working correctly?

alamb commented 2 months ago

My guess is that pushdown_filters isn't working correctly?

I think this falls into the category of "pushdown_filters still needs more optimization so it doesn't slow down" -- it is a matter of engineering I think (probably preventing the same column from being decoded more than once, e.g. https://github.com/apache/arrow-rs/issues/6454#issuecomment-2374726218)

thinkharderdev commented 1 month ago

https://github.com/samuelcolvin/batson-perf

I think this is a general issue with low-selectivity filters pushed down to the parquet scan. How the row filtering works now is that the column will be decoded once to execute the filter and then decoded again to produce the output batches. If the decoding time is non-trivial (e.g. it requires zstd decompression of a lot of data) and the filter is not particularly selective, then the redundant decoding can easily cost more than simply materializing the whole column and filtering.

When the row filtering was initially implemented we discussed keeping the decoded data cached but ended up deciding against it because it can potentially consume a lot of memory
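The trade-off can be made concrete with a deliberately crude cost model. It is an assumption-laden sketch, not a description of the parquet reader: costs are abstract units, filter evaluation itself is treated as free, and the second decode is assumed to scale linearly with the fraction of data that passes the filter. All function and parameter names are hypothetical.

```rust
/// Cost without pushdown: every projected column is decoded exactly once.
fn cost_without_pushdown(filter_col: f64, other_cols: f64) -> f64 {
    filter_col + other_cols
}

/// Cost with pushdown: the filter column is decoded once to evaluate the
/// predicate, then the surviving fraction of all columns (including the
/// filter column again) is decoded to build the output batches.
fn cost_with_pushdown(filter_col: f64, other_cols: f64, surviving_fraction: f64) -> f64 {
    filter_col + surviving_fraction * (filter_col + other_cols)
}

/// Under this model pushdown pays off only when
/// surviving_fraction < other_cols / (filter_col + other_cols).
fn pushdown_wins(filter_col: f64, other_cols: f64, surviving_fraction: f64) -> bool {
    cost_with_pushdown(filter_col, other_cols, surviving_fraction)
        < cost_without_pushdown(filter_col, other_cols)
}
```

In this model, when the expensive column is also the only column being read (other_cols = 0), pushdown can never win, because there is no other decoding to skip.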

alamb commented 1 month ago

When the row filtering was initially implemented we discussed keeping the decoded data cached but ended up deciding against it because it can potentially consume a lot of memory

Yes I would like to revisit this assumption eventually but I need to study the code more to convince myself it is possible

tustvold commented 1 month ago

you might want to take a look at the serde-json (which arrow-json uses) case I added - ignoring the overhead of running a query it's 4x slower than jiter :)

FWIW arrow-json doesn't make use of serde-json for reading or writing anymore - https://github.com/apache/arrow-rs/pull/3479 https://github.com/apache/arrow-rs/pull/5318

adriangb commented 2 weeks ago

Is there any way to do per-batch rewrites, or otherwise modify how an operator gets applied?

The "traditional" way of getting performance from JSON data in an analytical system is to dynamically create columns for keys (e.g. ClickHouse). For our use case that breaks down because the data is extremely variable, e.g. in one place user may be an int and somewhere else it's a str. There is also the overhead of keeping track of which keys have their own column and which don't, what the type of each key/column is, etc. But within a small chunk of data the keys are much more likely to be homogeneous, and thus the win from breaking them out into their own columns is much larger. I'm wondering if we could do per-record-batch rewrites such that, given the query json_column->>'foo', for each record batch we check whether there is a column called __json_column__foo in the file and, if so, pull the value from there. This would mean that at write time we'd have to introspect the JSON data to determine which keys have enough repetition, have a stable type, etc. to be worth breaking out. But there'd be no need to maintain any global state around which keys map to which columns, etc.
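The per-batch decision could look roughly like this. It is a sketch only: KeySource, shredded_name and resolve_key are hypothetical names, and the batch is represented by nothing more than its list of column names.

```rust
/// Where to read `json_column->>'key'` from for one record batch.
#[derive(Debug, PartialEq)]
enum KeySource {
    /// A dedicated column for this key exists in the batch / file.
    ShreddedColumn(String),
    /// No dedicated column: fall back to parsing the JSON column.
    ParseJson { column: String, key: String },
}

/// Column name for a broken-out key, following the `__json_column__foo`
/// naming used in the comment above.
fn shredded_name(json_column: &str, key: &str) -> String {
    format!("__{json_column}__{key}")
}

/// Decide, from this batch's column names alone, how to evaluate the lookup.
fn resolve_key(batch_columns: &[&str], json_column: &str, key: &str) -> KeySource {
    let candidate = shredded_name(json_column, key);
    if batch_columns.iter().any(|c| *c == candidate) {
        KeySource::ShreddedColumn(candidate)
    } else {
        KeySource::ParseJson {
            column: json_column.to_string(),
            key: key.to_string(),
        }
    }
}
```

Since the decision depends only on the batch's own schema, no global key-to-column mapping is needed, which is the point of the proposal.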

alamb commented 2 weeks ago

FWIW DataFusion has pretty strong assumptions that the schema matches (aka inserting extra columns would likely be challenging)

One thing to potentially look into is building an "inverted index" externally maybe 🤔 That is basically "state of the art" in text search / search engines 🤔
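As a rough illustration of the inverted index idea, the sketch below maps each JSON key to the set of rows that contain it. The KeyIndex type and its methods are hypothetical, and the keys of each row are assumed to have been extracted already (for example at write time).

```rust
use std::collections::{BTreeSet, HashMap};

/// Maps each JSON key to the ids of the rows that contain it.
#[derive(Debug, Default)]
struct KeyIndex {
    postings: HashMap<String, BTreeSet<usize>>,
}

impl KeyIndex {
    /// Record that `row` contains every key in `keys`.
    fn add_row<'a>(&mut self, row: usize, keys: impl IntoIterator<Item = &'a str>) {
        for key in keys {
            self.postings.entry(key.to_string()).or_default().insert(row);
        }
    }

    /// Rows that contain `key`, in ascending order; empty if none do.
    fn rows_with_key(&self, key: &str) -> Vec<usize> {
        self.postings
            .get(key)
            .map(|rows| rows.iter().copied().collect())
            .unwrap_or_default()
    }
}
```

A query such as json_contains(attributes, 'needle') could then consult the index first and only fetch and decompress the rows (or row groups) it lists.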