confluentinc / kafka-connect-hdfs

Kafka Connect HDFS connector

org.apache.hadoop.fs.FileAlreadyExistsException: when rotate.interval.ms is relatively small #296

Open · chentao opened this issue 6 years ago

chentao commented 6 years ago

This happens when I restart one of the connect-distributed processes, when a new sink instance is created, or in a few other situations. I have encountered it several times, and even after a completely fresh installation of Kafka Connect the problem still occurs.

But when I increase rotate.interval.ms to some larger value (like 3600000, up from 300000), the problem goes away. (Unfortunately, only an interval of 300000 fits my requirement.)

Kafka: 1.0.0
Kafka Connect: 4.0.0
Hadoop: 2.6.0
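For context, a hypothetical minimal sink configuration along the lines described above (the connector name, topic, and HDFS details are placeholders, not taken from the report):

```properties
# Hypothetical HDFS sink config sketch; only rotate.interval.ms is the setting at issue.
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
topics=parquet_yjs_missionreward
hdfs.url=hdfs://namenode:8020
topics.dir=/dw/today
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
flush.size=10000
# Rotating every 5 minutes (300000 ms) reproduces the exception below;
# raising this to 3600000 ms makes the problem disappear.
rotate.interval.ms=300000
```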

[2018-03-15 10:11:26,995] ERROR Exception on topic partition parquet_yjs_missionreward-6: (io.confluent.connect.hdfs.TopicPartitionWriter:403)
org.apache.kafka.connect.errors.ConnectException: org.apache.hadoop.fs.FileAlreadyExistsException: /dw/today/+tmp/parquet_yjs_missionreward/dt=2018-03-14/7d4ee6f0-41c0-4690-9feb-49aae849333c_tmp.parquet for client 10.30.100.47 already exists
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2848)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2740)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2625)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:599)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:112)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:401)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2220)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2216)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2214)

at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:78)
at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:635)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /dw/today/+tmp/parquet_yjs_missionreward/dt=2018-03-14/7d4ee6f0-41c0-4690-9feb-49aae849333c_tmp.parquet for client 10.30.100.47 already exists
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2848)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2740)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2625)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:599)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:112)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:401)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2220)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2216)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2214)

at sun.reflect.GeneratedConstructorAccessor50.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1653)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:236)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:273)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:222)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:188)
at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:131)
at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:68)
... 15 more

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): /dw/today/+tmp/parquet_yjs_missionreward/dt=2018-03-14/7d4ee6f0-41c0-4690-9feb-49aae849333c_tmp.parquet for client 10.30.100.47 already exists
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2848)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2740)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2625)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:599)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:112)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:401)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2220)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2216)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2214)

at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy53.create(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy54.create(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1648)
... 31 more
bikeholik commented 6 years ago

As far as I have seen (at least in v4), the Avro writer has the overwrite flag set to true when creating tmp files, but the Parquet writer uses CREATE mode for the same thing.

Should they all overwrite tmp files?
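For illustration, a minimal sketch of the difference using Hadoop's FileSystem API (the path is made up; the actual connector code is in AvroRecordWriterProvider and ParquetRecordWriterProvider):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OverwriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path tmp = new Path("/dw/today/+tmp/example_tmp.parquet"); // illustrative path
    FileSystem fs = tmp.getFileSystem(conf);

    // What the Avro path effectively does: FileSystem.create(Path)
    // defaults to overwrite=true, so re-creating a tmp file that was
    // left behind by a restarted worker succeeds.
    FSDataOutputStream avroStyle = fs.create(tmp);
    avroStyle.close();

    // What the Parquet path effectively does: ParquetFileWriter in CREATE
    // mode boils down to create(path, overwrite=false), which on HDFS
    // throws FileAlreadyExistsException if the tmp file already exists.
    FSDataOutputStream parquetStyle = fs.create(tmp, false); // throws here
    parquetStyle.close();
  }
}
```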