confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

ksql-migrations and sequential dependencies and timing issues #8752

Closed davidwmartines2 closed 2 years ago

davidwmartines2 commented 2 years ago

ksql-migrations is a great tool for deploying objects to ksqlDB in a repeatable and predictable way. However, there are some cases where the desired end state (a set of connectors, streams, tables, etc.) cannot be created in a single application of a set of ksql migration scripts.

Consider this scenario: migration 1 creates a source connector, which asynchronously creates the topic it writes to; migration 2 creates a stream over that topic. Applying the migrations in a single pass fails, because the topic does not yet exist when migration 2 runs.

Waiting a few seconds and re-running migrations apply will give the connector time to create the topic, and migration 2 will then succeed.
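
A minimal sketch of the kind of migration files involved (the file names, connector config, and topic name here are hypothetical):

-- migrations/V000001__create-connector.sql
-- the connector creates and populates 'source-input' asynchronously
CREATE SOURCE CONNECTOR my_source WITH ('kafka.topic'='source-input', 'output.format'='avro', ...);

-- migrations/V000002__create-stream.sql
-- fails if applied before the connector has created the topic
CREATE STREAM source_input WITH (kafka_topic='source-input', format='avro');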

Ideally, we wouldn't need any failure/retry logic in the automated ksql-migrations process.

This is a request for guidance and/or more detailed documentation on how to handle this type of situation.

suhas-satish commented 2 years ago

Bumping this up as this impacts SD

wicknicks commented 2 years ago

@colinhicks I believe this problem is not specific to migrations. If you run a CREATE STREAM statement (with key/value schema), followed by a CREATE CONNECTOR statement on the CLI, this will still break the connector once it starts producing records to that topic. The connector fails because the existing schema for that topic (registered by ksqldb on the CREATE STREAM statement) is not compatible with the records it's trying to produce (specifically, the connector might have certain REQUIRED columns while ksql would have registered all columns as OPTIONAL). This is not an issue if the connector produces data to that topic before the ksqldb stream is created, because ksqldb's schema is backward compatible with the one registered by Connect -- required fields can be upgraded to optional.
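
A minimal illustration of the incompatibility, assuming a single hypothetical column col1. ksqldb would register the column as nullable:

{"type": "record", "name": "Value", "fields": [{"name": "col1", "type": ["null", "string"], "default": null}]}

while the connector then tries to register it as required:

{"type": "record", "name": "Value", "fields": [{"name": "col1", "type": "string"}]}

Under SR's default BACKWARD compatibility mode, a reader using the required-field schema cannot read records written with the nullable schema (col1 may be null), so the connector's registration is rejected. In the reverse order, the nullable schema can read required-field records, so the check passes.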

Overall, it's a race condition between the connector and ksqldb trying to register schemas on the same subject in SR. The connector-followed-by-ksqldb order works fine, but ksqldb-followed-by-Connect will fail for the above reason.

I'm wondering whether this means ksqldb should support the complete Connect schema spec. Right now, Connect and ksqldb speak slightly different "schemas", and that seems to create friction in ways like this.

guozhangwang commented 2 years ago

I had a quick chat with @wicknicks about this issue offline today; aggregating the ksql team's own discussions about this:

  1. I think the general problem we should tackle here is how ksql and connect should synchronize on common metadata -- at the moment this "common metadata" is primarily SR. Today ksql and connect do not interact with each other about this common metadata. E.g., quoting @apurvam: Previous attempts to keep the metastore consistent will not apply (since this metastore is only within ksql)... maybe the criteria for CREATE CONNECTOR returning should include some sort of healthcheck, i.e. that the connector is actually functioning. Connect must have an API to check this (see the sketch after this list).
  2. More concretely, we need to decide:
    • Should we enforce the ordering of statements executed between connect and ksql? E.g., should CREATE CONNECTOR always be executed before a CREATE STREAM that sources the topic the former statement creates?
    • If yes, should that ordering be enforced at the SR service?
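
On the healthcheck point: Connect exposes per-connector status through its REST API (GET /connectors/<name>/status), and ksql already surfaces that via DESCRIBE CONNECTOR. A minimal sketch of a post-creation check (the connector name is hypothetical):

-- reports the connector state (e.g. RUNNING or FAILED) and the state
-- of each task, as fetched from the Connect status endpoint
DESCRIBE CONNECTOR my_source;

A blocking CREATE CONNECTOR could, in principle, return only once this reports RUNNING.
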
colinhicks commented 2 years ago

Thanks @guozhangwang and @wicknicks!

Should we enforce the ordering of statements executed between connect and ksql?

Above all, the behavior should be deterministic, and we should systematically prevent timing dependencies that surface as processing-time failures. Ideally ksql returns immediate feedback when there's a problem: if I submit problematic statements, I should get an error in the response rather than having to look at either the processing log or connector task errors.

Statement ordering is indeed very important with regard to CREATE SOURCE CONNECTOR and CREATE STREAM where a common topic is shared. But I am not sure we should enforce only a single ordering. Both of these workflows seem valid, for example:

  1. Create the source connector first, then declare a stream over the topic (and schema) the connector produces.
  2. Declare the stream first -- along with the pipeline built on it -- then create the connector that populates the topic.

Even though our instinct may be that the first example is more common in practice, I'd like to strongly consider options that leave the relevant statement ordering up to the user. Another way to look at this: rather than leaving enforcement up to the SR and/or connect services, what if we introduced some additional semantics and made enforcement the user's responsibility?

Here's a sketch of new syntax I imagine might support the first example above:

CREATE SOURCE CONNECTOR my_source WITH ('kafka.topic'='source-input', 'output.format'='avro', ...);

ASSERT EXISTS TOPIC 'source-input' TIMEOUT 3 MINUTES;
ASSERT EXISTS SCHEMA 'source-input-value' TIMEOUT 3 MINUTES;

CREATE STREAM source_input WITH (kafka_topic='source-input', format='avro');

And the second example:

CREATE STREAM source_input (`col1` VARCHAR, `col2` BIGINT)
  WITH (kafka_topic='source-input', format='avro', partitions=6);

-- other statements that build out an ETL pipeline ...

ASSERT EXISTS TOPIC 'source-input' TIMEOUT 15 SECONDS;
ASSERT EXISTS SCHEMA 'source-input-value' TIMEOUT 15 SECONDS;

CREATE SOURCE CONNECTOR my_source WITH ('kafka.topic'='source-input', 'output.format'='avro', ...);

The idea is that the ASSERT EXISTS statements would poll using the ksql server's instances of the admin client and the SR client, respectively. If the client gets a valid response within the timeout, execution proceeds; if not, no further statements are executed and an error is returned to the requestor.

Would love to get some feedback on this sketch. At the expense of introducing new syntax, it seems to offer more expressiveness to users while helping maintain determinism. It also appears to require few, if any, interdependencies with SR and connect.

davidwmartines2 commented 2 years ago

@colinhicks The ASSERT EXISTS ... TIMEOUT approach is the one I was also thinking about. As an end user without much insight into the internals, it seems intuitive.

colinhicks commented 2 years ago

Thanks for the feedback, @davidwmartines2! I wonder if you have a take on this as well: in the context of migrations, might these assert statements also offer a pattern for establishing a baseline state for an environment?

I'm imagining a migration file containing only ASSERT EXISTS ... statements, required to run ahead of one or more subsequent migrations whose DDL statements rely on the constraints established by it. I think this pattern could afford clearer error messages and reduce the chance of partially applied migrations.

-- migrations/V000001__check-prerequisites.sql

-- ensure expected topics already exist in Kafka
-- leave out the TIMEOUT clause to use a default duration
ASSERT EXISTS TOPIC 'my-topic-1';
ASSERT EXISTS TOPIC 'my-topic-2';
-- <other topic assertions> ...
ASSERT EXISTS TOPIC 'my-topic-5';

-- we expect that a schema is already registered for my-topic-5's key and value
ASSERT EXISTS SCHEMA 'my-topic-5-key';
ASSERT EXISTS SCHEMA 'my-topic-5-value';
-- migrations/V000002__create-streams.sql

CREATE STREAM my_stream_1 (foo VARCHAR) WITH (kafka_topic='my-topic-1', value_format='json');
-- <other CS statements> ...
CREATE STREAM my_stream_5 WITH (kafka_topic='my-topic-5', format='protobuf');

An alternative pattern would be to include the specific assertion and relevant DDL statement in individual migrations:

-- migrations/V000001__create-stream-1.sql
ASSERT EXISTS TOPIC 'my-topic-1';
CREATE STREAM my_stream_1 (foo VARCHAR) WITH (kafka_topic='my-topic-1', value_format='json');
-- migrations/V000005__create-stream-5.sql
ASSERT EXISTS TOPIC 'my-topic-5';
ASSERT EXISTS SCHEMA 'my-topic-5-key';
ASSERT EXISTS SCHEMA 'my-topic-5-value';
CREATE STREAM my_stream_5 WITH (kafka_topic='my-topic-5', format='protobuf');
spena commented 2 years ago

@colinhicks Another alternative is an approach similar to Snowflake's WAIT_FOR_COMPLETION, which blocks ALTER WAREHOUSE until the resize work has completed (https://docs.snowflake.com/en/sql-reference/sql/alter-warehouse.html).

We can implement it with the CREATE CONNECTOR like this:

CREATE SOURCE CONNECTOR my_source WITH (
   'kafka.topic'='source-input', 
   'output.format'='avro', 
   'wait_for_completion'='true', 
   ...);

CREATE STREAM source_input WITH (kafka_topic='source-input', format='avro');

The CREATE SOURCE CONNECTOR would block until all topics and SR schemas are created. We could add a TIMEOUT to CREATE CONNECTOR in case something fails. This seems simpler than adding several ASSERT statements. What do you think?

davidwmartines2 commented 2 years ago

@colinhicks I like the explicitness of the ASSERTs. That is a nice pattern, and it makes it clear what the expected baseline state is. I think this has potential for usage outside of the connector scenario.

It does require that the user know what to assert, though. In the case of schemas, it might not be clear what their names are.

@spena It seems wait_for_completion would do the assertion logic behind the scenes, which is a little more user-friendly. Would it be able to determine which objects and names to check for?

wicknicks commented 2 years ago

I like the explicitness of the ASSERT EXISTS TOPIC|STREAM|TABLE statements. By the way, for SCHEMA, are we referring to a subject in schema registry or a schema entry in the ksql metastore?

@spena wait_for_completion sounds like it only waits for connector creation? The topic itself might take much longer to be created and written to. Also, we might not want it as a connector config, in case it conflicts with an actual connector's config property.

guozhangwang commented 2 years ago

I also like the ASSERT EXISTS TOPIC|STREAM|TABLE|SCHEMA approach (and for EXISTS SCHEMA I think we should check in SR), since it's more flexible: it works as a sync barrier between the composing modules, whereas enforcing a wait-for-completion feels more like a heavyweight synchronized block to me.

danilomr12 commented 2 years ago

Just want to add some input. Sometimes it takes a long time for the connector to create its topics, so I'm not sure the ASSERT option is always a good one. Here I'm using a source connector to pull data from two MSSQL tables, and the connector creates a topic for each. Since the first table has about 400,000 rows and the database is located outside the Kafka cluster, topic creation sometimes takes a long time. Would it be weird if the connector validated and created all the topics it needs before starting to ingest data?

wicknicks commented 2 years ago

@danilomr12 the names of the topics that a source connector may produce to are not known at connector creation time. This depends entirely on how the connector is implemented. The final topics may also change if certain SMTs are used (for example, a RegexRouter may add prefixes or suffixes to topic names, effectively writing records to a completely different topic than the one the connector intended); see the sketch below.
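
For illustration, a hypothetical connector config showing how a RegexRouter SMT redirects output topics (the connector name, class, and prefix here are made up):

CREATE SOURCE CONNECTOR my_mssql_source WITH (
   'connector.class'='io.confluent.connect.jdbc.JdbcSourceConnector',
   'transforms'='route',
   'transforms.route.type'='org.apache.kafka.connect.transforms.RegexRouter',
   -- prepend a prefix to every output topic name, so the final topics
   -- differ from the ones the connector itself derives
   'transforms.route.regex'='(.*)',
   'transforms.route.replacement'='mssql-$1',
   ...);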

In your example, would it be possible to run two connectors instead of one, to start seeing data in the topics more quickly?

danilomr12 commented 2 years ago

@wicknicks You're right, I use some SMTs too. That's one option I have; it won't completely solve the topic-creation delay problem, but it will be much faster. Until this solution is ready I'll test some workarounds.

suhas-satish commented 2 years ago

@jzaralim, can we close this issue as resolved?

jzaralim commented 2 years ago

Yup! The ASSERT TOPIC/SCHEMA syntax will be out in version 0.27.
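
For reference, a sketch of how the released statements read (note that the final syntax, per this comment, drops the EXISTS keyword; exact clauses may differ):

ASSERT TOPIC 'my-topic-1' TIMEOUT 30 SECONDS;
ASSERT SCHEMA SUBJECT 'my-topic-1-value' TIMEOUT 30 SECONDS;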