influxdata / influxdb-client-python

InfluxDB 2.0 python client
https://influxdb-client.readthedocs.io/en/stable/
MIT License

Connect pyspark structured streaming to influxdb #137

Closed xyshell closed 4 years ago

xyshell commented 4 years ago

Hello everyone,

I'm currently running an ETL job with pyspark 3.0.0 and want to save the formatted data to InfluxDB 2.0 beta via this Python SDK.

After the ETL step I get a <class 'pyspark.sql.dataframe.DataFrame'> object. Because I'm using structured streaming, converting it to a pd.DataFrame() with df.toPandas() doesn't work. I'm not sure which approach to take or how to implement it.

Some code to elaborate:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('xxx') \
    .getOrCreate()

raw_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "xxx") \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

formatted_df = raw_df.xxx().yyy() # some data ETL algorithm
assert isinstance(formatted_df, pyspark.sql.dataframe.DataFrame)

# Print output
query = formatted_df.writeStream \
    .format("console") \
    .start()
query.awaitTermination()

Currently I'm just printing out the data, but I want to save it into influxdb. Could anyone tell me how to do this?

bednar commented 4 years ago

Hi @xyshell,

thanks for using our client.

What about creating a custom writer? Something like:

import pyspark
from influxdb_client import InfluxDBClient


class InfluxDBWriter:
    def __init__(self):
        self.client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
        self.write_api = self.client.write_api()

    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        # Called once per row of the streaming DataFrame
        self.write_api.write(bucket="my-bucket", record=self._row_to_line_protocol(row))

    def close(self, error):
        # Flush pending writes and release the connection
        self.write_api.close()
        self.client.close()
        print("Closed with error: %s" % str(error))

    def _row_to_line_protocol(self, row: pyspark.sql.Row):
        # TODO map Row to LineProtocol
        return "measurement_name,tag_1=tag_1_value field_1=field_1_value 1"

# start() launches the streaming query; without it nothing is written
df.writeStream.foreach(InfluxDBWriter()).start()
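
The TODO in `_row_to_line_protocol` could be filled in along the following lines. This is only a minimal sketch, not part of the client library: the helper name `row_dict_to_line` and the column names in the example are hypothetical, and it assumes the row has already been converted to a plain dict (for instance with `row.asDict()`). It escapes commas, equals signs and spaces in keys and tag values, quotes string fields, and appends `i` to integer fields.

```python
from typing import Any, Mapping, Optional, Sequence


def _escape(value: str, chars: str = ",= ") -> str:
    """Backslash-escape the characters that are special in line protocol keys."""
    for ch in chars:
        value = value.replace(ch, "\\" + ch)
    return value


def _format_field(value: Any) -> str:
    """Render a Python value as a line protocol field value."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    text = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{text}"'


def row_dict_to_line(
    measurement: str,
    row: Mapping[str, Any],
    tag_keys: Sequence[str],
    field_keys: Sequence[str],
    time_key: Optional[str] = None,
) -> str:
    """Build one line protocol record from a dict (hypothetical helper)."""
    tags = ",".join(
        f"{_escape(k)}={_escape(str(row[k]))}"
        for k in sorted(tag_keys)
        if row.get(k) is not None
    )
    fields = ",".join(
        f"{_escape(k)}={_format_field(row[k])}"
        for k in field_keys
        if row.get(k) is not None
    )
    if not fields:
        raise ValueError("line protocol requires at least one field")
    # Measurement names only need commas and spaces escaped
    head = _escape(measurement, ", ")
    if tags:
        head += "," + tags
    line = f"{head} {fields}"
    if time_key is not None and row.get(time_key) is not None:
        line += f" {int(row[time_key])}"  # assumed to be an integer timestamp
    return line
```

Inside the writer, `_row_to_line_protocol` could then return something like `row_dict_to_line("measurement_name", row.asDict(), ["tag_1"], ["field_1"])`, with the tag and field lists adjusted to the actual columns of the streaming DataFrame.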

Regards

xyshell commented 4 years ago

@bednar works smoothly, thanks for your response!