
bug: Unable to send monitoring logs to Elastic APM through OTLPHttpExporter #4919

Open ConsciousML opened 2 months ago

ConsciousML commented 2 months ago

Describe the bug

Hi folks,

I'm having an issue pushing monitoring logs to Elastic APM through OTLPHttpExporter.

I'm able to send dummy traces with this standalone script and see them in Elastic APM:

import os
import logging
from dotenv import load_dotenv
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

if __name__ == '__main__':
    load_dotenv()

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Build the exporter endpoint and headers from the environment.
    otel_exporter_otlp_endpoint = os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT') + "/v1/traces"
    otel_exporter_otlp_headers = os.environ.get('OTEL_EXPORTER_OTLP_HEADERS')

    # Split only on the first '=' so the token value may itself contain '='.
    key, value = otel_exporter_otlp_headers.split('=', 1)
    otel_headers_dict = {key: value}

    exporter = OTLPSpanExporter(endpoint=otel_exporter_otlp_endpoint, headers=otel_headers_dict)

    # Parse OTEL_RESOURCE_ATTRIBUTES ("k1=v1,k2=v2,...") into a dict.
    resource_attributes = os.environ.get('OTEL_RESOURCE_ATTRIBUTES')

    key_value_pairs = resource_attributes.split(',')
    result_dict = {}

    for pair in key_value_pairs:
        key, value = pair.split('=', 1)
        result_dict[key] = value

    otel_service_name = "reconciliation-ranker"
    resource_attributes = {
        "service.name": otel_service_name,
        "service.version": result_dict['service.version'],
        "deployment.environment": result_dict['deployment.environment'],
    }

    resource = Resource.create(resource_attributes)

    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(exporter)
    provider.add_span_processor(processor)

    trace.set_tracer_provider(provider)

    tracer = trace.get_tracer(otel_service_name)

    try:
        with tracer.start_as_current_span("test-span") as span:
            logger.info("Hello from OpenTelemetry!")
            span.set_attribute("custom.attribute", "test-value")

        provider.force_flush()

        logger.info("Traces sent to Elastic APM")
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")

    provider.shutdown()

I load a .env file in the following format:

OTEL_EXPORTER_OTLP_ENDPOINT=my_elastic_apm_url
OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer my_token"
OTEL_METRICS_EXPORTER=otlp
OTEL_LOGS_EXPORTER=otlp
OTEL_RESOURCE_ATTRIBUTES="service.name=my-service,service.version=0.0.1,deployment.environment=production"
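
As far as I understand the exporter defaults (worth double-checking), the OTLP HTTP exporters can also pick these variables up on their own, so the manual parsing above is mainly for explicitness:

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# With no arguments, the exporter resolves its endpoint from
# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT or OTEL_EXPORTER_OTLP_ENDPOINT
# (appending /v1/traces to the latter) and parses OTEL_EXPORTER_OTLP_HEADERS
# as comma-separated "key=value" pairs.
exporter = OTLPSpanExporter()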

First, I tried to pass the same arguments in my BentoML service.py:

class InferenceInput(BaseModel):  # pylint: disable=too-few-public-methods
    """Pydantic class for the inputs of the inference API method of the ReconciliationService"""

    site_ids: List[str] = Field(description='The IDs of the reporting site data to reconciliate.')
    provider_ids: List[str] = Field(
        description='The IDs of the reporting provider data to reconciliate.'
    )
    top_k: int = Field(default=sys.maxsize, description='Number of recommendations to return.')
    only_recommend: bool = Field(
        default=False, description='Whether to only perform recommendation and not auto-match.'
    )
    remove_matched: bool = Field(
        default=True,
        description='Whether to remove the auto-match data from the recommendations.',
    )

env_variables = load_service_env_vars()

# Split only on the first '=' so the token value may itself contain '='.
key, value = env_variables["OTEL_EXPORTER_OTLP_HEADERS"].split('=', 1)
otel_headers_dict = {key: value}

@bentoml.service(  # pylint: disable=no-member
    name="reconciliation-ranker",
    resources={"cpu": "cpu_count"},  # Adjust as needed
    traffic={"timeout": 10},
    monitoring={
        "enabled": True,
        "type": "otlp",
        "options": {
            "endpoint": env_variables['OTEL_EXPORTER_OTLP_ENDPOINT'] + "/v1/traces",
            "headers": otel_headers_dict,
            "insecure": False,
            "timeout": 10,
            "meta_sample_rate": 1.0,
        },
    },
)
class ReconciliationService:
    """
    Service for the reconciliation model to create APIs that return auto-match and recommendations
    """

    def __init__(self) -> None:
        """
        Initializes the Reconciliation Runner.

        Loads the model based on the provided paths and environment variables.
        """
        # Load the model
        env_vars = load_service_env_vars()

        self.model = load_reconciliation_engine(env_vars)

        with open(env_vars['ONLINE_STORE_CONFIG'], 'r') as json_file:
            online_store_config = json.load(json_file)

        self.feature_site_id = online_store_config['storage']['embedding_site']
        self.feature_provider_id = online_store_config['storage']['embedding_provider']

        self.online_store = create_online_store(env_vars['ONLINE_STORE_CONFIG'])

    @bentoml.api(input_spec=InferenceInput)  # pylint: disable=no-member
    def inference(self, **params: Any) -> Dict:
        """
        Predicts the reconciliation between the site and the provider tables
        """
        with bentoml.monitor("bentoml_reconciliation_ranker") as mon:

            mon.log("dummy_text", name="input_text", role="original_text", data_type="text")

            site_ids = params['site_ids']
            provider_ids = params['provider_ids']

            site_ids_array = np.array(site_ids)
            provider_ids_array = np.array(provider_ids)

            df_site = self.fetch_online_features(
                site_ids_array, feature_view_id=self.feature_site_id
            )
            df_provider = self.fetch_online_features(
                provider_ids_array, feature_view_id=self.feature_provider_id
            )

            df_site['encoded_date'] = encode_str_date(df_site.date.tolist())
            df_provider['encoded_date'] = encode_str_date(df_provider.date.tolist())

            response_dict = self.model.predict(
                df_site,
                df_provider,
                top_k=params['top_k'],
                remove_matched=params['remove_matched'],
                only_recommend=params['only_recommend'],
            )
            return response_dict
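
For reference, the failing request below is an ordinary call to the inference endpoint, e.g. with BentoML's HTTP client against a locally served instance (illustrative IDs only, not real data):

import bentoml

# Assumes the service is running locally, e.g. via
# `bentoml serve service:ReconciliationService`.
with bentoml.SyncHTTPClient("http://localhost:3000") as client:
    response = client.inference(
        site_ids=["site-1", "site-2"],  # illustrative IDs
        provider_ids=["provider-1"],
        top_k=5,
    )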

When I tried to make an HTTP request, I got the following error:

2024-08-14 15:07:53,424 - bentoml._internal.server.http_app - ERROR - Exception on /inference [POST]
Traceback (most recent call last):
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/_bentoml_impl/server/app.py", line 561, in api_endpoint_wrapper
    resp = await self.api_endpoint(name, request)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/_bentoml_impl/server/app.py", line 655, in api_endpoint
    output = await self._to_thread(func, *input_args, **input_params)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/_bentoml_impl/server/app.py", line 507, in _to_thread
    output = await anyio.to_thread.run_sync(func, limiter=self._limiter)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2177, in run_sync_in_worker_thread
    return await future
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 859, in run
    result = context.run(func, *args)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/_bentoml_sdk/method.py", line 132, in wrapped
    return self.func(instance, *args, **kwargs)
  File "/home/consciousml/ml-reconciliation/bentoml_ranker/service.py", line 206, in inference
    with bentoml.monitor("bentoml_reconciliation_ranker") as mon:
  File "/usr/lib/python3.10/contextlib.py", line 142, in __exit__
    next(self.gen)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/bentoml/_internal/monitoring/api.py", line 136, in monitor
    mon.stop_record()
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/bentoml/_internal/monitoring/base.py", line 75, in stop_record
    self.export_data(datas)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/bentoml/_internal/monitoring/otlp.py", line 241, in export_data
    self._init_logger()
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/bentoml/_internal/monitoring/otlp.py", line 175, in _init_logger
    os.environ[OTEL_EXPORTER_OTLP_HEADERS] = self.headers
  File "/usr/lib/python3.10/os.py", line 685, in __setitem__
    value = self.encodevalue(value)
  File "/usr/lib/python3.10/os.py", line 757, in encode
    raise TypeError("str expected, not %s" % type(value).__name__)
TypeError: str expected, not dict

I realized that it was coming from OTLPMonitor._init_logger, which fails when setting the environment variable.

This confused me, since in the opentelemetry source code the OTLPLogExporter itself accepts a dictionary as input: headers: Optional[Dict[str, str]] = None.
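
The failure is easy to reproduce in isolation: os.environ values must be strings, so assigning the dict raises exactly this TypeError. Serializing the dict into the comma-separated form the env var expects works (my own illustration, not BentoML code):

import os

headers = {"Authorization": "Bearer my_token"}

# This is effectively what OTLPMonitor._init_logger does, and it raises
# "TypeError: str expected, not dict" because os.environ only holds strings:
# os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = headers

# The env-var form the OTLP exporters expect is comma-separated key=value
# pairs, so the dict would need serializing first:
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = ",".join(
    f"{key}={value}" for key, value in headers.items()
)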

I worked around it by not passing the headers in the monitoring config, like so:

@bentoml.service(  # pylint: disable=no-member
    name="reconciliation-ranker",
    resources={"cpu": "cpu_count"},  # Adjust as needed
    traffic={"timeout": 10},
    monitoring={
        "enabled": True,
        "type": "otlp",
        "options": {
            "endpoint": env_variables['OTEL_EXPORTER_OTLP_ENDPOINT'] + "/v1/traces",
            "insecure": False,
            "timeout": 10,
            "meta_sample_rate": 1.0,
        },
    },
)

The OTEL_EXPORTER_OTLP_ENDPOINT environment variable is picked up by the OTLPLogExporter anyway.

With this configuration, when I make a request I get the following error:

2024-08-14 15:18:30,234 - bentoml.access - INFO - 127.0.0.1:55220 (scheme=http,method=POST,path=/inference,type=multipart/form-data; boundary=03ad167cc00f4c3abdb5f98eae823407,length=137883) (status=200,type=application/json,length=127393) 2359.814ms
2024-08-14T15:18:34+0200 [INFO] [cli] Got signal SIG_WINCH
2024-08-14T15:18:35+0200 [DEBUG] [entry_service:reconciliation-ranker:1] Starting new HTTPS connection (1): 14175a94be434fad8b6c58a81013bd1d.apm.europe-west9.gcp.elastic-cloud.com:443
2024-08-14T15:18:35+0200 [DEBUG] [entry_service:reconciliation-ranker:1] https://14175a94be434fad8b6c58a81013bd1d.apm.europe-west9.gcp.elastic-cloud.com:443 "POST /v1/traces HTTP/11" 400 81
2024-08-14T15:18:35+0200 [DEBUG] [entry_service:reconciliation-ranker:1] Encoding detection: utf_8 will be used as a fallback match
2024-08-14T15:18:35+0200 [DEBUG] [entry_service:reconciliation-ranker:1] Encoding detection: Found utf_8 as plausible (best-candidate) for content. With 0 alternatives.
2024-08-14T15:18:35+0200 [ERROR] [entry_service:reconciliation-ranker:1] Failed to export logs batch code: 400, reason:Mfailed to unmarshal request body: proto: wrong wireType = 1 for field TraceId
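
If I had to guess, the 400 comes from posting a logs payload to the traces endpoint: the monitor exports through OTLPLogExporter, while my endpoint ends in /v1/traces, so the server tries to unmarshal a logs request as a traces request. A sketch of the variant I would try next (assuming Elastic APM accepts OTLP logs at /v1/logs):

# Hedged guess, untested: point the monitor at the OTLP logs path instead
# of the traces path, since it exports log records rather than spans.
monitoring_options = {
    "enabled": True,
    "type": "otlp",
    "options": {
        "endpoint": env_variables['OTEL_EXPORTER_OTLP_ENDPOINT'] + "/v1/logs",
        "insecure": False,
        "timeout": 10,
        "meta_sample_rate": 1.0,
    },
}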

By the way, I can't debug parse_env_headers: when I run my service.py in debug mode, the log is added to the exporter's batch but never gets sent.

Is there a way to access the TracerProvider from a BentoML method so I can call force_flush?
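
In the standalone script I can flush through the global API; I'm not sure it reaches whatever provider BentoML builds internally, but for completeness:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

# The provider registered with trace.set_tracer_provider() is retrievable
# anywhere in-process; only the SDK TracerProvider exposes force_flush().
provider = trace.get_tracer_provider()
if isinstance(provider, TracerProvider):
    provider.force_flush()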

To reproduce

No response

Expected behavior

The logs should be uploaded without error to Elastic APM.

Environment

bentoml: 1.3.0
python: 3.10.12
platform: Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.35

ConsciousML commented 2 months ago

I don't think this is related to library versions, as I was able to send logs to Elastic APM successfully with manual instrumentation.