StarRocks / starrocks

The world's fastest open query engine for sub-second analytics both on and off the data lakehouse. With the flexibility to support nearly any scenario, StarRocks provides best-in-class performance for multi-dimensional analytics, real-time analytics, and ad-hoc queries. A Linux Foundation project.
https://starrocks.io
Apache License 2.0

Setting up Flink/Flink CDC to Starrocks sink fails using yaml file #51599

Open anand-siva opened 1 month ago

anand-siva commented 1 month ago

### Steps to reproduce the behavior (Required)

The StarRocks documentation describes how to use Flink for CDC: https://docs.starrocks.io/docs/loading/Flink-connector-starrocks/

For a quick start, that page points to "Streaming ELT from MySQL to StarRocks using Flink CDC 3.0 with StarRocks Pipeline Connector".

  1. Download Flink 1.18.0
  2. Update conf/flink-conf.yaml and add execution.checkpointing.interval: 3000. Then start cluster.
    
    cd flink-1.18.0
    echo "execution.checkpointing.interval: 3000" >> conf/flink-conf.yaml
    ./bin/start-cluster.sh

    Starting cluster.
    Starting standalonesession daemon on host Anands-MacBook-Pro.local.
    Starting taskexecutor daemon on host Anands-MacBook-Pro.local.
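
Re-running the `echo ... >>` command above appends the same option again on every run. A minimal sketch of an append-if-absent helper, assuming a flat `key: value` config file; the function name `ensure_flink_option` is hypothetical and not part of Flink:

```shell
# Hypothetical helper (not part of Flink): append "key: value" to a flat
# config file only if no uncommented line already starts with "key:".
ensure_flink_option() {
  local conf_file key value line
  conf_file="$1"; key="$2"; value="$3"
  if [ -f "$conf_file" ]; then
    while IFS= read -r line || [ -n "$line" ]; do
      case "$line" in
        # Commented lines start with "#", so they never match this pattern.
        "$key:"*) return 0 ;;
      esac
    done < "$conf_file"
  fi
  printf '%s: %s\n' "$key" "$value" >> "$conf_file"
}
```

Usage, with the path from the step above: `ensure_flink_option flink-1.18.0/conf/flink-conf.yaml execution.checkpointing.interval 3000`.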

3. Create a Docker Compose file for StarRocks and MySQL, then start the containers.

docker-compose.yml

    version: '2.1'
    services:
      StarRocks:
        image: starrocks/allin1-ubuntu:3.2.6
        ports:

docker compose up -d

  4. Connect to MySQL and create the tables and data

docker-compose exec MySQL mysql -uroot -p123456

-- create database
CREATE DATABASE app_db;

USE app_db;

-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
  5. Download Flink CDC 3.1.0 (flink-cdc-3.1.0-bin.tar.gz) and extract the archive

wget https://www.apache.org/dyn/closer.lua/flink/flink-cdc-3.1.0/flink-cdc-3.1.0-bin.tar.gz
tar -xvf flink-cdc-3.1.0-bin.tar.gz
  6. Download the MySQL and StarRocks pipeline connector packages and put them in the flink-cdc lib directory
wget https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.0/flink-cdc-pipeline-connector-mysql-3.1.0.jar
wget https://search.maven.org/remotecontent\?filepath\=org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.1.0/flink-cdc-pipeline-connector-starrocks-3.1.0.jar

mv *flink-cdc-pipeline-connector-mysql-3.1.0.jar flink-cdc-3.1.0/lib
mv *flink-cdc-pipeline-connector-starrocks-3.1.0.jar flink-cdc-3.1.0/lib
  7. Also place the MySQL connector in the Flink lib folder and bounce the Flink cluster
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar
mv mysql-connector-java-8.0.27.jar flink-1.18.0/lib
./flink-1.18.0/bin/start-cluster.sh
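
The step above mentions bouncing the cluster, but only the start script is shown. A sketch of a stop-then-start restart, assuming the `bin/stop-cluster.sh` and `bin/start-cluster.sh` scripts that ship with the Flink distribution; the function name `bounce_flink` is hypothetical:

```shell
# Hypothetical helper: restart a standalone Flink cluster given its home directory.
bounce_flink() {
  local flink_home
  flink_home="$1"
  if [ ! -x "$flink_home/bin/start-cluster.sh" ]; then
    echo "start-cluster.sh not found under $flink_home/bin" >&2
    return 1
  fi
  # Stop first so the restarted JVMs load jars newly placed in lib/.
  # A failure here is ignored because no cluster may be running yet.
  "$flink_home/bin/stop-cluster.sh" || true
  "$flink_home/bin/start-cluster.sh"
}
```

Usage: `bounce_flink flink-1.18.0`.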
  8. Create the YAML file for the CDC pipeline: vi flink-cdc-3.1.0/mysql-to-starrocks.yaml
    
    ################################################################################
    # Description: Sync MySQL all tables to StarRocks
    ################################################################################
    source:
      type: mysql
      hostname: localhost
      port: 3306
      username: root
      password: 123456
      tables: app_db.\.*
      server-id: 5400-5404
      server-time-zone: UTC

    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://127.0.0.1:9030
      load-url: 127.0.0.1:8080
      username: root
      password: ""
      table.create.properties.replication_num: 1

    pipeline:
      name: Sync MySQL Database to StarRocks
      parallelism: 2

9. Run the Flink CDC script against the Flink home

export FLINK_HOME=flink-1.18.0
bash flink-cdc-3.1.0/bin/flink-cdc.sh flink-cdc-3.1.0/mysql-to-starrocks.yaml
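
`FLINK_HOME=flink-1.18.0` is a relative path, so it resolves against the current working directory. A sketch of a pre-flight check that confirms the files from the earlier steps are in place and prints an absolute path for `FLINK_HOME`; the function name `cdc_preflight` is hypothetical, and the file names are the ones used in this report:

```shell
# Hypothetical pre-flight check: verify the layout built in the steps above
# and print the absolute Flink directory on success.
cdc_preflight() {
  local flink_dir cdc_dir missing f
  flink_dir="$1"; cdc_dir="$2"; missing=0
  for f in \
    "$flink_dir/bin/start-cluster.sh" \
    "$cdc_dir/bin/flink-cdc.sh" \
    "$cdc_dir/mysql-to-starrocks.yaml"; do
    if [ ! -f "$f" ]; then
      echo "missing: $f" >&2
      missing=1
    fi
  done
  # Downloaded jars may carry a prefix in their file name, so match with a glob.
  for f in flink-cdc-pipeline-connector-mysql flink-cdc-pipeline-connector-starrocks; do
    if ! ls "$cdc_dir"/lib/*"$f"*.jar >/dev/null 2>&1; then
      echo "missing connector jar: $f" >&2
      missing=1
    fi
  done
  [ "$missing" -eq 0 ] || return 1
  (cd "$flink_dir" && pwd)
}
```

Usage: `FLINK_HOME="$(cdc_preflight flink-1.18.0 flink-cdc-3.1.0)" && export FLINK_HOME && bash flink-cdc-3.1.0/bin/flink-cdc.sh flink-cdc-3.1.0/mysql-to-starrocks.yaml`. The assignment is kept separate from `export` so that a failed check stops the chain.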


### Expected behavior (Required)

The Flink CDC job starts running and begins syncing from the MySQL Docker container to the StarRocks Docker container.

### Real behavior (Required)

The script errors out with this error message 

Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
Exception in thread "main" org.apache.flink.util.FlinkException: Failed to execute job 'Sync MySQL Database to StarRocks'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2253)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2219)
    at org.apache.flink.cdc.composer.flink.FlinkPipelineExecution.execute(FlinkPipelineExecution.java:43)
    at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:74)
    at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:71)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
    ... 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:234)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:894)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:224)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:875)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:829)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:221)
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:371)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:214)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    ... 4 more
Caused by: java.lang.ClassCastException: cannot assign instance of com.starrocks.connector.flink.catalog.StarRocksCatalog to field org.apache.flink.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.catalog of type com.starrocks.connector.flink.catalog.StarRocksCatalog in instance of org.apache.flink.cdc.connectors.starrocks.sink.StarRocksMetadataApplier
    at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
    at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
    at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
    at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
    at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:477)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:292)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:225)
    ... 20 more
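
The innermost `ClassCastException` names the same class, `com.starrocks.connector.flink.catalog.StarRocksCatalog`, as both the instance type and the field type. In Java, that pattern generally means the class was loaded by two different class loaders, which can happen when it is reachable from more than one jar or classpath. This is not a confirmed diagnosis of this issue. As a rough check, the sketch below lists which jars under the given directories contain a class; the function name `find_class_in_jars` is hypothetical, and the scan does not look inside nested jars:

```shell
# Hypothetical helper: print every jar under the given directories whose
# archive index mentions the class. Jar (zip) archives store entry names
# uncompressed, so a fixed-string grep over the raw file is enough here.
find_class_in_jars() {
  local class_name entry
  class_name="$1"
  shift
  # com.example.Foo -> com/example/Foo.class
  entry="$(printf '%s' "$class_name" | tr . /).class"
  find "$@" -type f -name '*.jar' -exec grep -l -F -- "$entry" {} + 2>/dev/null | sort
}
```

Usage, with the directories from this report: `find_class_in_jars com.starrocks.connector.flink.catalog.StarRocksCatalog flink-1.18.0/lib flink-cdc-3.1.0/lib`. More than one hit would be worth investigating; a single hit does not rule out a class loader conflict.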



### StarRocks version (Required)

+-------------------+
| current_version() |
+-------------------+
| 3.2.6-2585333     |
+-------------------+
prclin commented 1 month ago

Same question here. I tried every version from 3.0.1 onward, and they all report this error.