Seldaek / monolog

Sends your logs to files, sockets, inboxes, databases and various web services
https://seldaek.github.io/monolog/
MIT License

Add support for Elasticsearch with Bulk API and data stream #1867

Closed. muratpurc closed this issue 9 months ago.

muratpurc commented 9 months ago

Summary of problem or feature request

We use Monolog\Handler\ElasticsearchHandler to write log entries to Elasticsearch.

However, it is not possible for us to combine the Monolog ElasticsearchHandler with the Elasticsearch PHP client when writing entries into Elasticsearch via the Bulk API into a data stream.

The versions used are:

Code snippet of problem

Our Monolog setup looks like this:

$index = 'my-elasticsearch-index';
$formatter = new Monolog\Formatter\ElasticsearchFormatter($index, '_doc');

$host = 'https://<username>:<password>@hostname';
$client = Elastic\Elasticsearch\ClientBuilder::create()
    ->setHosts([$host])
    ->build();

$handler = new Monolog\Handler\ElasticsearchHandler($client, ['op_type' => 'create']);
$handler->setFormatter($formatter);

$logger = new Monolog\Logger('Logger name', [$handler]);
$logger->info('Some message');

The Elasticsearch documentation says the following about Bulk API and data stream in it:

To automatically create a data stream, Elasticsearch expects the following request:

PUT my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/ bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon. ico HTTP/1.0\" 200 3638" }

In the current configuration, the request from above is sent as POST, which leads to an Elasticsearch error.

We can get around this by creating the data stream manually using an HttpClient that calls the API with the following request:

PUT _data_stream/my-data-stream
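For reference, the same thing can also be done with the Elasticsearch PHP client instead of a raw HttpClient. A minimal sketch, assuming an index template with a matching index pattern and `"data_stream": {}` already exists (host and credentials are placeholders):

```php
// Sketch: create the data stream via the official client instead of a raw HTTP call.
$client = Elastic\Elasticsearch\ClientBuilder::create()
    ->setHosts(['https://<username>:<password>@hostname'])
    ->build();

// Sends PUT _data_stream/my-data-stream
$client->indices()->createDataStream(['name' => 'my-data-stream']);
```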

After that, it is possible to add entries with the following request:

PUT my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/ bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon. ico HTTP/1.0\" 200 3638" }

It doesn't matter whether we create the data stream automatically or manually beforehand: when we add a log entry via $logger->info(), Monolog\Handler\ElasticsearchHandler->bulkSend() builds a parameter array that is not compatible with the Elasticsearch Bulk API when targeting a data stream. For Elastic\Elasticsearch\Traits\ClientEndpointsTrait->bulk() to create a request that Elasticsearch accepts, bulkSend() needs some changes. We managed this by adjusting bulkSend() as follows:

     protected function bulkSend(array $records): void
     {
         try {
             $params = [
                 'body' => [],
             ];

             foreach ($records as $record) {
                 if ($this->options['op_type'] === 'create') {
                     if (!isset($params['index'])) {
                         $params['index'] = $record['_index'];
                     }
                     $params['body'][] = ['create' => new \stdClass()];
                 } else {
                     $params['body'][] = [
                         'index' => $this->needsType ? [
                             '_index' => $record['_index'],
                             '_type' => $record['_type'],
                         ] : [
                             '_index' => $record['_index'],
                         ],
                     ];
                 }

                 unset($record['_index'], $record['_type']);

                 $params['body'][] = $record;
             }

             /** @var Elasticsearch */
             $responses = $this->client->bulk($params);

             if ($responses['errors'] === true) {
                 throw $this->createExceptionFromResponses($responses);
             }
         } catch (Throwable $e) {
             if (! $this->options['ignore_error']) {
                 throw new RuntimeException('Error sending messages to Elasticsearch', 0, $e);
             }
         }
     }
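With 'op_type' set to 'create', the adjusted method builds a parameter array of roughly the following shape before passing it to the client's bulk() call (field values are illustrative, not taken from a real record):

```php
$params = [
    'index' => 'my-elasticsearch-index',
    'body'  => [
        // Action line, serialized as { "create": {} } — an empty object, not an empty array.
        ['create' => new \stdClass()],
        // Source document, with _index and _type already stripped.
        ['@timestamp' => '2099-05-06T16:21:15.000Z', 'message' => 'Some message'],
        ['create' => new \stdClass()],
        ['@timestamp' => '2099-05-06T16:25:42.000Z', 'message' => 'Another message'],
    ],
];
```

The top-level 'index' key is what lets the client target `my-data-stream/_bulk`, while the empty `create` objects match the action lines the data stream documentation shows above.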

I'm not sure whether that is the right place for the change or whether it has side effects on other Monolog/Elasticsearch setups. It also seems necessary to make certain adjustments to the Elasticsearch PHP client (more on this below). Therefore I didn't create a pull request.

Elastic\Elasticsearch\Traits\ClientEndpointsTrait->bulk() creates the following POST request, which according to the documentation should be a PUT, although it still works:

POST my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/ bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon. ico HTTP/1.0\" 200 3638" }

The Elasticsearch PHP client should also be adapted to send a PUT request for the Bulk API with data streams. I've created ticket 1389 to address the issue on the Elasticsearch PHP client side.

It would be very desirable if both packages supported the Elasticsearch Bulk API with data streams.

System details

muratpurc commented 9 months ago

The issue was with the Monolog version we used (2.9.2); it works with Monolog >=3.3. The ticket can be closed.
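For anyone landing here later: under Monolog >=3.3 the setup from the original report works as-is, with the handler writing to a data stream via the Bulk API. A minimal sketch (host, credentials, and the data stream name are placeholders):

```php
$client = Elastic\Elasticsearch\ClientBuilder::create()
    ->setHosts(['https://<username>:<password>@hostname'])
    ->build();

// 'op_type' => 'create' makes the handler emit { "create": {} } action lines,
// which is what the Bulk API requires when writing to a data stream.
$handler = new Monolog\Handler\ElasticsearchHandler($client, ['op_type' => 'create']);
$handler->setFormatter(new Monolog\Formatter\ElasticsearchFormatter('my-data-stream', '_doc'));

$logger = new Monolog\Logger('Logger name', [$handler]);
$logger->info('Some message');
```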