logbee / keyscore

Apache License 2.0

Unordered Filters #58

Closed (mlandth closed this issue 5 years ago)

mlandth commented 5 years ago

I started a new IntegrationTest (WorkflowTest) to check actual filter operations in a pipeline. The WorkflowTest should build the following Pipeline: KafkaSource -> RetainFields -> RemoveFields -> RemoveFields -> KafkaSink.

I insert 3 Datasets before the RetainFields Filter. Each Dataset contains one Record with 7 Fields (3x Text, 2x Number, 1x Boolean, 1x Health).

After the second RemoveFields Filter, I extract the Datasets and check that:

  1. Exactly 3 Datasets come back
  2. The Record of each Dataset now contains exactly two specific Fields
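The two checks can be sketched as a single predicate. This is a minimal sketch on a hypothetical, simplified data model (the Field, Record and Dataset case classes and checkWorkflowResult are illustrative names, not keyscore's real types); the point is that it rejects both failure modes described below, an empty result and a Record with an extra Field.

```scala
// Hypothetical, simplified data model; keyscore's real Dataset/Record/Field types differ.
final case class Field(name: String, value: Any)
final case class Record(fields: List[Field])
final case class Dataset(records: List[Record])

// Check 1: exactly `expectedCount` datasets come back.
// Check 2: every record contains exactly the field names in `expectedFields`
// (comparing the size as well rejects duplicated field names).
def checkWorkflowResult(
    datasets: Seq[Dataset],
    expectedCount: Int,
    expectedFields: Set[String]
): Boolean =
  datasets.size == expectedCount &&
    datasets.forall { dataset =>
      dataset.records.forall { record =>
        val names = record.fields.map(_.name)
        names.size == expectedFields.size && names.toSet == expectedFields
      }
    }
```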

Now to the Problem:

Sometimes the test passes, but in ~70% of the runs it fails with one of two different failures:

  1. When I extract the Datasets, I get an empty list.

OR

  2. The Records of the extracted Datasets contain 3 instead of 2 Fields. Apparently the first RemoveFields Filter isn't working (it should remove one Field that the second RemoveFields Filter doesn't remove).

I added some logging to the RetainFieldsFilterLogic and the RemoveFieldsFilterLogic:

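  // OutHandler callback: called when the downstream stage requests an element
  override def onPull(): Unit = {
    log.debug("#Workflow: onPull")
    Thread.sleep(5) // artificial delay, added for debugging only
    pull(in)        // propagate the demand to the upstream stage
  }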

Order of onPull() logs from the passing test:

RemoveFieldsFilterLogic - #Workflow: onPull
RemoveFieldsFilterLogic - #Workflow: onPull
RetainFieldsFilterLogic - #Workflow: onPull

One order of onPull() logs from the failing test (empty list of datasets):

RetainFieldsFilterLogic - #Workflow: onPull
RemoveFieldsFilterLogic - #Workflow: onPull
RemoveFieldsFilterLogic - #Workflow: onPull
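In Akka Streams, demand travels from the sink towards the source: the sink pulls the last stage, whose onPull calls pull(in), which triggers onPull on the stage before it, and so on. The first onPull logged is therefore the one closest to the sink. The toy model below is plain Scala, not the Akka API, and assumes a purely linear chain of pulls; under that assumption it reproduces the passing log for RetainFields -> RemoveFields -> RemoveFields, and the failing log fits a pipeline in which RetainFields was wired after the two RemoveFields stages.

```scala
import scala.collection.mutable.ListBuffer

// Toy model (not Akka Streams itself). Stages are listed upstream -> downstream.
// Returns the onPull log lines in the order they would be written.
def onPullLogOrder(stagesUpstreamToDownstream: List[String]): List[String] = {
  val log = ListBuffer.empty[String]
  // Simulates onPull of the stage at `index`: log first, then pull(in),
  // which invokes onPull of the upstream neighbour (index - 1).
  def onPull(index: Int): Unit =
    if (index >= 0) {
      log += s"${stagesUpstreamToDownstream(index)} - #Workflow: onPull"
      onPull(index - 1)
    }
  // The sink's first demand reaches the last (most downstream) stage.
  onPull(stagesUpstreamToDownstream.size - 1)
  log.toList
}
```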

Is it possible that we ignore the order of filters when we create a Pipeline?

@kKdH Houston, we have a problem.

mlandth commented 5 years ago

The problem is that we completely ignore the in and out BlueprintRefs in the backend when we assemble the filters in the PipelineSupervisor. I'm on it.
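If the in and out BlueprintRefs were honoured, the order of a linear pipeline could be recovered by walking them. The sketch below assumes a hypothetical StageBlueprint type whose in/out fields hold the ref of the neighbouring stage (None at the ends); keyscore's actual blueprint classes may look different, and orderByPorts is an illustrative name.

```scala
import java.util.UUID

// Hypothetical blueprint shape: `in` is the ref of the upstream blueprint,
// `out` the ref of the downstream one (None at the ends of the chain).
final case class StageBlueprint(ref: UUID, name: String, in: Option[UUID], out: Option[UUID])

// Orders a linear chain by starting at the only blueprint without an `in` ref
// and following the `out` refs. Returns None if the refs do not form one chain.
def orderByPorts(blueprints: Seq[StageBlueprint]): Option[List[StageBlueprint]] = {
  val byRef = blueprints.map(b => b.ref -> b).toMap
  blueprints.filter(_.in.isEmpty) match {
    case Seq(first) =>
      val chain = Iterator
        .iterate(Option(first))(_.flatMap(_.out).flatMap(byRef.get))
        .takeWhile(_.isDefined)     // stop at the end or at a dangling ref
        .take(blueprints.size + 1)  // a cycle would exceed the stage count
        .collect { case Some(b) => b }
        .toList
      if (chain.size == blueprints.size) Some(chain) else None
    case _ => None // zero or several start stages: not a single linear chain
  }
}
```

Returning None for anything that is not a single chain keeps the sketch explicit about its limits: branching and merging need a real graph traversal.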

kKdH commented 5 years ago

Yes, that is true. We completely ignore the new concept of in- and out-ports. I thought we could defer it until we rebuild that mechanism to support branching and merging.

I think there is a point where we lose the order of the stages, e.g. the construction of stages is now asynchronous. Maybe you can do a quick fix to keep the order. Change as little as possible. As I said, these parts get rebuilt anyway.

kKdH commented 5 years ago

OK, the problem is clear: we start a BlueprintMaterializer for each Stage. They finish their jobs in arbitrary order, and we just append their results in the order they finish.
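The race can be reproduced with plain Scala futures. In this sketch, materialize is a hypothetical stand-in for a BlueprintMaterializer that finishes after a random delay; it is not keyscore's PipelineSupervisor code. Appending on completion yields an arbitrary order, while Future.sequence returns results in the order of its input, regardless of which future completes first.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical stand-in for one BlueprintMaterializer: done after a random delay.
def materialize(stage: String): Future[String] = Future {
  Thread.sleep(Random.nextInt(50).toLong)
  s"materialized($stage)"
}

// Buggy variant: each result is appended when its future completes,
// so the final order depends on timing.
def assembleInCompletionOrder(stages: Seq[String]): Seq[String] = {
  val results = new ConcurrentLinkedQueue[String]()
  val appended = stages.map(s => materialize(s).map(results.add))
  Await.result(Future.sequence(appended), 5.seconds)
  results.toArray(Array.empty[String]).toSeq
}

// Order-preserving variant: Future.sequence keeps the order of its input.
def assembleInBlueprintOrder(stages: Seq[String]): Seq[String] =
  Await.result(Future.sequence(stages.map(materialize)), 5.seconds)
```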

That was my fault. 😞

Suggestion: rewrite the withFilterStage function of the Pipeline class so that it sorts the stages by their Ref/UUID. You get the right order from the PipelineBlueprint of the Pipeline.
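A minimal sketch of that suggestion, assuming the PipelineBlueprint can provide its stage refs as an ordered sequence. MaterializedStage and sortByBlueprint are hypothetical names, and the real withFilterStage signature is not shown in this thread. Sorting by the raw UUID value would not restore the pipeline order, so the sketch sorts by each ref's position in the blueprint.

```scala
import java.util.UUID

// Hypothetical result of one materializer: the stage plus the ref it was built from.
final case class MaterializedStage(ref: UUID, name: String)

// Restores the blueprint order after the materializers finished in arbitrary
// order: each stage is ranked by the position of its ref in `blueprintRefs`.
def sortByBlueprint(blueprintRefs: Seq[UUID], stages: Seq[MaterializedStage]): Seq[MaterializedStage] = {
  val position = blueprintRefs.zipWithIndex.toMap
  // Refs missing from the blueprint go to the end instead of being dropped;
  // sortBy is stable, so their relative order is kept.
  stages.sortBy(stage => position.getOrElse(stage.ref, Int.MaxValue))
}
```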

kKdH commented 5 years ago

Hey @mlandth, let me know whether you were able to fix the problem for now.

mlandth commented 5 years ago

@kKdH For now, this quick fix works and all tests pass. It will do until we rebuild the whole mechanism. Closing this issue.