datastax / pulsar-sink

An Apache Pulsar® sink for transferring events/messages from Pulsar topics to Apache Cassandra®, DataStax Astra or DataStax Enterprise (DSE) tables.
Apache License 2.0

sink not working with column with datatype 'timestamp' and python producer #15

Open saveriogzz opened 3 years ago

saveriogzz commented 3 years ago

Hello,

I'd like an Astra sink to consume some data. The sink should write data to an Astra table in which some columns have type timestamp, but this is not working.

CREATE TABLE streaming_test.test_streaming_timestamp (
    sensor text,
    added timeuuid,
    temperature int,
    timestamp timestamp,
    PRIMARY KEY (sensor, added)
)

My sink's mapping is sensor=value.sensor, added=now(), temperature=value.temperature, timestamp=value.timestamp

My test Python producer script is:

import pulsar
import uuid
from pulsar.schema import *
from datetime import datetime

class Example(Record):
    sensor = String()
    temperature = Integer()
    timestamp = String()

service_url = 'pulsar+ssl://my.cloud.somewhere.my.streaming.example:port'

# Use default CA certs for your environment
# Debian/Ubuntu:
trust_certs='/etc/ssl/certs/ca-certificates.crt'

token='myToken'
client = pulsar.Client(service_url,
                        authentication=pulsar.AuthenticationToken(token),
                        tls_trust_certs_file_path=trust_certs)

producer = client.create_producer('persistent://my-tenant/my-namespace/my-topic', schema=JsonSchema(Example))

now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

producer.send(Example(sensor=str(uuid.uuid4()), temperature=20, timestamp=now))

client.close()

Referring to the Pulsar Python client (definition.py), I cannot see any way to specify Timestamp() in the schema. That is why I am declaring timestamp as a String(), but it still does not work. What am I missing?

Thanks a lot

STomashevych commented 1 year ago

Cassandra stores a timestamp as an integer (milliseconds since the Unix epoch), so converting it to an int like this works for me: timestamp = int(time.time())*1000
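
For anyone who wants to send a specific datetime rather than the current time, here is a minimal sketch of the same conversion. The helper name epoch_millis is hypothetical, and treating naive datetimes as UTC is an assumption of this sketch. The schema field would presumably also need to change from String() to an integer type such as Long() from pulsar.schema, since current epoch milliseconds do not fit in 32 bits; I have not verified that against this sink.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis(dt: datetime) -> int:
    """Return whole milliseconds since the Unix epoch for dt.

    Assumption of this sketch: a naive datetime is interpreted as UTC.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    # Dividing two timedeltas with // yields an int, avoiding float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


# Example: a fixed instant instead of datetime.now()
print(epoch_millis(datetime(2021, 1, 1, 12, 0, 0)))  # 1609502400000

# In the producer, the value would then be passed as a number, e.g.
# (assuming the Record declares `timestamp = Long()`):
# producer.send(Example(sensor=..., temperature=20,
#                       timestamp=epoch_millis(datetime.now(timezone.utc))))
```

Unlike int(time.time())*1000, this keeps sub-second precision down to the millisecond.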