import argparse
import sys

import boto3
from tabulate import tabulate
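def assume_role(account_id, role_name, session_name="kinesis-encryption-list"):
    """Assume an IAM role through STS and return the raw AssumeRole response.

    Args:
        account_id: 12-digit AWS account ID that owns the role (a string; an
            int is accepted and converted, but leading zeros then get lost).
        role_name: the role's name, not its ARN; it is interpolated into
            ``arn:aws:iam::<account>:role/<name>``.
        session_name: value sent as ``RoleSessionName``. It is recorded in
            CloudTrail for every call made with the session, so keep it
            descriptive of the tool; defaults to the tool's own name.

    Returns:
        The ``sts.assume_role`` response dict; the temporary keys live under
        ``['Credentials']`` (AccessKeyId, SecretAccessKey, SessionToken).

    Raises:
        ValueError: if ``account_id`` is not exactly 12 ASCII digits. This is
            checked before any network call so a typo cannot silently produce
            a malformed ARN.
        botocore ClientError: propagated unchanged from STS (e.g. AccessDenied
            when the caller is not trusted by the role).
    """
    account_id = str(account_id)
    # isascii() guards against non-ASCII code points that isdigit() accepts.
    if not (len(account_id) == 12 and account_id.isascii() and account_id.isdigit()):
        raise ValueError(
            f"account_id must be a 12-digit AWS account ID, got {account_id!r}"
        )
    role_arn = f'arn:aws:iam::{account_id}:role/{role_name}'
    sts_client = boto3.client('sts')
    response = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,
    )
    return response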
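def get_firehose_stream_encryption_info(firehose_client, stream_name):
    """Return the server-side encryption state of one delivery stream as a table row.

    Args:
        firehose_client: a boto3 Firehose client (anything exposing
            ``describe_delivery_stream(DeliveryStreamName=...)``).
        stream_name: name of the delivery stream to describe.

    Returns:
        dict with keys 'Stream Name', 'Encryption Status' and 'Key Type'.
        'Encryption Status' is the API's ``Status`` when an encryption
        configuration is present, else 'DISABLED'. 'Key Type' is the API's
        ``KeyType`` only while Status is ENABLED; otherwise 'Not applicable'
        (configuration present but not ENABLED, e.g. ENABLING or a failed
        state) or 'Not enabled' (no configuration at all). A response that
        omits an optional field yields 'UNKNOWN' / 'Unknown' instead of
        raising, so one odd stream cannot abort the whole audit.
    """
    response = firehose_client.describe_delivery_stream(DeliveryStreamName=stream_name)
    description = response['DeliveryStreamDescription']
    encryption = description.get('DeliveryStreamEncryptionConfiguration')
    if encryption is None:
        status, key_type = 'DISABLED', 'Not enabled'
    else:
        # Only ENABLED means data is actually being encrypted with the key;
        # transitional and failed states must not be reported as encrypted.
        status = encryption.get('Status', 'UNKNOWN')
        if status == 'ENABLED':
            # KeyType is optional in the API response; do not crash on its absence.
            key_type = encryption.get('KeyType', 'Unknown')
        else:
            key_type = 'Not applicable'
    return {
        'Stream Name': stream_name,
        'Encryption Status': status,
        'Key Type': key_type,
    }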
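def list_firehose_streams_with_kms(credentials, filter_words=None):
    """Print a grid table of Firehose delivery streams and their encryption state.

    Args:
        credentials: the STS ``assume_role`` response; only
            ``credentials['Credentials']`` (AccessKeyId, SecretAccessKey,
            SessionToken) is read.
        filter_words: optional substrings; a stream is listed when its name
            contains any of them. ``None`` *or an empty sequence* lists every
            stream (an empty list used to match nothing and print an empty table).

    Side effects:
        Prints the table to stdout. Region comes from the environment/config
        of the temporary-credential session, as with the default boto3 lookup.
    """
    creds = credentials['Credentials']
    session = boto3.Session(
        aws_access_key_id=creds['AccessKeyId'],
        aws_secret_access_key=creds['SecretAccessKey'],
        aws_session_token=creds['SessionToken'],
    )
    firehose_client = session.client('firehose')
    table_data = [
        get_firehose_stream_encryption_info(firehose_client, stream_name)
        for stream_name in _iter_delivery_stream_names(firehose_client)
        if not filter_words or any(word in stream_name for word in filter_words)
    ]
    headers = ['Stream Name', 'Encryption Status', 'Key Type']
    print(tabulate(table_data, headers=headers, tablefmt="grid"))


def _iter_delivery_stream_names(firehose_client):
    """Yield every delivery stream name visible to the client, following pagination.

    ``list_delivery_streams`` returns at most one page of names and flags the
    rest with ``HasMoreDeliveryStreams``; an audit that reads only the first
    page would silently omit streams, so keep requesting from the last name seen.
    """
    kwargs = {}
    while True:
        page = firehose_client.list_delivery_streams(**kwargs)
        names = page.get('DeliveryStreamNames', [])
        yield from names
        # Stop on the last page, or if the service returned no names at all
        # (guards against looping forever on an unexpected empty page).
        if not names or not page.get('HasMoreDeliveryStreams'):
            return
        kwargs = {'ExclusiveStartDeliveryStreamName': names[-1]}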
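if __name__ == '__main__':
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='List Kinesis Data Firehose streams with KMS encryption.')
    parser.add_argument('filter_words', nargs='*', help='List of words to filter stream names')
    parser.add_argument('--role-name', required=True, help='Name of the IAM role to assume')
    parser.add_argument('--account-id', required=True, help='AWS account ID')
    args = parser.parse_args()
    # nargs='*' yields [] when no words are given; pass None so that "no
    # filter" lists every stream instead of matching nothing.
    filter_words = args.filter_words or None
    # Assume the specified IAM role and report any failure at this top-level
    # boundary. Errors go to stderr with a non-zero exit so automation can
    # distinguish "audit did not complete" from "audit found no streams".
    try:
        credentials = assume_role(args.account_id, args.role_name)
        list_firehose_streams_with_kms(credentials, filter_words)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)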