faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

GlobalTable not working when using multiple instances #283

Open Hamdiovish opened 2 years ago

Hamdiovish commented 2 years ago

Steps to reproduce

Run the following app as 2 instances, then push a message to one partition: the instance assigned to that partition detects the message and updates the GlobalTable, while the other instance never updates its copy of the GlobalTable and keeps showing nothing.

import faust
from faust.cli import option

app = faust.App(
    'hello-world',
    broker='kafka://localhost:29092',
    key_serializer='raw',
    value_serializer='raw',
    store='rocksdb://',
    topic_disable_leader=True,
)

greetings_topic = app.topic('greetings-topic', partitions=2,
                            key_type=str, value_type=str, internal=True)
greetings_table = app.GlobalTable('greetings-table', partitions=2,
                                  key_type=str, value_type=str, default=str)

@app.agent(greetings_topic)
async def greet(greetings):
    """run: faust -A app worker"""
    async for event in greetings.events():
        k = event.key
        v = event.value
        print(f"processing: {k}: {v} on partition: {event.message}")
        greetings_table[k] = v

@app.command(
    option('--k', type=str, default='a', help='The key.'),
    option('--v', type=str, default='0', help='The value.'))
async def sim_greeting(self, k, v):
    """simulate: faust -A app01 sim-greeting --k a --v 0"""
    await greetings_topic.send(key=k, value=v)

@app.timer(interval=1.0)
async def plot(app):
    for k in greetings_table:
        print(f'[{k}:{greetings_table[k]}]')

Expected behavior

All instances should detect GlobalTable updates.

Actual behavior

The GlobalTable is not synced across instances.

Roman1us commented 2 years ago

Please check the partition assignment (which instance consumes which partition) and make sure the RocksDB stores use different paths on each instance.

Hamdiovish commented 2 years ago

@Roman1us regarding paths, I'm using a different --datadir and --web-port when running each instance.
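
For example, the two workers are started like this (the full commands appear in the boot logs below):

faust --datadir=data/01 -A app01 worker -l info --web-port 6060
faust --datadir=data/02 -A app01 worker -l info --web-port 6061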

For partition consumption, I'm using the topic "greetings-topic" as the source topic that writes into the GlobalTable, and every instance is assigned a different partition of "greetings-topic".

The problem is that an instance can consume/write data from/into the GlobalTable only in the exact same partition as the source event, as if it were a local "Table".

The only difference with "GlobalTable" is that at startup the recovery is done across all partitions; once recovery is done, every instance consumes/updates the GlobalTable only in its assigned partition of "greetings-topic".

brennerd11 commented 2 years ago

We observed the same issue with GlobalTables on multiple workers. Essentially they do not work currently :-(

dario-collavini commented 2 years ago

Same here: we are running 5 worker instances that should take advantage of the same GlobalTable, but they actually see it just as a local Table. It's a shame, since we have to work around it by adding a lot of boilerplate code just to mock the GlobalTable behaviour.

Hamdiovish commented 2 years ago

Hi @dario-collavini, what’s the approach you used to mock the GlobalTable?

Roman1us commented 2 years ago

@Hamdiovish @dario-collavini how many partitions does your GlobalTable topic have?

Hamdiovish commented 2 years ago

@Roman1us as mentioned in the "Steps to reproduce" section, the GlobalTable in my example has 2 partitions.

Roman1us commented 2 years ago

@Hamdiovish so you need the same table data across all workers, right? Maybe setting the topic to 1 partition and use_partitioner=True in the table config will help you?

Hamdiovish commented 2 years ago

@Roman1us thank you for the prompt reply. I've updated the global table declaration (line 14 of my example above), adding use_partitioner=True and setting partitions=1, as follows:

greetings_table = app.GlobalTable('greetings-table', partitions=1, key_type=str, value_type=str, default=str, use_partitioner=True)

The wrong behavior is still happening. At initialization, recovery occurs as expected and the two instances show the same content of the global table. But later at runtime, when the global table is updated by instance_1, instance_2 doesn't detect the update and keeps showing the old data. Here is the output of an example printing the content of the global table, based on the code above:

instance_0:

[2022-03-30 19:58:07,736] [4551] [WARNING] [EURUSD:2] 
[2022-03-30 19:58:07,737] [4551] [WARNING] [EURCHF:2] 

instance_1:

[2022-03-30 19:58:01,407] [4535] [WARNING] [EURUSD:2] 
[2022-03-30 19:58:01,408] [4535] [WARNING] [EURCHF:2] 

instance_0:

[2022-03-30 20:01:07,768] [4551] [WARNING] [EURUSD:2] 
[2022-03-30 20:01:07,770] [4551] [WARNING] [EURCHF:2] 

instance_1:

[2022-03-30 20:01:01,442] [4535] [WARNING] [EURUSD:3] 
[2022-03-30 20:01:01,442] [4535] [WARNING] [EURCHF:2] 

instance_0:

[2022-03-30 20:02:47,787] [4551] [WARNING] [EURUSD:2] 
[2022-03-30 20:02:47,788] [4551] [WARNING] [EURCHF:0] 

instance_1:

[2022-03-30 20:02:51,462] [4535] [WARNING] [EURUSD:3] 
[2022-03-30 20:02:51,462] [4535] [WARNING] [EURCHF:2] 
Roman1us commented 2 years ago

@Hamdiovish please show the output of kafka-topics.sh for the table topic. I think you have Kafka's default partition count (maybe 3). Also, please check which consumer consumes each partition. We have been using global tables for around a year with a partition count of 1 and no problems have happened.

dario-collavini commented 2 years ago

We saw exactly the same issue reported by @Hamdiovish: updates were not available at runtime. We were running around 5 partitions.

In our case the workaround is just using local Tables instead of GlobalTables and making sure topic partitioning and key structure match between agents and tables. The topic of each worker instance's agent that needs the table must be partitioned by the same key used to access the table data; that way the global data is split into local subsets, each accessible by the corresponding instance, which is guaranteed by using the same partition key.
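
A minimal sketch of that co-partitioning workaround (app, topic, and table names here are illustrative, not from our codebase): the agent's source topic and the local Table share the same key type and partition count, so a given key is always produced, stored, and read on the same worker.

import faust

app = faust.App('quotes-app', broker='kafka://localhost:9092')

# Same key type and partition count for the source topic and the local
# Table, so every key maps to the same partition (and thus the same
# worker) in both.
quotes_topic = app.topic('quotes', partitions=5, key_type=str, value_type=str)
quotes_table = app.Table('quotes-table', partitions=5, key_type=str,
                         value_type=str, default=str)

@app.agent(quotes_topic)
async def track(quotes):
    async for event in quotes.events():
        # This worker owns the event's partition, so the keyed table
        # update stays local and consistent.
        quotes_table[event.key] = event.value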

Hamdiovish commented 2 years ago

Hi @Roman1us, here are the details. The output of kafka-topics.sh for the table topic:

Topic: hello-world-greetings-table-changelog    TopicId: AiKHgqRmQ2W7Ob5CG-WpqA PartitionCount: 1   ReplicationFactor: 1    Configs: cleanup.policy=compact
    Topic: hello-world-greetings-table-changelog    Partition: 0    Leader: 1   Replicas: 1 Isr: 1

The output of the source topic:

Topic: greetings-topic  TopicId: FbrK1G84Tv-UfVF5K9EPSA PartitionCount: 2   ReplicationFactor: 1    Configs: 
    Topic: greetings-topic  Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: greetings-topic  Partition: 1    Leader: 1   Replicas: 1 Isr: 1

Regarding partition assignment, here is the boot log for instance_0:

(faust) root@zarbia:/workspace/fg/test/faust# faust --datadir=data/02 -A app01 worker -l info  --web-port 6061
┌ƒaµS† v0.8.4─┬──────────────────────────────────────────┐
│ id          │ hello-world                              │
│ transport   │ [URL('kafka://localhost:29092')]         │
│ store       │ rocksdb:                                 │
│ web         │ http://localhost:6061/                   │
│ log         │ -stderr- (info)                          │
│ pid         │ 21375                                    │
│ hostname    │ zarbia                                   │
│ platform    │ CPython 3.7.12 (Linux x86_64)            │
│ drivers     │                                          │
│   transport │ aiokafka=0.7.2                           │
│   web       │ aiohttp=3.8.1                            │
│ datadir     │ /workspace/fg/test/faust/data/02         │
│ appdir      │ /workspace/fg/test/faust/data/02/v1      │
└─────────────┴──────────────────────────────────────────┘
[2022-03-31 14:19:54,286] [21375] [INFO] [^Worker]: Starting... 
[2022-03-31 14:19:54,288] [21375] [INFO] [^-App]: Starting... 
[2022-03-31 14:19:54,288] [21375] [INFO] [^--Monitor]: Starting... 
[2022-03-31 14:19:54,288] [21375] [INFO] [^--Producer]: Starting... 
[2022-03-31 14:19:54,288] [21375] [INFO] [^---ProducerBuffer]: Starting... 
[2022-03-31 14:19:54,301] [21375] [INFO] [^--CacheBackend]: Starting... 
[2022-03-31 14:19:54,301] [21375] [INFO] [^--Web]: Starting... 
[2022-03-31 14:19:54,301] [21375] [INFO] [^---Server]: Starting... 
[2022-03-31 14:19:54,302] [21375] [INFO] [^--Consumer]: Starting... 
[2022-03-31 14:19:54,303] [21375] [INFO] [^---AIOKafkaConsumerThread]: Starting... 
[2022-03-31 14:19:54,321] [21375] [INFO] [^--LeaderAssignor]: Starting... 
[2022-03-31 14:19:54,321] [21375] [INFO] [^--ReplyConsumer]: Starting... 
[2022-03-31 14:19:54,321] [21375] [INFO] [^--AgentManager]: Starting... 
[2022-03-31 14:19:54,321] [21375] [INFO] [^---Agent: app01.greet]: Starting... 
[2022-03-31 14:19:54,323] [21375] [INFO] [^----OneForOneSupervisor: (1@0x7f407806e190)]: Starting... 
[2022-03-31 14:19:54,323] [21375] [INFO] [^---Conductor]: Starting... 
[2022-03-31 14:19:54,324] [21375] [INFO] [^--TableManager]: Starting... 
[2022-03-31 14:19:54,324] [21375] [INFO] [^---Conductor]: Waiting for agents to start... 
[2022-03-31 14:19:54,325] [21375] [INFO] [^---Conductor]: Waiting for tables to be registered... 
[2022-03-31 14:19:55,325] [21375] [INFO] [^---GlobalTable: greetings-table]: Starting... 
[2022-03-31 14:19:55,338] [21375] [INFO] [^----Store: rocksdb:greetings-table]: Starting... 
[2022-03-31 14:19:55,338] [21375] [INFO] [^--Producer]: Creating topic 'hello-world-greetings-table-changelog' 
[2022-03-31 14:19:55,345] [21375] [INFO] [^---Recovery]: Starting... 
[2022-03-31 14:19:55,346] [21375] [INFO] [^--Producer]: Creating topic 'hello-world-greetings-table-changelog' 
[2022-03-31 14:19:55,350] [21375] [INFO] Updating subscribed topics to: 
┌Requested Subscription─────────────────┐
│ topic name                            │
├───────────────────────────────────────┤
│ greetings-topic                       │
│ hello-world-greetings-table-changelog │
└───────────────────────────────────────┘ 
[2022-03-31 14:19:55,351] [21375] [INFO] Subscribed to topic(s): 
┌Final Subscription─────────────────────┐
│ topic name                            │
├───────────────────────────────────────┤
│ greetings-topic                       │
│ hello-world-greetings-table-changelog │
└───────────────────────────────────────┘ 
[2022-03-31 14:19:55,358] [21375] [INFO] Discovered coordinator 1 for group hello-world 
[2022-03-31 14:19:55,358] [21375] [INFO] Revoking previously assigned partitions set() for group hello-world 
[2022-03-31 14:19:55,359] [21375] [INFO] (Re-)joining group hello-world 
[2022-03-31 14:19:58,284] [21375] [INFO] Joined group 'hello-world' (generation 32) with member_id faust-0.8.4-d07053dd-c146-4268-a760-be83c23c9d82 
[2022-03-31 14:19:58,289] [21375] [INFO] Successfully synced group hello-world with generation 32 
[2022-03-31 14:19:58,290] [21375] [INFO] Setting newly assigned partitions 
┌Topic Partition Set────────────────────┬────────────┐
│ topic                                 │ partitions │
├───────────────────────────────────────┼────────────┤
│ greetings-topic                       │ {0}        │
│ hello-world-greetings-table-changelog │ {0}        │
└───────────────────────────────────────┴────────────┘ for group hello-world 
[2022-03-31 14:19:58,292] [21375] [INFO] Executing _on_partitions_assigned 
[2022-03-31 14:19:58,320] [21375] [INFO] generation id 32 app consumers id 32 
[2022-03-31 14:19:59,927] [21375] [INFO] [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active─────────────────────┬───────────┬───────────┐
│ topic                                 │ partition │ highwater │
├───────────────────────────────────────┼───────────┼───────────┤
│ hello-world-greetings-table-changelog │ 0         │ 11        │
└───────────────────────────────────────┴───────────┴───────────┘ 
[2022-03-31 14:20:01,428] [21375] [INFO] [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active─────────────┬───────────┬────────┐
│ topic                                 │ partition │ offset │
├───────────────────────────────────────┼───────────┼────────┤
│ hello-world-greetings-table-changelog │ 0         │ -1     │
└───────────────────────────────────────┴───────────┴────────┘ 
[2022-03-31 14:20:02,930] [21375] [INFO] [^---Recovery]: standby offsets at start of reading:
┌Reading Starts At - Standby────────────┬───────────┬────────┐
│ topic                                 │ partition │ offset │
├───────────────────────────────────────┼───────────┼────────┤
│ hello-world-greetings-table-changelog │ 0         │ -1     │
└───────────────────────────────────────┴───────────┴────────┘ 
[2022-03-31 14:20:02,932] [21375] [INFO] [^---Recovery]: Restoring state from changelog topics... 
[2022-03-31 14:20:02,932] [21375] [INFO] [^---Recovery]: Resuming flow... 
[2022-03-31 14:20:02,933] [21375] [INFO] [^---Fetcher]: Starting... 
[2022-03-31 14:20:04,508] [21375] [INFO] [^---Recovery]: Done reading from changelog topics 
[2022-03-31 14:20:04,509] [21375] [INFO] [^---Recovery]: Recovery complete 
[2022-03-31 14:20:04,509] [21375] [INFO] [^---Recovery]: Starting standby partitions... 
[2022-03-31 14:20:05,937] [21375] [INFO] [^---Recovery]: Highwater for standby changelog partitions:
┌Highwater - Standby────────────────────┬───────────┬───────────┐
│ topic                                 │ partition │ highwater │
├───────────────────────────────────────┼───────────┼───────────┤
│ hello-world-greetings-table-changelog │ 0         │ 11        │
└───────────────────────────────────────┴───────────┴───────────┘ 
[2022-03-31 14:20:05,938] [21375] [INFO] [^---Recovery]: Restore complete! 
[2022-03-31 14:20:05,939] [21375] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2022-03-31 14:20:05,942] [21375] [INFO] [^---Recovery]: Worker ready 
[2022-03-31 14:20:05,943] [21375] [INFO] [^Worker]: Ready 
[2022-03-31 14:20:15,946] [21375] [WARNING] [EURUSD:3] 
**[2022-03-31 14:20:15,946] [21375] [WARNING] [EURCHF:1]** 
**[2022-03-31 14:20:23,054] [21375] [WARNING] processing: EURCHF: 2 on partition: <ConsumerMessage: TopicPartition(topic='greetings-topic', partition=0) offset=8>** 
[2022-03-31 14:20:25,946] [21375] [WARNING] [EURUSD:3] 
**[2022-03-31 14:20:25,947] [21375] [WARNING] [EURCHF:2]** 

[2022-03-31 14:20:35,950] [21375] [WARNING] [EURUSD:3] 
[2022-03-31 14:20:35,950] [21375] [WARNING] [EURCHF:2] 

The boot log of instance_1:

(faust) root@zarbia:/workspace/fg/test/faust# faust --datadir=data/01 -A app01 worker -l info  --web-port 6060
┌ƒaµS† v0.8.4─┬──────────────────────────────────────────┐
│ id          │ hello-world                              │
│ transport   │ [URL('kafka://localhost:29092')]         │
│ store       │ rocksdb:                                 │
│ web         │ http://localhost:6060/                   │
│ log         │ -stderr- (info)                          │
│ pid         │ 21310                                    │
│ hostname    │ zarbia                                   │
│ platform    │ CPython 3.7.12 (Linux x86_64)            │
│ drivers     │                                          │
│   transport │ aiokafka=0.7.2                           │
│   web       │ aiohttp=3.8.1                            │
│ datadir     │ /workspace/fg/test/faust/data/01         │
│ appdir      │ /workspace/fg/test/faust/data/01/v1      │
└─────────────┴──────────────────────────────────────────┘
[2022-03-31 14:19:50,804] [21310] [INFO] [^Worker]: Starting... 
[2022-03-31 14:19:50,806] [21310] [INFO] [^-App]: Starting... 
[2022-03-31 14:19:50,807] [21310] [INFO] [^--Monitor]: Starting... 
[2022-03-31 14:19:50,807] [21310] [INFO] [^--Producer]: Starting... 
[2022-03-31 14:19:50,807] [21310] [INFO] [^---ProducerBuffer]: Starting... 
[2022-03-31 14:19:50,819] [21310] [INFO] [^--CacheBackend]: Starting... 
[2022-03-31 14:19:50,819] [21310] [INFO] [^--Web]: Starting... 
[2022-03-31 14:19:50,819] [21310] [INFO] [^---Server]: Starting... 
[2022-03-31 14:19:50,819] [21310] [INFO] [^--Consumer]: Starting... 
[2022-03-31 14:19:50,820] [21310] [INFO] [^---AIOKafkaConsumerThread]: Starting... 
[2022-03-31 14:19:50,836] [21310] [INFO] [^--LeaderAssignor]: Starting... 
[2022-03-31 14:19:50,837] [21310] [INFO] [^--ReplyConsumer]: Starting... 
[2022-03-31 14:19:50,837] [21310] [INFO] [^--AgentManager]: Starting... 
[2022-03-31 14:19:50,837] [21310] [INFO] [^---Agent: app01.greet]: Starting... 
[2022-03-31 14:19:50,839] [21310] [INFO] [^----OneForOneSupervisor: (1@0x7efff4120150)]: Starting... 
[2022-03-31 14:19:50,839] [21310] [INFO] [^---Conductor]: Starting... 
[2022-03-31 14:19:50,839] [21310] [INFO] [^--TableManager]: Starting... 
[2022-03-31 14:19:50,840] [21310] [INFO] [^---Conductor]: Waiting for agents to start... 
[2022-03-31 14:19:50,840] [21310] [INFO] [^---Conductor]: Waiting for tables to be registered... 
[2022-03-31 14:19:51,840] [21310] [INFO] [^---GlobalTable: greetings-table]: Starting... 
[2022-03-31 14:19:51,853] [21310] [INFO] [^----Store: rocksdb:greetings-table]: Starting... 
[2022-03-31 14:19:51,853] [21310] [INFO] [^--Producer]: Creating topic 'hello-world-greetings-table-changelog' 
[2022-03-31 14:19:51,860] [21310] [INFO] [^---Recovery]: Starting... 
[2022-03-31 14:19:51,861] [21310] [INFO] [^--Producer]: Creating topic 'hello-world-greetings-table-changelog' 
[2022-03-31 14:19:51,864] [21310] [INFO] Updating subscribed topics to: 
┌Requested Subscription─────────────────┐
│ topic name                            │
├───────────────────────────────────────┤
│ greetings-topic                       │
│ hello-world-greetings-table-changelog │
└───────────────────────────────────────┘ 
[2022-03-31 14:19:51,865] [21310] [INFO] Subscribed to topic(s): 
┌Final Subscription─────────────────────┐
│ topic name                            │
├───────────────────────────────────────┤
│ greetings-topic                       │
│ hello-world-greetings-table-changelog │
└───────────────────────────────────────┘ 
[2022-03-31 14:19:51,872] [21310] [INFO] Discovered coordinator 1 for group hello-world 
[2022-03-31 14:19:51,872] [21310] [INFO] Revoking previously assigned partitions set() for group hello-world 
[2022-03-31 14:19:51,874] [21310] [INFO] (Re-)joining group hello-world 
[2022-03-31 14:19:51,977] [21310] [INFO] Joined group 'hello-world' (generation 31) with member_id faust-0.8.4-ee0a42ea-0c88-4279-bd87-69e6cb5368c9 
[2022-03-31 14:19:51,977] [21310] [INFO] Elected group leader -- performing partition assignments using faust 
[2022-03-31 14:19:51,982] [21310] [INFO] Successfully synced group hello-world with generation 31 
[2022-03-31 14:19:51,983] [21310] [INFO] Setting newly assigned partitions 
┌Topic Partition Set────────────────────┬────────────┐
│ topic                                 │ partitions │
├───────────────────────────────────────┼────────────┤
│ greetings-topic                       │ {0-1}      │
│ hello-world-greetings-table-changelog │ {0}        │
└───────────────────────────────────────┴────────────┘ for group hello-world 
[2022-03-31 14:19:51,985] [21310] [INFO] Executing _on_partitions_assigned 
[2022-03-31 14:19:52,008] [21310] [INFO] opening partition 0 for gen id 31 app id 31 
[2022-03-31 14:19:52,271] [21310] [INFO] generation id 31 app consumers id 31 
[2022-03-31 14:19:53,616] [21310] [INFO] [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active─────────────────────┬───────────┬───────────┐
│ topic                                 │ partition │ highwater │
├───────────────────────────────────────┼───────────┼───────────┤
│ hello-world-greetings-table-changelog │ 0         │ 11        │
└───────────────────────────────────────┴───────────┴───────────┘ 
[2022-03-31 14:19:55,117] [21310] [INFO] [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active─────────────┬───────────┬────────┐
│ topic                                 │ partition │ offset │
├───────────────────────────────────────┼───────────┼────────┤
│ hello-world-greetings-table-changelog │ 0         │ -1     │
└───────────────────────────────────────┴───────────┴────────┘ 
[2022-03-31 14:19:55,121] [21310] [INFO] [^---Recovery]: Restoring state from changelog topics... 
[2022-03-31 14:19:55,121] [21310] [INFO] [^---Recovery]: Resuming flow... 
[2022-03-31 14:19:55,122] [21310] [INFO] [^---Fetcher]: Starting... 
[2022-03-31 14:19:56,627] [21310] [INFO] [^---Recovery]: Done reading from changelog topics 
[2022-03-31 14:19:56,627] [21310] [INFO] [^---Recovery]: Recovery complete 
[2022-03-31 14:19:56,628] [21310] [INFO] [^---Recovery]: Restore complete! 
[2022-03-31 14:19:56,628] [21310] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2022-03-31 14:19:56,632] [21310] [INFO] [^---Recovery]: Worker ready 
[2022-03-31 14:19:56,633] [21310] [INFO] [^Worker]: Ready 

[2022-03-31 14:19:58,277] [21310] [WARNING] Heartbeat failed for group hello-world because it is rebalancing 
[2022-03-31 14:19:58,278] [21310] [INFO] Revoking previously assigned partitions 
┌Topic Partition Set────────────────────┬────────────┐
│ topic                                 │ partitions │
├───────────────────────────────────────┼────────────┤
│ greetings-topic                       │ {0-1}      │
│ hello-world-greetings-table-changelog │ {0}        │
└───────────────────────────────────────┴────────────┘ for group hello-world 
[2022-03-31 14:19:58,281] [21310] [INFO] (Re-)joining group hello-world 
[2022-03-31 14:19:58,284] [21310] [INFO] Joined group 'hello-world' (generation 32) with member_id faust-0.8.4-ee0a42ea-0c88-4279-bd87-69e6cb5368c9 
[2022-03-31 14:19:58,284] [21310] [INFO] Elected group leader -- performing partition assignments using faust 
[2022-03-31 14:19:58,289] [21310] [INFO] Successfully synced group hello-world with generation 32 
[2022-03-31 14:19:58,290] [21310] [INFO] Setting newly assigned partitions 
┌Topic Partition Set────────────────────┬────────────┐
│ topic                                 │ partitions │
├───────────────────────────────────────┼────────────┤
│ greetings-topic                       │ {1}        │
│ hello-world-greetings-table-changelog │ {0}        │
└───────────────────────────────────────┴────────────┘ for group hello-world 
[2022-03-31 14:19:58,292] [21310] [INFO] Executing _on_partitions_assigned 
[2022-03-31 14:19:58,321] [21310] [INFO] generation id 32 app consumers id 32 
[2022-03-31 14:19:59,626] [21310] [INFO] [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active─────────────────────┬───────────┬───────────┐
│ topic                                 │ partition │ highwater │
├───────────────────────────────────────┼───────────┼───────────┤
│ hello-world-greetings-table-changelog │ 0         │ 11        │
└───────────────────────────────────────┴───────────┴───────────┘ 
[2022-03-31 14:19:59,630] [21310] [INFO] [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active─────────────┬───────────┬────────┐
│ topic                                 │ partition │ offset │
├───────────────────────────────────────┼───────────┼────────┤
│ hello-world-greetings-table-changelog │ 0         │ 11     │
└───────────────────────────────────────┴───────────┴────────┘ 
[2022-03-31 14:19:59,634] [21310] [INFO] [^---Recovery]: Resuming flow... 
[2022-03-31 14:19:59,635] [21310] [INFO] [^---Recovery]: Recovery complete 
[2022-03-31 14:19:59,635] [21310] [INFO] [^---Recovery]: Restore complete! 
[2022-03-31 14:19:59,636] [21310] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2022-03-31 14:19:59,639] [21310] [INFO] [^---Recovery]: Worker ready 
[2022-03-31 14:20:06,634] [21310] [WARNING] [EURUSD:3] 
[2022-03-31 14:20:06,635] [21310] [WARNING] [EURCHF:1] 
[2022-03-31 14:20:16,636] [21310] [WARNING] [EURUSD:3] 
**[2022-03-31 14:20:16,637] [21310] [WARNING] [EURCHF:1]** 
[2022-03-31 14:20:26,637] [21310] [WARNING] [EURUSD:3] 
**[2022-03-31 14:20:26,638] [21310] [WARNING] [EURCHF:1]** 

[2022-03-31 14:20:36,641] [21310] [WARNING] [EURUSD:3] 
[2022-03-31 14:20:36,641] [21310] [WARNING] [EURCHF:1] 

Just a quick note: GlobalTable works pretty well when there is only one instance, but the problems start once another instance is added. @Roman1us do you have any minimal working example of a GlobalTable with multiple instances? Or feel free to run the script above to trigger the issue on your end. Thank you!

Roman1us commented 2 years ago

@Hamdiovish okay, I'll try to reproduce this and maybe debug this behavior

lorinmetzger commented 2 years ago

I have experienced a similar problem with a global table that has a 1-partition changelog topic and multiple workers.

I've narrowed my problem down to the fact that at least one worker in a group of workers sharing a global table will be assigned only "active" changelog topic partitions and no standbys; all the other workers get both actives and standbys assigned to them here.

For the worker that has only "actives", when _restart_recovery runs in recovery.py it pauses all active partitions after the initial recovery. That is correct when the "active" partitions are also in the standby set, because the standby partition set gets resumed here; but when the active partitions are not in the standby set, nothing ever resumes the changelog topic partitions for that worker.
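
In other words, a minimal sketch of the logic just described (the names are illustrative, not the actual recovery.py internals):

def after_initial_recovery(consumer, active_tps, standby_tps):
    # Every active changelog partition is paused once recovery finishes.
    consumer.pause_partitions(active_tps)
    # Only the standby set is resumed afterwards...
    consumer.resume_partitions(standby_tps)
    # ...so an active partition that is not also a standby is never
    # resumed, and this worker stops seeing further changelog updates:
    never_resumed = active_tps - standby_tps
    return never_resumed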

This problem can be masked if the worker that has only "active" partitions assigned is also the leader of the faust topic partition being used to modify the table; in that case the local modifications keep the table in sync even though the "active" changelog partitions were never resumed (but this pairing is not guaranteed). As soon as the leader of the topic updating the table is a different worker from the one that holds only active changelog topic partitions and no standbys, that worker's table will not receive update events and will get out of sync.

I'm not sure this is actually the right place to address the problem, but the workaround I've put in is here. Curious if there is a better place to address this?

One last note for anyone not doing high-volume table updates: if you are checking that your tables stay in sync one change at a time, you will want to set recovery_buffer_size=1 to make sure your changes are applied immediately; otherwise your tables will appear out of sync even though the change events are being received, just not yet applied.

wbarnha commented 2 years ago

The current solution to this problem, as of v0.8.6, is to initialize the GlobalTable with the options partitions=1, recovery_buffer_size=1, as indicated above.
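
Applied to the example in this thread, that configuration would look like this (a sketch keeping the names from the original report):

greetings_table = app.GlobalTable(
    'greetings-table',
    partitions=1,             # single changelog partition
    recovery_buffer_size=1,   # apply each changelog event as it arrives
    key_type=str,
    value_type=str,
    default=str,
)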

tobiasbhn commented 8 months ago

I am also facing this issue. The mentioned solution does not work for us, as we need more than 1 partition in order to split the work across multiple physical nodes. Am I missing something?