aws / aws-sdk-java-v2

The official AWS SDK for Java - Version 2
Apache License 2.0
2.18k stars 840 forks

Using MetricPublisher in AWS Lambda #2068

Open powerful23 opened 4 years ago

powerful23 commented 4 years ago

Hi AWS SDK team,

We want to publish AWS SDK metrics from a Lambda function, following the guide: https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/configuration-metrics.html

We found there seems to be no way to flush the metrics manually, and without that the Lambda function loses part of the metrics data. If I understand correctly, the CloudWatchMetricPublisher only flushes on its configured schedule or when it is closed, and the flush frequency can only be set when building the publisher. We want to know if there is a way to flush the metrics manually.

Expected Behavior

Flush the metrics manually.

Current Behavior

The metrics are flushed only when the publisher is closed.

Possible Solution

Make the flush method public.
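To make the requested change concrete, here is a minimal sketch, assuming a simplified publisher that buffers data points and uploads them from a background thread on a fixed schedule. This only mirrors the behaviour described above; it is not the SDK's CloudWatchMetricPublisher code. BufferingPublisher, its String data points, and the uploader callback are hypothetical stand-ins. The point is the public flush() that a Lambda handler could call before returning, instead of relying on close() or the scheduled upload.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a batching metrics publisher; not the SDK's CloudWatchMetricPublisher.
class BufferingPublisher implements AutoCloseable {
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<List<String>> uploader; // stand-in for the CloudWatch upload call
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "metric-uploader");
                thread.setDaemon(true);
                return thread;
            });

    BufferingPublisher(Consumer<List<String>> uploader, Duration uploadFrequency) {
        this.uploader = uploader;
        long periodMillis = uploadFrequency.toMillis();
        // Scheduled background upload. Lambda can freeze the execution environment between
        // invocations, so data buffered near the end of an invocation may not be sent promptly.
        scheduler.scheduleAtFixedRate(this::flush, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void publish(String dataPoint) {
        buffer.add(dataPoint);
    }

    // The requested feature: a public flush a handler can call before it returns.
    public synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        uploader.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    @Override
    public void close() {
        scheduler.shutdown();
        flush(); // without a public flush(), closing is the only on-demand flush point
    }
}
```

In a handler, the flush() call would sit at the end of the request handling, for example in a finally block, so buffered data is uploaded before the environment is frozen. The catch, as the reply below notes, is that each forced upload is a network call made on the request path.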

Steps to Reproduce (for bugs)

Use a MetricPublisher in a Lambda function.

Your Environment

debora-ito commented 4 years ago

Hi @powerful23, thank you for reaching out. We're aware of this issue with metrics in Lambda, but after some investigation we did not find a good way to prevent it. Currently there's no way to manually flush the metrics because this operation would be expensive.

I'm changing this to a feature request to improve the experience of using metrics in Lambda.

humanzz commented 4 years ago

Hi @debora-ito and @powerful23,

I was coming here with a similar feature request, but rather than using the CloudWatchMetricPublisher, I was going to ask for an EMF-based metrics publisher.

I think that could solve the problem above: I believe it would behave much like the LoggingMetricPublisher and therefore might not need the asynchronous flushing that the CloudWatch client requires.
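Why an EMF publisher can be synchronous: an EMF record is just a structured JSON log line, and CloudWatch extracts the metrics from it once it reaches the function's logs, so there is no client-side upload to flush. The sketch below renders one such line by hand. EmfLine is a hypothetical helper; it assumes names and values need no JSON escaping and marks every metric as unitless ("None"). A real implementation would use a JSON library or aws-embedded-metrics-java.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: renders one CloudWatch Embedded Metric Format (EMF) document.
// Assumes no JSON escaping is needed and that all metrics are unitless.
final class EmfLine {
    private EmfLine() {}

    static String render(String namespace, long timestampMillis,
                         Map<String, String> dimensions, Map<String, Double> metrics) {
        // One dimension set listing every dimension name
        StringJoiner dimNames = new StringJoiner(",", "[", "]");
        dimensions.keySet().forEach(k -> dimNames.add(quote(k)));

        // Metric definitions: name plus unit
        StringJoiner metricDefs = new StringJoiner(",", "[", "]");
        metrics.keySet().forEach(k -> metricDefs.add("{\"Name\":" + quote(k) + ",\"Unit\":\"None\"}"));

        StringBuilder json = new StringBuilder();
        json.append("{\"_aws\":{\"Timestamp\":").append(timestampMillis)
            .append(",\"CloudWatchMetrics\":[{\"Namespace\":").append(quote(namespace))
            .append(",\"Dimensions\":[").append(dimNames).append("]")
            .append(",\"Metrics\":").append(metricDefs).append("}]}");
        // Dimension values and metric values are top-level members of the same JSON object
        dimensions.forEach((k, v) -> json.append(',').append(quote(k)).append(':').append(quote(v)));
        metrics.forEach((k, v) -> json.append(',').append(quote(k)).append(':').append(v));
        return json.append('}').toString();
    }

    private static String quote(String s) {
        return "\"" + s + "\"";
    }
}
```

Writing the returned string with System.out.println inside the handler is the entire publish step, which is also why each API call costs one stdout write.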

And I just realized there's an implementation of EMF suitable for Lambda at https://github.com/awslabs/aws-embedded-metrics-java

humanzz commented 3 years ago

I've implemented an EmfMetricPublisher using aws-embedded-metrics-java in a project I am working on.

I think the implementation can be generalized and should live somewhere public (I'm not sure whether here in aws-sdk-java-v2 or in its own repo), and I'm happy to contribute.

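import com.google.common.collect.ImmutableSet;
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.MetricRecord;
import software.amazon.awssdk.metrics.SdkMetric;
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
// MetricsClient and Metrics appear to be project-specific wrappers around
// aws-embedded-metrics-java; they are not shown here.

/**
 * A custom {@link MetricPublisher} that publishes AWS service client metrics using CloudWatch EMF.
 * It uses {@link MetricsClient}/{@link Metrics} to
 * 1. Publish metrics using EMF
 * 2. Ensure EMF entries have additional context, e.g. Lambda requestId and X-Ray traceId
 *
 * See the following for context and for the list of metrics emitted
 * - https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/configuration-metrics-list.html
 * - https://github.com/aws/aws-sdk-java-v2/blob/master/docs/design/core/metrics/MetricsList.md
 * - {@link CoreMetric}, {@link software.amazon.awssdk.http.HttpMetric}
 *
 * Note: https://github.com/aws/aws-sdk-java-v2/issues/2068 suggests the AWS SDK v2 may introduce its own
 * implementation, so we should keep an eye on it in case that happens
 */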
@Log4j2
@RequiredArgsConstructor
public class EmfMetricPublisher implements MetricPublisher {
    private static final String API_CALL = "ApiCall";
    private static final String API_CALL_ATTEMPT = "ApiCallAttempt";
    private static final String HTTP_CLIENT = "HttpClient";
    private static final Set<SdkMetric<?>> DIMENSIONS = ImmutableSet.of(CoreMetric.SERVICE_ID, CoreMetric.OPERATION_NAME);

    private final MetricsClient metricsClient;

    @Override
    public void publish(MetricCollection metricCollection) {
        if (!API_CALL.equals(metricCollection.name())) {
            log.warn("Unknown MetricCollection {} {}", metricCollection.name(), metricCollection);
            return;
        }
        Metrics metrics = metricsClient.newMetrics(dimensionSet(metricCollection));
        metricCollection.forEach(metricRecord -> addMetricsAndProperties(metrics, "", metricRecord));
        List<MetricCollection> children = metricCollection.children();
        for (int i = 0; i < children.size(); i++) {
            MetricCollection attemptCollection = children.get(i);
            if (!API_CALL_ATTEMPT.equals(attemptCollection.name())) {
                log.warn("Unknown Child MetricCollection {} {}", attemptCollection.name(), attemptCollection);
                continue;
            }
            String attemptPrefix = API_CALL_ATTEMPT + i;
            for (MetricRecord<?> attemptRecord : attemptCollection) {
                // Prefix ApiCallAttempt metrics/properties with ApiCallAttempt<index>
                addMetricsAndProperties(metrics, attemptPrefix, attemptRecord);
            }
            // Look up the attempt's HttpClient child once, outside the per-record loop,
            // so its metrics are added once per attempt rather than once per attempt record
            Optional<MetricCollection> httpCollection = attemptCollection.childrenWithName(HTTP_CLIENT).findFirst();
            if (httpCollection.isPresent()) {
                for (MetricRecord<?> httpRecord : httpCollection.get()) {
                    // Prefix the attempt's HttpClient metrics/properties with ApiCallAttempt<index>HttpClient
                    addMetricsAndProperties(metrics, attemptPrefix + HTTP_CLIENT, httpRecord);
                }
            }
        }
        metricsClient.publish(metrics);
    }

    @Override
    public void close() {
        // Do nothing, publish already emits the metrics to logs
    }

    /**
     * Create a {@link DimensionSet} of ServiceId and OperationName
     *
     * NOTE: Aws Sdk v2's CloudWatchMetricPublisher allows customizing dimensions but in this implementation
     * we fix the dimensions for simplicity
     */
    private DimensionSet dimensionSet(MetricCollection metricCollection) {
        DimensionSet dimensionSet = new DimensionSet();
        metricCollection.stream()
                .filter(metricRecord -> DIMENSIONS.contains(metricRecord.metric()))
                // Sort to ensure ServiceId comes before OperationName
                .sorted(Comparator.<MetricRecord<?>, String>comparing(metricRecord -> metricRecord.metric().name()).reversed())
                .forEach(metricRecord -> dimensionSet.addDimension(metricRecord.metric().name(), (String) metricRecord.value()));
        return dimensionSet;
    }

    /**
     * Map {@link MetricRecord} into the right metric/property type for {@link Metrics}
     */
    private void addMetricsAndProperties(Metrics metrics, String metricNamePrefix, MetricRecord<?> metricRecord) {
        Class<?> metricType = metricRecord.metric().valueClass();
        String metricName = metricNamePrefix + metricRecord.metric().name();
        if (Duration.class.isAssignableFrom(metricType)) {
            Duration value = (Duration) metricRecord.value();
            metrics.putDuration(metricName, value);
        } else if (Number.class.isAssignableFrom(metricType)) {
            // Emit unitless numbers to support more than just counts
            Number value = (Number) metricRecord.value();
            metrics.putMetric(metricName, value.doubleValue());
        } else if (Boolean.class.isAssignableFrom(metricType)) {
            Boolean value = (Boolean) metricRecord.value();
            metrics.putBoolean(metricName, value);
        } else if (String.class.isAssignableFrom(metricType)) {
            String value = (String) metricRecord.value();
            // NOTE: This is not a metric, it's a property
            metrics.putProperty(metricName, value);
        }
    }
}
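The prefixing in publish() flattens the ApiCall, ApiCallAttempt and HttpClient hierarchy into a single EMF record. The following is a simplified, SDK-free model of that naming scheme, for illustration only: Node and Flattener are hypothetical stand-ins for MetricCollection and the publisher, and values are kept as plain objects rather than being mapped to metrics or properties.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a MetricCollection: a name, its records, and child collections.
final class Node {
    final String name;
    final Map<String, Object> values;
    final List<Node> children;

    Node(String name, Map<String, Object> values, List<Node> children) {
        this.name = name;
        this.values = values;
        this.children = children;
    }
}

// Hypothetical model of the prefixing scheme used by EmfMetricPublisher.publish().
final class Flattener {
    private Flattener() {}

    static Map<String, Object> flatten(Node apiCall) {
        // ApiCall-level records keep their plain names
        Map<String, Object> out = new LinkedHashMap<>(apiCall.values);
        List<Node> attempts = apiCall.children;
        for (int i = 0; i < attempts.size(); i++) {
            Node attempt = attempts.get(i);
            if (!"ApiCallAttempt".equals(attempt.name)) {
                continue; // the real publisher logs a warning here
            }
            String prefix = "ApiCallAttempt" + i;
            attempt.values.forEach((k, v) -> out.put(prefix + k, v));
            // The attempt's HttpClient records get a longer prefix, added once per attempt
            attempt.children.stream()
                    .filter(c -> "HttpClient".equals(c.name))
                    .findFirst()
                    .ifPresent(http -> http.values.forEach((k, v) -> out.put(prefix + "HttpClient" + k, v)));
        }
        return out;
    }
}
```

For a single attempt this yields keys such as ServiceId, RetryCount, ApiCallAttempt0ServiceCallDuration and ApiCallAttempt0HttpClientMaxConcurrency. A retried call adds a parallel ApiCallAttempt1 set, so the set of metric names depends on how many attempts were made.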

A couple of notes about the implementation above

akketcha commented 3 years ago

@humanzz - Am I correct in assuming that this publisher is synchronous and will therefore call System.out.println (https://github.com/awslabs/aws-embedded-metrics-java/blob/master/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/ConsoleSink.java) for the set of metrics associated with every API call?

This seems incredibly expensive.

Or is publish called asynchronously? (If you have a link to where publish is called, that'd be great.)

humanzz commented 3 years ago

From a chat I had with @akketcha offline, here are some additional thoughts. Sharing them here for visibility.

You’re absolutely right that every API call maps to a System.out.println which would incur its cost. Our use case is fine with this since our Lambda is invoked asynchronously to consume messages from SQS so it’s not really that latency sensitive.

The clients actually call publish synchronously. I think it’s all part of client codegen, but looking at the generated code for DefaultStsClient.class, for example, you can see what happens in a sync client in the finally block below:

    @Override
    public AssumeRoleResponse assumeRole(AssumeRoleRequest assumeRoleRequest) throws MalformedPolicyDocumentException,
            PackedPolicyTooLargeException, RegionDisabledException, ExpiredTokenException, AwsServiceException,
            SdkClientException, StsException {
        HttpResponseHandler<AssumeRoleResponse> responseHandler = protocolFactory
                .createResponseHandler(AssumeRoleResponse::builder);
        HttpResponseHandler<AwsServiceException> errorResponseHandler = protocolFactory.createErrorResponseHandler();
        List<MetricPublisher> metricPublishers = resolveMetricPublishers(clientConfiguration, assumeRoleRequest
                .overrideConfiguration().orElse(null));
        MetricCollector apiCallMetricCollector = metricPublishers.isEmpty() ? NoOpMetricCollector.create() : MetricCollector
                .create("ApiCall");
        try {
            apiCallMetricCollector.reportMetric(CoreMetric.SERVICE_ID, "STS");
            apiCallMetricCollector.reportMetric(CoreMetric.OPERATION_NAME, "AssumeRole");
            return clientHandler
                    .execute(new ClientExecutionParams<AssumeRoleRequest, AssumeRoleResponse>().withOperationName("AssumeRole")
                            .withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
                            .withInput(assumeRoleRequest).withMetricCollector(apiCallMetricCollector)
                            .withMarshaller(new AssumeRoleRequestMarshaller(protocolFactory)));
        } finally {
            metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect()));
        }
    }
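The essential pattern in that generated method can be reduced to a few lines. The sketch below is an SDK-free model, not the generated code: MiniClient is hypothetical, and a comma-joined String stands in for the MetricCollection. It shows that every publisher runs inside the finally block on the calling thread, so publish() adds latency to each call and still runs when the request throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of a generated sync client operation.
final class MiniClient {
    private final List<Consumer<String>> publishers; // stand-ins for MetricPublisher

    MiniClient(List<Consumer<String>> publishers) {
        this.publishers = publishers;
    }

    <T> T call(String operation, Supplier<T> request) {
        List<String> collected = new ArrayList<>();
        collected.add("OperationName=" + operation);
        try {
            return request.get();
        } finally {
            String collection = String.join(",", collected);
            // Synchronous: each publisher runs on the caller's thread before call() returns or rethrows
            publishers.forEach(p -> p.accept(collection));
        }
    }
}
```

This is why any per-call cost inside publish(), such as a stdout write, shows up directly in the latency of every SDK call.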

Trying to think what can be done to not incur all that cost…

  1. aws-embedded-metrics uses System.out.println in the Lambda environment. I thought LambdaLogger might do a better job, perhaps by batching log lines, and if so, having aws-embedded-metrics use it would be an improvement. Unfortunately, from what I see, LambdaLogger also ends up using System.out, e.g. https://github.com/aws/aws-lambda-java-libs/blob/master/aws-lambda-java-log4j2/src/main/java/com/amazonaws/services/lambda/runtime/log4j2/LambdaAppender.java#L74 and https://github.com/aws/aws-lambda-java-libs/blob/master/aws-lambda-java-core/src/main/java/com/amazonaws/services/lambda/runtime/LambdaRuntime.java
  2. Another idea is to update the MetricPublisher to keep collecting the metrics to publish (MetricCollection objects) during a Lambda request, and potentially introduce a new method, similar to CloudWatchMetricPublisher’s flushMetrics, to force emitting/writing the metrics. That method would be called at the end of the Lambda request. If the collected MetricCollection objects (and the derived MetricsLogger/MetricsContext) can all be serialized and written with System.out in one go, we reduce the number of System.out calls to one per Lambda request (or a few batches, if System.out has limits). This approach would apply to the CloudWatchMetricPublisher as well and would solve this issue.
  3. More generally, this makes me think the Lambda handler needs a concept of handler chains, where one handler has access to the MetricPublisher and ensures that metrics are flushed at the end of the request, rather than relying on CloudWatchMetricPublisher’s background thread.
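Ideas 2 and 3 could be combined roughly as follows. This is a minimal sketch under stated assumptions: BatchingLogPublisher and withFlush are hypothetical names, each buffered String stands in for one serialized EMF document, and java.util.function.Function stands in for a Lambda handler interface. Whether one larger write is actually cheaper than many println calls in the Lambda runtime is an assumption that would need measuring, and any size limits on log output would have to be respected before batching large volumes.

```java
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical publisher: buffers one line per API call, writes them all at the end of an invocation.
final class BatchingLogPublisher {
    private final List<String> pending = new ArrayList<>();
    private final PrintStream out;

    BatchingLogPublisher(PrintStream out) {
        this.out = out;
    }

    // Called once per API call (idea 2): buffer instead of printing immediately
    synchronized void publish(String emfLine) {
        pending.add(emfLine);
    }

    // One print call per invocation; lines stay newline-separated so each remains its own record
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        out.print(String.join("\n", pending) + "\n");
        out.flush();
        pending.clear();
    }

    // Idea 3: wrap the handler so buffered metrics are flushed even if it throws
    static <I, O> Function<I, O> withFlush(Function<I, O> handler, BatchingLogPublisher publisher) {
        return input -> {
            try {
                return handler.apply(input);
            } finally {
                publisher.flush();
            }
        };
    }
}
```

The wrapped function is what the actual Lambda handler would delegate to, so the flush happens once per invocation, including when the wrapped handler throws.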
humanzz commented 3 years ago

I just came across a related request at https://github.com/awslabs/aws-embedded-metrics-java/issues/29