In the current version of Scruid, the results of a query are first collected into an instance of DruidResponse. When the payload of a result is large (e.g., half a million records), we have to increase both the heap memory and the response size limits in Akka HTTP.
Furthermore, Druid performs JSON streaming in its responses, resulting in an array of JSON records. Therefore, instead of collecting the payload first, it would be better to process it as a stream. Since Scruid uses Akka HTTP underneath, the response can be transformed into an Akka Stream Source and then processed with ordinary Akka Stream Flow operations or passed to an Akka Stream Sink.
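The memory benefit can be illustrated with a plain-Scala analogy (no Akka dependency; the names below are purely illustrative): collecting materializes the whole payload on the heap before any processing starts, whereas a streaming fold keeps only the running state, which is exactly what a Source/Flow/Sink pipeline does for the Druid response.

```scala
object StreamingAnalogy {
  // Collect-first: the entire payload lives on the heap before processing begins.
  def collected(records: Iterator[Int]): Int = {
    val all = records.toList // O(n) memory: the whole result set is materialized
    all.sum
  }

  // Streaming: one record at a time, constant memory -- analogous to an
  // Akka Stream Source -> Flow -> Sink pipeline over the Druid response.
  def streamed(records: Iterator[Int]): Int =
    records.foldLeft(0)(_ + _) // O(1) memory: only the running sum is kept
}
```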
In addition to the execute method of a query, I propose to provide the following methods:
.stream: gives a Source of DruidResult.
.streamAs[T](implicit decoder: Decoder[T]): gives a Source where each JSON record is decoded to the type T.
.streamSeriesAs[T](implicit decoder: Decoder[T]): gives a Source where each JSON record is decoded to the type T, accompanied by its corresponding timestamp.
All the methods above can be applied to any timeseries, group-by, or top-N query, created either directly with the query constructors or via DQL.
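As a sketch, the proposed methods could look like the signatures below. This is only an illustration of the proposal, not Scruid's actual code: Source, NotUsed, and Decoder are stand-in stubs for the Akka Streams and Circe types, and the String timestamp in streamSeriesAs is an assumption.

```scala
object ProposedApiSketch {
  // Stand-ins so the sketch is self-contained (in Scruid these would be
  // akka.stream.scaladsl.Source, akka.NotUsed and io.circe.Decoder):
  trait Source[+Out, +Mat]
  trait NotUsed
  trait Decoder[T]
  trait DruidResult

  trait DruidQuery {
    // Emits the raw results, one record at a time:
    def stream: Source[DruidResult, NotUsed]

    // Decodes each JSON record to T on the fly:
    def streamAs[T](implicit decoder: Decoder[T]): Source[T, NotUsed]

    // Like streamAs, but pairs each decoded record with its timestamp:
    def streamSeriesAs[T](implicit decoder: Decoder[T]): Source[(String, T), NotUsed]
  }
}
```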
For example, a timeseries query can be processed as follows:
implicit val mat = DruidClient.materializer

case class TimeseriesCount(count: Int)

val query = TimeSeriesQuery(
  aggregations = List(
    CountAggregation(name = "count")
  ),
  granularity = "hour",
  intervals = List("2011-06-01/2017-06-01")
)

// Decode each record into the type `TimeseriesCount` and sum all `count` results
val result: Future[Int] = query
  .streamAs[TimeseriesCount]
  .map(_.count)
  .runWith(Sink.fold(0)(_ + _))
In order to support streaming JSON with Circe as a back-end, Scruid should switch its dependency from "de.heikoseeberger" %% "akka-http-circe" % "1.22.0" to the following:
All the above dependencies originate from a fork of akka-stream-json, which adds support for JSON streaming with Akka Streams and Circe.