DanielaSfregola / twitter4s

An asynchronous non-blocking Scala client for both the Twitter Rest and Streaming API
Apache License 2.0

Akka conflict in Flink environment #225

Open wzorgdrager opened 6 years ago

wzorgdrager commented 6 years ago

Hi,

I'm currently running a Twitter4s application in a Flink cluster, and I'm running into the following error:

taskrunner_1  | Uncaught error from thread [twitter4s-rest-akka.actor.default-dispatcher-5]: loader constraint violation: when resolving method "akka.actor.ActorCell$$anonfun$3.<init>(Lakka/actor/ActorCell;)V" the class loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) of the current class, akka/actor/ActorCell, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, akka/actor/ActorCell$$anonfun$3, have different Class objects for the type akka/actor/ActorCell used in the signature, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for for ActorSystem[twitter4s-rest]

taskrunner_1  | java.lang.LinkageError: loader constraint violation: when resolving method "akka.actor.ActorCell$$anonfun$3.<init>(Lakka/actor/ActorCell;)V" the class loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) of the current class, akka/actor/ActorCell, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, akka/actor/ActorCell$$anonfun$3, have different Class objects for the type akka/actor/ActorCell used in the signature

taskrunner_1  | Exception in thread "twitter4s-streaming-shutdown-hook-1" java.lang.NoClassDefFoundError: akka/actor/CoordinatedShutdown$$anonfun$totalTimeout$1

Basically, this is what my application does:

  1. Retrieve trending topics.
  2. Start statuses stream and filter on trending topics.
  3. Repeat this every 15 minutes.

Here are some snippets of my code:

override def run(ctx: SourceFunction.SourceContext[TweetWrapper]): Unit = {
  var stream: TwitterStream = null

  while (isRunning) {
    // get the current trending topics
    val currentTrends = requestTrending()
    logger.info(s"Retrieved new trends: $currentTrends")

    // close the old stream
    if (stream != null) Await.result(stream.close(), 10 seconds)

    // start new stream
    stream = Await.result(startStream(currentTrends, ctx), 10 seconds)

    // time to sleep
    logger.info(s"Started new stream, now sleeping for $sleepTime minute(s).")
    Thread.sleep(sleepTimeMilli)
  }
}

And the startStream method:

def startStream(trending: List[String], ctx: SourceFunction.SourceContext[TweetWrapper]): Future[TwitterStream] = {
  getStreamingClient.filterStatuses(tracks = trending) {
    case tweet: Tweet => ctx.collectWithTimestamp(TweetWrapper(tweet), tweet.created_at.getTime)
    case x => logger.info(x)
  }
}

I think this error is caused by an Akka version conflict (Flink also uses Akka), but I have no idea how to solve it. I'm using Twitter4s version 5.5.

wzorgdrager commented 6 years ago

Update: I have now tried to 'shade' Akka in the twitter4s library into an alternative package with the following lines in my build:

assemblyShadeRules in assembly := Seq(
  ShadeRule
    .rename("akka.**" -> "twitterAkka.@1")
    .inAll
)

The shading works fine, but I then run into the following runtime error:

com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'twitterAkka'
    at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:156)
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:149)
    at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:188)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:193)
    at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:250)
    at twitterAkka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:315)
    at twitterAkka.actor.ActorSystemImpl.<init>(ActorSystem.scala:683)
    at twitterAkka.actor.ActorSystem$.apply(ActorSystem.scala:245)
    at twitterAkka.actor.ActorSystem$.apply(ActorSystem.scala:288)
    at twitterAkka.actor.ActorSystem$.apply(ActorSystem.scala:233)
    at com.danielasfregola.twitter4s.TwitterRestClient$.$lessinit$greater$default$3(TwitterRestClient.scala:31)
    at com.danielasfregola.twitter4s.TwitterRestClient$.apply(TwitterRestClient.scala:71)
    at org.codefeedr.plugin.twitter.stages.TwitterTrendingStatusSource.getRestClient(TwitterTrendingStatusInput.scala:80)
    at org.codefeedr.plugin.twitter.stages.TwitterTrendingStatusSource.requestTrending(TwitterTrendingStatusInput.scala:126)
    at org.codefeedr.plugin.twitter.stages.TwitterTrendingStatusSource.run(TwitterTrendingStatusInput.scala:91)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

Hope you can help.

DanielaSfregola commented 5 years ago

My apologies for the late reply.

It seems to be having problems creating the ActorSystem? At least, that is my best guess. What happens if you use the withActorSystem function and pass the actor system as a parameter?

Regards, D.

ljurukov commented 5 years ago

I am running into a similar issue for unrelated reasons. Shading doesn't work in this case because of how Akka names its packages (i.e. akka.blah.blah): the relocation process ends up changing more than intended. Akka keeps its defaults in reference.conf files, and the configuration keys in those files look just like package names (akka.blah.blah) when they are referenced as strings in the code. I suspect the shading rewrites those strings, thinking they are stringified class names, so in your case the code ends up looking for a key twitterAkka.blah.blah. The associated reference.conf files are not changed, however, and still use the original akka.blah.blah path.
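To make the suspected mechanism concrete, here is a minimal sketch in plain Scala. It does not use Akka, Typesafe Config, or sbt-assembly; it only models the behaviour described above. The names `referenceConf`, `relocate` and `lookup` are hypothetical, the config value is a placeholder, and the error text is written to mimic the message in the stack trace rather than produced by the real library.

```scala
object ShadingConfigMismatch {

  // Stand-in for Akka's reference.conf. It is a resource file, not bytecode,
  // so (by assumption) the shading step leaves its keys untouched.
  val referenceConf: Map[String, String] = Map(
    "akka.version" -> "placeholder"
  )

  // Simplified model of the rule "akka.**" -> "twitterAkka.@1" being applied
  // to a string constant that merely looks like a class or package name.
  def relocate(constant: String): String =
    if (constant.startsWith("akka.")) "twitterAkka." + constant.stripPrefix("akka.")
    else constant

  // Look a key up in the config. On a miss, report the root path element,
  // imitating the message seen in the stack trace.
  def lookup(conf: Map[String, String], key: String): Either[String, String] =
    conf.get(key).toRight(
      s"No configuration setting found for key '${key.takeWhile(_ != '.')}'"
    )

  def main(args: Array[String]): Unit = {
    val keyInOriginalCode = "akka.version"
    val keyInShadedCode   = relocate(keyInOriginalCode)

    // The unshaded key matches the config file.
    println(lookup(referenceConf, keyInOriginalCode))
    // The rewritten key does not, because the file still says "akka".
    println(lookup(referenceConf, keyInShadedCode))
  }
}
```

If this model is right, any fix has to make the two sides agree again: either the config keys are rewritten along with the classes, or the string constants are excluded from relocation.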

wzorgdrager commented 5 years ago

Yes, you're right about that. We resolved this back then by downgrading some of the dependencies we were using. However, that isn't really a good solution in the long run.