confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Move toward a ksqlDB data model that is "zero copy" #6191

Open agavra opened 3 years ago

agavra commented 3 years ago

Is your feature request related to a problem? Please describe.

See #6106 - our current data model requires many copies of the data that we read:

  1. there's bytes-in from Kafka (obviously unavoidable)
  2. we deserialize the data using the underlying data format (unavoidable)
  3. we copy that underlying data into the connect data type (avoidable)
  4. we copy that connect data type into a ksql-sanitized, case-insensitive version (avoidable)
  5. we copy that connect type field-per-field into GenericRow (avoidable, but negligible as it's not a deep copy)

It turns out that we spend significantly more time on steps 3-5 than we do on step 2. On a sample dataset (our SerdeBenchmark metrics data set) we spent as little as 13% of the deserialization time actually deserializing the data and the remaining 87% making extra copies of it. Just over 50% of that was spent in step 4.

Since (de)serialization is where the bulk of processing time is spent, fixing our data model is pretty high ROI.

Describe the solution you'd like

There are multiple layers of optimization that we could apply to this data model, but all of them should strive to copy less data:

  1. We should move from hard-copies to interfacing the underlying data. This "flips" the data model by using interfaces that just call the direct underlying data instead of eagerly copying it over.
  2. We could change to a lazy-validation model, where instead of eagerly validating all the data, we only validate data when it is used.
  3. We could move away from connect data types altogether (avoiding step 3), which perform "unnecessary" copies of the data into the connect format. This would also indirectly help us in #4961. Alternatively, we could attempt to move the connect data model from hard-copies to delegation, but that might be more difficult given that the connect data format is an external contract in a widely used open-source system.

Here is a more detailed description of what (1) could look like. Each data type would get its own interface; some are sketched out below:

interface IntegerData { Integer getData(); }
interface BigIntData { Long getData(); }
interface BooleanData { Boolean getData(); }
...
interface ArrayData<T> { T getData(int idx); }
interface StructuredData { Object getData(String field); }

Each different format that we support would have a different suite of implementations of these interfaces. For Avro, for example, we could implement the following:

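import org.apache.avro.generic.IndexedRecord;

class AvroIntegerData implements IntegerData {

  private final IndexedRecord record;
  private final int idx;

  public AvroIntegerData(IndexedRecord record, int idx) {
    this.record = record;
    this.idx = idx;
  }

  @Override
  public Integer getData() {
    // Reads straight from the underlying Avro record; nothing is copied.
    return (Integer) record.get(idx);
  }
}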

Then for JSON we would implement something similar:

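import com.fasterxml.jackson.databind.JsonNode;

class JsonIntegerData implements IntegerData {

  private final JsonNode value;

  public JsonIntegerData(JsonNode value) {
    this.value = value;
  }

  @Override
  public Integer getData() {
    // Reads straight from the parsed JSON node; nothing is copied.
    return value.intValue();
  }
}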
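
To make the difference between hard copies and delegation concrete, here is a minimal, dependency-free sketch. It assumes a plain `List<Object>` as a stand-in for a positional record such as Avro's `IndexedRecord`; the class names `CopiedIntegerData` and `DelegatingIntegerData` are hypothetical and exist only for this illustration.

```java
import java.util.List;

// Interface from the proposal above.
interface IntegerData {
  Integer getData();
}

// Eager model (roughly what happens today): the value is copied out of the
// record when the wrapper is constructed.
final class CopiedIntegerData implements IntegerData {
  private final Integer value;

  CopiedIntegerData(List<Object> record, int idx) {
    this.value = (Integer) record.get(idx);
  }

  @Override
  public Integer getData() {
    return value;
  }
}

// Delegating model: the wrapper keeps a reference to the record and reads
// from it on each call, so no data is copied up front.
final class DelegatingIntegerData implements IntegerData {
  private final List<Object> record; // stand-in for a positional record
  private final int idx;

  DelegatingIntegerData(List<Object> record, int idx) {
    this.record = record;
    this.idx = idx;
  }

  @Override
  public Integer getData() {
    return (Integer) record.get(idx);
  }
}
```

Because the delegating wrapper holds no data of its own, a later change to the underlying record is visible through it, while the copied wrapper keeps the value it saw at construction time. That is only a way to observe that no copy was made; it is not a suggestion that records should be mutated.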

Finally, each data type would also have a "ksqlDB Java" implementation that would allow users building UDFs to instantiate these types directly. We could be smart about primitives and automatically wrap them in the corresponding interface. For the structured types, we could leverage the existing KsqlStruct, for example.

We would then need to rewrite our serializers to take in the generic interface instead of the connect type, and utilize that when performing serialization.
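
The last two points could fit together roughly as follows. This is a minimal sketch under stated assumptions: `JavaIntegerData`, `JavaStructuredData` and `DelimitedWriter` are hypothetical names, not existing ksqlDB classes, and the comma-separated output format is chosen only to keep the example short.

```java
import java.util.List;
import java.util.Map;

// Interfaces as sketched in the proposal.
interface IntegerData {
  Integer getData();
}

interface StructuredData {
  Object getData(String field);
}

// Hypothetical "ksqlDB Java" implementation: wraps a plain Java value,
// for example one returned from a UDF.
final class JavaIntegerData implements IntegerData {
  private final Integer value;

  JavaIntegerData(Integer value) {
    this.value = value;
  }

  @Override
  public Integer getData() {
    return value;
  }
}

// Hypothetical struct backed by a Java map; Integer fields are wrapped in
// the corresponding interface when they are read.
final class JavaStructuredData implements StructuredData {
  private final Map<String, Object> fields;

  JavaStructuredData(Map<String, Object> fields) {
    this.fields = fields;
  }

  @Override
  public Object getData(String field) {
    Object v = fields.get(field);
    return (v instanceof Integer) ? new JavaIntegerData((Integer) v) : v;
  }
}

// Hypothetical serializer that sees only the interfaces, never the format
// (Avro, JSON, Connect) that produced the row.
final class DelimitedWriter {
  static String write(StructuredData row, List<String> columns) {
    StringBuilder out = new StringBuilder();
    for (String column : columns) {
      if (out.length() > 0) {
        out.append(',');
      }
      Object v = row.getData(column);
      out.append(v instanceof IntegerData ? ((IntegerData) v).getData() : v);
    }
    return out.toString();
  }
}
```

The same `DelimitedWriter.write` call would work unchanged if the row were backed by an Avro record or a JSON node, since it depends only on `StructuredData`.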

big-andy-coates commented 3 years ago

Our serde performance is likely hurting adoption and is something we should be starting to improve for sure.

I'm definitely in favour of removing the Connect overhead from our serde, along with those case-insensitive maps. I think they overcomplicate our serde and hurt performance. However, I'm slightly hesitant about type interfaces and lazy type checking.

The type interfaces may enable a zero-copy style model, but at a cost of complexity. I think ksqlDB should have its own type system, e.g. a KsqlStruct type, rather than Connect Struct etc, but these types should be sealed, so that we can be confident once the data has left the deserializer that we know what we have. This puts a boundary between the serde code and the streaming engine, which has many benefits, including being able to test them independently.

Type interfaces also delay the throwing of a deserialization exception until the data is accessed, muddying the water of what should go to the processing logger for deserialization errors and what's a processing error.

Lazy type checking can hurt performance if a value is used several times, and hence validated several times.

What we can do is capture the full set of columns, fields, array & map elements that a query needs when building the logical plan. Think of this as a reduced view over the actual schema of the source data. Then we can perform a one-time pass to validate each part that will be accessed is of the correct type, ignoring any data that won't be accessed. This gives us the benefits of the lazy validation without the risks of validating the same data twice, etc.

For example, given a source schema of:

COL0 INT, 
COL1 DECIMAL(4, 2), 
COL2 MAP<STRING, STRING>, 
COL3 STRUCT<F0 INT, F1 INT, F2 STRUCT< F2_1 INT, F2_2 INT>>

and a query:

SELECT COL0, COL2['a'] + COL2['b'], COL3->F2->F2_1 FROM SRC GROUP BY COL0;

Then the logical model can very easily be extended to capture that the only data we need from the source is:

COL0 INT, 
COL2 MAP["a", "b"], 
COL3 STRUCT<F2 STRUCT<F2_1 INT>>

This reduced view can enable optimisations on deserialization.

We can extend our serde to accept this view (maybe with a new SerdeFeature that indicates the format supports the view?) and use it internally to reduce the amount of work it needs to do. For example, the JSON serde can ignore many nodes in the graph.
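
As a rough illustration of the reduced view, the sketch below records the paths a query needs and then visits only those parts of a nested value. Everything here is an assumption made for the example: `RequiredFields` is a hypothetical class, nested `Map`s stand in for a parsed record, and map keys are treated the same way as struct fields for simplicity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reduced view: a tree holding only the columns, fields and
// map keys that a query actually touches.
final class RequiredFields {
  // Child name -> nested view; an empty map marks a leaf.
  private final Map<String, RequiredFields> children = new LinkedHashMap<>();

  // Registers one path, e.g. require("COL3", "F2", "F2_1").
  RequiredFields require(String... path) {
    RequiredFields node = this;
    for (String name : path) {
      node = node.children.computeIfAbsent(name, k -> new RequiredFields());
    }
    return this;
  }

  // Materializes only the required parts of a nested value; anything not
  // registered via require(...) is never visited.
  @SuppressWarnings("unchecked")
  Map<String, Object> project(Map<String, Object> source) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, RequiredFields> e : children.entrySet()) {
      Object value = source.get(e.getKey());
      RequiredFields child = e.getValue();
      if (!child.children.isEmpty() && value instanceof Map) {
        value = child.project((Map<String, Object>) value);
      }
      out.put(e.getKey(), value);
    }
    return out;
  }
}
```

For the query above, the planner would register `COL0`, `COL2 -> a`, `COL2 -> b` and `COL3 -> F2 -> F2_1`. Projecting a full record through that view leaves out `COL1`, any other `COL2` keys, and the `F0`, `F1` and `F2_2` fields, so a type-checking pass over the result touches only data the query will read.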

agavra commented 3 years ago

Thanks for the thoughts @big-andy-coates! I agree that it makes sense to eagerly type-check (for all the reasons you spell out) - but I don't think this necessitates an intermediate ksql-specific format and an extra bytes-copy. In some formats, we can get type-safety for "free" (by that I mean not at a per-record cost), so we shouldn't penalize those formats to better support those that don't. Instead, we should do a (potentially in-place) eager check only for those formats that don't support this.

Take Avro, for example. If we use an Avro schema to deserialize Avro data, we are guaranteed that the returned data will always have the same data types (or else an exception will be thrown). That means we don't need to verify that every deserialized record matches the schema we want; we can verify the Avro schema itself up front.
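
A sketch of what "verify the schema up front" could mean in practice, assuming schemas are identified by an integer ID and reduced here to a simple column-to-type-name map. `SchemaGate` and its fields are hypothetical; a real implementation would compare actual Avro schemas rather than maps of strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical gate that checks each distinct schema once and reuses the
// verdict for every record written with that schema.
final class SchemaGate {
  private final Map<String, String> expected;                     // column -> type name
  private final Map<Integer, Boolean> verdicts = new HashMap<>(); // schema id -> compatible?
  int checksRun = 0;                                              // exposed for illustration only

  SchemaGate(Map<String, String> expected) {
    this.expected = expected;
  }

  // Called per record, but the comparison itself runs once per schema id.
  boolean accepts(int schemaId, Map<String, String> schema) {
    return verdicts.computeIfAbsent(schemaId, id -> {
      checksRun++;
      return compatible(schema);
    });
  }

  // Every expected column must be present with exactly the expected type.
  private boolean compatible(Map<String, String> schema) {
    for (Map.Entry<String, String> e : expected.entrySet()) {
      if (!e.getValue().equals(schema.get(e.getKey()))) {
        return false;
      }
    }
    return true;
  }
}
```

With this shape the per-record cost is a single map lookup, which is the sense in which type safety comes "for free" for schema-driven formats.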

For JSON, which doesn't give us these guarantees (unless it's JsonSchema) we would need to be more clever. We could do one of two things: (1) we could copy it into our own internal format, thereby guaranteeing the type safety or (2) we could scan the data and type check without copying it. Both of these could be done at the deserializer level (before the ksqlDB interface) so that the rest of the code can assume that it is properly type safe.
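
Option (2) could look something like the sketch below: a recursive pass that type-checks a parsed value in place and throws on the first mismatch, without building a second copy. The names are hypothetical, nested `Map`s stand in for parsed JSON, and the schema encoding (a `Class<?>` for a leaf, a nested `Map` for a struct) is an assumption made to keep the example self-contained.

```java
import java.util.Map;

// Hypothetical in-place check, run inside the deserializer so that the rest
// of the engine can assume the data is type safe.
final class InPlaceTypeCheck {

  // schema: field name -> Class<?> for a leaf, or a nested Map for a struct.
  // path is the prefix used in error messages; pass "" at the top level.
  @SuppressWarnings("unchecked")
  static void check(Map<String, Object> data, Map<String, Object> schema, String path) {
    for (Map.Entry<String, Object> e : schema.entrySet()) {
      String here = path + e.getKey();
      Object value = data.get(e.getKey());
      Object type = e.getValue();
      if (value == null) {
        continue; // assumption: missing or null fields are treated as SQL NULL
      }
      if (type instanceof Map) {
        if (!(value instanceof Map)) {
          throw new IllegalArgumentException(here + ": expected STRUCT");
        }
        check((Map<String, Object>) value, (Map<String, Object>) type, here + "->");
      } else if (!((Class<?>) type).isInstance(value)) {
        throw new IllegalArgumentException(
            here + ": expected " + ((Class<?>) type).getSimpleName());
      }
    }
  }
}
```

Because the failure is raised before the record leaves the deserializer, it would still be reported as a deserialization error rather than a processing error, which addresses the concern raised earlier in the thread.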


You also brought up a point about "being able to test them independently". I think we should (and will need to) have a Java implementation that is independent of a serde format to test them:

Finally, each data type would also have a "ksqlDB Java" implementation that would allow users building UDFs to instantiate these types directly. We could be smart about primitives and automatically wrap them in the corresponding interface. For the structured types, we could leverage the existing KsqlStruct, for example.

These are the types we would construct from UDFs and compiled expressions.