opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[RFC] OpenSearch Tracing Collector #7352

Open khushbr opened 1 year ago

khushbr commented 1 year ago

Is your feature request related to a problem? Please describe. https://github.com/opensearch-project/OpenSearch/issues/1061

Introduction

Distributed Tracing (colloquially, tracking or traceability) is the ability to trace a request end-to-end through the system to get a complete view of its execution. In OpenSearch, this means following a request from the coordinator, as it fans out to the nodes holding the relevant primary/replica shards, through to the aggregation of the results back on the coordinator. The supported use-cases will include Bad Query Debugging, Critical Path Analysis and A/B Testing.

The Distributed Tracing system will consist of 2 major components: the Tracing Framework (details at https://github.com/opensearch-project/OpenSearch/issues/6750) and the Telemetry Collector. The Tracing Framework will instrument and collect node-local and cluster-wide (distributed) traces for OpenSearch search and index workloads, background jobs, etc. The OpenSearch Telemetry Collector will offer a vendor-agnostic implementation to read, process and export telemetry data (initially traces, later Performance Analyzer metrics as well) to stores like Prometheus, Jaeger, AWS X-Ray, etc.


Proposed Implementation

Components

At a high level, the Collector Agent will have 3 main components. Users can configure each component in the collector pipeline YAML based on their specific use-case.

  1. Reader: Reads the data written by the Receiver component of the Tracing Framework. The default implementation will be poll-based: the Reader periodically polls the data from the shared memory location.
  2. Processor: Performs pre-processing (correlation, aggregation, filtering and sampling) before the data is sent to the store. The Processor also performs housekeeping operations such as Memory Circuit Breaking and Data Batching.
  3. Exporter: Transforms the OpenSearch trace data into the configured data-store's format, encrypts it and sends it over the network to the data-store(s).

    Packaging

    The OpenSearch Collector will be a distribution of the OTel Collector, curated for OpenSearch-specific use-cases. The Collector Agent will be designed as a Golang binary, running as a side-car process on the same node to ensure process-level resource isolation and to avoid any over-the-network cost. It will run as a custom user/RBAC role with limited, privileged access to the network, specific disk locations, etc.

    High Level Design

    image

    Sample Pipeline Configuration:

    
    receivers:
      opensearch:
        trace_read_interval: 1m
        trace_deletion_interval: 1m
        tmps_location: /dev/shm/tracing

    extensions:
      sigv4auth:
        assume_role:
          arn: "arn:aws:iam::123456789:role/distributed-tracing-NodeInstanceRole"
          sts_region: "eu-west-1"
      oauth2client:
        client_id: agent
        client_secret: Dl76n1p8P2SEdqhcyfcqtmNJi8Kf5LG3
        token_url: http://localhost:8080/auth/realms/opentelemetry/protocol/openid-connect/token
        tls:
          insecure: true
          ca_file: ca.pem
          cert_file: cert.pem
          key_file: key.pem
        timeout: 2s

    exporters:
      awsxray:
        resource_arn: "arn:aws:ec2:eu-west-1:123456789:instance/i-293hiuhe0u"
        indexed_attributes: ["TraceID", "ShardID"]
        aws_log_groups: ["group1", "group2"]
        compression: zstd
        auth:
          authenticator: sigv4auth
      jaeger:
        auth:
          authenticator: oauth2client

    service:
      pipelines:
        traces/agent:
          receivers: [opensearch]
          processors: [memory_limiter, batch]
          exporters: [jaeger, awsxray]



 #### 1. Reader
Borrowing from the current [Performance Analyzer-RCA](https://github.com/opensearch-project/performance-analyzer-rca/blob/main/docs/rfc-rca.pdf) inter-process communication model, tracing data is written to and read from `/dev/shm` (also known as [tmpfs](https://en.wikipedia.org/wiki/Tmpfs)), a [shared memory](https://en.wikipedia.org/wiki/Shared_memory) implementation. This shared memory write/read model works without explicit synchronization or locking between the writer and the reader. However, it carries the risk of Disk Read Congestion, so the polling interval will be tuned to keep both disk access frequency and memory usage low.
1. The Receiver in the Tracing Framework atomically writes tracing data snapshots to the tmpfs location at regular intervals.
2. The Reader scans the shared memory directory for updates every X seconds (TBD Benchmark), reads the data from the snapshot files, deletes the files from shared memory and passes the data stream to the processor(s).
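
The two steps above can be sketched roughly as follows. This is a minimal illustration, not the proposed implementation: the function names, the JSON snapshot format and the `.tmp` naming convention are all assumptions made for the example. The receiver writes each snapshot to a temporary file and renames it into place, so the reader only ever sees complete files and no lock is needed.

```python
import json
import os
import tempfile


def write_snapshot(directory, name, spans):
    """Receiver side (hypothetical): write to a temp file, then rename it into place."""
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(spans, f)
    # os.replace is atomic on POSIX when source and target share a filesystem,
    # so the reader never observes a half-written snapshot.
    os.replace(tmp_path, os.path.join(directory, name))


def poll_once(directory):
    """Reader side (hypothetical): read every complete snapshot, delete it, return the spans."""
    spans = []
    for name in sorted(os.listdir(directory)):
        if name.endswith(".tmp"):
            continue  # still being written by the receiver
        path = os.path.join(directory, name)
        with open(path) as f:
            spans.extend(json.load(f))
        os.remove(path)
    return spans
```

A real reader would call something like `poll_once` on a timer (for example every `trace_read_interval`) and hand the returned spans to the first processor.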

 #### 2. Processor
Following the plug-and-play model, users can configure 'N' processor types depending on their use-case. By default, only the Memory Limiter and Batch processors will be enabled. The Collector will support turning individual processors on and off dynamically. The data will flow through the processors sequentially, strictly in the order defined, before reaching the exporter. The following processor types will be supported in the initial release:

* [Mandatory] **Memory Limiter Processor**: The memory limiter processor is meant to prevent out-of-memory (OOM) issues. The amount and type of trace data processed in the tracing pipeline will depend on the TPS, shards, request type, etc. on the cluster.
* **Data Aggregation**: Compiles the data from long-running queries and tasks such as background jobs. In the interest of memory, the Tracing Framework will flush out the background job traces promptly, and they will be aggregated on the collector end. The trace data is enriched with data collected from Performance Analyzer to build the resource usage view across CPU, Memory, Disk I/O, etc.
* **Data Analysis**: Performs short-term analysis on data flowing through the node. The MVP use-cases are: 1/ TopK resource-consuming queries (tasks?) on the Coordinator, 2/ TopK long-running queries on the Coordinator, and 3/ moving average of resource consumption on the Coordinator. These aggregate metrics will be used for real-time debugging and can feed into the live, node-level Search Resiliency feature in the future.
* [Mandatory] **Batcher**: The batch processor accepts spans, metrics or logs and places them into batches. Batching helps compress the data better and reduces the number of outgoing connections required to transmit the data.
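
As a rough illustration of the sequential processor chain with the two mandatory processors, consider the sketch below. It is deliberately simplified and every name in it is hypothetical: in particular, the limiter here caps the number of spans, whereas a real memory limiter would look at actual process memory.

```python
def memory_limiter(max_spans):
    """Return a processor that drops spans beyond a fixed budget (simplified stand-in)."""
    def process(spans):
        return spans[:max_spans]
    return process


def batcher(batch_size):
    """Return a processor that groups spans into fixed-size batches."""
    def process(spans):
        return [spans[i:i + batch_size] for i in range(0, len(spans), batch_size)]
    return process


def run_pipeline(spans, processors):
    """Apply each processor strictly in the configured order."""
    data = spans
    for processor in processors:
        data = processor(data)
    return data
```

For example, `run_pipeline(list(range(7)), [memory_limiter(5), batcher(2)])` keeps the first five spans and then groups them into batches of at most two. Swapping the order of the two processors would change the result, which is why the order in the pipeline definition matters.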

 #### 3. Exporter
The Exporter is the final step in the Collector pipeline; it translates the OpenSearch trace format into the data-store format, and the HTTP/gRPC client in the exporter writes the data to the data-store. The initial release will support OpenSearch, Prometheus and Jaeger exporters.

 ### Telemetry Data Store
Users can configure the Agent to work with their Telemetry Data Stores. A time-series database that optimizes space with built-in data folding and compression is highly recommended as the data-store. The telemetry data-store should satisfy the following requirements:
* Must provide Correlational Analytics, Aggregation and Filtering functionality, which is required for building the global view of the cluster and for serving visualizations.
* Visualization: must support running visualizations on top of the data-store, for example Grafana or OpenSearch Dashboards.
* Must support multiple telemetry data types (metrics, stats, traces and logs).
* Archival up to 30 days, configurable depending on the use-case scenario.

***
How can you help?
Any feedback on the overall proposal is welcome, in particular specific suggestions on:
1. Packaging of the Collector as an Agent: a separate Golang process, running in a side-car.
2. Processor types: any other processor types we can include in the Collector.
3. Sampling strategy: Ideally, the system should send default-level trace data to the telemetry data-store at all times in steady state, and higher-granularity data on demand during an event. The control for down-sampling will be built at the instrumentation stage in the Tracing Framework, but do you have any use-cases that require tail-based sampling in the Collector?

khushbr commented 1 year ago

@elfisher @reta @nknize @dblock @Bukhtawar @shwetathareja @backslasht @rishabhmaurya

khushbr commented 1 year ago

I explored alternative communication models between the Receiver (Tracing Framework) and the Reader (Collector), summarized in the table below:

| | Shared Memory | Pipes (Anonymous, Named) | Message Queues | IPC Socket |
|---|---|---|---|---|
| Overview | Kernel level. Shared memory can be accessed simultaneously by multiple programs, with the intent of providing communication among them or avoiding redundant copies. | OS level. Half-duplex (Anonymous) or full-duplex (Named) method. Based on shared memory. | Asynchronous point-to-point model. Messages are stored on the queue until they are processed and deleted. Each message is processed only once, by a single consumer. | Kernel level. Sockets use the file system as their address space. Processes reference a domain socket as an inode, and multiple processes can communicate with one socket. |
| Implementation | The Receiver writes the trace data at a fixed interval to /dev/shm (also called tmpfs), a virtual memory filesystem. The Reader scans the filesystem every Nth interval. This removes the need for any explicit synchronization. | The Reader creates the pipe server and creates the pipe with write permission for the server and read permission for the client. The Receiver process creates the pipe client and streams the trace data to the server. | Within the node: the Receiver defines message structures and creates a message queue (a linked list stored in the kernel). The Receiver periodically publishes messages to the queue, which are received by the Reader. Distributed/outside the node: SQS, Kafka, etc. | The Receiver sets up a Unix socket server and writes the trace data to the stream socket. The Reader connects and reads the data from the listening socket. |
| Language and OS Support | Unix and Microsoft Windows | Unix and Microsoft Windows | Unix and Microsoft Windows (MSMQ) | Unix and Microsoft Windows |
| Security & Access Control | Subject to file system permissions | Can be secured with ACLs | Access defined via the common key used in message queue creation | Subject to file system permissions |
| Benefits | Fastest. The current PA/RCA model is lock-free: the PA (Writer) writes the data at 5s granularity to shared memory and the RCA (Reader) reads the (t-5) snapshot of the data. | Low-level, thus fast. | Asynchronous. Since queues provide data persistence, they have better fault tolerance. | Bi-directional. Can be synchronous or asynchronous. |
| Limitations | tmpfs lives completely in the page cache and on swap; on systems without swap space, or where swap space is running low, tmpfs may consume large amounts of memory. Volatile storage: if the node reboots, the data in tmpfs will be lost. | Supports synchronous operations out of the box and requires inter-process synchronization. | Less efficient for larger data sets. | The OpenSearch process (Receiver) has the additional overhead of handling connections and running the server. |

Our recommended approach is Shared Memory, for the following reasons:

Sizing: The shared memory (/dev/shm) usage depends on the number of spans generated on the node, which in turn depends on the user workload and the number of shards allocated on the node. The default size will be 10% of total RAM, and this setting will be configurable. If usage exceeds this capacity, traces will not be written to shared memory. The implementation will ensure that on Agent crash or shutdown, all the data files in shared memory are cleaned up.
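
The capacity rule described above (drop the write when the budget is exceeded) could look roughly like this. It is only a sketch under assumptions: the function names are made up, total RAM is passed in as a parameter instead of being detected, and usage is measured by summing file sizes in the directory.

```python
import os


def directory_usage(directory):
    """Total size in bytes of the regular files directly under directory."""
    total = 0
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_file():
                total += entry.stat().st_size
    return total


def try_write(directory, name, payload, total_ram_bytes, fraction=0.10):
    """Write payload only if it fits in the budget; return True if written."""
    budget = int(total_ram_bytes * fraction)  # default: 10% of RAM, configurable
    if directory_usage(directory) + len(payload) > budget:
        return False  # over capacity: the trace is dropped, not queued
    with open(os.path.join(directory, name), "wb") as f:
        f.write(payload)
    return True
```

In practice the limit would more likely be enforced by the `size` option of the tmpfs mount itself, with a check like this serving only as an early, cheap guard on the writer side.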

Access Permission: mode, uid and gid will be set when the virtual filesystem is mounted. The volume will be encrypted to secure the trace data, which will consist of performance metrics (CPU, JVM, Latency), metadata (span_id, trace_id) and cluster information (IndexName, Shard ID, Node ID).

lewis262626 commented 1 year ago

Why not just add your own opensearch receiver instead of creating your own collector?

reta commented 1 year ago

Thanks @khushbr for putting this together. The overall solution looks overly complicated to me, so a couple of questions, please.

1) Receiver (Tracing Framework) and the Reader (Collector) - why do we need that? The simplest (and probably easiest) way is to have an async reporter that collects the traces/spans in batches and flushes them to the collector periodically (essentially a push model).

2) The OpenSearch Collector will be a distribution of Otel Collector, curated for OpenSearch specific use-case - why do we need this curation? The reporter could flush the spans/traces in whatever format we want to support (the OTEL being the default one). There are tons of different formats the data could be exported to out of the box, I don't think building translation layers is what we should spend our time on.
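
For readers following along, the push model from point 1 can be sketched as below. This is an illustrative assumption of what such a reporter might look like, not code from OpenSearch or OpenTelemetry: the class name, the bounded buffer that drops the oldest spans, and the `send` callable are all hypothetical.

```python
import threading
from collections import deque


class AsyncReporter:
    """Buffers spans in memory and pushes them to the collector in batches."""

    def __init__(self, send, max_buffer=1000, batch_size=100):
        self._send = send                        # callable that ships one batch
        self._buffer = deque(maxlen=max_buffer)  # oldest spans drop when full
        self._batch_size = batch_size
        self._lock = threading.Lock()
        self._stop = threading.Event()

    def report(self, span):
        """Called on the request path; only appends, never blocks on I/O."""
        with self._lock:
            self._buffer.append(span)

    def flush(self):
        """Drain the buffer and send it in batches; return the number of spans sent."""
        with self._lock:
            spans = list(self._buffer)
            self._buffer.clear()
        for i in range(0, len(spans), self._batch_size):
            self._send(spans[i:i + self._batch_size])
        return len(spans)

    def run(self, interval_seconds):
        """Background loop: flush every interval until stop() is called."""
        while not self._stop.wait(interval_seconds):
            self.flush()
        self.flush()  # final drain on shutdown

    def stop(self):
        self._stop.set()
```

`run` is meant to be started on a daemon thread (for example `threading.Thread(target=reporter.run, args=(5,), daemon=True).start()`), so the request path only ever pays for an in-memory append.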

Bukhtawar commented 1 year ago

Thanks @khushbr for the doc. I share @reta's thoughts: what is the point of having two levels of readers, one that reads off an in-memory buffer and another that reads off shared memory? The listener, as you mention, is async anyway, so I don't see a performance impact unless the second reader is expected to aggregate data from the in-memory buffer and then write to shared memory.

Would the processor component be modelled as part of the Performance Analyzer agent, or could it be pushed out to externally hosted components like Logstash to transform/enrich the data?

khushbr commented 1 year ago

Thank you for the feedback @lewis262626 and @reta. For some additional context, the proposed core tenets for OpenSearch Distributed Tracing feature (Instrumentation Framework and Collector) are:

  1. Low memory footprint
     - 1.1. The Collector runs as an Agent in a side-car on the node, and thus can be allocated only limited memory and CPU.
     - 1.2. The trace data should be flushed out of the core process ASAP and written to a socket/disk/shared memory location, which will be read by the Collector Agent. The implementation should avoid adding any clients or blocking threads, or introducing any resource contention on disk, memory and CPU.
  2. Pluggability (support for multiple data formats/stores) - Users can use Exporters from the OpenSearch Collector or any other collector distros (see opentelemetry-collector-contrib) to export the trace data out of the box in their required format.

The customization proposed is at the Collection and Processing layer. The first is discussed at https://github.com/opensearch-project/OpenSearch/issues/7352#issuecomment-1531958525 in regard to Shared Memory. Let me elaborate on the processing part. In OpenSearch, there are long-running background tasks like peer recovery, and it would be expensive to persist trace data for such tasks in the core processes. Instead, this post-processing can be delegated to the Collector. In the future, Collector can support other use-cases, such as topK expensive queries in last hour - which can be plugged into the task API and other on-node systems.

Thoughts?

reta commented 1 year ago

Thanks @khushbr

> 1. Low memory footprint

I would argue that the design you are suggesting has a considerably larger memory footprint than the simplified version:

> Pluggability (support for multiple data formats/stores) - Users can use Exporters from the OpenSearch Collector or any other collector distros (see opentelemetry-collector-contrib) to export the trace data out of the box in their required format.

This is the place where it would be great to see opensearchexporter.

> Memory. Let me elaborate on the processing part. In OpenSearch, there are long-running background tasks like peer recovery, and it would be expensive to persist trace data for such tasks in the core processes.

I would argue collecting whole traces in memory is not sustainable. The traces / spans could (and probably should) be reconciled on query level (with the data available at that moment).

> Collector can support other use-cases, such as topK expensive queries in last hour - which can be plugged into the task API and other on-node systems.

I would argue this is not the responsibility of the collector: once the traces are stored, it is easy to answer such questions at query time, along with any other questions that may come up in the future. The role of the collector (in my view) is as simple as receiving the traces / spans, possibly augmenting them with some metadata, and flushing them to storage.

khushbr commented 1 year ago

Hello @reta ! Reviving this thread.

Picking from our last conversation,

> I would argue collecting whole traces in memory is not sustainable. The traces / spans could (and probably should) be reconciled on query level (with the data available at that moment).

Agreed. This requirement has been updated: the trace data will now be reconciled with supplementary resource usage data at query time.

> I would argue this is not the responsibility of the collector: once the traces are stored, it is easy to answer such questions at query time, along with any other questions that may come up in the future. The role of the collector (in my view) is as simple as receiving the traces / spans, possibly augmenting them with some metadata, and flushing them to storage.

Agreed. I thought about this more, and it makes sense to delegate the responsibility for aggregation and analysis to the DB layer. This can come out of the box with stores like Prometheus + Grafana, or can be custom-built (as per the use-case) as a thin layer on top of the data store.

Coming back to the memory question and whether we need a Collector Agent: the alternative, in which the Writer periodically flushes data directly to the datastore over the network, has drawbacks:

I see the Collector Agent with shared memory (tmpfs) as following the Separation of Concerns principle and offering superior IPC between the writer (Instrumentation code) and the Reader (Collector). In the long term, the Collector can be the single funnel through which all the telemetry data (traces, service metrics, Performance Analyzer metrics, slow logs) is written to the user's Observability data-store for visualization and monitoring.

Let me know your thoughts.

reta commented 1 year ago

Thanks @khushbr

> Coming back to the memory question and whether we need a Collector Agent: the alternative, in which the Writer periodically flushes data directly to the datastore over the network, has drawbacks:

Agreed. OTel already has a vendor-agnostic Collector implementation [1]. I am not saying we should not have a collector; I am saying we should not implement one ourselves.

[1] https://opentelemetry.io/docs/collector/

khushbr commented 1 year ago

@reta Agreed. The proposal here is to create a distribution of the OTel Collector: the OTel framework with a curated set of processors and exporters relevant to OpenSearch, plus custom Receiver/Reader logic.

peternied commented 11 months ago

@khushbr This feature sounds like it has a large intersection with the existing Audit Logs feature in the Security Plugin [1]; how do you envision the Audit Log being impacted by this change, or do you think there are components of that framework that should be reused?

reta commented 11 months ago

@peternied I am afraid this feature serves a different purpose. Audit Logs are always-on, access-driven triggers (please correct me if I am wrong), whereas tracing is a purely optional infrastructure component designed to help troubleshoot issues in distributed systems.