mrpowers-io / spark-daria

Essential Spark extensions and helper methods ✨😲
MIT License

Run CustomTransform objects as a DAG #38

Closed MrPowers closed 3 years ago

MrPowers commented 6 years ago

See the discussion in this PR: https://github.com/MrPowers/spark-daria/pull/37#issuecomment-417429769

snithish commented 6 years ago

@MrPowers a DAG-based execution of transformations would be a great learning experience and a great piece of functionality.

MrPowers commented 6 years ago

@lizparody - want to grab this one? 🤓

MrPowers commented 6 years ago

@snithish - BTW, I created a directed acyclic graph library in Ruby a few years ago: https://github.com/MrPowers/directed_graph We might be able to leverage this and build something similar in Scala 🤓

MrPowers commented 6 years ago

@snithish @afranzi - Here is a blog post I just wrote on limiting function order dependencies: https://www.mungingdata.com/episodes/9-limiting-order-dependencies-in-spark-functions

I will start thinking about how to build directed acyclic graphs in Scala. We'll have to write some sort of validation to make sure the DAG of CustomTransform objects is valid (e.g. that one CustomTransform actually appends the columns required for the next CustomTransform to run). I will keep you both posted 😄
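
Roughly, that validation could look something like this (an illustrative sketch only; requiredColumns and addedColumns are placeholder field names, not necessarily the final API):

import org.apache.spark.sql.DataFrame

// Illustrative shape of a transform with declared column dependencies.
case class CustomTransform(
  transform: DataFrame => DataFrame,
  requiredColumns: Seq[String] = Seq.empty,
  addedColumns: Seq[String] = Seq.empty
)

// Checks that every transform's required columns are provided either by the
// starting DataFrame or by the addedColumns of an earlier transform in the chain.
def validateChain(startingColumns: Seq[String], transforms: Seq[CustomTransform]): Boolean = {
  val (_, allSatisfied) = transforms.foldLeft((startingColumns.toSet, true)) {
    case ((available, ok), ct) =>
      val satisfied = ct.requiredColumns.forall(available.contains)
      (available ++ ct.addedColumns, ok && satisfied)
  }
  allSatisfied
}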

afranzi commented 6 years ago

Nice blog post @MrPowers! Would you like to try defining the column dependencies with StructField? That should make it possible to improve the dependency validation between transformations.

So instead of just a country column name, the dependency would be a StructField("country", StringType).

MrPowers commented 6 years ago

@afranzi - Thanks for the comments. I updated the containsColumn() method, so it can take a StructField argument as well: https://github.com/MrPowers/spark-daria/commit/45374f0689df30b8f7c6f25831abddb496115a1d

I updated the blog post with this text to make sure all readers know this is an option.

You can also pass the containsColumn() method a StructField argument if you'd like to validate the column name, type, and nullable property.

df.containsColumn(StructField("country", StringType, true))
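
Under the hood, a check like this can lean on the fact that a DataFrame's schema is a collection of StructFields, so an exact match covers the name, type, and nullable flag. Here is a minimal sketch of the idea (not the actual spark-daria code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructField

object DataFrameValidators {
  implicit class DataFrameSchemaOps(df: DataFrame) {
    // True only when the name, data type, and nullable flag all match.
    def containsColumn(sf: StructField): Boolean =
      df.schema.fields.contains(sf)
  }
}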

Thanks again - really appreciate your feedback!

MrPowers commented 6 years ago

@snithish @afranzi - I updated the composeTrans() method so it automatically skips any transformations that don't need to be run: https://github.com/MrPowers/spark-daria/commit/8bb64daacd717b768a806af7fd10b9fcba49a6c3
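
The core idea is a fold that skips a step when the columns it would add are already on the DataFrame. A hedged sketch (field names are illustrative, not necessarily the exact library API):

import org.apache.spark.sql.DataFrame

case class CustomTransform(
  transform: DataFrame => DataFrame,
  addedColumns: Seq[String] = Seq.empty
)

def composeTrans(df: DataFrame, transforms: List[CustomTransform]): DataFrame =
  transforms.foldLeft(df) { (memo, ct) =>
    // Skip the step when every column it would add is already present.
    val alreadyApplied =
      ct.addedColumns.nonEmpty && ct.addedColumns.forall(memo.columns.contains)
    if (alreadyApplied) memo else ct.transform(memo)
  }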

I'll write the DAG code now. There will be a method that'll return the shortest path from one node to another in the DAG. That method will return the shortest path as a list of CustomTransform objects. The user will be able to plug that list right into the composeTrans() method.
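
For the path-finding piece, a plain breadth-first search over an adjacency map should be enough to start with. This is an illustrative sketch with a generic node type; in practice the nodes would be CustomTransform objects:

// Returns the shortest path from start to end as a list of nodes, or an empty
// list when no path exists. Visited tracking keeps the search safe even if the
// input accidentally contains a cycle.
def shortestPath[A](graph: Map[A, Seq[A]], start: A, end: A): List[A] = {
  @scala.annotation.tailrec
  def bfs(frontier: List[List[A]], visited: Set[A]): List[A] = frontier match {
    case Nil => List.empty
    case path :: rest =>
      val node = path.head
      if (node == end) path.reverse
      else {
        val neighbors = graph.getOrElse(node, Seq.empty).filterNot(visited).toList
        bfs(rest ++ neighbors.map(_ :: path), visited ++ neighbors)
      }
  }
  bfs(List(List(start)), Set(start))
}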

I'll keep you posted on my progress.

afranzi commented 6 years ago

Hi @MrPowers,

I've added a comment to your commit since there wasn't a PR :shame: :) https://github.com/MrPowers/spark-daria/commit/8bb64daacd717b768a806af7fd10b9fcba49a6c3#r30382361

MrPowers commented 6 years ago

@afranzi - You're right that I should be working off of pull requests and you're also right that the skipWhenPossible option should be in the CustomTransform class. I added this fix in a PR: https://github.com/MrPowers/spark-daria/pull/41 Does this look good to merge?

afranzi commented 6 years ago

LGTM :)

MrPowers commented 6 years ago

@afranzi - Awesome, I merged the PR: https://github.com/MrPowers/spark-daria/pull/41

I will keep working on the DAG approach and will keep you posted.

I am always open to new ideas about how to solve this problem, so let me know if you have any additional ideas and I'll be happy to explore them.

fbbergamo commented 5 years ago

Hi people,

I've been working over the last few months on organizing a Spark project in a DAG-like way, but I just built an interface like the one below to try to isolate the DataFrames. I don't know if it's related to this work, because in my project we do a lot of transformations on the same object.

import org.apache.spark.sql.DataFrame

trait SparkOperation {
  def transform(): DataFrame
  val requirements: Map[String, SparkOperation] = Map()
  def apply(): DataFrame = transform()
}

object SalesPerBranch extends SparkOperation {
  override val requirements: Map[String, SparkOperation] = Map(
    "sales" -> Sales // could also be a file (JSON, Parquet, ...) read through the same extract interface, or another DataFrame that's already built
  )

  override def transform(): DataFrame = {
    // this method can hold many transformations: pivots, counts, aggregations...
    requirements("sales")().groupBy("branch").count()
  }
}

Unit tests are written for each object, so we get good test coverage on the project by just testing the output of each SparkOperation.

The future idea is to read all the dependencies via requirements and build a DAG to improve caching and custom transformations, and even to run DataFrames that aren't related to each other on different clusters.
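
A rough sketch of how reading the requirements could produce that graph (illustrative only, reusing the SparkOperation trait from the code above):

// Collects the full dependency graph reachable from a set of root operations,
// mapping each operation to the set of operations it requires.
def dependencyGraph(roots: Seq[SparkOperation]): Map[SparkOperation, Set[SparkOperation]] = {
  @scala.annotation.tailrec
  def walk(
      pending: List[SparkOperation],
      acc: Map[SparkOperation, Set[SparkOperation]]
  ): Map[SparkOperation, Set[SparkOperation]] =
    pending match {
      case Nil => acc
      case op :: rest if acc.contains(op) => walk(rest, acc)
      case op :: rest =>
        val deps = op.requirements.values.toSet
        walk(deps.toList ::: rest, acc + (op -> deps))
    }
  walk(roots.toList, Map.empty)
}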

I based this work on a tech talk I watched at a meetup in São Paulo, but I don't know what the community thinks about this idea. I'm also new to Scala, Spark, and FP, so I'm open to new ideas. :)

We just included spark-daria in our project and we might start contributing in the next few months as well :)

afranzi commented 5 years ago

Hi @fbbergamo, I'd recommend reading the following article: Chaining Custom DataFrame Transformations.

It seems weird to have def transform() without any arguments. Could you share more code showing how your Spark job is structured?

If we follow your approach using Spark transforms, it would look something like this:

  def transform(sales: DataFrame): DataFrame = {
    sales.groupBy("branch").count()
  }
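
That version then chains naturally with Spark's built-in Dataset.transform method, e.g. (salesDf and someOtherTransform are just illustrative names):

  salesDf
    .transform(transform)
    .transform(someOtherTransform)
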
fbbergamo commented 5 years ago

Hi @afranzi, thanks for the recommendation.

It's common for us to merge multiple DataFrames into a single one, so for us it makes sense to have a DAG of required DataFrames. Maybe this is different from the problem you're dealing with.

At the early stage of the project we decided to keep the requirements inside the object implementation, but this at least helps us split our pipeline into small pieces of code. Maybe transform is a bad name for this interface's method.

I will work on uploading the basic structure we're using to Git so this is clearer.

Is there any structure in spark-daria nowadays that might help me organize the DataFrame dependencies?

MrPowers commented 5 years ago

@afranzi - did you give the presentation on custom transformations at the Spark Summit? Do you have a link to the video or the slides? I'd like to check it out :neckbeard:

afranzi commented 5 years ago

Here it goes! --> https://www.youtube.com/watch?v=WVPSQVyrpR8 :)

MrPowers commented 3 years ago

The Scala community really needs a good DAG library, similar to networkx.

I built this for PySpark; see the unicron repo.

This'll be a good addition if Scala ever gets a good DAG library.