**job-almekinders** opened this issue 3 months ago
Performance comparison sounds good. Both use the `COPY` command under the hood, so I wouldn't expect too much of a difference, but stranger things have happened :). `adbc` might have an upper hand in user-friendliness, as its input is just Arrow tables; I'm not sure how easy it will be to prepare the input for the psycopg `COPY` command.
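For a sense of the user-friendliness point, a minimal sketch (not from this thread) of what the ADBC path could look like, assuming the `adbc-driver-postgresql` package; the connection URI and table name are placeholders:

```python
import pyarrow as pa
import adbc_driver_postgresql.dbapi

# The input is just an Arrow table; no row-by-row conversion needed.
table = pa.table({"entity_id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

with adbc_driver_postgresql.dbapi.connect("postgresql://localhost/feast") as conn:
    with conn.cursor() as cur:
        # adbc_ingest bulk-loads the table (a COPY under the hood);
        # mode="append" adds rows to an existing table.
        cur.adbc_ingest("feature_table", table, mode="append")
    conn.commit()
```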
Thanks for your input! We will probably give it a spin sometime soon, so let's see what the results are :)
So unfortunately, `COPY` (even using `adbc`) is not faster than the current `execute_values` of `psycopg2`.

`COPY` excels when you are inserting a large file at once. However, the `materialize` method calls the `_materialize_one` method of the `LocalMaterializationJob` class, which loops over many smaller batches (source) using `for batch in table.to_batches(DEFAULT_BATCH_SIZE):`. `DEFAULT_BATCH_SIZE` is set to 10,000, but the `to_batches` method on a `pyarrow.Table` only uses that as a maximum chunk size. The docs note the following:
> Maximum number of rows for each RecordBatch chunk. Individual chunks may be smaller depending on the chunk layout of individual columns.
This is indeed what happens (at least with my dataset and Postgres database). The chunks are oftentimes a lot smaller than 10,000.
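To make that caveat concrete, a small self-contained example (mine, not from the thread) showing `to_batches` being bounded by the table's existing chunk layout rather than by `max_chunksize`:

```python
import pyarrow as pa

# Build a table out of two small record batches; the table keeps
# this chunk layout internally.
batches = [
    pa.record_batch({"x": list(range(3000))}),
    pa.record_batch({"x": list(range(4000))}),
]
table = pa.Table.from_batches(batches)

# max_chunksize is only an upper bound: existing chunk boundaries are
# preserved, so we get batches of 3000 and 4000 rows, not one of 7000.
print([b.num_rows for b in table.to_batches(max_chunksize=10_000)])
# [3000, 4000]
```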
So the `online_write_batch` method on the `PostgreSQLOnlineStore` class is called many times during a single materialization. Switching to `COPY` here is therefore not beneficial, especially since `COPY` doesn't support an `ON CONFLICT` clause like `INSERT` does, which is required to properly update feature rows that are already present in the online store. So you would need to `COPY` into some staging table first and then still run an `INSERT` with an `ON CONFLICT` clause.
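For illustration, a rough sketch of that staging-table round trip using `psycopg2`; the table and column names are invented, and this is not the actual Feast code:

```python
import psycopg2

with psycopg2.connect("dbname=feast") as conn, conn.cursor() as cur:
    # Stage the incoming batch with a bulk COPY.
    cur.execute("CREATE TEMP TABLE staging (LIKE feature_table INCLUDING DEFAULTS)")
    with open("batch.csv") as f:
        cur.copy_expert("COPY staging FROM STDIN WITH (FORMAT csv)", f)
    # COPY has no ON CONFLICT, so the upsert still needs a separate INSERT.
    cur.execute(
        """
        INSERT INTO feature_table
        SELECT * FROM staging
        ON CONFLICT (entity_key, feature_name) DO UPDATE
        SET value = EXCLUDED.value, event_ts = EXCLUDED.event_ts
        """
    )
```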
So how best to proceed?

- Changing the `LocalMaterializationJob._materialize_one` method to not batch seems difficult, as that will impact ALL materialization methods for all online stores.
- It seems like we cannot speed up the `PostgreSQLOnlineStore.online_write_batch` method much more, as it is called so often with such small batches.
- Changing the size of the batches the `pyarrow.Table.to_batches` method returns also does not seem possible, according to the docs.

Does anyone have any suggestions?
(P.S. I also had a look at the `SnowflakeMaterializationEngine.materialize_to_external_online_store` method, but that also calls `online_write_batch` in many smaller batches whose size is not controllable: `for i, df in enumerate(cursor.fetch_pandas_batches()):` (source).)
> It seems like we cannot speed up the `PostgreSQLOnlineStore.online_write_batch` method much more, as it is called so often with such small batches.
There is one small improvement possible, and that is getting rid of the batching within `PostgreSQLOnlineStore.online_write_batch`, where we batch again but now with a batch size of 5000 (source). This does result in some speedup in case the batches passed by `LocalMaterializationJob._materialize_one` exceed 5000.
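As a sketch of what dropping that inner batching amounts to (assuming the `psycopg2.extras.execute_values` call; `insert_sql` and `rows` are placeholders, not the actual Feast names):

```python
from psycopg2.extras import execute_values

def write_rows(cur, insert_sql: str, rows: list) -> None:
    # One call over the whole incoming batch instead of looping over
    # 5000-row chunks; page_size sets how many rows each generated
    # statement contains (it defaults to 100).
    if rows:
        execute_values(cur, insert_sql, rows, page_size=len(rows))
```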
@TomSteenbergen Can you share some numbers as well? How much of an improvement are we talking about if we do away with batching in the local materialization engine?
@tokoko Testing using a feature view with 100,000 entities and 6 features (so 600,000 table rows):

- `execute_values` from `psycopg2`: 192.6 seconds
- `COPY` using `adbc_ingest`: 142.2 seconds
Change batch size of Postgres online store:

- `COPY` query
- `INSERT ON CONFLICT` query (!!!)

@TomSteenbergen thanks, are the last numbers with materialization engine batching left intact or disabled? Can't you also set `DEFAULT_BATCH_SIZE` to a very large number and run workloads like that?
> @TomSteenbergen thanks, are the last numbers with materialization engine batching left intact or disabled?
Intact; I did not alter `DEFAULT_BATCH_SIZE` for the last tests under *Change batch size of Postgres online store*, just the batch size of 5000 in the `PostgresOnlineStore`.
> Can't you also set `DEFAULT_BATCH_SIZE` to a very large number and run workloads like that?
I can, but changing that number will affect materialization for all online stores. Note that many of the batches returned by `pyarrow.Table.to_batches` are actually smaller than `DEFAULT_BATCH_SIZE`. See comment here.
Just ran the materialization with the same feature set, changing `DEFAULT_BATCH_SIZE` from 10,000 to 1,000,000 and keeping the batch size of the Postgres online store at 5000, but it doesn't change anything: the maximum batch size observed from `pyarrow.Table.to_batches` is 2890, still well below 10,000.
> I can, but changing that number will affect materialization for all online stores.
Yeah, I got that. It might be worth it to mess with other online stores if it proves to be important for performance here (or we can make the materialization engine ask the online store for the value instead of using a default one).
> Note that many of the batches returned by `pyarrow.Table.to_batches` are actually smaller than `DEFAULT_BATCH_SIZE`. See comment here.
I'm thinking maybe that's the case because pyarrow is following some heuristics and keeping those numbers below `DEFAULT_BATCH_SIZE`. Maybe with a high enough batch size, the actual batch sizes will also increase... just speculating.
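One way to check whether the chunk layout (rather than a heuristic) is the limit, and to work around it (my own suggestion, not something tried in the thread): `pyarrow.Table.combine_chunks` rewrites the layout into one chunk per column, after which `max_chunksize` actually takes effect:

```python
import pyarrow as pa

batches = [pa.record_batch({"x": list(range(n))}) for n in (2890, 2500, 3000)]
table = pa.Table.from_batches(batches)

# Bounded by the existing chunk layout:
print([b.num_rows for b in table.to_batches(max_chunksize=10_000)])
# [2890, 2500, 3000]

# combine_chunks() concatenates each column into a single chunk,
# so to_batches can then reach the requested size:
combined = table.combine_chunks()
print([b.num_rows for b in combined.to_batches(max_chunksize=10_000)])
# [8390]
```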
Leaving all that aside, this seems to be much more of an upsert problem than an insert problem. We might have more luck optionally switching to a "batch upsert" mode when the dataset to materialize is large enough (write to staging, lock the table, run update and insert statements in a transaction). This means quite a few design changes in both materialization engines and online store interfaces, though. We might also need to distinguish between full-table materializations and small inserts from the `/push` endpoint. I wonder how valid these concerns are for other online stores.
@TomSteenbergen Another request, if it's not too much trouble: can you get rid of the `to_batches` call altogether and call ingest on the whole pyarrow table?
> @TomSteenbergen Another request, if it's not too much trouble: can you get rid of the `to_batches` call altogether and call ingest on the whole pyarrow table?
By ingest, you mean the second approach, using `COPY` through `adbc_ingest`? That is exactly what I did here: a single `COPY` using `adbc_ingest` on the whole table, without any batching by the `LocalMaterializationEngine` or by the `PostgresOnlineStore`.
> this seems to be much more of an upsert problem than an insert problem.
Agreed. Inserting data into a staging table with a `COPY` command and just replacing the feature table with the staging table is quick. But if we need to keep any older rows in the table that are not updated by the materialization call, then we need to upsert using `INSERT ON CONFLICT`, which is rather slow.
Perhaps we should, in addition to `materialize`, introduce a different method that doesn't upsert but just replaces the feature table in the online store with the latest feature values observed in the offline store, and thus gets rid of any old data. Not sure how to name this new method; perhaps `synchronize`.

(This might be a nice method to have in any case, e.g. when an earlier materialization run materialized some erroneous data to the online store.)
> Agreed. Inserting data into a staging table with a `COPY` command and just replacing the feature table with the staging table is quick. But if we need to keep any older rows in the table that are not updated by the materialization call, then we need to upsert using `INSERT ON CONFLICT`, which is rather slow.
Not necessarily. Instead of `INSERT ON CONFLICT`, you can lock the table and run two statements: one for the updates (with an inner join) and another for the remaining inserts (whatever's left, with an anti-join). That should be a lot faster. The main problem here is that other materialization engines (for example, Spark) run these method calls in parallel. We'd have to somehow change the whole thing to run in two phases (first insert in parallel, then run the merge after all inserts are done).
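A sketch of that two-statement variant (invented table and column names, and assuming the batch has already been `COPY`'d into a `staging` table):

```python
import psycopg2

SQL = """
LOCK TABLE feature_table IN EXCLUSIVE MODE;

-- Update rows whose key already exists (inner join on the key).
UPDATE feature_table f
SET value = s.value, event_ts = s.event_ts
FROM staging s
WHERE f.entity_key = s.entity_key AND f.feature_name = s.feature_name;

-- Insert the remaining rows (anti-join on the key).
INSERT INTO feature_table (entity_key, feature_name, value, event_ts)
SELECT s.entity_key, s.feature_name, s.value, s.event_ts
FROM staging s
LEFT JOIN feature_table f
  ON f.entity_key = s.entity_key AND f.feature_name = s.feature_name
WHERE f.entity_key IS NULL;
"""

with psycopg2.connect("dbname=feast") as conn, conn.cursor() as cur:
    cur.execute(SQL)  # one transaction; committed when the block exits
```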
> Perhaps we should, in addition to `materialize`, introduce a different method that doesn't upsert but just replaces the feature table in the online store with the latest feature values observed in the offline store, and thus gets rid of any old data. Not sure how to name this new method; perhaps `synchronize`.
Maybe, but if we solve the materialize problem above, this could just be (`remove` + `materialize`). Plus, this is a lot harder to solve for other online stores: Redis collocates data with entities rather than feature views, so it's not easy to truncate the existing data.
> Not necessarily. Instead of `INSERT ON CONFLICT`, you can lock the table and run two statements: one for the updates (with an inner join) and another for the remaining inserts (whatever's left, with an anti-join). That should be a lot faster.
@tokoko That's an option indeed, but why would that be a lot faster? A quick Google search seems to indicate that `INSERT ON CONFLICT` is the most performant method for upserting data in Postgres. 😞
> Plus, this is a lot harder to solve for other online stores: Redis collocates data with entities rather than feature views, so it's not easy to truncate the existing data.
Fair point, indeed not the easiest route.
As a short-term improvement, I can get rid of the batching in `PostgreSQLOnlineStore`, as that doesn't seem to speed things up (see this comment). While there, I can also fix the `tqdm` progress bar, which is currently not initialized correctly (`num entities` instead of `number of entities * number of features in feature view`).
WDYT @tokoko?
> That's an option indeed, but why would that be a lot faster? A quick Google search seems to indicate that `INSERT ON CONFLICT` is the most performant method for upserting data in Postgres. 😞
Hmm, after a Google search of my own, sure, I may be wrong. I still feel like there should be a more performant workaround though... something like doing a merge with a simple join and writing the results out to a (second 😄) staging table, then truncating the existing table and moving the data from staging to main. idk, maybe, maybe not.
> As a short-term improvement, I can get rid of the batching in `PostgreSQLOnlineStore`, as that doesn't seem to speed things up (see this comment). While there, I can also fix the `tqdm` progress bar, which is currently not initialized correctly (`num entities` instead of `number of entities * number of features in feature view`).
Sure, that certainly can't hurt.
@tokoko Created a PR here: https://github.com/feast-dev/feast/pull/4331
Now that we are using `psycopg`, we use `executemany`. I did the same performance comparison and found that a single `executemany` call in `online_write_batch` is better:

- `executemany` for every 5000 rows: 188 seconds
- a single `executemany` call: 180 seconds
- `execute` for each row within a pipeline: 197 seconds
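For reference, a sketch of the two `psycopg` (3) variants being compared; the SQL and data here are invented placeholders, not the actual Feast statements:

```python
import psycopg

INSERT_SQL = "INSERT INTO feature_table (entity_key, value) VALUES (%s, %s)"
rows = [(b"key-1", b"value-1"), (b"key-2", b"value-2")]

with psycopg.connect("dbname=feast") as conn, conn.cursor() as cur:
    # Variant 1: a single executemany over the whole batch.
    cur.executemany(INSERT_SQL, rows)

    # Variant 2: one execute per row inside a pipeline, so statements
    # are sent to the server without waiting on individual results.
    with conn.pipeline():
        for row in rows:
            cur.execute(INSERT_SQL, row)
```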
**Is your feature request related to a problem? Please describe.**
The current implementation of `PostgreSQLOnlineStore.online_write_batch()` is quite slow.

**Describe the solution you'd like**
One of the suggested options discussed in this issue is using bulk writes with `adbc`.

**Describe alternatives you've considered**
Another option could be to use the `COPY FROM` command. This performance benchmark shows that the number of rows written per second is significantly larger compared to the batch insert we are doing now.

[Image: comparison chart from the performance benchmark]

**Additional context**
I think we should benchmark both solutions and pick the one that achieves the best performance.