manticoresoftware / manticoresearch

Easy to use open source fast database for search | Good alternative to Elasticsearch now | Drop-in replacement for E in the ELK soon
https://manticoresearch.com
GNU General Public License v3.0

Native support for message queues #1500

Open AbstractiveNord opened 11 months ago

AbstractiveNord commented 11 months ago

Is your feature request related to a problem? Please describe.
Data ingestion into ManticoreSearch is not as trivial as it could be. The SQL and JSON interfaces are great, but they require some code to be written, tested, and deployed. Native support for tools like Vector.dev, FluentBit and others is great, but it is limited to the logs use case. Usually, data flows through a message queue such as Apache Kafka, RabbitMQ, or NATS JetStream. Native support for consuming data from message queues would be the easiest plug-and-play way to bring ManticoreSearch into projects.

Describe the solution you'd like
Native support in ManticoreSearch for consuming data from message queues, dynamically managed via SQL as data ingestion engines (possibly a different engine for different tables). An engine can be treated as a backend that makes periodic inserts into a table. A good example of this feature is the message queue support in ClickHouse.
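
For reference, the ClickHouse approach mentioned above pairs a message-queue table engine with a materialized view; a minimal sketch with illustrative table and field names:

-- A Kafka engine table receives messages from the topic...
CREATE TABLE queue_raw (id UInt64, body String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'my-data',
         kafka_group_name = 'clickhouse',
         kafka_format = 'JSONEachRow';

-- ...and a materialized view continuously moves them into a regular table.
CREATE TABLE destination (id UInt64, body String) ENGINE = MergeTree ORDER BY id;

CREATE MATERIALIZED VIEW queue_to_destination TO destination AS
SELECT id, body FROM queue_raw;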

Describe alternatives you've considered
A ton of backends manually written by ManticoreSearch users, each with different interfaces, different approaches, and different bugs.

Additional context
Related to: #1493

sanikolaev commented 8 months ago

@tomatolog please research this task to prepare a better specification and an estimate.

nickchomey commented 8 months ago

Related #1686, though I agree that a broader message queue abstraction would be more important than a specific NATS integration. What ClickHouse has done seems perfect!

tomatolog commented 8 months ago

Could someone provide a simple dockerized example of a Kafka > ClickHouse setup, where I could insert data into Kafka via curl and then see it in ClickHouse?

AbstractiveNord commented 8 months ago

> Could someone provide a simple dockerized example of a Kafka > ClickHouse setup, where I could insert data into Kafka via curl and then see it in ClickHouse?

I can make a docker compose example of Kafka and ClickHouse tomorrow. Meanwhile, the issue is not just about supporting Apache Kafka, but also NATS JetStream, for example. Nice to see activity here.

AbstractiveNord commented 8 months ago

> Related #1686, though I agree that a broader message queue abstraction would be more important than a specific NATS integration. What ClickHouse has done seems perfect!

I want to clarify the difference. This issue, #1500, is about supporting data ingestion from a message queue. Issue #1686 is about supporting client API requests, such as searching, ingesting, deleting and operating on indexes, directly via the NATS Request-Reply API.

sanikolaev commented 8 months ago

A docker-compose based demo which exemplifies https://clickhouse.com/docs/en/engines/table-engines/integrations/nats would also be helpful. I think Kafka vs NATS is of less importance now than a general understanding of the best way to deal with it in principle:

* inside the daemon or outside (in Buddy)

* hooks, queues, buffers, locks etc.

AbstractiveNord commented 8 months ago

> A docker-compose based demo which exemplifies https://clickhouse.com/docs/en/engines/table-engines/integrations/nats would also be helpful. I think Kafka vs NATS is of less importance now than a general understanding of the best way to deal with it in principle:
>
> * inside the daemon or outside (in Buddy)
>
> * hooks, queues, buffers, locks etc.

As far as I can see, Buddy may not help with data ingestion itself, but it may help the daemon get information about the Kafka cluster status and ingestion settings.

Also, I have to note that the ClickHouse NATS integration is incomplete, because ClickHouse supports NATS Core, not the persistence layer of NATS. Durable queues, in the sense of Kafka, are about NATS JetStream, which works on top of NATS Core. JetStream is baked into the same binary, nats-server, and is enabled via a startup flag. A dockerized ClickHouse -- NATS example will also be provided.
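
For reference, the ClickHouse NATS engine (which, as noted, covers NATS Core only) follows the same pattern as its Kafka engine; a rough sketch, with the exact setting names to be double-checked against the ClickHouse docs linked earlier:

-- A NATS engine table subscribes to the given subjects...
CREATE TABLE nats_queue (id UInt64, body String)
ENGINE = NATS
SETTINGS nats_url = 'nats:4222',
         nats_subjects = 'my-data',
         nats_format = 'JSONEachRow';

-- ...and a materialized view moves incoming messages into a regular table.
CREATE MATERIALIZED VIEW nats_to_destination TO destination AS
SELECT id, body FROM nats_queue;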

AbstractiveNord commented 8 months ago

About ClickHouse -- Kafka: have you tried this example from @nmicra? I tested it; the clickhouse-client binary requires port 9000, so I just opened port 9000 for the "clickhouseSrv" service in the docker compose file, and that's it. The docker compose service now looks like:

clickhouseSrv:
    image: 'yandex/clickhouse-server:latest'
    ports:
      - '8123:8123'
      - '9000:9000'
    networks:
      - app-tier
tomatolog commented 8 months ago

Does the MQ implementation need setup of all the consumers, topics, partitions and that kind of management? Or is the only scenario to fetch the data from an MQ topic into a Manticore RT index?

A complex setup is not very user friendly and might only fit DB masters; those could use a custom app instead of trying to set up multiple Manticore nodes and make them use partitions to fetch the data with maximum throughput.

AbstractiveNord commented 8 months ago

> Does the MQ implementation need setup of all the consumers, topics, partitions and that kind of management? Or is the only scenario to fetch the data from an MQ topic into a Manticore RT index?
>
> A complex setup is not very user friendly and might only fit DB masters; those could use a custom app instead of trying to set up multiple Manticore nodes and make them use partitions to fetch the data with maximum throughput.

Kafka setup does require that management, yes, but ManticoreSearch doesn't need to manage it: all settings come from the user. There are two scenarios:

  1. Ingest data into RT index
  2. Query data through PQ index

The most benefit may be achieved with distributed tables and shards, which manage data ingestion by themselves. There is no way for a user to write an app that can track the actual ManticoreSearch cluster topology, the status of every node, and updates; that's closer to a Kubernetes operator than to a user app.

I consider this example user friendly if the user is familiar with MQs like Kafka, but if I am wrong, please share any problems you run into.

Also, check the NATS examples, such as this subscription example. There is some work on ClickHouse -- NATS integration that may be worth attention. NATS has a bridge to Kafka and back, so that's not a problem.

sanikolaev commented 8 months ago

As discussed, here's what we can do:

  1. There's no compelling reason to implement this within the daemon. For example, librdkafka, which we plan to use, has its own multithreading that can conflict with the daemon's coroutine-based multithreading, etc. There's a small benefit in that we wouldn't have to parse incoming data twice, but this can probably be neglected given the effort it would take to implement it in C++. So, we can implement it in Buddy.
  2. In the future, we can extrapolate this idea to other sources that we already support in plain indexation: mysql, pgsql, etc., to make it possible to sync data from them to a real-time table.
  3. Since the term source is already well known, we can use it too. Then, the SQL syntax for creating a source for Kafka would be create source (<fields>) type='kafka'. Doing this would result in creating a real-time table with a special name to store the kafka metadata. The table can later be hidden from show tables, similar to how we plan to hide special tables related to autosharding.
  4. Specifying fields in the source is important since the set of field names in the source might differ from the schema in the destination table.
  5. Now we need to link the Kafka source and the RT table. The command CREATE MATERIALIZED VIEW view_name TO rt_table_name AS SELECT * FROM kafka_source_name appears to be ok. CREATE MATERIALIZED VIEW is a well-known term and is used in Postgres and Clickhouse. CREATE EVENT, CREATE TABLE ... SELECT and triggers, which are MySQL's alternatives, seem much more complicated. The SELECT ... clause is necessary to establish the mapping between the source and destination schemas (a consolidated sketch follows this list).
  6. SHOW SOURCES/VIEWS should show sources/views.
  7. DROP SOURCE/VIEW should drop the source/view.
  8. ALTER SOURCE/VIEW can be postponed for now.
  9. CREATE SOURCE/VIEW ... LIKE should be supported.
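
Putting items 3-5 together, a consolidated sketch of how these commands could look end to end (field names are illustrative; the same statements appear in the working examples further down in this thread):

CREATE SOURCE kafka (id bigint, term text, abbrev text, GlossDef json)
type='kafka' broker_list='kafka:9092' topic_list='my-data'
consumer_group='manticore' num_consumers='2' batch=50;

CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi);

CREATE MATERIALIZED VIEW view_table TO destination_kafka AS
SELECT id, term AS name, abbrev AS short_name, UTC_TIMESTAMP() AS received_at, GlossDef.size AS size
FROM kafka;
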
AbstractiveNord commented 8 months ago

> As discussed, here's what we can do:
>
> 1. There's no compelling reason to implement this within the daemon. For example, librdkafka, which we plan to use, has its own multithreading that can conflict with the daemon's coroutine-based multithreading, etc. There's a small benefit in that we wouldn't have to parse incoming data twice, but this can probably be neglected given the effort it would take to implement it in C++. So, we can implement it in Buddy.
>
> 2. In the future, we can extrapolate this idea to other sources that we already support in plain indexation: mysql, pgsql, etc., to make it possible to sync data from them to a real-time table.
>
> 3. Since the term `source` is already well known, we can use it too. Then, the SQL syntax for creating a source for Kafka would be `create source (<fields>) type='kafka'`. Doing this would result in creating a real-time table with a special name to store the kafka metadata. The table can later be hidden from `show tables`, similar to how we plan to hide special tables related to autosharding.
>
> 4. Specifying fields in the source is important since the set of field names in the source might differ from the schema in the destination table.
>
> 5. Now we need to link the Kafka source and the RT table. The command `CREATE MATERIALIZED VIEW view_name TO rt_table_name AS SELECT * FROM kafka_source_name` appears to be ok. `CREATE MATERIALIZED VIEW` is a well-known term and is used in Postgres and Clickhouse. `CREATE EVENT`, `CREATE TABLE ... SELECT` and triggers, which are MySQL's alternatives, seem much more complicated. The `SELECT ...` clause is necessary to establish the mapping between the source and destination schemas.
>
> 6. `SHOW SOURCES/VIEWS` should show sources/views.
>
> 7. `DROP SOURCE/VIEW` should drop the source/view.
>
> 8. `ALTER SOURCE/VIEW` can be postponed for now.
>
> 9. `CREATE SOURCE/VIEW ... LIKE` should be supported.
1. Please benchmark this approach with different batch sizes. That's a concern.

2. There needs to be an option to ALTER sources: change the batch size, the consumer, add and remove broker addresses, etc. Please also ensure there is a special table for offsets; that's the only way to achieve exactly-once delivery (see the sketch below).

3. A MATERIALIZED VIEW may contain complex queries; will those be supported?
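
To illustrate the offsets point, a purely hypothetical sketch (the table name and schema here are invented for the example and are not an existing Manticore feature):

-- Hypothetical table tracking how far each consumer has read per topic/partition.
CREATE TABLE _kafka_offsets (id bigint, topic text, kafka_partition int, last_offset bigint);
-- After a batch is written into the destination table, last_offset for the corresponding
-- (topic, partition) would be updated in the same step, so that a restarted consumer
-- resumes from last_offset + 1 instead of re-reading or skipping messages.
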
djklim87 commented 7 months ago

Blocked by https://github.com/manticoresoftware/manticoresearch/issues/1662

sanikolaev commented 7 months ago

> Blocked by https://github.com/manticoresoftware/manticoresearch/issues/1662

Done

djklim87 commented 6 months ago

Blocked by https://github.com/manticoresoftware/manticoresearch-buddy/issues/244

AbstractiveNord commented 6 months ago

@djklim87 Please comment on the NATS PHP Client stream example: is it easier than the Kafka integration? Maybe share some thoughts on that?

djklim87 commented 5 months ago

@donhardman https://github.com/manticoresoftware/manticoresearch-buddy/pull/224 Review please

djklim87 commented 4 months ago

Just for help

docker compose --profile=queues -f docker-compose-dev.yml up -d
docker exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic my-data --partitions 4 --bootstrap-server localhost:9092
docker exec manticore-buddy composer update
docker exec -it manticore-buddy searchd --nodetach

docker exec manticore-buddy mysql -h0 -P9306 -e "CREATE SOURCE kafka (id bigint, term text, abbrev text, GlossDef json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore' num_consumers='2' batch=50;CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi);CREATE MATERIALIZED VIEW view_table TO destination_kafka AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka"

docker exec kafka ./import.sh
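
Once import.sh has pushed the dump into the topic, ingestion can be verified with a quick count on the destination table (the same check used further down in this thread):

SELECT COUNT(*) FROM destination_kafka;
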
sanikolaev commented 4 months ago

> Just for help

Looks like a good base for the test

djklim87 commented 4 months ago

@donhardman check why the shutdown doesn't work correctly: https://github.com/manticoresoftware/manticoresearch-buddy/pull/276

donhardman commented 4 months ago

After researching and testing how we use workers, I found the simplest and most manageable way to implement them.

You can view the changes in this pull request: https://github.com/manticoresoftware/buddy-core/pull/50

I rewrote and tested the code for using workers, pushing those changes to the same branch. It's working as expected, and I'm seeing the results I wanted.

Image

sanikolaev commented 4 months ago

docs update https://github.com/manticoresoftware/manticoresearch/pull/2128

Done and waiting for the functionality to be released (it's not going to be included in the upcoming one).

djklim87 commented 2 months ago

> @djklim87 new issues for me

Everything done in https://github.com/manticoresoftware/manticoresearch-buddy/pull/319

PavelShilin89 commented 2 months ago

@djklim87 I ran into a problem when working with timestamp: I can't get the data even when escaping and running in manual mode without CLT. I did it this way:

dc11b00d40bd:/.clt# docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka_ts (id bigint, term text, abbrev text, GlossDef json, \`timestamp\` timestamp) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_ts' num_consumers='1' batch=50;"; echo $?
0
dc11b00d40bd:/.clt# docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka_ts (id bigint, name text, short_name text, received_at text, size multi, timestamp_field timestamp);"; echo $?
0
dc11b00d40bd:/.clt# docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_ts TO destination_kafka_ts AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, \`timestamp\` as timestamp_field FROM kafka_ts WHERE timestamp_field > 1688990400;"; echo $?
0
dc11b00d40bd:/.clt# docker exec kafka ./import.sh; echo $?
0
dc11b00d40bd:/.clt# sleep 20; docker exec manticore mysql -h0 -P9306 -e "SELECT * FROM destination_kafka_ts;"
dc11b00d40bd:/.clt# docker exec manticore mysql -h0 -P9306 -e "show source kafka_ts;"
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Source   | Create Table                                                                                                                                                                                                           |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| kafka_ts | CREATE SOURCE kafka_ts
(id bigint, term text, abbrev text, GlossDef json, `timestamp` timestamp)
type='kafka'
broker_list='kafka:9092'
topic_list='my-data'
consumer_group='manticore_ts'
num_consumers='1'
 batch=50 |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
dc11b00d40bd:/.clt# docker exec manticore mysql -h0 -P9306 -e "show mv view_table_ts;"
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
| View          | Create Table                                                                                                                                                                                                                                            | suspended |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
| view_table_ts | CREATE MATERIALIZED VIEW view_table_ts TO destination_kafka_ts AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, `timestamp` as timestamp_field FROM kafka_ts WHERE timestamp_field > 1688990400 | 0         |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
PavelShilin89 commented 2 months ago

@djklim87 I also tried replacing the timestamp field name with another field name that is not an exception, but that didn't work either.

djklim87 commented 1 month ago

@PavelShilin89 Everything works fine:

mysql> CREATE SOURCE kafka_ts (id bigint, term text, abbrev text, GlossDef json, `timestamp` timestamp) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_ts' num_consumers='1' batch=50;
mysql> CREATE TABLE destination_kafka_ts (id bigint, name text, short_name text, received_at text, size multi, timestamp_field timestamp);
mysql> CREATE MATERIALIZED VIEW view_table_ts TO destination_kafka_ts AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, `timestamp` as timestamp_field FROM kafka_ts WHERE timestamp_field > 1722436718;

mysql> select * from destination_kafka_ts;
+------+---------------------------------------------+-----------------+---------------------+----------+-----------------+
| id   | name                                        | short_name      | received_at         | size     | timestamp_field |
+------+---------------------------------------------+-----------------+---------------------+----------+-----------------+
|    1 | The Quick Brown Fox                         | ISO 12345:6789  | 2024-07-31 14:45:28 | 20,25,30 |      1722436719 |
|    2 | Jumped Over The Lazy Dog                    | ISO 98765:4321  | 2024-07-31 14:45:28 | 20,25,30 |      1722436720 |
|    3 | A Stitch In Time Saves Nine                 | ISO 24680:1357  | 2024-07-31 14:45:28 | 20,25,30 |      1722436721 |
|    4 | An Apple A Day Keeps The Doctor Away        | ISO 97531:86420 | 2024-07-31 14:45:28 | 20,25,30 |      1722436722 |
|    5 | Actions Speak Louder Than Words             | ISO 56789:12345 | 2024-07-31 14:45:28 | 20,25,30 |      1722436723 |
|    6 | Every Cloud Has A Silver Lining             | ISO 86420:97531 | 2024-07-31 14:45:28 | 20,25,30 |      1722436724 |
|    7 | Beauty Is In The Eye Of The Beholder        | ISO 13579:86420 | 2024-07-31 14:45:28 | 20,25,30 |      1722436725 |
|    8 | Birds Of A Feather Flock Together           | ISO 97531:24680 | 2024-07-31 14:45:28 | 20,25,30 |      1722436726 |
|    9 | You Can't Judge A Book By Its Cover         | ISO 56789:86420 | 2024-07-31 14:45:28 | 20,25,30 |      1722436727 |
|   10 | Two Heads Are Better Than One               | ISO 86420:97531 | 2024-07-31 14:45:28 | 20,25,30 |      1722436728 |
|   11 | The Early Bird Catches The Worm             | ISO 97531:86420 | 2024-07-31 14:45:28 | 20,25,30 |      1722436729 |
|   12 | All That Glitters Is Not Gold               | ISO 24680:1357  | 2024-07-31 14:45:28 | 2,3,20   |      1722436730 |
|   13 | A Penny For Your Thoughts                   | ISO 97531:86420 | 2024-07-31 14:45:28 | 20,25,30 |      1722436731 |
|   14 | Bite The Bullet                             | ISO 13579:86420 | 2024-07-31 14:45:28 | 20,25,30 |      1722436732 |
|   15 | Break A Leg                                 | ISO 56789:12345 | 2024-07-31 14:45:28 | 20,25,30 |      1722436733 |
|   16 | A Picture Is Worth A Thousand Words         | ISO 86420:97531 | 2024-07-31 14:45:28 | 20,25,30 |      1722436734 |
|   17 | Curiosity Killed The Cat                    | ISO 13579:86420 | 2024-07-31 14:45:28 | 20,25,30 |      1722436735 |
|   18 | Don't Count Your Chickens Before They Hatch | ISO 97531:24680 | 2024-07-31 14:45:28 | 20,25,30 |      1722436736 |
|   19 | Every Dog Has His Day                       | ISO 56789:86420 | 2024-07-31 14:45:28 | 20,25,30 |      1722436737 |
|   20 | Haste Makes Waste                           | ISO 86420:97531 | 2024-07-31 14:45:28 | 20,25,30 |      1722436738 |
+------+---------------------------------------------+-----------------+---------------------+----------+-----------------+
mysql> show version;
+-----------+----------------------------------+
| Component | Version                          |
+-----------+----------------------------------+
| Daemon    | 6.3.5 01ef582bc@24073104 dev     |
| Columnar  | columnar 2.3.1 eb62283@24072718  |
| Secondary | secondary 2.3.1 eb62283@24072718 |
| KNN       | knn 2.3.1 eb62283@24072718       |
| Buddy     | buddy v2.3.13                    |
+-----------+----------------------------------+
djklim87 commented 1 month ago

I used this dump. Check it yourself:

dump_with_unique_timestamps.json

djklim87 commented 1 month ago

@djklim87

Waiting for MR https://github.com/manticoresoftware/manticoresearch-buddy/pull/336/files

djklim87 commented 1 month ago

@PavelShilin89

PavelShilin89 commented 1 month ago

@djklim87 I sent the dump.json I am working with. Found another bug: despite batch=50 being specified when creating the source, the destination table receives significantly fewer rows.

mysql> CREATE SOURCE kafka (id bigint, term text, abbrev text, GlossDef json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore' num_consumers='2' batch=50;
mysql> CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi);
mysql> CREATE MATERIALIZED VIEW view_table TO destination_kafka AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka;
docker exec kafka chmod +x ./import.sh; docker exec kafka ./import.sh;
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -q -m2 "REPLACE+INTO+destination_kafka" && echo "Download completed."' || echo "Download failed."
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka;"
+----------+
| count(*) |
+----------+
|       20 |
+----------+
djklim87 commented 1 month ago

Waiting for approval: https://github.com/manticoresoftware/manticoresearch-buddy/pull/339

djklim87 commented 1 month ago

Fixed since https://github.com/manticoresoftware/manticoresearch/commit/9017e4e320e99cca9a6fe05e4454758c56b415fd

djklim87 commented 1 month ago

Tests for:

PavelShilin89 commented 1 month ago

@djklim87 @donhardman I found another issue during testing: after restarting the daemon, Buddy can't pick up the logs recorded before the stop, although the expected behavior is that Buddy should resume reading from where logging stopped when the daemon was shut down.

MRE

#!/bin/bash

# Create Docker network
docker network create app-network --driver bridge > /dev/null

# Start Manticore container
docker run -it -e EXTRA=1 --network=app-network --platform linux/amd64 --name manticore -d ghcr.io/manticoresoftware/manticoresearch:test-kit-latest bash > /dev/null 2>&1

# Modify Manticore configuration file
docker exec manticore sed -i '/data_dir = \/var\/lib\/manticore/a\    buddy_path = manticore-executor -n /usr/share/manticore/modules/manticore-buddy/src/main.php --debugv\n' /etc/manticoresearch/manticore.conf

# Start Manticore
docker exec manticore searchd

# Start Kafka container
docker run -it -d -e EXTRA=1 --network=app-network --name kafka -v ./test/clt-tests/kafka/import.sh:/import.sh -v ./test/clt-tests/kafka/dump.json:/tmp/dump.json -e KAFKA_CFG_NODE_ID=0 -e KAFKA_CFG_PROCESS_ROLES=controller,broker -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT bitnami/kafka:3.7.0 > /dev/null 2>&1

# Create Kafka topic
docker exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic my-data --partitions 4 --bootstrap-server localhost:9092 2>&1 | grep -o 'Created topic my-data\.' | head -n 1

# Create Kafka source in Manticore
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka_stop (id bigint, term text, abbrev text, GlossDef json, location json, metadata json, tags json, timestamp_unix timestamp) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_combined' num_consumers='1' batch=50;"

# Create destination table in Manticore
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka_stop (id bigint, name text, short_name text, received_at text, size multi, lat float, lon float, views int, info text, tags json, timestamp_combined timestamp);"

# Create materialized view in Manticore
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_stop TO destination_kafka_stop AS SELECT id, term AS name, abbrev AS short_name, UTC_TIMESTAMP() AS received_at, GlossDef.size AS size, location.lat AS lat, location.lon AS lon, metadata.views AS views, metadata.info AS info, tags, timestamp_unix AS timestamp_combined FROM kafka_stop;"

# Run Kafka import script
docker exec kafka chmod +x ./import.sh
docker exec kafka ./import.sh

# Wait for data to be processed
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m1 "REPLACE+INTO+destination_kafka_stop" > /dev/null && echo "Download completed."' || echo "Download failed."

# Display Manticore log
docker exec manticore cat /var/log/manticore/searchd.log

# Query the count of records in the destination table
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_stop;"

# Stop Manticore
sleep 1
docker exec manticore searchd --stopwait

# Start Manticore again
docker exec manticore searchd

# Query the count of records again
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_stop;"

# Display Manticore log again
docker exec manticore cat /var/log/manticore/searchd.log

Expected Result - the logs show two lines containing REPLACE+INTO+destination_kafka_stop

timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka_stop" > /dev/null && echo "Download completed."' || echo "Download failed."

Download completed
djklim87 commented 1 month ago

> @djklim87 @donhardman I found another issue during testing: after restarting the daemon, Buddy can't pick up the logs recorded before the stop, although the expected behavior is that Buddy should resume reading from where logging stopped when the daemon was shut down.

I'll take a look

djklim87 commented 1 month ago

I can't reproduce this problem. Try the code below; maybe you don't even need to fix this bug.

#!/usr/bin/env bash

set -x

docker stop kafka && docker rm kafka
docker stop manticore && docker rm manticore

wget -O /tmp/import.sh https://raw.githubusercontent.com/manticoresoftware/manticoresearch-buddy/main/test/Kafka/import.sh
wget -O /tmp/dump.json https://raw.githubusercontent.com/manticoresoftware/manticoresearch-buddy/main/test/Kafka/dump.json

docker network create app-network --driver bridge
docker run -it -e EXTRA=1 --network=app-network \
--platform linux/amd64 \
--name manticore -d ghcr.io/manticoresoftware/manticoresearch:test-kit-latest bash
docker exec manticore sed -i '/data_dir = \/var\/lib\/manticore/a\    buddy_path = manticore-executor -n /usr/share/manticore/modules/manticore-buddy/src/main.php --debugv\n' /etc/manticoresearch/manticore.conf
docker run -it -d --network=app-network \
--name kafka \
-v /tmp/import.sh:/import.sh \
-v /tmp/dump.json:/tmp/dump.json \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT bitnami/kafka:3.7.0
docker exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic my-data --partitions 4 --bootstrap-server localhost:9092
docker exec manticore searchd
#Wait until buddy started
sleep 10
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka_stop (id bigint, term text, abbrev text, GlossDef json, location json, metadata json, tags json, timestamp_unix timestamp) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_combined' num_consumers='1' batch=50;CREATE TABLE destination_kafka_stop (id bigint, name text, short_name text, received_at text, size multi, lat float, lon float, views int, info text, tags json, timestamp_combined timestamp);CREATE MATERIALIZED VIEW view_table_stop TO destination_kafka_stop AS SELECT id, term AS name, abbrev AS short_name, UTC_TIMESTAMP() AS received_at, GlossDef.size AS size, location.lat AS lat, location.lon AS lon, metadata.views AS views, metadata.info AS info, tags, timestamp_unix AS timestamp_combined FROM kafka_stop;"
docker exec kafka chmod +x ./import.sh
docker exec kafka ./import.sh
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m1 "REPLACE+INTO+destination_kafka_stop" > /dev/null && echo "Download completed."' || echo "Download failed."
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_stop;"

# ---------------------- Should be one row ---------------- #
docker exec manticore cat /var/log/manticore/searchd.log | grep "REPLACE+INTO+destination_kafka_stop"

sleep 1
docker exec manticore searchd --stopwait
docker exec manticore searchd
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka_stop" > /dev/null && echo "Download completed."' || echo "Download failed."
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_stop;"

# ---------------------- Should be two rows ---------------- #
docker exec manticore cat /var/log/manticore/searchd.log | grep "REPLACE+INTO+destination_kafka_stop"
PavelShilin89 commented 3 days ago

Blocked by https://github.com/orgs/manticoresoftware/projects/3/views/16?pane=issue&itemId=80141646