teragrep / pth_10

Data Processing Language (DPL) translator for Apache Spark
GNU Affero General Public License v3.0

refactor sendemail command to be a data sink rather than the current flush() workaround #206

Open eemhu opened 9 months ago

eemhu commented 9 months ago

Description

Currently sendemail uses a lot of static objects and the flush() workaround to send the last batch. Would it be possible to create a data sink instead, which sends the email on close? All data would be saved to the sink, and once everything is saved, the data could be emailed.

Removing the static keywords from the current implementation will most likely break the command and cause it to send incomplete data.
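For illustration, a minimal sketch of the proposed idea using Spark's ForeachWriter API (a real Spark class; everything email-related below is hypothetical). Note that ForeachWriter's close() runs once per partition and per epoch, not once per query, which is exactly the last-batch problem discussed in the comments below:

```java
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

import java.util.ArrayList;
import java.util.List;

public class SendEmailSink extends ForeachWriter<Row> {
    // Per-partition buffer; no static state needed.
    private transient List<String> buffer;

    @Override
    public boolean open(long partitionId, long epochId) {
        buffer = new ArrayList<>();
        return true; // accept every partition
    }

    @Override
    public void process(Row row) {
        // Collect rows instead of sending eagerly.
        buffer.add(row.mkString(","));
    }

    @Override
    public void close(Throwable errorOrNull) {
        if (errorOrNull == null && !buffer.isEmpty()) {
            sendEmail(buffer); // hypothetical helper; sends once this sink closes
        }
    }

    private void sendEmail(List<String> lines) {
        // hypothetical: build a message from the buffered lines and send it
    }
}
```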

51-code commented 8 months ago

After inspection, it is safe to say that a datasink approach would work quite well for streaming datasets (parallel mode): the last email could be sent when the datasink is closed, and no flush() would be needed.

Sequential mode also has to be taken into account, for example when aggregations or sorting happen before the sendemail command. In that case the datasets inevitably arrive in batches, and a datasink would have to be initialized for each batch independently. Even if storing the data can be handled with a datasink, the solution runs into the same old problem of not knowing which batch is the last one, so the flush() function still has to be used.

To sum up: the datasink approach could be useful for keeping the query in parallel mode in some cases, but I can't find a solution for sequential mode where a flush() is not needed for the last email.
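For reference, this is roughly what the sequential-mode problem looks like at the foreachBatch level. foreachBatch is the real Spark API; storeForEmail and the surrounding class are hypothetical:

```java
import java.util.List;
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

public class SequentialModeSketch {
    static StreamingQuery start(Dataset<Row> df) throws TimeoutException {
        return df.writeStream()
                .foreachBatch((Dataset<Row> batch, Long batchId) -> {
                    // Each micro-batch is delivered here independently.
                    // Nothing in (batch, batchId) marks the final batch,
                    // so a sink built at this level cannot know when to
                    // send the last email without outside help (flush()).
                    storeForEmail(batch.collectAsList());
                })
                .start();
    }

    static void storeForEmail(List<Row> rows) {
        // hypothetical: accumulate rows until the query is known to be done
    }
}
```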

kortemik commented 8 months ago

The problem here is that foreachBatch is a sink itself, so to implement this properly our sink would have to perform the foreachBatch sink's transformations within itself. Please investigate the possibility.
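One possible shape for this, sketched under assumptions: the new sink owns the per-batch transformation step itself. StepList and its call() method exist in pth_10 (see the reply below), but the exact signature is assumed here; the buffer and flush() body are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class SendEmailBatchSink {
    private final StepList stepList; // existing pth_10 step pipeline
    private final List<Row> emailBuffer = new ArrayList<>();

    public SendEmailBatchSink(StepList stepList) {
        this.stepList = stepList;
    }

    // Runs inside foreachBatch: apply the StepList transformations first,
    // then buffer the result instead of emailing each batch separately.
    public void processBatch(Dataset<Row> batch, long batchId) {
        Dataset<Row> transformed = stepList.call(batch, batchId); // assumed signature
        emailBuffer.addAll(transformed.collectAsList());
    }

    // Still needed in sequential mode: send whatever is buffered once the
    // query is known to be finished.
    public void flush() {
        // hypothetical: build and send the email from emailBuffer
    }
}
```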

51-code commented 8 months ago

That should be possible, because the new sink could use StepList's call() method (the method that gets executed in foreachBatch) at the start of its own implementation. But wouldn't the issue still persist?

The data is sliced into batches in StepList's datasink. I think the only way to know in sequential mode that the last batch has been received is when the whole query is done, and that can only be detected with a listener. This is essentially what we have now: PTH-07 starts the query and then uses a listener to call flush() when the query is done.
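A minimal sketch of that listener approach, assuming the SendEmailBatchSink shape from above. StreamingQueryListener and its events are real Spark APIs; only the sink reference is hypothetical:

```java
import org.apache.spark.sql.streaming.StreamingQueryListener;

public class FlushOnDoneListener extends StreamingQueryListener {
    private final SendEmailBatchSink sink;

    public FlushOnDoneListener(SendEmailBatchSink sink) {
        this.sink = sink;
    }

    @Override
    public void onQueryStarted(QueryStartedEvent event) { }

    @Override
    public void onQueryProgress(QueryProgressEvent event) { }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        // Only at this point is it certain that the last batch was processed.
        sink.flush();
    }
}
```

The listener would be registered with spark.streams().addListener(new FlushOnDoneListener(sink)) before starting the query, which mirrors the flow described above where PTH-07 starts the query and flushes on completion.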