bug: Unable to send monitoring logs to Elastic APM through OTLPHttpExporter #4919

Open
ConsciousML opened this issue Aug 14, 2024 · 3 comments
Labels
bug Something isn't working

Comments


ConsciousML commented Aug 14, 2024

Describe the bug

Hi folks,

I'm having an issue pushing monitoring logs to Elastic APM through OTLPHttpExporter.

I'm able to send dummy logs with this standalone code and see them in Elastic APM:

import os
import logging
from dotenv import load_dotenv
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

if __name__ == '__main__':
    load_dotenv()

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    otel_exporter_otlp_endpoint = os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT') + "/v1/traces"
    otel_exporter_otlp_headers = os.environ.get('OTEL_EXPORTER_OTLP_HEADERS')

    # Split on the first '=' only, so a token containing '=' stays intact
    key, value = otel_exporter_otlp_headers.split('=', 1)
    otel_headers_dict = {key: value}

    exporter = OTLPSpanExporter(endpoint=otel_exporter_otlp_endpoint, headers=otel_headers_dict)

    resource_attributes = os.environ.get('OTEL_RESOURCE_ATTRIBUTES')

    key_value_pairs = resource_attributes.split(',')
    result_dict = {}

    for pair in key_value_pairs:
        key, value = pair.split('=')
        result_dict[key] = value

    otel_service_name = "reconciliation-ranker"
    resource_attributes = {
        "service.name": otel_service_name,
        "service.version": result_dict['service.version'],
        "deployment.environment": result_dict['deployment.environment'],
    }

    resource = Resource.create(resource_attributes)

    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(exporter)
    provider.add_span_processor(processor)

    trace.set_tracer_provider(provider)

    tracer = trace.get_tracer(otel_service_name)

    try:
        with tracer.start_as_current_span("test-span") as span:
            logger.info("Hello from OpenTelemetry!")
            span.set_attribute("custom.attribute", "test-value")

        provider.force_flush()

        logger.info("Traces sent to Elastic APM")
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")

    provider.shutdown()

I load an .env file in the following format:

OTEL_EXPORTER_OTLP_ENDPOINT=my_elastic_apm_url
OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer my_token"
OTEL_METRICS_EXPORTER=otlp
OTEL_LOGS_EXPORTER=otlp
OTEL_RESOURCE_ATTRIBUTES="service.name=my-service,service.version=0.0.1,deployment.environment=production"

First, I tried to input the same arguments in my BentoML service.py:

class InferenceInput(BaseModel):  # pylint: disable=too-few-public-methods
    """Pydantic class for the inputs of the inference API method of the ReconciliationService"""

    site_ids: List[str] = Field(description='The IDs of the reporting site data to reconciliate.')
    provider_ids: List[str] = Field(
        description='The IDs of the reporting provider data to reconciliate.'
    )
    top_k: int = Field(default=sys.maxsize, description='Number of recommendations to return.')
    only_recommend: bool = Field(
        default=False, description='Whether to only perform recommendation and not auto-match.'
    )
    remove_matched: bool = Field(
        default=True,
        description='Whether to remove the auto-match data from the recommendations.',
    )


env_variables = load_service_env_vars()

# Split on the first '=' only, so a token containing '=' stays intact
key, value = env_variables["OTEL_EXPORTER_OTLP_HEADERS"].split('=', 1)
otel_headers_dict = {key: value}


@bentoml.service(  # pylint: disable=no-member
    name="reconciliation-ranker",
    resources={"cpu": "cpu_count"},  # Adjust as needed
    traffic={"timeout": 10},
    monitoring={
        "enabled": True,
        "type": "otlp",
        "options": {
            "endpoint": env_variables['OTEL_EXPORTER_OTLP_ENDPOINT'] + "/v1/traces",
            "headers": otel_headers_dict,
            "insecure": False,
            "timeout": 10,
            "meta_sample_rate": 1.0,
        },
    },
)
class ReconciliationService:
    """
    Service for the reconciliation model to create APIs that return auto-match and recommendations
    """

    def __init__(self) -> None:
        """
        Initializes the Reconciliation Runner.

        Loads the model based on the provided paths and environment variables.
        """
        # Load the model
        env_vars = load_service_env_vars()

        self.model = load_reconciliation_engine(env_vars)

        with open(env_vars['ONLINE_STORE_CONFIG'], 'r') as json_file:
            online_store_config = json.load(json_file)

        self.feature_site_id = online_store_config['storage']['embedding_site']
        self.feature_provider_id = online_store_config['storage']['embedding_provider']

        self.online_store = create_online_store(env_vars['ONLINE_STORE_CONFIG'])

    @bentoml.api(input_spec=InferenceInput)  # pylint: disable=no-member
    def inference(self, **params: Any) -> Dict:
        """
        Predicts the reconciliation between the site and the provider tables
        """
        with bentoml.monitor("bentoml_reconciliation_ranker") as mon:

            mon.log("dummy_text", name="input_text", role="original_text", data_type="text")

            site_ids = params['site_ids']
            provider_ids = params['provider_ids']

            site_ids_array = np.array(site_ids)
            provider_ids_array = np.array(provider_ids)

            df_site = self.fetch_online_features(
                site_ids_array, feature_view_id=self.feature_site_id
            )
            df_provider = self.fetch_online_features(
                provider_ids_array, feature_view_id=self.feature_provider_id
            )

            df_site['encoded_date'] = encode_str_date(df_site.date.tolist())
            df_provider['encoded_date'] = encode_str_date(df_provider.date.tolist())

            response_dict = self.model.predict(
                df_site,
                df_provider,
                top_k=params['top_k'],
                remove_matched=params['remove_matched'],
                only_recommend=params['only_recommend'],
            )
            return response_dict

When I tried to make an HTTP request, I got the following error:

2024-08-14 15:07:53,424 - bentoml._internal.server.http_app - ERROR - Exception on /inference [POST]
Traceback (most recent call last):
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/_bentoml_impl/server/app.py", line 561, in api_endpoint_wrapper
    resp = await self.api_endpoint(name, request)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/_bentoml_impl/server/app.py", line 655, in api_endpoint
    output = await self._to_thread(func, *input_args, **input_params)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/_bentoml_impl/server/app.py", line 507, in _to_thread
    output = await anyio.to_thread.run_sync(func, limiter=self._limiter)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2177, in run_sync_in_worker_thread
    return await future
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 859, in run
    result = context.run(func, *args)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/_bentoml_sdk/method.py", line 132, in wrapped
    return self.func(instance, *args, **kwargs)
  File "/home/consciousml/ml-reconciliation/bentoml_ranker/service.py", line 206, in inference
    with bentoml.monitor("bentoml_reconciliation_ranker") as mon:
  File "/usr/lib/python3.10/contextlib.py", line 142, in __exit__
    next(self.gen)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/bentoml/_internal/monitoring/api.py", line 136, in monitor
    mon.stop_record()
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/bentoml/_internal/monitoring/base.py", line 75, in stop_record
    self.export_data(datas)
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/bentoml/_internal/monitoring/otlp.py", line 241, in export_data
    self._init_logger()
  File "/home/consciousml/.virtualenvs/bentoml-rec-1.3/lib/python3.10/site-packages/bentoml/_internal/monitoring/otlp.py", line 175, in _init_logger
    os.environ[OTEL_EXPORTER_OTLP_HEADERS] = self.headers
  File "/usr/lib/python3.10/os.py", line 685, in __setitem__
    value = self.encodevalue(value)
  File "/usr/lib/python3.10/os.py", line 757, in encode
    raise TypeError("str expected, not %s" % type(value).__name__)
TypeError: str expected, not dict

I realized that it was coming from the OTLPMonitor._init_logger when trying to set the environment variable.

I was confused as in the opentelemetry source code the OTLPLogExporter takes a dictionary as input: headers: Optional[Dict[str, str]] = None.
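
The traceback points at `os.environ[OTEL_EXPORTER_OTLP_HEADERS] = self.headers`, and `os.environ` only accepts strings. Below is a minimal sketch of one possible workaround, assuming BentoML forwards the `headers` option unchanged to that environment variable, as the traceback suggests. It serializes the dict into the comma-separated `key=value` form the variable uses. The helper name `headers_to_env_string` is hypothetical, and percent-encoding the values is an assumption about how the installed OpenTelemetry version parses the variable.

```python
import os
from urllib.parse import quote


def headers_to_env_string(headers: dict) -> str:
    """Serialize a headers dict into 'k1=v1,k2=v2' form (hypothetical helper)."""
    # Percent-encode values so spaces, commas and '=' survive the round trip.
    return ",".join(f"{key}={quote(str(value))}" for key, value in headers.items())


otel_headers_dict = {"Authorization": "Bearer my_token"}

# Assigning the dict directly reproduces the TypeError from the traceback.
try:
    os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = otel_headers_dict
    dict_error = None
except TypeError as exc:
    dict_error = str(exc)

# The serialized string is something os.environ can store.
headers_string = headers_to_env_string(otel_headers_dict)
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = headers_string

print(dict_error)
print(headers_string)
```

If the assumption holds, `headers_string` could be passed as the `"headers"` value in the monitoring options instead of the dict.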

I worked around it by not passing the headers in the monitoring config, like so:

@bentoml.service(  # pylint: disable=no-member
    name="reconciliation-ranker",
    resources={"cpu": "cpu_count"},  # Adjust as needed
    traffic={"timeout": 10},
    monitoring={
        "enabled": True,
        "type": "otlp",
        "options": {
            "endpoint": env_variables['OTEL_EXPORTER_OTLP_ENDPOINT'] + "/v1/traces",
            "insecure": False,
            "timeout": 10,
            "meta_sample_rate": 1.0,
        },
    },
)

The OTEL_EXPORTER_OTLP_ENDPOINT environment variable is used in the OTLPLogExporter anyway.

When I make a request, I get the error:

2024-08-14 15:18:30,234 - bentoml.access - INFO - 127.0.0.1:55220 (scheme=http,method=POST,path=/inference,type=multipart/form-data; boundary=03ad167cc00f4c3abdb5f98eae823407,length=137883) (status=200,type=application/json,length=127393) 2359.814ms
2024-08-14T15:18:34+0200 [INFO] [cli] Got signal SIG_WINCH
2024-08-14T15:18:35+0200 [DEBUG] [entry_service:reconciliation-ranker:1] Starting new HTTPS connection (1): 14175a94be434fad8b6c58a81013bd1d.apm.europe-west9.gcp.elastic-cloud.com:443
2024-08-14T15:18:35+0200 [DEBUG] [entry_service:reconciliation-ranker:1] https://14175a94be434fad8b6c58a81013bd1d.apm.europe-west9.gcp.elastic-cloud.com:443 "POST /v1/traces HTTP/11" 400 81
2024-08-14T15:18:35+0200 [DEBUG] [entry_service:reconciliation-ranker:1] Encoding detection: utf_8 will be used as a fallback match
2024-08-14T15:18:35+0200 [DEBUG] [entry_service:reconciliation-ranker:1] Encoding detection: Found utf_8 as plausible (best-candidate) for content. With 0 alternatives.
2024-08-14T15:18:35+0200 [ERROR] [entry_service:reconciliation-ranker:1] Failed to export logs batch code: 400, reason:Mfailed to unmarshal request body: proto: wrong wireType = 1 for field TraceId
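
The log shows a logs batch ("Failed to export logs batch") being posted to `/v1/traces`. One possible reading, stated here as an assumption rather than a confirmed diagnosis, is that a log payload is reaching the traces endpoint. OTLP over HTTP uses a separate default path per signal (`/v1/traces`, `/v1/metrics`, `/v1/logs`). Here is a small sketch of building the per-signal URL. `otlp_signal_endpoint` is a hypothetical helper, not part of BentoML or OpenTelemetry, and the host is a placeholder.

```python
# Default OTLP/HTTP paths, one per signal type.
OTLP_HTTP_PATHS = {
    "traces": "/v1/traces",
    "metrics": "/v1/metrics",
    "logs": "/v1/logs",
}


def otlp_signal_endpoint(base_url: str, signal: str) -> str:
    """Return the OTLP/HTTP URL for `signal`, replacing any existing signal path."""
    if signal not in OTLP_HTTP_PATHS:
        raise ValueError(f"unknown OTLP signal: {signal!r}")
    base = base_url.rstrip("/")
    # Drop an existing /v1/<signal> suffix so the URL is not doubled up.
    for path in OTLP_HTTP_PATHS.values():
        if base.endswith(path):
            base = base[: -len(path)]
            break
    return base + OTLP_HTTP_PATHS[signal]


# Placeholder host; the real value would come from OTEL_EXPORTER_OTLP_ENDPOINT.
logs_url = otlp_signal_endpoint("https://apm.example.com/v1/traces", "logs")
print(logs_url)
```

Under this assumption, the monitoring `endpoint` option would end in `/v1/logs` rather than `/v1/traces`.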

Btw, I can't debug the parse_env_headers because when I run my service.py in debug mode, the log is added to the batch_log of the exporter but does not get sent.

Is there a way to access the TracerProvider from a BentoML method to force_flush?

To reproduce

No response

Expected behavior

The logs should be uploaded without error to Elastic APM.

Environment

bentoml: 1.3.0
python: 3.10.12
platform: Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.35

@ConsciousML ConsciousML added the bug Something isn't working label Aug 14, 2024
@ConsciousML
Author

I don't think that it is related to library versions as I was able to send logs to Elastic APM successfully with manual instrumentation.

@stephenhoran

I believe that if you don't explicitly tell OTLP to use the HTTP protocol, it will use gRPC calls. We use the OpenTelemetry operator to inject instrumentation into Bentos via env variables that forward to an OTel collector, and the collector then forwards to ES APM, but we had to be very specific about avoiding gRPC.
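
As a sketch of what "being specific" could look like with environment variables: the OpenTelemetry specification defines `OTEL_EXPORTER_OTLP_PROTOCOL`, with `http/protobuf` as one of its values. Whether BentoML's monitoring exporter honors it is an assumption, and the collector address below is a placeholder. `otlp_http_env` is a hypothetical helper.

```python
def otlp_http_env(endpoint: str, headers: str = "") -> dict:
    """Build env vars that ask OTLP exporters for HTTP/protobuf instead of gRPC."""
    env = {
        "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
        "OTEL_EXPORTER_OTLP_ENDPOINT": endpoint,
    }
    # Headers are optional, for example when a collector sits in between.
    if headers:
        env["OTEL_EXPORTER_OTLP_HEADERS"] = headers
    return env


# Placeholder collector address; 4318 is the conventional OTLP/HTTP port.
env = otlp_http_env("http://otel-collector:4318")
for name, value in env.items():
    print(f"{name}={value}")
```

These values could be applied with `os.environ.update(env)` before the service starts, or injected by the deployment tooling.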
