teragrep / pth_10

Data Processing Language (DPL) translator for Apache Spark
GNU Affero General Public License v3.0

Aggregate commands can't be used after sequential only commands #205

Closed 51-code closed 5 months ago

51-code commented 7 months ago

Describe the bug

For example:

%dpl
index=example | sort +num(offset) | stats avg(offset) as avg_offset

Results to

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;

This means that aggregations can't be used after "sequential_only" commands, which trigger StepList.java to run the subsequent steps inside ForEachBatch. In other words, no aggregations can follow sort, sendemail, etc.
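A minimal Java sketch of the split described above. It assumes a pipeline can be modelled as a plain list of command names. `StepSplitSketch`, `breakpoint` and `sequentialPart` are hypothetical names and are not the pth_10 `StepList` API. Only `sort` and `sendemail` are treated as sequential-only, because those are the ones named in this issue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a DPL pipeline as a list of command names.
// This is NOT the pth_10 StepList API; it only illustrates the split.
final class StepSplitSketch {

    // Assumption: only the commands named in this issue are sequential-only.
    static boolean isSequentialOnly(String command) {
        return command.equals("sort") || command.equals("sendemail");
    }

    // Index of the first sequential-only command (the "breakpoint"), or -1 if none.
    static int breakpoint(String... commands) {
        for (int i = 0; i < commands.length; i++) {
            if (isSequentialOnly(commands[i])) {
                return i;
            }
        }
        return -1;
    }

    // Commands from the breakpoint onwards, i.e. those assumed to run inside
    // ForEachBatch on static micro-batch datasets.
    static List<String> sequentialPart(String... commands) {
        List<String> part = new ArrayList<>();
        int bp = breakpoint(commands);
        if (bp < 0) {
            return part; // no breakpoint: the whole pipeline stays streaming
        }
        for (int i = bp; i < commands.length; i++) {
            part.add(commands[i]);
        }
        return part;
    }

    public static void main(String[] args) {
        // index=example | sort +num(offset) | stats avg(offset) as avg_offset
        String[] pipeline = {"search", "sort", "stats"};
        System.out.println(breakpoint(pipeline));     // 1
        System.out.println(sequentialPart(pipeline)); // [sort, stats]
    }
}
```

For the failing query, `stats` falls after the breakpoint. The streaming part of the query (`search` only) therefore contains no aggregation at all.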

However, aggregation -> sequential_only -> aggregation works. In that case, a streaming aggregation runs before the sequential_only command, which satisfies the condition in the exception above. It is unclear whether the second aggregation produces correct results, though.

Expected behavior

Aggregations are supposed to work in ForEachBatch, and that is why the output mode is changed in the first place. However, complete output mode should only be used if the aggregation comes before the "breakpoint", that is, before a sequential_only command. Otherwise the dataset is no longer streaming when the aggregation runs, so the output mode change is unnecessary and results in an exception.
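The rule in the expected behavior can be expressed as a small decision function. This is a hypothetical Java sketch, not the fix that was merged, and it rests on three assumptions: commands are plain strings, only `stats` counts as an aggregation, and `"append"` is the fallback mode when the streaming part has no aggregation.

```java
// Hypothetical decision rule, NOT the merged pth_10 fix.
final class OutputModeSketch {

    // Assumption: command classification taken from the examples in this issue.
    static boolean isSequentialOnly(String command) {
        return command.equals("sort") || command.equals("sendemail");
    }

    static boolean isAggregation(String command) {
        return command.equals("stats");
    }

    // "complete" only when a streaming aggregation occurs before the breakpoint.
    // Aggregations after it run on static batches inside ForEachBatch, so the
    // streaming query itself has none and "append" is assumed instead.
    static String chooseOutputMode(String... commands) {
        for (String command : commands) {
            if (isSequentialOnly(command)) {
                break; // breakpoint reached: the rest is not a streaming aggregation
            }
            if (isAggregation(command)) {
                return "complete";
            }
        }
        return "append";
    }

    public static void main(String[] args) {
        System.out.println(chooseOutputMode("search", "sort", "stats"));          // append
        System.out.println(chooseOutputMode("search", "stats", "sort", "stats")); // complete
        System.out.println(chooseOutputMode("search", "stats"));                  // complete
    }
}
```

The first call corresponds to the failing query in this issue. The second corresponds to the aggregation -> sequential_only -> aggregation case that currently works.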

Fixing this isn't as easy as it looks, because most aggregation commands will likely break in ForEachBatch and return results for only the last batch given to the command. Whether this actually happens is unclear, however.
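To make the concern concrete, the following hypothetical Java sketch compares two ways of computing an average across micro-batches. One recomputes the average from each micro-batch alone. The other carries a running sum and count across batches. The batch contents are made-up numbers, and nothing here is taken from pth_10 or Spark.

```java
// Hypothetical illustration with made-up numbers; not pth_10 or Spark code.
final class BatchAverageSketch {

    // Naive: the average is recomputed from the current micro-batch only,
    // so after the final call it reflects just the last batch.
    static double batchOnlyAverage(double[] batch) {
        double sum = 0;
        for (double v : batch) {
            sum += v;
        }
        return sum / batch.length; // NaN for an empty batch
    }

    // Stateful alternative: sum and count survive between batch callbacks.
    static final class RunningAverage {
        private double sum;
        private long count;

        double onBatch(double[] batch) {
            for (double v : batch) {
                sum += v;
                count++;
            }
            return sum / count; // NaN until at least one row has been seen
        }
    }

    public static void main(String[] args) {
        double[][] batches = {{1, 2, 3}, {10, 20}};
        RunningAverage running = new RunningAverage();
        double naive = Double.NaN;
        double cumulative = Double.NaN;
        for (double[] batch : batches) {
            naive = batchOnlyAverage(batch);
            cumulative = running.onBatch(batch);
        }
        System.out.println(naive);      // 15.0
        System.out.println(cumulative); // 7.2
    }
}
```

The naive version reports 15.0, the average of the last batch only. The expected average over all five rows is 7.2.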

How to reproduce

%dpl
index=example | sort +num(offset) | stats avg(offset) as avg_offset

kortemik commented 7 months ago

This worked in previous versions, if I recall correctly. In parallel mode, complete output mode provides the full results, aggregated over all batches, to ForEachBatch, where further aggregations should then apply only to the data that the parallel-mode aggregation flushed there?
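The following is a hypothetical Java simulation of the behavior described in this comment, not Spark internals. It works in three steps:

1. A streaming "count by key" is kept as state.
2. Every trigger flushes the whole result table to a ForEachBatch-style callback, as complete output mode does.
3. A second aggregation is applied to that flushed table.

Because each flush contains all rows aggregated so far, the second aggregation sees complete data.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the behavior described above, NOT Spark internals.
final class CompleteModeFlushSketch {

    // Streaming aggregation state: count of rows per key.
    private final Map<String, Long> state = new TreeMap<>();

    // One trigger: fold the newly arrived rows into the state, then emit the
    // whole result table, as complete output mode does.
    Map<String, Long> trigger(String... newKeys) {
        for (String key : newKeys) {
            state.merge(key, 1L, Long::sum);
        }
        return new TreeMap<>(state);
    }

    // Second aggregation run inside the ForEachBatch-style callback on the
    // flushed table: here, the number of distinct keys.
    static int distinctKeys(Map<String, Long> flushed) {
        return flushed.size();
    }

    public static void main(String[] args) {
        CompleteModeFlushSketch query = new CompleteModeFlushSketch();
        query.trigger("a", "b");
        Map<String, Long> flushed = query.trigger("a", "c");
        System.out.println(flushed);               // {a=2, b=1, c=1}
        System.out.println(distinctKeys(flushed)); // 3
    }
}
```

Suppose the callback instead received only the newly arrived rows (`a`, `c`). The distinct-key count would then be 2, which is the last-batch problem described in the issue.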

(Sent by email in reply to 51-code's message of Wed, 31 Jan 2024, 9.14.)

eemhu commented 5 months ago

Internal PR 595 merged, closing