opensearch-project / opensearch-catalog

The OpenSearch Catalog is designed to make it easier for developers and community to contribute, search and install artifacts like plugins, visualization dashboards, ingestion to visualization content packs (data pipeline configurations, normalization, ingestion, dashboards).

[Integration-RFC] Flint Integration Planning And Adaptation Tutorial #144

Open YANG-DB opened 7 months ago

YANG-DB commented 7 months ago

Flint Integration Planning And Adaptation Tutorial

This tutorial is intended to provide a general procedure template that integration developers can adapt to transform their OpenSearch index-based integration into an S3 Flint-based integration.

Ingestion Tools

The initial step for an integration to become Flint (S3) compatible is to understand the ingestion policy. Ingestion can be achieved using many tools including:

AWS Data Firehose: This is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3. It's suitable for both proprietary API logs (by sending data to Firehose directly through its API) and standard logs. For AWS ELB logs or Apache logs, you'd typically stream these to Firehose using a log agent or AWS Lambda.

Amazon Kinesis Agent: A pre-built Java application that offers an easy way to collect and send data to AWS services like Amazon Kinesis Streams, Kinesis Data Firehose, or CloudWatch Logs. You can configure it to monitor log files for various services, including Apache HTTP Server, and forward them to S3.

FluentBit / Data-Prepper: Both are open-source data collectors for unifying data collection and consumption for better use and understanding of data. They can be used to collect data from different sources, transform it, and send it to various destinations, including S3. They are highly flexible and can work with proprietary log formats as well as common ones like Apache logs.

AWS Lambda: For more custom ingestion logic, such as preprocessing or filtering before storing in S3, AWS Lambda can be used. Lambda can be triggered by various AWS services (like S3 uploads, CloudWatch Logs, API Gateway for proprietary APIs) to process and ingest logs into S3.

CloudWatch Logs: While primarily a monitoring and observability service, CloudWatch Logs can be configured to route your logs to S3 using a subscription filter and AWS Lambda, or directly to S3 for AWS service logs like ELB.

Custom API Solutions: For proprietary APIs, a custom application that ingests logs via API and then uploads them to S3 might be necessary. This can be hosted on EC2 or run as a containerized application on ECS or EKS.

Log Storage Format

The next critical step is determining the optimal storage format for your data within Amazon S3. Common options include:

Raw / CSV

Parquet

JSON

GZIP / ZIP

Table Definition Statement

Once the ingestion process has been established and the log format has been selected, a table definition is needed to allow queries to run on top of S3 using the built-in catalog, whether it is Hive or Glue.

It effectively maps the data stored in S3 to a structured format that query services can understand, allowing for complex analytical queries to be executed efficiently. This includes column names, data types, and potentially partition keys, which are critical for optimizing query performance.
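
As an illustration only, here is a minimal Hive-style table definition sketch for Parquet-formatted VPC Flow logs. The database name, bucket path, columns, and partition layout are hypothetical, loosely matching the `vpc_flow` examples used later in this tutorial:

-- Illustrative external table over Parquet files in S3 (hypothetical bucket and schema)
CREATE EXTERNAL TABLE IF NOT EXISTS `vpcflow-db`.vpc_flow (
    start_time       BIGINT,
    accountid        STRING,
    region           STRING,
    status_code      INT,
    connection_info  STRUCT<direction: STRING>,
    src_endpoint     STRUCT<ip: STRING, port: INT, svc_name: STRING>,
    dst_endpoint     STRUCT<ip: STRING, port: INT, svc_name: STRING>,
    traffic          STRUCT<bytes: BIGINT, packets: BIGINT>
)
PARTITIONED BY (`year` STRING, `month` STRING, `day` STRING)
STORED AS PARQUET
LOCATION 's3://your-log-bucket/vpc-flow-logs/';

-- Make existing partitions visible so queries can prune on year/month/day
-- (alternatively, a Glue Crawler can discover partitions, as described below)
MSCK REPAIR TABLE `vpcflow-db`.vpc_flow;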

Using AWS Glue Data Catalog

Integrating with Hive Metastore

Automated Schema Detection and Table Creation

In the context of Flint (S3) based integrations, manually defining table schemas for large or complex datasets can be cumbersome and prone to errors.

AWS provides a powerful tool, the AWS Glue Crawler, to automate the process of schema detection and table creation, significantly simplifying data management tasks.

AWS Glue Crawler

Considerations for Using AWS Glue Crawler

Best Practices for Automated Schema Detection

Projections and materialized views table definitions

Once a table is defined, it can be used as the base for creating multiple data projections and materialized views that reflect different aspects of the data and allow both visualizing and summarizing the data in an effective and efficient way.

There are many strategies for this phase and we can't review all of them here. To better understand and select a projection strategy, the following considerations should be reviewed:

Each direct query will include a list of partitioned fields as part of its WHERE clause (see the example queries after this list)

Each aggregation query will include a list of partitioned fields as part of its GROUP BY clause

Each aggregation query will include a list of core dimensions (fields) as part of its GROUP BY clause

The additional dimensions in the aggregation queries are meant to allow simple filtering and grouping without the need to fetch additional data
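
As an illustration only, here is a minimal sketch of a direct query and an aggregation query that honor these considerations. The table, partition values, and dimensions are hypothetical, following the `vpc_flow` examples used later in this tutorial:

-- Direct query: partition columns appear in the WHERE clause so the engine can prune partitions
SELECT
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `@timestamp`,
    src_endpoint.ip AS srcaddr,
    dst_endpoint.ip AS dstaddr
FROM
    `vpcflow-db`.vpc_flow
WHERE
    `year` = '2024' AND `month` = '03' AND `day` = '15';

-- Aggregation query: partition columns plus core dimensions appear in the GROUP BY clause
SELECT
    `year`, `month`, `day`,
    region AS `aws.vpc.region`,
    COUNT(*) AS total_connections
FROM
    `vpcflow-db`.vpc_flow
GROUP BY
    `year`, `month`, `day`, region;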


Index & Dashboard Naming Conventions

Index name should represent the integration source:

Live queries - should have a `live` phrase indicating the purpose of this MV query, including the time frame this query works on:

Each dashboard / visualization should be aware of the MV index naming convention and have its index pattern take this into account during the integration installation process.

When a visualization uses this MV query projection it should state it in its description:

Aggregated queries - should have an `agg` phrase indicating the purpose of this MV query, including the time frame this query works on and the aggregation type and dimensions:

When a visualization uses this MV query projection it should state it in its description:

MV naming Parts:

$integration_id$_$projection-type$_$timeframe$_$projected_fields$_version
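
For example (hypothetical names following this pattern): `vpcflow_live_1_day_connections_1.0` for a daily live view, or `vpcflow_agg_60_min_bytes_packets_1.0` for an hourly aggregation of bytes and packets.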

Other fields may be important for future considerations.

Live stream view:

The live stream view is a direct copy of the most recent data stream (weekly or daily, depending on the resolution). Given the partitioned nature of the data, the live stream view should have an appropriate WHERE clause that selects the correct partitions, in addition to the time-based live stream specification:

WHERE 
    ((`year` = 'StartYear' AND `month` >= 'StartMonth' AND `day` >= 'StartDay') OR
     (`year` = 'EndYear' AND `month` <= 'EndMonth' AND `day` <= 'EndDay'))
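
For illustration, a minimal live-stream MV sketch under these assumptions follows. The MV name and projected columns are hypothetical; the Start*/End* placeholders stand for the boundaries of the current live window:

-- Illustrative live stream MV: a direct projection of the most recent partitions
CREATE MATERIALIZED VIEW IF NOT EXISTS vpcflow_live_1_day_connections AS
    SELECT
        CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `@timestamp`,
        src_endpoint.ip AS `aws.vpc.srcaddr`,
        dst_endpoint.ip AS `aws.vpc.dstaddr`,
        CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,
        CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`
    FROM
        `vpcflow-db`.vpc_flow
    WHERE
        -- replace the placeholders with the current live window boundaries
        ((`year` = 'StartYear' AND `month` >= 'StartMonth' AND `day` >= 'StartDay') OR
         (`year` = 'EndYear' AND `month` <= 'EndMonth' AND `day` <= 'EndDay'))
WITH (
  auto_refresh = false
)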

Aggregation view of Numeric field:

According to the domain this integration is covering, there will be numerous aggregation summaries of numeric data points such as the number of requests, the duration of events, the average number of bytes, and so on. Each such aggregation is mostly time based and uses one of the following time-bucket strategies:

- Daily
- Hourly
- Weekly

By supporting multi-scale time-based aggregation we can serve multiple granular resolutions while preserving efficient storage and compute processes.

The following is a 60-minute MV aggregation summary of total bytes and packets. In addition, it adds the following dimensions for filtering:


-- One Hour Aggregation MV of VPC connections / bytes / packets
CREATE MATERIALIZED VIEW IF NOT EXISTS vpcflow_mview_60_min_connections AS
    SELECT
        date_trunc('hour', from_unixtime(start_time / 1000)) AS start_time,
        date_trunc('hour', from_unixtime(start_time / 1000)) + INTERVAL 1 HOUR AS end_time,

        status_code as `aws.vpc.status_code`,
        -- action as `aws.vpc.action`, (add to groupBy)

        connection_info.direction AS `aws.vpc.connection.direction`,
        src_endpoint.svc_name as `aws.vpc.pkt-src-aws-service`,
        dst_endpoint.svc_name as `aws.vpc.pkt-dst-aws-service`,

        -- vpc_id as `aws.vpc.vpc-id`,(add to groupBy)
        accountid as `aws.vpc.account-id`,    
        region as `aws.vpc.region`,

        COUNT(*) AS total_connections,
        SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes,
        SUM(CAST(IFNULL(traffic.packets, 0) AS LONG)) AS total_packets
    FROM
        `vpcflow-db`.vpc_flow
    GROUP BY
        date_trunc('hour', from_unixtime(start_time / 1000)), region, accountid, status_code,src_endpoint.svc_name, dst_endpoint.svc_name, connection_info.direction
    ORDER BY
        start_time
WITH (
  auto_refresh = false
)
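
The same pattern extends to the other time buckets by changing the `date_trunc` granularity. For example, a coarser daily rollup sketch (hypothetical MV name, reduced dimension set) might look like:

-- Illustrative daily rollup following the same pattern as the hourly MV above
CREATE MATERIALIZED VIEW IF NOT EXISTS vpcflow_mview_1_day_connections AS
    SELECT
        date_trunc('day', from_unixtime(start_time / 1000)) AS start_time,
        date_trunc('day', from_unixtime(start_time / 1000)) + INTERVAL 1 DAY AS end_time,
        accountid AS `aws.vpc.account-id`,
        region AS `aws.vpc.region`,
        COUNT(*) AS total_connections,
        SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes,
        SUM(CAST(IFNULL(traffic.packets, 0) AS LONG)) AS total_packets
    FROM
        `vpcflow-db`.vpc_flow
    GROUP BY
        date_trunc('day', from_unixtime(start_time / 1000)), region, accountid
WITH (
  auto_refresh = false
)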

Time-window summary of a grouping query

A time-windowed aggregated summary of a group of values is important for identifying trends and anomalies. It allows formulating a baseline to compare against.

The following is a 60-minute MV aggregation window summary of the top destination IPs, ordered by total bytes sent:

-- One Hour Aggregation time window  of top IP dest by  bytes sum group by hourly 
CREATE MATERIALIZED VIEW IF NOT EXISTS vpcflow_mview_60_min_network_ip_bytes_window AS
WITH hourly_buckets AS (
  SELECT
    date_trunc('hour', from_unixtime(start_time / 1000)) AS hour_bucket,
    dst_endpoint.ip AS dstaddr,
    SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes
  FROM
    `vpcflow-db`.vpc_flow
  GROUP BY
    hour_bucket,
    dstaddr
),
ranked_addresses AS (
  SELECT
    CAST(hour_bucket AS TIMESTAMP) AS hour_bucket,
    dstaddr,
    total_bytes,
    RANK() OVER (PARTITION BY hour_bucket ORDER BY total_bytes DESC) AS bytes_rank
  FROM
    hourly_buckets
)
SELECT
  hour_bucket,
  dstaddr,
  total_bytes
FROM
  ranked_addresses
WHERE
  bytes_rank <= 50
ORDER BY
  hour_bucket ASC,
  bytes_rank ASC
WITH (
  auto_refresh = false
);

SQL support and best practices

The following expressions are commonly used when adapting queries to Flint (Spark SQL):

-- Convert an epoch-millisecond field into a timestamp
CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`

-- Truncate to the aggregation bucket and derive the bucket end time
date_trunc('hour', from_unixtime(start_time / 1000)) AS start_time,
date_trunc('hour', from_unixtime(start_time / 1000)) + INTERVAL 1 HOUR AS end_time,

-- Guard against missing values before casting or summing
CAST(IFNULL(src_endpoint.port, 0) AS LONG)
SUM(CAST(IFNULL(traffic.bytes / 1000, 0) AS LONG)) AS total_kbytes,

-- Derive a dimension from a field value (private destination IP ranges => ingress)
CASE
      WHEN regexp(dst_endpoint.ip, '(10\\..*)|(192\\.168\\..*)|(172\\.1[6-9]\\..*)|(172\\.2[0-9]\\..*)|(172\\.3[0-1]\\..*)')
      THEN 'ingress'
      ELSE 'egress'
END AS `aws.vpc.flow-direction`,

Swiddis commented 7 months ago

Index name should represent the integration source

It's worth noting that we depend on the existing convention in the code, which is based on the user's table name; changing this would be a code change.

YANG-DB commented 7 months ago

@Swiddis yes - this should be changed ASAP

seankao-az commented 7 months ago

ASAP, as in making it into the AOS 2.13 release? Is it feasible at this point?

Swiddis commented 7 months ago

I don't think this is in scope for 2.13. It'd be a hefty change to get all these fields bundled in query construction, and we already depend on the old convention.