faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

`relative_to_field` raised an error when the field is a string type (isoformat) #389

Open rightx2 opened 1 year ago

rightx2 commented 1 year ago


I have simple Kafka producer code:

import pandas as pd

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

from dataclasses_avroschema import AvroModel

class MyDtype(AvroModel):
    job_id: str
    value: int
    date_time: str

topic_name = "practice24"
bootstrap_server = "192.168.59.100:30887"

sr_server = "http://localhost:8081"
avro_schema = avro.loads(MyDtype.avro_schema())
sr_client = CachedSchemaRegistryClient({"url": sr_server})

producer = AvroProducer(
    {
        "bootstrap.servers": bootstrap_server,
        "client.id": 'test',
    },
    schema_registry=sr_client,
    default_value_schema=avro_schema,
)

for data_dict in [
    {"job_id": "a", "value": 1, "date_time": pd.Timestamp("2022-11-01 13:00:00").isoformat()},
    {"job_id": "a", "value": 2, "date_time": pd.Timestamp("2022-11-01 13:03:00").isoformat()},
    {"job_id": "a", "value": 3, "date_time": pd.Timestamp("2022-11-01 13:04:00").isoformat()},
    {"job_id": "a", "value": 4, "date_time": pd.Timestamp("2022-11-01 13:06:00").isoformat()},
    # missing
    {"job_id": "b", "value": 2, "date_time": pd.Timestamp("2022-11-01 13:02:00").isoformat()},
]:
    _ = producer.produce(topic=topic_name, value=data_dict,)
    _ = producer.poll(0)
producer.flush()
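For illustration, the `date_time` values this producer emits are plain ISO-8601 strings, not timestamps. A stdlib sketch (pd.Timestamp.isoformat() yields the same string as datetime.isoformat() here):

```python
from datetime import datetime

# What ends up in the Avro-encoded date_time field: a plain string.
dt = datetime(2022, 11, 1, 13, 0, 0)
print(dt.isoformat())  # → 2022-11-01T13:00:00
```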

Faust code (working well):

import faust

from datetime import timedelta

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers.faust import FaustSerializer

from dataclasses_avroschema import AvroModel

topic_name = "practice24"
subject_name = f"{topic_name}-value"
serializer_name = f"{topic_name}_serializer"
bootstrap_server = "192.168.59.100:30887"

sr_server = "http://localhost:8081"
client = SchemaRegistryClient({"url": sr_server})
topic_schema = client.get_schema(subject_name)
fp_avro_schema = schema.AvroSchema(topic_schema.schema.raw_schema)

avro_fp_serializer = FaustSerializer(client, serializer_name, fp_avro_schema)
faust.serializers.codecs.register(name=serializer_name, codec=avro_fp_serializer)

app = faust.App('sample_app', broker=bootstrap_server)

class MyDtype(faust.Record, AvroModel, isodates=True):
    job_id: str
    value: int
    date_time: str

count_table = app.Table(
    f'{topic_name}_count_table', default=int,
).hopping(
    size=timedelta(minutes=100),
    step=timedelta(minutes=50),
    expires=timedelta(minutes=100)
).relative_to_field(MyDtype.date_time)

faust_topic = app.topic(
    topic_name,
    value_type=MyDtype,
    value_serializer=serializer_name
)

@app.agent(faust_topic)
async def process_fp(fps):
    async for fp in fps.group_by(MyDtype.job_id):
        print(fp)

This worked well.

However, when I changed this code:

@app.agent(faust_topic)
async def process_fp(fps):
    async for fp in fps.group_by(MyDtype.job_id):
        print(fp)

to

@app.agent(faust_topic)
async def process_fp(fps):
    async for fp in fps.group_by(MyDtype.job_id):
        count_table[fp.job_id] += 1

It raised an error:

[2022-10-15 08:53:42,009] [603736] [ERROR] [^----Agent*: hello_world4.process_fp]: Crashed reason=TypeError('must be real number, not str')
Traceback (most recent call last):
  File "/home/newdisk/miniconda3/envs/nftbank-data-ingestor/lib/python3.8/site-packages/faust/agents/agent.py", line 674, in _execute_actor
    await coro
  File "/home/my/hello_world4.py", line 53, in process_fp
    count_table[fp.job_id] += 1
  File "/home/newdisk/miniconda3/envs/nftbank-data-ingestor/lib/python3.8/site-packages/faust/tables/wrappers.py", line 273, in __iadd__
    return self.apply(operator.add, other)
  File "/home/newdisk/miniconda3/envs/nftbank-data-ingestor/lib/python3.8/site-packages/faust/tables/wrappers.py", line 208, in apply
    table._apply_window_op(op, self.key, value, timestamp)
  File "/home/newdisk/miniconda3/envs/nftbank-data-ingestor/lib/python3.8/site-packages/faust/tables/base.py", line 508, in _apply_window_op
    for window_range in self._window_ranges(timestamp):
  File "/home/newdisk/miniconda3/envs/nftbank-data-ingestor/lib/python3.8/site-packages/faust/tables/base.py", line 521, in _window_ranges
    for window_range in window.ranges(timestamp):
  File "faust/_cython/windows.pyx", line 62, in faust._cython.windows.HoppingWindow.ranges
TypeError: must be real number, not str
[2022-10-15 08:53:42,010] [603736] [INFO] [^----OneForOneSupervisor: (1@0x7f9740e72bb0)]: Restarting dead <Agent*: hello_world4.process_fp>! Last crash reason: TypeError('must be real number, not str')

I think this happened because date_time is a str, not a datetime field, so relative_to_field raised an error.

Setting isodates=True didn't help me, either.

How can I solve this?


wbarnha commented 1 year ago

I think this happened because date_time is a str, not a datetime field, so relative_to_field raised an error.

Setting isodates=True didn't help me, either.

I was initially going to suggest referring to the datetime example for dataclasses_avroschema, until you provided the example comparing print(fp) and count_table[fp.job_id] += 1.

@app.agent(faust_topic)
async def process_fp(fps):
    async for fp in fps.group_by(MyDtype.job_id):
        count_table[fp.job_id] += 1

A string is being passed to _window_ranges where a float timestamp is expected: https://github.com/faust-streaming/faust/blob/948dd054b623b64812e87b83fff7a94eb0a5209d/faust/tables/base.py#L519-L522

group_by works as intended, as we can see from the page_views.py example in https://faust-streaming.github.io/faust/playbooks/pageviews.html, so I think the issue is here:
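To see why the timestamp has to be a real number: the window code does arithmetic on it to find every hopping window that contains the event. A plain-Python sketch (mirroring the Cython HoppingWindow.ranges only in spirit, as an assumption for illustration):

```python
def hopping_ranges(timestamp, size, step):
    """Yield (start, end) for every hopping window containing timestamp.

    timestamp must be a real number; the very first operation below
    raises TypeError if it is a str, which is exactly the crash in the
    traceback above.
    """
    start = (timestamp // step) * step  # TypeError for a str timestamp
    # walk back to the earliest window start s with timestamp < s + size
    while start > timestamp - size:
        start -= step
    start += step
    # yield every window start s with s <= timestamp
    while start <= timestamp:
        yield (start, start + size)
        start += step

print(list(hopping_ranges(130.0, size=100.0, step=50.0)))
# → [(50.0, 150.0), (100.0, 200.0)]
# hopping_ranges("2022-11-01T13:00:00", 100.0, 50.0) raises TypeError
```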

count_table = app.Table(
    f'{topic_name}_count_table', default=int,
).hopping(
    size=timedelta(minutes=100),
    step=timedelta(minutes=50),
    expires=timedelta(minutes=100)
).relative_to_field(MyDtype.date_time)

From the example in https://github.com/faust-streaming/faust/blob/master/examples/windowed_aggregation.py, I think if you change your record to this:

from datetime import datetime

class MyDtype(faust.Record, AvroModel):
    job_id: str
    value: int
    date_time: datetime

your table should behave properly. I haven't had the opportunity to run your code yet, so take my advice with a grain of salt.
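If re-producing the topic with a real datetime isn't an option, note that the ISO string can be parsed back into a datetime whose .timestamp() is the real number the window code needs. A stdlib sketch (untested against Faust itself; the UTC assumption is mine):

```python
from datetime import datetime, timezone

# The string stored in date_time, parsed back into a datetime.
raw = "2022-11-01T13:00:00"
dt = datetime.fromisoformat(raw).replace(tzinfo=timezone.utc)

# .timestamp() is a float, which is what _window_ranges expects.
print(dt.timestamp())  # → 1667307600.0
```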