keytiong / opentelemetry-spark

MIT License

Spark on Kubernetes: context propagation doesn't work #8

Open progxaker opened 3 months ago

progxaker commented 3 months ago

Hello @keytiong. I hope you are doing well.

Problem statement: the task advice is not applied with Spark on Kubernetes, although it works locally. This breaks context propagation: as far as I understand, the enterConstructor_V3_4 function injects stageContext, and since it isn't called, spark_task spans have no parent (spark_stage has a parent, spark_job, so there is no problem there). Note that spark_task spans are still created; they just don't have a parent.
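To make the failure mode concrete, here is a minimal, dependency-free sketch of the idea. It assumes the stage context travels to the executor inside the task's `localProperties`, which the advice signature suggests but which is not confirmed here. The class name, the `traceparent` key (borrowed from the W3C Trace Context format), and the helper methods are hypothetical and are not the extension's actual code.

```java
import java.util.Properties;

public class StageContextPropagation {
  // Hypothetical property key; W3C Trace Context calls the header "traceparent".
  public static final String TRACEPARENT_KEY = "traceparent";

  /** Driver side: write the stage span's identity into the task's local properties. */
  public static void inject(Properties localProperties, String traceId, String spanId) {
    // Layout: version "00", 32-hex trace id, 16-hex parent span id, flags "01" (sampled).
    localProperties.setProperty(TRACEPARENT_KEY, "00-" + traceId + "-" + spanId + "-01");
  }

  /** Executor side: returns {traceId, parentSpanId}, or null if nothing usable was injected. */
  public static String[] extract(Properties localProperties) {
    String value = localProperties.getProperty(TRACEPARENT_KEY);
    if (value == null) {
      return null;
    }
    String[] parts = value.split("-");
    if (parts.length != 4 || parts[1].length() != 32 || parts[2].length() != 16) {
      return null;
    }
    return new String[] {parts[1], parts[2]};
  }

  public static void main(String[] args) {
    // Case 1: the constructor advice ran on the driver, so the task carries a parent.
    Properties withAdvice = new Properties();
    inject(withAdvice, "4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7");
    System.out.println("advice ran: parent span = " + extract(withAdvice)[1]);

    // Case 2: the advice never ran, so the executor finds nothing and starts a root span.
    Properties withoutAdvice = new Properties();
    System.out.println("advice skipped: parent = " + extract(withoutAdvice));
  }
}
```

In the second case the executor can still create a spark_task span, but it has nothing to attach it to, which matches the orphaned spans described above.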

What was done

Logs (Kubernetes)

Driver

[otel.javaagent 2024-06-12 10:17:09:448 +0000] [dag-scheduler-event-loop] DEBUG io.opentelemetry.javaagent.tooling.AgentInstaller$TransformLoggingListener - Transformed org.apache.spark.scheduler.Task -- jdk.internal.loader.ClassLoaders$AppClassLoader@6a10b263
24/06/12 10:17:09 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage (MapPartitionsRDD[3] at load at DataValidationService.java:101) (first 15 tasks are for partitions Vector(0))
24/06/12 10:17:09 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile
24/06/12 10:17:09 DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0
24/06/12 10:17:09 DEBUG TaskSetManager: Adding pending tasks took 1 ms
24/06/12 10:17:09 DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: NO_PREF, ANY
24/06/12 10:17:09 DEBUG TaskSchedulerImpl: parentName:, name: TaskSet_0.0, runningTasks: 0
24/06/12 10:17:09 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (10.244.21.109, executor 1, partition 0, PROCESS_LOCAL, 8519 bytes)
enter_LiveListenerBus
24/06/12 10:17:09 DEBUG TaskSetManager: No tasks for locality level NO_PREF, so moving to locality level ANY

Executor

24/06/12 10:17:09 INFO CoarseGrainedExecutorBackend: Got assigned task 0
[otel.javaagent 2024-06-12 10:17:09:503 +0000] [dispatcher-Executor] DEBUG io.opentelemetry.javaagent.tooling.AgentInstaller$TransformLoggingListener - Transformed org.apache.spark.executor.Executor$TaskRunner -- jdk.internal.loader.ClassLoaders$AppClassLoader@78c1a023
enter_TaskRunnerAdvice
[otel.javaagent 2024-06-12 10:17:09:514 +0000] [Executor task launch worker-0] DEBUG io.opentelemetry.javaagent.tooling.AgentInstaller$TransformLoggingListener - Transformed org.apache.spark.scheduler.Task -- jdk.internal.loader.ClassLoaders$AppClassLoader@78c1a023
[otel.javaagent 2024-06-12 10:17:09:515 +0000] [Executor task launch worker-0] DEBUG io.opentelemetry.javaagent.shaded.instrumentation.api.internal.EmbeddedInstrumentationProperties - Did not find embedded instrumentation properties file META-INF/io/opentelemetry/instrumentation/apache-spark.properties
24/06/12 10:17:09 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

Logs (Local)

[otel.javaagent 2024-06-13 09:21:44:063 +0000] [dag-scheduler-event-loop] DEBUG io.opentelemetry.javaagent.tooling.AgentInstaller$TransformLoggingListener - Transformed org.apache.spark.scheduler.Task -- jdk.internal.loader.ClassLoaders$AppClassLoader@1dbd16a6
enter_TaskAdvice_V3_4
24/06/13 09:21:44 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[8] at count at SimpleApp.java:10) (first 15 tasks are for partitions Vector(0))
24/06/13 09:21:44 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
enter_LiveListenerBus
24/06/13 09:21:44 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (d88fe25403ff, executor driver, partition 0, PROCESS_LOCAL, 7896 bytes)
enter_LiveListenerBus
[otel.javaagent 2024-06-13 09:21:44:252 +0000] [dispatcher-event-loop-2] DEBUG io.opentelemetry.javaagent.tooling.AgentInstaller$TransformLoggingListener - Transformed org.apache.spark.executor.Executor$TaskRunner -- jdk.internal.loader.ClassLoaders$AppClassLoader@1dbd16a6
enter_TaskRunner
[otel.javaagent 2024-06-13 09:21:44:265 +0000] [Executor task launch worker-0] DEBUG io.opentelemetry.javaagent.shaded.instrumentation.api.internal.EmbeddedInstrumentationProperties - Did not find embedded instrumentation properties file META-INF/io/opentelemetry/instrumentation/apache-spark.properties
24/06/13 09:21:44 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

progxaker commented 1 month ago

I prepared a quick fix. It's not perfect because the advice needs to be applied more specifically, but anyway, I just wanted to see if it works.

Patch

```patch
From f828b347b142ba460a6b1c8a09f97298acaaa83d Mon Sep 17 00:00:00 2001
From: Eduard Mielieshkin
Date: Fri, 2 Aug 2024 17:28:45 +0300
Subject: [PATCH] Hot fix for the TaskInsrumentation Advice

---
 .../spark/TaskInstrumentation.java            | 25 ++-----------------
 1 file changed, 2 insertions(+), 23 deletions(-)

diff --git a/src/main/java/io/opentelemetry/javaagent/instrumentation/spark/TaskInstrumentation.java b/src/main/java/io/opentelemetry/javaagent/instrumentation/spark/TaskInstrumentation.java
index 107d941..37ef69f 100644
--- a/src/main/java/io/opentelemetry/javaagent/instrumentation/spark/TaskInstrumentation.java
+++ b/src/main/java/io/opentelemetry/javaagent/instrumentation/spark/TaskInstrumentation.java
@@ -44,19 +44,7 @@ public class TaskInstrumentation implements TypeInstrumentation {
 
   @Override
   public void transform(TypeTransformer typeTransformer) {
-    typeTransformer.applyAdviceToMethod(
-        isConstructor()
-            .and(takesArgument(0, Integer.TYPE))
-            .and(takesArgument(1, Integer.TYPE))
-            .and(takesArgument(2, Integer.TYPE))
-            .and(takesArgument(3, Integer.TYPE))
-            .and(takesArgument(4, Properties.class))
-            .and(takesArgument(5, byte[].class))
-            .and(takesArgument(6, named("scala.Option")))
-            .and(takesArgument(7, named("scala.Option")))
-            .and(takesArgument(8, named("scala.Option")))
-            .and(takesArgument(9, Boolean.TYPE)),
-        this.getClass().getName() + "$TaskAdvice_V3_4");
+    typeTransformer.applyAdviceToMethod(isConstructor(), this.getClass().getName() + "$TaskAdvice_V3_4");
     typeTransformer.applyAdviceToMethod(
         isConstructor()
             .and(takesArgument(0, Integer.TYPE))
@@ -75,16 +63,7 @@ public class TaskInstrumentation implements TypeInstrumentation {
 
     @Advice.OnMethodEnter(suppress = Throwable.class)
     public static void enterConstructor_v3_4(
-        int stageId,
-        int stageAttemptId,
-        int partitionId,
-        int numPartitions,
-        Properties localProperties,
-        byte[] serializedTaskMetrics,
-        scala.Option jobId,
-        scala.Option appId,
-        scala.Option appAttemptId,
-        boolean isBarrier) {
+        @Advice.Argument(0) int stageId, @Advice.Argument(5) Properties localProperties) {
 
       Context stageContext = ApacheSparkSingletons.getStageContext(stageId);
-- 
2.34.1
```

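The thread does not establish why the original matcher failed to fire on Kubernetes. One possible explanation, sketched below with plain reflection rather than the agent's matcher API, is a constructor whose parameter layout differs from the one the strict matcher expects. `TaskLayoutA`, `TaskLayoutB`, and both helper methods are hypothetical stand-ins for illustration and are not Spark's real classes.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

public class ConstructorMatchSketch {
  // Hypothetical layout in which Properties sits at index 4.
  public static class TaskLayoutA {
    TaskLayoutA(int stageId, int stageAttemptId, int partitionId, int numPartitions,
        Properties localProperties, byte[] serializedTaskMetrics) {}
  }

  // Hypothetical layout with one extra parameter, which shifts Properties to index 5.
  public static class TaskLayoutB {
    TaskLayoutB(int stageId, int stageAttemptId, int partitionId, int numPartitions,
        Object extra, Properties localProperties, byte[] serializedTaskMetrics) {}
  }

  /** Strict match: every leading parameter type must sit at its expected position. */
  public static boolean strictMatch(Constructor<?> c) {
    Class<?>[] p = c.getParameterTypes();
    return p.length >= 6
        && p[0] == int.class && p[1] == int.class
        && p[2] == int.class && p[3] == int.class
        && p[4] == Properties.class && p[5] == byte[].class;
  }

  /** Loose match: report where the Properties parameter is, or -1 if there is none. */
  public static int propertiesIndex(Constructor<?> c) {
    Class<?>[] p = c.getParameterTypes();
    for (int i = 0; i < p.length; i++) {
      if (p[i] == Properties.class) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    for (Class<?> type : new Class<?>[] {TaskLayoutA.class, TaskLayoutB.class}) {
      Constructor<?> c = type.getDeclaredConstructors()[0];
      System.out.println(type.getSimpleName()
          + ": strict=" + strictMatch(c)
          + ", propertiesIndex=" + propertiesIndex(c));
    }
  }
}
```

Under these assumptions the strict check accepts only the first layout, while the position lookup finds the `Properties` parameter in both. This is also why binding by a fixed index, as `@Advice.Argument(5)` does in the patch, is only safe when the matcher pins down the constructor layout it is applied to.
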
keytiong commented 1 month ago

@progxaker,

Apologies for missing this issue when it was raised back in June. I have not tested the Spark extension on Kubernetes. I will spend some time setting that up to investigate.

In the meantime, can I confirm that the Spark extension does work to some extent on Kubernetes, just not context propagation? My reading is that you're getting spans but no parent ID.

Also, may I know which version of Spark you are running? Are you running the same Spark version locally and on Kubernetes?

progxaker commented 1 month ago

> In the meantime, can I confirm that the Spark extension does work to some extent on Kubernetes, just not context propagation? My reading is that you're getting spans but no parent ID.

Yes, it works, but with some limitations, I suppose.

  1. The current issue.

  2. Adoption of Spark spans by application spans. For example:

    Current behavior

    ```
    app trace
    ====
    start
    \- read
    \- process
    \- write

    one of the spark traces
    ===
    spark_job
    \- spark_stage
       \- spark_task
    ```

    Expected behavior

    ```
    app trace
    ====
    start
    \- read
    \- process
       \- spark_job
          \- spark_job
             \- spark_task
    \- write
       \- spark_job
          \- spark_job
             \- spark_task
    ```

    I haven't tried (or seen) this on my local machine, but I fixed it for Kubernetes. Once I clean up the test code, I'll open an issue and a pull request with the changes.

  3. Executor ID. At this point I'm not sure whether this belongs in the span attributes, but I'm interested in adding it so I can build a chart of load (tasks per executor) or something similar.
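
As a rough illustration of the third point, the sketch below counts tasks per executor from a list of exported spans. It assumes each spark_task span carried an executor ID attribute, which the extension does not necessarily provide today. The `TaskSpan` class and its `executorId` field are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TasksPerExecutor {
  /** Minimal stand-in for an exported span; "executorId" is a hypothetical attribute. */
  public static final class TaskSpan {
    final String name;
    final String executorId;

    public TaskSpan(String name, String executorId) {
      this.name = name;
      this.executorId = executorId;
    }
  }

  /** Counts spark_task spans per executor; spans without an executor ID are skipped. */
  public static Map<String, Integer> countByExecutor(List<TaskSpan> spans) {
    Map<String, Integer> counts = new TreeMap<>();
    for (TaskSpan span : spans) {
      if (!"spark_task".equals(span.name) || span.executorId == null) {
        continue;
      }
      counts.merge(span.executorId, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    List<TaskSpan> spans = Arrays.asList(
        new TaskSpan("spark_stage", null),
        new TaskSpan("spark_task", "1"),
        new TaskSpan("spark_task", "2"),
        new TaskSpan("spark_task", "1"));
    System.out.println(countByExecutor(spans)); // prints {1=2, 2=1}
  }
}
```

In practice this aggregation would more likely happen in the tracing backend's query layer; the point is only that a per-task executor attribute is enough to derive the chart.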


> Also, may I know which version of Spark you are running? Are you running the same Spark version locally and on Kubernetes?


Thanks for the response, I really appreciate it :)