elastic / elasticsearch-php

Official PHP client for Elasticsearch.
https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/index.html
MIT License

Add support for Elasticsearch with Bulk API and data stream #1389

Closed muratpurc closed 5 months ago

muratpurc commented 5 months ago

Summary of problem or feature request

We use Monolog with the Monolog\Handler\ElasticsearchHandler and the Elasticsearch PHP client to write log entries to Elasticsearch.

However, it is currently not possible for us to use the Monolog ElasticsearchHandler together with the Elasticsearch PHP client to write entries into Elasticsearch via the Bulk API and a data stream.

The versions used are:

Code snippet of problem

Our Monolog setup looks like this:

$index = 'my-elasticsearch-index';
$formatter = new Monolog\Formatter\ElasticsearchFormatter($index, '_doc');

$host = 'https://<username>:<password>@hostname';
$client = Elastic\Elasticsearch\ClientBuilder::create()
    ->setHosts([$host])
    ->build();

$handler = new Monolog\Handler\ElasticsearchHandler($client, ['op_type' => 'create']);
$handler->setFormatter($formatter);

$logger = new Monolog\Logger('Logger name', [$handler]);
$logger->info('Some message');

The Elasticsearch documentation says the following about the Bulk API and data streams:

To automatically create a data stream, Elasticsearch expects the following request:

PUT my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/ bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon. ico HTTP/1.0\" 200 3638" }

In the current configuration, the request above is sent as a POST, which leads to an Elasticsearch error.

We can get around this by creating the data stream manually using an HttpClient that calls the API with the following request:

PUT _data_stream/my-data-stream
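
For reference, the data stream can also be created through the Elasticsearch PHP client itself instead of a separate HttpClient. A minimal sketch, assuming the 8.x client exposes the indices()->createDataStream() endpoint and that a matching index template with data_stream enabled already exists for my-data-stream (reusing the $client built in the setup above):

// Equivalent of "PUT _data_stream/my-data-stream"
$client->indices()->createDataStream(['name' => 'my-data-stream']);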

After that it is possible to add entries with the following request:

PUT my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/ bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon. ico HTTP/1.0\" 200 3638" }

It doesn't matter whether we let the data stream be created automatically or create it manually beforehand. When we add a log entry ($logger->info()), Monolog\Handler\ElasticsearchHandler->bulkSend() builds a parameter array that is not compatible with the Elasticsearch Bulk API when targeting a data stream. For Elastic\Elasticsearch\Traits\ClientEndpointsTrait->bulk() to create a request that Elasticsearch accepts, some changes are needed in Monolog\Handler\ElasticsearchHandler->bulkSend(). We managed this by adjusting bulkSend() as follows:

     protected function bulkSend(array $records): void
     {
         try {
             $params = [
                 'body' => [],
             ];

             foreach ($records as $record) {
                 if ($this->options['op_type'] === 'create') {
                     // For data streams the bulk body only needs an empty "create" action per
                     // document; the target is passed once via the top-level "index" parameter.
                     if (!isset($params['index'])) {
                         $params['index'] = $record['_index'];
                     }
                     $params['body'][] = ['create' => new \stdClass()];
                 } else {
                     $params['body'][] = [
                         'index' => $this->needsType ? [
                             '_index' => $record['_index'],
                             '_type' => $record['_type'],
                         ] : [
                             '_index' => $record['_index'],
                         ],
                     ];
                 }

                 unset($record['_index'], $record['_type']);

                 $params['body'][] = $record;
             }

             /** @var Elasticsearch */
             $responses = $this->client->bulk($params);

             if ($responses['errors'] === true) {
                 throw $this->createExceptionFromResponses($responses);
             }
         } catch (Throwable $e) {
             if (! $this->options['ignore_error']) {
                 throw new RuntimeException('Error sending messages to Elasticsearch', 0, $e);
             }
         }
     }
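
For illustration, with 'op_type' set to 'create' the adjusted bulkSend() hands a parameter array roughly like this to $client->bulk() for a single record from the setup above (the document fields are simplified; the exact keys depend on the formatter):

$params = [
    'index' => 'my-elasticsearch-index',
    'body' => [
        // Action line, serialized as { "create": { } }
        ['create' => new \stdClass()],
        // Formatted Monolog record without the _index/_type keys
        ['message' => 'Some message', 'channel' => 'Logger name', 'level_name' => 'INFO'],
    ],
];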

I'm not sure if that's the correct place and whether it has side effects on other Monolog/Elasticsearch setups. It also seems to be necessary to make certain adjustments to the Elasticsearch PHP client; more on this below. Therefore I didn't create a pull request against Monolog; instead, I've created ticket 1867 to address the issue on the Monolog side.

The function Elastic\Elasticsearch\Traits\ClientEndpointsTrait->bulk() creates the following POST request which, according to the documentation, should be a PUT, but it still works:

POST my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/ bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon. ico HTTP/1.0\" 200 3638" }

The function ClientEndpointsTrait->bulk() should also be adapted to create a PUT request for the Bulk API with data streams.
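
One way to check which HTTP method the client actually sends is to attach a PSR-3 logger when building the client; a minimal sketch, assuming the transport logs its outgoing requests (method and URL) through the configured logger:

$elasticLogger = new Monolog\Logger('elasticsearch');
$elasticLogger->pushHandler(new Monolog\Handler\StreamHandler('php://stderr'));

$client = Elastic\Elasticsearch\ClientBuilder::create()
    ->setHosts([$host])
    ->setLogger($elasticLogger)
    ->build();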

It would be very desirable if both packages supported the Elasticsearch Bulk API with data streams.

System details

muratpurc commented 5 months ago

The issue was caused by the Monolog version we used (2.9.2); it works with Monolog >= 3.3. The ticket can be closed.