confluentinc / kafka-connect-hdfs

Kafka Connect HDFS connector
Other
10 stars 397 forks source link

Kafka hdfs Connect writes empty file when HDFS quota is reached in tmp folder #453

Open swathimocharla19 opened 5 years ago

swathimocharla19 commented 5 years ago

We've run into an issue where sometimes when Kafka HDFS connect is stopped for any reason, an empty parquet file is generated in HDFS. After the connect worker is started, the subsequent writes fail on HDFS as the worker tries to gather the schema from the latest available parquet file, based on its offsets (which is empty). This situation occurs when "schema.compatibility": "BACKWARD"

swathimocharla19 commented 5 years ago

To overcome this issue, the following change was made in FileUtils.java

  public static FileStatus fileStatusWithMaxOffset(
      Storage storage,
      Path path,
      CommittedFileFilter filter
  ) {
      if (!storage.exists(path.toString())) {
          return null;
      }
      long maxOffset = -1L;
      FileStatus fileStatusWithMaxOffset = null;
      List <FileStatus> statuses = storage.list(path.toString());
      for (FileStatus status: statuses) {
          if (status.isDirectory()) {
              FileStatus fileStatus = fileStatusWithMaxOffset(storage, status.getPath(), filter);
              if (fileStatus != null) {
                  if (fileStatus.getPath().getName().contains(".parquet") && fileStatus.getLen() < 8) {
                      log.trace("Found an empty parquet file and deleting it: {}", fileStatus.getPath().getName());
                      storage.delete(fileStatus.getPath().toString());
                  } else {
                      long offset = extractOffset(fileStatus.getPath().getName());
                      if (offset > maxOffset) {
                          maxOffset = offset;
                          fileStatusWithMaxOffset = fileStatus;
                      }
                  }
              }
          } else {
              String filename = status.getPath().getName();
              log.trace("Checked for max offset: {}", status.getPath());
              if (filter.accept(status.getPath())) {
                  if (status.getPath().getName().contains(".parquet") && status.getLen() < 8) {
                      log.trace("Found an empty parquet file and deleting it: {}", status.getPath().getName());
                      storage.delete(status.getPath().toString());
                  } else {
                      long offset = extractOffset(filename);
                      if (offset > maxOffset) {
                          maxOffset = offset;
                          fileStatusWithMaxOffset = status;
                      }
                  }
              }
          }
      }
      return fileStatusWithMaxOffset;
  }

To test this, after inserting data into HDFS, the latest file (with max offsets) was deleted and re-created , the size is now 0. At this point the connect worker was restarted, and as part of the restart, the 0 sized parquet file is deleted. Now, the previous parquet file becomes the latest and the offsets are read from there. So, the deleted offsets parquet file is recreated, given that Kafka still holds this data at this given point.

ncliang commented 5 years ago

Thanks for reporting the issue and sending out the PR! I am trying to repro the issue on my end without success. Could you share some details of how to repro? Does it only repro with parquet format? What other configurations do you have on the connector other than schema.compatability=BACKWARD? How much traffic are you pushing through the topic when you see the issue? Thanks!

swathimocharla19 commented 5 years ago

hi @ncliang Thanks for taking a look at this PR. This issue was reported on our customer environment, where some parquet files were of size 0.

Steps to reproduce:

  1. Bring up a hdfs connector with hive integration. Here is a simple json:
    {
    "name": "test6",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "1",
        "topics": "test6",
        "hdfs.url": "hdfs://10.15.173.27:8020/user/cloud-user/patch/",
        "hadoop.conf.dir": "/etc/hadoop/conf",
        "schema.compatibility": "BACKWARD",
        "flush.size":"1",
        "format.class":"io.confluent.connect.hdfs.parquet.ParquetFormat",
        "hive.integration": "true",
        "hive.metastore.uris": "thrift://10.15.173.27:9083",
        "hive.database": "hive_connect"
    }
    }
  2. Start the connector and start producing some avro data.
  3. At this point check the hdfs to see the .parquet files generated and data in the table in hive:
    [root@vm-10-15-173-27 cloud-user]# sudo -u hdfs hdfs dfs -ls /user/cloud-user/patch/topics/test6/partition=0
    Found 3 items
    -rw-r--r--   3 kafka cloud-user        432 2019-08-22 10:25 /user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000000+0000000003.parquet
    -rw-r--r--   3 kafka cloud-user        432 2019-08-22 10:25 /user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000004+0000000007.parquet
    -rw-r--r--   3 kafka cloud-user        433 2019-08-22 10:27 /user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000008+0000000011.parquet
hive> select * from  test6;
OK
........
value30 0
value31 0
value32 0
Time taken: 0.087 seconds, Fetched: 12 row(s)
  1. Delete a file from hdfs and create the same with size 0, to simulate this issue of 0 sized parquet file.
[root@vm-10-15-173-27 cloud-user]# sudo -u hdfs hdfs dfs -rm  /user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000008+0000000011.parquet
{"type":"log","host":"host_name","category":"YARN-yarn-GATEWAY-BASE","level":"INFO","system":"etcd_clcm_cdlk_nonSecure","time": "19/08/22 10:27:34","logger":"fs.TrashPolicyDefault","timezone":"EST","log":{"message":"Moved: 'hdfs://vm-10-15-173-27:8020/user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000008+0000000011.parquet' to trash at: hdfs://vm-10-15-173-27:8020/user/hdfs/.Trash/Current/user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000008+0000000011.parquet"}}
[root@vm-10-15-173-27 cloud-user]# sudo -u hdfs hdfs dfs -touchz  /user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000008+0000000011.parquet

At this point, keep sending data to kafka At the connector, you will see this error:

org.apache.kafka.connect.errors.DataException: java.io.IOException: Could not read footer: java.lang.RuntimeException: hdfs://10.15.173.27:8020/user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000008+0000000011.parquet is not a Parquet file (too small)
        at io.confluent.connect.hdfs.parquet.ParquetFileReader.getSchema(ParquetFileReader.java:53)
        at io.confluent.connect.hdfs.parquet.ParquetFileReader.getSchema(ParquetFileReader.java:30)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:372)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:387)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:158)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not read footer: java.lang.RuntimeException: hdfs://10.15.173.27:8020/user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000008+0000000011.parquet is not a Parquet file (too small)

and, this on hive:

hive> select * from  test6;
OK
Failed with exception java.io.IOException:java.lang.RuntimeException: hdfs://10.15.173.27:8020/user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000008+0000000011.parquet is not a Parquet file (too small)
Time taken: 0.084 seconds
  1. With the fix mentioned in the PR, the file with maxoffset and any other parquet files of size 0 will be deleted. The topic partition writer will recreate this parquet file based on the offsets read, given that the retention policy is set accordingly, else it will continue to read the new data into this parquet file. But, in either case, the connector doesnt fail at this point anymore.

With the fix, at hdfs:

[root@vm-10-15-173-27 cloud-user]# sudo -u hdfs hdfs dfs -ls /user/cloud-user/patch/topics/test6/partition=0
Found 4 items
-rw-r--r--   3 kafka cloud-user        432 2019-08-22 10:25 /user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000000+0000000003.parquet
-rw-r--r--   3 kafka cloud-user        432 2019-08-22 10:25 /user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000004+0000000007.parquet
-rw-r--r--   3 kafka cloud-user        433 2019-08-22 10:30 /user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000008+0000000011.parquet
-rw-r--r--   3 kafka cloud-user        432 2019-08-22 10:30 /user/cloud-user/patch/topics/test6/partition=0/test6+0+0000000012+0000000015.parquet

and, at hive:

hive> select * from  test6;
OK
......
value30 0
value31 0
value32 0
value33 0
value34 0
value35 0
value36 0
Time taken: 0.079 seconds, Fetched: 16 row(s)

Hope that this answers your queries!

ncliang commented 5 years ago

@swathimocharla19 Thank you for the very detailed repro steps! I understand that you are reproing by artificially touching a parquet file with size zero. I would rather like to know how the customer ended up with a parquet file of size zero to begin with. You can see here that the connector really just uses the AvroParquetWriter provided by Parquet. Hence I was asking whether it only repro'ed with parquet format, in which case it may indicate a bug in the upstream writer, or that we are maybe using the writer incorrectly. Other pieces of information that may be relevant to root-causing the issue that I can think of:

RossierFl commented 5 years ago

Hi, I just saw this issue and PR and we are also suffering with this issue. I can try to give you more insight on this. On our side the issue append recently because of HDFS Quota (multi tenant cluster with control on HDFS quota for each users)

Last week the quota was reach for one of them, thus multiple connectors weren't able to write new datas and many of those empty file were presents in multiple folders. Would it be possible that the connectors were able to create the file but weren't able to write any datas in it because of the quota ? On our side, it was with Avro format, so I suppose that the issue is not related to the format used.

swathimocharla19 commented 5 years ago

hi @ncliang , @RossierFl My initial thought was that the issue would be similar for both formats. In my initial approach to this fix, i didn't make it specific to parquet. Without the check specific to parquet, the tests failed with more than 20 errors. These were specific to avro format. I didn't look deeper into the tests, but, it seemed like some simulated data.

ncliang commented 5 years ago

@RossierFl Thanks for this info! It is very useful. I can try to repro this locally with quotas.

ncliang commented 5 years ago

Yep, the issue is indeed quota related. We write things to a temp file first before committing every rotateIntervalMs or when we hit flush.size. In this case, we fail to write to the tmp file due to reaching space quota. But, on rotateIntervalMs, we still try to commit, resulting in zero size file being produced.

Unfortunately, I don't think the PR by @swathimocharla19 solves the root cause. I'm happy to review any PRs to fix the underlying issue though. Relevant lines are around here: https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L421

yifatshani commented 4 years ago

Hi, We have the same issue, but we have unlimited quota in HDFS. Am I right to assume that the current solution is to change the fileUtils.java as suggested by @swathimocharla19? If yes, can I please get some guidelines on how to do that?

Thanks, Yifat

shmyer commented 3 years ago

Same issue here, we also have no quota defined for the directory we're writing to. Is it possible this might occur under different circumstances too?