This is my code:
//SchedulerImpl
val port = 4011
val config = ConfigFactory.load()
val aSettings = Settings(config)

def makeClusterCtx() = new ClusterCtx(
  clusterName = "CassCluster",
  astyanaxConfig = new AstyanaxConfigurationImpl()
    .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE),
  connectionPoolConfig = new ConnectionPoolConfigurationImpl("CassConnectionPool")
    .setSeeds("localhost:9042")
    .setPort(9042),
  connectionPoolMonitor = new CountingConnectionPoolMonitor()
)

lazy val clusterCtx = makeClusterCtx()
val cluster = clusterCtx.cluster
val keyspace = cluster.getKeyspace("SchedulerIntegrationSpec")
val erisSettings = new ErisSettings()
lazy val attemptHistoryDaoImpl = new AttemptHistoryDaoImpl(cluster, keyspace, erisSettings)

val taskRunner = (_: Task) => {}
val taskService = CassandraTaskExecutorTestService.cassTestExecutorFactory(makeClusterCtx _, taskRunner)

val schedulerSettings = SchedulerSettings(ConfigFactory.load())
val actualScheduler = new SchedulerImpl(
  schedulerSettings,
  config,
  NullMetrics,
  cluster,
  keyspace,
  taskService
)(
  logging = new Scheduler.LoggingImpl(schedulerSettings, NullMetrics, Some(attemptHistoryDaoImpl))
)
actualScheduler.start()
object CassandraTaskExecutorTestService {
  val KeyspaceName = "testscheduler"
  val WorkersPerPartition = 2

  class LogEntryDao(
      protected val cluster: Cluster,
      protected val keyspace: Keyspace,
      override protected val settings: ErisSettings
  ) extends Dao {
    protected implicit val executor: ExecutionContextExecutor = ExecutionContext.Implicits.global

    protected val logEntries = new WideRowMap(
      columnFamily[String, TimeUuid, String]("LogEntries"),
      pageSize = 100
    )

    private val rowKey = "mainLog"

    def insert(logEntry: String): Future[Unit] = {
      logEntries(rowKey).queueInsert(EntryColumn(TimeUuid(), logEntry)).executeAsync()
    }
  }
  def schemaLoader(cluster: Cluster): SchemaLoader = {
    val dao = makeLogEntryDao(cluster)
    new SchemaLoader(cluster, dao.columnFamilyDefs)
  }

  private def makeLogEntryDao(cluster: Cluster) = {
    val keyspace = cluster.getKeyspace(KeyspaceName)
    new LogEntryDao(cluster, keyspace, new ErisSettings())
  }

  def cassTestExecutorFactory(
      mkClusterCtx: () => ClusterCtx,
      taskRunner: Task => Unit
  ): Set[PartitionId] => TaskExecutorService = {
    val managedTaskRunner = new ManagedCassandraTaskRunner[LogEntryDao] {
      def makeClusterCtx(): ClusterCtx = mkClusterCtx()

      def makeManagedResource(clusterCtx: ClusterCtx): LogEntryDao = {
        makeLogEntryDao(clusterCtx.cluster)
      }

      def runTask(task: Task, logEntryDao: LogEntryDao): Unit = {
        taskRunner(task)
        Await.result(logEntryDao.insert(task.taskKey.toString()), Duration.Inf)
      }
    }
    CassandraTaskExecutorService.factory(WorkersPerPartition, managedTaskRunner)
  }
}
The problem appears to be a com.google.guava dependency version mismatch:
Uncaught error from thread [Scheduler-akka.actor.default-dispatcher-5] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[Scheduler]
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
    at com.pagerduty.eris.FutureConversions$.googleListenableToScalaFuture(FutureConversions.scala:53)
    at com.pagerduty.eris.widerow.WideRowDriverImpl.fetchData(WideRowDriverImpl.scala:99)
    at com.pagerduty.eris.dao.WideRowDriverWithMetrics.fetchData(WideRowDriverWithMetrics.scala:23)
    at com.pagerduty.widerow.chain.QueryPage.fetchNextPage(QueryPage.scala:68)
    at com.pagerduty.widerow.chain.QueryPage.nextPage(QueryPage.scala:54)
    at com.pagerduty.widerow.WideRowView.com$pagerduty$widerow$WideRowView$$rec$2(WideRowView.scala:260)
    at com.pagerduty.widerow.WideRowView.get(WideRowView.scala:280)
    at com.pagerduty.scheduler.dao.TaskScheduleDaoImpl.load(TaskScheduleDao.scala:148)
    at com.pagerduty.scheduler.dao.TaskScheduleDaoImpl.load(TaskScheduleDao.scala:129)
    at com.pagerduty.scheduler.akka.TaskPersistence$$anonfun$2.applyOrElse(TaskPersistence.scala:106)
    at com.pagerduty.scheduler.akka.TaskPersistence$$anonfun$2.applyOrElse(TaskPersistence.scala:103)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at akka.actor.FSM$class.processEvent(FSM.scala:604)
    at com.pagerduty.scheduler.akka.TaskPersistence.akka$actor$LoggingFSM$$super$processEvent(TaskPersistence.scala:90)
    at akka.actor.LoggingFSM$class.processEvent(FSM.scala:734)
    at com.pagerduty.scheduler.akka.TaskPersistence.processEvent(TaskPersistence.scala:90)
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598)
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:592)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at com.pagerduty.scheduler.akka.TaskPersistence.aroundReceive(TaskPersistence.scala:90)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
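For context, MoreExecutors.directExecutor() only exists in Guava 18.0 and later, so this NoSuchMethodError suggests an older Guava (likely pulled in transitively by the Cassandra/Astyanax driver) is winning the eviction on my classpath. Below is a sketch of the sbt workaround I am considering; the "19.0" version is only an example and should be whatever Guava version the eris/scheduler libraries were built against:

// build.sbt — a sketch, not a confirmed fix; "19.0" is an assumed example version
// directExecutor() was added in Guava 18.0, so pin a version >= 18 across the build:
dependencyOverrides += "com.google.guava" % "guava" % "19.0"

// sbt's built-in `evicted` task shows which Guava versions are in the dependency
// graph and which one actually ends up on the classpath:
//   sbt evicted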