Kotlin / kotlin-spark-api

This project provides Kotlin bindings and several extensions for Apache Spark. We are looking to have this become part of Apache Spark 3.x.
Apache License 2.0

Dynamically invoking withSpark in dockerized Ktor web app throws UnsupportedFileSystemException #219

Open tyknkd opened 3 months ago

tyknkd commented 3 months ago

When withSpark is invoked dynamically in a dockerized Ktor web app, an UnsupportedFileSystemException is thrown.

Expected behavior: No exception is thrown.

A GitHub repo is here.

Broadcast.kt (from kotlin-spark-api example)

import org.jetbrains.kotlinx.spark.api.map
import org.jetbrains.kotlinx.spark.api.withSpark
import java.io.Serializable

object Broadcast {
    data class SomeClass(val a: IntArray, val b: Int) : Serializable

    fun broadcast(): MutableList<Int> {
        lateinit var result: MutableList<Int>
        withSpark(master = "local") {
            val broadcastVariable = spark.broadcast(SomeClass(a = intArrayOf(5, 6), b = 3))
            result = listOf(1, 2, 3, 4, 5)
                .toDS()
                .map {
                    val receivedBroadcast = broadcastVariable.value
                    it + receivedBroadcast.a.first()
                }
                .collectAsList()
            println(result)
        }
        return result
    }
}

Routing.kt

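import io.ktor.server.application.*
import io.ktor.server.response.*
import io.ktor.server.routing.*

fun Application.configureRouting() {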
    routing {
        get("/") {
            val list = Broadcast.broadcast()
            call.respondText(list.toString())
        }
    }
}

Dockerfile

# syntax=docker/dockerfile:1
FROM eclipse-temurin:11-jre-jammy AS jre-jammy-spark
RUN curl https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3-scala2.13.tgz -o spark.tgz && \
    tar -xf spark.tgz && \
    mv spark-3.3.2-bin-hadoop3-scala2.13 /opt/spark && \
    rm spark.tgz
ENV SPARK_HOME="/opt/spark"
ENV PATH="${PATH}:/opt/spark/bin:/opt/spark/sbin"

FROM gradle:8.4-jdk11 AS gradle-build
COPY --chown=gradle:gradle . /home/gradle/src
WORKDIR /home/gradle/src
RUN gradle buildFatJar --no-daemon

FROM jre-jammy-spark AS app
RUN mkdir -p /app
COPY --from=gradle-build /home/gradle/src/build/libs/*-all.jar /app/app.jar
ENTRYPOINT ["java","-jar","/app/app.jar"]

compose.yaml

services:
  app:
    build: .
    ports:
      - 8888:8888

In a shell, run:

$ docker compose up

Then, open http://localhost:8888 in a browser.

An org.apache.hadoop.fs.UnsupportedFileSystemException will be thrown:

app-1  | 2024-04-09 10:26:26.484 [main] INFO  ktor.application - Autoreload is disabled because the development mode is off.
app-1  | 2024-04-09 10:26:26.720 [main] INFO  ktor.application - Application started in 0.261 seconds.
app-1  | 2024-04-09 10:26:26.816 [DefaultDispatcher-worker-1] INFO  ktor.application - Responding at http://0.0.0.0:8888
app-1  | WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
app-1  | Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
app-1  | 2024-04-09 10:27:07.885 [eventLoopGroupProxy-4-1] WARN  o.a.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
app-1  | WARNING: An illegal reflective access operation has occurred
app-1  | WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/app/app.jar) to constructor java.nio.DirectByteBuffer(long,int)
app-1  | WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
app-1  | WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
app-1  | WARNING: All illegal access operations will be denied in a future release
app-1  | 2024-04-09 10:27:10.066 [eventLoopGroupProxy-4-1] WARN  o.a.spark.sql.internal.SharedState - URL.setURLStreamHandlerFactory failed to set FsUrlStreamHandlerFactory
app-1  | 2024-04-09 10:27:10.071 [eventLoopGroupProxy-4-1] WARN  o.a.spark.sql.internal.SharedState - Cannot qualify the warehouse path, leaving it unqualified.
app-1  | org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
app-1  |    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3585)
app-1  |    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3608)
app-1  |    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
app-1  |    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3712)
app-1  |    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3663)
app-1  |    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
app-1  |    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
app-1  |    at org.apache.spark.sql.internal.SharedState$.qualifyWarehousePath(SharedState.scala:282)
app-1  |    at org.apache.spark.sql.internal.SharedState.liftedTree1$1(SharedState.scala:80)
app-1  |    at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:79)
app-1  |    at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)
app-1  |    at scala.Option.getOrElse(Option.scala:201)
app-1  |    at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)
app-1  |    at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)
app-1  |    at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)
app-1  |    at scala.Option.getOrElse(Option.scala:201)
app-1  |    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)
app-1  |    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)
app-1  |    at org.apache.spark.sql.SparkSession.$anonfun$new$3(SparkSession.scala:117)
app-1  |    at scala.Option.map(Option.scala:242)
app-1  |    at org.apache.spark.sql.SparkSession.$anonfun$new$1(SparkSession.scala:117)
app-1  |    at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:230)
app-1  |    at org.apache.spark.sql.catalyst.SerializerBuildHelper$.nullOnOverflow(SerializerBuildHelper.scala:29)
app-1  |    at org.apache.spark.sql.catalyst.SerializerBuildHelper$.createSerializerForJavaBigDecimal(SerializerBuildHelper.scala:158)
app-1  |    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:549)
app-1  |    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
app-1  |    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
app-1  |    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
app-1  |    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
app-1  |    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:448)
app-1  |    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:437)
app-1  |    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
app-1  |    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
app-1  |    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
app-1  |    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
app-1  |    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:429)
app-1  |    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55)
app-1  |    at org.apache.spark.sql.Encoders$.DECIMAL(Encoders.scala:100)
app-1  |    at org.apache.spark.sql.Encoders.DECIMAL(Encoders.scala)
app-1  |    at org.jetbrains.kotlinx.spark.api.EncodingKt.<clinit>(Encoding.kt:87)
app-1  |    at com.example.plugins.Broadcast.broadcast(Broadcast.kt:62)
app-1  |    at com.example.plugins.RoutingKt$configureRouting$1$1.invokeSuspend(Routing.kt:10)
app-1  |    at com.example.plugins.RoutingKt$configureRouting$1$1.invoke(Routing.kt)
app-1  |    at com.example.plugins.RoutingKt$configureRouting$1$1.invoke(Routing.kt)
app-1  |    at io.ktor.server.routing.Route$buildPipeline$1$1.invokeSuspend(Route.kt:116)
app-1  |    at io.ktor.server.routing.Route$buildPipeline$1$1.invoke(Route.kt)
app-1  |    at io.ktor.server.routing.Route$buildPipeline$1$1.invoke(Route.kt)
app-1  |    at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1  |    at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1  |    at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
app-1  |    at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
app-1  |    at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
app-1  |    at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invoke(Pipeline.kt)
app-1  |    at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invoke(Pipeline.kt)
app-1  |    at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:17)
app-1  |    at io.ktor.server.routing.Routing.executeResult(Routing.kt:190)
app-1  |    at io.ktor.server.routing.Routing.interceptor(Routing.kt:64)
app-1  |    at io.ktor.server.routing.Routing$Plugin$install$1.invokeSuspend(Routing.kt:140)
app-1  |    at io.ktor.server.routing.Routing$Plugin$install$1.invoke(Routing.kt)
app-1  |    at io.ktor.server.routing.Routing$Plugin$install$1.invoke(Routing.kt)
app-1  |    at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1  |    at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1  |    at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invokeSuspend(BaseApplicationEngine.kt:124)
app-1  |    at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invoke(BaseApplicationEngine.kt)
app-1  |    at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invoke(BaseApplicationEngine.kt)
app-1  |    at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1  |    at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1  |    at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
app-1  |    at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
app-1  |    at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
app-1  |    at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1  |    at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1  |    at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:17)
app-1  |    at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invokeSuspend(DefaultEnginePipeline.kt:123)
app-1  |    at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invoke(DefaultEnginePipeline.kt)
app-1  |    at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invoke(DefaultEnginePipeline.kt)
app-1  |    at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1  |    at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1  |    at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
app-1  |    at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
app-1  |    at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
app-1  |    at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1  |    at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1  |    at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:17)
app-1  |    at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invokeSuspend(NettyApplicationCallHandler.kt:140)
app-1  |    at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invoke(NettyApplicationCallHandler.kt)
app-1  |    at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invoke(NettyApplicationCallHandler.kt)
app-1  |    at kotlinx.coroutines.intrinsics.UndispatchedKt.startCoroutineUndispatched(Undispatched.kt:44)
app-1  |    at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:112)
app-1  |    at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126)
app-1  |    at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:56)
app-1  |    at kotlinx.coroutines.BuildersKt.launch(Unknown Source)
app-1  |    at io.ktor.server.netty.NettyApplicationCallHandler.handleRequest(NettyApplicationCallHandler.kt:41)
app-1  |    at io.ktor.server.netty.NettyApplicationCallHandler.channelRead(NettyApplicationCallHandler.kt:33)
app-1  |    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
app-1  |    at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
app-1  |    at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:425)
app-1  |    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
app-1  |    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
app-1  |    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
app-1  |    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
app-1  |    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
app-1  |    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
app-1  |    at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda$1$lambda$0(NettyApplicationEngine.kt:296)
app-1  |    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
app-1  |    at java.base/java.lang.Thread.run(Unknown Source)
app-1  | [6, 7, 8, 9, 10]

tyknkd commented 3 months ago

For comparison, I created an equivalent dockerized Spring Boot app here. Notably, no exception is thrown. This seems to suggest that the issue lies within kotlin-spark.

In the Spring Boot version, it seems that Spark creates a local temporary directory as part of the preparation of the Spark session before it invokes the broadcast function:

app-1  | 2024-04-10 09:00:43.393  INFO 1 --- [nio-8888-exec-1] o.a.s.SparkEnv                           : Registering BlockManagerMasterHeartbeat
app-1  | 2024-04-10 09:00:43.410  INFO 1 --- [nio-8888-exec-1] o.a.s.s.DiskBlockManager                 : Created local directory at /tmp/blockmgr-c9cef486-62f2-431a-8408-1e48b933da34
app-1  | 2024-04-10 09:00:43.436  INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore                    : MemoryStore started with capacity 2.1 GiB
. . .
app-1  | 2024-04-10 09:00:43.829  INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore                    : Block broadcast_0 stored as values in memory (estimated size 72.0 B, free 2.1 GiB)
app-1  | 2024-04-10 09:00:43.856  INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore                    : Block broadcast_0_piece0 stored as bytes in memory (estimated size 146.0 B, free 2.1 GiB)
app-1  | 2024-04-10 09:00:43.858  INFO 1 --- [ckManagerMaster] o.a.s.s.BlockManagerInfo                 : Added broadcast_0_piece0 in memory on 1d1b66d9e151:43605 (size: 146.0 B, free: 2.1 GiB)
app-1  | 2024-04-10 09:00:43.862  INFO 1 --- [nio-8888-exec-1] o.a.s.SparkContext                       : Created broadcast 0 from broadcast at SparkBroadcast.java:30

It seems the exception is thrown in the Ktor app roughly at the point where the temporary directory would have been created:

app-1  | 2024-04-10 09:53:18.476 [eventLoopGroupProxy-4-1] WARN  o.a.spark.sql.internal.SharedState - URL.setURLStreamHandlerFactory failed to set FsUrlStreamHandlerFactory
app-1  | 2024-04-10 09:53:18.477 [eventLoopGroupProxy-4-1] INFO  o.a.spark.sql.internal.SharedState - Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
app-1  | 2024-04-10 09:53:18.482 [eventLoopGroupProxy-4-1] WARN  o.a.spark.sql.internal.SharedState - Cannot qualify the warehouse path, leaving it unqualified.
app-1  | org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
. . .
app-1  | 2024-04-10 09:53:19.148 [eventLoopGroupProxy-4-1] INFO  o.a.spark.storage.memory.MemoryStore - Block broadcast_0 stored as values in memory (estimated size 72.0 B, free 2.1 GiB)
app-1  | 2024-04-10 09:53:19.180 [eventLoopGroupProxy-4-1] INFO  o.a.spark.storage.memory.MemoryStore - Block broadcast_0_piece0 stored as bytes in memory (estimated size 150.0 B, free 2.1 GiB)
app-1  | 2024-04-10 09:53:19.182 [dispatcher-BlockManagerMaster] INFO  o.a.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in memory on dcb91ee36ad3:36665 (size: 150.0 B, free: 2.1 GiB)
app-1  | 2024-04-10 09:53:19.186 [eventLoopGroupProxy-4-1] INFO  org.apache.spark.SparkContext - Created broadcast 0 from broadcast at Broadcast.kt:61

Jolanrensen commented 3 months ago

As I responded on Slack:

The file-system exception is printed when the Kotlin Spark API's encoders file pre-loads the DECIMAL encoder (see EncodingKt.<clinit> in the stack trace). Can you try instantiating the same encoder in the non-Kotlin Spark project to see what happens?
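
A minimal sketch of such a check, using only the plain Spark API (no kotlin-spark-api imports). It assumes Spark SQL is on the classpath; the function name and the app name are hypothetical. A session is created first because, in the stack trace above, building the encoder consults the active session's configuration, which is what initializes SharedState.

```kotlin
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
import java.math.BigDecimal

// Hypothetical helper: builds the same encoder that appears in the stack trace
// (org.apache.spark.sql.Encoders.DECIMAL), without going through kotlin-spark-api.
fun instantiateDecimalEncoder(): Encoder<BigDecimal> = Encoders.DECIMAL()

fun main() {
    // Start a local session so that an active session exists when the encoder is built.
    val spark = SparkSession.builder()
        .master("local")
        .appName("decimal-encoder-check") // assumed name, for illustration only
        .getOrCreate()
    try {
        val encoder = instantiateDecimalEncoder()
        // Print the encoder's schema; watch the log output for the same warning and stack trace.
        println(encoder.schema())
    } finally {
        spark.stop()
    }
}
```

If the same warning and stack trace show up here, the behavior is not specific to kotlin-spark-api; if they do not, the difference is more likely in how and when the encoder gets loaded.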

In the Spark 3.4+ branch of the project, the encoding part has been completely overhauled, so this issue won't occur there anymore. But it's still a WIP.

Your program still executes fine. It's a caught exception that's just logged to the output.
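
To illustrate what "caught and just logged" means, here is a small self-contained sketch of the pattern. It is not Spark's actual code: the exception class and both functions are hypothetical stand-ins, and only the warning text mirrors the "Cannot qualify the warehouse path, leaving it unqualified." line from the log above.

```kotlin
// Hypothetical stand-in for the exception type seen in the log.
class UnsupportedSchemeException(message: String) : Exception(message)

// Hypothetical stand-in for a step that always fails, such as qualifying a path.
fun qualifyPath(path: String): String =
    throw UnsupportedSchemeException("No FileSystem for scheme \"file\" (path: $path)")

// Tries to qualify the path. On failure, logs a warning plus the stack trace
// and falls back to the unqualified path instead of propagating the exception.
fun qualifyOrFallback(path: String): String =
    try {
        qualifyPath(path)
    } catch (e: Exception) {
        System.err.println("WARN Cannot qualify the warehouse path, leaving it unqualified.")
        e.printStackTrace()
        path
    }

fun main() {
    // The stack trace is printed to stderr, but execution continues normally.
    val warehouse = qualifyOrFallback("spark-warehouse")
    println("Continuing with warehouse path: $warehouse")
}
```

This matches what the log shows: a full stack trace appears, yet the result list [6, 7, 8, 9, 10] is still printed afterwards.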

tyknkd commented 3 months ago

Thank you so much for pinpointing the source of the exception. I'm glad to know it's not because of an error on my part. Since this issue will go away with the new release, it seems like spending any more time on it would be purely academic, so I won't trouble you any more and will let you get back to your more important work on the 3.4+ fix. Thanks again and have a great weekend!