SigNoz / signoz

SigNoz is an open-source observability platform native to OpenTelemetry with logs, traces and metrics in a single application. An open-source alternative to DataDog, NewRelic, etc. 🔥 🖥. 👉 Open source Application Performance Monitoring (APM) & Observability tool
https://signoz.io
17.66k stars, 1.1k forks

Document collecting Azure resource logs #4077

Open rlaveycal opened 7 months ago

rlaveycal commented 7 months ago

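Capturing Azure resource logs is a common pattern: Diagnostic settings -> Event Hub -> Azure Function.

SolarWinds documents this process for their own platform.

Below is an adaptation of their function code that forwards the logs to SigNoz Cloud instead. It reads the SigNoz endpoint URL and ingestion key from the SIGNOZ_OTEL_ENDPOINT and SIGNOZ_API_KEY app settings: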

/* Copyright 2022 SolarWinds Worldwide, LLC. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations
* under the License.
*/

// Derived From https://github.com/solarwinds/cloud-observability-integration/blob/master/azure/logs/template/run.csx

#r "Azure.Messaging.EventHubs"
#r "System.Text.Json"
#r "System.Memory.Data"

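using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Microsoft.Extensions.Logging;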

static readonly string _uri = Environment.GetEnvironmentVariable("SIGNOZ_OTEL_ENDPOINT", EnvironmentVariableTarget.Process) ?? throw new InvalidOperationException("SIGNOZ_OTEL_ENDPOINT is not defined");
static readonly string _token = Environment.GetEnvironmentVariable("SIGNOZ_API_KEY", EnvironmentVariableTarget.Process) ?? throw new InvalidOperationException("SIGNOZ_API_KEY is not defined");

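// One HttpClient shared across invocations; creating a new client per request
// can exhaust sockets when the function processes large batches.
static readonly HttpClient _client = new HttpClient();

static async Task PostLog(string data, ILogger logger)
{
    using (var request = new HttpRequestMessage(HttpMethod.Post, _uri))
    {
        request.Version = new Version(2, 0);
        request.Headers.Add("signoz-access-token", _token);
        // The SigNoz HTTP logs endpoint accepts a JSON array of log records.
        request.Content = new StringContent($"[{data}]", Encoding.UTF8, "application/json");
        //logger.LogDebug($"Sending: {data}");

        using (var response = await _client.SendAsync(request))
        {
            response.EnsureSuccessStatusCode();
        }
    }
}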

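static void ExtractProperty(string recordAttribute, JsonElement record, string targetAttribute, Dictionary<string, object> target)
{
    // TryGetProperty throws on non-object elements, so check the kind first.
    if (record.ValueKind == JsonValueKind.Object && record.TryGetProperty(recordAttribute, out JsonElement element))
    {
        // GetString() throws for non-string values, so keep numbers, objects, etc. as raw JSON text.
        target[targetAttribute] = element.ValueKind == JsonValueKind.String ? element.GetString() : element.GetRawText();
    }
}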

static async Task ProcessLogRecord(JsonElement record, ILogger logger)
{
    var attributes = new Dictionary<string, object>();
    var resources = new Dictionary<string, object>()
    {
        { "cloud.provider", "azure" }
    };
    // https://signoz.io/docs/userguide/send-logs-http/
    var payload = new Dictionary<string, object>()
    {
        { "severity_text", "info" },
        { "body", record.ToString() },
        { "attributes", attributes },
        { "resources", resources }
    };

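    if (record.TryGetProperty("time", out JsonElement time))
    {
        // "time" is an ISO 8601 string; a value without an offset is treated as UTC
        // rather than as the host's local time.
        var dateTime = DateTimeOffset.Parse(time.GetString(),
            System.Globalization.CultureInfo.InvariantCulture,
            System.Globalization.DateTimeStyles.AssumeUniversal);
        // Nanoseconds since the Unix epoch.
        var unixTimestamp = dateTime.ToUnixTimeMilliseconds() * 1000000;
        payload.Add("timestamp", unixTimestamp);
    }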

    ExtractProperty("location", record, "cloud.region", attributes);
    ExtractProperty("operationName", record, "operation", attributes);
    ExtractProperty("category", record, "category", attributes);

    ExtractProperty("level", record, "severity_text", payload);

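    if (record.TryGetProperty("resourceId", out JsonElement resourceId))
    {
        var id = resourceId.GetString().ToLowerInvariant();
        resources.Add("service.instance.id", id);

        // Expected shape: /subscriptions/{sub}/resourcegroups/{rg}/providers/{namespace}/{type}/{name}
        // The leading "/" yields an empty first element, so the subscription is parts[2].
        // Shorter IDs (e.g. subscription-level logs) only fill the fields they contain.
        var parts = id.Split('/');
        if (parts.Length > 2) resources.Add("cloud.account.id", parts[2]);
        if (parts.Length > 4) resources.Add("azure.resourcegroup.name", parts[4]);
        if (parts.Length > 6) attributes.Add("cloud.platform", parts[6]);
        if (parts.Length > 8)
        {
            resources.Add("service.name", parts[8]);

            if (parts[6] == "microsoft.containerservice")
            {
                resources.Add("k8s.cluster.name", parts[8]);
                // "category" is only present if the record carried one.
                if (attributes.TryGetValue("category", out var category))
                {
                    resources.Add("k8s.deployment.name", category);
                }
            }
        }
    }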

    if (record.TryGetProperty("properties", out JsonElement properties))
    {
        ExtractProperty("pod", properties, "k8s.pod.name", resources);

        // Copy the inner message into a "message" resource; some categories (e.g. AKS control-plane logs) use "log" instead, which wins if both are present.
        ExtractProperty("message", properties, "message", resources);
        ExtractProperty("log", properties, "message", resources);
    }

    var data = JsonSerializer.Serialize(payload);
    await PostLog(data, logger);
}

public static async Task Run(EventData[] events, ILogger logger)
{
    var exceptions = new List<Exception>();

    foreach (EventData eventData in events)
    {
        try
        {
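            // EventBody is BinaryData; ToString() decodes it as UTF-8.
            string messageBody = eventData.EventBody.ToString();
//            logger.LogDebug($"C# Event Hub trigger function processed a message: {messageBody}");

            // Deserialize to JsonElement rather than dynamic so the TryGetProperty call below is bound at compile time.
            var log = JsonSerializer.Deserialize<JsonElement>(messageBody);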

            if (log.TryGetProperty("records", out JsonElement recordsElement))
            {
                var records = recordsElement.EnumerateArray();
                foreach (var record in records)
                {
                    await ProcessLogRecord(record, logger);
                }
            }
            else
            {
                await ProcessLogRecord(log, logger);
            }
        }
        catch (Exception e)
        {
            // We need to keep processing the rest of the batch - capture this exception and continue.
            // Also, consider capturing details of the message that failed processing so it can be processed again later.
            exceptions.Add(e);
        }
    }

    // Once processing of the batch is complete, if any messages in the batch failed processing throw an exception so that there is a record of the failure.

    if (exceptions.Count > 1)
        throw new AggregateException(exceptions);

    if (exceptions.Count == 1)
        throw exceptions.Single();
}
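The resource-ID split and the timestamp conversion are the parts of ProcessLogRecord most likely to break on unexpected input, so here is a small standalone sketch of both that can be run without an Event Hub or a SigNoz endpoint. ParseResourceId and ToUnixNanos are hypothetical helper names; the field names and the milliseconds * 1,000,000 conversion follow the function above, while flattening everything into one dictionary and bounds-checking short resource IDs are assumptions of this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical helper: split an Azure resource ID of the form
// /subscriptions/{sub}/resourceGroups/{rg}/providers/{namespace}/{type}/{name}
// into the fields the function sets. Short IDs only yield the fields they contain.
static Dictionary<string, string> ParseResourceId(string resourceId)
{
    var fields = new Dictionary<string, string>();
    var id = resourceId.ToLowerInvariant();
    fields["service.instance.id"] = id;

    // The leading "/" produces an empty first element, so the subscription is parts[2].
    var parts = id.Split('/');
    if (parts.Length > 2) fields["cloud.account.id"] = parts[2];
    if (parts.Length > 4) fields["azure.resourcegroup.name"] = parts[4];
    if (parts.Length > 6) fields["cloud.platform"] = parts[6];
    if (parts.Length > 8) fields["service.name"] = parts[8];
    return fields;
}

// Hypothetical helper: convert an ISO 8601 "time" value to nanoseconds since the
// Unix epoch, treating a value without an offset as UTC.
static long ToUnixNanos(string time)
{
    var dto = DateTimeOffset.Parse(time, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal);
    return dto.ToUnixTimeMilliseconds() * 1_000_000;
}

var f = ParseResourceId("/SUBSCRIPTIONS/1234/RESOURCEGROUPS/my-rg/PROVIDERS/MICROSOFT.CONTAINERSERVICE/MANAGEDCLUSTERS/aks1");
Console.WriteLine(f["service.name"]);                    // aks1
Console.WriteLine(ToUnixNanos("2024-01-01T00:00:00Z"));  // 1704067200000000000
```

Keeping this logic in plain helpers makes it easy to check against a few real resource IDs copied from the Azure portal before deploying the function.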
welcome[bot] commented 7 months ago

Thanks for opening this issue. A team member should give feedback soon. In the meantime, feel free to check out the contributing guidelines.

rlaveycal commented 7 months ago

There is also an OpenTelemetry Collector receiver that can pull data directly from the Event Hub: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/azureeventhubreceiver