apache-spark-on-k8s / spark

Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/
https://spark.apache.org/
Apache License 2.0

Implements a server for refreshing HDFS tokens, a part of secure HDFS support. #453

Open kimoonkim opened 7 years ago

kimoonkim commented 7 years ago

@ifilonenko @liyinan926

Implements a standalone server, named hadoop token refresh server, that extends the lifetimes of HDFS tokens stored in K8s Secrets. This is part of the secure HDFS support.

HDFS tokens expire after 24 hours or some other configured time period. You can renew them repeatedly until they reach their max lifetime, 7 days by default. After that, you need to obtain brand new tokens.

This refresh server takes care of both short-term token renewal and obtaining new tokens.
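
A minimal sketch of that renew-or-replace decision, using the standard Hadoop token APIs. The threshold constants and the `renewer` principal are made up for illustration; the PR's actual logic and names may differ:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.token.Token

object RefreshSketch {
  // Illustrative thresholds; the real values are meant to be configurable.
  private val renewAheadMillis = 60L * 60 * 1000           // act within 1 hour of expiry
  private val maxLifetimeMillis = 7L * 24 * 60 * 60 * 1000 // 7-day max lifetime

  // Renews a token, or replaces it once it nears its max lifetime.
  // Returns the next expire time in epoch millis.
  def refresh(token: Token[_], issuedAt: Long, conf: Configuration,
              renewer: String): Long = {
    val now = System.currentTimeMillis()
    if (now + renewAheadMillis >= issuedAt + maxLifetimeMillis) {
      // The namenode rejects renewals past the max lifetime, so obtain a
      // brand new token; the caller then writes it back to the K8s secret.
      val fresh = FileSystem.get(conf).getDelegationToken(renewer)
      fresh.renew(conf) // renew once to learn the fresh token's expire time
    } else {
      token.renew(conf) // extends the expire time by roughly 24 hours
    }
  }
}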

The server runs in a k8s pod. There is a Dockerfile for building docker images and a k8s deployment yaml file for launching the pod.

This code exists in a separate maven project with its own pom.xml file. You can build a tarball using:

$ mvn clean package

For details, see README.md.

The refresh server has three components running in parallel (a sketch of the scanner and watcher follows the list):

  1. Token Refresh Service: An actor-based service that takes discovered secrets as input and schedules tasks to renew tokens in the secrets. Other components below send commands to this service.
  2. Secret Scanner: A periodic thread that scans for and discovers secrets carrying a well-known label key-value pair, `refresh-hadoop-tokens: "yes"`.
  3. Secret Watcher: An event watcher that discovers new or deleted secrets that have that label.
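
Here is a minimal sketch of how the scanner and watcher might look with the fabric8 Kubernetes client. The `StartRefresh`/`StopRefresh` commands and the namespace are placeholders; the PR's actual class and message names may differ:

import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.Secret
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClientException, Watcher}

// Placeholder commands consumed by the Token Refresh Service actor.
case class StartRefresh(secret: Secret)
case class StopRefresh(secret: Secret)

class SecretDiscoverySketch(send: Any => Unit) {
  private val client = new DefaultKubernetesClient()
  private val labeled = client.secrets()
    .inNamespace("default")                    // placeholder namespace
    .withLabel("refresh-hadoop-tokens", "yes") // the well-known label

  // Secret Scanner: one full listing pass; run this from a periodic thread.
  def scanOnce(): Unit =
    labeled.list().getItems.asScala.foreach(s => send(StartRefresh(s)))

  // Secret Watcher: push-based add/delete events for the same label.
  def watch(): Unit = labeled.watch(new Watcher[Secret] {
    override def eventReceived(action: Watcher.Action, secret: Secret): Unit =
      action match {
        case Watcher.Action.ADDED   => send(StartRefresh(secret))
        case Watcher.Action.DELETED => send(StopRefresh(secret))
        case _                      => () // MODIFIED etc. ignored in this sketch
      }
    override def onClose(cause: KubernetesClientException): Unit = ()
  })
}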

Although its role overlaps with the secret watcher, the secret scanner is needed for bootstrapping and recovery (see the discussion below).

Each renew task is in charge of one or more tokens stored in a K8s secret. When it runs, the task does the following (a sketch of the scheduling flow follows the list):

  1. Renew tokens for another 24 hours if they are about to expire soon.
  2. Optionally, check whether any tokens are about to reach their 7-day max lifetime. If so, obtain replacement tokens for them and write a snapshot of all tokens back to the associated K8s secret.
  3. Schedule a new task near the next expire time by sending a command to the token refresh service.
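
A minimal sketch of step 3's scheduling loop using akka's scheduler. The message names and timing math are illustrative, not the PR's actual code:

import scala.concurrent.duration._
import akka.actor.Actor

// Placeholder messages; the PR's actual command names may differ.
case class ScheduleRefresh(secretUri: String, nextExpireTime: Long)
case class RunRenewTask(secretUri: String)

class TokenRefreshServiceSketch extends Actor {
  import context.dispatcher // execution context for the scheduler

  private val renewAheadMillis = 10L * 60 * 1000 // fire 10 minutes before expiry

  def receive: Receive = {
    case ScheduleRefresh(secretUri, nextExpireTime) =>
      // Wake up shortly before the tokens expire.
      val delay = math.max(0L,
        nextExpireTime - renewAheadMillis - System.currentTimeMillis())
      context.system.scheduler.scheduleOnce(delay.millis, self, RunRenewTask(secretUri))

    case RunRenewTask(secretUri) =>
      // Run the renew task here: renew or replace the tokens, write them
      // back to the secret, then reschedule with the new expire time, e.g.:
      // self ! ScheduleRefresh(secretUri, newExpireTime)
      ()
  }
}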

Tested manually so far. (I'll add unit tests in this PR; I plan to add integration tests later, after both the other secure HDFS PR and this PR are merged.)

Here's the log that shows how a brand new token is obtained:

17/10/05 13:48:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/10/05 13:48:09 INFO UserGroupInformation: Login successful for user kimoonkim using keytab file /mnt/secrets/krb5.keytab
17/10/05 13:48:19 INFO SecretScanner: Scanned /api/v1/namespaces/default/secrets/spark.kubernetes.kerberos.dt
17/10/05 13:48:19 INFO TokenRefreshService: Started refresh of tokens in /api/v1/namespaces/default/secrets/spark.kubernetes.kerberos.dt with akka.actor.LightArrayRevolverScheduler$$anon$3@50bec29b
17/10/05 13:48:19 INFO StarterTask: Matching secret data hadoop-tokens-1504043833345-86400127, result true
17/10/05 13:48:19 INFO StarterTask: Read Hadoop tokens: Map(Kind: HDFS_DELEGATION_TOKEN, Service: 192.168.6.157:8020, Ident: (HDFS_DELEGATION_TOKEN token 9 for kimoonkim) -> 1504130233472)
17/10/05 13:48:19 INFO TokenRefreshService: Scheduling refresh of tokens with /api/v1/namespaces/default/secrets/spark.kubernetes.kerberos.dt at now + 0 millis.
17/10/05 13:48:19 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 15 for kimoonkim on 192.168.6.157:8020
17/10/05 13:48:19 INFO RenewTask: Obtained token Kind: HDFS_DELEGATION_TOKEN, Service: 192.168.6.157:8020, Ident: (HDFS_DELEGATION_TOKEN token 15 for kimoonkim)
17/10/05 13:48:20 INFO RenewTask: Wrote a new token to a2ba24a7-8c14-11e7-8c63-02f2c310e88c
17/10/05 13:48:20 INFO RenewTask: Renewed tokens Map(Kind: HDFS_DELEGATION_TOKEN, Service: 192.168.6.157:8020, Ident: (HDFS_DELEGATION_TOKEN token 15 for kimoonkim) -> 1507322899852). Next expire time 1507322899852
17/10/05 13:48:20 INFO TokenRefreshService: Scheduling refresh of tokens with /api/v1/namespaces/default/secrets/spark.kubernetes.kerberos.dt at now + 77759847 millis.

After the above run, the K8s secret now has two data items, with the latest item containing the new token:

$ kubectl get secret -o json spark.kubernetes.kerberos.dt
{
    "apiVersion": "v1",
    "data": {
        "hadoop-tokens-1504043833345-86400127": "SERUUwABEjE5Mi4xNjguNi4xNTc6ODAyMDUAGGtpbW9vbmtpbUBQRVBQRVJEQVRBLkNPTQlraW1vb25raW0AigFeL/+XWYoBXlQMG1kJChQhrFzhamtHiO0PFswswtbjpaadaBVIREZTX0RFTEVHQVRJT05fVE9LRU4SMTkyLjE2OC42LjE1Nzo4MDIwAA==",
        "hadoop-tokens-1507236499185-86400667": "SERUUwABEjE5Mi4xNjguNi4xNTc6ODAyMDUAGGtpbW9vbmtpbUBQRVBQRVJEQVRBLkNPTQlraW1vb25raW0AigFe7kvRFYoBXxJYVRUPNRT9sWFg3sxt8q4A/ELbneeWSj2BahVIREZTX0RFTEVHQVRJT05fVE9LRU4SMTkyLjE2OC42LjE1Nzo4MDIwAA=="
    },
...
kimoonkim commented 7 years ago

Perhaps @ifilonenko @foxish would be most interested. There is one sore point in the token refresh server prototype: I used the akka library for its message-driven actor threading model, which I think simplified the code a lot. (akka is the standard actor library for Scala.)

But this is not a good thing to do for Spark as an application framework, which moved away from Akka for dependency management reasons. See details at http://apache-spark-developers-list.1001551.n3.nabble.com/Why-did-spark-switch-from-AKKA-to-net-td21522.html.

More specifically, many user applications that link to Spark also link to Akka as a library (e.g. say you want to write a service that receives requests from Akka and runs them on Spark). In that case, you'd have two conflicting versions of the Akka library in the same JVM.

But it's a fine thing to do for the token refresh server, which is an independent service app and has little to do with Spark. So I think it's best for the token refresh server to live outside the Spark repo. Or, if it's too much work, keep it as an independent project with its own pom.xml with no dependencies to/from Spark.

Thoughts?

liyinan926 commented 7 years ago

Components 2 and 3 seem to serve similar purposes. Why do we need a periodic task to scan secrets given that we can use the watch API to receive secret-related events?

kimoonkim commented 7 years ago

Glad you asked. Thanks for the question, @liyinan926. The scanner is needed for two cases:

  1. When the refresh server starts for the first time, or restarts after crashing. The watcher would not inform the refresh server of the secrets it should support if those secrets were added before the refresh server started.

  2. When the watcher's connection to the K8s API server is intermittently unstable and the refresh server misses some events as a result. The periodic scanning would eventually discover secrets added or deleted during the bad period.

I'll update the PR description as well.

liyinan926 commented 7 years ago

@kimoonkim Thanks for the clarification. Makes sense to me.

kimoonkim commented 7 years ago

@ifilonenko and I talked last Thursday in a meeting. Two improvements were suggested:

  1. Currently, the refresh server starts by issuing DT renew RPCs for new DTs, solely to discover the DTs' expire times. We should encode those expire times in the secrets as annotations so we can avoid the unnecessary NN RPCs (a sketch follows the list).

  2. People will want to know the status of the DTs that the refresh server maintains. The refresh server should have a REST API for showing that status.
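
For improvement 1, a hedged sketch of recording an expire time as a secret annotation with the fabric8 client. The annotation key and namespace are made up; the eventual PR code may do this differently:

import io.fabric8.kubernetes.client.DefaultKubernetesClient

object AnnotationSketch {
  // Record a token's expire time on the secret so a restarted refresh server
  // can read it back instead of issuing a renew RPC to the namenode.
  def annotateExpireTime(secretName: String, expireTimeMillis: Long): Unit = {
    val client = new DefaultKubernetesClient()
    client.secrets()
      .inNamespace("default") // placeholder namespace
      .withName(secretName)
      .edit()
      .editMetadata()
      .addToAnnotations("hadoop-token-expire-time", expireTimeMillis.toString)
      .endMetadata()
      .done()
  }
}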

kimoonkim commented 7 years ago

> But it's a fine thing to do for the token refresh server, which is an independent service app and has little to do with Spark. So I think it's best for the token refresh server to live outside the Spark repo. Or, if it's too much work, keep it as an independent project with its own pom.xml with no dependencies to/from Spark.

The refresh server code now exists as an independent project with its own pom.xml. This is to avoid including akka in the Spark distribution.

kimoonkim commented 7 years ago

Now the refresh server has a Dockerfile and a k8s deployment yaml file. I successfully launched a pod.

There is a Kerberos login issue. Debugging it now.

kimoonkim commented 7 years ago

The login issue was caused by the Dockerfile not putting the hadoop conf dir on the classpath. Fixed in the latest commit, and confirmed the refresh server works as expected. I'll now address other issues.

kimoonkim commented 7 years ago

Retargeted to our main branch, and ready for review. @ifilonenko @liyinan926 PTAL.

TODOs:

  1. Turn some hard-coded parameters into config options.
  2. Add unit tests while review is in progress.

kimoonkim commented 7 years ago

Thanks for the review, @liyinan926. Addressed the comments. PTAL.

kimoonkim commented 7 years ago

Thanks for the review, @ifilonenko. Yes, I plan to add unit tests before this gets merged. But I hope we can delay integration tests until the other PR is merged in, so that we can verify end-to-end behavior in the integration tests.

kimoonkim commented 7 years ago

I have been adding unit tests. I am getting there, but still need to add tests for renew tasks.

kimoonkim commented 7 years ago

Reminder to myself. I found an edge case that the current code does not handle. If a refresh server was down for a while, say a few days, and then brought back up, the namenode may have removed some expired tokens in the meantime. It will return the following error when the refresh server tries to renew the token in the orphaned secret. We should think about how to fix this (one possible approach is sketched after the stack trace):

17/10/12 12:00:12 WARN RenewTask: Renewal request for unknown token
at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:502)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:6684)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:570)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1005)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

org.apache.hadoop.security.token.SecretManager$InvalidToken: Renewal request for unknown token
at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:502)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:6684)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:570)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1005)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:1121)
at org.apache.hadoop.security.token.Token.renew(Token.java:377)
at org.apache.spark.security.kubernetes.RenewTask.maybeGetNewExpireTime(TokenRefreshService.scala:283)
at org.apache.spark.security.kubernetes.RenewTask.org$apache$spark$security$kubernetes$RenewTask$$refresh(TokenRefreshService.scala:248)
at org.apache.spark.security.kubernetes.RenewTask$$anonfun$4.apply(TokenRefreshService.scala:226)
at org.apache.spark.security.kubernetes.RenewTask$$anonfun$4.apply(TokenRefreshService.scala:224)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.security.kubernetes.RenewTask.run(TokenRefreshService.scala:224)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Renewal request for unknown token
at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:502)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:6684)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:570)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1005)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

at org.apache.hadoop.ipc.Client.call(Client.java:1470)
at org.apache.hadoop.ipc.Client.call(Client.java:1401)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy28.renewDelegationToken(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewDelegationToken(ClientNamenodeProtocolTranslatorPB.java:924)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy29.renewDelegationToken(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:1119)
... 16 more
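
One possible fix, sketched here (not the PR's actual code): catch InvalidToken during renewal and fall back to obtaining a brand new token. The `renewer` principal is a placeholder:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.token.{SecretManager, Token}

object OrphanRecoverySketch {
  // Renew the token; if the namenode has already purged it, obtain a brand
  // new token instead of failing the renew task. Returns the next expire time.
  def renewOrReplace(token: Token[_], conf: Configuration, renewer: String): Long =
    try {
      token.renew(conf)
    } catch {
      case _: SecretManager.InvalidToken =>
        val fresh = FileSystem.get(conf).getDelegationToken(renewer)
        // ... write `fresh` back to the K8s secret, replacing the orphan ...
        fresh.renew(conf) // renew once to learn the fresh token's expire time
    }
}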
kimoonkim commented 7 years ago

I added unit tests for all major components and their common-case code paths. (Error-case code paths are not covered yet.) PTAL.

ifilonenko commented 7 years ago

Will be doing another pass for unit tests, thank you for this!

kimoonkim commented 7 years ago

Yes, please take a look at the unit tests. Also, please note that the unit tests are not yet run by CI. I'll see what I can do about that.

kimoonkim commented 7 years ago

Running unit tests from the top level as below fails. It seems like a dependency issue; I need to investigate:

./build/mvn clean test -Pmesos -Pyarn -Phadoop-2.7 -Pkubernetes -Pkubernetes-hdfs-extra -pl core,resource-managers/kubernetes/core,resource-managers/kubernetes/token-refresh-server -am -Dtest=none '-Dsuffixes=^org.apache.spark.(?!SortShuffleSuite$|rdd.LocalCheckpointSuite$|deploy.SparkSubmitSuite$|deploy.StandaloneDynamicAllocationSuite$).*'