SETL-Framework / setl

A simple Spark-powered ETL framework that just works 🍺

SETL requirement failed Exception #176

Closed LiuxyEric closed 3 years ago

LiuxyEric commented 3 years ago

Hi, I am new to SETL and trying to build my own application on top of it. However, when I ran my application, it failed with the following message: [image]

In my application, I have two stages.

I used @Delivery(producer = classOf[FactoryName]) in Stage two's Factory to get three outputs from Stage one's Factories:

  @Delivery(producer = classOf[ClassName1])
  val df1 = spark.emptyDataFrame

  @Delivery(producer = classOf[ClassName2])
  val df2 = spark.emptyDataFrame

  @Delivery(producer = classOf[ClassName3])
  val df3 = spark.emptyDataFrame

I can't figure out the problem. Could you please help me out?

qxzzxq commented 3 years ago

Hey, sorry, I didn't see this issue. Did you finally solve it?

JorisTruong commented 3 years ago

Hi @LiuxyEric, I could not reproduce your issue. Did you add your second Stage before adding your first Stage? Here's what I tried.

Declaring three Factories for the first Stage:

class Factory1 extends Factory[DataFrame] with HasSparkSession {
  import spark.implicits._

  override def read(): Factory1.this.type = this

  override def process(): Factory1.this.type = this

  override def write(): Factory1.this.type = this

  override def get(): DataFrame = List("1").toDF("id")
}
class Factory2 extends Factory[DataFrame] with HasSparkSession {
  import spark.implicits._

  override def read(): Factory2.this.type = this

  override def process(): Factory2.this.type = this

  override def write(): Factory2.this.type = this

  override def get(): DataFrame = List("2").toDF("id")
}
class Factory3 extends Factory[DataFrame] with HasSparkSession {
  import spark.implicits._

  override def read(): Factory3.this.type = this

  override def process(): Factory3.this.type = this

  override def write(): Factory3.this.type = this

  override def get(): DataFrame = List("3").toDF("id")
}

Declaring a Factory that ingests the results of the three previous Factories for the second Stage:

class FinalFactory extends Factory[Unit] with HasSparkSession {

  @Delivery(producer = classOf[Factory1])
  val resultOne: DataFrame = spark.emptyDataFrame
  @Delivery(producer = classOf[Factory2])
  val resultTwo: DataFrame = spark.emptyDataFrame
  @Delivery(producer = classOf[Factory3])
  val resultThree: DataFrame = spark.emptyDataFrame

  override def read(): FinalFactory.this.type = {
    resultOne.show(false)
    resultTwo.show(false)
    resultThree.show(false)

    this
  }

  override def process(): FinalFactory.this.type = this

  override def write(): FinalFactory.this.type = this

  override def get(): Unit = {}
}

My main function:

val setl: Setl = Setl.builder()
    .withDefaultConfigLoader()
    .getOrCreate()

val stageOne = new Stage()
stageOne.addFactory[Factory1]()
stageOne.addFactory[Factory2]()
stageOne.addFactory[Factory3]()

setl
  .newPipeline()
  .addStage(stageOne) // note that stageOne is added before FinalFactory
  .addStage[FinalFactory]()
  .run()

Output:

+---+
|id |
+---+
|1  |
+---+

+---+
|id |
+---+
|2  |
+---+

+---+
|id |
+---+
|3  |
+---+

You can see that the second Stage correctly ingested the results of the first Stage's Factories.

For other people who might encounter the same issue and are looking for an answer: the requirement failed exception is thrown because the pipeline expects some deliverables but cannot find them. This check was added in v0.4.3. In the current v1.0.0-SNAPSHOT, we added more explicit exception messages detailing the missing delivery, which should help in fixing this kind of error. So if you are using SETL v0.4.3 or later, make sure all the Deliveries expected by your Factories are available in your Pipeline, or an exception will be thrown.
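
For illustration, here is a minimal sketch (not taken from the original report, and reusing the setl, stageOne and FinalFactory definitions above) of an ordering that would likely trigger this exception, since the Deliveries that FinalFactory expects are not yet produced when its Stage runs:

// Hypothetical counter-example: the consuming Stage is registered before the producing Stage.
// When FinalFactory runs, the Deliveries of Factory1, Factory2 and Factory3 do not exist yet,
// so the delivery dispatch should fail with a requirement failed exception like the one above.
setl
  .newPipeline()
  .addStage[FinalFactory]() // expects three Deliveries...
  .addStage(stageOne)       // ...which are only produced by this later Stage
  .run()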