An error occurred during the the beginning to runing th flink pulsar, it shows "exception while polling the records",then the task switched running to failed. #253
PulsarPartitionSplit{partition=persistent://10001001/default/xdr_mss_input_alert-partition-0|0-65535} consumer for current reader.
2022-11-01 23:48:52,138 INFO org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase [] - Register split PulsarPartitionSplit{partition=persistent://10001001/default/xdr_outer_alert-partition-2|0-65535} consumer for current reader.
2022-11-01 23:48:52,141 INFO org.apache.pulsar.client.impl.ConsumerImpl [] - [persistent://10001001/default/xdr_outer_alert-partition-1][NAE] Subscribed to topic on pulsar-broker-1.pulsar-broker.pulsar.svc.cluster.local/10.83.237.69:6650 -- consumer: 2
2022-11-01 23:48:52,141 INFO org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase [] - Register split PulsarPartitionSplit{partition=persistent://10001001/default/xdr_outer_alert-partition-1|0-65535} consumer for current reader.
2022-11-01 23:48:52,221 INFO org.apache.pulsar.client.impl.ConsumerImpl [] - [persistent://10001001/default/xdr_outer_alert-partition-0][NAE] Subscribed to topic on pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/10.83.246.71:6650 -- consumer: 1
2022-11-01 23:48:52,222 INFO org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase [] - Register split PulsarPartitionSplit{partition=persistent://10001001/default/xdr_outer_alert-partition-0|0-65535} consumer for current reader.
2022-11-01 23:48:52,222 INFO org.apache.pulsar.client.impl.ConsumerImpl [] - [persistent://10001001/default/xdr_mss_input_alert-partition-1][NAE] Subscribed to topic on pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/10.83.246.71:6650 -- consumer: 1
2022-11-01 23:48:52,222 INFO org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase [] - Register split PulsarPartitionSplit{partition=persistent://10001001/default/xdr_mss_input_alert-partition-1|0-65535} consumer for current reader.
2022-11-01 23:48:52,223 INFO org.apache.pulsar.client.impl.ConsumerImpl [] - [persistent://10001001/default/xdr_mss_input_alert-partition-2][NAE] Subscribed to topic on pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local/10.83.15.71:6650 -- consumer: 2
2022-11-01 23:48:52,223 INFO org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase [] - Register split PulsarPartitionSplit{partition=persistent://10001001/default/xdr_mss_input_alert-partition-2|0-65535} consumer for current reader.
2022-11-01 23:48:52,220 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-table_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-table_2.12-1.14.3.jar:1.14.3]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_322]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
Caused by: java.lang.IllegalStateException: This split reader have assigned split.
at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.handleSplitsChanges(PulsarPartitionSplitReaderBase.java:164) ~[blob_p-2059b011f88b3e4aa29cedfbca6116da3eb668fb-c54f5c7463240e095b631c73d13d965a:1.0]
at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.handleSplitsChanges(PulsarUnorderedPartitionSplitReader.java:57) ~[blob_p-2059b011f88b3e4aa29cedfbca6116da3eb668fb-c54f5c7463240e095b631c73d13d965a:1.0]
at org.apache.flink.connector.base.source.reade
2022-11-01 23:48:52,324 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2022-11-01 23:48:52,325 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2022-11-01 23:48:52,325 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
2022-11-01 23:48:52,325 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
2022-11-01 23:48:52,325 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 1
2022-11-01 23:48:52,330 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 1
2022-11-01 23:48:52,330 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 2
2022-11-01 23:48:52,330 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 2
2022-11-01 23:48:52,331 INFO com.scurrilous.circe.checksum.Crc32cIntChecksum [] - SSE4.2 CRC32C provider initialized
2022-11-01 23:48:52,334 INFO org.apache.pulsar.client.impl.ConsumerImpl [] - [persistent://10001001/default/xdr_mss_input_alert-partition-0] [NAE] Closed consumer
2022-11-01 23:48:52,335 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
2022-11-01 23:48:52,421 INFO org.apache.pulsar.client.impl.ConnectionPool [] - [[id: 0xab6c92c2, L:/10.83.246.112:47500 - R:pulsar-broker.pulsar.svc.cluster.local/10.83.110.203:6650]] Connected to server
2022-11-01 23:48:52,527 INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl [] - Starting Pulsar producer perf with config: {"topicName":"persistent://10001001/default/xdr_outer_incident","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":true,"maxPendingMessages":10000,"maxPendingMessagesAcrossPartitions":500000,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":100000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":5242880,"batchingEnabled":true,"chunkingEnabled":false,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
2022-11-01 23:48:52,531 INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl [] - Pulsar client config: {"serviceUrl":"pulsar://pulsar-broker.pulsar:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
2022-11-01 23:48:52,622 INFO org.apache.pulsar.client.impl.ConnectionPool [] - [[id: 0xb5e423a6, L:/10.83.246.112:43872 - R:pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/10.83.246.71:6650]] Connected to server
2022-11-01 23:48:52,623 INFO org.apache.pulsar.client.im
2022-11-01 23:48:52,835 WARN com.sangfor.xdrassociation.io.pulsar.discarder.PulsarDiscarder [] - not discard message finish at publish time 1667317678216
2022-11-01 23:48:52,835 WARN com.sangfor.xdrassociation.io.pulsar.discarder.PulsarDiscarder [] - thread is [Thread[Source Data Fetcher for Source: topic:[persistent://10001001/default/xdr_outer_alert] -> ByteToJSONObject -> GlobalWatermark -> AlertCheckFilter -> (Main: Exclude -> (Common: FilterTargetAlert, Download: FilterEAlert, MinerVirus: ToMinerState, Filter), LogTrace: Report -> LogTrace: Tag) (2/2)#1175,5,Flink Task Threads]], switch is [false]
2022-11-01 23:48:52,919 INFO org.apache.pulsar.client.impl.ConsumerImpl [] - [persistent://10001001/default/xdr_outer_alert-partition-0] [NAE] Closed consumer
2022-11-01 23:48:52,920 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 1 exited.
2022-11-01 23:48:53,235 INFO org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation [] - Finished restoring from state handle: IncrementalRemoteKeyedStateHandle{backendIdentifier=304e4485-2399-4ed2-9e9d-abc0d4f3770f, keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, checkpointId=1770, sharedState={003516.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/276eb24c-dec4-406c-a678-9518baabf82e', dataBytes=2353}, 002181.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/f0ca420b-c1b1-4e3c-8dcf-e1a52f2a6bbc', dataBytes=1115}, 002097.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/c33c9097-24cd-44f5-a48b-30ca579feab6', dataBytes=1182}, 002095.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/90010c8c-d5ac-41d7-9350-c2a543c51751', dataBytes=1443}, 002056.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/11d86aa9-fe38-4fc5-b964-d8b5f0d4f8c6', dataBytes=4811}, 002052.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/fcf019c4-9506-405a-89b9-2f17b123c93b', dataBytes=9834}, 002054.sst=File State: file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/356c51c2-1a4a-4eb6-bd2f-22e813b0b584 [208632 bytes], 003519.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/913c7ae5-f687-4d75-9d46-257c31b68b01', dataBytes=3241}, 003507.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/843e9e5c-271e-4c97-b800-60b0280cdd5b', dataBytes=1583}, 003513.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/a652001f-1798-4c7c-b7c4-076a580db25b', dataBytes=1431}, 002178.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/ef24d837-135d-4f92-9e54-db13e1be8b58', dataBytes=1458}, 003520.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/e70f20b7-f0f4-40d9-839d-c579c6b314c8', dataBytes=2365}, 002098.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/78feea72-3d2d-4ec4-aee5-d61a722c07aa', dataBytes=1113}, 002180.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/c5c3a6bd-73ed-4f59-90e2-d79ec693a1b9', dataBytes=1184}, 002096.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkpoint/nae/10001001/00000000000000000000000000000000/shared/ab61570f-189f-4e3b-8000-fdd72f941bdf', dataBytes=2978}, 002182.sst=ByteStreamStateHandle{handleName='file:/opt/flink/data/checkp
-----------------------------
2022-11-01 23:48:53,241 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Finished building RocksDB keyed state-backend at /tmp/flink-io-7aa27018-0cb3-4c38-a5dc-d87f22f12516/job_00000000000000000000000000000000_op_KeyedProcessOperator_3571b91c8b2c3bb2097af2fa8f60e607__2_2__uuid_1cde3a92-af7b-46e2-8268-ae847e54c76b.
2022-11-01 23:48:53,321 INFO com.sangfor.xdrassociation.main.functions.download.MainProcessFunction [] - OnTimer alert cache cleaning. NAlert: 0->0
2022-11-01 23:48:53,321 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class com.sangfor.xdrassociation.core.dto.IncidentModify does not contain a setter for field tmgData
2022-11-01 23:48:53,321 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class com.sangfor.xdrassociation.core.dto.IncidentModify cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2022-11-01 23:48:53,322 INFO org.apache.flink.runtime.taskmanager.Task [] - Download: MainProcess (2/2)#1175 (a17a5020cca3546244706f6c6ab0891f) switched from INITIALIZING to RUNNING.
2022-11-01 23:48:53,324 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class com.sangfor.xdrassociation.core.dto.TaskStorage is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2022-11-01 23:48:53,325 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class com.alibaba.fastjson2.JSONObject does not contain a getter for field accessOrder
2022-11-01 23:48:53,325 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class com.alibaba.fastjson2.JSONObject does not contain a setter for field accessOrder
2022-11-01 23:48:53,325 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class com.alibaba.fastjson2.JSONObject cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2022-11-01 23:48:53,425 INFO org.apache.flink.runtime.taskmanager.Task [] - taskToIncidentDeduplication -> JSONObjectToByteDispatcher -> Sink: NAEOutputStream_10001001 (2/2)#1175 (36b4cf51a3935dde56ac858fa9113921) switched from INITIALIZING to RUNNING.
2022-11-01 23:48:53,442 INFO com.sangfor.xdrassociation.main.functions.download.MainProcessFunction [] - OnTimer alert cache cleaning. NAlert: 0->0
2022-11-01 23:49:02,222 INFO org.apache.pulsar.client.impl.ConsumerImpl [] - [persistent://10001001/default/xdr_outer_alert-partition-2] [NAE] Closed consumer
2022-11-01 23:49:02,223 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 2 exited.
2022-11-01 23:49:02,223 INFO org.apache.pulsar.client.impl.ConsumerImpl [] - [persistent://10001001/default/xdr_mss_input_alert-partition-1] [NAE] Closed consumer
2022-11-01 23:49:02,224 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 2 exited.
2022-11-01 23:49:02,225 INFO org.apache.pulsar.client.impl.ConsumerImpl [] - [persistent://10001001/default/xdr_mss_input_alert-partition-2] [NAE] Closed consumer
2022-11-01 23:49:02,226 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 1 exited.
2022-11-01 23:49:02,232 INFO org.apache.pulsar.client.impl.ClientCnx [] - [id: 0xba4c1d71, L:/10.83.246.112:51000 ! R:pulsar-broker-1.pulsar-broker.pulsar.svc.cluster.local/10.83.237.69:6650] Disconnected
2022-11-01 23:49:02,232 INFO org.apache.pulsar.client.impl.ClientCnx [] - [id: 0x92b3c097, L:/10.83.246.112:53200 ! R:pulsar
2022-11-01 23:49:04,259 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: topic:[persistent://10001001/default/xdr_mss_input_alert] -> ByteToJSONObject -> GlobalWatermark -> AlertCheckFilter -> MSSDOP: Filter (2/2)#1175 (35be45ced3221630ecdf811bcea7ca5d) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.IllegalStateException: This split reader have assigned split.
at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.handleSplitsChanges(PulsarPartitionSplitReaderBase.java:164)
at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.handleSplitsChanges(PulsarUnorderedPartitionSplitReader.java:57)
at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 more
2022-11-01 23:49:04,259 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: topic:[persistent://10001001/default/xdr_outer_alert] -> ByteToJSONObject -> GlobalWatermark -> AlertCheckFilter -> (Main: Exclude -> (Common: FilterTargetAlert, Download: FilterEAlert, MinerVirus: ToMinerState, Filter), LogTrace: Report -> LogTrace: Tag) (2/2)#1175 (9d0281470a07a74ce3ae6333ff2d46e6) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.s