ptdavies17 / CloudwatchFH2HEC

Cloudwatch Logs Transform for Firehose: formats into Splunk HEC Event

Some records failed while calling PutRecordBatch to Firehose stream, retrying. Individual error codes: ServiceUnavailableException #6

Open sleworthy opened 3 years ago

sleworthy commented 3 years ago

Hi there, thanks for your work putting this script together; it's helping us hugely! We are seeing an error when processing 3,000-4,000 records of various sizes: we are presumably sending too much data to the Firehose stream (the metrics suggest we are hitting the bytes-per-second throttling limit), and after 20 retries the function errors out.

We see lots of "Some records failed while calling PutRecordBatch to Firehose stream, retrying. Individual error codes: ServiceUnavailableException" in the CloudWatch logs for the function.

Sometimes it gets through eventually, but sometimes it hits the 20-retry limit and errors with:

"[ERROR] RuntimeError: Could not put records after 20 attempts. Individual error codes: ServiceUnavailableException"

Looking online, it seems the general fix for such issues is to implement a back-off-and-retry process, as per https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html and https://docs.aws.amazon.com/general/latest/gr/api-retries.html.

I was planning to implement this in your code, but before doing so I wondered if there was an easier fix you knew of?

Thanks in advance
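
The back-off-and-retry approach those AWS docs describe is, roughly, exponential back-off with jitter. A minimal sketch of the pattern (not code from this repo; `send_batch` is a placeholder for whatever performs the PutRecordBatch call and returns the records that still failed):

import random
import time

def put_with_backoff(send_batch, records, max_attempts=20, base_delay=0.5, max_delay=10.0):
    """Retry a PutRecordBatch-style call with exponential back-off and jitter.

    send_batch(records) is assumed to return the list of records that are
    still failing (an empty list means full success).
    """
    for attempt in range(max_attempts):
        records = send_batch(records)
        if not records:
            return
        # Exponential back-off with full jitter, capped at max_delay seconds.
        delay = min(max_delay, base_delay * (2 ** attempt))
        time.sleep(random.uniform(0, delay))
    raise RuntimeError('Could not put records after %d attempts' % max_attempts)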

Dicondur commented 3 years ago

The maximum payload size for Lambda is 6 MB.

boto3 has built-in support for retries; see the doc here: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#guide-retries
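
For example, the retry mode and attempt count can be configured when the client is created (an illustrative sketch; the values here are not from this repo):

import boto3
from botocore.config import Config

# 'standard' and 'adaptive' retry modes both apply exponential back-off;
# 'adaptive' additionally rate-limits the client when it sees throttling.
retry_config = Config(retries={'max_attempts': 10, 'mode': 'adaptive'})
client = boto3.client('firehose', config=retry_config)

Note that the built-in retries only cover errors on the request as a whole; per-record failures reported inside a successful PutRecordBatch response (FailedPutCount > 0) still have to be re-sent by the caller.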

LiamNewtonNHS commented 1 year ago

Hi there, we are currently seeing this issue. What was your workaround, if you don't mind me asking?

Dicondur commented 1 year ago

We were able to increase the Firehose throughput limits through a service limit increase with AWS support.

Ref: https://docs.aws.amazon.com/firehose/latest/dev/limits.html

sleworthy commented 1 year ago

Hi @LiamNewtonNHS, we implemented a back-off function, which seems to have solved the issue, and we also lowered the batch size. Some extracts are below. Note that this was based on the original, now-deprecated version of the Lambda (not the new one as of 10 months ago). If this doesn't help, let me know and I'll share the whole Lambda, sanitised (we also added lots of debug logging statements to help us diagnose the issue).

import time  # needed for the back-off sleep added below

def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMessage = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue

            codes.append(res['ErrorCode'])
            errMessage.append(res['ErrorMessage'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)
        errMsg += ' Individual error messages: ' + ','.join(errMessage)

    if len(failedRecords) > 0:
        #If the failure is due to service timeouts then we will need to backoff and allow more time
        serviceThroughputError = 'ServiceUnavailableException'
        if serviceThroughputError in errMsg:
            print('Hitting capacity/throughput limit on Firehose Stream PutRecordBatch. Backing off and slowing down. Sleeping for the following seconds:',attemptsMade/2)
            time.sleep(attemptsMade/2)
            print('Finished sleeping')
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))
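
For reference, the function is invoked from the handler along these lines (a simplified, hypothetical sketch; the stream name and records variable are placeholders, not values from this repo):

import boto3

# Hypothetical call site: start at attempt 0 and allow up to 20 attempts,
# matching the error message quoted at the top of this issue.
client = boto3.client('firehose')
putRecordsToFirehoseStream('my-delivery-stream', recordsToReingest, client, 0, 20)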

We also lowered the batch sizes in the entry function, like so:

        # We lowered this to 4000000 as we believe it correlates to the 4 MB PutRecordBatch maximum request size AWS states, which cannot be changed.
        if projectedSize > 4000000:
            totalRecordsToBeReingested += 1
            recordsToReingest.append(
                getReingestionRecord(isSas, dataByRecordId[rec['recordId']])
            )
            records[idx]['result'] = 'Dropped'
            del(records[idx]['data'])