qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0
137 stars 80 forks source link

Does Kinesis connector support reading compressed payloads on stream? #105

Open sudeshnasengupta opened 3 years ago

sudeshnasengupta commented 3 years ago

Hi, We currently use the Qubole open source Kinesis connector in a Spark structured streaming application on AWS, to read payloads from the Kinesis stream, and generate Parquet files. Would it be possible to use Qubole Kinesis connector for reading compressed payloads (say, in GZip format), that have been posted to the stream using the same compression format as is recognized from the reading side (i.e. in terms of Kinesis Qubole source)? Thanks.

d4r3topk commented 2 years ago

Hello! I have been using this connector and I haven't seen a configuration for automatically unzipping the content. If you are expecting data from kinesis as GZIP compressed, you can use the following snippet to uncompress it in Python.

import gzip
from io import BytesIO
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

def decompress(row):
    return gzip.GzipFile(mode="rb", fileobj=BytesIO(row)).read()

def decode_fn(row):
    return row.decode("utf8")

decompress_udf = udf(decompress, StringType())
decode_udf = udf(decode_fn, StringType())

kinesis_stream_name = "xxxx"
kinesis_endpoint_url = "xxxx"

data_source = (
        spark.readStream.format("kinesis")
        .option("streamName", kinesis_stream_name)
        .option("endpointUrl", kinesis_endpoint_url)
        .option("startingPosition", "TRIM_HORIZON")
        .load()
    )

# If you're using kinesis stream, then all the uncompressed parameters, eg. ApproximateArrivalTimestamp, 
# and others will be available here like so col('ApproximateArrivalTimestamp')
data_source = data_source.select(
        decode_udf(decompress_udf(col("data")))
    ).alias("data")

# Now you're data is in uncompressed format parallelized according to the number of partitions