confluentinc / kafka-connect-hdfs

Kafka Connect HDFS connector

Issue when the Kafka Connect worker pod restarts during creation of nested partition directories in the HDFS file system. #538

Open kaushiksrinivas opened 3 years ago

kaushiksrinivas commented 3 years ago

We observed the following issue in our production labs. The sequence of events is:

  1. The HDFS connector is added to the Connect worker.
  2. The connector creates directories in HDFS such as /test1=1/test2=2/, based on a custom partitioner. Here test1 and test2 are two nested directory levels derived from multiple fields in the record.
  3. The connector creates these directories in the HDFS file system with the call fs.mkdirs(new Path(filename)); ref: https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java

The important point is that if mkdirs() is not atomic (i.e. it can be left partially executed when interrupted), the following can happen: the first directory, test1, is created, and the Connect worker pod restarts just before test2 is created in HDFS. HDFS is then left with partially created partition directories for whatever was being written at the time of the restart.

So HDFS can end up in a state like this:

  /test1=0/test2=0/
  /test1=1/
  /test1=2/test2=2
  /test1=3/test2=3

The second partition is missing its nested test2 directory. If Hive integration is enabled, Hive metastore exceptions then occur, because the Hive table expects both partition levels but some partition paths in HDFS contain only the first.

Below is the stack trace of the failure:

io.confluent.connect.storage.errors.HiveMetaStoreException: Invalid partition for xxxxxxxx.syscore: /apps/hdfs-writer/warehouse/xxxxxxxx/topics/syscore/test1=20201122090000/
    at io.confluent.connect.storage.hive.HiveMetaStore$1.call(HiveMetaStore.java:122)
    at io.confluent.connect.storage.hive.HiveMetaStore$1.call(HiveMetaStore.java:107)
    at io.confluent.connect.storage.hive.HiveMetaStore.doAction(HiveMetaStore.java:97)
    at io.confluent.connect.storage.hive.HiveMetaStore.addPartition(HiveMetaStore.java:132)
    at io.confluent.connect.hdfs.DataWriter.syncWithHive(DataWriter.java:432)
    at io.confluent.connect.hdfs.DataWriter.open(DataWriter.java:446)
    at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:67)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:652)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: InvalidObjectException(message:incomplete partition name - missing test2)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$append_partition_by_name_with_environment_context_result$append_partition_by_name_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:51619)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$append_partition_by_name_with_environment_context_result$append_partition_by_name_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:51596)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$append_partition_by_name_with_environment_context_result.read(ThriftHiveMetastore.java:51519)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_append_partition_by_name_with_environment_context(ThriftHiveMetastore.java:1667)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.append_partition_by_name_with_environment_context(ThriftHiveMetastore.java:1651)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.appendPartition(HiveMetaStoreClient.java:607)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.appendPartition(HiveMetaStoreClient.java:601)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:152)
    at com.sun.proxy.$Proxy47.appendPartition(Unknown Source)
    at io.confluent.connect.storage.hive.HiveMetaStore$1.call(HiveMetaStore.java:114)
    ... 28 more
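
To illustrate what "incomplete partition name - missing test2" means for a path, here is a minimal sketch that checks a partition path against the expected partition fields. The class PartitionPathCheck and its method missingFields are hypothetical names made up for this illustration; they are not part of the connector or of Hive, and the check only looks at "name=value" path segments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not connector or Hive code.
final class PartitionPathCheck {

  // Returns the expected partition fields that have no "name=value"
  // segment in the given path, in the order they were expected.
  static List<String> missingFields(String path, List<String> expectedFields) {
    Set<String> present = new LinkedHashSet<>();
    for (String segment : path.split("/")) {
      int eq = segment.indexOf('=');
      if (eq > 0) {
        present.add(segment.substring(0, eq));
      }
    }
    List<String> missing = new ArrayList<>();
    for (String field : expectedFields) {
      if (!present.contains(field)) {
        missing.add(field);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    List<String> fields = Arrays.asList("test1", "test2");
    // The second path mimics the truncated directory left behind by a restart.
    for (String path : Arrays.asList("/test1=0/test2=0/", "/test1=1/", "/test1=2/test2=2")) {
      System.out.println(path + " -> missing " + missingFields(path, fields));
    }
  }
}
```

A check of this kind could, in principle, be used to skip or flag truncated paths before they are sent to the metastore; whether that belongs in the connector is a separate question.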

This is a critical issue for us, and we would appreciate guidance on how to handle it.

kaushiksrinivas commented 3 years ago

/critical

kaushiksrinivas commented 3 years ago

Some additional details:

When syncWithHive() is called on connector restart, the code snippet below from DataWriter.java creates the table in Hive, lists the partitions already in Hive, finds partitions that are present in HDFS but missing from Hive, and adds those partitions to the Hive table.

// Partitions already registered in the Hive table for this topic.
List<String> partitions = hiveMetaStore.listPartitions(hiveDatabase, topicTableMap.get(topic), (short) -1);
// Candidate partition directories found in HDFS under the topic directory.
FileStatus[] statuses = FileUtils.getDirectories(storage, new Path(topicDir));
for (FileStatus status : statuses) {
  String location = status.getPath().toString();
  // Register any HDFS directory that Hive does not know about yet.
  if (!partitions.contains(location)) {
    String partitionValue = getPartitionValue(location);
    hiveMetaStore.addPartition(hiveDatabase, topicTableMap.get(topic), partitionValue);
  }
}

Going one step further into getDirectories > getDirectoriesImpl (in FileUtils): a path is returned as a partition path if
  a. the path is a directory, and
  b. the path does not contain nested directories (checked by testing that the number of non-directory files equals the total number of entries, directories plus non-directory files, in the path).

If both conditions are met, the path is added as a partition path.

Consider the erroneous case where the path should look like
  /test1=0/test2=0/xxxxxxx.parquet
but, because of a crash, looks like this instead:
  /test1=0/

Here /test1=0 satisfies conditions a and b (it is an empty directory, so both counts are 0) and is therefore returned as a new partition path to be added to Hive. The update to Hive fails because Hive expects the partition to be /test1=0/test2=0, not /test1=0/.
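
The behaviour described above can be reproduced on a local file system. The sketch below is only a stand-in written with java.nio.file, not the connector's FileUtils code: the class LeafDirectoryScan and its methods are hypothetical, and the traversal follows the description in this comment rather than the actual source.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical local-filesystem stand-in; not the connector's FileUtils.
final class LeafDirectoryScan {

  // Lists the direct children of a directory.
  static List<Path> list(Path dir) {
    List<Path> children = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
      for (Path child : stream) {
        children.add(child);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return children;
  }

  // Returns directories below root that satisfy conditions a and b:
  // a directory whose entries are all non-directory files.
  static List<Path> partitionDirs(Path root) {
    List<Path> result = new ArrayList<>();
    for (Path child : list(root)) {
      if (!Files.isDirectory(child)) {
        continue;
      }
      List<Path> entries = list(child);
      int count = 0;
      for (Path entry : entries) {
        if (!Files.isDirectory(entry)) {
          count++;
        }
      }
      if (count == entries.size()) {
        result.add(child); // also true for an empty directory (0 == 0)
      } else {
        result.addAll(partitionDirs(child));
      }
    }
    return result;
  }

  // Builds one complete and one truncated partition path, then scans them.
  static List<String> demo() {
    try {
      Path root = Files.createTempDirectory("topic");
      Path complete = Files.createDirectories(root.resolve("test1=0").resolve("test2=0"));
      Files.createFile(complete.resolve("data.parquet"));
      Files.createDirectories(root.resolve("test1=1")); // restart before test2 was created
      List<String> found = new ArrayList<>();
      for (Path p : partitionDirs(root)) {
        found.add(root.relativize(p).toString().replace('\\', '/'));
      }
      Collections.sort(found);
      return found;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In this sketch the truncated directory test1=1 is reported alongside the complete partition test1=0/test2=0, which mirrors the path that is later rejected by Hive.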

As a fix, we propose tightening the condition in the getDirectoriesImpl function of FileUtils. Besides checking that the path is a directory and contains no nested directories, it should also check that the path is a populated leaf directory, i.e. that it contains more than 0 non-directory files.

i.e. change "if (count == fileStatuses.size())" to "if (count == fileStatuses.size() && count > 0)" in getDirectoriesImpl when deciding whether a path is a partition path. A corrupt intermediate path that is a directory but holds no actual parquet files would then not be treated as a new partition.
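
The effect of the proposed change can be seen by comparing the two conditions on their own. This is a sketch with hypothetical names (PartitionCondition, current, proposed); it isolates only the boolean test and takes the two counts as plain integers instead of reading them from HDFS.

```java
// Hypothetical illustration of the two conditions; not connector code.
final class PartitionCondition {

  // Condition as described above: every entry is a non-directory file.
  static boolean current(int fileCount, int entryCount) {
    return fileCount == entryCount;
  }

  // Proposed condition: additionally require at least one file.
  static boolean proposed(int fileCount, int entryCount) {
    return fileCount == entryCount && fileCount > 0;
  }

  public static void main(String[] args) {
    // Each row: {non-directory files, total entries} for one directory.
    int[][] cases = {
      {3, 3}, // leaf directory holding data files
      {0, 0}, // empty directory left behind by a restart
      {0, 1}, // directory holding only a nested directory
    };
    for (int[] c : cases) {
      System.out.println("files=" + c[0] + " entries=" + c[1]
          + " current=" + current(c[0], c[1])
          + " proposed=" + proposed(c[0], c[1]));
    }
  }
}
```

Only the empty-directory case changes: it passes the current test and fails the proposed one. One consequence worth noting is that a genuinely empty but complete partition directory would also be skipped under the proposed condition.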

Please share your thoughts on this issue, the analysis, and the proposed fix.

kaushiksrinivas commented 3 years ago

Any insights on this issue?