benchflow / data-analyses-scheduler

A service that schedules Spark data-transformer and analyser applications according to messages published on Kafka topics

Expand spark-tasks-sender to launch analyser scripts #9

Open Cerfoglg opened 8 years ago

Cerfoglg commented 8 years ago

The current spark-tasks-sender launches scripts that take certain data from Minio, transform it, and store it in our Cassandra database. After this, we also want to be able to analyse the data in Cassandra to compute useful metrics.

Expand the current implementation so that we can define a series of analyser scripts that will be sent to Spark once certain requirements have been met.

These requirements are primarily that either the data for a certain replication of an experiment is ready on Cassandra, or that all replications for a given experiment are ready on Cassandra, which means waiting for the associated data-transformer scripts to finish running successfully.

Requirements and analyser scripts should be defined in the same configuration file we already have for the other scripts, for the sake of cohesion.
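
For illustration only, here is a minimal sketch of how such configuration entries could be modelled on the Go side; the field names, script path, and requirement label are assumptions, not the actual format of the existing conf file:

```go
package main

import "fmt"

// Hypothetical shape of an analyser entry in the conf file; the actual
// file format and field names in spark-tasks-sender may differ.
type AnalyserConfig struct {
	Name         string   // analyser identifier
	Script       string   // path of the Python script to submit to Spark
	Requirements []string // requirements to be met before submission
}

func main() {
	analysers := []AnalyserConfig{
		{
			Name:         "experiment_metrics",
			Script:       "analysers/experiment_metrics.py",
			Requirements: []string{"all_trials_on_cassandra"},
		},
	}
	fmt.Println(analysers)
}
```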

Given that Python scripts currently cannot be launched on Spark in cluster mode (see #7), what we can do is launch the data-transformer scripts in client mode, wait for successful completion, and afterwards signal that a certain requirement has been met, so that any analyser script whose requirements are now satisfied can be sent to Spark to analyse the data.
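
A minimal sketch of that flow, assuming a plain `spark-submit` invocation in client mode; the script path and the requirement name are placeholders:

```go
package main

import (
	"log"
	"os/exec"
)

// Sketch only: run a data-transformer in client mode with spark-submit,
// wait for it to finish, then signal that the requirement it satisfies
// has been met by closing the corresponding channel.
func main() {
	trialDataReady := make(chan struct{})

	go func() {
		cmd := exec.Command("spark-submit", "--deploy-mode", "client", "transformers/trial_transformer.py")
		if err := cmd.Run(); err != nil {
			log.Fatalf("transformer failed: %v", err)
		}
		close(trialDataReady) // requirement met: trial data is on Cassandra
	}()

	<-trialDataReady // an analyser waiting on this requirement unblocks here
	log.Println("requirement met, analyser can now be submitted to Spark")
}
```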

VincenzoFerme commented 8 years ago

@Cerfoglg ok, go for it. We will check again in the future the possibility of switching to cluster mode (see #7). Just check that while waiting for the response you are not wasting computational resources.

Cerfoglg commented 8 years ago

@VincenzoFerme The way I ended up implementing this for the moment is with Go channels. Essentially, in the conf file the user can specify which analyser scripts need to be run and which requirements must be met before each script is sent to Spark. When the spark-tasks-sender starts, it creates a list of channels, one per requirement, and launches a series of goroutines, one per analyser, each of which tries to read from all the channels associated with its requirements and therefore blocks and waits. Once a data-transformer script has successfully finished running, it closes the channel associated with the requirement that is now met. When all the requirements for a certain analyser are met, all the channels associated with those requirements have been closed, so the goroutine for that analyser is no longer blocked and it launches the analysis script on Spark, knowing that the data is on Cassandra.
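
To illustrate the wiring (requirement and analyser names here are made up, and the conf parsing and actual Spark submission are omitted):

```go
package main

import "fmt"

// Sketch of the channel wiring described above: one channel per requirement,
// one goroutine per analyser that blocks on all of its requirement channels.
func main() {
	requirements := map[string]chan struct{}{
		"trial_1_on_cassandra": make(chan struct{}),
		"trial_2_on_cassandra": make(chan struct{}),
	}

	analysers := map[string][]string{
		"experiment_analyser": {"trial_1_on_cassandra", "trial_2_on_cassandra"},
	}

	done := make(chan string)
	for name, reqs := range analysers {
		go func(name string, reqs []string) {
			for _, r := range reqs {
				<-requirements[r] // blocks until the channel is closed
			}
			// All requirements met: here the real code would submit the
			// analyser script to Spark.
			done <- name
		}(name, reqs)
	}

	// Simulate the data-transformer scripts finishing successfully.
	close(requirements["trial_1_on_cassandra"])
	close(requirements["trial_2_on_cassandra"])

	fmt.Println("launching analyser:", <-done)
}
```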

Considering that goroutines do not keep the CPU busy when blocked (when attempting to read from an unbuffered channel, for instance), this solution won't cause issues on that front. It will tax memory a bit more, I imagine, but at the same time we make sure that the handling of scripts is orchestrated in one location, the spark-tasks-sender.

Of course, there's always room for further optimization in case we find new tricks.

VincenzoFerme commented 8 years ago

@Cerfoglg cool. Looking forward to the pull request for this first prototype. Some things to consider; let me know what you think about them:

  1. Consider that your solution is not resilient to failure if you don't store the state of the application somewhere on the file system or in a database.
  2. For the second version of the prototype, you should switch to a more scalable way of handling dependencies among tasks.

Some hints about 2:

Both of the queues (the data-analysers queue and the analysers queue) should be FIFO and independent of one another in terms of submitting executions.
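
As a sketch of this point, the two queues could be modelled as Go channels (which preserve FIFO order) with one consumer each, so that submissions from one never block the other; queue names and tasks below are illustrative:

```go
package main

import "fmt"

// Two independent FIFO queues, each drained by its own consumer goroutine.
func main() {
	dataAnalysersQueue := make(chan string, 10)
	analysersQueue := make(chan string, 10)

	done := make(chan struct{})
	consume := func(name string, queue <-chan string) {
		for task := range queue {
			fmt.Println(name, "submitting", task) // real code would call Spark here
		}
		done <- struct{}{}
	}
	go consume("data-analysers", dataAnalysersQueue)
	go consume("analysers", analysersQueue)

	dataAnalysersQueue <- "transform_trial_1"
	analysersQueue <- "analyse_experiment_1"
	close(dataAnalysersQueue)
	close(analysersQueue)

	<-done
	<-done
}
```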

When you add dependencies among analysers, the same approach can be repeated between an analyser and its dependent analysers.

We should also take care to define a way to state the dependency between a metric on an experiment and a metric on its trials: we can start computing the metric on the experiment only when we have the dependent metric for all the trials.
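
One simple way to express this trial-to-experiment dependency, sketched here with `sync.WaitGroup` and made-up names; the actual scheduler may use a different mechanism:

```go
package main

import (
	"fmt"
	"sync"
)

// Start the experiment-level metric only after the dependent metric has
// been computed for every trial. The trial list is a placeholder.
func main() {
	trials := []string{"trial_1", "trial_2", "trial_3"}

	var wg sync.WaitGroup
	for _, t := range trials {
		wg.Add(1)
		go func(trial string) {
			defer wg.Done()
			// In the real service this would submit the trial-level
			// analyser to Spark and wait for it to complete.
			fmt.Println("computed trial metric for", trial)
		}(t)
	}

	// Block until the dependent metric exists for all trials, then
	// submit the experiment-level analyser.
	wg.Wait()
	fmt.Println("all trial metrics ready: submitting experiment-level analyser")
}
```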

The collector should decide when to pick new messages from Kafka according to the number of answers pending from Spark and the lengths of the data-analysers queue (which should be fixed) and the analysers queue. _CURRENT IMPLEMENTATION:_ we rely on a Go buffered channel with a limited number of available slots to enforce this limit.
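
A minimal sketch of that buffered-channel idea: a channel with fixed capacity acts as a semaphore, so no more than a fixed number of submissions are pending at any time (the message source and the submission are stand-ins):

```go
package main

import (
	"fmt"
	"time"
)

// A buffered channel with fixed capacity limits how many Spark
// submissions can be in flight at once.
func main() {
	const maxInFlight = 2
	slots := make(chan struct{}, maxInFlight)

	messages := []string{"msg-1", "msg-2", "msg-3", "msg-4"}
	for _, m := range messages {
		slots <- struct{}{} // blocks when maxInFlight submissions are pending
		go func(msg string) {
			defer func() { <-slots }() // free the slot when Spark answers
			fmt.Println("submitting task for", msg)
			time.Sleep(100 * time.Millisecond) // simulate waiting for Spark
		}(m)
	}

	// Drain remaining slots so main waits for all in-flight submissions.
	for i := 0; i < maxInFlight; i++ {
		slots <- struct{}{}
	}
}
```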

Some references about 2:

Cerfoglg commented 8 years ago

@VincenzoFerme

  1. Yes, dealing with failure is something that will be done after the first prototype.
  2. This is definitely the solution I was looking for, although it will take some work to get all of this done, as I'm unfamiliar with how to code all of this. But I agree with this approach.

VincenzoFerme commented 8 years ago

@Cerfoglg

  1. Ok, open an issue for that, then.
  2. Cool. Go for a pull request with the current prototype and then plan the work for the new version. Feel free to ask me in case of any doubts.

Cerfoglg commented 8 years ago

@VincenzoFerme This issue needs closing, since we do launch analysers now; other issues cover the remaining aspects to improve.

VincenzoFerme commented 8 years ago

@Cerfoglg no, we can't. https://github.com/benchflow/spark-tasks-sender/issues/9#issuecomment-164926667 is still open and we don't discuss it in other issues.