manticoresoftware / manticoresearch

Easy to use open source fast database for search | Good alternative to Elasticsearch now | Drop-in replacement for E in the ELK soon
https://manticoresearch.com
GNU General Public License v3.0
9.03k stars 506 forks source link

suspended=1 does not stop data processing #2572

Closed PavelShilin89 closed 3 days ago

PavelShilin89 commented 1 month ago

In the Kafka–Manticore integration test, `suspended=1` does not stop data processing. Note that the problem can only be detected under certain conditions. I discovered it as follows:

MRE


# Stage 1: bring up the network, the Manticore container and the Kafka broker.
# Create the bridge network both containers attach to; only the exit status is printed.
docker network create app-network --driver bridge > /dev/null; echo $?
# Start the Manticore test-kit container detached on that network (output discarded, exit status printed).
docker run -it -e EXTRA=1 --network=app-network --platform linux/amd64 --name manticore -d ghcr.io/manticoresoftware/manticoresearch:test-kit-latest bash > /dev/null 2>&1; echo $?
# Append a buddy_path line (manticore-executor with --debugv) after the data_dir line in manticore.conf.
docker exec manticore sed -i '/data_dir = \/var\/lib\/manticore/a\    buddy_path = manticore-executor -n /usr/share/manticore/modules/manticore-buddy/src/main.php --debugv\n' /etc/manticoresearch/manticore.conf
# Launch searchd inside the container.
docker exec manticore searchd
# Start a single Kafka 3.7.0 node acting as both controller and broker; import.sh and dump.json are bind-mounted from the repo checkout.
docker run -it -d -e EXTRA=1 --network=app-network --name kafka -v ./test/clt-tests/integrations/kafka/import.sh:/import.sh -v ./test/clt-tests/integrations/kafka/dump.json:/tmp/dump.json -e KAFKA_CFG_NODE_ID=0 -e KAFKA_CFG_PROCESS_ROLES=controller,broker -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT bitnami/kafka:3.7.0 > /dev/null 2>&1; echo $?
# Create the 4-partition topic "my-data"; print only the first "Created topic my-data." confirmation.
docker exec kafka kafka-topics.sh --create --topic my-data --partitions 4 --bootstrap-server localhost:9092 2>&1 | grep -o 'Created topic my-data\.' | head -n 1
# Stage 2: basic Kafka source -> materialized view -> destination table round trip, then teardown.
# Define the Kafka source on topic my-data (consumer group "manticore", batch of 50).
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka (id bigint, term text, abbrev text, GlossDef json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore' num_consumers='1' batch=50;"; echo $?
# Destination table the view writes into.
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi);"; echo $?
# Materialized view mapping source fields onto the destination columns.
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table TO destination_kafka AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka"; echo $?
# Make import.sh executable and run it in the Kafka container (presumably publishes dump.json to the topic — confirm against import.sh).
docker exec kafka chmod +x ./import.sh; docker exec kafka ./import.sh; echo $?
# Wait up to 60 s for two matching REPLACE lines in searchd.log; report success or failure.
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
# Inspect what was ingested.
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka;"
docker exec manticore mysql -h0 -P9306 -e "SELECT * FROM destination_kafka ORDER BY id ASC;"
# Tear down the source and the destination table, then list the tables that remain.
docker exec manticore mysql -h0 -P9306 -e "DROP SOURCE kafka;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "DROP table destination_kafka;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "SHOW TABLES;"
# Stage 3: re-create source "kafka" and table "destination_kafka" with extra location/geo columns.
# Reset the manticore_destination consumer group to the latest offsets of my-data.
docker exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group manticore_destination --reset-offsets --to-latest --topic my-data --execute > /dev/null; echo $?
# Source now also reads the "location" JSON field and uses consumer group manticore_destination.
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka (id bigint, term text, abbrev text, GlossDef json, location json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_destination' num_consumers='1' batch=50;"; echo $?
# Destination table gains lat, lon and distance columns.
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi, lat float, lon float, distance float);"; echo $?
# View extracts lat/lon from location and computes GEODIST against the fixed point (49.0, 3.0).
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_destination TO destination_kafka AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, location.lat as lat, location.lon as lon, GEODIST(lat, lon, 49.0, 3.0) AS distance FROM kafka"; echo $?
# Run import.sh again.
docker exec kafka ./import.sh; echo $?
# Wait up to 60 s for two matching REPLACE lines in searchd.log.
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
# NOTE(review): "lon AS distance" aliases lon and never selects the distance column — possibly "lon, distance" was intended; confirm.
docker exec manticore mysql -h0 -P9306 -e "SELECT id, name, short_name, received_at, size, lat, lon AS distance FROM destination_kafka ORDER BY id ASC;"
# Stage 4: materialized view with a WHERE filter on a JSON-derived column.
# Reset the manticore_metadata consumer group to the latest offsets of my-data.
docker exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group manticore_metadata --reset-offsets --to-latest --topic my-data --execute > /dev/null; echo $?
# Source reading the additional "metadata" JSON field.
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka_metadata (id bigint, term text, abbrev text, GlossDef json, metadata json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_metadata' num_consumers='1' batch=50;"; echo $?
# Destination table with views/info columns.
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka_metadata (id bigint, name text, short_name text, received_at text, size multi, views int, info text);"; echo $?
# The view's query keeps only rows with views > 1000.
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_metadata TO destination_kafka_metadata AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, metadata.views as views, metadata.info as info FROM kafka_metadata WHERE views > 1000;"; echo $?
# Run import.sh again.
docker exec kafka ./import.sh; echo $?
# Wait up to 60 s for two matching REPLACE lines for this table in searchd.log.
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka_metadata" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
# Inspect what was ingested.
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_metadata;"
docker exec manticore mysql -h0 -P9306 -e "SELECT * FROM destination_kafka_metadata ORDER BY id ASC;"
# Stage 5: materialized view filtering on a JSON "tags" field.
# Reset the manticore_tags consumer group to the latest offsets of my-data.
docker exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group manticore_tags --reset-offsets --to-latest --topic my-data --execute > /dev/null; echo $?
# Source reading the additional "tags" JSON field.
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka_tags (id bigint, term text, abbrev text, GlossDef json, tags json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_tags' num_consumers='1' batch=50;"; echo $?
# Destination table storing tags as JSON.
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka_tags (id bigint, name text, short_name text, received_at text, size multi, tags json);"; echo $?
# The view's query filters with tags IN ('item1', 'item2').
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_tags TO destination_kafka_tags AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, tags FROM kafka_tags WHERE tags IN ('item1', 'item2');"; echo $?
# Run import.sh again.
docker exec kafka ./import.sh; echo $?
# Wait up to 60 s for two matching REPLACE lines for this table in searchd.log.
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka_tags" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
# Inspect what was ingested.
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_tags;"
docker exec manticore mysql -h0 -P9306 -e "SELECT * FROM destination_kafka_tags ORDER BY id ASC;"
# Stage 6: the scenario under test — suspend the view mid-stream, verify the count freezes, then resume.
# Reset the manticore_alter consumer group to the latest offsets of my-data.
docker exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group manticore_alter --reset-offsets --to-latest --topic my-data --execute > /dev/null; echo $?
# Source with batch=50, so the first REPLACE is expected to carry 50 rows (see the expected output in the issue).
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka_alter (id bigint, term text, abbrev text, GlossDef json, metadata json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_alter' num_consumers='1' batch=50;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka_alter (id bigint, name text, short_name text, received_at text, size multi, views bigint);"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_alter TO destination_kafka_alter AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, metadata.views as views FROM kafka_alter;"; echo $?
# Run import.sh again.
docker exec kafka ./import.sh; echo $?
# Unlike earlier stages, stop after the FIRST matching REPLACE (-m1) so the view can be suspended before the next batch.
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m1 "REPLACE+INTO+destination_kafka_alter" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
# Suspend the view; no further rows should be written from here on.
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=1;"; echo $?
# Count immediately after suspending (expected 50).
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
# Count again after 10 s; it should still be 50 — the reported bug is that it becomes 57.
sleep 10; docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
# Resume the view.
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=0;"; echo $?
# Poll once per second (up to 120 s) until the count reaches 57; nothing is printed if the timeout expires.
timeout 120 bash -c 'while [[ $(docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;" | grep -o "[0-9]*") -ne 57 ]]; do sleep 1; done && echo "Data processing completed."'
# Final count (expected 57).
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"

Expected result:

––– input –––
docker exec kafka ./import.sh; echo $?
––– output –––
0
––– input –––
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m1 "REPLACE+INTO+destination_kafka_alter" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
––– output –––
Data processing completed.
––– input –––
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=1;"; echo $?
––– output –––
0
––– input –––
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       50 |
+----------+
––– input –––
sleep 10; docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       50 |
+----------+
––– input –––
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=0;"; echo $?
––– output –––
0
––– input –––
timeout 120 bash -c 'while [[ $(docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;" | grep -o "[0-9]*") -ne 57 ]]; do sleep 1; done && echo "Data processing completed."'
––– output –––
Data processing completed.
––– input –––
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       57 |
+----------+

Current result:

––– input –––
docker exec kafka ./import.sh; echo $?
––– output –––
0
––– input –––
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m1 "REPLACE+INTO+destination_kafka_alter" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
––– output –––
Data processing completed.
––– input –––
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=1;"; echo $?
––– output –––
0
––– input –––
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       50 |
+----------+
––– input –––
sleep 10; docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       57 |
+----------+
––– input –––
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=0;"; echo $?
––– output –––
0
––– input –––
timeout 120 bash -c 'while [[ $(docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;" | grep -o "[0-9]*") -ne 57 ]]; do sleep 1; done && echo "Data processing completed."'
––– output –––
Data processing completed.
––– input –––
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       57 |
+----------+

Test data: dump.json

The test start command:

# Run the recorded CLT test test-integration-kafka-ms.rec; the clt path is specific to the reporter's machine.
# NOTE(review): -t presumably selects the .rec file and -d the Docker image — confirm against the clt usage text.
RUN_ARGS='--privileged -v ".:/docker"' /Users/pavelshilin/Desktop/WORK/clt/clt test -t ./test/clt-tests/integrations/kafka/test-integration-kafka-ms.rec -d manticoresearch/dind:v1
djklim87 commented 1 month ago

Can you remove the unnecessary code and leave only the code that is related to the issue?

PavelShilin89 commented 1 month ago

@djklim87 this is exactly the code that relates to the problem and shows how to reproduce it. If the code is different, or the position of the problematic part is changed, the error will not be reproduced. For clarity, the failing example itself is shown under "Current result".

djklim87 commented 1 month ago

Test your MRE before putting it into the issue.

  1. You are creating the table destination_kafka twice, so your results differ from what you wrote in the issue.
  2. You are checking the logs with grep -m1 "REPLACE+INTO+destination_kafka"; however, your table is called destination_kafka_alter, so you should get the results shown below:
    Data processing completed.
    0
    +----------+
    | count(*) |
    +----------+
    |        0 |
    +----------+
    +----------+
    | count(*) |
    +----------+
    |        0 |
    +----------+
    0
    Data processing completed.
    +----------+
    | count(*) |
    +----------+
    |       57 |
    +----------+
PavelShilin89 commented 1 month ago

@djklim87 MRE updated

djklim87 commented 1 month ago

Reproducible MRE

Run dind

# Start a privileged container named "dind" from manticoresearch/dind:v1 with an interactive TTY.
docker run -it --privileged --name dind manticoresearch/dind:v1

All commands

# Open an interactive shell in the dind container; the commands below are presumably typed inside that shell.
docker exec -it dind bash

# Download the importer script and the test data into /tmp, then make the script executable.
cd /tmp
wget https://raw.githubusercontent.com/manticoresoftware/manticoresearch/refs/heads/test/native-support-for-message-queues/test/clt-tests/integrations/kafka/import.sh
wget https://raw.githubusercontent.com/manticoresoftware/manticoresearch/refs/heads/test/native-support-for-message-queues/test/clt-tests/integrations/kafka/dump.json
chmod +x ./import.sh

# Bring up the network, the Manticore container and the Kafka broker.
# Create the bridge network both containers attach to; only the exit status is printed.
docker network create app-network --driver bridge > /dev/null; echo $?
# Start the Manticore test-kit container detached on that network.
docker run -it -e EXTRA=1 --network=app-network --platform linux/amd64 --name manticore -d ghcr.io/manticoresoftware/manticoresearch:test-kit-latest bash ; echo $?
# Append a buddy_path line (manticore-executor with --debugv) after the data_dir line in manticore.conf.
docker exec manticore sed -i '/data_dir = \/var\/lib\/manticore/a\    buddy_path = manticore-executor -n /usr/share/manticore/modules/manticore-buddy/src/main.php --debugv\n' /etc/manticoresearch/manticore.conf
# Launch searchd inside the container.
docker exec manticore searchd
# Start a single Kafka 3.7.0 node acting as both controller and broker; import.sh and dump.json are bind-mounted from /tmp.
docker run -it -d -e EXTRA=1 --network=app-network --name kafka -v /tmp/import.sh:/import.sh -v /tmp/dump.json:/tmp/dump.json -e KAFKA_CFG_NODE_ID=0 -e KAFKA_CFG_PROCESS_ROLES=controller,broker -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT bitnami/kafka:3.7.0 ; echo $?
# Create the 4-partition topic "my-data"; print only the first "Created topic my-data." confirmation.
docker exec kafka kafka-topics.sh --create --topic my-data --partitions 4 --bootstrap-server localhost:9092 2>&1 | grep -o 'Created topic my-data\.' | head -n 1

# Warm-up: create a first source/table/view, ingest once, then drop the source and the table.
# Kafka source on topic my-data (consumer group "manticore", batch of 50).
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka (id bigint, term text, abbrev text, GlossDef json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore' num_consumers='1' batch=50;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi);"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table TO destination_kafka AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka"; echo $?
# Run import.sh in the Kafka container (presumably publishes dump.json to the topic — confirm against import.sh).
docker exec kafka ./import.sh; echo $?
# Wait up to 60 s for two matching REPLACE lines in searchd.log; report success or failure.
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
# Drop the source and the destination table before the real scenario.
docker exec manticore mysql -h0 -P9306 -e "DROP SOURCE kafka;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "DROP table destination_kafka;"; echo $?

# Scenario under test: suspend the view after the first batch, check the count freezes, then resume.
echo "----------------------------------------------- RUN ------------------------------------------"
# Reset the manticore_alter consumer group to the latest offsets of my-data.
docker exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group manticore_alter --reset-offsets --to-latest --topic my-data --execute > /dev/null; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka_alter (id bigint, term text, abbrev text, GlossDef json, metadata json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_alter' num_consumers='1' batch=50;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka_alter (id bigint, name text, short_name text, received_at text, size multi, views bigint);"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_alter TO destination_kafka_alter AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, metadata.views as views FROM kafka_alter;"; echo $?
# Run import.sh again.
docker exec kafka ./import.sh; echo $?
# Stop after the FIRST matching REPLACE (-m1) so the view is suspended before the next batch.
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m1 "REPLACE+INTO+destination_kafka_alter" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
# Suspend the view.
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=1;"; echo $?

# Count immediately after suspending.
echo "----------------------------------------------- Should be 50 ------------------------------------------"
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
# Count again after 10 s; a value above 50 means the suspended view kept processing.
echo "----------------------------------------------- Should be 50 ------------------------------------------"
sleep 10; docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
# Resume the view.
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=0;"; echo $?
echo "----------------------------------------------- Should be 57 ------------------------------------------"
# Poll once per second (up to 120 s) until the count reaches 57; nothing is printed if the timeout expires.
timeout 120 bash -c 'while [[ $(docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;" | grep -o "[0-9]*") -ne 57 ]]; do sleep 1; done && echo "Data processing completed."'
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
djklim87 commented 1 month ago

Blocked by https://github.com/manticoresoftware/manticoresearch-buddy/issues/360

sanikolaev commented 4 days ago

Blocked by https://github.com/manticoresoftware/manticoresearch-buddy/issues/360

That's done.

djklim87 commented 3 days ago

Fixed in https://github.com/manticoresoftware/manticoresearch-buddy/commit/72d1c2e940acc75395afd62589af2fecce7cafae