heroku / salesforce-bulk

Python interface to the Salesforce.com Bulk API
MIT License

IndexError in get_batch_result_iter #20

Closed · kako-nawao closed this 7 years ago

kako-nawao commented 9 years ago

Happens when trying to fetch the results of an upsert job.

It seems the second request to fetch the result data is unnecessary: the response to the first request already contains the final results for each upserted object, rather than the intermediate response with the result ids.

So when we try to extract the result id on line 505 (result_id = r.text.split("<result>")[1].split("</result>")[0]) it blows up, since there is no XML tag in the response at all.

Here's an example I've just ran:

uri = self.endpoint + "/job/%s/batch/%s/result" % (job_id, batch_id)
r = requests.get(uri, headers=self.headers(), stream=True)
print(r.text)
>>> u'"Id","Success","Created","Error"\n"701o0000000kiShAAI","true","true",""\n'
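
Running the offending split on that response shows why it blows up: splitting on a separator that is not present returns a single-element list, so index 1 is out of range.

r.text.split("<result>")
>>> [u'"Id","Success","Created","Error"\n"701o0000000kiShAAI","true","true",""\n']
r.text.split("<result>")[1].split("</result>")[0]
>>> IndexError: list index out of range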

I assume this behaviour is not observed with other kinds of jobs, so I guess we could check whether the text contains the result ids or the full result set.

kako-nawao commented 9 years ago

Yeah, making that check ("<result>" in r.text) seems to do the trick, but it smells really hacky. Does anybody know if there's a proper pattern we can follow? Is there any doc saying that responses vary depending on job type? That seems highly irregular.
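
For reference, a minimal sketch of that content-sniffing guard (same variables as the snippet above) might look like this:

    # Hacky guard: only dereference a result id when the response actually
    # contains the <result> XML tag (query jobs); otherwise r.text is already
    # the final CSV result set (insert/update/upsert/delete jobs).
    uri = self.endpoint + "/job/%s/batch/%s/result" % (job_id, batch_id)
    r = requests.get(uri, headers=self.headers(), stream=True)
    if "<result>" in r.text:
        result_id = r.text.split("<result>")[1].split("</result>")[0]
        uri = self.endpoint + \
            "/job/%s/batch/%s/result/%s" % (job_id, batch_id, result_id)
        r = requests.get(uri, headers=self.headers(), stream=True)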

barry-nelson commented 8 years ago

A query job returns the proper XML tag ("<result>") and therefore a result id, which is then used to obtain the query results. Insert and delete jobs do not return any XML tags; they do not require a result id in order to get the batch results. I found this by trial and error.
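
For illustration, the two response shapes look roughly like this (the result id below is made up; the CSV line is from the example above):

    Query job -> XML wrapper containing a result id to fetch in a second request:

        <result-list xmlns="http://www.force.com/2009/06/asyncapi/dataload">
            <result>752x000000000F1AAI</result>
        </result-list>

    Insert/update/upsert/delete job -> the CSV results directly:

        "Id","Success","Created","Error"
        "701o0000000kiShAAI","true","true",""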

It seems as though they have stopped supporting this module.

andrewfogg commented 8 years ago

I amended the get_batch_result_iter function to take a new argument, query_job, which I set to True for query jobs, and put the three offending lines of code inside a conditional.

        if query_job:
            # Query jobs return a result id that has to be dereferenced with a
            # second request: https://github.com/heroku/salesforce-bulk/issues/20
            result_id = r.text.split("<result>")[1].split("</result>")[0]
            uri = self.endpoint + \
                "/job/%s/batch/%s/result/%s" % (job_id, batch_id, result_id)
            r = requests.get(uri, headers=self.headers(), stream=True)

The full function is below.

    def get_batch_result_iter(self, job_id, batch_id, parse_csv=False, query_job=False,
                              logger=None):
        """
        Return a line interator over the contents of a batch result document. If
        csv=True then parses the first line as the csv header and the iterator
        returns dicts.
        """
        status = self.batch_status(job_id, batch_id)
        if status['state'] != 'Completed':
            return None
        elif logger:
            if 'numberRecordsProcessed' in status:
                logger("Bulk batch %d processed %s records" %
                       (batch_id, status['numberRecordsProcessed']))
            if 'numberRecordsFailed' in status:
                failed = int(status['numberRecordsFailed'])
                if failed > 0:
                    logger("Bulk batch %d had %d failed records" %
                           (batch_id, failed))

        uri = self.endpoint + \
            "/job/%s/batch/%s/result" % (job_id, batch_id)
        r = requests.get(uri, headers=self.headers(), stream=True)

        if query_job:
            # Query jobs return a result id that has to be dereferenced with a
            # second request: https://github.com/heroku/salesforce-bulk/issues/20
            result_id = r.text.split("<result>")[1].split("</result>")[0]
            uri = self.endpoint + \
                "/job/%s/batch/%s/result/%s" % (job_id, batch_id, result_id)
            r = requests.get(uri, headers=self.headers(), stream=True)

        if parse_csv:
            return csv.DictReader(r.iter_lines(chunk_size=2048), delimiter=",",
                                  quotechar='"')
        else:
            return r.iter_lines(chunk_size=2048)

Then usage would be something like this...

Bulk Query Example

from time import sleep

job = bulk.create_query_job("Contact", contentType='CSV')
batch = bulk.query(job, "select Id,LastName from Contact")
while not bulk.is_batch_done(job, batch):
    sleep(10)
bulk.close_job(job)

for row in bulk.get_batch_result_iter(job, batch, parse_csv=True, query_job=True):
    print row   #row is a dict

Bulk Insert, Update, Delete Example (no change)

from salesforce_bulk import CsvDictsAdapter

job = bulk.create_insert_job("Account", contentType='CSV')
accounts = [dict(Name="Account%d" % idx) for idx in xrange(5)]
csv_iter = CsvDictsAdapter(iter(accounts))
batch = bulk.post_bulk_batch(job, csv_iter)
bulk.wait_for_batch(job, batch)
bulk.close_job(job)

print "Done. Accounts uploaded."

I had a look around, and there is no obvious way to tell from the batch statuses returned by Salesforce whether a job is a bulk query job or a bulk insert, update, or delete job.
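
One possible alternative to an explicit query_job flag, assuming (per the Bulk API docs) that the job info document returned by GET .../job/<job_id> includes an <operation> element, would be a small helper that sniffs the job itself; this is only a sketch:

    def is_query_job(self, job_id):
        # Sketch only: assumes the jobInfo XML contains an <operation> element
        # such as "query", "insert", "upsert", "update" or "delete".
        uri = self.endpoint + "/job/%s" % job_id
        r = requests.get(uri, headers=self.headers())
        return "<operation>query</operation>" in r.text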

msandstrom commented 8 years ago

Thanks andrewfogg, good solution

lambacck commented 7 years ago

Fixed by #40