confluentinc / kafka-connect-hdfs

Kafka Connect HDFS connector

Exceptions when hdfs is down or hard disk is full #143

Open skyahead opened 8 years ago

skyahead commented 8 years ago

If a hard disk in the HDFS cluster is full or, even worse, if we shut down the HDFS cluster (e.g., run stop-dfs.sh) while Kafka Connect is writing into it, then, depending on luck, I keep getting one or all of the following exceptions.

After I restart the HDFS cluster (i.e., run start-dfs.sh), the connectors keep throwing these exceptions forever. BTW, this does not seem like something we should be doing in production, but our QA team is running extreme tests.

Exception one:

org.apache.kafka.connect.errors.ConnectException: java.net.ConnectException: Call From tianjili/xxx.120.162.86 to allinone2:54310 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
        at io.confluent.connect.hdfs.wal.FSWAL.append(FSWAL.java:64) ~[main/:na]
        at io.confluent.connect.hdfs.TopicPartitionWriter.beginAppend(TopicPartitionWriter.java:593) ~[main/:na]
        at io.confluent.connect.hdfs.TopicPartitionWriter.appendToWAL(TopicPartitionWriter.java:584) ~[main/:na]
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:310) ~[main/:na]
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234) [main/:na]
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:103) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) [connect-runtime-0.10.0.1-cp1.jar:na]
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) [connect-runtime-0.10.0.1-cp1.jar:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.net.ConnectException: Call From tianjili/xxx.120.162.86 to allinone2:54310 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_101]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_101]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_101]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_101]
        at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791) ~[hadoop-common-2.6.0.jar:na]
        at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:731) ~[hadoop-common-2.6.0.jar:na]
        at org.apache.hadoop.ipc.Client.call(Client.java:1472) ~[hadoop-common-2.6.0.jar:na]
        at org.apache.hadoop.ipc.Client.call(Client.java:1399) ~[hadoop-common-2.6.0.jar:na]
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) ~[hadoop-common-2.6.0.jar:na]
        at com.sun.proxy.$Proxy89.getAdditionalDatanode(Unknown Source) ~[na:na]
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:424) ~[hadoop-hdfs-2.6.0.jar:na]

Exception two:

java.io.IOException: Stream closed
        at java.io.BufferedWriter.ensureOpen(BufferedWriter.java:116) ~[na:1.8.0_101]
        at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:126) ~[na:1.8.0_101]
        at java.io.BufferedWriter.flush(BufferedWriter.java:253) ~[na:1.8.0_101]
        at com.alu.bna.connect.common.CsvRecordWriter.close(CsvRecordWriter.java:86) ~[main/:na]
        at com.alu.bna.connect.csv.CsvRecordWriterProvider$InnerRecordWriter.close(CsvRecordWriterProvider.java:99) ~[main/:na]
        at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:550) ~[main/:na]
        at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:561) ~[main/:na]
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:307) ~[main/:na]
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234) [main/:na]
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:103) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) [connect-runtime-0.10.0.1-cp1.jar:na]
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) [connect-runtime-0.10.0.1-cp1.jar:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]

Exception three:

org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file hdfs://allinone2:54310/tianjil/logs/xxxx/3/log
        at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91) ~[main/:na]
        at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105) ~[main/:na]
        at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:484) [main/:na]
        at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:212) [main/:na]
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:256) [main/:na]
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234) [main/:na]
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:103) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143) [main/:na]
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) [connect-runtime-0.10.0.1-cp1.jar:na]
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) [connect-runtime-0.10.0.1-cp1.jar:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.io.IOException: Cannot obtain block length for LocatedBlock{BP-1032838698-xxx.120.9.80-1476364592496:blk_1073745753_6894; getBlockSize()=35; corrupt=false; offset=0; locs=[xxx.120.9.84:50010, xxx.120.9.80:50010]; storageIDs=[DS-e2d7231e-e690-4ae2-a9bd-e1257a065f2f, DS-4973e814-95c8-44f8-a0be-fc437f39e542]; storageTypes=[DISK, DISK]}
        at org.apache.hadoop.hdfs.DFSInputStream.readBlockLength(DFSInputStream.java:359) ~[hadoop-hdfs-2.6.0.jar:na]
        at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:301) ~[hadoop-hdfs-2.6.0.jar:na]
        at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:238) ~[hadoop-hdfs-2.6.0.jar:na]
        at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:231) ~[hadoop-hdfs-2.6.0.jar:na]
        at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1498) ~[hadoop-hdfs-2.6.0.jar:na]
        at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302) ~[hadoop-hdfs-2.6.0.jar:na]
        at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298) ~[hadoop-hdfs-2.6.0.jar:na]
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-common-2.6.0.jar:na]
        at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298) ~[hadoop-hdfs-2.6.0.jar:na]
        at io.confluent.connect.hdfs.wal.WALFile$Reader.openFile(WALFile.java:579) ~[kafka-connect-hdfs-3.0.1.jar:na]
        at io.confluent.connect.hdfs.wal.WALFile$Reader.<init>(WALFile.java:528) ~[kafka-connect-hdfs-3.0.1.jar:na]

lakeofsand commented 8 years ago

@skyahead We have run into this a lot lately when the HDFS connector faces HDFS or network failures, for example the third exception you listed here. The only way to recover from these exceptions is to restart Connect or to remove the HDFS files manually.

These issues can be classified into two types:

1. Kafka Connect can't recover from some exceptions automatically, although it actually could. This is important when facing common or temporary service outages. "The problem is inevitable."
2. Connect and HDFS seem to run into a coordination problem. The same problem may not exist in other contexts, because there the process is not permanent.

skyahead commented 8 years ago

@lakeofsand I have some code that prevents these exceptions from happening and lets Kafka Connect survive network breakdowns, HDFS outages, etc. automatically. But our QA reported missing records last week, and I am in the process of resolving that. I will definitely report back when I have fixed it.

cotedm commented 7 years ago

@skyahead it seems like these three exceptions all stem from an inability to reconnect to HDFS even though the tasks are retrying to establish a connection.

The first one looks like it could be a problem with repeatedly trying to append to the WAL when the state machine gets stuck in the TEMP_FILE_CLOSED state. This could be related to our inability to properly re-establish a lease, but without more logs it's difficult to say for sure.

The second one looks similar except we're stuck in the SHOULD_ROTATE state.

The third one looks like we never even established a lease, and it may again be related to the same problem. I believe my comments on #142 are relevant here: properly nulling out the writer when we fail to close it should help you out. With that explanation, do you think we can close this as a dupe and reopen if the problem persists after pulling in an adapted version of #142?

For reference, this is the state machine I'm talking about.

https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L262
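The idea of nulling out the writer when closing it fails can be sketched in isolation. This is a minimal, hypothetical illustration and not the connector's actual code: `TempFileHolder` and its members are invented names, and a plain `java.io.Closeable` stands in for the connector's record writer.

```java
import java.io.Closeable;
import java.io.IOException;

/** Hypothetical holder for one temp-file writer; not a class from kafka-connect-hdfs. */
public class TempFileHolder {
    private Closeable writer; // stands in for the connector's record writer

    public TempFileHolder(Closeable writer) {
        this.writer = writer;
    }

    public boolean hasWriter() {
        return writer != null;
    }

    /**
     * Closes the writer at most once. The reference is cleared even when close() throws,
     * so a retry does not call close() again on a stream that is already broken.
     */
    public void closeTempFile() throws IOException {
        if (writer == null) {
            return; // nothing to close, e.g. an earlier attempt already failed
        }
        try {
            writer.close();
        } finally {
            writer = null;
        }
    }
}
```

Clearing the reference only stops the repeated close() call. Whatever was still buffered in the broken writer is presumably gone and would have to be written again.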

skyahead commented 7 years ago

@cotedm Could you have another look at the current PR, when you get a chance?

Perdjesk commented 7 years ago

Hitting a very similar issue when a rolling restart of HDFS datanodes is executed (1 datanode every 120 seconds).

The following ERROR loops indefinitely. The impacted tasks appear to be random: some survive the datanode rolling restart, some do not.

[2017-05-18 14:32:56,507] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file hdfs://hadoop//srv/prod/blu/connect-data/logs/topicname/6/log
        at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91)
        at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
        at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:486)
        at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:213)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:257)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:247)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:103)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Cannot obtain block length for LocatedBlock{BP-2074729095-ip-1486128728174:blk_1082143745_9382741; getBlockSize()=2640; corrupt=false; offset=0; locs=[DatanodeInfoWithStorage[ip:1019,DS-cce32714-xxxx-499b-9798-2803954eafe9,DISK], DatanodeInfoWithStorage[ip:1019,DS-ef0cdb7d-91ad-xxxx-ba50-fa3e2c124bc5,DISK], DatanodeInfoWithStorage[ip:1019,DS-6a920e7c-cd74-xxxx-ac27-02f07ceb9e13,DISK], DatanodeInfoWithStorage[ip:1019,DS-cfb5a118-b23f-xxxx-bb96-c2b85e5654b1,DISK], DatanodeInfoWithStorage[ip:1019,DS-6dccb500-2e19-xxxxx-a3d6-6976a6127f9c,DISK]]}
        at org.apache.hadoop.hdfs.DFSInputStream.readBlockLength(DFSInputStream.java:428)
        at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:336)
        at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
        at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
        at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
        at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
        at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
        at io.confluent.connect.hdfs.wal.WALFile$Reader.openFile(WALFile.java:573)
        at io.confluent.connect.hdfs.wal.WALFile$Reader.<init>(WALFile.java:522)
        at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:215)
        at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:67)
        at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
        ... 17 more

blootsvoets commented 6 years ago

Still experiencing a similar issue with Confluent 4.1.0:

[2018-07-19 09:39:57,033] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.DataException: Error creating writer for log file hdfs://hdfs-namenode-1:8020/logs/mytopic/2/log
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91)
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
    at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:601)
    at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:256)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:321)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Cannot obtain block length for LocatedBlock{BP-848146280-172.19.0.5-1531990635873:blk_1073741849_1025; getBlockSize()=35; corrupt=false; offset=0; locs=[DatanodeInfoWithStorage[172.19.0.6:9866,DS-44ed10ee-5e44-4189-b4c5-5f8579a21184,DISK], DatanodeInfoWithStorage[172.19.0.4:9866,DS-642d35fe-fe6b-43ae-82be-2c5817dbf478,DISK], DatanodeInfoWithStorage[172.19.0.3:9866,DS-b3765134-b9e3-4a99-bb2e-897ee45f3f76,DISK]]}
    at org.apache.hadoop.hdfs.DFSInputStream.readBlockLength(DFSInputStream.java:428)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:336)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.openFile(WALFile.java:551)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.<init>(WALFile.java:436)
    at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:156)
    at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:75)
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
    ... 17 more
[2018-07-19 09:39:57,037] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.DataException: Error creating writer for log file hdfs://hdfs-namenode-1:8020/logs/android_empatica_e4_electrodermal_activity/0/log
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91)
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
    at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:601)
    at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:256)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:321)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Cannot obtain block length for LocatedBlock{BP-848146280-172.19.0.5-1531990635873:blk_1073741854_1030; getBlockSize()=35; corrupt=false; offset=0; locs=[DatanodeInfoWithStorage[172.19.0.3:9866,DS-b3765134-b9e3-4a99-bb2e-897ee45f3f76,DISK], DatanodeInfoWithStorage[172.19.0.6:9866,DS-44ed10ee-5e44-4189-b4c5-5f8579a21184,DISK], DatanodeInfoWithStorage[172.19.0.4:9866,DS-642d35fe-fe6b-43ae-82be-2c5817dbf478,DISK]]}
    at org.apache.hadoop.hdfs.DFSInputStream.readBlockLength(DFSInputStream.java:428)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:336)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.openFile(WALFile.java:551)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.<init>(WALFile.java:436)
    at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:156)
    at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:75)
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
    ... 17 more

Our current resolution is to remove the entire /logs directory whenever this occurs, but that seems a heavy-handed solution. This state is also hard to detect without parsing the logs. Or does it also change the output of host:8083/connectors/hdfs-sink/status?

In general, I'd prefer shutting down a connector or a task to endlessly looping exceptions.
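To illustrate the "fail instead of looping" preference, here is a minimal sketch of a retry budget. `RecoveryBudget` is a hypothetical helper, not part of Kafka Connect or kafka-connect-hdfs. It relies on the assumption that an exception other than `RetriableException` escaping `SinkTask.put()` makes Connect mark the task as FAILED, which should then be visible in the status endpoint.

```java
/** Hypothetical retry budget; not part of Kafka Connect or kafka-connect-hdfs. */
public class RecoveryBudget {
    private final int maxAttempts;
    private int failures;

    public RecoveryBudget(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /** Records one failed recovery attempt; throws once the budget is exhausted. */
    public void onFailure(Exception cause) {
        failures++;
        if (failures >= maxAttempts) {
            // In a real task this would be an exception that Connect treats as fatal (assumption).
            throw new IllegalStateException(
                "Recovery failed " + failures + " times, giving up", cause);
        }
    }

    /** Resets the budget after a successful recovery. */
    public void onSuccess() {
        failures = 0;
    }

    public int failures() {
        return failures;
    }
}
```

A task would call `onFailure(e)` in the catch block around recovery and `onSuccess()` once recovery completes. The right budget depends on how long an outage should be tolerated.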

skyahead commented 6 years ago

Have you waited for more than 1 hour after this happened? If yes, something else was going on. If not, please try.

Waiting 1 hour is a hard limit in the HDFS client code and cannot be avoided with the current design, which keeps the exactly-once guarantee between Kafka and HDFS.

If you delete the logs etc., then yes, you do not have to wait for this annoying timeout, but you will lose the exactly-once feature.

blootsvoets commented 6 years ago

@skyahead I will try to wait next time. If this behaviour is expected and automatically resolved by the code, I would not expect the logs to fill up with stack traces for every (very frequent) retry though. A message that it will be automatically resolved would also be useful.
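One way to keep the logs readable during a long outage is to log the full stack trace only once and then emit a short reminder at a fixed interval. The sketch below is hypothetical (`ThrottledFailureLog` is an invented name, not connector code) and only decides how verbose a given log call should be. The caller passes the current time in, which keeps the logic easy to test.

```java
/** Hypothetical helper, not part of kafka-connect-hdfs: picks a verbosity for repeated failures. */
public class ThrottledFailureLog {
    public enum Verbosity { FULL_STACK_TRACE, ONE_LINE, SILENT }

    private final long summaryIntervalMs;
    private long lastLoggedMs;
    private boolean failing;

    public ThrottledFailureLog(long summaryIntervalMs) {
        this.summaryIntervalMs = summaryIntervalMs;
    }

    /** Call on every failed retry with the current time in milliseconds. */
    public Verbosity onFailure(long nowMs) {
        if (!failing) {
            failing = true;
            lastLoggedMs = nowMs;
            return Verbosity.FULL_STACK_TRACE; // first failure of an outage: log everything
        }
        if (nowMs - lastLoggedMs >= summaryIntervalMs) {
            lastLoggedMs = nowMs;
            return Verbosity.ONE_LINE; // periodic reminder, e.g. "still retrying"
        }
        return Verbosity.SILENT;
    }

    /** Call once recovery succeeds, so the next outage logs a full stack trace again. */
    public void onSuccess() {
        failing = false;
    }
}
```

The one-line message is also a natural place to say that the retry is expected to resolve itself.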

alexandrfox commented 6 years ago

Similarly, we still see TopicPartitionWriter stuck in a SHOULD_ROTATE loop:

[2018-07-27 08:16:23,712] ERROR Exception on topic partition topic-name-1:  (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.DataException: java.nio.channels.ClosedChannelException
        at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.close(AvroRecordWriterProvider.java:96)
        at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:655)
        at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:662)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:386)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1546)
        at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:104)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:458)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
        at org.apache.avro.io.BufferedBinaryEncoder.flushBuffer(BufferedBinaryEncoder.java:93)
        at org.apache.avro.io.BufferedBinaryEncoder.ensureBounds(BufferedBinaryEncoder.java:108)
        at org.apache.avro.io.BufferedBinaryEncoder.writeLong(BufferedBinaryEncoder.java:129)
        at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:367)
        at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:395)
        at org.apache.avro.file.DataFileWriter.sync(DataFileWriter.java:413)
        at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:422)
        at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:445)
        at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.close(AvroRecordWriterProvider.java:94)
        ... 16 more

@skyahead I wonder if we should bring back the change from your original commit that wraps the writer.close() call in TopicPartitionWriter.closeTempFile() in try/finally (or at least treat ClosedChannelException differently in this case). ~~I don't see how we can lose messages by doing that, since the writeRecord() call won't be affected, nor will the appendToWAL/commitFile state machine logic.~~ EDIT: after taking another hard look, there might be a situation where we have unflushed messages in FSDataOutputStream after calling writeRecord(), so we count them in recordCounter, offsets and startOffsets, but a network partition happens before the buffer is flushed. A subsequent commitFile() call would then commit an incomplete logfile.
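The unflushed-buffer scenario can be reproduced with plain `java.io` streams. This is a simplified, hypothetical model, not connector code: `FlakySink` stands in for the HDFS output stream, a `BufferedOutputStream` stands in for the writer's buffer, and `recordCounter` mirrors the counter mentioned above only by name.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class UnflushedBufferDemo {
    /** Sink that accepts bytes until it is "disconnected", then fails like a closed channel. */
    static class FlakySink extends OutputStream {
        final ByteArrayOutputStream durable = new ByteArrayOutputStream();
        boolean connected = true;

        @Override
        public void write(int b) throws IOException {
            if (!connected) {
                throw new IOException("channel closed");
            }
            durable.write(b);
        }
    }

    /** Returns {records counted, bytes that reached the sink} after a simulated outage. */
    static int[] run() {
        FlakySink sink = new FlakySink();
        BufferedOutputStream out = new BufferedOutputStream(sink, 1024);
        int recordCounter = 0;
        try {
            for (int i = 0; i < 3; i++) {
                out.write(new byte[] {1, 2, 3, 4}); // stays in the 1024-byte buffer
                recordCounter++;                    // counted although nothing reached the sink
            }
            sink.connected = false;                 // outage before the buffer is flushed
            out.close();                            // the implicit flush fails here
        } catch (IOException e) {
            // Ignoring this failure and committing anyway would commit an incomplete file.
        }
        return new int[] {recordCounter, sink.durable.size()};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("records counted: " + result[0] + ", bytes durable: " + result[1]);
    }
}
```

In this model the counter reports three records while no bytes reached the sink. That is the mismatch a try/finally around close() would hide if the commit proceeded afterwards.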