apache / doris-flink-connector

[Bug]charset=null #492

Open grainlin opened 1 month ago

grainlin commented 1 month ago

Version

flink-doris-connector-1.17-24.0.0.jar doris-2.1.0-rc11-91efb6a43d

What's Wrong?

2024-09-23 18:29:10,664 DEBUG org.apache.doris.flink.sink.BackendUtil [] - try to connect host 172.21.0.2:8030
2024-09-23 18:29:11,146 DEBUG org.apache.http.client.protocol.RequestAddCookies [] - CookieSpec selected: default
2024-09-23 18:29:11,155 DEBUG org.apache.http.client.protocol.RequestAuthCache [] - Auth cache not set in the context
2024-09-23 18:29:11,156 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection request: [route: {}->http://172.21.0.2:8030][total available: 0; route allocated: 0 of 2; total allocated: 0 of 20]
2024-09-23 18:29:11,166 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection leased: [id: 0][route: {}->http://172.21.0.2:8030][total available: 0; route allocated: 1 of 2; total allocated: 1 of 20]
2024-09-23 18:29:11,168 DEBUG org.apache.http.impl.execchain.MainClientExec [] - Opening connection {}->http://172.21.0.2:8030
2024-09-23 18:29:11,169 DEBUG org.apache.http.impl.conn.DefaultHttpClientConnectionOperator [] - Connecting to /172.21.0.2:8030
2024-09-23 18:29:11,170 DEBUG org.apache.http.impl.conn.DefaultHttpClientConnectionOperator [] - Connection established 172.21.0.6:49680<->172.21.0.2:8030
2024-09-23 18:29:11,170 DEBUG org.apache.http.impl.execchain.MainClientExec [] - Executing request POST /api/query/default_cluster/information_schema HTTP/1.1
2024-09-23 18:29:11,170 DEBUG org.apache.http.impl.execchain.MainClientExec [] - Proxy auth state: UNCHALLENGED
2024-09-23 18:29:11,171 DEBUG org.apache.http.headers [] - http-outgoing-0 >> POST /api/query/default_cluster/information_schema HTTP/1.1
2024-09-23 18:29:11,171 DEBUG org.apache.http.headers [] - http-outgoing-0 >> Authorization: Basic cm9vdDpkb3JpczEyMzQ=
2024-09-23 18:29:11,172 DEBUG org.apache.http.headers [] - http-outgoing-0 >> Content-Type: application/json;charset=null
2024-09-23 18:29:11,172 DEBUG org.apache.http.headers [] - http-outgoing-0 >> Content-Length: 99
2024-09-23 18:29:11,172 DEBUG org.apache.http.headers [] - http-outgoing-0 >> Host: 172.21.0.2:8030
2024-09-23 18:29:11,172 DEBUG org.apache.http.headers [] - http-outgoing-0 >> Connection: Keep-Alive
2024-09-23 18:29:11,172 DEBUG org.apache.http.headers [] - http-outgoing-0 >> User-Agent: Apache-HttpClient/4.5.13 (Java/1.8.0_422)
2024-09-23 18:29:11,172 DEBUG org.apache.http.headers [] - http-outgoing-0 >> Accept-Encoding: gzip,deflate
2024-09-23 18:29:11,172 DEBUG org.apache.http.wire [] - http-outgoing-0 >> "POST /api/query/default_cluster/information_schema HTTP/1.1[\r][\n]"
2024-09-23 18:29:11,172 DEBUG org.apache.http.wire [] - http-outgoing-0 >> "Authorization: Basic cm9vdDpkb3JpczEyMzQ=[\r][\n]"
2024-09-23 18:29:11,172 DEBUG org.apache.http.wire [] - http-outgoing-0 >> "Content-Type: application/json;charset=null[\r][\n]"
2024-09-23 18:29:11,172 DEBUG org.apache.http.wire [] - http-outgoing-0 >> "Content-Length: 99[\r][\n]"
2024-09-23 18:29:11,172 DEBUG org.apache.http.wire [] - http-outgoing-0 >> "Host: 172.21.0.2:8030[\r][\n]"
2024-09-23 18:29:11,172 DEBUG org.apache.http.wire [] - http-outgoing-0 >> "Connection: Keep-Alive[\r][\n]"
2024-09-23 18:29:11,172 DEBUG org.apache.http.wire [] - http-outgoing-0 >> "User-Agent: Apache-HttpClient/4.5.13 (Java/1.8.0_422)[\r][\n]"
2024-09-23 18:29:11,172 DEBUG org.apache.http.wire [] - http-outgoing-0 >> "Accept-Encoding: gzip,deflate[\r][\n]"
2024-09-23 18:29:11,172 DEBUG org.apache.http.wire [] - http-outgoing-0 >> "[\r][\n]"

http-outgoing-0 >> Content-Type: application/json;charset=null
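The literal text `null` in that header is what Java produces when a null reference is formatted with `%s`. A minimal sketch, assuming the header value is built with `String.format` as in the connector code quoted later in this thread; the class and method names below are made up for illustration and are not part of the connector:

```java
class ContentTypeNullDemo {
    // Hypothetical helper that mirrors the String.format pattern; not connector code.
    static String contentType(String charsetEncoding) {
        return String.format("application/json;charset=%s", charsetEncoding);
    }

    public static void main(String[] args) {
        // A real charset name is inserted as-is.
        System.out.println(contentType("UTF-8")); // application/json;charset=UTF-8
        // A null reference is rendered as the four characters "null".
        System.out.println(contentType(null));    // application/json;charset=null
    }
}
```

So the header in the log points to the charset value being null at the moment the request is built, not to a charset that is literally named "null" in the configuration.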

2024-09-23 18:29:11,615 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
java.lang.RuntimeException: failed to apply schema change.
    at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.flushSuccess(SchemaRegistryRequestHandler.java:199) ~[?:?]
    at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry.handleEventFromOperator(SchemaRegistry.java:139) ~[?:?]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:200) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:121) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:1040) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:590) ~[flink-dist-1.17.2.jar:1.17.2]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_422]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_422]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_422]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_422]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at akka.actor.ActorCell.invoke(ActorCell.scala:547) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_f27f5634-e303-4504-847c-6b962fa4af70.jar:1.17.2]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_422]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_422]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_422]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_422]
Caused by: java.lang.RuntimeException: Failed to schema change, CreateTableEvent{tableId=security.quota, schema=columns={id INT NOT NULL,l1 VARCHAR(255),l3 VARCHAR(255),type VARCHAR(255),parmas1 INT,parmas2 INT,parmas3 INT,parmas4 INT}, primaryKeys=id, options=()}, reason: Failed to schemaChange, response: {"msg":"Internal Error","code":500,"data":"Invalid mime type \"application/json;charset=null\": unsupported charset 'null'","count":0}
    at org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier.applySchemaChange(DorisMetadataApplier.java:87) ~[?:?]
    at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:108) ~[?:?]
    at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lambda$flushSuccess$0(SchemaRegistryRequestHandler.java:196) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_422]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_422]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
2024-09-23 18:29:11,653 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 2 tasks will be restarted to recover from a global failure.
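The `unsupported charset 'null'` part of the response is consistent with the server trying to resolve the `charset` parameter as a Java charset name. How the Doris FE actually parses the header is an assumption here; the sketch below only shows, with the standard `java.nio.charset.Charset` API and a made-up class name, that "null" is not a charset the JVM can resolve:

```java
import java.nio.charset.Charset;

class CharsetLookupDemo {
    // Returns true if the JVM can resolve the given charset name.
    static boolean resolves(String name) {
        try {
            Charset.forName(name);
            return true;
        } catch (IllegalArgumentException e) {
            // Covers UnsupportedCharsetException and IllegalCharsetNameException,
            // both of which extend IllegalArgumentException.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolves("UTF-8")); // true
        System.out.println(resolves("null"));  // false
    }
}
```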

CDC job:

SET 'execution.checkpointing.interval' = '30s';
EXECUTE PIPELINE WITHYAML (
source:
  type: mysql
  hostname: 172.21.0.8
  port: 3306
  username: root
  password: 'xxxxx'
  tables: security.quota
  server-id: 5400-5404

sink:
  type: doris
  class: com.example.DorisSinkWithContentType
  fenodes: 172.21.0.2:8030
  benodes: 172.21.0.2:8040
  username: root
  password: 'xxx'
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 1
)

What You Expected?

How can I set the charset in the HTTP Content-Type header? The request is currently sent with Content-Type: application/json;charset=null

How to Reproduce?

No response

Anything Else?

-

grainlin commented 1 month ago

org.apache.doris.flink.sink.schema.SchemaChangeManager

    private String charsetEncoding = "UTF-8";

    public HttpPost buildHttpPost(String ddl, String database) throws IllegalArgumentException, IOException {
        Map<String, String> param = new HashMap<>();
        param.put("stmt", ddl);
        String requestUrl = String.format("http://%s/api/query/default_cluster/%s", new Object[] { RestService.randomEndpoint(this.dorisOptions.getFenodes(), LOG), database });
        HttpPost httpPost = new HttpPost(requestUrl);
        httpPost.setHeader("Authorization", authHeader());
        httpPost.setHeader("Content-Type", String.format("application/json;charset=%s", new Object[] { this.charsetEncoding }));
        httpPost.setEntity((HttpEntity)new StringEntity(this.objectMapper.writeValueAsString(param), this.charsetEncoding));
        return httpPost;
    }

I don't know why the charset assignment fails: the field defaults to "UTF-8", yet the header is sent with charset=null.
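One possible explanation, offered as a hypothesis and not a diagnosis: a field initializer such as `charsetEncoding = "UTF-8"` runs before the constructor body, so a constructor that assigns a null argument to the same field overwrites the default. Nothing in this thread confirms that `SchemaChangeManager` is constructed that way in this setup. The sketch below uses made-up classes (`Manager`, `SafeManager`) to show the effect and one defensive fallback:

```java
import java.nio.charset.StandardCharsets;

class InitializerOverrideDemo {
    // Hypothetical stand-in for a class with a defaulted field; not connector code.
    static class Manager {
        private String charsetEncoding = "UTF-8"; // initializer runs first

        Manager() {}

        Manager(String charsetEncoding) {
            // Runs after the initializer and overwrites the default, even with null.
            this.charsetEncoding = charsetEncoding;
        }

        String contentType() {
            return String.format("application/json;charset=%s", charsetEncoding);
        }
    }

    // Same idea, but a null argument falls back to UTF-8.
    static class SafeManager {
        private final String charsetEncoding;

        SafeManager(String charsetEncoding) {
            this.charsetEncoding =
                    charsetEncoding != null ? charsetEncoding : StandardCharsets.UTF_8.name();
        }

        String contentType() {
            return String.format("application/json;charset=%s", charsetEncoding);
        }
    }

    public static void main(String[] args) {
        System.out.println(new Manager().contentType());         // application/json;charset=UTF-8
        System.out.println(new Manager(null).contentType());     // application/json;charset=null
        System.out.println(new SafeManager(null).contentType()); // application/json;charset=UTF-8
    }
}
```

If this is what happens, the place to look is whichever caller constructs the manager and the charset value it passes in; the default on the field alone does not guarantee a non-null value.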