flink-extended / flink-scala-api

Flink Scala API is a thin wrapper on top of the Flink Java API that supports Scala types for serialisation as well as the latest Scala version
Apache License 2.0

Using uPickle library to deserialize JSON strings to case classes #137

Status: Closed (geazi-anc closed 2 months ago)

geazi-anc commented 2 months ago

Hello, everyone! I was trying out Apache Flink for the first time with this awesome library, and I built a pipeline that reads data from a Pub/Sub topic, parses it as JSON, transforms it, and loads it into another Pub/Sub topic.

However, I had a lot of problems deserializing the JSON string messages to a case class. First, I tried the deserializers in the flink-json module, but they didn't work for me. So I looked for another way to deserialize JSON messages to a case class.

I ended up using uPickle to deserialize JSON strings to case classes, and it worked perfectly! But I still have a question: will I see a performance degradation from using uPickle to deserialize JSON instead of the flink-json library?

Below is a simple data pipeline that reads JSON strings from a socket (standing in for a broker) and deserializes them to case classes:

//> using toolkit default
//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6"
//> using dep "org.apache.flink:flink-clients:1.18.1"

import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._
import upickle.default.*

case class Person(name: String, age: Int) derives ReadWriter

@main def main =
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env
    .socketTextStream("localhost", 9999)
    .map(t => read[Person](t)) // convert a JSON string to Person class
    .map(p => p.copy(p.name.toUpperCase, p.age * 2)) // transform data converted to Person class
    .map(p => s"Hello, ${p.name}, you're ${p.age} years old!")
    .print()

  env.execute("myStream")

# Send these messages to the socket:
# {"name": "John", "age": 20}
# {"name": "Mary", "age": 25}
#
# Application console
SLF4J(W): No SLF4J providers were found.
SLF4J(W): Defaulting to no-operation (NOP) logger implementation
SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.
5> Hello, JOHN, you're 40 years old!
6> Hello, MARY, you're 50 years old!
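
Since the real pipeline loads its results into another Pub/Sub topic, the derived `ReadWriter` can likely cover the outbound direction as well. The following is a minimal sketch that leaves Flink out and uses only uPickle's `read` and `write`; the `transform` helper is a hypothetical name introduced for illustration, not part of the original pipeline.

```scala
//> using scala 3
//> using toolkit default

import upickle.default.*

case class Person(name: String, age: Int) derives ReadWriter

// Hypothetical helper (not from the original post): parse a JSON string,
// apply the same transformation as the pipeline, and serialize back to JSON.
def transform(json: String): String =
  val p = read[Person](json)                                 // JSON string -> Person
  val updated = p.copy(name = p.name.toUpperCase, age = p.age * 2)
  write(updated)                                             // Person -> compact JSON string
```

Calling `transform("""{"name": "John", "age": 20}""")` yields `{"name":"JOHN","age":40}`, which could then be handed to whatever sink the job uses.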

In short, is it OK to deserialize JSON strings to case classes this way instead of using the flink-json module? With uPickle, the code is cleaner too!

Thanks in advance.

novakov-alexey commented 2 months ago

Yes, it is OK to use any other library to parse a JSON string into a case class instance. I have never used flink-json. The problems you had are probably related to Scala support in the first place, but I am just guessing here.
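
One thing that may be worth keeping in mind with any JSON library inside a `map`: `read` throws on malformed input, and an uncaught exception in a pipeline step would normally fail the job. A minimal sketch of a defensive wrapper is shown below, using only uPickle and the standard library; `parsePerson` is a hypothetical helper name, and silently dropping bad messages is an assumption about what the job should do, not a recommendation from this thread.

```scala
//> using scala 3
//> using toolkit default

import scala.util.Try
import upickle.default.*

case class Person(name: String, age: Int) derives ReadWriter

// Hypothetical helper: returns None for any input that uPickle cannot
// turn into a Person (invalid JSON, missing fields, ...), instead of throwing.
def parsePerson(json: String): Option[Person] =
  Try(read[Person](json)).toOption
```

In the pipeline this could replace the first `map` with something like `.flatMap(t => parsePerson(t).toList)`, assuming the stream's `flatMap` accepts a function that returns a collection; that signature was not checked here.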

geazi-anc commented 2 months ago

Thank you so much for your answer, @novakov-alexey! And congrats on this awesome library. Please keep it up to date :).