Open Simong2214 opened 1 year ago
This script allows you to list all the Kinesis Data Streams in your AWS account and displays their encryption details, including whether encryption is enabled and the associated KMS Key ARN if applicable.
Clone the repository:
git clone <repository_url>
Install the required dependencies:
bash Copy code pip install boto3 Usage Open a terminal or command prompt.
Navigate to the directory where the script is located.
Execute the script:
bash Copy code python list_kinesis_streams.py The script will connect to your AWS account and list all the Kinesis Data Streams along with their encryption details.
Permissions Make sure that the AWS credentials used by the AWS CLI have the necessary permissions to access and describe Kinesis Data Streams. Refer to the AWS documentation for more information on configuring AWS credentials and permissions.
Limitations This script assumes that you have the necessary permissions to list and describe Kinesis Data Streams in your AWS account. It only retrieves the streams available in the current AWS account and does not support cross-account listing.
import boto3
def list_kinesis_streams():
# Create AWS Kinesis client
kinesis_client = boto3.client('kinesis')
# List the Kinesis Data Streams
response = kinesis_client.list_streams()
# Iterate over the streams
for stream_name in response['StreamNames']:
response = kinesis_client.describe_stream(StreamName=stream_name)
stream_description = response['StreamDescription']
# Get the KMS key ARN if encryption is enabled
kms_key_arn = stream_description.get('EncryptionType', '') == 'KMS' and \
stream_description.get('KeyId', '')
# Print stream details
print(f"Stream Name: {stream_name}")
print(f"Encryption Type: {stream_description.get('EncryptionType', 'Not encrypted')}")
print(f"KMS Key ARN: {kms_key_arn}")
print('-' * 50)
# Call the function
list_kinesis_streams()
import boto3
def list_kinesis_streams():
# Create AWS Kinesis Data Firehose client
firehose_client = boto3.client('firehose')
# List the Kinesis Data Firehose streams
streams = firehose_client.list_delivery_streams()
# Iterate over the streams
for stream in streams['DeliveryStreamNames']:
response = firehose_client.describe_delivery_stream(
DeliveryStreamName=stream
)
stream_name = response['DeliveryStreamDescription']['DeliveryStreamName']
stream_arn = response['DeliveryStreamDescription']['DeliveryStreamARN']
print(f"Stream Name: {stream_name}")
print(f"Delivery Stream ARN: {stream_arn}")
print()
# Call the function
list_kinesis_streams()