RedisLabs / spark-redis

A connector for Spark that allows reading and writing to/from Redis cluster
BSD 3-Clause "New" or "Revised" License

How to push Spark Streaming data into Redis with spark-redis? #26

Open flowers2023 opened 8 years ago

flowers2023 commented 8 years ago

Hi, thanks for providing the spark-redis code for us. I'm now having a problem: when I use KafkaUtils to create a stream, I don't know how to push the Kafka stream data to Redis. My code is below:

val streams = KafkaUtils.createStream(ssc, params.ZK_URL, params.GROUPID, topicMap).map(_._2)

sunheehnus commented 8 years ago

Hello @DRUNK2013, you can use

import com.redislabs.provider.redis._

val sc = new SparkContext(new SparkConf()
    .setMaster("local")
    .setAppName("myApp")
    .set("redis.host", "localhost")
    .set("redis.port", "6379")
    .set("redis.auth", "xxx")
)

streams.foreachRDD{
      rdd => {
        sc.toRedisLIST(rdd, "kafka msgs")
      }
}

to push all the messages to a list named "kafka msgs".

If you want to write to a different Redis instance/cluster, you can use:

implicit val redisConfig1 = new RedisConfig(new RedisEndpoint("127.0.0.1", 7379, "passwd"))

streams.foreachRDD{
      rdd => {
        sc.toRedisLIST(rdd, "kafka msgs")
      }
}

You can find the reference in the README. :-)

clunny commented 8 years ago

Hi. I have tried your solution @sunheehnus, but I get a serialization issue. It is probably caused by my use of checkpointing, which is necessary so that I can use the updateStateByKey function.

Can you help me to solve this?

Here is my simple test program:

import com.datastax.spark.connector._
import org.apache.spark._
import org.apache.spark.streaming._
import com.redislabs.provider.redis._

object StreamToRedisTest{
    def main(args: Array[String]) {
        val conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("Test")
            .set("redis.host", "localhost")
            .set("redis.port", "6379")

        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(1))

        val checkpointDirectory = "/data/spark-checkpoint-directory"
        ssc.checkpoint(checkpointDirectory)

        val lines = ssc.socketTextStream("localhost", 9999)
        val test = lines
            .filter(l => {
                l.startsWith("Example")
            })

        test.foreachRDD(rdd => {
                sc.toRedisLIST(rdd, "Test")
            })

        test.print()

        ssc.start()
        ssc.awaitTermination()
    }
}

This is the error message I get when I run the program:

16/07/21 18:18:56 ERROR StreamingContext: Error starting the context, marking it as stopped java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable org.apache.spark.SparkContext Serialization stack:

  • object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1e9469b8)
  • field (class: USmartTfLPredictionToRedis$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
  • object (class USmartTfLPredictionToRedis$$anonfun$main$1, )

aparnam2 commented 7 years ago

Hi, I have the same problem, is there any solution to this?

mccstan commented 7 years ago

Try this. The SparkContext itself is not serializable, so you can't call sc.toRedisLIST from inside the streaming closure to write an RDD to a list in Redis. You have to access the partitions (located on specific Spark worker nodes) and perform the write operation there; the Spark driver can then serialize that operation and send it to the nodes that hold a partition of the processed data.

import com.datastax.spark.connector._
import org.apache.spark._
import org.apache.spark.streaming._
import com.redislabs.provider.redis._
import redis.clients.jedis.Jedis

object StreamToRedisTest{
    def main(args: Array[String]) {
        val conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("Test")
            .set("redis.host", "localhost")
            .set("redis.port", "6379")

        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(1))

        val checkpointDirectory = "/data/spark-checkpoint-directory"
        ssc.checkpoint(checkpointDirectory)

        val lines = ssc.socketTextStream("localhost", 9999)
        val test = lines
            .filter(l => {
                l.startsWith("Example")
            })

        test.foreachRDD(rdd =>
            rdd.foreachPartition(partition => {
                // create the connection on the executor, so the driver never
                // has to serialize it (or the SparkContext)
                val jedis = new Jedis("localhost", 6379)
                partition.foreach(message => jedis.rpush("Test", message))
                jedis.close()
            })
        )

        test.print()

        ssc.start()
        ssc.awaitTermination()
    }
}

JavaSparker commented 6 years ago

Hello, I am trying to find examples with Spark and Redis. Could you help me please? I want to read from Redis with Spark or store a Dataset in Redis.

mccstan commented 6 years ago

Here is a Spark Streaming app that uses Redis as a streaming source: https://github.com/SoatGroup/spark-redis-nodejs

JavaSparker commented 6 years ago

Hello mccstan, and thank you for your answer. But I am looking for an example in Java Spark: reading from Redis with Spark or storing a Dataset in Redis with Java. I do not know how to translate Scala to Java. Thank you.

mccstan commented 6 years ago

Hello @JavaSparker, this is the official RedisLabs connector for reading and writing Spark data from/to Redis: https://github.com/RedisLabs/spark-redis

You can find interesting examples there.

Here is the corresponding Maven dependency (Gradle and sbt equivalents exist too):

<dependency>
    <groupId>com.redislabs</groupId>
    <artifactId>spark-redis</artifactId>
    <version>0.3.2</version>
</dependency>

JavaSparker commented 6 years ago

Hello @mccstan, and thank you for your answer. The example is in Scala, and unfortunately I do not know Scala and I'm new to Spark. I'm trying to iterate over a Dataset with foreach or map to push each Row into Redis, but I'm having serialization problems. Do you have an example in Java, please? Thank you.

mccstan commented 6 years ago

Let me see your code and the error stack trace @JavaSparker

JavaSparker commented 6 years ago

Hello @mccstan, I am trying to push each row of the Dataset into Redis (ds is a Dataset):

Jedis jedis = new Jedis("localhost");
ds.foreach((ForeachFunction) row -> jedis.lpush("RDS", row.toString()));

Planning scan with bin packing, max size: 96298086 bytes, open cost is considered as scanning 4194304 bytes. org.apache.spark.SparkException: Task not serializable Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@67c27493: startup date [Wed Jan 24 13:50:28 CET 2018]; root of context hierarchy

Thank you

mccstan commented 6 years ago

@JavaSparker, here is a publish example using Java. Note that this is not an ideal solution, as you have to create a connection for each partition:

package fr.mccstan;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import redis.clients.jedis.Jedis;

import java.util.Arrays;
import java.util.List;

public class App {
    public static void main( String[] args ) {
        SparkConf conf = new SparkConf()
                .setAppName("Test")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);

        JavaRDD<Integer> distData = sc.parallelize(data);

        distData.foreachPartition(
                line -> {
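                    // one Jedis connection per partition, created on the executor (nothing here is serialized from the driver)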
                    final Jedis jedis = new Jedis("localhost");
                    line.forEachRemaining( inte ->
                            jedis.publish("intNumbers", inte.toString())
                    );
                    jedis.close();
                });

    }
}

jasonnerothin commented 5 years ago

I'm trying to read a spark stream from Kafka and pipe some output to Redis from a DataFrame or Dataset. I believe what is needed is a ForeachWriter.

This would function similarly to @mccstan's foreachPartition (that is, it requires a connection per partition). According to the book Spark: The Definitive Guide, the requirements for such an implementation are:

  1. The writer must be Serializable (I've tried the scala-redis lib, in which RedisClient is not Serializable)
  2. open, process, close are called on each partition
  3. The writer must do its initialization only in the open method
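
A minimal sketch of what such a writer might look like (untested; the list name, host, and port are placeholders, and I'm assuming the plain Jedis client is on the executor classpath):

import org.apache.spark.sql.ForeachWriter
import redis.clients.jedis.Jedis

class RedisListForeachWriter(host: String, port: Int) extends ForeachWriter[String] {
  // transient so the connection itself is never serialized; only host/port travel to the executors
  @transient private var jedis: Jedis = _

  // called once per partition, before any record is processed
  override def open(partitionId: Long, epochId: Long): Boolean = {
    jedis = new Jedis(host, port)
    true
  }

  // called for every record in the partition
  override def process(value: String): Unit = {
    jedis.rpush("stream-output", value)
  }

  // called when the partition is done; errorOrNull is non-null on failure
  override def close(errorOrNull: Throwable): Unit = {
    if (jedis != null) jedis.close()
  }
}

// usage: streamingDs.writeStream.foreach(new RedisListForeachWriter("localhost", 6379)).start()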

In my case, I'd like to use Redis as a sort of process metastore: Keeping track of overall process status outside of Spark.

If I find a solution, I'll post back here.

mccstan commented 5 years ago

Hey @jasonnerothin, can you give details on what you call process status? Of course you can't serialize a connection (TCP or otherwise). Did you think about stateful streaming with updateStateByKey?

jasonnerothin commented 5 years ago

@mccstan I don't think updateStateByKey is what I'm looking for.

I'm processing ~1k feeds from ~1k files and dumping them into Kafka. So I have 1k hashmaps in Redis that say whether the ImportStatus is READY, WORKING, ERROR, or DONE for each feed. When the job starts up, it just asks for a READY feed and reads it.

In open() (or in the driver), I'd like to flip the status from READY to WORKING then let the driver flip it to ERROR or DONE when the partitions have close()d for a particular feed.
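
Just to make the idea concrete, a rough sketch of that bookkeeping with plain Jedis (the key layout "feed:<id>" and the field name "ImportStatus" are made up for illustration):

import redis.clients.jedis.Jedis

object FeedStatus {
  // flip the status field of one feed's hash, using a short-lived connection
  def set(host: String, port: Int, feedId: String, status: String): Unit = {
    val jedis = new Jedis(host, port)
    try jedis.hset(s"feed:$feedId", "ImportStatus", status)
    finally jedis.close()
  }
}

// in open(): FeedStatus.set("localhost", 6379, feedId, "WORKING")
// on the driver, once all partitions have closed: "DONE" or "ERROR"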

I've thought about using Spark Broadcast Accumulators to get me past this step, but even so, I'd still be back at the same spot when I'm trying to kafka source -> spark job -> redis sink. Even if I were to serialize a low-level TCP connection, upon "re-instantiation" of RedisClient on the worker, I'd still be stuck with the fact that Spark doesn't know how to serialize it.

Perhaps I could add debasishg.redis-client to the worker classpath. Will see.

jasonnerothin commented 5 years ago

If I copy this dependency

libraryDependencies += "net.debasishg" %% "redisclient" % "3.9" % Provided withSources() withJavadoc()

into

${SPARK_HOME}/jars, the driver will not attempt to serialize the RedisClient and relatives across the pipe (works for client and cluster mode).

If my implementation of ForeachWriter is useful, I'll share out a link...

fe2s commented 5 years ago

Hi Jason,

Not sure I understood the problem correctly, do you read from files (HDFS/S3?) and write to Kafka? If so, why do you use spark streaming and not spark batch? Any code or pseudo-code would be helpful.

As for the Redis client - the jedis library is already available in spark-redis, so I'm not sure why you are considering bringing 'net.debasishg' into the classpath.

As for ForeachWriter, if you are using Structured Streaming you can leverage foreachBatch to persist a streaming dataframe to Redis; here is an example:

    val streamingDf = ....

    val query = streamingDf
      .writeStream
      .outputMode("update")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF
          .write
          .format("org.apache.spark.sql.redis")
          .option("table", "yourTableName")
          .option("key.column", "yourColumn")
          .mode(SaveMode.Append)
          .save()
      }
      .start()
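
To read the table back later as a DataFrame, something along these lines should work (same table name as above; key.column restores the key as a column):

    val df = spark.read
      .format("org.apache.spark.sql.redis")
      .option("table", "yourTableName")
      .option("key.column", "yourColumn")
      .load()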

jasonnerothin commented 5 years ago

Hi Oleksiy,

I am serializing from text files into Kafka and then doing Structured Streaming out the other side.

I am using Redis to maintain (mostly book-keeping) state across streams (and stacks - e.g. pandas). I started with 'net.debasishg' because I just needed some Redis primitives at the time to keep track of my Kafka streams/topics...

Thanks for the tip, looks like it should work. Will post back in a few days.

Jason

jasonnerothin commented 5 years ago

Hi Oleksiy,

I'm not sure if it'll work after all:


case class TextRecord(eventTime: Timestamp, lineLength: Int, stationId: String, windDirection: Float, windSpeed: Float, gustSpeed: Float, waveHeight: Float, dominantWavePeriod: Float, averageWavePeriod: Float, mWaveDirection: Float, seaLevelPressure: Float, airTemp: Float, waterSurfaceTemp: Float, dewPointTemp: Float, visibility: Float, pressureTendency: Float, tide: Float) extends WxRecord(eventTime, lineLength, stationId)

My Dataset is mapped out of flatMapGroupsWithState and through groupByKey.

So, my version of your snippet looks like this:


.writeStream
      .foreachBatch { (batchDs: Dataset[List[TextRecord]], batchId: Long) =>
        batchDs
          .write
          .format("org.apache.spark.sql.redis")
          .option("table", "interrupts")
          .option("key.column", "eventTime")
          .mode(SaveMode.Append)
          .save()
      }

My problem is "key.column": I could make a key from something like concat(col("eventTime"), col("stationId")), but a single column isn't a candidate key at that location. I'll try to add an upstream .withColumn and see if I can make that work.
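
For reference, a hypothetical sketch of that upstream .withColumn (assuming the batch is first flattened to one row per TextRecord; the DataFrame name recordsDf and the column name redisKey are made up):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, concat_ws}

// derive a composite key column, then point key.column at it
val keyedDf = recordsDf
  .withColumn("redisKey", concat_ws(":", col("eventTime"), col("stationId")))

keyedDf.write
  .format("org.apache.spark.sql.redis")
  .option("table", "interrupts")
  .option("key.column", "redisKey")
  .mode(SaveMode.Append)
  .save()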

Ideally, I'd like to target a table per groupBy key, so that I can look up records with Redis' keys command...

jasonnerothin commented 5 years ago

I'm also having issues resolving your snapshot in sbt...


[warn]  Note: Unresolved dependencies path:
[warn]      com.redislabs:jedis:3.0.0-20181113.105826-9

fe2s commented 5 years ago

Hi Jason,

Sorry for the late response. Did you manage to use .withColumn with the key.column property?

As for SBT, the issue is that the jedis snapshot that we reference is from the Sonatype OSS repo. Theoretically, adding the Sonatype resolver to SBT should fix it:

resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

In practice it doesn't work. For some reason SBT builds the wrong URL and cannot find the pom/jar.

The workaround is to exclude the transitive dependency and include it with the absolute jar URL:

libraryDependencies += "com.redislabs" % "spark-redis" % "2.3.1" exclude("com.redislabs", "jedis")
libraryDependencies += "com.redislabs" % "jedis" % "3.0.0-20181113.105826-9" from "https://oss.sonatype.org/content/repositories/snapshots/com/redislabs/jedis/3.0.0-SNAPSHOT/jedis-3.0.0-20181113.105826-9.jar"

Once we move from the jedis snapshot to a release version, the issue should be fixed.