heroku / salesforce-bulk

Python interface to the Salesforce.com Bulk API
MIT License

get_all_results_for_query_batch extremely slow. #69

Open peterHoburg opened 5 years ago

peterHoburg commented 5 years ago

Hi all, I am fetching ~10m records from a SF object using sf_bulk.create_query_job(sf_object_name, contentType="CSV", concurrency='Parallel', pk_chunking=True). All of the chunks finish in a reasonable amount of time. Then, when it comes time to get all of the records from each chunk (~200,000 per chunk), I use a list comprehension with get_all_results_for_query_batch() to collect the results into a list:

records_comprehension = [
    record.decode("utf-8")
    for chunk in sf_bulk.get_all_results_for_query_batch(batch_id, job)
    for record in IteratorBytesIO(chunk)
]

For ~200,000 items this takes 20+ minutes and uses ~15% of my 8-core CPU (I know, I know, Python and multi-core and all that; just an interesting number, since 12.5% is 100% usage of one core). As you can imagine, that's not great when trying to get 10m items. Right now I think I am limited by single-core performance while building records_comprehension, specifically the record.decode("utf-8"), if I had to guess.
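One thing I am considering to cut the per-record overhead is reading and decoding each chunk in a single call instead of record by record. A rough, untested sketch that keeps the same IteratorBytesIO wrapping as above:

records = []
for chunk in sf_bulk.get_all_results_for_query_batch(batch_id, job):
    # One read() and one decode per chunk instead of one per record.
    payload = IteratorBytesIO(chunk).read()
    records.extend(payload.decode("utf-8").splitlines())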

I am planning on parallelizing the entire program into 10 or so different processes to maximize the CPU resources I have available.
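For reference, this is roughly the shape of the per-batch parallelism I have in mind (untested sketch; SESSION_ID, INSTANCE_HOST, JOB_ID, and BATCH_IDS are placeholders for my setup):

from multiprocessing import Pool

from salesforce_bulk import SalesforceBulk
from salesforce_bulk.util import IteratorBytesIO

SESSION_ID = "..."      # placeholder: Salesforce session id
INSTANCE_HOST = "..."   # placeholder: instance hostname
JOB_ID = "..."          # placeholder: the PK-chunked query job
BATCH_IDS = []          # placeholder: batch ids of the finished chunks

def fetch_batch(batch_id):
    # Each worker builds its own client; connections are not shared across processes.
    bulk = SalesforceBulk(sessionId=SESSION_ID, host=INSTANCE_HOST)
    records = []
    for chunk in bulk.get_all_results_for_query_batch(batch_id, JOB_ID):
        records.extend(IteratorBytesIO(chunk).read().decode("utf-8").splitlines())
    return records

if __name__ == "__main__":
    with Pool(processes=8) as pool:
        all_records = [rec for batch in pool.map(fetch_batch, BATCH_IDS) for rec in batch]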

Other than parallelizing the consumption of chunks, is there something I am doing wrong? Is there a better way to go about this? The examples essentially do it this way, just with a double for loop instead of a comprehension (a comprehension should be faster).

PS. When I get my code working as efficiently as possible I will submit a PR with a PK chunk example and some more function docs. Thank you Christopher Lambacher for making this project!

nikhyada commented 5 years ago

@peterHoburg I have around 30+ chunks (~100,000 records per chunk) and the program takes too much time to execute when I pass batch ids one after another into get_all_results_for_query_batch. Did you find multithreading efficient, or was there some other approach you tried? Please advise.

lambacck commented 5 years ago

get_all_results_for_query_batch is a convenience method. For very large quantities of records you likely want to be using some of the lower level methods to control which batches you get and when. There certainly is a bunch of IO overhead that can be parallelized. See https://github.com/heroku/salesforce-bulk/issues/62#issuecomment-436219222 for some ideas of where to start.
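Roughly, the convenience method is a loop over two lower-level calls, so the pattern looks something like this (sketch from memory, so double-check the names and signatures against the source):

# One downloadable result set per result id; fetch them when and where you want,
# e.g. hand each (batch_id, result_id) pair to a worker.
result_ids = sf_bulk.get_query_batch_result_ids(batch_id, job_id=job)
for result_id in result_ids:
    stream = sf_bulk.get_query_batch_results(batch_id, result_id, job_id=job)
    data = stream.read()  # file-like object over the CSV payload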

nikhyada commented 5 years ago

@lambacck Hi, I implemented multiprocessing and I always get the error below:

Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "thread.py", line 78, in write_op
    for row in reader:
  File "/usr/lib64/python3.6/csv.py", line 112, in __next__
    row = next(self.reader)
  File "/usr/lib/python3.6/dist-packages/unicodecsv/py3.py", line 55, in __next__
    return self.reader.__next__()
  File "/usr/lib/python3.6/dist-packages/unicodecsv/py3.py", line 51, in <genexpr>
    f = (bs.decode(encoding, errors=errors) for bs in f)
  File "/usr/lib/python3.6/dist-packages/salesforce_bulk/util.py", line 13, in read
    return bytes(bytearray(islice(self.iterator, None, n)))
  File "/usr/lib/python3.6/dist-packages/salesforce_bulk/salesforce_bulk.py", line 495, in <genexpr>
    iter = (x.replace(b'\0', b'') for x in resp.iter_content(chunk_size=chunk_size))
  File "/usr/lib/python3.6/dist-packages/requests/models.py", line 753, in generate
    raise ChunkedEncodingError(e)
requests.exceptions.ChunkedEncodingError: ('Connection broken: IncompleteRead(527 bytes read, 1521 more expected)', IncompleteRead(527 bytes read, 1521 more expected))

Sorry, I am new to Python; could you please advise if I am doing something wrong?

nikhyada commented 5 years ago

Just to add to the above: when I keep the multiprocessing controlled, i.e. around 8-10 processes at a time, it works fine, but when I tried to use more than 10 (16 in my case) I continuously got the above error. I am using an octa-core processor and I believe it's not an OS-related issue; is it due to the SF API? Please advise.
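Would something like this be a reasonable way to handle it: keep the pool at the size that was stable and retry a download that dies partway through? An untested sketch, where fetch_batch and batch_ids are placeholders for my per-batch download function and batch id list:

import time
from multiprocessing import Pool

import requests

def fetch_batch_with_retry(batch_id, attempts=3):
    # Retry a batch whose download was cut off with ChunkedEncodingError.
    for attempt in range(attempts):
        try:
            return fetch_batch(batch_id)   # placeholder: per-batch download
        except requests.exceptions.ChunkedEncodingError:
            if attempt == attempts - 1:
                raise
            time.sleep(2 ** attempt)       # short backoff before re-requesting

if __name__ == "__main__":
    with Pool(processes=8) as pool:        # stay at the pool size that worked
        results = pool.map(fetch_batch_with_retry, batch_ids)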

Also, the results come back as dict items; can I get CSV output directly instead, so I don't have to convert them to CSV explicitly?
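For the CSV part, would streaming the bytes straight to a file work, instead of parsing into dict rows and converting back? An untested sketch (sf_bulk, batch_id, and job are placeholders for my setup):

with open("batch_{}.csv".format(batch_id), "wb") as out:
    for chunk in sf_bulk.get_all_results_for_query_batch(batch_id, job):
        for piece in chunk:
            # Each piece is a bytes object from the result stream; write it as-is,
            # with no row-by-row dict conversion.
            out.write(piece)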

aayushsf commented 6 months ago

Hi, I'm facing a similar issue. Was this issue ever resolved?

Fetching a million records from Salesforce is taking more than 4 hours. I've identified that IteratorBytesIO() consumes a substantial amount of time unpacking the resulting JSON data. Is there a way to retrieve the data directly in JSON or CSV format without receiving it as an iterator?
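Would it help to skip IteratorBytesIO and join the raw byte chunks directly before parsing? An untested sketch, assuming each result yielded by get_all_results_for_query_batch is an iterable of byte chunks as in the README JSON example (sf_bulk, batch_id, and job are placeholders for my setup):

import json

rows = []
for result in sf_bulk.get_all_results_for_query_batch(batch_id, job):
    # Join the byte chunks once and parse in a single call, instead of
    # pulling them through IteratorBytesIO.
    payload = b"".join(result)
    rows.extend(json.loads(payload.decode("utf-8")))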