[APM] Internal Server Error returned on apm/traces/aggregated_critical_path request #178892

Status: Open. ablnk opened this issue 6 months ago.

ablnk commented 6 months ago

Version: Serverless project v8.14.0; stateful deployment v8.14.0-SNAPSHOT

Description: The POST /internal/apm/traces/aggregated_critical_path request returns an Internal Server Error.

Preconditions: I reproduced the issue with ~780k documents from 761 services in the APM data view within a 15-minute interval.

Steps to reproduce:

  1. Go to Applications > Traces > Explorer.
  2. Ensure the date picker is set to an interval of at least 15 minutes.
  3. Click the Aggregated critical path tab.

Expected behavior: The aggregated critical path data should be rendered.

Actual behavior: The request fails with the following response:

{
    "statusCode": 500,
    "error": "Internal Server Error",
    "message": "{\"ok\":false,\"message\":\"The instance rejected the connection.\"}",
    "attributes": {
        "data": {},
        "_inspect": [
            {
                "id": "get_aggregated_critical_path (/internal/apm/traces/aggregated_critical_path) a03c86a1-1fce-444d-8b11-44078efe4ce1",
                "json": {
                    "size": 0,
                    "track_total_hits": false,
                    "query": {
                        "bool": {
                            "filter": [
                                {
                                    "terms": {
                                        "processor.event": [
                                            "…"
                                        ]
                                    }
                                }
                            ],
                            "must": [
                                {
                                    "bool": {
                                        "filter": [
                                            {
                                                "terms": {
                                                    "trace.id": [
                                                        "…"
                                                    ]
                                                }
                                            },
                                            {
                                                "range": {
                                                    "@timestamp": {
                                                        "gte": 1710606359591,
                                                        "lte": 1710952859591,
                                                        "format": "epoch_millis"
                                                    }
                                                }
                                            }
                                        ]
                                    }
                                }
                            ]
                        }
                    },
                    "aggs": {
                        "critical_path": {
                            "scripted_metric": {
                                "params": {},
                                "init_script": {
                                    "source": "\n                state.eventsById = [:];\n                state.metadataByOperationId = [:];\n              "
                                },
                                "map_script": {
                                    "source": "\n                String toHash (def item) {\n                  long FNV_32_INIT = 0x811c9dc5L;\n                  long FNV_32_PRIME = 0x01000193L;\n                  char[] chars = item.toString().toCharArray();\n                  long rv = FNV_32_INIT;\n                  int len = chars.length;\n                  for(int i = 0; i < len; i++) {\n                      byte bt = (byte) chars[i];\n                      rv ^= bt;\n                      rv *= FNV_32_PRIME;\n                  }\n                  return rv.toString();\n                }\n                \n                def id;\n                double duration;\n                \n                def operationMetadata = [\n                  \"service.name\": doc['service.name'].value,\n                  \"processor.event\": doc['processor.event'].value,\n                  \"agent.name\": doc['agent.name'].value\n                ];\n\n                def isSpan = !doc['span.id'].empty && !doc['span.name'].empty;\n                \n                if (isSpan) {\n                  id = doc['span.id'].value;\n                  operationMetadata.put('span.name', doc['span.name'].value);\n                  if (!doc['span.type'].empty) {\n                    operationMetadata.put('span.type', doc['span.type'].value);\n                  }\n                  if (!doc['span.subtype'].empty) {\n                    operationMetadata.put('span.subtype', doc['span.subtype'].value);\n                  }\n                  duration = doc['span.duration.us'].value;\n                } else {\n                  id = doc['transaction.id'].value;\n                  operationMetadata.put('transaction.name', doc['transaction.name'].value);\n                  operationMetadata.put('transaction.type', doc['transaction.type'].value);\n                  duration = doc['transaction.duration.us'].value;\n                }\n                 \n                String operationId = 
toHash(operationMetadata);\n                \n                def map = [\n                  \"traceId\": doc['trace.id'].value,\n                  \"id\": id,\n                  \"parentId\": doc['parent.id'].empty ? null : doc['parent.id'].value,\n                  \"operationId\": operationId,\n                  \"timestamp\": doc['timestamp.us'].value,\n                  \"duration\": duration\n                ];\n                \n                if (state.metadataByOperationId[operationId] == null) {\n                  state.metadataByOperationId.put(operationId, operationMetadata);\n                }\n                state.eventsById.put(id, map);\n              "
                                },
                                "combine_script": {
                                    "source": "return state;"
                                },
                                "reduce_script": {
                                    "source": "\n                String toHash (def item) {\n                  long FNV_32_INIT = 0x811c9dc5L;\n                  long FNV_32_PRIME = 0x01000193L;\n                  char[] chars = item.toString().toCharArray();\n                  long rv = FNV_32_INIT;\n                  int len = chars.length;\n                  for(int i = 0; i < len; i++) {\n                      byte bt = (byte) chars[i];\n                      rv ^= bt;\n                      rv *= FNV_32_PRIME;\n                  }\n                  return rv.toString();\n                }\n                \n                def processEvent (def context, def event) {\n                  if (context.processedEvents[event.id] != null) {\n                    return context.processedEvents[event.id];\n                  }\n                  \n                  def processedEvent = [\n                    \"children\": []\n                  ];\n                  \n                  if(event.parentId != null) {\n                    def parent = context.events[event.parentId];\n                    if (parent == null) {\n                      return null;\n                    }\n                    def processedParent = processEvent(context, parent);\n                    if (processedParent == null) {\n                      return null;\n                    }\n                    processedParent.children.add(processedEvent);\n                  }\n                  \n                  context.processedEvents.put(event.id, processedEvent);\n                  \n                  processedEvent.putAll(event);\n\n                  if (context.params.serviceName != null && context.params.transactionName != null) {\n                    \n                    def metadata = context.metadata[event.operationId];\n                    \n                    if (metadata != null\n                      && context.params.serviceName == metadata['service.name']\n                      && 
metadata['transaction.name'] != null \n                      && context.params.transactionName == metadata['transaction.name']\n                    ) {\n                      context.entryTransactions.add(processedEvent);\n                    }\n\n                  } else if (event.parentId == null) {\n                    context.entryTransactions.add(processedEvent);\n                  }\n                  \n                  return processedEvent;\n                }\n                \n                double getClockSkew (def context, def item, def parent ) {\n                  if (parent == null) {\n                    return 0;\n                  }\n                  \n                  def processorEvent = context.metadata[item.operationId]['processor.event'];\n                  \n                  def isTransaction = processorEvent == 'transaction';\n                  \n                  if (!isTransaction) {\n                    return parent.skew;\n                  }\n                  \n                  double parentStart = parent.timestamp + parent.skew;\n                  double offsetStart = parentStart - item.timestamp;\n                  if (offsetStart > 0) {\n                    double latency = Math.round(Math.max(parent.duration - item.duration, 0) / 2);\n                    return offsetStart + latency;\n                  }\n                  \n                  return 0;\n                }\n                \n                void setOffsetAndSkew ( def context, def event, def parent, def startOfTrace ) {\n                  event.skew = getClockSkew(context, event, parent);\n                  event.offset = event.timestamp - startOfTrace;\n                  for(child in event.children) {\n                    setOffsetAndSkew(context, child, event, startOfTrace);\n                  }\n                  event.end = event.offset + event.skew + event.duration;\n                }\n                \n                void count ( def context, def nodeId, 
def duration ) {\n                  context.timeByNodeId[nodeId] = (context.timeByNodeId[nodeId] ?: 0) + duration;\n                }\n                \n                void scan ( def context, def item, def start, def end, def path ) {\n                  \n                  def nodeId = toHash(path);\n        \n                  def childNodes = context.nodes[nodeId] != null ? context.nodes[nodeId] : [];\n                  \n                  context.nodes[nodeId] = childNodes;\n                  \n                  context.operationIdByNodeId[nodeId] = item.operationId;\n                  \n                  if (item.children.size() == 0) {\n                    count(context, nodeId, end - start);\n                    return;\n                  }\n                  \n                  item.children.sort((a, b) -> {\n                    if (b.end === a.end) {\n                      return 0;\n                    }\n                    if (b.end > a.end) {\n                      return 1;\n                    }\n                    return -1;\n                  });\n                  \n                  def scanTime = end;\n                  \n                  for(child in item.children) {\n                    double normalizedChildStart = Math.max(child.offset + child.skew, start);\n                    double childEnd = child.offset + child.skew + child.duration;\n                    \n                    double normalizedChildEnd = Math.min(childEnd, scanTime);\n              \n                    def isOnCriticalPath = !(\n                      normalizedChildStart >= scanTime ||\n                      normalizedChildEnd < start ||\n                      childEnd > scanTime\n                    );\n                    \n                    if (!isOnCriticalPath) {\n                      continue;\n                    }\n                    \n                    def childPath = path.clone();\n                    \n                    
childPath.add(child.operationId);\n                    \n                    def childId = toHash(childPath);\n                    \n                    if(!childNodes.contains(childId)) {\n                      childNodes.add(childId);\n                    }\n                    \n                    if (normalizedChildEnd < (scanTime - 1000)) {\n                      count(context, nodeId, scanTime - normalizedChildEnd); \n                    }\n                    \n                    scan(context, child, normalizedChildStart, childEnd, childPath);\n                    \n                    scanTime = normalizedChildStart;\n                  }\n                  \n                  if (scanTime > start) {\n                    count(context, nodeId, scanTime - start);\n                  }\n                  \n                }\n              \n                def events = [:];\n                def metadata = [:];\n                def processedEvents = [:];\n                def entryTransactions = [];\n                def timeByNodeId = [:];\n                def nodes = [:];\n                def rootNodes = [];\n                def operationIdByNodeId = [:];\n                \n                \n                def context = [\n                  \"events\": events,\n                  \"metadata\": metadata,\n                  \"processedEvents\": processedEvents,\n                  \"entryTransactions\": entryTransactions,\n                  \"timeByNodeId\": timeByNodeId,\n                  \"nodes\": nodes,\n                  \"operationIdByNodeId\": operationIdByNodeId,\n                  \"params\": params\n                ];\n              \n                for(state in states) {\n                  if (state.eventsById != null) {\n                    events.putAll(state.eventsById);\n                  }\n                  if (state.metadataByOperationId != null) {\n                    metadata.putAll(state.metadataByOperationId);\n                  }\n        
        }\n                \n                \n                for(def event: events.values()) {\n                  processEvent(context, event);\n                }\n                \n                for(transaction in context.entryTransactions) {\n                  transaction.skew = 0;\n                  transaction.offset = 0;\n                  setOffsetAndSkew(context, transaction, null, transaction.timestamp);\n                  \n                  def path = [];\n                  def parent = transaction;\n                  while (parent != null) {\n                    path.add(parent.operationId);\n                    if (parent.parentId == null) {\n                      break;\n                    }\n                    parent = context.processedEvents[parent.parentId];\n                  }\n\n                  Collections.reverse(path);\n\n                  def nodeId = toHash(path);\n                  \n                  scan(context, transaction, 0, transaction.duration, path);\n                  \n                  if (!rootNodes.contains(nodeId)) {\n                    rootNodes.add(nodeId);\n                  }\n                  \n                }\n                \n                return [\n                  \"timeByNodeId\": timeByNodeId,\n                  \"metadata\": metadata,\n                  \"nodes\": nodes,\n                  \"rootNodes\": rootNodes,\n                  \"operationIdByNodeId\": operationIdByNodeId\n                ];"
                                }
                            }
                        }
                    }
                },
                "name": "get_aggregated_critical_path (/internal/apm/traces/aggregated_critical_path)",
                "response": {
                    "json": {
                        "name": "ResponseError",
                        "message": "{\"ok\":false,\"message\":\"The instance rejected the connection.\"}"
                    }
                },
                "startTime": 1710780063428,
                "stats": {
                    "kibanaApiQueryParameters": {
                        "label": "Kibana API query parameters",
                        "description": "The query parameters used in the Kibana API request that initiated the Elasticsearch request.",
                        "value": "{\n  \"_inspect\": \"true\"\n}"
                    },
                    "kibanaApiRoute": {
                        "label": "Kibana API route",
                        "description": "The route of the Kibana API request that initiated the Elasticsearch request.",
                        "value": "POST /internal/apm/traces/aggregated_critical_path"
                    },
                    "indexPattern": {
                        "label": "Data view",
                        "value": [
                            "…"
                        ],
                        "description": "The data view that connected to the Elasticsearch indices."
                    }
                },
                "status": 2
            }
        ]
    }
}

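The reduce_script in the request above is where the critical path is actually computed. Its `scan` step can be sketched in TypeScript as follows, assuming a single trace whose events have already been assembled into a tree with clock skew applied (so an event's start is just its `offset`). The `TraceEvent` shape and the use of a joined path string instead of the script's FNV-style `toHash` are simplifications for illustration, not Kibana's code.

```typescript
// Simplified event shape (an assumption for this sketch). Times are in
// microseconds relative to the start of the trace, with clock skew already applied.
interface TraceEvent {
  operationId: string;
  offset: number;
  duration: number;
  children: TraceEvent[];
}

// Critical-path self time, keyed by the path of operation IDs from the root.
type TimeByPath = Map<string, number>;

function count(timeByPath: TimeByPath, pathKey: string, duration: number): void {
  timeByPath.set(pathKey, (timeByPath.get(pathKey) ?? 0) + duration);
}

function scan(
  timeByPath: TimeByPath,
  item: TraceEvent,
  start: number,
  end: number,
  path: string[],
): void {
  // The script hashes the path with toHash; a joined string is enough here.
  const pathKey = path.join(" > ");

  if (item.children.length === 0) {
    count(timeByPath, pathKey, end - start);
    return;
  }

  // Visit children from the one that ends last to the one that ends first.
  const children = [...item.children].sort(
    (a, b) => b.offset + b.duration - (a.offset + a.duration),
  );

  let scanTime = end;
  for (const child of children) {
    const childStart = Math.max(child.offset, start);
    const childEnd = child.offset + child.duration;
    const clippedChildEnd = Math.min(childEnd, scanTime);

    // Skip children that start at or after the scan position, end before the
    // parent's window, or end after the scan position (overlapping a later child).
    const onCriticalPath = !(
      childStart >= scanTime ||
      clippedChildEnd < start ||
      childEnd > scanTime
    );
    if (!onCriticalPath) {
      continue;
    }

    // Only gaps longer than 1000 µs after a child are attributed to the parent.
    if (clippedChildEnd < scanTime - 1000) {
      count(timeByPath, pathKey, scanTime - clippedChildEnd);
    }

    scan(timeByPath, child, childStart, childEnd, [...path, child.operationId]);
    scanTime = childStart;
  }

  // Time before the earliest critical-path child belongs to the parent itself.
  if (scanTime > start) {
    count(timeByPath, pathKey, scanTime - start);
  }
}

// Example: a 10 ms root with two sequential children.
const root: TraceEvent = {
  operationId: "root",
  offset: 0,
  duration: 10000,
  children: [
    { operationId: "A", offset: 1000, duration: 2500, children: [] },
    { operationId: "B", offset: 5000, duration: 3000, children: [] },
  ],
};

const timeByPath: TimeByPath = new Map();
scan(timeByPath, root, 0, root.duration, [root.operationId]);
console.log(timeByPath);
```

With this input the root gets 4,500 µs (the 2,000 µs and 1,500 µs gaps after B and A, plus the 1,000 µs before A), B gets 3,000 µs and A gets 2,500 µs, which together add up to the root's 10,000 µs. As in the script, a gap of 1,000 µs or less after a critical-path child is not attributed to the parent.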
chrisdistasio commented 5 months ago

I wonder whether this is unique to serverless or whether the same problem exists in ESS. Can we determine this?

Since this is a Tech Preview feature, we can prioritize this investigation lower than GA features like Service Map.

dgieselaar commented 5 months ago

@chrisdistasio for context, the feature is built on a scripted_metric aggregation, which we kind of expect to break down in some cases. The issues are similar to the service map's, in the sense that we need to look at trace events and cannot use aggregated metrics, so its performance characteristics become unpredictable. We can build in some safeguards to get it to GA, though. Happy to help out if needed.
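One possible safeguard, sketched under assumptions: because the map_script stores every matching event in `state.eventsById` and the reduce_script merges all of those states into one map, memory use grows with the number of matched documents. Limiting which trace IDs go into the `trace.id` terms filter bounds that number. The per-trace document counts (for example from a terms aggregation on `trace.id`), the `selectTraceIdsWithinBudget` helper and the budget value are all hypothetical and not part of Kibana.

```typescript
// Hypothetical input: matching documents per trace, e.g. taken from a
// terms aggregation on trace.id run before the scripted_metric.
interface TraceDocCount {
  traceId: string;
  docCount: number;
}

interface TraceSelection {
  traceIds: string[];
  eventCount: number;
  truncated: boolean;
}

// Hypothetical budget; a real limit would have to be determined by testing.
const DEFAULT_MAX_EVENTS = 100000;

// Keeps trace IDs in input order while the total event count stays within the
// budget. A trace that would exceed it is skipped; later, smaller traces may still fit.
function selectTraceIdsWithinBudget(
  traces: TraceDocCount[],
  maxEvents: number = DEFAULT_MAX_EVENTS,
): TraceSelection {
  const traceIds: string[] = [];
  let eventCount = 0;
  let truncated = false;

  for (const { traceId, docCount } of traces) {
    if (eventCount + docCount > maxEvents) {
      truncated = true;
      continue;
    }
    traceIds.push(traceId);
    eventCount += docCount;
  }

  return { traceIds, eventCount, truncated };
}

const selection = selectTraceIdsWithinBudget(
  [
    { traceId: "trace-1", docCount: 40 },
    { traceId: "trace-2", docCount: 70 },
    { traceId: "trace-3", docCount: 50 },
  ],
  100,
);
console.log(selection);
```

Returning `truncated` would let the UI say that the aggregated critical path is based on a subset of traces instead of failing outright.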

ablnk commented 5 months ago

@chrisdistasio the issue is not unique to serverless; I just reproduced it on a stateful deployment too.

crespocarlos commented 5 months ago

Hey @dgieselaar, what guardrails do you have in mind for this?

crespocarlos commented 5 months ago

This might be related to #181790

paulb-elastic commented 4 months ago

To reiterate the points made above: Traces Explorer is in Tech Preview and this affects both serverless and stateful, so this is a lower priority.

dgieselaar commented 4 months ago

@paulb-elastic FWIW, the concerns around service maps were not "this breaks for our users" but "this can take down a cluster and page the ES team, and it isn't their responsibility". I think the same applies here, but the risk is lower because the feature is not enabled by default. I assume the ES team still wants a fix for this as well, though.

neptunian commented 4 months ago

@chrisdistasio I think @dgieselaar's comment above answers your earlier question about whether Elasticsearch currently has a mechanism to guard itself against being taken down when handling our requests, and it indicates that we need to build that protection on our end.

crespocarlos commented 3 months ago

I can't really think of an alternative other than rewriting the query so that it doesn't use the scripted_metric aggregation. @dgieselaar @neptunian do you think the investigation conducted here https://github.com/elastic/kibana/issues/179229#issuecomment-2163872017 would help in this scenario too?

Perhaps the fact that it is still in technical preview also gives us more flexibility to rewrite this feature without scripted_metric aggregations, provided that doing so yields performance gains.
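If the computation were moved out of the scripted_metric, the tree-building half of the reduce_script (its `processEvent` function) could run in Kibana on events fetched with a regular search. A minimal TypeScript sketch, assuming a hypothetical `FlatEvent` shape built from the fields the map_script reads (`span.id` or `transaction.id`, `parent.id`, `timestamp.us` and the span or transaction duration). Unlike the script, it ignores the `serviceName`/`transactionName` parameters and stops on `parent.id` cycles. Whether this performs better would still need to be measured, since the raw events would have to be transferred to Kibana.

```typescript
// Hypothetical flat event shape, built from the fields the map_script reads.
interface FlatEvent {
  id: string;
  parentId: string | null;
  operationId: string;
  timestamp: number; // microseconds
  duration: number; // microseconds
}

interface TreeNode extends FlatEvent {
  children: TreeNode[];
}

// Mirrors processEvent: an event is attached to its parent, events without a
// parent become roots, and events whose parent chain is incomplete are dropped.
function buildTrees(events: FlatEvent[]): TreeNode[] {
  const eventsById = new Map<string, FlatEvent>();
  for (const event of events) {
    eventsById.set(event.id, event);
  }

  // null marks an event that was dropped (or is still being processed).
  const processed = new Map<string, TreeNode | null>();
  const roots: TreeNode[] = [];

  const processEvent = (event: FlatEvent): TreeNode | null => {
    const cached = processed.get(event.id);
    if (cached !== undefined) {
      return cached;
    }
    // Mark as in progress so a parent.id cycle ends instead of recursing forever.
    processed.set(event.id, null);

    const node: TreeNode = { ...event, children: [] };
    if (event.parentId !== null) {
      const parent = eventsById.get(event.parentId);
      const parentNode = parent !== undefined ? processEvent(parent) : null;
      if (parentNode === null) {
        // Parent missing from the result set or dropped itself: leave this event out.
        return null;
      }
      parentNode.children.push(node);
    } else {
      roots.push(node);
    }

    processed.set(event.id, node);
    return node;
  };

  for (const event of events) {
    processEvent(event);
  }
  return roots;
}

const trees = buildTrees([
  { id: "s1", parentId: "tx", operationId: "db", timestamp: 1000, duration: 2500 },
  { id: "tx", parentId: null, operationId: "root", timestamp: 0, duration: 10000 },
  { id: "s2", parentId: "missing", operationId: "http", timestamp: 2000, duration: 500 },
]);
console.log(JSON.stringify(trees, null, 2));
```

Here `s1` is attached to `tx` even though it comes first, because processing an event processes its parent first; `s2` is dropped because its parent is not in the result set, matching the script's `return null` for a missing parent.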

crespocarlos commented 3 months ago

I'm constantly experiencing another issue: for the same time range, the server sometimes returns the data, while other times the request keeps loading forever or returns empty.

paulb-elastic commented 3 months ago

As per the discussion with @chrisdistasio, this won't be tackled right now; it has been moved to the backlog.