confluentinc / kafka-connect-storage-cloud

Kafka Connect suite of connectors for Cloud storage (Amazon S3)

Does it support generating topics.dir from topics.regex for the S3 folder structure? #654

Open akshay-agarwal-17 opened 1 year ago

akshay-agarwal-17 commented 1 year ago

Can I populate topics.dir dynamically from the actual topic name? I want to write the files to a specific path -- 0/single_sink/metis/{schema}/{table} -- using topics.dir.

Below is the current connector config:

{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "ap-south-1",
    "topics.dir": "0/single_sink/metis/${topic}",
    "flush.size": "10000",
    "tasks.max": "1",
    "s3.part.size": "67108864",
    "timezone": "Asia/Calcutta",
    "rotate.interval.ms": "60000",
    "locale": "en_GB",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "s3.bucket.name": "zeta-aws-aps1-metis-0-s3-pvt",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "partition.duration.ms": "86400000",
    "schema.compatibility": "NONE",
    "topics.regex": "cdc_0_metis_metis_0_pgdb.*",
    "parquet.codec": "gzip",
    "connect.meta.data": "true",
    "parquet.avro.write-old-list-structure": "false",
    "value.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "name": "cdc_0_metis_metis_0_pgdb.single_sink_zeta-aws-aps1-metis-0-s3-pvt_ap-south-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "'date'=YYYY-MM-dd",
    "rotate.schedule.interval.ms": "180000",
    "timestamp.extractor": "RecordField",
    "key.converter.schema.registry.url": "https://schemaregistry-metis.internal.mum1-pp.zetaapps.in:443",
    "timestamp.field": "cdc_source_ts_ms",
    "transforms": "Filter",
    "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.Filter.predicate": "IsFoo",
    "transforms.Filter.negate": "true",
    "predicates": "IsFoo",
    "predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsFoo.pattern": "cdc_0_metis_metis_0_pgdb.metis.cdc_heartbeat,cdc_0_metis_metis_0_pgdb.metis.cdc_signal,cdc_0_metis_metis_0_pgdb.transactions"
}
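For context, here is a sketch of the layout this config would likely produce if topics.dir is taken literally (i.e. ${topic} is not interpolated, which is what this issue is asking about). The folder and file names below are illustrative, not taken from an actual bucket listing:

zeta-aws-aps1-metis-0-s3-pvt/
    0/single_sink/metis/${topic}/                      <-- topics.dir used verbatim
        cdc_0_metis_metis_0_pgdb.metis.<table>/        <-- topic folder appended by the partitioner
            'date'=2024-01-15/                         <-- path.format from TimeBasedPartitioner
                <topic>+<kafkaPartition>+<startOffset>.parquet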

@kkonstantine please help.

akshay-agarwal-17 commented 1 year ago

Can someone please point me to the block of code where I could add my changes to support this, in case it's not supported out of the box? See the sketch below for what I have in mind. Thanks.
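In case it helps: the S3 sink appears to build object keys from topics.dir plus the partitioner's generatePartitionedPath(topic, encodedPartition), so one place to hook in topic-derived folders is a custom partitioner subclass rather than topics.dir itself. A minimal sketch follows; the class name and package are hypothetical, and it assumes TimeBasedPartitioner from kafka-connect-storage-common exposes generatePartitionedPath and the protected delim field:

package com.example;  // hypothetical package, not part of the connector

import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;

// Sketch: instead of using the full topic name as a single folder, split a
// topic of the form "<prefix>.<schema>.<table>" into "<schema>/<table>".
// The resulting key becomes roughly:
//   <topics.dir>/<schema>/<table>/<encodedPartition>/<file>
public class SchemaTablePartitioner<T> extends TimeBasedPartitioner<T> {

  @Override
  public String generatePartitionedPath(String topic, String encodedPartition) {
    String[] parts = topic.split("\\.");
    if (parts.length >= 3) {
      String schema = parts[parts.length - 2];
      String table = parts[parts.length - 1];
      return schema + delim + table + delim + encodedPartition;
    }
    // Fall back to the default <topic>/<encodedPartition> layout.
    return super.generatePartitionedPath(topic, encodedPartition);
  }
}

With something like this on the plugin path, "partitioner.class" would point at the custom class and "topics.dir" would stay a static prefix such as "0/single_sink/metis". Again, this is a sketch against the partitioner API as I understand it, not a tested change.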

akshay-agarwal-17 commented 1 year ago

@kkonstantine could you please help?

akshay-agarwal-17 commented 1 year ago

@pbadani could you please help?

FantFRS commented 8 months ago

@akshay-agarwal-17 did you find a solution? I have the same problem.