Status: Closed — CloudNiner closed this issue 5 years ago
Spark jobs using the new Pipeline API will fail to serialize due to the loggers and UDFs not being marked as lazy and transient.
I was able to get my Spark job to run with the following diff, but there are some other UDFs defined in the project that may cause the same issue:
```diff
diff --git a/src/main/scala/vectorpipe/GenerateVT.scala b/src/main/scala/vectorpipe/GenerateVT.scala
index 74cb25b..bf45ea9 100644
--- a/src/main/scala/vectorpipe/GenerateVT.scala
+++ b/src/main/scala/vectorpipe/GenerateVT.scala
@@ -22,7 +22,7 @@ import scala.util.{Failure, Success, Try}
 object GenerateVT {
-  lazy val logger = Logger.getRootLogger()
+  @transient lazy val logger = Logger.getRootLogger()

   type VTF[G <: Geometry] = Feature[G, Map[String, Value]]
   // type VTContents = (Seq[VTF[Point]], Seq[VTF[MultiPoint]], Seq[VTF[Line]], Seq[VTF[MultiLine]], Seq[VTF[Polygon]], Seq[VTF[MultiPolygon]])
diff --git a/src/main/scala/vectorpipe/internal/package.scala b/src/main/scala/vectorpipe/internal/package.scala
index 3d6bda3..9d54f5d 100644
--- a/src/main/scala/vectorpipe/internal/package.scala
+++ b/src/main/scala/vectorpipe/internal/package.scala
@@ -22,7 +22,7 @@ package object internal {
   val RelationType: Byte = 3
   val MultiPolygonRoles: Seq[String] = Set("", "outer", "inner").toSeq
-  lazy val logger: Logger = Logger.getLogger(getClass)
+  @transient lazy val logger: Logger = Logger.getLogger(getClass)

   lazy val BareElementSchema = StructType(
     StructField("changeset", LongType, nullable = false) ::
diff --git a/src/main/scala/vectorpipe/relations/MultiPolygons.scala b/src/main/scala/vectorpipe/relations/MultiPolygons.scala
index 27f6887..a6b24dc 100644
--- a/src/main/scala/vectorpipe/relations/MultiPolygons.scala
+++ b/src/main/scala/vectorpipe/relations/MultiPolygons.scala
@@ -7,7 +7,7 @@ import org.apache.log4j.Logger
 import vectorpipe.internal.WayType

 object MultiPolygons {
-  private lazy val logger = Logger.getLogger(getClass)
+  @transient private lazy val logger = Logger.getLogger(getClass)
   val prepGeomFactory = new PreparedGeometryFactory

   def build(id: Long,
diff --git a/src/main/scala/vectorpipe/relations/Routes.scala b/src/main/scala/vectorpipe/relations/Routes.scala
index 191f31e..c3fb8c9 100644
--- a/src/main/scala/vectorpipe/relations/Routes.scala
+++ b/src/main/scala/vectorpipe/relations/Routes.scala
@@ -6,7 +6,7 @@ import org.apache.log4j.Logger
 import vectorpipe.internal.WayType

 object Routes {
-  private lazy val logger = Logger.getLogger(getClass)
+  @transient private lazy val logger = Logger.getLogger(getClass)

   def build(id: Long,
             version: Int,
diff --git a/src/main/scala/vectorpipe/vectortile/package.scala b/src/main/scala/vectorpipe/vectortile/package.scala
index 94aca0f..f5c33fc 100644
--- a/src/main/scala/vectorpipe/vectortile/package.scala
+++ b/src/main/scala/vectorpipe/vectortile/package.scala
@@ -13,12 +13,12 @@ import scala.concurrent._
 import scala.concurrent.duration._
 import scala.util.{Try, Success, Failure}

-package object vectortile {
+package object vectortile extends Serializable {
   type VectorTileFeature[+G <: Geometry] = Feature[G, Map[String, Value]]

-  val logger = org.apache.log4j.Logger.getRootLogger
+  @transient lazy val logger = org.apache.log4j.Logger.getRootLogger

-  val st_reprojectGeom = udf { (g: jts.Geometry, srcProj: String, destProj: String) =>
+  @transient lazy val st_reprojectGeom = udf { (g: jts.Geometry, srcProj: String, destProj: String) =>
     val trans = Proj4Transform(CRS.fromString(srcProj), CRS.fromString(destProj))
     val gt = Geometry(g)
     gt.reproject(trans).jtsGeom
```
Couple notes: (1) GenerateVT has been taken behind the chemical sheds and shot. (2) This should be submitted as a PR. In the worst case, these changes will do no harm, and may insulate us from future trouble.
Spark jobs using the new Pipeline API will fail to serialize due to the loggers and UDFs not being marked as lazy and transient.
I was able to get my Spark job to run with the following diff, but there are some other UDFs defined in the project that may cause the same issue: