confluentinc / kafka-connect-hdfs

Kafka Connect HDFS connector

Creation of log directory fails because of permission issues when using keytab #81

Open zako opened 8 years ago

zako commented 8 years ago

We are experiencing an issue where the HDFS connector uses incorrect permissions when creating log directories, given configurations with a heterogeneous set of keytabs. This happens when we run a script to create several HDFS connectors, each with a different keytab, on an empty one-node Kafka Connect cluster.

It seems the problem is within DataWriter, which uses the keytab to call UserGroupInformation.loginUserFromKeytab, a static call that changes the login user for the whole JVM. Since each DataWriter runs in a separate task/thread, this static call can interfere with other running tasks. As a result, it is possible for the log directory to be created under myuser2 while the FS WAL file is created under a different user, i.e. myuser1 in our example.
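
To illustrate the suspicion, here is a minimal, hypothetical sketch (not the connector's code) of two sink tasks each logging in from their own keytab on separate threads; because loginUserFromKeytab replaces the JVM-wide login user, whichever call runs last wins for both threads. It requires a Kerberized environment and real keytabs to actually run:

```java
import org.apache.hadoop.security.UserGroupInformation;

// Hypothetical stand-in for what each HdfsSinkTask/DataWriter effectively does.
public class StaticLoginRace {

    static void startTask(String principal, String keytab) throws Exception {
        // Static call: swaps the process-wide login user for ALL threads.
        UserGroupInformation.loginUserFromKeytab(principal, keytab);

        // If another task logged in with a different keytab in between,
        // directories and files created from here on are owned by that other principal.
        System.out.println("Acting as: " + UserGroupInformation.getLoginUser().getUserName());
    }

    public static void main(String[] args) throws Exception {
        Thread a = new Thread(() -> run("KeytabA@domain", "./keytabs/KeytabA.keytab"));
        Thread b = new Thread(() -> run("KeytabB@domain", "./keytabs/KeytabB.keytab"));
        a.start(); b.start();
        a.join(); b.join();
    }

    private static void run(String principal, String keytab) {
        try {
            startTask(principal, keytab);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```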

Here are the permissions of the log directories in HDFS (scrubbed some names):

```
$ hdfs dfs -ls /logs/mytopic/
Found 1 items
drwxrwxrwx   - myuser2 supergroup          0 2016-06-21 18:01 /logs/mytopic/0
$ hdfs dfs -ls /logs/mytopic/0
Found 1 items
-rw-r--r--   2 myuser1 supergroup        417 2016-06-21 18:25 /logs/mytopic/0/log
```

Here is the full stack trace (scrubbed some names):

```
[2016-06-21 18:51:21,479] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter)
org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file hdfs://mynamenode:8020/logs/mytopic/0/log
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91)
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
    at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:441)
    at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:197)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:227)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:91)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:370)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=myuser2, access=WRITE, inode="/logs/mytopic/0/log":myuser1:supergroup:-rw-r--r--
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:281)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:262)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:175)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6590)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6572)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6497)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2887)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
    at sun.reflect.GeneratedConstructorAccessor92.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1769)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1803)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1796)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:323)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:319)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:319)
    at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1173)
    at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:221)
    at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:67)
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
    ... 17 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=myuser2, access=WRITE, inode="/logs/mytopic/0/log":myuser1:supergroup:-rw-r--r--
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:281)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:262)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:175)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6590)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6572)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6497)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2887)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
    at org.apache.hadoop.ipc.Client.call(Client.java:1468)
    at org.apache.hadoop.ipc.Client.call(Client.java:1399)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
    at com.sun.proxy.$Proxy52.append(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:313)
    at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy53.append(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1767)
    ... 27 more
```

Ishiihara commented 8 years ago

I think one easy workaround here is to use a different topics directory and log directory for each connector. This is also needed because you want to make sure that the WALs written by different connectors are isolated. That isolation is crucial for the correct behavior of each connector, since the connector relies on the data in HDFS to set the correct offset in case of a rebalance or restart.
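
For illustration only (the paths and values below are hypothetical, not taken from this issue), per-connector isolation would mean each connector config points at its own topics.dir and logs.dir, for example:

```json
{
  "name": "TopicA",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics": "TopicA",
    "topics.dir": "/data/topicA",
    "logs.dir": "/logs/topicA"
  }
}
```

with the TopicB connector pointing at, say, /data/topicB and /logs/topicB.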

zako commented 8 years ago

We do create a connector per topic in our setup. Here is an example of the configuration for 2 connectors:

```json
{
  "name": "TopicA",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics.dir": "/data/",
    "topics": "TopicA",
    "connect.hdfs.principal": "KeytabA@domain",
    "connect.hdfs.keytab": "./keytabs/KeytabA.keytab"
  },
  "tasks": [ { "connector": "TopicA", "task": 0 } ]
}
```

```json
{
  "name": "TopicB",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics.dir": "/data/",
    "topics": "TopicB",
    "connect.hdfs.principal": "KeytabB@domain",
    "connect.hdfs.keytab": "./keytabs/KeytabB.keytab"
  },
  "tasks": [ { "connector": "TopicB", "task": 0 } ]
}
```

The problem is that both of these are initialized and created on the same Kafka Connect node. Kafka Connect creates a task per configuration and submits the tasks to the cached thread pool executor in org.apache.kafka.connect.runtime.Worker to run concurrently. The creation of the log, tmp and data directories all happens when a new DataWriter instance is constructed, which occurs when the HdfsSinkTask runs. Since the Kerberos login is static and applies to all threads, some directories end up created with the wrong keytab.

Ishiihara commented 8 years ago

@zako That makes sense, and thanks for the analysis. I think that to allow multiple users for different connector jobs, we need to use secure impersonation and use doAs to perform the creation of topics.dir and logs.dir, as well as the writing of data to HDFS. However, this involves a relatively large change to the connector.
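
A minimal sketch of what the doAs approach could look like, assuming per-connector principal/keytab settings; the class and method names here are hypothetical illustrations, not connector code:

```java
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class PerConnectorHdfs {

    // Create this connector's logs.dir as its own principal, leaving the
    // JVM-wide login user untouched so other connectors are unaffected.
    public static void createLogsDir(Configuration conf, String principal,
                                     String keytab, String logsDir) throws Exception {
        // Log in from the keytab without replacing the static login user.
        UserGroupInformation ugi =
                UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);

        // Every HDFS call inside doAs executes as this connector's principal.
        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            try (FileSystem fs = FileSystem.newInstance(conf)) {
                fs.mkdirs(new Path(logsDir));
            }
            return null;
        });
    }
}
```

Every HDFS access in the connector (WAL writes, tmp and data files) would need to be wrapped this way, which is what makes the change large.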

Another option is to use different Classloaders to load different connectors.

zako commented 8 years ago

I think doAs sounds reasonable; however, my familiarity with Java security is very limited.

We will forgo the alternative of using different Classloaders. Our temporary workaround will be to use a single keytab so we can continue testing Kafka Connect and other functionality.

cotedm commented 7 years ago

Marking this as an enhancement for later evaluation. If there are other users who require multiple users for different connector jobs in a secure environment, it would be good to know about it.

dbolshak commented 5 years ago

@cotedm I've faced the same problem.