confluentinc / kafka-connect-storage-cloud

Kafka Connect suite of connectors for Cloud storage (Amazon S3)
Other
13 stars 330 forks source link

Support multiple schemas in a topic #247

Open papa99do opened 5 years ago

papa99do commented 5 years ago

Kafka supports multiple types of schema in one topic with this PR: https://github.com/confluentinc/schema-registry/pull/680. This introduced a problem for S3 connector, since every time it encounters a different type of schema, it will create a new file in S3 bucket, which will result in tons of small files in this case.

Can we support this by writing different types of events in the same topic to separate S3 files ( with the schemas full qualified name in the path of the file)?

papa99do commented 5 years ago

Just to check if there's any update on this ticket? io.confluent.connect.s3.TopicPartitionWriter will rotate a file whenever the compatibility check (with the last schema) fails. And the name of a file only has tp.topic() in it. In order to support mutliple types of schema in one topic, we might need to create a new TopicPartitionWriter which is message-type-aware.

Any thoughts? Or any suggestions how I can work around this. It looks to me a general problem for anyone having multiple types in a topic.

teabot commented 4 years ago

As a workaround, how feasible would it be to set up one connector per type and apply a type specific filter in each? Obviously not great from a resource usage perspective as it'll require N consumers for N types, but I'd be keen to know the feasibility of this approach.

corbinbs commented 4 years ago

One solution for getting the schema name in the S3 key is to create a custom storage partitioner class. For example, if you extend io.confluent.connect.storage.partitioner.TimeBasedPartitioner and override the encodePartition methods, you can get the schema name via sinkRecord.valueSchema().name().

sinkRecord.valueSchema().name() + this.delim + timeBasedPartitionerEncodedPartition

that would result in something like topics/{topic_name}/{schema_name}/{time_partition}/file.avro in the bucket that the S3 Sink is targeting.

Of course, this doesn't address the many small files that could be created if the topic has a lot of events alternating back and forth between different types. If you can tolerate that, this approach might help get multiple event types from a single topic organized in the S3 bucket so that it'd be suitable for querying with something like AWS Athena, etc. Once the files land in S3, you could periodically roll them up into more optimized file sizes to help control the number of files.

papa99do commented 4 years ago

Hi @corbinbs , Thanks for the suggestion. That's a brilliant idea. The only issue was the fragmented files which I managed to work around using the following changes.

First of all, the part causing small files is not where compatibility check is done, but rather where the encodedPartition is compared to decide whether rotation on time should happen. The problem could be solving by delegating the comparison to the partitioner.

// In TopicPartitionWriter.java
// rotateIntervalMs > 0 implies timestampExtractor != null
    boolean periodicRotation = rotateIntervalMs > 0
        && timestampExtractor != null
        && (
        recordTimestamp - baseRecordTimestamp >= rotateIntervalMs
   -         || !encodedPartition.equals(currentEncodedPartition)
  +         || partitioner.shouldRotateOnTime(encodedPartition, currentEncodedPartition)
    );

And by default the partitioner will just compare these the encodedPartition with currentEncodedPartition.

// Add a method in Partitioner interface
default boolean shouldRotateOnTime(String encodedPartition, String currentEncodedPartition) {
    return !encodedPartition.equals(currentEncodedPartition);
  }

And we can then implement a custom partitioner to override this comparison logic to just compare the date/time part of the encoded partition. Something like this.

public class SchemaAwareTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {
  @Override
  public String encodePartition(SinkRecord sinkRecord, long nowInMillis) {
    return sinkRecord.valueSchema().name() + getDelim()
            + super.encodePartition(sinkRecord, nowInMillis);
  }

  @Override
  public String encodePartition(SinkRecord sinkRecord) {
    return sinkRecord.valueSchema().name() + getDelim() + super.encodePartition(sinkRecord);
  }

  @Override
  public boolean shouldRotateOnTime(String encodedPartition, String currentEncodedPartition) {
    return !timePart(encodedPartition).equals(timePart(currentEncodedPartition));
  }

  private String timePart(String encodedPartition) {
    return encodedPartition.split(getDelim(), 2)[1]; // TODO more validation here
  }

  private String getDelim() {
    return (String) config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG);
  }
}

I tested this solution and it worked for me. This is the minimum change out of my mind to support multi-schema topic, which won't impact the original behaviour. Any thoughts or concerns?

corbinbs commented 4 years ago

@papa99do I don't have tons of experience in some of those areas of code you mentioned... but I do have some event streams that could benefit from this optimization to avoid tons of small files.

I'd be happy to help test out a possible solution to this problem as a way to help contribute back.

Are you working from a fork/branch that I could try out and report back after giving it some runtime?

corbinbs commented 4 years ago

@papa99do actually - I just re-read your comment. Just to make sure I'm following along OK, you're saying that you've avoided lots of small files on topics that can include events with different schemas strictly through the use of the SchemaAwareTimeBasedPartitioner class (from your comment) as the partitioner.class on your S3 Sink configurations?

Also - what version(s) of Kafka Connect / S3SinkConnector did you evaluate that on?

yihanzhao-afterpay commented 4 years ago

Hi @corbinbs. Thanks for the feedback. Yes, I changed the TopicPartitionWriter.java to delegate the logic to decided whether we need to rotate the files to Partitioner, and also introduced a new partitioner to ignore the rotation caused by switching schema of different events.

I changed based on 5.5.0-post branch. I'll probably fork it in the next few weeks. I can probably attach a patch file if it helps for now. Patches.zip

In the zip file: Introduce_SchemaAwareTimeBasedPartitioner.patch should be applied to kafka-connect-storage-common project Support_multi-schema_topic.patch should be applied to kafka-connect-storage-cloud project

corbinbs commented 4 years ago

@papa99do just following up on this... I've been successfully running with this approach for some high volume event streams that include different event types on the same topic. I've got roughly a month of run time on that now and things have been landing nicely in S3 behind the proper event type/date partition paths 👍

yihanzhao-afterpay commented 4 years ago

@corbinbs , Thanks for the updates. That's great news.

akashnawani commented 4 years ago

Hi @yihanzhao-afterpay. Please correct me if I am wrong, this problem of files rotating very frequently happens only when rotate.interval.ms is set. I am using a combination of FieldPartitioner and RecordNameStrategy with multiple schemas flowing through one topic but not noticing this issue.

ron-damon commented 3 years ago

Hi @papa99do @yihanzhao-afterpay, great work!!! Are you planning to fork both projects and open a PR?

subban545 commented 2 years ago

@papa99do / @corbinbs / @akashnawani , I have the same requirement where different event types will be written to Kafka topic and then sync to different folders in S3 based on schema/event type.

Could you please let me know the approach for this. Thanks