Open luckydarnell opened 5 months ago
24/01/05 10:07:16 INFO org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator: Starting the RocketMQSourceEnumerator for consumer group X without periodic partition discovery.
24/01/05 10:07:16 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator: Source Source: s(MQ)- registering reader for parallel task 0 @
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=0] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-0 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=2] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-2 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=1] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-1 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=3] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-3 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator: Assigning splits to readers {0=[[Topic: topicx, Broker: broker-a, Partition: 1, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 0, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 3, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 2, StartingOffset: 0, StoppingTimestamp: 9223372036854775807]]}
24/01/05 10:07:17 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase: Adding split(s) to reader: [[Topic: topicx, Broker: broker-a, Partition: 1, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 0, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 3, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 2, StartingOffset: 0, StoppingTimestamp: 9223372036854775807]]
24/01/05 10:07:17 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase: Reader received NoMoreSplits event.
24/01/05 10:07:17 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher: Starting split fetcher 0
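Change the dead-letter queue's perm value to 6 so that the messages become readable.
"Change the dead-letter queue's perm value to 6 so that the messages become readable." 1. No dead-letter queue has been created. 2. The queue itself already has a perm value of 6. The problem now is that the job runs fine when I run it in IDEA, but it fails when I start a cluster and run it there.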
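For readers unfamiliar with the perm value discussed above: it is a bit mask, and 6 is the combination of the read bit (4) and the write bit (2). The following is a minimal sketch of how such a value decodes. The class `PermBits` and its helper methods are hypothetical names for illustration; the two bit values are copied from RocketMQ 4.x's `PermName` constants so the sketch has no dependencies.

```java
final class PermBits {
    // Bit values mirror RocketMQ's PermName (PERM_READ = 1 << 2, PERM_WRITE = 1 << 1).
    // They are redeclared here so the sketch compiles without the RocketMQ jars.
    static final int PERM_READ = 1 << 2;   // 4
    static final int PERM_WRITE = 1 << 1;  // 2

    // True when the read bit is set in the given perm value.
    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    // True when the write bit is set in the given perm value.
    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    public static void main(String[] args) {
        // 6 = 4 | 2, so both bits are set.
        System.out.println("perm=6 readable=" + isReadable(6) + " writeable=" + isWriteable(6));
        // 2 has only the write bit, so consumers cannot read from such a queue.
        System.out.println("perm=2 readable=" + isReadable(2) + " writeable=" + isWriteable(2));
    }
}
```

Since the reporter states the queue already has perm 6, this only illustrates why that value is considered readable; it does not explain the cluster failure.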
I have tried several times and found that the RocketMQSourceFunction interface runs normally, so I suspect there is a bug in the RocketMQSource interface.
code
public static RocketMQSourceFunction<TransData> createMqsource2(SourceCfg cfg, DataSourceCfg dataSource) {
    // Build the name server address list: "host1:port;host2:port"
    String[] hostNameSplit = dataSource.getHostname().split(SymbolConstant.SEPARATOR_SEMI_COLON);
    String addr = null;
    for (String s : hostNameSplit) {
        String joinStr = s + SymbolConstant.SEPARATOR_COLON + dataSource.getPort();
        addr = StringUtils.isEmpty(addr) ? joinStr : addr + SymbolConstant.SEPARATOR_SEMI_COLON + joinStr;
    }
    Properties consumerProps = new Properties();
    consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, addr);
    consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, cfg.getTag());
    consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, cfg.getTopic());
    KeyValueDeserializationSchema<TransData> deserializationSchema = MqDeserializationSchemaFactory.crateMqDeserializationSchema(cfg);
    // Choose the startup mode: default to LATEST unless another mode is configured
    OffsetResetStrategy startUpMode = cfg.getStartupMode() == null || cfg.getStartupMode().equals(StartupModeEnum.LATEST)
            ? OffsetResetStrategy.LATEST
            : OffsetResetStrategy.EARLIEST;
    RocketMQSourceFunction<TransData> source = new RocketMQSourceFunction<>(deserializationSchema, consumerProps);
    // Use group offsets.
    // If there is no committed offset, the consumer falls back to the chosen startup mode.
    source.setStartFromGroupOffsets(startUpMode);
    log.info("addr:{}, group:{}, tag:{}", addr, cfg.getConsumerGroup(), cfg.getTag());
    return source;
}
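The address-building loop in the snippet above can be checked in isolation, which helps rule out a malformed name server string as the difference between the IDE and the cluster run. Below is a minimal sketch of the same idea. It assumes that `SymbolConstant.SEPARATOR_SEMI_COLON` is `";"` and `SymbolConstant.SEPARATOR_COLON` is `":"`, which the thread does not show; `NameServerAddr` and `join` are hypothetical names. Unlike the original loop, this variant also trims whitespace and skips empty host entries.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

final class NameServerAddr {
    // Turns "host1;host2" plus one shared port into "host1:port;host2:port".
    // Assumption: ';' separates hosts and ':' separates host from port.
    static String join(String hostnames, int port) {
        return Arrays.stream(hostnames.split(";"))
                .map(String::trim)
                .filter(h -> !h.isEmpty())
                .map(h -> h + ":" + port)
                .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        // Example hosts are placeholders, not taken from the report.
        System.out.println(join("10.0.0.1;10.0.0.2", 9876));
    }
}
```

Logging the resulting string on both the IDE run and the cluster run would show whether the two environments pass the same address to `RocketMQConfig.NAME_SERVER_ADDR`.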
In a standalone cluster:
code
2024-01-05 13:48:01,579 INFO org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator [] - Starting the RocketMQSourceEnumerator for consumer group X without periodic partition discovery.
2024-01-05 13:48:01,589 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: T_AC_RE_WZ_JJJZ(MQ)- registering reader for parallel task 0 @ 127.0.0.1
2024-01-05 13:48:01,886 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,887 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,889 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,889 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,890 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,887 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] - Exception while handling result from async call in SourceCoordinator-Source: S(MQ)-. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
    at org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator.handlePartitionSplitChanges(RocketMQSourceEnumerator.java:279) ~[flink-rocketmq-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83) ~[flink-runtime-1.14.6.jar:1.14.6]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-core-1.14.6.jar:1.14.6]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_271]
    at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_271]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [?:1.8.0_271]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:1.8.0_271]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_271]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_271]
    at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
Caused by: org.apache.rocketmq.client.exception.MQClientException: Can not find Message Queue for this topic_, FPC_SYNC_FI_AC_REPTILE1 See http://rocketmq.apache.org/docs/faq/ for further details.
    at org.apache.rocketmq.client.impl.MQAdminImpl.fetchSubscribeMessageQueues(MQAdminImpl.java:177) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.fetchSubscribeMessageQueues(DefaultMQPullConsumerImpl.java:147) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.consumer.DefaultMQPullConsumer.fetchSubscribeMessageQueues(DefaultMQPullConsumer.java:290) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator.discoverAndInitializePartitionSplit(RocketMQSourceEnumerator.java:248) ~[flink-rocketmq-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-runtime-1.14.6.jar:1.14.6]
    ... 7 more
Caused by: org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <10.200.38.118:9876> failed_
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstract.java:440) ~[rocketmq-remoting-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:377) ~[rocketmq-remoting-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1367) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1357) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.impl.MQAdminImpl.fetchSubscribeMessageQueues(MQAdminImpl.java:166) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.fetchSubscribeMessageQueues(DefaultMQPullConsumerImpl.java:147) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.consumer.DefaultMQPullConsumer.fetchSubscribeMessageQueues(DefaultMQPullConsumer.java:290) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator.discoverAndInitializePartitionSplit(RocketMQSourceEnumerator.java:248) ~[flink-rocketmq-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-runtime-1.14.6.jar:1.14.6]
    ... 7 more
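The innermost cause in the trace above is a `RemotingSendRequestException` for a request to the name server at 10.200.38.118:9876, raised from the enumerator, which runs on the JobManager. One way to narrow this down is to test, from the JobManager host of the standalone cluster, whether that address accepts TCP connections at all. The following is a minimal sketch using only the JDK; `NameServerProbe` and `canConnect` are hypothetical names, and a successful connect shows only that the port is reachable, not that the RocketMQ request itself would succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class NameServerProbe {
    // Returns true if a TCP connection to host:port can be opened within the timeout.
    // Any I/O failure (refused, timed out, unresolved host) is reported as false.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the address that appears in the stack trace; override via arguments.
        String host = args.length > 0 ? args[0] : "10.200.38.118";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9876;
        System.out.println(host + ":" + port + " reachable=" + canConnect(host, port, 3000));
    }
}
```

If the probe succeeds from the IDE machine but fails from the cluster host, the difference is likely network-level (routing, firewall) rather than a bug in the connector; if it succeeds in both places, the connector or its client configuration remains the more likely suspect.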