robinhood / faust

Python Stream Processing

TypeError: produce() got an unexpected keyword argument 'timestamp' #722

Open roalonsor opened 3 years ago

roalonsor commented 3 years ago

While trying to reproduce the example from the Faust documentation (http://fauststream.com/en/stable/userguide/agents.html) with Confluent Kafka, I got the following traceback:

Traceback (most recent call last):
  File "src/mainsend.py", line 10, in <module>
    loop.run_until_complete(send_value())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "src/mainsend.py", line 6, in send_value
    print(await adding.ask(Add(a=4, b=4)))
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/agents/agent.py", line 752, in ask
    p = await self.ask_nowait(
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/agents/agent.py", line 788, in ask_nowait
    await self.channel.send(
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/topics.py", line 184, in send
    return await self._send_now(
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/channels.py", line 300, in _send_now
    return await self.publish_message(
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/topics.py", line 413, in publish_message
    fut2 = cast(asyncio.Future, await producer.send(
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/transport/drivers/confluent.py", line 516, in send
    self._quick_produce(
TypeError: produce() got an unexpected keyword argument 'timestamp'

The code used for the worker is:

```python
import logging
import boto3
from decouple import config
import faust
from typing import AsyncIterable
from faust import StreamT
import uuid
import datetime

KF_BOOTSTRAP_SERVERS = config('BOOTSTRAP_SERVERS')
KF_API_KEY = config('API_KEY')
KF_API_SECRET = config('API_SECRET')

app = faust.App(
    'ms-mapper',
    debug=True,
    broker=f'confluent://{KF_BOOTSTRAP_SERVERS}',
    protocol='SASL_SSL',
    mechanisms='PLAIN',
    broker_credentials=faust.SASLCredentials(
        username=KF_API_KEY,
        password=KF_API_SECRET,
    ),
)

class Add(faust.Record):
    a: int
    b: int

inbound_topic = app.topic('lifecycle.geo.transaction.new.in', value_type=Add)

@app.agent(inbound_topic)
async def adding(stream):
    async for value in stream:
        # here we receive Add objects, add a + b.
        yield value.a + value.b
```

And for the sender the code is:

```python
# examples/send_to_agent.py
import asyncio
from maintest import Add, adding

async def send_value() -> None:
    print(await adding.ask(Add(a=4, b=4)))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send_value())
```

I tried using other transport codecs and adding a timestamp to the record, but neither worked.
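For readers unfamiliar with this class of error: the traceback ends where the Confluent driver passes `timestamp=` to a `produce()` callable that does not declare that parameter. The thread does not establish why the installed producer rejects it, so the following is only a minimal sketch of the mechanism, not a fix for Faust. The `produce` function here is a hypothetical stand-in (not the real confluent-kafka API), and `call_with_supported_kwargs` is an illustrative helper that drops keyword arguments the target does not accept.

```python
import inspect
from typing import Any, Callable, Optional


def produce(topic: str, value: bytes, key: Optional[bytes] = None) -> str:
    """Hypothetical stand-in for a producer call that has no `timestamp` parameter."""
    return f"{topic}:{key!r}:{value!r}"


def call_with_supported_kwargs(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Call `func`, dropping keyword arguments its signature does not declare."""
    params = inspect.signature(func).parameters
    accepts_var_kw = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if not accepts_var_kw:
        # Keep only the keywords that the callable actually names.
        kwargs = {k: v for k, v in kwargs.items() if k in params}
    return func(*args, **kwargs)


# Passing an undeclared keyword raises the same kind of TypeError as in the traceback.
try:
    produce("demo-topic", b"payload", key=b"k", timestamp=1234)
except TypeError as exc:
    error_message = str(exc)

# The guarded call silently drops `timestamp` and succeeds.
result = call_with_supported_kwargs(
    produce, "demo-topic", b"payload", key=b"k", timestamp=1234
)
```

Note that `inspect.signature` can fail on some functions implemented in C, so this approach may not apply directly to an extension-backed producer; it only illustrates why the call fails.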

bobh66 commented 3 years ago

This project appears to have been abandoned.

You might want to check out the fork of this project - https://github.com/faust-streaming/faust

It has a bunch of fixes merged for problems that were in the base project.