pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

dt.truncate is slow #13157

Closed stas-sl closed 5 months ago

stas-sl commented 5 months ago

Checks

Reproducible example

import duckdb
import polars as pl
import numpy as np

start = np.datetime64('2015-01-01', 's').astype('int')
end = np.datetime64('2020-01-01', 's').astype('int')
n = 100_000_000

df = pl.DataFrame({'date': np.random.randint(start, end, n).astype('datetime64[s]').astype('datetime64[ms]')})

# looks like only one CPU core is utilised, same for `1y`
%time df.group_by(pl.col.date.dt.truncate('1mo')).agg(pl.count())
# CPU times: user 37 s, sys: 1.15 s, total: 38.1 s
# Wall time: 34.7 s

# though for '1d' it is significantly faster
%time df.group_by(pl.col.date.dt.truncate('1d')).agg(pl.count())
# CPU times: user 7.16 s, sys: 634 ms, total: 7.79 s
# Wall time: 4.38 s

# but duckdb is much faster
%time duckdb.sql("select date_trunc('month', date) as month, count() from df group by month").pl()
# CPU times: user 7.02 s, sys: 93.1 ms, total: 7.12 s
# Wall time: 1.27 s

# even if using time_bucket function which is probably a closer alternative to polars dt.truncate
%time duckdb.sql("select time_bucket(interval '1 month', date) as month, count() from df group by month").pl()
# CPU times: user 11.9 s, sys: 108 ms, total: 12 s
# Wall time: 1.88 s

Log output

No response

Issue description

dt.truncate is slow when used with intervals >= '1mo'; for smaller intervals it is faster, though it still falls behind duckdb. It also seems that it doesn't utilise all CPU cores.

Expected behavior

At the very least, dt.truncate with month intervals should work as fast as with smaller intervals - or, even better, it could match duckdb's performance.

Installed versions

```
--------Version info---------
Polars:               0.20.1
Index type:           UInt32
Platform:             macOS-10.16-x86_64-i386-64bit
Python:               3.10.13 (main, Sep 11 2023, 08:39:02) [Clang 14.0.6 ]
----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fsspec:               2023.12.1
gevent:
matplotlib:           3.7.1
numpy:                1.24.2
openpyxl:
pandas:               2.1.4
pyarrow:              14.0.1
pydantic:             2.1.1
pyiceberg:
pyxlsb:
sqlalchemy:
xlsx2csv:
xlsxwriter:
duckdb:               0.9.2
```
MarcoGorelli commented 5 months ago

thanks @stas-sl for the report

this part is all done in Chrono

wondering if there's a way to do it by operating on the underlying integers directly...
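
As a rough illustration of the idea (a hypothetical sketch, not what polars actually does): for a fixed-size interval like '1d', "operating on the underlying integers" is just flooring the raw i64 millisecond values, since a Datetime('ms') column is physically milliseconds since the epoch. Months are the hard part because their length varies.

```python
import polars as pl

MS_PER_DAY = 86_400_000

# floor the raw i64 milliseconds to a multiple of one day,
# then reinterpret the result as a Datetime('ms') again
truncated = df.with_columns(
    (pl.col("date").cast(pl.Int64) // MS_PER_DAY * MS_PER_DAY)
    .cast(pl.Datetime("ms"))
    .alias("date_truncated")
)
```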

mcrumiller commented 5 months ago

@MarcoGorelli I think we can. Let's think about this a bit.

In a non-leap year, the first of every month corresponds to the following day-of-year offsets:

| Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0   | 31  | 59  | 90  | 120 | 151 | 181 | 212 | 243 | 273 | 304 | 334 |

In a normal year, there are 365 days, and +1 in leap years. 1972 was a leap year, and our dates start with t=0 as 1970-01-01. So given any random date t (as an int), what we need to do is:

  1. Let y be the number of years prior
  2. Let p be the number of leap years
  3. Compute the start of the current year as s = y * 365 + p
  4. Compute day of year d = t - s.
  5. Find the largest offset in the table above that is less than or equal to d, adding 1 to it if it's a leap year and d is greater than 58.
  6. Add the year start s to that offset.

Let's pick a random date and truncate it to the nearest month, say 2023-12-20 (hey that's today), or 19711:

>>> pl.Series([date.today()]).cast(pl.Int32)
shape: (1,)
Series: '' [i32]
[
        19711
]
  1. y = (2023 - 1970) = 53
  2. p = (53 + 2) // 4 = 13 (verify: 1972, 1976, 1980, 1984, 1988, 1992, 1996, 2000, 2004, 2008, 2012, 2016, 2020)
  3. s = 53 * 365 + 13 = 19358.
  4. d = 19711 - 19358 = 353.
  5. Largest offset less than or equal: 334 (no leap-year adjustment, since 2023 is not a leap year)
  6. Final date = 19358 + 334 = 19692.

Check:

>>> pl.Series([19692]).cast(pl.Date)
shape: (1,)
Series: '' [date]
[
        2023-12-01
]

Next question is, would this be faster than using chrono, and are there any exceptions?
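
One way to sanity-check for exceptions is to run the arithmetic on plain integer day counts and compare against NumPy's own month truncation. This is only a hypothetical sketch of the approach above, not polars code: the closed-form year computation `(4 * days + 2) // 1461` replaces steps 1-2, and, like the comment above, it ignores the 100/400-year Gregorian rules, so it is only valid for dates in 1970-2099.

```python
import numpy as np

# day-of-year of the 1st of each month in a non-leap year (0-based)
MONTH_STARTS = np.array([0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334])

days = np.random.default_rng(0).integers(0, 20_000, 100_000)  # days since 1970-01-01
dates = days.astype("datetime64[D]")

y = (4 * days + 2) // 1461            # years since 1970 (valid for 1970-2099)
p = (y + 1) // 4                      # leap days before Jan 1 of that year
s = y * 365 + p                       # day number of Jan 1 of that year
d = days - s                          # 0-based day of year
is_leap = (y + 2) % 4 == 0

# per-row month starts, shifted by one day from March onwards in leap years
starts = MONTH_STARTS[None, :] + (is_leap[:, None] & (MONTH_STARTS[None, :] > 31))
month_start = np.where(starts <= d[:, None], starts, 0).max(axis=1)
result = s + month_start

# reference: let NumPy truncate to month precision and convert back to day counts
expected = dates.astype("datetime64[M]").astype("datetime64[D]").astype("int64")
assert (result == expected).all()
```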

MarcoGorelli commented 5 months ago

Next question is, would this be faster than using chrono, and are there any exceptions?

if it's correct then yes, almost certainly 😄 if you want to give this a go, the function to search for is fn truncate_monthly

would need to hypothesis-test the hell out of this, but would be awesome if it could work

mcrumiller commented 5 months ago

@MarcoGorelli I'm working on expression-ifying it for now in order to validate as a proof-of-concept. If it holds we can move it into rust.

mcrumiller commented 5 months ago

@MarcoGorelli for 1mo we could actually do the much much simpler:

# truncate by 1mo
df.select(col("date") - pl.duration(days=col("date").dt.day()-1))

It's less flexible though.

Edit: slower than .dt.truncate.

stas-sl commented 5 months ago

Seems like the issue is more general than just truncation - I found another issue which has similar symptoms and also involves dates: https://github.com/pola-rs/polars/issues/12895.

And I did a few more tests converting datetime to string:

# converting to string works a bit faster than truncate('1mo'), though it still utilises only a single CPU core
%time df.group_by(pl.col.date.dt.strftime('%Y-%m')).agg(pl.count())
# CPU times: user 30.2 s, sys: 2.67 s, total: 32.9 s
# Wall time: 23 s

# CPU time for duckdb is comparable with polars, but due to utilising all cores, Wall time is much smaller
%time duckdb.sql("select strftime('%Y-%m', date) as month, count() from df group by month").pl()
# CPU times: user 26.6 s, sys: 162 ms, total: 26.7 s
# Wall time: 3.83 s

So, maybe there are 2 separate problems:

  1. Lack of parallelism, which I'm not sure if that's polars' or chrono's issue
  2. Slow truncation implementation inside chrono

If the first point is on the polars side, then fixing just that could already improve things.

stas-sl commented 5 months ago

Another alternative, inspired by duckdb's date_trunc implementation:

%time df.group_by(pl.datetime(year=pl.col.date.dt.year(), month=pl.col.date.dt.month(), day=1)).agg(pl.count())
# CPU times: user 12.9 s, sys: 11 s, total: 23.8 s
# Wall time: 18.6 s

A bit faster, though still not very impressive

mcrumiller commented 5 months ago

Ok, expression implementation is actually slower:

import numpy as np
import polars as pl
from polars import col, when

start = np.datetime64("2015-01-01", "s").astype("int")
end = np.datetime64("2020-01-01", "s").astype("int")
n = 1_000_000

df = pl.DataFrame({"date": np.random.randint(start, end, n).astype("datetime64[s]").astype("datetime64[ms]")})
df = df.with_columns(col("date").cast(pl.Date))

t = col("date")
y = t.dt.year() - 1970                     # years since the epoch
p = (y + 1) // 4                           # leap days before the start of that year
s = y * 365 + p                            # day number of Jan 1 of that year
d = t - s                                  # day of year
days = pl.Series([31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31])
# largest cumulative month offset <= d, built as a sum of indicator terms
nearest_offset = sum((day * (d >= cum_day) for day, cum_day in zip(days, days.cum_sum())))
is_leap_year = (y + 2) % 4 == 0
offset = is_leap_year & (d > 58)           # +1 day from March onwards in leap years
truncated = (s + nearest_offset + offset).cast(pl.Date)

# 324 ms ± 15.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
df.with_columns(col("date").dt.truncate("1mo"))

# 584 ms ± 10.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
df.with_columns(truncated)

Most of the time is taken up by the when/then chain, so if there's a way to simplify that we could make some improvements.

@stas-sl I know you've moved beyond truncate but I'm just having fun here.


Edit: faster implementation now with logical sums, still slower than .dt.truncate().

gab23r commented 5 months ago

It is slower with expressions, but that doesn't mean it will be slower if it's coded in Rust directly.

cmdlineluser commented 5 months ago

@mcrumiller Just experimenting with your example, it seems coalesce + separate when/thens is faster than a single when/then chain

nearest_offset = pl.coalesce(
    when(d < 31).then(0),
    when(d < 59).then(31),
    ...,
    334
)

pl.cut seems to be a much faster operation, but getting the value back involves casting the labels to strings and back again, which slows it down (although it still seems to be faster than the when/then approaches).
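
For reference, the cut-based variant might look roughly like this - a hypothetical sketch, assuming `d` is the day-of-year expression from @mcrumiller's snippet above and that `Expr.cut` with `left_closed` is available in this version; the labels just carry the offsets, which is why the categorical → string → integer round trip is needed:

```python
breaks = [31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334]
labels = ["0", "31", "59", "90", "120", "151", "181", "212", "243", "273", "304", "334"]

# bin the day-of-year into month buckets, then recover the numeric offset
# from the categorical label by casting to string and back to integer
nearest_offset = (
    d.cut(breaks, labels=labels, left_closed=True)
     .cast(pl.Utf8)
     .cast(pl.Int32)
)
```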

mcrumiller commented 5 months ago

@cmdlineluser did you compare those to my edit in the above where I replaced the when/then with sums?

stas-sl commented 5 months ago

Can someone explain the >10x time difference between creating a date using pl.date and casting an integer to pl.Date?

n = 100_000_000

%time pl.select(pl.date(year=pl.zeros(n) + 1970, month=1, day=1))
# CPU times: user 8.78 s, sys: 4.07 s, total: 12.9 s
# Wall time: 13.3 s

%time pl.select((pl.zeros(n)).cast(pl.Date))
# CPU times: user 602 ms, sys: 468 ms, total: 1.07 s
# Wall time: 1.1 s
mcrumiller commented 5 months ago

@stas-sl, a pl.Date is simply a 32-bit integer representing the number of days since January 1st, 1970. Creating a polars Date series from integers therefore just means making an i32 series, which is very fast.

A Python datetime.date object is a Python class with a lot of stuff in it, but most importantly it stores the year/month/day as separate integers. These must be extracted and processed into the corresponding day count. I admit I'm not sure where or how this happens in the polars code, so there may be room for improvement.
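
As a quick illustration, the i32 ↔ Date round trip is directly visible from Python (reusing the day counts already shown in this thread):

```python
from datetime import date
import polars as pl

s = pl.Series([date(1970, 1, 1), date(2023, 12, 1), date(2023, 12, 20)])
print(s.cast(pl.Int32).to_list())                  # [0, 19692, 19711] - days since the epoch
print(pl.Series([0, 19692, 19711]).cast(pl.Date))  # back to the same dates
```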

stas-sl commented 5 months ago

@mcrumiller hmm, are you sure it creates python datetime.date objects? I thought it returns an Expr which is then handled by Rust directly, without any Python objects. There is a corresponding Rust datetime function, which I guess is called from pl.date.

mcrumiller commented 5 months ago

Oh, sorry! I misread your comment and thought you were saying that construction from a list of datetime.date values was much slower than int64.

MarcoGorelli commented 5 months ago

Can someone explain the >10x time difference between creating a date using pl.date and casting an integer to pl.Date?

I think this is a completely different issue from truncate - could you open a new issue about it, so that this one stays focused on truncate?

nameexhaustion commented 5 months ago

Unexpectedly, 1mo executes faster with streaming=True:

for trunc in ('1mo', '1d'):
    for streaming in (True, False):
        print(f"{trunc=} {streaming=}")
        %time df.lazy().select(pl.col.date.dt.truncate(trunc)).collect(streaming=streaming)
trunc='1mo' streaming=True
CPU times: user 1min 9s, sys: 1.33 s, total: 1min 11s
Wall time: 9.09 s
trunc='1mo' streaming=False
CPU times: user 16.7 s, sys: 1.35 s, total: 18.1 s
Wall time: 18.1 s
trunc='1d' streaming=True
CPU times: user 3.35 s, sys: 338 ms, total: 3.69 s
Wall time: 812 ms
trunc='1d' streaming=False
CPU times: user 2.03 s, sys: 561 ms, total: 2.59 s
Wall time: 2.6 s

*It is due to the streaming engine evaluating the expression in parallel across batches

DuckDB has the advantage here in part due to their execution being pipelined by default

deanm0000 commented 5 months ago

There are two issues.

The first is that truncate by month is inefficient and the second is that polars, by default, parallelizes across columns rather than within them.

For the first issue, it can be mitigated by constructing a date from the parts of the input (i.e. pl.date(input.dt.year(), input.dt.month(), 1)). That method is faster than dt.truncate('1mo') but it still lags behind duckdb.

Using @nameexhaustion's tip to use the streaming engine to force parallelization within the column, along with the date constructor methodology, gets polars in line with duckdb.
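
For concreteness, the combined workaround (date-from-parts plus the streaming engine) is the `pl_date_streaming_group` case benchmarked below:

```python
out = (
    df.lazy()
      .group_by(pl.date(pl.col("date").dt.year(), pl.col("date").dt.month(), 1))
      .agg(pl.count())
      .collect(streaming=True)
)
```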

I imagine that it'd be pretty straightforward to change how truncate('1mo') works, but getting polars to automatically parallelize within columns would be more of a challenge. Maybe the optimizer could have something like: if n_rows > threshold and n_columns < other_threshold, then switch to the streaming engine.

Here's a graph and table of results

[graph and table of timing results]

and the code to generate the above

```python
import polars as pl
import numpy as np
import plotly.express as px
import duckdb
import time

times = {}
start = np.datetime64("2015-01-01", "s").astype("int")
end = np.datetime64("2020-01-01", "s").astype("int")

for n_iter in range(1, 11):
    n = 10_000_000 * n_iter
    print(f"{n:_}")
    df = pl.DataFrame({'date': np.random.randint(start, end, n).astype('datetime64[s]').astype('datetime64[ms]')})
    times[str(n)] = {}

    strt = time.time()
    df.group_by(pl.col.date.dt.truncate('1mo')).agg(pl.count())
    times[str(n)]['pl_trunc_group'] = time.time() - strt

    strt = time.time()
    df.with_columns(pl.col.date.dt.truncate('1mo'))
    times[str(n)]['pl_trunc'] = time.time() - strt

    strt = time.time()
    df.lazy().group_by(pl.col.date.dt.truncate('1mo')).agg(pl.count()).collect(streaming=True)
    times[str(n)]['pl_trunc_streaming_group'] = time.time() - strt

    strt = time.time()
    df.lazy().with_columns(pl.col.date.dt.truncate('1mo')).collect(streaming=True)
    times[str(n)]['pl_trunc_streaming'] = time.time() - strt

    strt = time.time()
    df.group_by(pl.date(pl.col.date.dt.year(), pl.col.date.dt.month(), 1)).agg(pl.count())
    times[str(n)]['pl_date_group'] = time.time() - strt

    strt = time.time()
    df.with_columns(pl.date(pl.col.date.dt.year(), pl.col.date.dt.month(), 1))
    times[str(n)]['pl_date'] = time.time() - strt

    strt = time.time()
    df.lazy().group_by(pl.date(pl.col.date.dt.year(), pl.col.date.dt.month(), 1)).agg(pl.count()).collect(streaming=True)
    times[str(n)]['pl_date_streaming_group'] = time.time() - strt

    strt = time.time()
    df.lazy().with_columns(pl.date(pl.col.date.dt.year(), pl.col.date.dt.month(), 1)).collect(streaming=True)
    times[str(n)]['pl_date_streaming'] = time.time() - strt

    strt = time.time()
    duckdb.sql("select date_trunc('month', date) as month, count() from df group by month").pl()
    times[str(n)]['duckdb_trunc_group'] = time.time() - strt

    strt = time.time()
    duckdb.sql("select date_trunc('month', date) as month from df").pl()
    times[str(n)]['duckdb_trunc'] = time.time() - strt

    strt = time.time()
    duckdb.sql("select time_bucket(interval '1 month', date) as month, count() from df group by month").pl()
    times[str(n)]['duckdb_bucket_group'] = time.time() - strt

    strt = time.time()
    duckdb.sql("select time_bucket(interval '1 month', date) as month from df").pl()
    times[str(n)]['duckdb_bucket'] = time.time() - strt

px.line(
    pl
    .from_dict(times)
    .melt(variable_name='n')
    .unnest('value')
    .melt('n', variable_name='method', value_name='seconds')
    .with_columns(pl.col.n.cast(pl.Int64))
    .filter(~pl.col('method').str.contains('group')),
    x='n', y='seconds', color='method', markers=True
)
```
MarcoGorelli commented 5 months ago

Lack of parallelism, which I'm not sure if that's polars' or chrono's issue

For a single expression, this is expected: https://discord.com/channels/908022250106667068/1014967651656814694/1187398134997975111

Anyway, thanks for having brought attention to this - I think @mcrumiller 's formula is on the right track (though the leap years part needs adjusting - years divisible by 100 but not by 400 are not leap years) and, if done in Rust, I'd expect it to noticeably speed things up

@mcrumiller do you plan to implement this? I can do it if not

stas-sl commented 5 months ago

Thanks for the clarification - I wasn't aware that by default polars parallelizes horizontally across columns and not vertically. And I had the impression that streaming=True is mainly meant for processing data larger than RAM, rather than for parallelizing.

trunc='1mo' streaming=True
CPU times: user 1min 9s, sys: 1.33 s, total: 1min 11s
Wall time: 9.09 s
trunc='1mo' streaming=False
CPU times: user 16.7 s, sys: 1.35 s, total: 18.1 s
Wall time: 18.1 s

The CPU times, though... streaming uses about 4x more CPU resources than non-streaming - 1min 9s vs 16.7s of user time.

%time df.lazy().select(pl.col.date.dt.truncate('1mo')).collect(streaming=False)
# CPU times: user 31.9 s, sys: 510 ms, total: 32.4 s
# Wall time: 32.7 s

# POLARS_MAX_THREADS=4
%time df.lazy().select(pl.col.date.dt.truncate('1mo')).collect(streaming=True)
# CPU times: user 1min 55s, sys: 1min 12s, total: 3min 8s
# Wall time: 49.9 s

On my quite dated Intel MacBook, streaming=True is actually slower, though I was able to reproduce the speed-up on my Linux server. But that's probably another story; I will experiment more with different settings.

Anyway, regardless of parallelization, there seems to be something fishy with the current date truncation implementation. I am a bit skeptical about @mcrumiller's solution: it seems quite overcomplicated to me, and not even faster as of now. I don't know Rust, but maybe try to look at what's the difference between truncate_monthly and truncate_subweekly (which I guess is used for '1d' truncation) - shouldn't they perform more or less similarly? Just from common sense, I don't see why one of them should be significantly slower than the other.

MarcoGorelli commented 5 months ago

and not even faster as of now

it's not a fair comparison though - the only way to know is to implement it in Rust; integer arithmetic should be very fast

what's the difference between truncate_monthly and truncate_subweekly

truncate_subweekly doesn't do anything with chrono (unless we're in the tz-aware case), and it just deals with fixed durations, so the integer arithmetic there is very simple - that's why it's a lot faster

deanm0000 commented 5 months ago

With respect, if @mcrumiller's method is going to utilize the year extractor then is it really going to be any better than just pl.date(input.dt.year(), input.dt.month(), 1) or pl.datetime(input.dt.year(), input.dt.month(),1)? Keep in mind that the starting point in this example is a datetime so there's already an implicit cast of number of nanoseconds since 1970-01-01 to days since 1970-01-01.

stas-sl commented 5 months ago

@deanm0000, the year and month extractors seem to be relatively fast compared to constructing a new date from them using pl.date:

%time df.select(pl.col.date.dt.year())
# CPU times: user 1.33 s, sys: 152 ms, total: 1.48 s
# Wall time: 1.52 s

%time df.select(pl.col.date.dt.month())
# CPU times: user 1.32 s, sys: 53.9 ms, total: 1.37 s
# Wall time: 1.51 s

%time df.select(pl.date(pl.col.date.dt.year(), pl.col.date.dt.month(), 1))
# CPU times: user 10.9 s, sys: 2.83 s, total: 13.8 s
# Wall time: 12.7 s

# just constants
%time pl.select(pl.date(year=pl.zeros(n) + 2020, month=1, day=1))
# CPU times: user 8.63 s, sys: 2.96 s, total: 11.6 s
# Wall time: 11.8 s

# comparing to duckdb
%time duckdb.sql('select make_date(2020, 1, 1) from range(100000000)').pl()
# CPU times: user 1.54 s, sys: 516 ms, total: 2.06 s
# Wall time: 1.6 s

# datetime construction
%time duckdb.sql('select make_timestamp(2020, 1, 1, 0, 0, 0) from range(100000000)').pl()
# CPU times: user 2.14 s, sys: 973 ms, total: 3.11 s
# Wall time: 2.57 s

I checked pl.datetime implementation https://github.com/pola-rs/polars/blob/5e3ffd25b2dd7bc53c13d95ec90d53d3312f5fc8/crates/polars-plan/src/dsl/function_expr/temporal.rs#L132-L155

And it looks quite involved, and it uses chrono as well, I guess, so it is understandable why it is slow. I can create a separate issue for it as @MarcoGorelli suggested, though I'm not sure if it is possible to do anything about it. Or maybe it would be a much broader task of replacing chrono with something more performant, if you ever decide to.

MarcoGorelli commented 5 months ago

the current solution does:

  1. timestamp to datetime
  2. some arithmetic
  3. construct new datetime
  4. new datetime to integer, and return

as far as I can tell, @mcrumiller's solution would be:

  1. timestamp to datetime
  2. some arithmetic
  3. return integer

that's why I think it's got legs, even though it needs adjusting a bit - let's see

stas-sl commented 5 months ago

Actually duckdb's date/time implementation https://github.com/duckdb/duckdb/blob/main/src/common/types/date.cpp is not that scary, only 622 LOC including a hundred lines with lookup arrays 🙂 Maybe one day you'll manage to reimplement date/time logic inside polars as well 🤔

Upd: though, that's only 622 LOC for the date functionality; then you need to add datetime, time, duration... Maybe not so simple...

MarcoGorelli commented 5 months ago

might have something... gonna time it and find out

update: https://github.com/pola-rs/polars/pull/13192