confluentinc / kafka-connect-hdfs

Kafka Connect HDFS connector

use of many connections with multiple keytabs #405

Open aminejamoussi opened 5 years ago

aminejamoussi commented 5 years ago

I have two HDFS sink connectors: the first reads data from topic 1 and sinks it to /hdfs/url1 (using keytab1); the second reads data from topic 2 and sinks it to /hdfs/url2 (using keytab2).

The problem is that this configuration produces a permission-denied error, and I noticed that the second connector uses the first keytab instead of the second.

I don't know whether it is possible to use multiple connectors with different keytabs, or whether all connectors must share the same keytab.

OneCricketeer commented 5 years ago

Each connector can have its own principal and keytab, and an entirely separate hadoop.conf.dir directory:

https://docs.confluent.io/current/connect/kafka-connect-hdfs/configuration_options.html#security
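For example, here is a minimal sketch of the security-related portion of two connector configs, using the property names from the docs above; the names, realm, keytab paths, and hadoop.conf.dir values are placeholders, and the rest of the required connector properties (topics, format.class, flush.size, etc.) are omitted.

{
    "name": "sink-1",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "hdfs.url": "hdfs://cluster/url1",
        "hdfs.authentication.kerberos": "true",
        "connect.hdfs.principal": "user1@EXAMPLE.COM",
        "connect.hdfs.keytab": "/path/to/keytab1.keytab",
        "hdfs.namenode.principal": "nn/_HOST@EXAMPLE.COM",
        "hadoop.conf.dir": "/etc/hadoop/conf-cluster1"
    }
}

{
    "name": "sink-2",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "hdfs.url": "hdfs://cluster/url2",
        "hdfs.authentication.kerberos": "true",
        "connect.hdfs.principal": "user2@EXAMPLE.COM",
        "connect.hdfs.keytab": "/path/to/keytab2.keytab",
        "hdfs.namenode.principal": "nn/_HOST@EXAMPLE.COM",
        "hadoop.conf.dir": "/etc/hadoop/conf-cluster2"
    }
}

Each object is submitted as its own connector, so each gets its own principal and keytab.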

aminejamoussi commented 5 years ago

I checked that, but it still shows an error; it looks like it always uses the first keytab for all connectors.

OneCricketeer commented 5 years ago

I'm not sure I understand.

If you post more than one config to /connectors, it should work. The property itself is not a list of values.
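For illustration, that means two separate POSTs to the Connect REST API, one per connector; this assumes the worker listens on the default localhost:8083, and connector1.json / connector2.json are hypothetical files holding the two payloads.

curl -X POST -H "Content-Type: application/json" \
     --data @connector1.json http://localhost:8083/connectors

curl -X POST -H "Content-Type: application/json" \
     --data @connector2.json http://localhost:8083/connectors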

Can you show your configurations?

aminejamoussi commented 5 years ago

The first connector:

{ "name": "monitoring_sink", "config": { "name": "monitoring_sink", "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "format.class": "io.confluent.connect.hdfs.avro.AvroFormat", "flush.size": "1000", "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", "schema.compatibility":"NONE", "partition.duration.ms":"60000", "rotate.schedule.interval.ms":"30000", "locale":"FR", "timezone":"GMT", "path.format":"YYYYMMdd", "hdfs.authentication.kerberos": "true", "hdfs.namenode.principal": "nn@_HOST@DDHAD", "connect.hdfs.principal": "bpapdadm@DDHAD", "connect.hdfs.keytab": "/home/x175114/kafka_connect/config/bpapdadm.DDHAD.applicatif.keytab", "topics.dir": "/project/bsc/bpa/monitoring/row_data_maycem", "logs.dir": "/project/bsc/bpa/monitoring/row_data_maycem/logs", "shutdown.timeout.ms": "10000", "hadoop.conf.dir": "/etc/hadoop/conf", "tasks.max": "5", "topics": "testbpa", "timestamp.field":"amine", "hdfs.url": "hdfs://dhadcluster02/"

}

The second connector:

{ "name": "wfm_sink_ppa", "config": { "name": "wfm_sink_ppa", "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "format.class": "io.confluent.connect.hdfs.avro.AvroFormat", "flush.size": "10", "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", "schema.compatibility":"NONE", "partition.duration.ms":"60000", "rotate.schedule.interval.ms":"30000", "locale":"FR", "timezone":"GMT", "path.format":"YYYYMMdd", "hdfs.authentication.kerberos": "true", "hdfs.namenode.principal": "nn@_HOST@DDHAD", "connect.hdfs.principal": "bddfddadm@DDHAD", "connect.hdfs.keytab": "/home/x175114/kafka_connect/config/bddfddadm.DDHAD.applicatif.keytab", "topics.dir": "/lake/bddf/a5151_ppa/hive/lake_bddf_a5151_ppa.db", "shutdown.timeout.ms": "10000", "hadoop.conf.dir": "/etc/hadoop/conf", "logs.dir": "/lake/bddf/a5151_ppa/logs", "tasks.max": "1", "topics": "bddfppadev2brut", "timestamp.field":"amine", "hdfs.url": "hdfs://dhadcluster02/" } }

OneCricketeer commented 5 years ago

Thanks. I have not used Kerberos with HDFS, but I'm sure someone else can give you a clearer response.

My first thought is that it should work, because each connector task maintains its own Hadoop configuration, but you might want to make sure there are no keytabs specified within the /etc/hadoop/conf configuration files that could override the connector properties.
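For example, a quick (illustrative) check on the worker host; adjust the path if your hadoop.conf.dir points elsewhere:

# list any keytab or principal settings baked into the shared Hadoop client config
grep -riE "keytab|principal" /etc/hadoop/conf/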

aminejamoussi commented 5 years ago

No, there aren't any keytabs there.

liko9 commented 5 years ago

In your configurations, you're showing the same keytab and principal for both connectors. It would also be helpful if you could share the log output where you're seeing the error you referenced.

charlielin commented 3 years ago

I am facing exactly the same issue. I have two hdfs-sink connectors: in the config, the first gets data from topic 1 and sinks to /hdfs/url1 (using keytab1), and the second gets data from topic 2 and sinks to /hdfs/url2 (using keytab2). But in fact, the first sink was using keytab2 while the second sink was using keytab1. Here are my configs. The first hdfs-sink:

{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics.dir": "ywmy",
    "flush.size": "1000000",
    "tasks.max": "2",
    "connect.hdfs.principal": "ywmy",
    "timezone": "Asia/Shanghai",
    "consumer.sasl.mechanism": "PLAIN",
    "locale": "CHINA",
    "format.class": "io.confluent.connect.hdfs.string.StringFormat",
    "consumer.auto.offset.reset": "earliest",
    "hdfs.namenode.principal": "hadoop/longzhou-hdpnn.lz.dscc.99.com@LZ.DSCC.99.COM",
    "consumer.security.protocol": "SASL_PLAINTEXT",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"****\" password=\"*****\";",
    "partition.duration.ms": "100",
    "hadoop.conf.dir": "/home/work/hadoop-conf",
    "topics": "log-ywmy",
    "connect.hdfs.keytab": "/home/work/keytab/ywmy/ywmy.keytab",
    "hdfs.url": "hdfs://bigdata-test/user/nd_rdg/raw-data/realtime",
    "hdfs.authentication.kerberos": "true",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "value.converter.schemas.enable": "false",
    "name": "ywmy-hdfs-sink",
    "path.format": "YYYYMMdd",
    "rotate.schedule.interval.ms": "86400000"
}

The second hdfs-sink:

{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics.dir": "ue4my",
    "flush.size": "1000000",
    "tasks.max": "2",
    "connect.hdfs.principal": "xhmy",
    "timezone": "Asia/Shanghai",
    "consumer.sasl.mechanism": "PLAIN",
    "locale": "CHINA",
    "format.class": "io.confluent.connect.hdfs.string.StringFormat",
    "consumer.auto.offset.reset": "earliest",
    "hdfs.namenode.principal": "hadoop/longzhou-hdpnn.lz.dscc.99.com@LZ.DSCC.99.COM",
    "consumer.security.protocol": "SASL_PLAINTEXT",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"***\";",
    "partition.duration.ms": "100",
    "hadoop.conf.dir": "/home/work/hadoop-conf",
    "topics": "log-ue4my",
    "connect.hdfs.keytab": "/home/work/keytab/xhmy/xhmy.keytab",
    "hdfs.url": "hdfs://bigdata-test/user/nd_xhmy/raw-data/realtime",
    "hdfs.authentication.kerberos": "true",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "value.converter.schemas.enable": "false",
    "name": "xhmy-hdfs-sink",
    "path.format": "YYYYMMddHH",
    "rotate.schedule.interval.ms": "3600000"
}

Here is my error log:

[2021-10-20 14:00:01,930] DEBUG Attempting to acquire lease for WAL file: hdfs://bigdata-test/user/nd_rdg/raw-data/realtime/logs/log-ywmy/9/log (io.confluent.connect.hdfs.wal.FSWAL:78)
[2021-10-20 14:00:01,930] DEBUG Looking for FS supporting hdfs (org.apache.hadoop.fs.FileSystem:3208)
[2021-10-20 14:00:01,930] DEBUG looking for configuration option fs.hdfs.impl (org.apache.hadoop.fs.FileSystem:3212)
[2021-10-20 14:00:01,930] DEBUG Looking in service filesystems for implementation class (org.apache.hadoop.fs.FileSystem:3219)
[2021-10-20 14:00:01,930] DEBUG FS for hdfs is class org.apache.hadoop.hdfs.DistributedFileSystem (org.apache.hadoop.fs.FileSystem:3228)
[2021-10-20 14:00:01,931] DEBUG dfs.client.use.legacy.blockreader.local = false (org.apache.hadoop.hdfs.client.impl.DfsClientConf:676)
[2021-10-20 14:00:01,931] DEBUG dfs.client.read.shortcircuit = false (org.apache.hadoop.hdfs.client.impl.DfsClientConf:678)
[2021-10-20 14:00:01,931] DEBUG dfs.client.domain.socket.data.traffic = false (org.apache.hadoop.hdfs.client.impl.DfsClientConf:680)
[2021-10-20 14:00:01,931] DEBUG dfs.domain.socket.path =  (org.apache.hadoop.hdfs.client.impl.DfsClientConf:682)
[2021-10-20 14:00:01,931] DEBUG Sets dfs.client.block.write.replace-datanode-on-failure.min-replication to 0 (org.apache.hadoop.hdfs.DFSClient:331)
[2021-10-20 14:00:01,933] DEBUG No HA service delegation token found for logical URI hdfs://bigdata-test/user/nd_rdg/raw-data/realtime/logs/log-ywmy/9/log (org.apache.hadoop.hdfs.HAUtilClient:145)
[2021-10-20 14:00:01,933] DEBUG dfs.client.use.legacy.blockreader.local = false (org.apache.hadoop.hdfs.client.impl.DfsClientConf:676)
[2021-10-20 14:00:01,933] DEBUG dfs.client.read.shortcircuit = false (org.apache.hadoop.hdfs.client.impl.DfsClientConf:678)
[2021-10-20 14:00:01,933] DEBUG dfs.client.domain.socket.data.traffic = false (org.apache.hadoop.hdfs.client.impl.DfsClientConf:680)
[2021-10-20 14:00:01,933] DEBUG dfs.domain.socket.path =  (org.apache.hadoop.hdfs.client.impl.DfsClientConf:682)
[2021-10-20 14:00:01,934] DEBUG multipleLinearRandomRetry = null (org.apache.hadoop.io.retry.RetryUtils:76)
[2021-10-20 14:00:01,934] DEBUG getting client out of cache: org.apache.hadoop.ipc.Client@42082a86 (org.apache.hadoop.ipc.Client:63)
[2021-10-20 14:00:01,935] DEBUG DataTransferProtocol using SaslPropertiesResolver, configured QOP dfs.data.transfer.protection = integrity, configured class dfs.data.transfer.saslproperties.resolver.class = class org.apache.hadoop.security.SaslPropertiesResolver (org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil:200)
[2021-10-20 14:00:01,935] DEBUG IPC Client (1730643031) connection to longzhou-hdpnn.lz.dscc.99.com/192.168.19.25:8020 from xhmy@LZ.DSCC.99.COM sending #68763 org.apache.hadoop.hdfs.protocol.ClientProtocol.getFileInfo (org.apache.hadoop.ipc.Client:1147)
[2021-10-20 14:00:01,936] DEBUG IPC Client (1730643031) connection to longzhou-hdpnn.lz.dscc.99.com/192.168.19.25:8020 from xhmy@LZ.DSCC.99.COM got value #68763 (org.apache.hadoop.ipc.Client:1201)
[2021-10-20 14:00:01,936] DEBUG Call: getFileInfo took 1ms (org.apache.hadoop.ipc.ProtobufRpcEngine:253)
[2021-10-20 14:00:01,936] DEBUG /user/nd_rdg/raw-data/realtime/logs/log-ywmy/9/log: masked=rw-r--r-- (org.apache.hadoop.hdfs.DFSClient:1228)
[2021-10-20 14:00:01,936] DEBUG IPC Client (1730643031) connection to longzhou-hdpnn.lz.dscc.99.com/192.168.19.25:8020 from xhmy@LZ.DSCC.99.COM sending #68764 org.apache.hadoop.hdfs.protocol.ClientProtocol.create (org.apache.hadoop.ipc.Client:1147)
[2021-10-20 14:00:01,937] DEBUG IPC Client (1730643031) connection to longzhou-hdpnn.lz.dscc.99.com/192.168.19.25:8020 from xhmy@LZ.DSCC.99.COM got value #68764 (org.apache.hadoop.ipc.Client:1201)
[2021-10-20 14:00:01,938] DEBUG Exception while invoking call #68764 ClientNamenodeProtocolTranslatorPB.create over longzhou-hdpnn.lz.dscc.99.com/192.168.19.25:8020. Not retrying because try once and fail. (org.apache.hadoop.io.retry.RetryInvocationHandler:379)
[2021-10-20 14:00:01,939] ERROR Exception on topic partition log-ywmy-9:  (io.confluent.connect.hdfs.TopicPartitionWriter:452)
org.apache.kafka.connect.errors.DataException: Error creating writer for log file, ywmy-hdfs-sink-1, file hdfs://bigdata-test/user/nd_rdg/raw-data/realtime/logs/log-ywmy/9/log
  at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:112)
  at io.confluent.connect.hdfs.wal.FSWAL.append(FSWAL.java:65)
  at io.confluent.connect.hdfs.TopicPartitionWriter.beginAppend(TopicPartitionWriter.java:842)
  at io.confluent.connect.hdfs.TopicPartitionWriter.appendToWAL(TopicPartitionWriter.java:833)
  at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:441)
  at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:376)
  at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:133)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=xhmy, access=WRITE, inode="/user/nd_rdg/raw-data/realtime/logs/log-ywmy/9":ywmy:hadoop:drwxr-xr-x
  at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:350)
  at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:251)
  at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:189)
  at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1756)
  at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1740)
  at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1699)
  at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:293)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2303)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2247)
  at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:779)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:420)
  at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:507)
  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1034)
  at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1003)
  at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:931)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1926)
  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2854)

  at sun.reflect.GeneratedConstructorAccessor62.newInstance(Unknown Source)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
  at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)

From the log, we can see that the first hdfs-sink tried to use user xhmy (keytab2) to write messages to hdfs/url1 (hdfs://bigdata-test/user/nd_rdg/raw-data/realtime/logs/log-ywmy/9/log).

OneCricketeer commented 3 years ago

Refer to #325.
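For background, one pattern that can produce this symptom when several connectors share one worker JVM is Hadoop's static UserGroupInformation login, which is process-wide: whichever keytab logs in last becomes the login user for every thread that does not wrap its calls in its own UGI. The sketch below is my own illustration, not the connector's actual code, and the principals, keytab paths, and HDFS path are placeholders.

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);

        // Static login: replaces the JVM-wide login user. If two connectors in
        // the same worker each call this, the most recent call wins for both.
        UserGroupInformation.loginUserFromKeytab("user1@EXAMPLE.COM", "/path/to/keytab1.keytab");

        // Per-instance login: returns an isolated UGI without touching the
        // JVM-wide login user, so each connector can act as its own principal.
        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "user2@EXAMPLE.COM", "/path/to/keytab2.keytab");
        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            // HDFS access inside doAs() runs as user2, regardless of the static login above.
            FileSystem fs = FileSystem.get(conf);
            fs.getFileStatus(new Path("/hdfs/url2"));
            return null;
        });
    }
}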