awslabs / aws-config-rules

[Node, Python, Java] Repository of sample Custom Rules for AWS Config.
http://aws.amazon.com/config/
Creative Commons Zero v1.0 Universal
1.58k stars 851 forks source link

Getting [ERROR] KeyError: 'invokingEvent' while executing IAM_USER_MFA_ENABLED #415

Open preethira opened 9 months ago

preethira commented 9 months ago

I am trying to run the code from https://github.com/awslabs/aws-config-rules/blob/master/python/IAM_USER_MFA_ENABLED/IAM_USER_MFA_ENABLED.py in an AWS Lambda function, and I am getting the following error:

[ERROR] KeyError: 'invokingEvent'
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 365, in lambda_handler
    invoking_event = json.loads(event['invokingEvent'])

Can someone please help?

vegisureshsp commented 6 months ago

The error you're encountering, KeyError: 'invokingEvent', means that the key 'invokingEvent' is not present in the event dictionary passed to the handler. To avoid the crash, check that the key exists before accessing it, as in this version of the handler:

def lambda_handler(event, context):
    """Evaluate the rule for one invocation event and report the result.

    The handler parses the event, validates the rule parameters, computes a
    compliance result, normalises that result into a list of evaluations and
    submits them through ``AWS_CONFIG_CLIENT.put_evaluations`` in batches of
    at most 100.

    Args:
        event: Invocation payload (a dict). This function reads the keys
            'invokingEvent' (JSON string), 'ruleParameters' (optional JSON
            string), 'accountId' and 'resultToken' from it.
        context: Lambda context object; not used by this function.

    Returns:
        The list of evaluations that were submitted, or an error-response
        value produced by one of the ``build_*_response`` helpers (or a dict
        with 'internalErrorMessage') when the event cannot be processed.
    """
    global AWS_CONFIG_CLIENT
    AWS_CONFIG_CLIENT = get_client('config', event)

    # Without 'invokingEvent' there is nothing to evaluate; answer with a
    # structured error instead of letting a KeyError escape the handler.
    if 'invokingEvent' not in event:
        return build_error_response(
            "Missing invokingEvent key",
            "The 'invokingEvent' key is not present in the event dictionary.",
            'MissingKeyError',
            'Missing invokingEvent key',
        )

    invoking_event = json.loads(event['invokingEvent'])
    rule_parameters = {}
    if 'ruleParameters' in event:
        rule_parameters = json.loads(event['ruleParameters'])

    try:
        valid_rule_parameters = evaluate_parameters(rule_parameters)
    except ValueError as ex:
        return build_parameters_value_error_response(ex)

    try:
        configuration_item = get_configuration_item(invoking_event)
        if invoking_event['messageType'] in [
                'ConfigurationItemChangeNotification',
                'ScheduledNotification',
                'OversizedConfigurationItemChangeNotification']:
            if is_applicable(configuration_item, event):
                compliance_result = evaluate_compliance(
                    event, configuration_item, valid_rule_parameters)
            else:
                compliance_result = "NOT_APPLICABLE"
        else:
            return {'internalErrorMessage': 'Unexpected message type ' + str(invoking_event)}
    except botocore.exceptions.ClientError as ex:
        if is_internal_error(ex):
            return build_internal_error_response(
                "Unexpected error while completing API request", str(ex))
        return build_error_response(
            "Customer error while making API request",
            str(ex),
            ex.response['Error']['Code'],
            ex.response['Error']['Message'],
        )
    except ValueError as ex:
        return build_internal_error_response(str(ex), str(ex))

    evaluations = []
    latest_evaluations = []

    # Normalise whatever evaluate_compliance returned into a flat list of
    # evaluation dicts. The branch order matters: a falsy result (None, '',
    # [] or {}) is handled first as an account-level NOT_APPLICABLE.
    if not compliance_result:
        latest_evaluations.append(
            build_evaluation(event['accountId'], "NOT_APPLICABLE", event,
                             resource_type='AWS::::Account'))
        evaluations = clean_up_old_evaluations(latest_evaluations, event)
    elif isinstance(compliance_result, str):
        evaluations.append(
            build_evaluation_from_config_item(configuration_item, compliance_result))
    elif isinstance(compliance_result, list):
        # Incomplete custom evaluations are dropped (after being logged).
        for evaluation in compliance_result:
            if _has_required_evaluation_fields(evaluation):
                latest_evaluations.append(evaluation)
        evaluations = clean_up_old_evaluations(latest_evaluations, event)
    elif isinstance(compliance_result, dict):
        if _has_required_evaluation_fields(compliance_result):
            evaluations.append(compliance_result)
    else:
        evaluations.append(
            build_evaluation_from_config_item(configuration_item, 'NOT_APPLICABLE'))

    # Put together the request that reports the evaluation status.
    result_token = event['resultToken']
    # 'TESTMODE' is used solely by RDK tests; it is forwarded as TestMode=True.
    test_mode = result_token == 'TESTMODE'

    # Report the evaluations in slices of at most 100 per put_evaluations call.
    for start in range(0, len(evaluations), 100):
        AWS_CONFIG_CLIENT.put_evaluations(
            Evaluations=evaluations[start:start + 100],
            ResultToken=result_token,
            TestMode=test_mode,
        )

    # Returned so that RDK tests can inspect the outcome of the Lambda function.
    return evaluations


def _has_required_evaluation_fields(evaluation):
    """Return True when a custom evaluation carries every required field.

    Each missing field is reported with ``print`` so that all problems are
    logged, not only the first one found.
    """
    complete = True
    for field in ('ComplianceResourceType', 'ComplianceResourceId',
                  'ComplianceType', 'OrderingTimestamp'):
        if field not in evaluation:
            print("Missing " + field + " from custom evaluation.")
            complete = False
    return complete