faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

AeroSpike state storage doesn't recover from stored data, replays changelogs instead #207

Closed xaralis closed 2 years ago

xaralis commented 2 years ago

Steps to reproduce

Configure faust to use aerospike:// as the table storage engine, then make sure at least one table actually uses it (this requires a running AeroSpike instance as the target).

App config:

app = faust.App(
    ...
    store='aerospike://',
    ...
)

Table config:

aerospike_enabled_table = app.Table(
    'mytable',
    default=dict,
    key_type=bytes,
    options={
        'client': {
            'hosts': [('127.0.0.1', 3000)],
        },
    },
)

Expected behavior

When faust boots up, data recovery uses the AeroSpike instance rather than falling back to replaying Kafka changelog topics.

Actual behavior

Faust ignores what's stored in AeroSpike and replays changelogs. The behavior is basically identical to the memory:// storage engine, which makes me wonder what the actual purpose of the AeroSpike storage engine is.

I suspect the goal with AeroSpike was something other than data recovery. I'm guessing from looking at the code:

https://github.com/faust-streaming/faust/blob/7600f45e2f1beb1e1c1d431c8660c821bd8a7db8/faust/stores/aerospike.py#L232-L237

It seems that no matter what was stored, faust will always think the storage is empty. Is this intentional? Is the AeroSpike engine ready for real use? Shall we create a PR with recovery support?

Thanks!

Versions

patkivikram commented 2 years ago

We currently do not store offsets for Aerospike-backed tables because, unlike RocksDB, Aerospike does not need recovery. Being a managed solution, Aerospike should be able to recover its data using its own replication strategy rather than relying on faust. What do you think?

patkivikram commented 2 years ago

The current behavior is that if the store is configured as Aerospike, it does not even try to recover.

xaralis commented 2 years ago

@patkivikram I'm not sure I follow. From what I saw, it looks like faust will replay the changelogs completely when the persisted offset resolves to None. Data recovery would work the way you describe if the persisted offset were actually returned, which is not the case.

I'm speaking about the situation when workers (we have a pool of dozens) get restarted (e.g. redeployed or scaled up).
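To make the concern above concrete, here is a minimal, self-contained sketch of offset-driven recovery. It is not faust's code: `replay_start`, `AlwaysEmptyStore`, and `TrackingStore` are hypothetical names used only for illustration, and the sketch assumes that a store reporting None as its persisted offset leads to a replay from the start of the changelog.

```python
from typing import Dict, Optional


def replay_start(persisted_offset: Optional[int], earliest_offset: int) -> int:
    """Return the first changelog offset a worker would have to read."""
    if persisted_offset is None:
        # The store reports nothing: replay from the start of the changelog.
        return earliest_offset
    # Resume right after the last offset the store has already applied.
    return max(persisted_offset + 1, earliest_offset)


class AlwaysEmptyStore:
    """Hypothetical store that never reports a persisted offset."""

    def persisted_offset(self, partition: int) -> Optional[int]:
        return None


class TrackingStore:
    """Hypothetical store that remembers the last applied offset per partition."""

    def __init__(self) -> None:
        self._offsets: Dict[int, int] = {}

    def set_persisted_offset(self, partition: int, offset: int) -> None:
        self._offsets[partition] = offset

    def persisted_offset(self, partition: int) -> Optional[int]:
        return self._offsets.get(partition)


empty = AlwaysEmptyStore()
tracking = TrackingStore()
tracking.set_persisted_offset(0, 41)

full_replay_from = replay_start(empty.persisted_offset(0), earliest_offset=0)
resume_from = replay_start(tracking.persisted_offset(0), earliest_offset=0)
```

In this toy model the first store forces a read from offset 0 on every restart, while the second resumes from offset 42.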

patkivikram commented 2 years ago

If the store is aerospike, it should not attempt to recover, based on https://github.com/faust-streaming/faust/blob/master/faust/tables/recovery.py#L369. This needs to be config-driven, though.
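One way a config-driven version of that check could look is sketched below. This is an assumption-heavy illustration, not faust's implementation: `RecoveryConfig`, its `recover_from_changelog` field, and `should_replay_changelog` are hypothetical and do not exist in faust. The idea is that an explicit setting wins, and the store URL scheme only supplies the default.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RecoveryConfig:
    """Hypothetical settings object; not part of faust."""

    store_url: str
    # None means "decide from the store type"; True or False forces the choice.
    recover_from_changelog: Optional[bool] = None


def should_replay_changelog(conf: RecoveryConfig) -> bool:
    """Decide whether a worker should replay changelog topics on startup."""
    if conf.recover_from_changelog is not None:
        # An explicit setting always overrides the store-based default.
        return conf.recover_from_changelog
    # Default: skip replay for an external store assumed to hold its own data.
    return not conf.store_url.startswith("aerospike://")


default_aerospike = should_replay_changelog(RecoveryConfig("aerospike://"))
default_memory = should_replay_changelog(RecoveryConfig("memory://"))
forced_aerospike = should_replay_changelog(
    RecoveryConfig("aerospike://", recover_from_changelog=True)
)
```

With these defaults, an aerospike:// store skips the replay, a memory:// store replays, and the explicit flag lets an operator opt back in.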

xaralis commented 2 years ago

Uh-uh! I see. Well, that's good news. It kinda looks like the changelog recovery was still happening but one only sees the summary (with -1 offset) and not the real changelog replay. This would really be worth mentioning in the docs.

Thanks. I guess we can close this now then.

xaralis commented 2 years ago

@patkivikram Maybe just one question: is it OK to run on AeroSpike CE, or is EE required?

patkivikram commented 2 years ago

CE is fine