vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
https://vaex.io

Vaex not utilizing all the CPU cores #1034

Open revathik1991 opened 4 years ago

revathik1991 commented 4 years ago

Description: This is more of a general question about Vaex performance. Some Vaex operations like groupby, join, isin, etc. aren't performing well in my application. Although I run my application on 64 CPU cores, I don't observe an increase in CPU utilization during the computation, and importantly, I don't observe more threads/processes being spawned.

I tried updating the environment variables (although Vaex multiprocessing internally defaults to multiprocessing.cpu_count()), but no luck:

    os.environ['VAEX_NUM_THREADS'] = '64'
    os.environ['NUMEXPR_NUM_THREADS'] = '64'
    os.environ['NUMEXPR_MAX_THREADS'] = '64'
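A minimal sketch of the above, assuming these variables need to be in place before vaex (and numexpr) are first imported so the defaults aren't already fixed at import time (an assumption, not confirmed in this thread):

    import os

    # Assumption: set these before importing vaex/numexpr, otherwise the
    # defaults captured at import time may be used instead.
    os.environ['VAEX_NUM_THREADS'] = '64'
    os.environ['NUMEXPR_NUM_THREADS'] = '64'
    os.environ['NUMEXPR_MAX_THREADS'] = '64'

    import vaex  # imported only after the environment is configured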

Could you please suggest any other way I can accelerate these processes?

Software information:
Vaex version: 3
Vaex was installed via: pip
OS: Ubuntu 18.04.3 LTS

JovanVeljanoski commented 4 years ago

Hi,

Does your machine have an SSD or an HDD? How do you read in the data? How big is the data? Can you monitor the disk speed and I/O when doing any of those operations?

revathik1991 commented 4 years ago

Hi @JovanVeljanoski

Thanks for the quick reply

My machine has an SSD. I use open_many on 6 Arrow files (each around 17 GB, so roughly 100 GB in total), and these files are stored on the VM disk. Checking the disk read and write activity while my application was running, I observed Disk Read (0.03/s) and Disk Write (4.16/s).
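For concreteness, a minimal sketch of how such data might be opened with open_many (file names here are placeholders, not the actual paths):

    import vaex

    # Placeholder names for the six ~17 GB Arrow files stored on the VM disk.
    files = ['transactions_{}.arrow'.format(i) for i in range(1, 7)]
    df = vaex.open_many(files)  # open and concatenate the files into one dataframe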

Please let me know if you need any further details.

JovanVeljanoski commented 4 years ago

Ah, so this is a cloud VM somewhere?

Can you try exporting the files as a single hdf5 file, or perhaps (if possible) install the latest alpha version of vaex-core? The latest alpha has many performance improvements regarding concatenating files.
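A sketch of what such an export could look like (the file names are just examples, not paths from this thread):

    import vaex

    # Placeholder paths standing in for the six Arrow files mentioned above.
    files = ['part-{}.arrow'.format(i) for i in range(1, 7)]

    df = vaex.open_many(files)
    df.export_hdf5('combined.hdf5')   # one-time export to a single hdf5 file
    df = vaex.open('combined.hdf5')   # later runs can open only this one file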

I don't know if that is the case here however.

Perhaps @maartenbreddels has other ideas.

revathik1991 commented 4 years ago

Yes. Just one additional point: when I did a previous analysis on the source dataframe (without any filters applied), it performed better (for 6 files, it took just 82 secs). With the same files, after adding some filters (e.g. filtering on dates, country, etc.), the total computation time goes up to around 300 to 400 secs, even though the filtering itself doesn't take much time. I guess that's because it evaluates the expressions on the fly and then does the rest of the groupby and join operations?
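If on-the-fly evaluation of the filter is indeed the cause, one thing that might be worth trying (not verified in this thread) is to materialize the filtered rows with extract() before the heavy operations. Column names and filter values below are hypothetical, mirroring the description above:

    import numpy as np
    import vaex

    # Placeholder: however source_df is actually opened (open_many of the Arrow files).
    source_df = vaex.open('transactions.hdf5')

    # Hypothetical filter columns/values.
    filtered_df = source_df[(source_df.country == 'US') &
                            (source_df.date_of_purchase >= np.datetime64('2020-01-01'))]

    # extract() applies the filter once, so the later groupby/join do not have to
    # re-evaluate the filter expressions on the fly.
    filtered_df = filtered_df.extract()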

I did have the latest alpha version installed. I will try and let you know.

maartenbreddels commented 4 years ago

Hi,

Could you say something more about the groupby operation you do? Or something for us to reproduce using fake data?

cheers,

Maarten

revathik1991 commented 4 years ago

@maartenbreddels, thanks for the reply. The source_df schema is {Transaction_ID, product} plus a few other columns like country, state, date of purchase. Transaction_ID is NOT unique; the data can look like:

    T1 P1
    T1 P2
    T1 P3
    T2 P2
    T3 P2
    T3 P3
    T4 P5

Input: a list of products, e.g. [P1, P2]

My main goal is to filter a 2.5 billion row Vaex dataframe (opened with open_many over 6 Arrow files, each with 400M records), get the frequency of the input products, and also the frequency of the products associated with them across those transactions. For example, for the above input my output would be:

    P1 - 1
    P2 - 3
    P3 - 2

P1 and P2 are the input products, and P3 is a product bought in the transactions that contain P1 or P2.
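For illustration, the toy example above could be written out like this (built with vaex.from_arrays purely for the sketch; the real data comes from the Arrow files):

    import numpy as np
    import vaex

    toy = vaex.from_arrays(
        Transaction_ID=np.array(['T1', 'T1', 'T1', 'T2', 'T3', 'T3', 'T4']),
        product=np.array(['P1', 'P2', 'P3', 'P2', 'P2', 'P3', 'P5']),
    )
    input_list = ['P1', 'P2']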

My current logic (after some filtering on country, date of purchase, etc.):

Step 1: Take the Transaction_IDs that have the input products.

    filter_trans_df = source_df[source_df.product.isin(input_list, use_hashmap=True)].groupby(source_df.Transaction_ID, agg=['count'])

Step 2: Do a join to get the records whose Transaction_ID is in filter_trans_df.

    product_trans_df = source_df.join(filter_trans_df,
                                      left_on='Transaction_ID',
                                      right_on='Transaction_ID',
                                      how='inner',
                                      allow_duplication=True)

Step 3: Do value_counts to get a sorted series of product frequencies.

    result_df = product_trans_df.product.value_counts()

Observed response time:

            Try1        Try2        Try3
    Step1   106 secs    121 secs    93 secs
    Step2   88 secs     46 secs     46 secs
    Step3   2 secs      5 secs      1.5 secs
    Total   196 secs    172 secs    139 secs

Sometimes, without any filter (the step before Step 1), I have observed the whole process complete in 80 secs.

Why is there such variance in processing time? Also, given the lack of other options like distinct, drop_duplicates, or count_distinct, I had to use this logic. Other suggestions or ideas are welcome.

revathik1991 commented 4 years ago

@maartenbreddels @JovanVeljanoski Any progress on this? Let me know if you need any further clarifications from my side.

maartenbreddels commented 4 years ago

It would be great if you could provide me with some code that generates a fake dataset for me to reproduce this locally. From this it is difficult to see where the problem lies. Also, could you give me rough specs for the machine you are using?
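In case it helps to kick things off, here is a rough sketch of what such a fake-data generator could look like (sizes, cardinalities, and column names are guesses based on the description above, not the real data):

    import numpy as np
    import vaex

    n_rows = 10_000_000  # scale towards 400M per file to mimic the real data
    rng = np.random.default_rng(42)

    # Non-unique transaction ids and product ids; the real columns look like
    # strings ('T1'/'P1'), integers are used here to keep the sketch fast.
    transaction_id = rng.integers(0, n_rows // 3, n_rows)
    product = rng.integers(0, 50_000, n_rows)
    country = rng.integers(0, 200, n_rows)
    date_of_purchase = (np.datetime64('2020-01-01') +
                        rng.integers(0, 365, n_rows).astype('timedelta64[D]')
                        ).astype('datetime64[ns]')

    df = vaex.from_arrays(Transaction_ID=transaction_id,
                          product=product,
                          country=country,
                          date_of_purchase=date_of_purchase)
    df.export('fake_transactions.arrow')  # or .hdf5, matching the real setup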