faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Tables don't accept value_serializer kw #556

Open fonty422 opened 1 year ago

fonty422 commented 1 year ago

Steps to reproduce

table = app.Table('test-table', key_type=str, value_serializer='my_serializer', default=SomeModel)

Expected behavior

According to the docs we should be able to specify our own table serializer.

Actual behavior

TypeError: Service.__init__() got an unexpected keyword argument 'value_serializer'

Full traceback

Traceback (most recent call last):
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\Scripts\faust.exe\__main__.py", line 7, in <module>
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\site-packages\click\core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\site-packages\click\core.py", line 1054, in main
    with self.make_context(prog_name, args, **extra) as ctx:
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\site-packages\faust\cli\base.py", line 423, in make_context
    self._maybe_import_app()
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\site-packages\faust\cli\base.py", line 388, in _maybe_import_app
    find_app(appstr)
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\site-packages\faust\cli\base.py", line 312, in find_app
    val = symbol_by_name(app, imp=imp)
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\site-packages\mode\utils\imports.py", line 268, in symbol_by_name
    module = imp(  # type: ignore
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\site-packages\mode\utils\imports.py", line 381, in import_from_cwd
    return imp(module, package=package)
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\importlib\__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "C:\Users\debooyj\Documents\scripts\faust-epics-stress-testing\kafka\python\Tests\consumer.py", line 14, in <module>
    table = app.Table('test-table', key_type=str, value_serializer='raw_pydantic', default=AlarmUpdate)
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\site-packages\faust\app\base.py", line 1168, in Table
    self.conf.Table(  # type: ignore
  File "C:\Users\debooyj\AppData\Local\Programs\Python\Python310\lib\site-packages\faust\tables\base.py", line 128, in __init__
    Service.__init__(self, loop=app.loop, **kwargs)
TypeError: Service.__init__() got an unexpected keyword argument 'value_serializer'
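For contrast, the construction succeeds once the unsupported kwarg is dropped: value_type is the keyword Table does accept, as the examples later in this thread show (a minimal sketch reusing the hypothetical SomeModel from above):

table = app.Table('test-table', key_type=str, value_type=SomeModel, default=SomeModel)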
fonty422 commented 1 year ago

This might be solvable purely through using a schema instead. However, that requires setting the schema to a class, which is then called each time a new message comes in. There is already an example of this in the docs, but I wouldn't have thought it should really be required, given that value_serializer works just fine for topics, only not for tables. It's also a moderately clunky and ugly bit of code.

I'm OK with this not being fixed since there is a workable solution, but it would be nicer to just set up a codec and use that, rather than set up the uglier and less readable schema class.

dada-engineer commented 1 year ago

Can you elaborate on what your use case is? It seems that currently the serializers are guessed from the value type, which can be bytes, str, a faust.Record, or a custom faust.ModelT class. What is 'my_serializer' here?

The allowed types make sense, as we need to ensure we can store the value in the table. The serializer would currently be raw or json, or any option set on the Record class.
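For example, the serializer can be declared directly on a Record definition (a sketch; the same pattern appears later in this thread):

import faust

# Sketch: the serializer option set on the Record class itself
class SomeModel(faust.Record, serializer='json'):
    value: str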

The docs should be fixed at least! Is it enough for you to correct the docs and use value_type instead?

fonty422 commented 1 year ago

Happy to elaborate. The issue arose from a discussion on the Slack channel about nesting Faust Records: they appear to serialize fine when sending, but on consuming only the top level is deserialized, leaving the nested models as plain dictionaries.

We thought about using Pydantic instead, since it serializes nicely to a JSON string and deserializes the whole JSON string back to the full model, even nested models, automatically. But it needed to be told manually to perform this action, which was almost as painful; and if you needed to compare the new event with some state stored in a table with a default value, it was a nightmare: the code became very ugly and hard to read, mostly due to all the if/else for type checking and deserializing.

So we spent a little time writing two ser/des codecs: one for Faust Records where fields might be nested, and another for Pydantic models instead of Faust Records (Pydantic does automatic field checking, which might be nice to keep). We tested them by first registering the codec, then setting up a topic with the value serializer set to the codec (e.g. value_serializer='my_serializer', where my_serializer is the name of the registered codec), then sending and receiving messages and confirming the event value was serialized and deserialized automatically. This worked, but we hadn't tested with tables. The docs say you can add a key and value serializer for tables, but when we tried this it raised an error about an unexpected keyword argument 'value_serializer'. So at the very least, the docs need to be changed so that value_serializer is not in the example.

We did find a workaround using a schema. But a basic schema like the one below does not work:

# Copied from Schemas section of Faust Docs
schema = faust.Schema(
    key_type=Point,
    value_type=Point,
    key_serializer='json',
    value_serializer='json',
)

topic = app.topic('mytopic', schema=schema)

Again, this works for topics, but not for tables. The only way to get a schema to work with tables is to create a class that inherits from faust.Schema (as in the Autodetect example in the same docs section) and use that as the schema.

So the solution is: to use the same ser/des in a topic and a table, you should create a ser/des schema class, and instead of setting a value or key serializer, set that schema class everywhere to keep things consistent. We should update the docs to remove the reference to value_serializer for tables, replace it with a reference to a schema class, and describe a workaround for nested records. The new docs should also state that the serialized output needs to be something a table can accept (which I guess is JSON or raw).

Faust already has built-in options to override the default behaviour; it's just not clearly documented, and in some cases the docs are wrong.

Here's the ser/des schema we wrote for Pydantic models, which lets you use just the codec, or the schema if you need it for tables too:

#pydantic_models.py
import json
from typing import Any, Tuple

from faust import Schema
from faust.models import base
from faust.serializers import codecs
from faust.types import AppT, CodecArg
from faust.types.core import OpenHeadersArg, V
from mode.utils.compat import want_bytes

class pydantic_ser_des(codecs.Codec):

    def _dumps(self, obj: Any) -> bytes:
        # Tag the payload with a namespace so _loads can find the model class.
        _dict = obj.dict()
        _dict["__faust"] = {"ns": f"{self.__class__.__name__}.{obj.__class__.__name__}"}
        return want_bytes(json.dumps(_dict))

    def _loads(self, s: bytes) -> Any:
        deserialised = json.loads(s)
        model = base.registry[deserialised["__faust"]["ns"]]
        return model.parse_obj(deserialised)

codecs.register('pydantic_ser_des', pydantic_ser_des())

class Pydantic(Schema):
    def loads_value(self, app, message, *, loads=None, serializer=None):
        # Decode with the registered codec instance rather than calling the
        # unbound _loads with self=None.
        return codecs.get_codec('pydantic_ser_des').loads(message.value)

    def dumps_value(self, app: AppT, value: V, *, serializer: CodecArg = None,
                    headers: OpenHeadersArg) -> Tuple[Any, OpenHeadersArg]:
        value = codecs.get_codec('pydantic_ser_des').dumps(value)
        return super().dumps_value(app, value, serializer=serializer, headers=headers)

# Now add any pydantic models that need to make use of this
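What "add any pydantic models" means here is making them resolvable through faust's model registry under the namespace the codec writes ("<codec class>.<model class>"). A hypothetical sketch, continuing pydantic_models.py (where base is already imported), with AlarmUpdate (a name borrowed from the traceback above) standing in for any pydantic v1 model:

from pydantic import BaseModel

class AlarmUpdate(BaseModel):
    name: str
    severity: int

# Hypothetical: the codec writes ns as "pydantic_ser_des.AlarmUpdate" for
# this model, so register it under that key for _loads to find.
base.registry['pydantic_ser_des.AlarmUpdate'] = AlarmUpdate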

Then to make use of the schema:

from pydantic_models import Pydantic #and any pydantic models in there you want to use.

app = faust.App(
  'my-app',
  broker="kafka://localhost:9092",
  store="memory://"
)

test_topic = app.topic('test', key_type=str, schema=Pydantic())

and the codec for nested Faust Records:

#faust_models.py
from typing import Any

from faust.models import base
from faust.serializers import codecs
from faust.utils import json as _json
from mode.utils.compat import want_bytes, want_str

class nested_record(codecs.Codec):

    def _dumps(self, obj: Any) -> bytes:
        return want_bytes(_json.dumps(obj))

    def _loads(self, s: bytes) -> Any:
        deserialised = _json.loads(want_str(s))
        # Any field that is a dict carrying a "__faust" namespace is a nested
        # Record: look up its class in the registry and deserialize it fully.
        for field in deserialised:
            if isinstance(deserialised[field], dict) and "__faust" in deserialised[field]:
                try:
                    model = deserialised[field]['__faust']['ns']
                    deserialised[field] = base.registry[model].loads(self._dumps(deserialised[field]))
                except Exception as e:
                    print(f"Could not deserialize nested field {field!r}: {e}")
        return deserialised

codecs.register('nested_record', nested_record())

Then to make use of the codec:

from faust_models import nested_record

app = faust.App(
  'my-app',
  broker="kafka://localhost:9092",
  store="memory://"
)

test_topic = app.topic('test', key_type=str, value_serializer="nested_record")
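As a quick sanity check, the registered codec can also be exercised directly, without a broker, through faust's public codec helpers:

from faust.serializers import codecs

codec = codecs.get_codec('nested_record')
payload = codec.dumps({'foo': 'bar'})  # JSON bytes
print(codec.loads(payload))            # {'foo': 'bar'}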

Let me know if you want me to make these changes, or whether you need to look over the solutions first to see if there's something wrong or inefficient in there that makes it a bad idea. If the nested record option is a good idea, it might be worth including it in the build?

I think the _dict["__faust"]={"ns":f"{self.__class__.__name__}.{obj.__class__.__name__}"} line matches a similar line in faust\models\record.py, where it has payload[self._blessed_key] = {"ns": options.namespace}, but perhaps that bit is not needed. I can't remember exactly why I included it, but it might be that I was sending from a Jupyter notebook and consuming from a Python file, so the location of the model was different for each: one came through as __main__.<model_name> while the other was pydantic_ser_des.<model_name>, so I needed to force the name. I'm far from the most knowledgeable programmer, so it felt like a hack.

dada-engineer commented 1 year ago

Wow, thanks for the detailed description. According to the faust code, the serializer type is guessed, and you can specify it on the model you define. The following very simplified example works for me in a testing application. If it works for you as well, I would propose to only change the docs. Please give feedback:

class Bar(faust.Record, serializer="nested_record"):
    baz: str

class Foo(faust.Record, serializer="nested_record"):
    bar: Bar
    num: int

class nested_record(faust.Codec):
    def _dumps(self, obj) -> bytes:
        return b'{"bar": {"baz": "baz"}, "num": 1}'

    def _loads(self, s: bytes):
        return Foo(bar=Bar(baz="baz"), num=1)

faust.serializers.codecs.register("nested_record", nested_record())

table = app.Table("test", value_type=Foo, partitions=1)

@app.agent(topic)
async def some_agent(stream):
    async for key, event in stream.items():
        table[key] = event

The table looks at ModelT._options.serializer to get the serializer used.
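For instance, a quick way to confirm what the table will pick up from the classes above:

# The serializer declared on the Record is stored in its model options:
print(Foo._options.serializer)  # -> 'nested_record'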

Is this working, and do the docs just suggest the wrong behaviour? 🤔

fonty422 commented 1 year ago

Excellent. How did we not think of that simple solution for Faust Records? I'll test that today and report back. But it won't cover other types of object/model, such as Pydantic. I know that the docs say they currently only support Faust Record types, but it can do other types with a bit of massaging.

dada-engineer commented 1 year ago

True, in the case of pydantic I am not sure if it is allowed to define an _options attribute that also has a serializer attribute. That way you could probably trick it into working.
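Purely as an untested sketch of that idea (SimpleNamespace is a stand-in for faust's real options object; nothing here is verified):

from types import SimpleNamespace
from pydantic import BaseModel

class AlarmUpdate(BaseModel):
    name: str

# Untested: mimic the ModelT._options.serializer attribute the table inspects.
AlarmUpdate._options = SimpleNamespace(serializer='pydantic_ser_des')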

fonty422 commented 1 year ago

So I revisited my code that used faust.Record, and I had already utilised this method. Not sure how I missed that when I wrote the response; it obviously works. I think we were so focused on using Pydantic that I wrote the code above by hand. So yes, when using faust Record types, the best method is to create the codec and set it as the serializer on the record, then use value_type in the topic or table, and the rest is handled by default. The schema class method is still valid for your own model types outside of faust Records, so perhaps there's merit in adding something to the docs about how to ensure the codec/schema is used if you want to do something funky like that.

Is there any reason not to modify the default behaviour of the faust Record dumps methods to take nested records into account? Is it strongly unrecommended to use nested records for some reason? And is there a more efficient way to deserialize than what we did?

dada-engineer commented 1 year ago

Actually, yes there is; it basically works the same as pydantic v2: model_dump in pydantic dumps a Python dict preserving the types of nested attributes, while model_dump_json dumps the JSON-serialized record. You can achieve this by either specifying json as a serializer on the model definition or by calling record.dumps(serializer="json"), which produces the following output:

class Foo(faust.Record):
    foo: str

class Bar(faust.Record):
    foo: Foo

bar = Bar(foo=Foo(foo="something"))

bar.dumps()
### {'foo': <Foo: foo='something'>, '__faust': {'ns': '__main__.Bar'}}  # dict of python types

bar.dumps(serializer="json")
### b'{"foo":{"foo":"something","__faust":{"ns":"__main__.Foo"}},"__faust":{"ns":"__main__.Bar"}}'  # json serialized bytes

EDIT: okay, actually pydantic dumps everything to plain dicts, not preserving nested model types. I wouldn't change it though.
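For reference, a minimal pydantic v2 sketch of the two dump methods, matching the corrected behaviour in the EDIT:

from pydantic import BaseModel

class Foo(BaseModel):
    foo: str

class Bar(BaseModel):
    foo: Foo

bar = Bar(foo=Foo(foo="something"))
bar.model_dump()       # {'foo': {'foo': 'something'}} - nested models become plain dicts
bar.model_dump_json()  # '{"foo":{"foo":"something"}}'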

fonty422 commented 1 year ago

Sure, but the issue is that a nested faust Record that is sent is not fully deserialized when consumed. The example is:

#basic_records.py

from faust import Record

class Foo(Record):
    foo: str=None

class Bar(Record):
    foo: Foo=Foo()

#producer.py

from basic_records import Foo, Bar

test_topic = app.topic('test', key_type=str, value_type=Bar)

@app.timer(5)
async def send_test():
  await test_topic.send(
    key="some key",
    value=Bar(foo=Foo(foo="something"))
  )

#consumer.py

from basic_records import Foo, Bar

test_topic = app.topic('test', key_type=str, value_type=Bar)
table = app.Table('test-table', key_type=str, default=Bar)

@app.agent(test_topic)
async def processor(stream):
  async for event in stream.events():
    app.logger.warn(event.value)
    table[event.key]=event.value
    app.logger.info(table[event.key])

The output of event.value is <Bar: foo={'foo': 'something', '__faust': {'ns': 'basic_records.Foo'}}>, so you can't reference event.value.foo.foo. The output of table[event.key] is <Bar: foo=<Foo: foo='something'>>, so it is fully deserialized there.

fonty422 commented 1 year ago

Hang on... if you log event.value.foo it results in <Foo: foo='something'>... so it does know how to deserialize properly, but only does so when you access each nested level. And when you write the value to a table and read it back, it appears as a fully deserialized, complete model. My default when creating a new stream processor is to log the event value to check I'm getting what I expect, and because it looked half-deserialized I figured it wasn't working. I'm sure in the past it didn't work, but either it was fixed or I was wrong and it does work. Just double-check me here and we can close this one.

dada-engineer commented 1 year ago

I am not sure if I know what you mean. I have the following program:

import logging
import faust

logger = logging.getLogger(__name__)

class Bar(faust.Record):
    baz: str

class Foo(faust.Record):
    bar: Bar
    num: int

app = faust.App(
    "myapp",
    broker="kafka://localhost:9092",
    broker_session_timeout=10,
    kafka_enabled=False,
)
topic = app.topic("test", key_type=str, value_type=Foo, partitions=1)
table = app.Table("test", value_type=Foo, partitions=1)

@app.agent(topic)
async def some_agent(stream):
    async for event in stream.events():
        table[event.key] = event.value
        logger.info(f"Received {event.value}")
        logger.info(f"Table {table[event.key]}")
        logger.info(event.value.bar.baz)

@app.timer(interval=1)
async def some_producer():
    await topic.send(key="some", value=Foo(bar=Bar(baz="baz"), num=1))

And I get this output in the consumer:

[2023-10-30 22:27:23,764] [5191] [INFO] Received <Foo: bar=<Bar: baz='baz'>, num=1>
[2023-10-30 22:27:23,764] [5191] [INFO] Table <Foo: bar=<Bar: baz='baz'>, num=1>
[2023-10-30 22:27:23,764] [5191] [INFO] baz

So the log is the same and the attribute is accessible 🤔

I would leave the issue open though, to fix the documentation: as you mentioned, it states that Table accepts value_serializer as an argument.

fonty422 commented 1 year ago

That's strange. What if you run those as a separate producer and consumer, rather than producing and consuming in the same app? In my case I definitely get <Bar: foo={'foo': 'something', '__faust': {'ns': 'basic_records.Foo'}}> when I log event.value to the console, but I can somehow still access event.value.foo.foo without error. Why would we each get something different? I was sure that in the past my issue was that trying to access it that way resulted in an error indicating event.value.foo was a dict, not a Foo, which is where this all came from.

dada-engineer commented 1 year ago

Same for me with different files. Using python 3.9.7 and faust-streaming==0.10.16

fonty422 commented 1 year ago

Using Python 3.10.9 and faust-streaming==0.10.16 for me... although I am doing this on a Windows OS machine... surely that's not the difference. While I have found different behaviour between Windows and CentOS, that's been more with respect to kafka/zookeeper and tables/filenames than anything specifically Faust related.

dada-engineer commented 1 year ago

python 3.10.9 same result on mac OS