Open ilyanoskov opened 8 months ago
Additionally, I was not really able to run Pathway with multiple processes on WSL Linux (there was some kind of TCP issue?). What addresses do the processes use for inter-process communication? Is it some kind of internal server?
Steps to reproduce
I have a simple Pathway function that takes more than 30 seconds to run. I am curious if this is expected or there is something wrong? How long does this take for you?
The code is:
import pathway as pw

class MySchema(pw.Schema):
    datetime: str
    flag1: bool
    val1: float
    val2: str
    val3: int

def run():
    data = pw.io.csv.read('data.csv', schema=MySchema, mode='static')
    clean_data = data.select(
        val1=pw.this.val1,
        val2=pw.this.val2,
        datetime=pw.this.datetime.dt.strptime(fmt='%Y-%m-%dT%H:%M:%S.%f'),
    )
    pw.debug.compute_and_print(clean_data, n_rows=5)

run()
Just create a CSV file with 1M rows to test this.
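For anyone reproducing this, a minimal sketch of a generator for such a file, using only the standard library. The column names come from MySchema above; the value ranges, the start date, the one-second spacing, and the helper name write_test_csv are assumptions made for illustration, and the boolean spelling may need adjusting to whatever the CSV reader accepts.

```python
import csv
import random
from datetime import datetime, timedelta


def write_test_csv(path: str, n_rows: int = 1_000_000, seed: int = 0) -> None:
    """Write n_rows of synthetic data with the columns declared in MySchema."""
    rng = random.Random(seed)
    start = datetime(2024, 1, 1)  # arbitrary start date (assumption)
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["datetime", "flag1", "val1", "val2", "val3"])
        for i in range(n_rows):
            # One record per second, with a random sub-second part.
            ts = start + timedelta(seconds=i, microseconds=rng.randrange(1_000_000))
            writer.writerow([
                ts.strftime("%Y-%m-%dT%H:%M:%S.%f"),  # same format the snippet parses
                rng.choice(["True", "False"]),        # boolean spelling is an assumption
                f"{rng.uniform(1.0, 100.0):.6f}",
                f"label{rng.randrange(10)}",
                rng.randrange(1000),
            ])
```

Calling write_test_csv('data.csv') produces the 1M-row file; a smaller n_rows is handy for a quick check.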
Now, I am trying to understand why this is taking so long. What is the best way to profile Pathway performance?
The culprit here is the pw.debug.compute_and_print function; it is not really meant for handling non-trivial amounts of data.
You can try replacing it with something like
pw.io.csv.write(clean_data, 'clean_data.csv')
pw.run()
It should be significantly faster, and it will also provide some monitoring output.
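To compare the two variants on your own machine, a plain wall-clock timer is often enough as a first step. The sketch below is generic Python, not a Pathway profiling API; the timed helper and the stand-in workload are hypothetical, and in a real script the body of the with block would be the pw.run() call.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str, results: dict):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}
with timed("stand-in workload", timings):
    # Placeholder computation; replace with pw.run() in a real script.
    sum(i * i for i in range(100_000))

for label, seconds in timings.items():
    print(f"{label}: {seconds:.3f}s")
```

This only measures total elapsed time; it does not show which operator inside the pipeline is slow.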
> Also, what is the best way to load the data with the DateTimeNaive datatype from CSV? The logs from previous runs are telling me "parsing DateTimeNaive from an external datasource is not supported".
What you have done is probably the best way currently.
I guess you tried to provide DateTimeNaive as the type directly in the schema? This is something that would be nice to support, but it is a bit tricky: for example, you need some way to pass the expected format, as there are many possibilities.
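To illustrate why the format has to be supplied, here is a small standard-library sketch (plain Python datetime, not Pathway). The sample strings are made up; the last format string is the one used in the snippet from this thread.

```python
from datetime import datetime

# The same text is a valid timestamp under two different formats.
raw = "03/04/2024 05:06:07"
day_first = datetime.strptime(raw, "%d/%m/%Y %H:%M:%S")
month_first = datetime.strptime(raw, "%m/%d/%Y %H:%M:%S")

print(day_first.isoformat())    # 2024-04-03T05:06:07
print(month_first.isoformat())  # 2024-03-04T05:06:07

# With an explicit format there is nothing to guess.
iso_like = datetime.strptime("2024-01-01T12:30:45.123456", "%Y-%m-%dT%H:%M:%S.%f")
print(iso_like.isoformat())     # 2024-01-01T12:30:45.123456
```

Without a declared format, a reader cannot tell which of the first two interpretations is intended.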
Hi @embe-pw, thanks a lot for taking a look! This helped, now the ingestion is fast.
However, I am seeing that subsequent operations (sliding windows, group by, join) on 1M rows of data take a long time, at least 5 minutes. Is this expected?
Are there any benchmarks for Pathway to compare the performance on my machine? Or what is the best way to profile Pathway?
So I have this little Pathway snippet that takes a long time to execute (10+ minutes). Can you please advise what the reason might be?
The data is a simple CSV with 2 columns: datetime, val1(float). I read the file with a csv reader and then write it with a csv writer (static or streaming mode)
import datetime as dt
import pathway as pw
import math
def myfunc(val1: float, val2: float) -> float:
    return math.log(val1) / val2

def calculate(data_stream):
    data1 = data_stream
    data2 = data1.copy()
    calc = (data1.interval_join(data2,
                                data1.datetime,
                                data2.datetime,
                                pw.temporal.interval(-dt.timedelta(minutes=1), -dt.timedelta(minutes=1)))
            .select(datetime=data1.datetime,
                    val3=pw.apply_with_type(myfunc, float, data1.val1, data2.val1)))
    agg = (calc.windowby(pw.this.datetime,
                         window=pw.temporal.sliding(hop=dt.timedelta(minutes=1), duration=dt.timedelta(minutes=50)),
                         behavior=pw.temporal.exactly_once_behavior())
           .reduce(datetime=pw.this._pw_window_end,
                   avg=pw.reducers.avg(pw.this.val3)))
    return agg
It's very hard to debug what could be the reason behind this slowdown. Is it some kind of (silent) engine error on my machine? Or maybe the sliding window is silently failing / taking too long? How can I profile the performance of this query to find the slowest parts?
My version is 0.8.3
Hey @ilyanoskov, thank you for reporting that. We are investigating the issue and will get back with the fix soon.
Hey @ilyanoskov, I can confirm that the code can be slow and there's nothing incorrect on your side. The optimization of sliding windows is planned for the future. For now, you can use a slightly modified version of the code that performs the aggregation in two steps: first it aggregates the data over tumbling windows of 1 minute (keeping a sum and a count per window), then it aggregates those 1-minute windows into sliding windows of 50 minutes.
def calculate(data_stream):
    data1 = data_stream
    data2 = data1.copy()
    calc = (data1.interval_join(data2,
                                data1.datetime,
                                data2.datetime,
                                pw.temporal.interval(-dt.timedelta(minutes=1), -dt.timedelta(minutes=1)))
            .select(datetime=data1.datetime,
                    val3=pw.apply_with_type(myfunc, float, data1.val1, data2.val1)))
    agg = (
        # Step 1: pre-aggregate into 1-minute tumbling windows (sum and count).
        calc.windowby(
            pw.this.datetime,
            window=pw.temporal.tumbling(duration=dt.timedelta(minutes=1)),
            behavior=pw.temporal.exactly_once_behavior(),
        )
        .reduce(
            datetime=pw.this._pw_window_start,
            sum=pw.reducers.sum(pw.this.val3),
            cnt=pw.reducers.count(),
        )
        # Step 2: combine the 1-minute partial results in 50-minute sliding windows.
        .windowby(
            pw.this.datetime,
            window=pw.temporal.sliding(
                hop=dt.timedelta(minutes=1), duration=dt.timedelta(minutes=50)
            ),
        )
        .reduce(
            datetime=pw.this._pw_window_end,
            avg=pw.reducers.sum(pw.this.sum) / pw.reducers.sum(pw.this.cnt),
        )
    )
    return agg
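As a sanity check of the idea, the following plain-Python sketch (not Pathway code, and not how the engine implements windows) compares a direct sliding-window average with the two-step version. It assumes half-open windows [w, w + DURATION) whose starts are multiples of HOP, integer timestamps in seconds, and a 5-minute duration instead of 50 to keep the demo small; all names are illustrative.

```python
import math
import random
from collections import defaultdict

HOP = 60        # seconds: sliding-window hop and tumbling-bucket width
DURATION = 300  # seconds: sliding-window length (5 min here, 50 min in the thread)


def window_starts(t: int) -> range:
    """Starts of all windows [w, w + DURATION), aligned to HOP, that contain t."""
    last = (t // HOP) * HOP
    return range(last - DURATION + HOP, last + HOP, HOP)


def direct_sliding_avg(rows):
    """Add every row to every window it belongs to, then average per window."""
    acc = defaultdict(lambda: [0.0, 0])
    for t, v in rows:
        for w in window_starts(t):
            acc[w][0] += v
            acc[w][1] += 1
    return {w + DURATION: s / c for w, (s, c) in acc.items()}


def two_stage_avg(rows):
    """Pre-reduce rows into HOP-wide buckets, then slide over the buckets."""
    buckets = defaultdict(lambda: [0.0, 0])
    for t, v in rows:
        b = (t // HOP) * HOP  # bucket start, as in the tumbling step
        buckets[b][0] += v
        buckets[b][1] += 1
    acc = defaultdict(lambda: [0.0, 0])
    for b, (s, c) in buckets.items():
        for w in window_starts(b):
            acc[w][0] += s
            acc[w][1] += c
    return {w + DURATION: s / c for w, (s, c) in acc.items()}


rng = random.Random(0)
rows = [(t, rng.uniform(0.0, 1.0)) for t in range(3600)]  # one row per second
direct = direct_sliding_avg(rows)
staged = two_stage_avg(rows)
same = direct.keys() == staged.keys() and all(
    math.isclose(direct[k], staged[k], rel_tol=1e-9) for k in direct
)
print("results match:", same)
```

In this demo the direct version performs 3600 x 5 = 18000 window updates, while the two-step version performs 3600 bucket updates plus 60 x 5 = 300 window updates, which is where the saving comes from.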
Thank you! How long does it run now?
I don't have access to your data, and the execution time depends on the data distribution (it influences the number of rows produced in interval_join). On a 1M-row dataset generated by me (with consecutive records separated by 1 second) it takes about 1 minute.
With the fix that will be shipped in the next pathway version, the time is reduced to 25s.
@KamilPiechowiak Thanks for this fix. I'd say if we are down to ~25s single-threaded (and then ideally to under 5s on 8 worker threads), this can be considered reasonable timing for a snippet of this weight (50x input amplification x 1M input rows = 50M row updates, corresponding to a throughput of 1M-2M data row updates per second, per worker). However, the fix still relies heavily on the pre-reduction phase to eliminate amplification.
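The arithmetic behind these figures can be spelled out as follows. The 50x factor is taken here to be the number of 1-minute hops in a 50-minute window, and the 8-worker figure assumes perfectly linear scaling; both are simplifying assumptions, not measurements.

```python
from datetime import timedelta

input_rows = 1_000_000
duration = timedelta(minutes=50)
hop = timedelta(minutes=1)

# Each input row falls into duration / hop sliding windows.
windows_per_row = duration // hop
row_updates = input_rows * windows_per_row

# Throughput implied by the 25 s single-threaded run.
updates_per_second = row_updates / 25

# Idealized time on 8 workers (linear scaling is an assumption).
ideal_seconds_8_workers = 25 / 8

print(windows_per_row, row_updates, updates_per_second, ideal_seconds_8_workers)
```

This gives 50 windows per row, 50M row updates, 2M updates per second at 25 s, and about 3.1 s on 8 workers under ideal scaling, which is consistent with the "under 5s" target.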
@ilyanoskov Thanks for reporting this. Happy to know if the improved version addresses your efficiency concern sufficiently.
@KamilPiechowiak thanks a lot for taking a look. I am afraid 25 seconds is still quite a long time for this: Clickhouse takes ~15 seconds to calculate similar features (but 2x more of them, and over more windows), and Polars is at around the same performance as Clickhouse in my tests. I also understand that these are not really streaming frameworks and it's not a fair comparison, but here I am looking for the fastest way to calculate the features and then keep them up to date.
I was doing a POC for my use case with this small example, but in reality I will be calculating hundreds of features across many windows, and a much larger dataset (~1B records). So it looks like Pathway is perhaps not the best tool here. Unless there is a good way to pre-compute the large dataset of features (with Clickhouse or Polars) and then update it with Pathway incrementally in a streaming fashion? But then that makes one need to write two versions of the code, one for batch and one for streaming...
P.S. I think it would be really good to introduce some kind of benchmark for window aggregation with Pathway, just to be able to track its performance.
> Unless there is a good way to pre-compute the large dataset of features (with Clickhouse or Polars) and then update it with Pathway incrementally in a streaming fashion? But then that makes one need to write two versions of the code, one for batch and one for streaming...
@ilyanoskov It's an excellent point. In a mixed setup like this, usually one of the two costs/efforts (1. batch, 2. streaming/incremental) dominates the other. Most use cases focus on optimizing the streaming part only, as it will dominate the overall cost/effort by orders of magnitude, and it doesn't make sense to overfocus on the batch part, as it will remain relatively small even if it's a factor of, say, 5x away from optimal. BUT your use case may be different.
> P.S. I think it would be really good to introduce some kind of benchmark for window aggregation with Pathway, just to be able to track its performance.
Thanks for the idea. @KamilPiechowiak, I'm all for adding this one to the benchmarks in our CI/CD tests.
> I am afraid 25 seconds is still quite a long time for this.
@ilyanoskov, did you take the number from my comment or did you measure it yourself? I measured it on data I generated on my own, so the distribution might be different from yours. For example, in my dataset each input entry to the interval join results in 121 output entries. If your dataset is less dense, the number will be smaller and the computation faster.
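To see how density drives this number, the sketch below counts join partners for one interior record of an evenly spaced dataset. It assumes inclusive interval bounds and ignores records near the edges of the dataset. The figure of 121 is what an interval of plus or minus one minute would give on 1-second data; that reading is an assumption, since the bounds as posted in the snippet are both -1 minute, which would give a single match per record.

```python
def matches_per_row(spacing_s: int, lower_s: int, upper_s: int) -> int:
    """Count offsets that are multiples of spacing_s with lower_s <= offset <= upper_s."""
    return sum(1 for off in range(lower_s, upper_s + 1) if off % spacing_s == 0)


dense = matches_per_row(spacing_s=1, lower_s=-60, upper_s=60)      # 1 record per second
sparse = matches_per_row(spacing_s=10, lower_s=-60, upper_s=60)    # 1 record per 10 seconds
as_posted = matches_per_row(spacing_s=1, lower_s=-60, upper_s=-60) # both bounds at -1 minute

print(dense, sparse, as_posted)  # 121 13 1
```

The join output, and with it the work done by the windowing step that follows, grows in proportion to this per-record match count.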
> Thanks for the idea. @KamilPiechowiak, I'm all for adding this one to the benchmarks in our CI/CD tests.
Yes, we can add it.
What did you expect to happen?
I expected that this operation would take a few hundred ms at best? Or maybe a second.
Version
0.8.3
Docker Versions (if used)
No response
OS
Linux
On which CPU architecture did you run Pathway?
x86-64