ing-bank / scruid

Scala + Druid: Scruid. A library that allows you to compose queries in Scala, and parse the result back into typesafe classes.
Apache License 2.0

Application not finishing #26

Closed tonipenya closed 6 years ago

tonipenya commented 6 years ago

I am using scruid to read from a Druid database. However, the following simple application does not finish.

package ing.wbaa.druid

import com.typesafe.scalalogging._
import ing.wbaa.druid.definitions._
import io.circe.generic.auto._
import scala.concurrent.ExecutionContext.Implicits.global

object Playground extends App with LazyLogging {

  case class TopEventType(count: Int, eventType: String = null)

  logger.info("=== About to read")
  TopNQuery(
    dimension = Dimension(
      dimension = "eventType"
    ),
    threshold = 2,
    metric = "count",
    aggregations = List(
      CountAggregation(name = "count")
    ),
    intervals = List("2018-01-01/2018-12-31")
  ).execute
    .map(_.list[TopEventType].foreach(evt => logger.info(evt.toString)))
}

It produces the following output:

16:57:47.245 [ForkJoinPool-1-worker-13] INFO Playground$ - TopEventType(11,addProductToCart) 
16:57:47.245 [ForkJoinPool-1-worker-13] INFO Playground$ - TopEventType(10,finishPurchase)   

The output is correct, but the application never finishes executing.

I am pretty sure there is something obvious I am missing.
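
A quick way to see what keeps the JVM alive in a case like this is to list the live threads and check which of them are non-daemon; a minimal sketch, using only the standard library:

import scala.collection.JavaConverters._

// Any non-daemon thread still running after main() returns
// keeps the JVM from exiting.
Thread.getAllStackTraces.keySet.asScala.foreach { t =>
  println(s"${t.getName} daemon=${t.isDaemon}")
}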

bjgbeelen commented 6 years ago

Can you give a bit more context? :-) I'm not quite following based on what you provided above.

tonipenya commented 6 years ago

I have a Druid instance running on my machine with data already in it. The schema has a single dimension (eventType) and a single metric (count) of type count.

I am writing a simple application to retrieve the top 2 eventTypes with the highest count, using the code above. The code runs and logs as expected. The issue is that the application never finishes executing.

The following code, however, finishes just fine:

object Playground extends App with LazyLogging {

  case class TopEventType(count: Int, eventType: String = null)

  logger.info("=== Will log and end")

}

I am wondering if there is some kind of connection that is not being closed on the scruid side.

(I hope that makes things more clear. Otherwise, could you tell me what I can clarify?)

bjgbeelen commented 6 years ago

I was just about to write that I had figured out the issue you were facing. I got confused by your business logic, which also mentioned something about 'finishing' ;-)

I just did a test. I had a suspicion that it is due to the ActorSystem that is kept alive inside the DruidClient.

Using this code:

package ing.wbaa.druid

import com.typesafe.scalalogging._
import ing.wbaa.druid.definitions._
import ing.wbaa.druid.definitions.FilterOperators._
import io.circe.generic.auto._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._

object Playground extends App with LazyLogging {

  case class TimeseriesCount(count: Int)

  logger.info("=== About to read")

  val request = TimeSeriesQuery(
    aggregations = List(
      CountAggregation(name = "count")
    ),
    granularity = GranularityType.Hour,
    intervals = List("2011-06-01/2017-06-01")
  ).execute

  val result = Await.result(request, 10.seconds)

  result.list[TimeseriesCount].foreach(evt => logger.info(evt.toString))

  DruidClient.system.terminate()
}

This finishes.
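
Awaiting the termination future is another option (a sketch, assuming the same DruidClient.system handle used above; in Akka, terminate() returns a Future[Terminated]):

// Block until the client's actor system has fully shut down
// before letting the application exit.
Await.ready(DruidClient.system.terminate(), 10.seconds)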

tonipenya commented 6 years ago

Aha! I was not aware that scruid uses Akka actors. Thanks a lot @bjgbeelen!

We ended up using a non-blocking version of your code :-)

package ing.wbaa.druid

import com.typesafe.scalalogging._
import ing.wbaa.druid.definitions._
import ing.wbaa.druid.definitions.FilterOperators._
import io.circe.generic.auto._
import scala.concurrent.ExecutionContext.Implicits.global

object Playground extends App with LazyLogging {

  case class TimeseriesCount(count: Int)

  logger.info("=== About to read")

  val response = TimeSeriesQuery(
    aggregations = List(
      CountAggregation(name = "count")
    ),
    granularity = GranularityType.Hour,
    intervals = List("2011-06-01/2017-06-01")
  ).execute

  response
      .map(_.list[TimeseriesCount].foreach(evt => logger.info(evt.toString)))
      .map(_ => DruidClient.system.terminate())
}

Fokko commented 6 years ago

Nice one @tonipenya

To be super puristic, the .map(_ => DruidClient.system.terminate()) should be .foreach(_ => DruidClient.system.terminate()) because of the side-effect :)
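
For illustration, the tail of the non-blocking snippet above would then read (a sketch, with the same names as in that snippet):

response
    .map(_.list[TimeseriesCount].foreach(evt => logger.info(evt.toString)))
    .foreach(_ => DruidClient.system.terminate()) // side effect, so foreach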

tonipenya commented 6 years ago

Super puristic is good :+1:

bjgbeelen commented 6 years ago

:thumbsup: