IBM / cloudant-kafka-connector

A Kafka connector for Cloudant
Apache License 2.0

Add SMT for schema flattening #63

Closed ricellis closed 2 years ago

ricellis commented 2 years ago

As a replacement for the `cloudant.value.schema.struct.flatten` option, provide an SMT (Single Message Transform) that can perform that flattening.

This is useful, for example, to make nested structures map more readily to SQL database columns than raw JSON objects do.

In fact there is one available already for nested data structures, `org.apache.kafka.connect.transforms.Flatten`, which we can document in an example. Note, however, that it will not flatten arrays, which may be desirable when using Cloudant as a source with, for example, a JDBC sink.
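For reference, a minimal connector configuration using that built-in SMT might look like the following sketch (the transform alias `flattenMaps` is just an illustrative name):

```properties
transforms=flattenMaps
transforms.flattenMaps.type=org.apache.kafka.connect.transforms.Flatten$Value
# delimiter defaults to "."; shown here for clarity
transforms.flattenMaps.delimiter=.
```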

We may wish to provide an additional array flattening SMT or find another open-source one to document as an example.

ricellis commented 2 years ago

Couple of points to note:

The examples below are shown as JSON, but in practice the records will be `Struct` or `Map` values.

Example 1, an array of strings:

1. Flatten array:

   ```json
   {
     "m": {"foo": "bar"},
     "a": ["foo", "bar"]
   }
   ```

   becomes

   ```json
   {
     "m": {"foo": "bar"},
     "a.0": "foo",
     "a.1": "bar"
   }
   ```

2. Built-in flatten map:

   ```json
   {
     "m": {"foo": "bar"},
     "a.0": "foo",
     "a.1": "bar"
   }
   ```

   becomes

   ```json
   {
     "m.foo": "bar",
     "a.0": "foo",
     "a.1": "bar"
   }
   ```

Example 2, an array of maps:

1. Flatten array:

   ```json
   {
     "m": {"foo": "bar"},
     "a": [{"foo": "bar"}, {"boo": "baz"}]
   }
   ```

   becomes

   ```json
   {
     "m": {"foo": "bar"},
     "a.0": {"foo": "bar"},
     "a.1": {"boo": "baz"}
   }
   ```

2. Built-in flatten map:

   ```json
   {
     "m": {"foo": "bar"},
     "a.0": {"foo": "bar"},
     "a.1": {"boo": "baz"}
   }
   ```

   becomes

   ```json
   {
     "m.foo": "bar",
     "a.0.foo": "bar",
     "a.1.boo": "baz"
   }
   ```
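The array flattening step in the examples above could be sketched roughly as follows. This is an illustrative standalone method, not the connector's actual code: it only handles top-level arrays, and a real SMT would also need to recurse into nested values and deal with schemas.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the array-flattening step: each List value is replaced by
// indexed keys ("a" -> "a.0", "a.1", ...). Nested maps are left untouched
// so the built-in map Flatten SMT can handle them afterwards.
public class ArrayFlattenSketch {
    static Map<String, Object> flattenArrays(Map<String, Object> in) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : in.entrySet()) {
            Object v = e.getValue();
            if (v instanceof List) {
                List<?> list = (List<?>) v;
                for (int i = 0; i < list.size(); i++) {
                    // index appended with the same "." delimiter the
                    // built-in Flatten SMT uses by default
                    out.put(e.getKey() + "." + i, list.get(i));
                }
            } else {
                out.put(e.getKey(), v);
            }
        }
        return out;
    }
}
```

Because the indexed entries of a mixed type array each get their own key, applying this before `Struct` conversion is what makes the mixed type array workaround mentioned below possible.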

If the schema modification makes it hard to do this in an SMT, we could leave the array flattening as an option in our source connector (i.e. before record creation) and just use the built-in map flattening.

ricellis commented 2 years ago

After a bit more investigation I can add:

Arrays must be flattened first, otherwise the built-in flatten will miss their content.

Actually, although this limitation was documented in Kafka some time ago, it has been resolved in the built-in map flatten transform in more recent releases, so it is not a problem.

Mixed type arrays are not supported.

This is a limitation of `Struct`, but it can be worked around if arrays are flattened before creating the `Struct`.

Further, a current limitation of our built-in flattening option is that it operates as part of building a schema/struct, so it requires the generate schemas option to be used (it is effectively an extension of it). Given our move towards using SMTs in general, rather than having a plethora of options on the source connector itself, and the fact that the built-in map flatten is already an SMT, I think it makes sense to move the schema generation into an SMT as well.

The outcome from this task would then be:

1. The source connector task will produce [schemaless] `Map` values on its emitted `SourceRecord`s. (This is also convenient because the Java representation of the JSON document is already (mostly) `Map` based anyway.)
2. The existing flattening is replaced by two SMTs: the Kafka built-in for map flattening and a new one provided by us for array flattening. The former can operate on `Map` or `Struct`, but the intent is for our new array flattening SMT to operate only on `Map` (as it is simpler and avoids the need to rewrite schemas) and to apply it before any transformation to `Struct` (thus allowing a workaround for mixed type arrays).
3. The existing schema/struct generation option is replaced by a new SMT that operates on `Map`s and converts them to `Schema`/`Struct`.
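Under that outcome, a source pipeline could chain the three transforms along these lines. The class names for the two new SMTs are hypothetical placeholders, since they do not exist yet; only the middle transform is a real Kafka class:

```properties
transforms=arrayFlatten,mapFlatten,mapToStruct
# hypothetical new SMT: flattens arrays while the value is still a Map
transforms.arrayFlatten.type=com.ibm.cloud.cloudant.kafka.transforms.ArrayFlatten
# Kafka built-in map flattening
transforms.mapFlatten.type=org.apache.kafka.connect.transforms.Flatten$Value
# hypothetical new SMT: generates a Schema and converts the Map to a Struct
transforms.mapToStruct.type=com.ibm.cloud.cloudant.kafka.transforms.MapToStruct
```

Because each stage is an independent SMT, users who only want flat maps can simply omit the final transform from the chain.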

This not only greatly reduces the amount of transformation code needed, but also makes all three transformations completely independent of one another.

Further, I think it makes it possible to remove direct calls to GSON in the `SourceConnector`, allowing JSON serde to be handled independently and opaquely in the cloudant-java-sdk and Kafka converters respectively, effectively future-proofing against underlying serde provider changes.

The only limitations are: