When HDFS is used as deep storage for segments and task logs, there is an edge case that causes ingestion tasks to fail, and unfortunately we ran into it this afternoon.

All task logs are uploaded to the directory specified by `druid.indexer.logs.directory`. HDFS caps the number of items in a single directory via `dfs.namenode.fs-limits.max-directory-items` (default: 1048576). Once that cap is reached, tasks are marked FAILED because the middle manager can no longer create the log file under that directory.
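For reference, this limit is an `hdfs-site.xml` setting on the NameNode. The snippet below shows the property with its stock default; the value is illustrative of a default deployment, not taken from our cluster:

```xml
<!-- hdfs-site.xml (NameNode) -->
<property>
  <name>dfs.namenode.fs-limits.max-directory-items</name>
  <!-- Maximum number of items (files + sub-directories) a single directory may contain -->
  <value>1048576</value>
</property>
```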
Here is the exception from the middle manager:
2021-04-02T09:28:12,908 INFO [forking-task-runner-7] org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs - Writing task log to: /druid/indexing-logs/index_kafka_request-metrics-alarm_d0dde6eff3ebae8_ngflfhib
2021-04-02T09:28:12,915 INFO [forking-task-runner-7] org.apache.druid.indexing.overlord.ForkingTaskRunner - Exception caught during execution
org.apache.hadoop.ipc.RemoteException: The directory item limit of /druid/indexing-logs is exceeded: limit=1048576 items=1048576
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.verifyMaxDirItems(FSDirectory.java:2161)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.addChild(FSDirectory.java:2249)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.addLastINode(FSDirectory.java:2217)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.addINode(FSDirectory.java:2000)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.addFile(FSDirectory.java:368)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2859)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2739)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2624)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:599)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:112)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:401)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2141)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2137)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1783)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2135)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489) ~[?:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1435) ~[?:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1345) ~[?:?]
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) ~[?:?]
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) ~[?:?]
at com.sun.proxy.$Proxy62.create(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:297) ~[?:?]
at sun.reflect.GeneratedMethodAccessor113.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_65]
at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_65]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409) ~[?:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163) ~[?:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155) ~[?:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) ~[?:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346) ~[?:?]
at com.sun.proxy.$Proxy63.create(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:265) ~[?:?]
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1274) ~[?:?]
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1216) ~[?:?]
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:473) ~[?:?]
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:470) ~[?:?]
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[?:?]
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:470) ~[?:?]
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:411) ~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) ~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) ~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) ~[?:?]
at org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs.pushTaskFile(HdfsTaskLogs.java:83) ~[?:?]
at org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs.pushTaskLog(HdfsTaskLogs.java:65) ~[?:?]
at org.apache.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:374) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]
at org.apache.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:132) [druid-indexing-service-0.16.0-incubating.jar:0.16.0-incubating]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_65]
2021-04-02T09:28:12,932 INFO [forking-task-runner-7] org.apache.druid.indexing.overlord.ForkingTaskRunner - Removing task directory: var/druid/task/index_kafka_request-metrics-alarm_d0dde6eff3ebae8_ngflfhib
2021-04-02T09:28:12,939 INFO [WorkerTaskManager-NoticeHandler] org.apache.druid.indexing.worker.WorkerTaskManager - Job's finished. Completed [index_kafka_request-metrics-alarm_d0dde6eff3ebae8_ngflfhib] with status [FAILED]
When this happens, there are several problems:

- The root cause is only visible in the middle manager logs; the task report in the console gives no information about it.
- The task is marked FAILED, which in turn flips the supervisor status to UNHEALTHY.
- As the log above shows, the task directory is removed right after the exception, so viewing the task log through the console is of no use. That makes it hard for people to figure out what happened when they see lots of task failures.
I know there are some ways to mitigate this problem:

- Increasing the limit in HDFS, but changing that value requires restarting the NameNode, which is usually not acceptable in production.
- Druid already has a task log auto cleaner, which helps alleviate this problem, but it is disabled by default.
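For anyone who hits this before a design change lands, enabling the auto cleaner should look roughly like the runtime.properties snippet below. The property names are as I understand them from the Druid configuration docs, and the retention values are arbitrary examples, so please double-check against your version:

```properties
# runtime.properties (example values, verify against your Druid version)
druid.indexer.logs.kill.enabled=true
# Delete task logs and their metadata entries older than 7 days
druid.indexer.logs.kill.durationToRetain=604800000
# Run the cleanup every 6 hours, starting 5 minutes after startup
druid.indexer.logs.kill.delay=21600000
druid.indexer.logs.kill.initialDelay=300000
```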
But from a design perspective, I think it would be better to optimize here by:

- Putting task logs into per-datasource sub-directories, so that the directory item limit is hard to reach for any single datasource.
- Not marking the task FAILED just because the task log upload failed; in my opinion the FAILED state should be reserved for ingestion and segment handoff problems.
- Not deleting the local task logs when the upload fails.
- Enabling the task log auto cleaner by default.
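To illustrate the first proposal, here is a rough sketch in Python (purely illustrative; the actual change would live in `HdfsTaskLogs`, and the function names below are hypothetical) of how adding a datasource sub-directory spreads log files across many directories instead of one:

```python
def flat_log_path(base: str, task_id: str) -> str:
    """Current layout (roughly): every task log lands directly under the
    base directory, so the directory item limit applies cluster-wide."""
    return f"{base}/{task_id}"


def per_datasource_log_path(base: str, datasource: str, task_id: str) -> str:
    """Proposed layout: one sub-directory per datasource, so each directory
    stays far below dfs.namenode.fs-limits.max-directory-items."""
    return f"{base}/{datasource}/{task_id}"


task_id = "index_kafka_request-metrics-alarm_d0dde6eff3ebae8_ngflfhib"
print(flat_log_path("/druid/indexing-logs", task_id))
# -> /druid/indexing-logs/index_kafka_request-metrics-alarm_d0dde6eff3ebae8_ngflfhib
print(per_datasource_log_path("/druid/indexing-logs", "request-metrics-alarm", task_id))
# -> /druid/indexing-logs/request-metrics-alarm/index_kafka_request-metrics-alarm_d0dde6eff3ebae8_ngflfhib
```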
I have not gone through the code yet to check whether the proposals above are feasible; I'm writing this issue in the hope of starting a discussion first. @jihoonson