danotorrey closed this issue 5 years ago
Only Kinesis currently supports subscribing to and receiving data via streams. This isn't supported by CloudWatch Logs at this time.
Right now, polling is your best bet.
Makes sense, thank you!
@millems Ok, so polling certainly works. We are currently looking at using CloudWatchLogsClient.filterLogEvents(), because we want to read from an entire log group.
We want to read all logs in a particular log group, then periodically check back to read any new logs.
We looked at using the nextToken functionality, but it is sometimes null when we have read to the end of the available logs (and sometimes not). These tokens also expire after 24 hours, so if one of our reader nodes were down for 24 hours, we would not be able to resume reading only the new messages.
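One way to sidestep expiring tokens (a sketch, not an official recommendation) is to keep your own cursor based on the newest event timestamp seen, and pass it as the startTime of the next filterLogEvents call instead of relying on nextToken across restarts. The class and method names here are illustrative:

```java
import java.util.List;

public class LogCursor {
    // Advance a polling cursor to just past the newest event timestamp
    // we've seen, so the next poll can use it as startTime.
    // Falls back to the current cursor when a page is empty.
    static long advanceCursor(long currentCursor, List<Long> eventTimestamps) {
        long max = currentCursor;
        for (long ts : eventTimestamps) {
            if (ts >= max) {
                max = ts + 1; // startTime is inclusive, so step past it
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // Simulated page of events with millisecond timestamps
        long cursor = advanceCursor(0L, List.of(1000L, 1004L, 1002L));
        System.out.println(cursor); // 1005
    }
}
```

Note the caveat discussed later in this thread: this only works if events cannot arrive with timestamps older than the cursor.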
Do you recommend any alternative? Would filtering by timestamp work? Are messages returned in timestamp order? Is it correct that we could filter out messages older than a particular timestamp? This seems unreliable, since messages with a borderline timestamp might get left out or included twice.
Is there some kind of absolute sequence that we can use?
Sorry, we're not too familiar with the actual semantics of the CloudWatch APIs. We're only really familiar with the protocols used to communicate with CloudWatch.
Your best bet would be to ask in the AWS forums or stack overflow, but I feel bad sending you somewhere else right off the bat.
After some googling, it looks like you can send CloudWatch logs to a Kinesis shard, which DOES have a real-time streaming API in its subscribe-to-shard operation. That might work, but I'm definitely not an expert.
I couldn't find in the CloudWatch API docs what the ordering of results is (your eyes may be better than mine), so I'm not sure how to sequence the results using polling. If results are definitely sorted by date and there's no chance of logs going missing, then filtering by timestamp should work (it looks like the get-logs API also supports filtering by timestamp).
Looks like it works! Proof of concept:
try (IamClient iam = IamClient.builder().region(Region.AWS_GLOBAL).build();
     CloudWatchLogsClient cloudWatch = CloudWatchLogsClient.create();
     KinesisAsyncClient kinesisClient = KinesisAsyncClient.create()) {

    // Create our CloudWatch log group
    String logGroup = "test-log-group";
    String logStream = "test-log-stream-" + UUID.randomUUID();
    cloudWatch.createLogGroup(r -> r.logGroupName(logGroup));
    cloudWatch.createLogStream(r -> r.logGroupName(logGroup).logStreamName(logStream));

    // Create our Kinesis stream
    String stream = "test-stream";
    kinesisClient.createStream(r -> r.streamName(stream).shardCount(1)).join();
    StreamDescription streamDescription;
    do {
        streamDescription = kinesisClient.describeStream(r -> r.streamName(stream)).join().streamDescription();
    } while (streamDescription.streamStatus() != StreamStatus.ACTIVE);
    String streamArn = streamDescription.streamARN();
    String shardId = streamDescription.shards().get(0).shardId();

    // Create a role that will allow CloudWatch to talk to Kinesis
    String roleName = "test-cloudwatch-to-kinesis-role";
    String rolePolicyName = "write-to-kinesis";
    String assumeRolePolicy =
        "{\n" +
        "  \"Statement\": [\n" +
        "    {\n" +
        "      \"Effect\": \"Allow\",\n" +
        "      \"Principal\": { \"Service\": \"logs.us-west-2.amazonaws.com\" },\n" +
        "      \"Action\": \"sts:AssumeRole\"\n" +
        "    }\n" +
        "  ]\n" +
        "}";
    String rolePolicy =
        "{\n" +
        "  \"Statement\": [\n" +
        "    {\n" +
        "      \"Effect\": \"Allow\",\n" +
        "      \"Action\": \"kinesis:PutRecord\",\n" +
        "      \"Resource\": \"" + streamArn + "\"\n" +
        "    }\n" +
        "  ]\n" +
        "}";
    iam.createRole(r -> r.roleName(roleName).assumeRolePolicyDocument(assumeRolePolicy));
    iam.putRolePolicy(r -> r.roleName(roleName).policyName(rolePolicyName).policyDocument(rolePolicy));

    // Give IAM a bit - it's eventually consistent
    Thread.sleep(10_000);
    String roleArn = iam.getRole(r -> r.roleName(roleName)).role().arn();

    // Connect Cloudwatch to Kinesis
    String filterName = "test-filter";
    cloudWatch.putSubscriptionFilter(r -> r.logGroupName(logGroup)
                                           .filterName(filterName)
                                           .filterPattern("")
                                           .roleArn(roleArn)
                                           .destinationArn(streamArn));

    // Create a Kinesis consumer
    String consumerName = "test-consumer";
    kinesisClient.registerStreamConsumer(r -> r.streamARN(streamArn).consumerName(consumerName)).join();

    // Connect to Kinesis and asynchronously log all information our consumer receives from CloudWatch
    String consumerArn = kinesisClient.describeStreamConsumer(r -> r.streamARN(streamArn)
                                                                    .consumerName(consumerName))
                                      .join().consumerDescription().consumerARN();
    SubscribeToShardResponseHandler.Visitor logEventVisitor = new SubscribeToShardResponseHandler.Visitor() {
        @Override
        public void visit(SubscribeToShardEvent event) {
            String records = event.records().stream()
                                  .map(r -> {
                                      try (InputStream is = new GZIPInputStream(r.data().asInputStream())) {
                                          return IoUtils.toUtf8String(is);
                                      } catch (IOException e) {
                                          throw new UncheckedIOException(e);
                                      }
                                  })
                                  .collect(Collectors.joining("\n"));
            if (!records.isEmpty()) {
                System.out.println("Log Events from Kinesis:\n" + records);
            }
        }
    };
    CompletableFuture<Void> subscriptionFuture =
        kinesisClient.subscribeToShard(r -> r.shardId(shardId)
                                             .consumerARN(consumerArn)
                                             .startingPosition(p -> p.type(ShardIteratorType.LATEST)),
                                       SubscribeToShardResponseHandler.builder().subscriber(logEventVisitor).build());

    // Send some data to CloudWatch, to make sure all of this is working
    String sequenceToken = null;
    for (int i = 0; i < 10; ++i) {
        String message = "Stuff " + i;
        String requestSequenceToken = sequenceToken;
        sequenceToken =
            cloudWatch.putLogEvents(r -> r.logGroupName(logGroup)
                                          .logStreamName(logStream)
                                          .logEvents(e -> e.timestamp(Instant.now().toEpochMilli())
                                                           .message(message))
                                          .sequenceToken(requestSequenceToken))
                      .nextSequenceToken();
    }

    // Wait a little while, just so we can see all of the results
    Thread.sleep(10_000);

    // Stop listening
    subscriptionFuture.cancel(false);
}
I'd recommend using the Kinesis Client Library instead of the low-level Kinesis client (as above), but otherwise this is one working method. I'm not sure if it's the best method, though. For that, the forums or stack overflow might be able to help.
LMK if there's anything else we can help with.
@millems Thank you so much for the responses. We had used Kinesis in the past, and it works well. If this completely ran in our AWS account, we would definitely go that route. However, our application is run by many, many end users/companies. Requiring the Kinesis setup for all of them adds a lot of complexity (and extra cost). One of our major goals for this project/feature is to eliminate the need for Kinesis to simplify the setup.
Another option: Filter and read by past time ranges. The only issue is if new messages come in with an earlier timestamp for a range that has already been read. Is there any way to filter by ingested time instead?
Also, is there any way to filter by event id? Is the event id always sequential chronologically? Another option would be to go back and read past messages until I find an event that is beyond the id of the last event I read.
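If event ids turn out to be unique but not reliably ordered, one hedged fallback (a sketch under that assumption; the class name and capacity are illustrative) is to re-read an overlapping window and remember recently seen ids, skipping replays:

```java
import java.util.LinkedHashSet;

public class SeenEvents {
    private final LinkedHashSet<String> seen = new LinkedHashSet<>();
    private final int capacity;

    SeenEvents(int capacity) {
        this.capacity = capacity;
    }

    // Returns true the first time an event id is observed, false on replays.
    boolean markNew(String eventId) {
        if (seen.contains(eventId)) {
            return false;
        }
        seen.add(eventId);
        if (seen.size() > capacity) {
            // Evict the oldest id so memory stays bounded
            seen.remove(seen.iterator().next());
        }
        return true;
    }

    public static void main(String[] args) {
        SeenEvents cache = new SeenEvents(1000);
        System.out.println(cache.markNew("evt-1")); // true
        System.out.println(cache.markNew("evt-1")); // false (duplicate)
    }
}
```

The bounded capacity is a trade-off: an id evicted from the cache would be treated as new if it reappears, so the cache must cover at least the re-read window.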
We've definitely pushed past my knowledge on cloudwatch logs. The AWS forums or stack overflow is more knowledgeable on these types of issues. Could you ask there and cross-link with this issue so that people who come here from google can follow up?
@millems after looking at this very closely with our team, we have confirmed that CloudWatch does not support real time streaming and tailing. Kinesis (via Log Group subscription) or a Lambda function with a CloudWatch Logs trigger are the only ways we could find to get realtime logs from CloudWatch. So, to get guaranteed streaming with no missed messages, one of these services must be used.
The core of the issue is that the CloudWatch API and CLI only support filtering logs by timestamp (not by ingestion time), and the timestamp is user-definable. Even if we polled recent time ranges to simulate streaming/tailing, it's possible that a set of logs with timestamps older than the current polling time range is delivered to the log group. Those messages would be lost.
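For anyone who still wants to poll by timestamp despite this, one partial mitigation (a sketch only; it bounds the problem rather than solving it, and the helper name is hypothetical) is to back each poll's start time off by a fixed lateness allowance and rely on id-based dedup for the overlap, accepting that anything delayed beyond the allowance is still lost:

```java
public class PollWindow {
    // Compute the startTime for the next timestamp-filtered poll:
    // back off from the last poll time by a fixed lateness allowance,
    // so events delivered late (within that allowance) are still picked up.
    static long nextStartTime(long lastPollMillis, long maxLatenessMillis) {
        return Math.max(0L, lastPollMillis - maxLatenessMillis);
    }

    public static void main(String[] args) {
        // Re-read the last 5 minutes on every poll; dedup by event id elsewhere.
        long start = nextStartTime(System.currentTimeMillis(), 5 * 60 * 1000L);
        System.out.println(start);
    }
}
```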
There are third-party tools available that use the AWS API to provide tailing functionality (such as saw, awslogs, and cw). These tools are great, and many users are happy with them. They use an event-id or timestamp cache to avoid re-reading past messages. Unfortunately, this does not always solve the delayed-log-message (older timestamp) issue for our use case. Nevertheless, we have a solid workaround with Kinesis. Thanks for the sample code earlier.
Just wanted to circle back around with the result from our investigation. Thanks again for the help.
Thanks for closing the loop on this!
Question: What is the best way to monitor a CloudWatch Log Group for new log messages, then retrieve those messages when they are written to the log group?
Is the CloudWatchLogsClient.getLogEvents() method the best way to do this? (as my test code shows here https://github.com/danotorrey/aws-cloudwatch/blob/df7fe479c0a3211a8cbbcc22d5005b468f73acd1/src/main/java/CloudWatchReader.java#L83). I believe this approach would require periodic polling to check for new log messages.
I mostly wanted to check if the functionality exists in the SDK where a callback subscription can be established, which would be called when new logs are available. I had heard somewhere that there was something new in v2 of this SDK that helps with this, but I don't know if that was accurate.