fsanaulla / chronicler-spark

InfluxDB connector to Apache Spark on top of Chronicler
Apache License 2.0
27 stars 4 forks source link

How to use this on pyspark #20

Closed ballerabdude closed 4 years ago

ballerabdude commented 4 years ago

I would really like to stream data directly to an Influxdb. Is there a way to add the required packages and use this as a sink for a structured stream?

ballerabdude commented 4 years ago

I was able to solve my problem

import json
from influxdb import InfluxDBClient
client = InfluxDBClient('localhost', 8086, 'admin', 'admin', 'database1')
def publish_to_influxdb(row):

    dict_data = row.asDict()
    json_body = [
        {
            "measurement": "datapoint-1",
            "tags": {
                "key": dict_data['key']
            },
            "time": dict_data['timestamp'].isoformat(),
            "fields": {
                "high": dict_data['high'],
                "low": dict_data['low'],
            }
        }
    ]
    print(client.write_points(json_body)) # Returns True
    pass

df_query = df.writeStream \
      .outputMode("append") \
      .format("foreach") \
      .foreach(publish_to_influxdb).start() 

df_query.awaitTermination()
fsanaulla commented 4 years ago

Nice one @ballerabdude!