confluentinc / kafka-connect-hdfs

Kafka Connect HDFS connector
Other
6 stars 396 forks source link

Confluent 5.0.0 : Kafka hdfs sink connector failures when the connector log file being corrupt/another task cannot acquire lease. #372

Open rupeshmore85 opened 5 years ago

rupeshmore85 commented 5 years ago

We upgraded from confluent 5.0.0 from 4.0.0 in production to run kafka connect hdfs connectors? I see connector task failures with the below error. I think this issue is related to : https://github.com/confluentinc/kafka-connect-hdfs/issues/53. The manual resolution we know is to manually delete the corrupt log files. How do we automate deleting these corrupt log files? We fear that we might loose data for that kafka partition having these failures.

[2018-09-16 22:44:21,946] INFO Successfully acquired lease for hdfs://hadoop-das//home/scribe/kafka_connect/logs/scribeLogBackend/10/log (io.confluent.connect.hdfs.wal.FSWAL)
[2018-09-16 22:44:21,955] ERROR Error applying WAL file: hdfs://hadoop-das//home/scribe/kafka_connect/logs/scribeLogBackend/10/log, {} (io.confluent.connect.hdfs.wal.FSWAL)
java.io.IOException: File is corrupt!
    at io.confluent.connect.hdfs.wal.WALFile$Reader.readRecordLength(WALFile.java:749)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:817)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:775)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:801)
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:112)
    at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:601)
    at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:256)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:321)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:375)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-09-16 22:44:21,990] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.DataException: java.io.IOException: File is corrupt!
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:133)
    at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:601)
    at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:256)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:321)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:375)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: File is corrupt!
    at io.confluent.connect.hdfs.wal.WALFile$Reader.readRecordLength(WALFile.java:749)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:817)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:775)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:801)
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:112)
    ... 16 more
rupeshmore85 commented 5 years ago

More errors:

[2018-09-18 21:34:08,246] ERROR Exception on topic partition scribeLogJson-4:  (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.DataException: Error creating writer for log file hdfs://hadoop-das//home/scribe/kafka_connect/logs/scribeLogJson/4/log
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91)
    at io.confluent.connect.hdfs.wal.FSWAL.append(FSWAL.java:56)
    at io.confluent.connect.hdfs.TopicPartitionWriter.beginAppend(TopicPartitionWriter.java:701)
    at io.confluent.connect.hdfs.TopicPartitionWriter.appendToWAL(TopicPartitionWriter.java:692)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:389)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:375)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot obtain block length for LocatedBlock{BP-1833411454-10.224.157.11-1520059322805:blk_1203122048_139471903; getBlockSize()=7910; corrupt=false; offset=0; locs=[DatanodeInfoWithStorage[10.224.157.82:50010,DS-1c99435c-5fa8-4c1e-8e94-26d4c1bf4907,DISK], DatanodeInfoWithStorage[10.224.157.95:50010,DS-ad8cd7c1-d3ab-4aa7-9193-2119dae01cc7,DISK], DatanodeInfoWithStorage[10.224.157.85:50010,DS-4c3e29d4-83a2-49b8-a76f-3f70cd185c68,DISK], DatanodeInfoWithStorage[10.224.157.37:50010,DS-965173c6-4af8-41d5-878c-d9d120d37d78,DISK], DatanodeInfoWithStorage[10.224.157.61:50010,DS-edf15c5a-37c0-49c8-b4ef-3bd07295fdbd,DISK]]}
    at org.apache.hadoop.hdfs.DFSInputStream.readBlockLength(DFSInputStream.java:428)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:336)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.openFile(WALFile.java:568)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.<init>(WALFile.java:453)
    at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:163)
    at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:77)
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
    ... 17 more
rupeshmore85 commented 5 years ago

We think these frequent failures creates bad sequence files in hdfs causing EOFException. I have the issue here : https://stackoverflow.com/q/52359502/2256189

It is a blocker for us as we are running it in production and these frequent failures related to connector log files causes these badfiles. Any help (pointers to the fix) would be highly appreciated. Thanks

dasbh commented 4 years ago

FSWAL implementation depends on append on log file (I think it assumes cluster is enabled for file append support "dfs.support.append" ).

Internally code takes a lease on WAL file using fs.append(walLogFile) and then reads the file on the same thread. I have seen "Cannot obtain block length for LocatedBlock" in such scenario.

Simple test

FileSystem fs = ..
fs.append("/some_existing_file");
fs.open("/some_existing_file");

You would see "Cannot obtain block length for LocatedBlock" if append is not enabled on cluster

gnkishore commented 3 years ago

We upgraded from confluent 5.0.0 from 4.0.0 in production to run kafka connect hdfs connectors? I see connector task failures with the below error. I think this issue is related to : #53. The manual resolution we know is to manually delete the corrupt log files. How do we automate deleting these corrupt log files? We fear that we might loose data for that kafka partition having these failures.

[2018-09-16 22:44:21,946] INFO Successfully acquired lease for hdfs://hadoop-das//home/scribe/kafka_connect/logs/scribeLogBackend/10/log (io.confluent.connect.hdfs.wal.FSWAL)
[2018-09-16 22:44:21,955] ERROR Error applying WAL file: hdfs://hadoop-das//home/scribe/kafka_connect/logs/scribeLogBackend/10/log, {} (io.confluent.connect.hdfs.wal.FSWAL)
java.io.IOException: File is corrupt!
    at io.confluent.connect.hdfs.wal.WALFile$Reader.readRecordLength(WALFile.java:749)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:817)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:775)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:801)
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:112)
    at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:601)
    at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:256)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:321)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:375)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-09-16 22:44:21,990] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.DataException: java.io.IOException: File is corrupt!
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:133)
    at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:601)
    at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:256)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:321)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:375)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: File is corrupt!
    at io.confluent.connect.hdfs.wal.WALFile$Reader.readRecordLength(WALFile.java:749)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:817)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:775)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.next(WALFile.java:801)
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:112)
    ... 16 more

is there a solution found for this problem. We are also having similar issues in production. Is there a fix planned for this.

BlueEldur commented 3 years ago

This has been addressed by #550