timeplus-io / proton

A streaming SQL engine, a fast and lightweight alternative to ksqlDB and Apache Flink, 🚀 powered by ClickHouse.
https://timeplus.com
Apache License 2.0

The result of a two-changelog streaming join cannot be written into a Kafka topic because the message size is too large. #375

Status: Closed (ilselintp closed this 9 months ago)

ilselintp commented 9 months ago

I create a MATERIALIZED VIEW to join two changelog streams and write the results into Kafka. However, the results cannot be written into the Kafka topic because the message Proton produces is too large, so the materialized view cannot run. EDC_ITEM_TABLE has 3000 rows and EDC_CRF_DATA_TIME_TABLE has 600 rows. Here is the stream information:

CREATE stream EDC_ITEM_TABLE
(
    `projectId` string,
    `envId` string,
    `envSiteId` string,
    `subjectId` string,
    `mandatory` int32,
    `filled` int32,
    `repeated` int32,
    `active` int32,
    `name` string,
    `itemOid` string,
    `updateTime` datetime64(3),
    `raw` string
);

CREATE stream EDC_CRF_DATA_TIME_TABLE
(
    `projectId` string,
    `envId` string,
    `subjectId` string,
    `updateTime` datetime64(3),
    `raw` string
)
PRIMARY KEY (projectId, envId, subjectId)
SETTINGS mode = 'versioned_kv';

CREATE STREAM default.EDC_ITEM_ACTIVE
(`projectId` string, `envId` string, `subjectId` string, `name` string, `itemOid` string, `flag` bool, `updateTime` datetime64(3), `raw` string
)
ENGINE = ExternalStream
SETTINGS type = 'kafka', brokers = '192.168.31.165:9092,192.168.31.165:9094,192.168.31.165:9096', topic = 'test_result', data_format = 'JSONEachRow';

-- failed query:
CREATE MATERIALIZED VIEW etl_data_combine INTO default.EDC_ITEM_ACTIVE AS
SELECT
  projectId, envId, subjectId, name, itemOid, (mandatory = 1) AND (active = 1) AND (filled = 0) AS flag, updateTime, raw
FROM
  (
  SELECT
    t1.projectId, t1.envId, t1.subjectId, t1.name, t1.itemOid, t1.mandatory, t1.active, t1.filled, t1.raw, t1.updateTime
  FROM
    changelog(EDC_ITEM_TABLE, projectId, envId, subjectId, itemOid) AS t1
  SETTINGS
    seek_to = 'earliest'
) AS temp
INNER JOIN changelog(EDC_CRF_DATA_TIME_TABLE) AS t2 ON (temp.projectId = t2.projectId) AND (temp.envId = t2.envId) AND (temp.subjectId = t2.subjectId) AND (temp.updateTime = t2.updateTime) AND lag_behind(1ms, t2.updateTime, temp.updateTime)

Error.log:

2023.12.03 04:39:06.906057 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 39th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)
2023.12.03 04:39:12.930590 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 40th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)
2023.12.03 04:39:18.954028 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 41th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)
2023.12.03 04:39:24.975121 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 42th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)
2023.12.03 04:39:30.989224 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 43th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)
2023.12.03 04:39:37.004406 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 44th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)
2023.12.03 04:39:43.018830 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 45th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)
2023.12.03 04:39:49.040310 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 46th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)
2023.12.03 04:39:55.066375 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 47th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)
2023.12.03 04:40:01.087715 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 48th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)
2023.12.03 04:40:07.124643 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 49th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)
2023.12.03 04:40:13.175475 [ 758 ] {} StorageMaterializedView (default.etl_data_combine): CANNOT_WRITE_TO_KAFKA: Wait for 50th recovering background pipeline of matierialized view 'default.etl_data_combine'. Background runtime error: Cannot write to kafka topic at offset 2097152, error: Broker: Message size too large (Background status: EXECUTING_PIPELINE)

Kafka settings: max.message.bytes 10240000

Additional context

(screenshots attached)
zliang-min commented 9 months ago

Please check https://github.com/timeplus-io/proton/issues/335; you will need to either set message.max.bytes or use the one_message_per_row setting.

In your case, you probably need to set one_message_per_row to true to make it work. This is not a bug; it is how it works: without one_message_per_row, the whole chunk is sent as one single message (because you have only one partition).
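For reference, a minimal sketch of the reporter's external stream DDL with that setting applied, assuming one_message_per_row can simply be appended to the existing SETTINGS clause of the Kafka external stream:

-- hypothetical rework of EDC_ITEM_ACTIVE: emit one Kafka message per result row
-- so each JSON row stays well under the broker's message size limit
CREATE STREAM default.EDC_ITEM_ACTIVE
(
    `projectId` string,
    `envId` string,
    `subjectId` string,
    `name` string,
    `itemOid` string,
    `flag` bool,
    `updateTime` datetime64(3),
    `raw` string
)
ENGINE = ExternalStream
SETTINGS type = 'kafka', brokers = '192.168.31.165:9092,192.168.31.165:9094,192.168.31.165:9096', topic = 'test_result', data_format = 'JSONEachRow', one_message_per_row = true;

The materialized view etl_data_combine can then be recreated against this external stream; the trade-off is many smaller Kafka messages instead of one large message per chunk.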