AbsaOSS / spline

Data Lineage Tracking And Visualization Solution
https://absaoss.github.io/spline/
Apache License 2.0

Migrator stalls at a certain record number #740

Closed: wajda closed this issue 4 years ago

wajda commented 4 years ago

Migrator seems to finish/stall on record 84200:

Processed: 84200     Failures: 2459      Speed [doc/sec]: 17.26     Queue: [..........]
wajda commented 4 years ago

@hamc17

cerveada commented 4 years ago

I identified and fixed two problems:

problem 1

Sorting of lineages wasn't using an index efficiently, which caused Mongo to exceed its in-memory sort limit and throw:

com.mongodb.MongoCommandException: Command failed with error 16820 (Location16820):
'Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.

The solution is to create a compound index matching the sort:

 db.lineages_v4.createIndex({ "timestamp": -1, "rootDataset._id": 1 }, { "name": "index for spline migrator" })
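
With the index in place, a quick sanity check (a sketch only, assuming the migrator's query sorts the whole collection the way the error above suggests) is to look at the query plan; it should now report an index scan (IXSCAN) rather than an in-memory SORT stage:

db.lineages_v4.find().sort({ "timestamp": -1, "rootDataset._id": 1 }).explain("executionStats")

Alternatively, the error message itself points at passing allowDiskUse: true to opt in to external sorting, but the index avoids the large sort altogether.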

problem 2

The migrator stopped on certain lineages and eventually the JVM ran out of memory. This was caused by a lineage containing an operation with 20000 rows. Those jobs were run only for testing purposes, so I deleted the lineages because they are not needed.
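
For the record, one way to locate such oversized lineage documents before deleting them (a sketch only; the 8 MB threshold is an arbitrary assumption, not a number taken from this migration):

db.lineages_v4.find().forEach(function (l) {
    var size = Object.bsonsize(l);  // BSON size of the document in bytes
    if (size > 8 * 1024 * 1024) {
        print(l._id + " : " + size + " bytes");
    }
});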

wajda commented 4 years ago

[ERROR] [07/22/2020 17:22:04.577] [system-akka.actor.default-dispatcher-3] [akka://system/user/batch-migrator/$c] e12ee3d9-63bc-430f-b126-ef2d69a0c0d7
java.util.concurrent.ExecutionException: Boxed Error
    at scala.concurrent.impl.Promise$.resolver(Promise.scala:59)
    at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:51)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:92)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: salat.util.ToObjectGlitch: 

  null

  $anon$3(class za.co.absa.spline.model.DataLineage @ za.co.absa.spline.persistence.mongo.serialization.BSONSalatContext$$anon$1@d2a5b90) toObject failed on:
  SYM: za.co.absa.spline.model.DataLineage
  CONSTRUCTOR
public za.co.absa.spline.model.DataLineage(java.lang.String,java.lang.String,long,java.lang.String,scala.collection.Seq<za.co.absa.spline.model.op.Operation>,scala.collection.Seq<za.co.absa.spline.model.MetaDataset>,scala.collection.Seq<za.co.absa.spline.model.Attribute>,scala.collection.Seq<za.co.absa.spline.model.dt.DataType>,boolean)

---------- CONSTRUCTOR EXPECTS FOR PARAM [0] --------------
NAME:         appId
TYPE:         java.lang.String
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: java.lang.String
VALUE:
application_1542271209010_0481
------------------------------------------------------------

---------- CONSTRUCTOR EXPECTS FOR PARAM [1] --------------
NAME:         appName
TYPE:         java.lang.String
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: java.lang.String
VALUE:
Standardisation ConformanceTradeInstrumentTest 1 2018-07-16 4
------------------------------------------------------------

---------- CONSTRUCTOR EXPECTS FOR PARAM [2] --------------
NAME:         timestamp
TYPE:         long
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: java.lang.Long
VALUE:
1542377580756
------------------------------------------------------------

---------- CONSTRUCTOR EXPECTS FOR PARAM [3] --------------
NAME:         sparkVer
TYPE:         java.lang.String
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: java.lang.String
VALUE:
2.x
------------------------------------------------------------

---------- CONSTRUCTOR EXPECTS FOR PARAM [4] --------------
NAME:         operations
TYPE:         scala.collection.Seq<za.co.absa.spline.model.op.Operation>
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: scala.collection.immutable.$colon$colon[B]
VALUE:
List(Write(OperationProps(bc0da5a4-becb-4ecf-870b-2ec2d62fabc7,WriteCommand,List(ea185691-600e-4389-...
------------------------------------------------------------

---------- CONSTRUCTOR EXPECTS FOR PARAM [5] --------------
NAME:         datasets
TYPE:         scala.collection.Seq<za.co.absa.spline.model.MetaDataset>
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: scala.collection.immutable.Nil$[scala.runtime.Nothing$]
VALUE:
List()
------------------------------------------------------------

---------- CONSTRUCTOR EXPECTS FOR PARAM [6] --------------
NAME:         attributes
TYPE:         scala.collection.Seq<za.co.absa.spline.model.Attribute>
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: scala.collection.immutable.$colon$colon[B]
VALUE:
List(Attribute(9273010c-1289-46e8-8ebc-105728b1feac,componentWeight,e1228467-cf30-4405-a2b3-f9189651...
------------------------------------------------------------

---------- CONSTRUCTOR EXPECTS FOR PARAM [7] --------------
NAME:         dataTypes
TYPE:         scala.collection.Seq<za.co.absa.spline.model.dt.DataType>
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: scala.collection.immutable.$colon$colon[B]
VALUE:
List(Simple(e1228467-cf30-4405-a2b3-f9189651890b,string,true), Simple(4e7968ae-9ec9-4932-bb1d-0c59da...
------------------------------------------------------------

---------- CONSTRUCTOR EXPECTS FOR PARAM [8] --------------
NAME:         writeIgnored
TYPE:         boolean
DEFAULT ARG   false
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: java.lang.Boolean
VALUE:
false
------------------------------------------------------------

    at salat.ConcreteGrater.feedArgsToConstructor(Grater.scala:359)
    at salat.ConcreteGrater.asObject(Grater.scala:326)
    at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$loadByDatasetId$1$$anonfun$apply$1.apply(MongoDataLineageReader.scala:41)
    at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$loadByDatasetId$1$$anonfun$apply$1.apply(MongoDataLineageReader.scala:41)
    at scala.Option.map(Option.scala:146)
    at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$loadByDatasetId$1.apply(MongoDataLineageReader.scala:41)
    at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$loadByDatasetId$1.apply(MongoDataLineageReader.scala:41)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Success.map(Try.scala:237)
    ... 15 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedConstructorAccessor44.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at salat.ConcreteGrater.feedArgsToConstructor(Grater.scala:350)
    ... 24 more
Caused by: java.lang.IllegalArgumentException: requirement failed: list of datasets cannot be empty
    at scala.Predef$.require(Predef.scala:224)
    at za.co.absa.spline.model.DataLineage.<init>(DataLineage.scala:49)
    ... 28 more
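
The root cause is visible in the constructor dump above: the datasets parameter received List(), which the require check at DataLineage.scala:49 rejects. A hypothetical diagnostic to find the affected lineages, assuming the dataset sub-documents live in a datasets_v4 collection keyed by _lineageId (mirroring the operations_v4 layout used later in this thread; adjust if the actual schema differs):

db.lineages_v4.find({}, { _id: 1 }).forEach(function (l) {
    // `datasets_v4` and `_lineageId` are assumptions, not confirmed in this thread
    if (db.datasets_v4.count({ "_lineageId": l._id }) === 0) {
        print("lineage with no datasets: " + l._id);
    }
});
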
wajda commented 4 years ago

and another one occurs often:

[ERROR] [07/22/2020 17:22:07.983] [system-akka.actor.default-dispatcher-17] [akka://system/user/batch-migrator/$c] d2d155dc-3a2a-4fcc-a475-de3f9762f99b
za.co.absa.spline.migrator.rest.RestClient$HttpException: {"error":"JSON parse error: ; nested exception is com.twitter.finatra.json.internal.caseclass.exceptions.CaseClassMappingException: \nErrors:\t\tcom.twitter.finatra.json.internal.caseclass.exceptions.CaseClassValidationException: operations.reads.inputSources: field is required\n\n"}
    at za.co.absa.spline.migrator.rest.RestClientPlayWsImpl$$anon$1$$anonfun$post$1.apply(RestClientPlayWsImpl.scala:54)
    at za.co.absa.spline.migrator.rest.RestClientPlayWsImpl$$anon$1$$anonfun$post$1.apply(RestClientPlayWsImpl.scala:52)
    at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:253)
    at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
cerveada commented 4 years ago

problem 3

com.twitter.finatra.json.internal.caseclass.exceptions.CaseClassValidationException:
operations.reads.inputSources: field is required

This was caused by a non-standard data source (Cobol) with a missing data source URI. Unfortunately, the URI is required in Spline 0.4+. Spline was probably able to capture the lineage, but it didn't recognize the CobolRelation data source, so it used some generic logic that saved it without a URI.

I solved the problem by assigning an artificial URI to all Cobol operations in the Mongo database:

db.operations_v4.find({ "sourceType" : "???: za.co.absa.cobrix.spark.cobol.source.CobolRelation" }).forEach(function (i) {
    db.operations_v4.updateOne(
        { _id: i._id },
        { $set: { "sources": [
            {
                "path" : "cobol:unknown-" + i._lineageId,
                "datasetsIds" : []
            }
        ]}}
    );
});
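
A quick check afterwards (using only the fields from the script above, and assuming the affected operations lacked the sources field entirely) should return 0:

db.operations_v4.count({
    "sourceType" : "???: za.co.absa.cobrix.spark.cobol.source.CobolRelation",
    "sources" : { $exists: false }
})
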
cerveada commented 4 years ago

problem 4

com.arangodb.ArangoDBException: Response: 500, Error: 4 - Builder value not yet sealed

This seems to be a problem or a bug in ArangoDB itself. I created a ticket for it: https://github.com/arangodb/arangodb/issues/12304

wajda commented 4 years ago

It doesn't seem to require any Spline code change so far. The only remaining issue seems to be caused by the size of the lineage document. We don't see any feasible workaround and are rather waiting for the resolution of the aforementioned ArangoDB issue.

To proceed with the migration we could either:

  1. Get rid of those remaining large lineages as unimportant, or
  2. Wait for Spline 0.6+ with the refined storage model for expressions, which could solve this issue automatically. (We could still migrate the rest of the data now and just add the remaining records in the future.)
cerveada commented 4 years ago

problem 4

According to the Arango team, this is not a bug but simply an ArangoDB limitation: it doesn't support more than 63 levels of nesting.

The issue occurs when too deeply nested documents (more than 63 levels of nesting) are inserted via an API that requires a VelocyPack-to-JavaScript conversion or vice versa.
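
To illustrate the limit (an arangosh sketch, not taken from this migration; it assumes a scratch collection named test exists):

// build a document nested deeper than the 63-level limit
var doc = { "leaf": true };
for (var i = 0; i < 70; i++) {
    doc = { "child": doc };
}
// on affected setups this save fails, because it needs a
// JavaScript-to-VelocyPack conversion of the whole document
db.test.save(doc);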

wajda commented 4 years ago

So I think we can close it then. There is nothing we can do about it in Spline 0.5. I've created a remediation ticket #788 for the next major Spline release.