ClickHouse / clickhouse-kafka-connect

ClickHouse Kafka Connector
Apache License 2.0
154 stars 43 forks source link

Single steam with multiple destination dbs #41

Closed kapopken closed 8 months ago

kapopken commented 1 year ago

We have a multi-tenant environment where certain events are streamed along the same topic but need to be sinked into different databases. Example events

{
  "userName": "someUser",
  "tenantId": "tenantA"
}

we are required to store all tenant information into their own database. We would like to do this dynamically and without having to create a new connector per tenant to reduce onboarding of new tenants and maintainence.

mzitnik commented 1 year ago

Just verifying A single cluster with different databases. [database_name].[table_name] = {tenantId}.{topic_name}

kapopken commented 1 year ago

Yes it is a replicated cluster.

abhishekgahlot2 commented 9 months ago

@mzitnik is there a performance bottleneck if i patch the repo and connect to multiple databases for different topics but same clickhouse instance?

mzitnik commented 9 months ago

@abhishekgahlot2 There should not be a bottleneck. Why do need it? (for multi-tenet) If it's only a few of them I would recommend running several instances of connect.

abhishekgahlot2 commented 9 months ago

@mzitnik yeah i am thinking topics.regex should be fine for wildcard table but database could be in thousands so spinning up connectors could take time.

mzitnik commented 9 months ago

We need to think about the best way to implement it since we will also need to open many clients. What is the urgency for it?

abhishekgahlot2 commented 9 months ago

Actually we need it urgently for a usecase where customer database can have isolated events coming from multiple dynamic topics.

for now dynamic topic to table name mapping i am thinking is possible via topics.regex lets via topics.regex=customers_.*

and publish to customers-adam, or customer-abhi without explicitly specifying in connector config let me know if this is wrong.

For database connect in codebase i see most of the clickhouse db connection builder string uses database as a parameter i am wondering if for pushing data in the end not when verifying schema we can get it dynamically similarly like topics.regex too. wdyt?

however guide on building project locally to patch up the library would be really helpful

mzitnik commented 9 months ago

Can you provide a real example of the data and mapping? Do you have multiple topics, or the tenant should extracted from the event ?

abhishekgahlot2 commented 9 months ago

we have lambdas that will push to msk they will have information about customer and organization. so a payload might look like on high level

{
    "organization": "abc",
    "event_root": "monitor_logs",
    "data.a": string,
    "data.b": number,
    ....
}

From lambda i know which topic to insert into which is monitor_logs but organization abc will be the database in clickhouse that will preserve the events and we will also have acl and isolated events for each organization.

abhishekgahlot2 commented 9 months ago

Basically first two fields organization and event_root is defining the location of clickhouse destination which is database name and topic name.

mzitnik commented 9 months ago

The way we planned to implement it, you should run a transformation that extracts {organization}.{event_root}, and in connect, we should push it to different databases. This is actually creating a virtual topic in Kafka Connect. Another question is every tenet has multiple tables (the table name is extracted from event_root)

abhishekgahlot2 commented 9 months ago

Yeah every tenant has multiple tables so database structure might look like

abc (organisation)

  • monitor_logs
  • cpu_logs
  • app_logs
  • appa_logs
  • appb_logs

Notice these names are dynamic depending on event, but before any event is generated we will have table and schema ready in clickhouse so not a problem with mismatching of schema of json and clickhouse table not existing.

I checked extract topic transformation but doesn't look like it's there for MSK or apache only developed for confluent right now.

abhishekgahlot2 commented 9 months ago

I am trying to extract the topic and tenant from the record herein

private void doInsert(List<Record> records, RangeContainer rangeContainer) {
        if (records == null || records.isEmpty()) {
            LOGGER.info("doInsert - No records to insert.");
            return;
        }
        QueryIdentifier queryId = new QueryIdentifier(records.get(0).getRecordOffsetContainer().getTopic(), records.get(0).getRecordOffsetContainer().getPartition(),
                rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(),
                UUID.randomUUID().toString());
    }

and in ClickhouseWriter have something like

private static ClickHouseHelperClient createClient(String database) {
        ClickHouseHelperClient chc = new ClickHouseHelperClient.ClickHouseClientBuilder
                .setDatabase(database)
                .build();
        return chc;
    }

Do you think this could be a viable solution i see records in doInsert are batch records so not sure how to combine them.

abhishekgahlot2 commented 9 months ago

I am also thinking an alternative strategy to above code is to not extract record but use topic name like organisation-name/table-name and then split while writing to clickhouse to avoid writing more logic. so topic name will look like abc/monitor-logs pointing to abc database and monitor-logs table.

mzitnik commented 9 months ago

The only thing that I am missing here is how to set the db on the client side. It looks like it is provided using the URL. So I need to check how it is done on the demand.

abhishekgahlot2 commented 9 months ago

I shared some approach here: https://github.com/ClickHouse/clickhouse-kafka-connect/discussions/322 build and tested on confluent. i am able to push to multiple database but now i am worried about too many topics. so trying using key rather than extracting database or table from topic.

mzitnik commented 9 months ago

if you are using https://docs.confluent.io/platform/current/connect/transforms/extracttopic.html#extracttopic you can extract {database}.{table} from the key it will create a virtual topic and than before insert you need to split by . a set database and table

abhishekgahlot2 commented 9 months ago

Yeah I understand, but my infra is aws i am using confluent for testing for me its very fast to upload jar and get it running, on aws it seems like 15 minutes or so before it spins off connector with updates. :)

mzitnik commented 9 months ago

Just to verify, do you currently have a working solution?

abhishekgahlot2 commented 9 months ago

hacky solution But workable (need to clean it up) that modifies the mutation request and extract database and table from topic but won’t scale because having both db and table name in topic means too many topic for cluster to handle right?

We hit the limit of 6k partitions because of this.

abhishekgahlot2 commented 9 months ago

hi @mzitnik I see new code for multi-database support in the connector does it mean I can use any database and any table as a part of the connector now using topic with regex?

mzitnik commented 9 months ago

@abhishekgahlot2 Yes, we have developed it recently. We have yet to manage to write documentation. There are actually two configs: one that enables the feature and the other that sets the separator. It would be best if you used https://docs.confluent.io/platform/current/connect/transforms/extracttopic.html#extracttopic to extract db/table name according to a field from your content and build a virtual topic.

abhishekgahlot2 commented 8 months ago

Oh, are the new changes using extract topic configuration and is only supported for confluent?

Paultagoras commented 8 months ago

Oh, are the new changes using extract topic configuration and is only supported for confluent?

Strictly speaking we parse the topic to get the necessary parameters (see this code for an example of where we parse it) - the recommendation to use ExtractTopic is because it allows anyone to set the topic (without changing anything on Kafka) but I think it's supported beyond just Confluent (it would depend on where you're trying to use the connector).

mzitnik commented 8 months ago

@abhishekgahlot2 are you running the sink using confluent platform?

abhishekgahlot2 commented 8 months ago

No, I am using Kafka on AWS. but I patched the library to use keys and made it work. so key for table and topics for the database using topics.regex. I believe this avoids making too many topics for me.

And rest of the code looks very similar to your PR, patching the mutation request and creating a new builder for clickhouse database.

Map<String, List<Record>> dataRecords = records.stream()
                .map(v -> Record.convert(v))
                .filter(v -> v.getKafkaKey() != null)
                .collect(Collectors.groupingBy(Record::getKafkaKey));

        statistics.recordProcessingTime(processingTime);
        for (String topicAndPartition : dataRecords.keySet()) {
            // Running on each kafka key.
            List<Record> rec = dataRecords.get(topicAndPartition);
            processing.doLogic(rec);
        }

I think my use case is solved by this, I am not creating too many topics, not making Kafka unstable and able to achieve multitenancy. Sometimes though data takes 1-2 minutes to be visible in clickhouse initially but when it visible first time after that its almost instant for upcoming records.

mzitnik commented 8 months ago

@abhishekgahlot2 Sorry for looping back on this. As I understand, you are using AWS MSK and running the clickhouse-kafka-sink and. it looks like we can not run Confluent transformation there (Need to verify) The only concern with the approach you took when using the key is that this can imbalance the number of messages on each partition.

abhishekgahlot2 commented 8 months ago

Thanks, @mzitnik yes I suspected that. That is why I am thinking of hashing the keys since I don't need an ordering guarantee at least for clickhouse. however, for other topics not related to Clickhouse, I might need ordering.

However looks like hashing isn't the perfect solution for my use case.

mzitnik commented 8 months ago

We are checking the possibility of deploying a transformation with clickhouse-kafka-connect to see if it will run on MSK as expected. Thanks to @Paultagoras, we will have answers soon.

Paultagoras commented 8 months ago

@abhishekgahlot2 Great news in that we were able to make use of the necessary transforms on MSK with a slightly modified jar file. We need to make sure we can include the necessary source code (license and all that) but should have it included in 1.0.15 😄

abhishekgahlot2 commented 8 months ago

@Paultagoras this is awesome thanks