opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices to remote data stores.
Apache License 2.0

New trendline ppl command (WMA) #872

Closed andy-k-improving closed 1 week ago

andy-k-improving commented 2 weeks ago

Description

Introduce a new variant, WMA (weighted moving average), for the existing trendline PPL command. It is implemented by composing a logical plan similar to the following, which uses the nth_value() window function to look back over preceding events and compute the WMA value:

-- +- 'Project ['name, 'salary, 
-- (((('nth_value('salary, 3) windowspecdefinition('age ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) * 3) + 
-- ('nth_value('salary, 2) windowspecdefinition('age ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) * 2)) + 
-- ('nth_value('salary, 1) windowspecdefinition('age ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) * 1)) / 6) AS WMA#708]
--    +- 'UnresolvedRelation [employees], [], false
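Written out, the plan above computes a weighted moving average over a window of n rows ending at the current row t, ordered by the sort field (here `age`), where the k-th row of the frame is weighted by k:

```latex
\begin{aligned}
\mathrm{WMA}_n(t) &= \frac{\sum_{k=1}^{n} k \cdot \operatorname{nth\_value}(x, k)}{\sum_{k=1}^{n} k}
                   = \frac{\sum_{k=1}^{n} k \, x_{t-n+k}}{n(n+1)/2} \\
\mathrm{WMA}_3(t) &= \frac{1 \cdot x_{t-2} + 2 \cdot x_{t-1} + 3 \cdot x_t}{6}
\end{aligned}
```

With n = 3 this is the sum of the three nth_value terms weighted 1, 2, 3 and divided by 6, as in the plan. While the frame still holds fewer than n rows, nth_value(x, n) returns NULL, so the whole expression is NULL for the first n - 1 rows.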

Some high level code changes:

Related Issues

Prior implementation of the SMA formula: https://github.com/opensearch-project/opensearch-spark/pull/833

Check List

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following the Developer Certificate of Origin and signing off your commits, please check here.

Test plan:

In addition to the existing unit and integration tests, the feature can be tested manually: first create and populate a simple table, then run the PPL trendline command against it to calculate WMA values.

# Produce the artifact
sbt clean sparkPPLCosmetic/publishM2

# Start Spark with the plugin
bin/spark-sql --jars "/ABSOLUTE_PATH_TO_ARTIFACT/opensearch-spark-ppl_2.12-0.6.0-SNAPSHOT.jar" \
--conf "spark.sql.extensions=org.opensearch.flint.spark.FlintPPLSparkExtensions"  \
--conf "spark.sql.catalog.dev=org.apache.spark.opensearch.catalog.OpenSearchCatalog" \
--conf "spark.hadoop.hive.cli.print.header=true"

# Create the test table and insert data
CREATE TABLE employees (name STRING, dept STRING, salary INT, age INT, con STRING);

INSERT INTO employees VALUES ("Lisa", "Sales------", 10000, 35, 'test');
INSERT INTO employees VALUES ("Evan", "Sales------", 32000, 38, 'test');
INSERT INTO employees VALUES ("Fred", "Engineering", 21000, 28, 'test');
INSERT INTO employees VALUES ("Alex", "Sales", 30000, 33, 'test');
INSERT INTO employees VALUES ("Tom", "Engineering", 23000, 33, 'test');
INSERT INTO employees VALUES ("Jane", "Marketing", 29000, 28, 'test');
INSERT INTO employees VALUES ("Jeff", "Marketing", 35000, 38, 'test');
INSERT INTO employees VALUES ("Paul", "Engineering", 29000, 23, 'test');
INSERT INTO employees VALUES ("Chloe", "Engineering", 23000, 25, 'test');

# Execute WMA with basic option:

source=employees | trendline sort age wma(2, salary);

name    dept         salary  age  con   salary_trendline
Paul    Engineering  29000   23   test  NULL
Chloe   Engineering  23000   25   test  25000.0
Jane    Marketing    29000   28   test  27000.0
Fred    Engineering  21000   28   test  23666.666666666668
Alex    Sales------  30000   33   test  27000.0
Tom     Engineering  23000   33   test  25333.333333333332
Lisa    Sales------  10000   35   test  14333.333333333334
Jeff    Marketing    35000   38   test  26666.666666666668
Evan    Sales------  32000   38   test  33000.0
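To double-check the numbers above, here is a minimal Python sketch that mimics the plan from the description: for each row it builds a frame of the current row plus the n - 1 preceding rows, reads it with a hypothetical `nth_value` helper (a stand-in for Spark's `nth_value`, with `None` playing the role of SQL NULL), weights the k-th value by k and divides by n(n+1)/2. It assumes the rows arrive in exactly the order printed above; `sort age` alone is not guaranteed to fix the relative order of rows with equal `age` (e.g. Jane and Fred), so the sketch simply takes the printed order.

```python
from typing import List, Optional, Sequence


def nth_value(frame: Sequence[float], k: int) -> Optional[float]:
    """Hypothetical stand-in for Spark's nth_value: 1-based lookup, None if the frame is too short."""
    return frame[k - 1] if len(frame) >= k else None


def wma(values: Sequence[float], n: int) -> List[Optional[float]]:
    """Weighted moving average mirroring the nth_value-based plan (sketch, not the plugin code).

    The frame for row i covers rows i-(n-1) .. i, like RowFrame(-(n-1), currentRow).
    nth_value(frame, k) gets weight k, so the current row (k = n) weighs most.
    """
    denominator = n * (n + 1) / 2  # 1 + 2 + ... + n
    result: List[Optional[float]] = []
    for i in range(len(values)):
        frame = values[max(0, i - n + 1): i + 1]
        terms = [nth_value(frame, k) for k in range(1, n + 1)]
        # A single NULL operand makes the whole arithmetic expression NULL, as in SQL.
        if any(t is None for t in terms):
            result.append(None)
        else:
            weighted = sum(k * t for k, t in zip(range(1, n + 1), terms))
            result.append(weighted / denominator)
    return result


# Salaries from the test table, in the row order printed in the output above.
salaries = [29000, 23000, 29000, 21000, 30000, 23000, 10000, 35000, 32000]
print(wma(salaries, 2))
```

Calling `wma(salaries, 3)` applies weights 1, 2, 3 with a denominator of 6 and should line up with the `WMA_3` column of the multiple-calculation example.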

# Execute WMA with an alias:

source=employees | trendline sort age wma(2, salary) as CUSTOM_NAME;

name    dept         salary  age  con   CUSTOM_NAME
Paul    Engineering  29000   23   test  NULL
Chloe   Engineering  23000   25   test  25000.0
Jane    Marketing    29000   28   test  27000.0
Fred    Engineering  21000   28   test  23666.666666666668
Alex    Sales------  30000   33   test  27000.0
Tom     Engineering  23000   33   test  25333.333333333332
Lisa    Sales------  10000   35   test  14333.333333333334
Jeff    Marketing    35000   38   test  26666.666666666668
Evan    Sales------  32000   38   test  33000.0

# Execute WMA with multiple calculations:

source=employees | trendline sort age wma(2, salary) as WMA_2 wma(3, salary) as WMA_3;

name    dept         salary  age  con   WMA_2               WMA_3
Paul    Engineering  29000   23   test  NULL                NULL
Chloe   Engineering  23000   25   test  25000.0             NULL
Jane    Marketing    29000   28   test  27000.0             27000.0
Fred    Engineering  21000   28   test  23666.666666666668  24000.0
Alex    Sales------  30000   33   test  27000.0             26833.333333333332
Tom     Engineering  23000   33   test  25333.333333333332  25000.0
Lisa    Sales------  10000   35   test  14333.333333333334  17666.666666666668
Jeff    Marketing    35000   38   test  26666.666666666668  24666.666666666668
Evan    Sales------  32000   38   test  33000.0             29333.333333333332
Time taken: 0.466 seconds, Fetched 9 row(s)
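As a worked check of the output above, take the rows in the printed order (salaries 29000, 23000, 29000, 21000, ...) and apply weights 1, 2, 3 over 6 for `WMA_3` and weights 1, 2 over 3 for `WMA_2`:

```latex
\begin{aligned}
\mathrm{WMA}_3(\text{Jane}) &= \frac{1 \cdot 29000 + 2 \cdot 23000 + 3 \cdot 29000}{6} = \frac{162000}{6} = 27000 \\
\mathrm{WMA}_3(\text{Fred}) &= \frac{1 \cdot 23000 + 2 \cdot 29000 + 3 \cdot 21000}{6} = \frac{144000}{6} = 24000 \\
\mathrm{WMA}_2(\text{Fred}) &= \frac{1 \cdot 29000 + 2 \cdot 21000}{3} = \frac{71000}{3} \approx 23666.67
\end{aligned}
```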
YANG-DB commented 2 weeks ago

@andy-k-improving please add DCO (sign-off)

andy-k-improving commented 2 weeks ago

> @andy-k-improving please add DCO (sign-off)

Done.

andy-k-improving commented 2 weeks ago

> @andy-k-improving please add the relevant documentation references including examples

Done.

andy-k-improving commented 2 weeks ago

@YANG-DB I have updated the examples and documentation; would you mind taking another look?

Thanks,

andy-k-improving commented 1 week ago

Hi @dai-chen, @LantaoJin and @salyh, would you mind taking a look at this? Any feedback would be appreciated :)

LantaoJin commented 1 week ago

The CI failure, caused by https://github.com/opensearch-project/opensearch-spark/pull/903, is unrelated to this change but blocks your whole testing process. @andy-k-improving, could you merge the latest code from main?

LantaoJin commented 1 week ago

Thanks for your contribution. Merging to main.