robinhood / faust

Python Stream Processing

Can't write back to topic - TypeError: Not supported type for value: <class 'dict'> #363

Closed. CesarPantoja closed this issue 5 years ago.

CesarPantoja commented 5 years ago

faust v1.6.0


Steps to reproduce

I'm trying to run an application that reads from one topic, processes each message, and writes the result to another topic. Here is the code:

import faust
from schema_registry.client import SchemaRegistryClient
from schema_registry.serializer import MessageSerializer
from collections import defaultdict

app = faust.App(
    "cesar_test_groups_5",
    broker="kafka://172.31.0.16:9092",
    store='rocksdb://',
    value_serializer="raw",
)

app.config_from_object({
    "topic_replication_factor": 2,
    "topic_partitions": 1,
    "consumer_auto_offset_reset": "earliest"})

class Group(faust.Record):
    pubKeyHashes: list

transactions_topic = app.topic("cesar-test-btc-graph-confirmed-transactions", value_type=bytes)
groups_topic = app.topic("cesar-test-btc-graph-groups", value_type=Group)

locked_by = app.Table("test_btc_locked_by", default=defaultdict)

@app.agent(transactions_topic)
async def processor(stream):
    serde = MessageSerializer(SchemaRegistryClient("http://localhost:8081"))

    async for payload in stream:
        tx = serde.decode_message(payload)
        transaction = tx['transaction']
        print(f'processing transaction {transaction["transactionId"]}')

        outputs = tx['outputs']
        for vout in outputs:
            if len(vout['scriptPubKeyAddresses']) > 0:
                # send the new group back to the stream
                await groups_topic.send(value=Group(pubKeyHashes=vout['scriptPubKeyAddresses']))

if __name__ == '__main__':
    app.main()

Expected behaviour

The app should write Group records to the groups_topic topic.

Actual behaviour

Instead, the agent crashes with TypeError("Not supported type for value: <class 'dict'>"):

[2019-06-12 14:22:33,178: WARNING]: processing transaction e1cf3476234d8446653ad52a8939ed792003eefdcd0e897319ab9d2cb4c14c8c 
[2019-06-12 14:22:33,180: ERROR]: [^---Agent*: __main__.processor]: Crashed reason=TypeError("Not supported type for value: <class 'dict'>") 
Traceback (most recent call last):
  File "/home/cesar/dev/TA/mono/stream/txns-inflow-to-address/venv/lib/python3.7/site-packages/faust/agents/agent.py", line 601, in _execute_task
    await coro
  File "/home/cesar/dev/TA/mono/stream/txns-inflow-to-address/txns_inflow_to_address.py", line 42, in processor
    await groups_topic.send(value=Group(pubKeyHashes=vout['scriptPubKeyAddresses']))
  File "/home/cesar/dev/TA/mono/stream/txns-inflow-to-address/venv/lib/python3.7/site-packages/faust/topics.py", line 184, in send
    callback=callback,
  File "/home/cesar/dev/TA/mono/stream/txns-inflow-to-address/venv/lib/python3.7/site-packages/faust/channels.py", line 218, in _send_now
    key_serializer, value_serializer, callback))
  File "/home/cesar/dev/TA/mono/stream/txns-inflow-to-address/venv/lib/python3.7/site-packages/faust/topics.py", line 373, in publish_message
    headers=headers,
  File "/home/cesar/dev/TA/mono/stream/txns-inflow-to-address/venv/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 631, in send
    transactional_id=transactional_id,
  File "/home/cesar/dev/TA/mono/stream/txns-inflow-to-address/venv/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 317, in send
    timestamp_ms=timestamp_ms, headers=headers)
  File "/home/cesar/dev/TA/mono/stream/txns-inflow-to-address/venv/lib/python3.7/site-packages/aiokafka/producer/message_accumulator.py", line 336, in add_message
    future = batch.append(key, value, timestamp_ms, headers=headers)
  File "/home/cesar/dev/TA/mono/stream/txns-inflow-to-address/venv/lib/python3.7/site-packages/aiokafka/producer/message_accumulator.py", line 138, in append
    timestamp=timestamp_ms, key=key, value=value, headers=headers)
  File "/home/cesar/dev/TA/mono/stream/txns-inflow-to-address/venv/lib/python3.7/site-packages/aiokafka/producer/message_accumulator.py", line 53, in append
    headers=headers)
  File "/home/cesar/dev/TA/mono/stream/txns-inflow-to-address/venv/lib/python3.7/site-packages/aiokafka/record/default_records.py", line 416, in append
    "Not supported type for value: {}".format(type(value)))
TypeError: Not supported type for value: <class 'dict'>
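The traceback ends in the producer's type check, which only accepts bytes-like values. The dependency-free sketch below models the two steps involved, assuming (as the traceback suggests) that the Group record reaches the codec as a plain dict. The functions raw_dumps, json_dumps, and producer_append are hypothetical stand-ins written for illustration; they are not faust or aiokafka APIs. With the app-wide value_serializer="raw", the dict is passed through unchanged and rejected, whereas a JSON codec turns it into bytes first.

```python
import json

# Hypothetical stand-ins (NOT faust or aiokafka APIs) that mimic the two
# steps the traceback passes through: codec, then producer type check.

def raw_dumps(value):
    """Mimic a pass-through 'raw' codec: encode str, return anything else unchanged."""
    if isinstance(value, str):
        return value.encode()
    return value

def json_dumps(value):
    """Mimic a 'json' codec: encode the value as UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")

def producer_append(value):
    """Mimic the producer-side check: only bytes-like values or None are accepted."""
    if value is not None and not isinstance(value, (bytes, bytearray, memoryview)):
        raise TypeError("Not supported type for value: {}".format(type(value)))
    return value

# Assumption: the Group record is turned into a plain dict before the codec runs.
record_as_dict = {"pubKeyHashes": ["abc123"]}

try:
    producer_append(raw_dumps(record_as_dict))
except TypeError as exc:
    print(exc)  # Not supported type for value: <class 'dict'>

# With a JSON codec the producer receives bytes and accepts them.
payload = producer_append(json_dumps(record_as_dict))
print(payload)
```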


Any help is appreciated! Thanks!

StephenSorriaux commented 5 years ago

Hi,

As far as I can see, you need to send the JSON-serialized object to the topic, not the object itself. This line:

await groups_topic.send(value=Group(pubKeyHashes=vout['scriptPubKeyAddresses']))

becomes:

await groups_topic.send(value=Group(pubKeyHashes=vout['scriptPubKeyAddresses']).to_representation())

CesarPantoja commented 5 years ago

Thanks, I solved it by adding the 'json' serialiser to the model, but I guess your solution would work too as there would be no need to serialise/deserialise.