apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

ASOF join support / Specialize Range Joins #318

Open nugend opened 3 years ago

nugend commented 3 years ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

In many time-series workloads, the need to join one record to another based on recency or ordering is quite prevalent. Examples include two sensors that maintain different sampling rates, market data (such as finding the most recent quote for a given trade), or any other scenario where an ordering of events can be applied. In these situations, one set of data is frequently at a much larger scale than the other, typically by at least an order of magnitude. It is also quite common to store the data in the appropriate order so as to minimize the effort of performing this kind of join (though that is not strictly the problem itself).

Describe the solution you'd like

Essentially the solution implemented by ClickHouse and KDB (please note that the ClickHouse solution allows for the full breadth of ordering conditions for the closest match).

Describe alternatives you've considered

There is a general subquery solution that can be used to achieve the desired outcome, but it is typically not performant and can be fairly awkward to express. Given an implementation of #141 and sufficient optimization of the subquery solution into a sort-merge join, dedicated syntax may not be necessary.

Additional context

This likely depends on #141

alippai commented 2 years ago

Related issue solved by duckdb: https://github.com/duckdb/duckdb/pull/3219 and https://github.com/duckdb/duckdb/pull/3342

nugend commented 2 years ago

I haven't evaluated the Polars join_asof solution yet, but I believe it is also pertinent.

alamb commented 7 months ago

There is a blog post about this from DuckDB: https://duckdb.org/2022/05/27/iejoin.html

my-vegetable-has-exploded commented 6 months ago

There is a blog post about this from DuckDB: https://duckdb.org/2022/05/27/iejoin.html

We can of course treat the ASOF join as a special kind of IEJoin. Furthermore, an ASOF join can stop searching as soon as it finds the first match, because there is at most one match per row. This leaves us room for optimization. I think it's OK to support the ASOF join without IEJoin (which can be done later). Here is another blog post about ASOF joins: https://duckdb.org/2023/09/15/asof-joins-fuzzy-temporal-lookups.
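To illustrate the "at most one match" property, here is a minimal sketch in Python (not DataFusion code). It assumes the right-hand side is already sorted by timestamp, so each probe needs only one binary search. The name `asof_lookup` and the sample data are made up for illustration.

```python
from bisect import bisect_right

def asof_lookup(sorted_ts, values, probe_ts):
    """Return the value with the latest timestamp <= probe_ts, or None."""
    # bisect_right returns the index of the first timestamp strictly
    # greater than probe_ts, so the match (if any) sits just before it.
    i = bisect_right(sorted_ts, probe_ts)
    return values[i - 1] if i > 0 else None

# Hypothetical price series, sorted by timestamp.
price_ts = [10, 20, 30]
price = [1.0, 2.0, 3.0]

print(asof_lookup(price_ts, price, 25))  # 2.0
print(asof_lookup(price_ts, price, 5))   # None
```

Because the search ends at a single candidate, the operator never has to enumerate every row that satisfies the inequality, which a general inequality join would have to do.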

I am willing to have a try if nobody is working on this.

alamb commented 6 months ago

I am willing to have a try if nobody is working on this.

I don't know of anyone working on this. Maybe @Dandandan @liukun4515 or @ozankabak know of others who are (or are interested in helping)

ozankabak commented 6 months ago

IMO the ASOF join syntax is not well-thought out and it is better to have such operators downstream (or in datafusion-contrib for public experimentation, but not as a part of DataFusion core). I agree with many of the points raised against this syntax (although not the tone) when it was brought up on the PostgreSQL mailing list.

At Synnada, we handle time-series use cases like the ones ASOF tries to address -- but we stay within the realm of standard/consistent SQL. I would be happy to share some info on our approach after the holidays.

alamb commented 6 months ago

BTW I think "Range Join" is pretty similar in spirit and does not require special SQL syntax

https://www.vertica.com/blog/what-is-a-range-join-and-why-is-it-so-fastba-p223413/

https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/AnalyzingData/Queries/Joins/RangeJoins.htm#:~:text=Vertica%20provides%20performance%20optimizations%20for,two%20columns%20of%20another%20table.

I think there is some non trivial overlap with the notion of symmetric hash join that we already have (maybe we could extend the analysis used there)

my-vegetable-has-exploded commented 5 months ago

At Synnada, we handle time-series use cases like the ones ASOF tries to address -- but we stay within the realm of standard/consistent SQL. I would be happy to share some info on our approach after the holidays.

Looking forward to your sharing. Thanks @ozankabak

ozankabak commented 5 months ago

This is on my mind, this week is very busy but I will circle back as soon as possible.

PsiACE commented 3 months ago

If we plan to add ASOF JOIN / IEJOIN, I would like to keep an eye on the issue and be involved in its implementation.

Lordworms commented 3 months ago

I would like to take part in this one.

jirislav commented 1 week ago

IMO the ASOF join syntax is not well-thought out and it is better to have such operators downstream

@ozankabak Can you please elaborate on that a bit? Which of the ~10 existing ASOF join implementations are you considering as not well-thought out and why?

At Synnada, we handle time-series use cases like the ones ASOF tries to address.

It doesn't "try" to address anything, it solves a pain point, that is joining two datasets without matching timestamps efficiently. I'd love to see your "standard/consistent" SQL version though.

alamb commented 1 week ago

This topic came up at the recent DataFusion meetup in San Francisco as something of interest. Is there anyone willing to help drive / lead this project?

I can't offer to do it myself, but I can certainly offer to help review designs and implementations

Thinking about how to break down the work, I think we will need:

  1. Syntax support (I think sqlparser-rs already has this, but we may need to pick a dialect, etc)
  2. Figure out how we will represent ASOF join (is it a new JoinType? Is it a new LogicalPlan::AsOfJoin variant? Something else? -- maybe someone could research how other systems handle this)
  3. SQL Planner support (take the AST and make a LogicalPlan)
  4. Optimizer support (not sure what is needed here)
  5. An AsOfJoin ExecutionPlan

Reading https://duckdb.org/docs/guides/sql_features/asof_join.html

SELECT *
FROM holdings h
ASOF JOIN prices p USING (ticker, "when");

Given the limited semantics of this join (match the row with the largest value of "when") I suspect we could use a modified hash join and modify the "does the row match" clause to also check for "is when greater than the currently matched row"
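A rough Python sketch of that modified hash join follows, as an illustration only and not an actual DataFusion operator. It assumes inner-join semantics and the DuckDB-style rule that a price row matches when its "when" is less than or equal to the holding's "when". The function name and tuple layouts are hypothetical.

```python
from collections import defaultdict

def asof_hash_join(holdings, prices):
    """holdings: (ticker, when, amount) rows; prices: (ticker, when, price) rows."""
    # Build side: bucket price rows by the equality key (ticker).
    buckets = defaultdict(list)
    for ticker, when, price in prices:
        buckets[ticker].append((when, price))

    out = []
    for ticker, when, amount in holdings:
        best = None  # best (when, price) candidate seen so far
        for p_when, p_price in buckets.get(ticker, []):
            # The usual "does the row match" check (p_when <= when) plus the
            # extra "is it later than the currently matched row" check.
            if p_when <= when and (best is None or p_when > best[0]):
                best = (p_when, p_price)
        if best is not None:  # assumed inner-join semantics: drop unmatched rows
            out.append((ticker, when, amount, best[0], best[1]))
    return out

# Made-up sample rows with integer timestamps.
holdings = [("AAPL", 5, 50), ("AAPL", 15, 60), ("MSFT", 12, 20)]
prices = [("AAPL", 10, 145.5), ("AAPL", 14, 146.2),
          ("MSFT", 11, 301.22), ("MSFT", 13, 305.5)]
print(asof_hash_join(holdings, prices))
```

Each probe still scans its whole bucket, so this variant is cheapest when the number of price rows per ticker is small.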

Another way to implement this would be with a slightly modified merge join

AsOfJoinExec(on=ticker, range_match:"when")
  SortExec(expr=[ticker, when])
    Scan(holdings)
  SortExec(expr=[ticker, when])
    Scan(prices)

The idea is that you could drastically simplify the join if the input was already sorted by ticker, as you just need to hold on to the most recently seen value of "when".

This is probably slower (as you have to sort the entire input, including many values that will be filtered out) but would allow for behaviors other than just "the most recent value".
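The merge-based variant could look roughly like the Python sketch below. It is a simplified model of the plan above, not real DataFusion code: the two `sorted` calls stand in for the SortExec nodes, and the function name and tuple layout are assumptions.

```python
def asof_merge_join(left, right):
    """left/right: (ticker, when, payload) rows; returns as-of matched rows."""
    # Stand-ins for the two SortExec(expr=[ticker, when]) inputs.
    left = sorted(left, key=lambda r: (r[0], r[1]))
    right = sorted(right, key=lambda r: (r[0], r[1]))

    out = []
    j = 0
    last = None  # most recently seen right row that is <= the current left row
    for ticker, when, payload in left:
        # Advance the right cursor over every row that sorts at or before
        # the current left row in (ticker, when) order.
        while j < len(right) and (right[j][0], right[j][1]) <= (ticker, when):
            last = right[j]
            j += 1
        # Only a row from the same ticker counts as a match.
        if last is not None and last[0] == ticker:
            out.append((ticker, when, payload, last[1], last[2]))
    return out

# Made-up sample rows with integer timestamps.
holdings = [("AAPL", 5, 50), ("AAPL", 15, 60), ("MSFT", 12, 20)]
prices = [("AAPL", 10, 145.5), ("AAPL", 14, 146.2),
          ("MSFT", 11, 301.22), ("MSFT", 13, 305.5)]
print(asof_merge_join(holdings, prices))
```

After sorting, each input is read exactly once, and the only state carried between rows is the single `last` candidate.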

ozankabak commented 1 week ago

@jirislav I'm not sure if you chose your tone intentionally to be this way but I have to say I didn't really appreciate the way it reads.

Nevertheless I will try to answer as best as I can. My concern about the ASOF join is not specific to a particular implementation -- it is a conceptual one. The pain point is of course real, but one doesn't really need a new, special syntax to solve it.

Consider a scenario where we are computing the most recent values of each position in a stock portfolio (akin to @alamb's example). Let's create some sample data via:

CREATE TABLE holdings (
    ts TIMESTAMP,
    ticker VARCHAR(10),
    amount INT
);

INSERT INTO holdings (ts, ticker, amount) VALUES
  ('2024-06-27 09:00:00', 'AAPL', '50'),
  ('2024-06-27 09:00:00', 'GOOGL', '30'),
  ('2024-06-27 09:00:00', 'MSFT', '20'),
  ('2024-06-28 09:00:00', 'AAPL', '60'),
  ('2024-06-28 09:00:00', 'GOOGL', '35'),
  ('2024-06-28 09:00:00', 'MSFT', '25');

CREATE TABLE prices (
    ts TIMESTAMP,
    ticker VARCHAR(10),
    price DECIMAL(10, 2)
);

INSERT INTO prices (ts, ticker, price) VALUES
  ('2024-06-27 09:00:00', 'AAPL', '145.50'),
  ('2024-06-27 09:00:00', 'GOOGL', '2754.36'),
  ('2024-06-27 09:00:00', 'MSFT', '301.22'),
  ('2024-06-28 09:00:00', 'AAPL', '146.20'),
  ('2024-06-28 09:00:00', 'GOOGL', '2770.01'),
  ('2024-06-28 09:00:00', 'MSFT', '305.50');

Consider the following standard query:

SELECT h.*, h.amount * LAST_VALUE(p.price ORDER BY p.ts) AS value
FROM holdings as h, prices as p
WHERE h.ts >= p.ts AND h.ticker = p.ticker
GROUP BY h.ts, h.ticker, h.amount
ORDER BY h.ts, h.ticker

We get the following (expected) result:

+---------------------+--------+--------+----------+
| ts                  | ticker | amount | value    |
+---------------------+--------+--------+----------+
| 2024-06-27T09:00:00 | AAPL   | 50     | 7275.00  |
| 2024-06-27T09:00:00 | GOOGL  | 30     | 82630.80 |
| 2024-06-27T09:00:00 | MSFT   | 20     | 6024.40  |
| 2024-06-28T09:00:00 | AAPL   | 60     | 8772.00  |
| 2024-06-28T09:00:00 | GOOGL  | 35     | 96950.35 |
| 2024-06-28T09:00:00 | MSFT   | 25     | 7637.50  |
+---------------------+--------+--------+----------+

This solves the problem, it generalizes to arbitrary join conditions, and it already works on the main branch. The plan one gets from OOTB DataFusion for this query is not very efficient today, but it can be made more efficient. For example, if we specify that the input tables are ordered w.r.t. ts (which we already allow today for external tables), the ORDER BY clause becomes unnecessary and the query gets even more succinct [1]. We can make versions of join algorithms that take advantage of that data ordering and execute even more efficiently than the standard hash join this query compiles into. The nice thing about DF is that one can make this query run efficiently even in more challenging contexts (like streaming data) by implementing custom join operators. This is what we did at Synnada.

Given that we can solve this problem in a general way without any special syntax, I do not see any real benefit of adding another syntax for an already-solved problem in upstream DF. We are also improving built-in join operators constantly and adding new ones, so our plans will also get ever more efficient over time. Downstream users are obviously free to customize DF in any way they wish.

Let's keep collecting ideas, creating examples and comparing approaches. At the end of the day, my current position may turn out to be wrong and adding this to upstream DF may prove to be the right choice. That is OK, we will all learn something regardless of the outcome of these discussions so let's keep contributing to it in good faith. I hope this helps.

[1] @alamb, writing up this example makes me think it could be a good idea to file a ticket to support order specification for memory tables too.

alamb commented 1 week ago

We can make versions of join algorithms that will take that data ordering and execute even more efficiently than the standard hash join this query compiles into.

FWIW this is the approach we took at Vertica many years ago (a specialized join implementation rather than specialized syntax). The basic idea is described in https://blogs.opentext.com/what-is-a-range-join-and-why-is-it-so-fastba-p223413/

alamb commented 1 week ago

[1] @alamb, writing up this example makes me think it could be a good idea to file a ticket to support order specification for memory tables too.

I agree -- that would be a good improvement

jirislav commented 1 week ago

@ozankabak I'm deeply sorry, I didn't realize the tone is inappropriate. Thanks for pointing that out!

Just to share my point of view: I didn't understand why the SQL standard is more important in DataFusion than "proper" time-series support, and it seemed to me like this issue was blocked for quite some time waiting for your reaction. It's just something I have to get used to in open-source projects, I guess.

Anyway, thanks for sharing the example! Though I see a few issues with the example you have shared:

1) The solution is "clean", but hard to both write & read. It cannot be used on a daily basis in the financial industry. Many other solutions have adopted this simple query syntax (you don't even need to use aliases):

SELECT *, amount * price
FROM holdings
ASOF JOIN prices USING (ticker, ts)
ORDER BY ts, ticker

2) A decent data analyst would be worried about the performance of the standard-SQL approach you have just shared. The query actually suggests to create a cartesian product first, then sort it and finally filter it. I'd much rather have proper support for de-facto standard ASOF JOIN syntax than making users believe the query engine is perfect.

3) The timestamps in both of your example tables use the same values. To actually prove it works, they should be shifted a little bit without any common timestamp values. Otherwise you don't need ASOF join, right?

alamb commented 1 week ago

We could contemplate implementing both 🤔

Specifically, if we got the DataFusion engine to be able to identify range joins in general, with ASOF joins as a special case, we could simply treat an ASOF join as syntactic sugar and pass the general predicates to the rest of the engine
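As a toy illustration of the "syntactic sugar" idea, the Python sketch below expands an ASOF JOIN ... USING column list into general predicates. It assumes the convention described in the DuckDB guide linked earlier in this thread, where the last USING column becomes a >= comparison and the others become equalities. The function name is hypothetical and nothing here reflects an actual DataFusion API.

```python
def desugar_asof_using(left, right, columns):
    """Expand ASOF JOIN ... USING (cols) into a list of plain join predicates."""
    # All columns but the last are equality keys; the last one is the
    # ordering column, assumed to compare with >= (DuckDB-style convention).
    *eq_cols, range_col = columns
    preds = [f"{left}.{c} = {right}.{c}" for c in eq_cols]
    preds.append(f"{left}.{range_col} >= {right}.{range_col}")
    return preds

print(desugar_asof_using("h", "p", ["ticker", "ts"]))
# ['h.ticker = p.ticker', 'h.ts >= p.ts']
```

The resulting predicates are the same ones that appear in the WHERE clause of the standard-SQL query above; the "keep only the closest match" part would still have to be carried separately, for example as a property of the join operator.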

ozankabak commented 6 days ago

@ozankabak I'm deeply sorry, I didn't realize the tone is inappropriate. Thanks for pointing that out!

Thank you @jirislav, I'm sure we will find a reasonable way forward and make DF and its ecosystem even better for everybody.

2) A decent data analyst would be worried about the performance of the standard-SQL approach you have just shared. The query actually suggests to create a cartesian product first, then sort it and finally filter it. I'd much rather have proper support for de-facto standard ASOF JOIN syntax than making users believe the query engine is perfect.

I agree that some people may think this way, but this sounds like a conceptual issue on the part of the analyst. The query is declarative and doesn't specify how the calculation will be made. If someone is worried about the how, they can take a quick look at the plan to verify whether it is efficient enough for their needs. Obviously, the plan of the query today is not as efficient as it could be, but I'm optimistic that we will get there soon.

3) The timestamps in both of your example tables use the same values. To actually prove it works, they should be shifted a little bit without any common timestamp values. Otherwise you don't need ASOF join, right?

Right. The query is correct but I could have chosen my sample data better.
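To make the shifted-timestamp case concrete, here is a small Python emulation of what the standard query computes. It is only a sketch: the rows below are made-up data with no common timestamps (prices in cents to avoid rounding), and the function mirrors the WHERE / LAST_VALUE logic rather than any DataFusion plan.

```python
# Hypothetical sample rows; timestamps are ISO strings, which compare
# correctly as text because they share one fixed format.
holdings = [
    ("2024-06-27 09:00:00", "AAPL", 50),
    ("2024-06-28 09:00:00", "AAPL", 60),
]
prices = [
    ("2024-06-27 08:59:30", "AAPL", 14550),
    ("2024-06-27 09:00:10", "AAPL", 14600),
    ("2024-06-28 08:58:00", "AAPL", 14620),
]

def portfolio_values(holdings, prices):
    rows = []
    for h_ts, h_ticker, amount in holdings:
        # WHERE h.ts >= p.ts AND h.ticker = p.ticker
        matches = [(p_ts, price) for p_ts, p_ticker, price in prices
                   if p_ticker == h_ticker and p_ts <= h_ts]
        if not matches:
            continue  # no joined rows, so the group does not exist
        # LAST_VALUE(p.price ORDER BY p.ts): price at the latest matching ts
        last_price = max(matches, key=lambda m: m[0])[1]
        rows.append((h_ts, h_ticker, amount, amount * last_price))
    return sorted(rows)  # ORDER BY h.ts, h.ticker

for row in portfolio_values(holdings, prices):
    print(row)
```

With this data the first holding picks the 08:59:30 price rather than the later 09:00:10 one, and the second picks the 08:58:00 price from the same day, which is the as-of behavior under discussion.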

--

I like @alamb's suggestion of starting with operators first and then thinking about various syntactic sugars. I anticipate that there will be other interesting constructs besides ASOF JOIN. To add to this idea, I think a good way forward could be:

  1. Keep making the core engine better, improving existing join operators and adding new ones.
  2. Create a datafusion-timeseries package/crate (or some other more appropriate name) that adds syntactic sugars, extra time-series related features etc. that may not necessarily belong in the main crate, but are useful in time-series contexts.