locationtech / geotrellis

GeoTrellis is a geographic data processing engine for high performance applications.
http://geotrellis.io

Cassandra server handling in tests #1097

Closed: allixender closed this issue 9 years ago

allixender commented 9 years ago

GSoC 2015 GeoTrellis Cassandra tracker for shared scratch documentation

If the Cassandra server is not running, the geotrellis-spark tests (on feature/cassandra as of now) will fail.

Update 26.05.2015: This is not entirely true, actually. The geotrellis-spark Cassandra tests use the useCassandraConfig embedded service from the DataStax spark-cassandra-connector, which does start a Cassandra instance inside the JVM, just as we want. However, it has a timeout of 10 seconds, and if the Cassandra daemon isn't responsive by then, the tests will fail. Also, the tests cannot be run in parallel! See Spark-Cassandra-Connector EmbeddedCassandra.scala


Workarounds/solutions: either define a switch to skip the Cassandra tests when no Cassandra instance is available (keyspace config?), or, ideally, launch an embedded instance, similar to the Accumulo tests. A sketch of the skip approach follows.
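
A minimal sketch of the skip idea, modeled on the existing OnlyIfCanRunSpark helper (the trait and method names here are mine, not GeoTrellis API): probe the native-protocol port and cancel the suite when nothing answers.

import java.net.{ InetSocketAddress, Socket }
import org.scalatest.FunSpec

// Hypothetical helper: skip Cassandra-backed specs when no server is reachable.
trait OnlyIfCassandraAvailable { self: FunSpec =>

  def cassandraReachable(host: String = "127.0.0.1", port: Int = 9042): Boolean = {
    val socket = new Socket()
    try { socket.connect(new InetSocketAddress(host, port), 500); true }
    catch { case _: java.io.IOException => false }
    finally socket.close()
  }

  def ifCassandraAvailable(f: => Unit): Unit =
    if (cassandraReachable()) f
    else cancel("no Cassandra instance reachable on 127.0.0.1:9042, skipping")
}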

Some links I'll follow up on, where projects use Embedded Cassandra testing environments:

allixender commented 9 years ago

Update:

I'll use this issue for another Cassandra connection problem I stumbled over. The spark-submit wrapper for the CassandraIngestCommand includes the parameter --host:

### INGEST SPATIAL GEOTIFF IN CASSANDRA ###
# geotrellis-spark JAR. Shouldn't have to change this one if running in the root folder (remember to run ./sbt "project spark" assembly)
JAR=/home/akmoch/dev/build/geotrellis-cassandra/spark/target/scala-2.10/geotrellis-spark-assembly-0.10.0-SNAPSHOT.jar

# Directory with the input tiled GeoTIFFs; needs to comply with the ../ne*.tif rule (weird?)
# INPUT=file:/home/akmoch/dev/build/geotrellis-cassandra/raster-test/data/one-month-tiles-tiff
INPUT=/home/akmoch/dev/build/geotrellis-cassandra/raster-test/data/one-month-tiles-tiff

export SPARK_HOME=/home/akmoch/dev/spark-1.2.2-bin-hadoop2.4

# Table to store tiles
TABLE=nexmonth

# Name of the layer. This will be used in conjunction with the zoom level to reference the layer (see LayerId)
LAYER_NAME=nexmonth

# This defines the destination spatial reference system we want to use
# (in this case, Web Mercator)
CRS=EPSG:3857

# true means we want to pyramid the raster up to larger zoom levels,
# so if our input rasters are at a resolution that maps to zoom level 11, pyramiding will also save
# off levels 10, 9, ..., 1.
PYRAMID=true

# true will delete the HDFS data for the layer if it already exists.
# CLOBBER=true
CLOBBER=false

# We need to remove some bad signatures from the assembled JAR. We're working on excluding these
# files as part of the build step, this is a workaround.
zip -d $JAR META-INF/ECLIPSEF.RSA
zip -d $JAR META-INF/ECLIPSEF.SF

# Run the spark submit job

# sbt, sbt assembly to gen JAR

$SPARK_HOME/bin/spark-submit \
--class geotrellis.spark.ingest.CassandraIngestCommand \
$JAR \
--host localhost --keyspace ingest \
--crs $CRS \
--pyramid $PYRAMID --clobber $CLOBBER \
--input $INPUT \
--layerName $LAYER_NAME \
--table $TABLE

Hint! The Cassandra keyspace ingest was hardcoded here; it should probably be factored out as a variable (e.g. KEYSPACE=ingest) too.

Although --host localhost is obviously provided, and in geotrellis-spark CassandraIngestCommand.scala (line 37) sparkConf.set("spark.cassandra.connection.host", args.host) seems to promote this parameter to the Spark context, which in turn should be evaluated by the spark-cassandra-connector ...

Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {161.65.43.142}:9042

.. it consistently wants to connect to the external host IP, not localhost :-( With the external eth0 IP, the Spark master bound to the external interface, and Cassandra's listen_address on the external address, the CassandraIngest finally worked. What a journey. This IP handling needs to be more reliable, particularly for test runs on localhost.
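
For the localhost case, the most deterministic route I know of (a sketch, assuming the standard connector property names) is to pin the connection host on the SparkConf before the SparkContext is created, so nothing later falls back to the machine's external address:

import org.apache.spark.{ SparkConf, SparkContext }

// Sketch: bake the Cassandra host into the conf before the context exists;
// "local[*]" stands in for a localhost test run.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("cassandra-ingest-localhost")
  .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)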

allixender commented 9 years ago

Update on EmbeddedCassandra

Overall it seems it isn't too complicated to actually start an "embedded" Cassandra instance in the testing JVM, e.g. as can be seen in this example from Netflix/astyanax; a rough sketch of that hand-rolled approach follows. However, there are also toolkits that make life easier.
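
The sketch (the yaml path is hypothetical; CassandraDaemon is Cassandra's own in-process entry point):

import org.apache.cassandra.service.CassandraDaemon

// Point Cassandra at a test yaml, then boot the daemon inside this JVM.
System.setProperty("cassandra.config", "file:target/test-classes/cassandra.yaml") // hypothetical path
val daemon = new CassandraDaemon()
daemon.activate() // parses the yaml, binds the configured ports, starts the services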

I spent some time on CassandraUnit. Like the Netflix code, this is also pure Java. CassandraUnit provides an easy interface to start a Cassandra instance; it requires a typical cassandra.yaml config file. They suggest putting the storage and log paths into target/embeddedCassandra in your project. Seems to work fine. It basically starts a full Cassandra DB.

Second, CassandraUnit provides some neat helpers to load datasets, create keyspaces, etc. CassandraUnit is tightly integrated with JUnit, so from a Scala / Specs2 perspective I couldn't immediately take advantage of all the JUnit machinery, like @Rule or @Before. But with a few more lines of code one can easily reproduce the starting of Cassandra, the creation of keyspaces, and the loading of initial CQL statements:

libraryDependencies ++= Seq(
  "org.cassandraunit"           % "cassandra-unit"        % "2.1.3.1"   % "test",
  "org.cassandraunit"           % "cassandra-unit-spring" % "2.1.3.1"   % "test"  exclude("org.cassandraunit", "cassandra-unit"),
  "org.springframework"         % "spring-test"           % "4.1.6.RELEASE" % "test"
)
import org.specs2.mock._
import org.specs2.mutable._
import org.specs2.runner._
import org.specs2.execute._
import org.specs2.specification.{ SpecificationStructure, Fragments, Step }
import org.junit.runner._

import com.datastax.driver.core.{ BoundStatement, Cluster, ProtocolOptions, ResultSet, Row, Session }
import com.datastax.driver.core.querybuilder.QueryBuilder

import org.cassandraunit.CassandraCQLUnit
import org.cassandraunit.CQLDataLoader
import org.cassandraunit.dataset.CQLDataSet
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet

import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import play.api.test._ // provides PlaySpecification and WithApplication used below

import scala.io.Source
import java.io.File

@RunWith(classOf[JUnitRunner])
class EmbeddedCassandraSpec extends PlaySpecification {

  // this statement is supposed to take care of cassandra start and load dataset, but uses JUnit annotations that I didn't seem to integrate here with specs2
  // var cassandraUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("test1-embedded.cql"), "cassandra.yaml", "127.0.0.1", 9146, 30000)
  EmbeddedCassandraServerHelper.startEmbeddedCassandra("cassandra.yaml", 30000)

  "EmbeddedCassandra Tests" should {

    "connect an embedded cassandra instance" in new WithApplication {
      // your test code
      val dataSet = new ClassPathCQLDataSet("test1-embedded.cql", "geotrellistest")

      lazy val embeddedCluster: Cluster =
        Cluster.builder().
          addContactPoint("127.0.0.1").
          withPort(9146).
          build()
      val t_session = embeddedCluster.connect()
      var dataLoader: CQLDataLoader = new CQLDataLoader(t_session)
      dataLoader.load(dataSet)
      val session = dataLoader.getSession()

      // val session = embeddedCluster.connect("geotrellistest")
      val countAll = new BoundStatement(session.prepare("select count(*) from attributes;"))
      val retVal = session.executeAsync(countAll)
      retVal.get.one().getLong("count") must equalTo(50)

    }

    "demo load an embedded cassandra instance" in new WithEmbeddedData {

      // here we can directly connect to the keyspace already, because cassandra is setup
      lazy val embeddedCluster: Cluster =
        Cluster.builder().
          addContactPoint("127.0.0.1").
          withPort(9146).
          build()

      val session = embeddedCluster.connect("peekabootest")
      val preparedStatement = session.prepare("INSERT INTO attributes (name, index, tileid) VALUES (?, ?, ?);")
      val insert = session.execute(preparedStatement.bind("layerX", "XCV45vC4", "101"))
      val countAll = new BoundStatement(session.prepare("select count(*) from attributes;"))
      val retVal = session.executeAsync(countAll)
      retVal.get.one().getLong("count") must equalTo(101)

    }
  }
}

allixender commented 9 years ago

Side note on Testing with sbt

~/dev/geotrellis$ ./sbt "project spark" 'test-only geotrellis.spark.io.cassandra.CassandraRasterCatalogSpec'

Also, parallelExecution in Test := false is set in project/Build.scala
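
For reference, a sketch of the relevant build setting (sbt 0.13 style; not the full Build.scala):

// embedded Cassandra can't serve parallel suites, so run them one at a time
parallelExecution in Test := false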

allixender commented 9 years ago

When integrating cassandraunit into geotrellis-spark and writing a test, it fails with a sadly obscure sbt.ForkMain 37848 failed with exit code 137:

Exception in thread "Thread-1" Exception in thread "Thread-5" java.io.EOFException
        at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2598)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1318)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.scalatest.tools.Framework$ScalaTestRunner$Skeleton$1$React.react(Framework.scala:945)
        at org.scalatest.tools.Framework$ScalaTestRunner$Skeleton$1.run(Framework.scala:934)
        at java.lang.Thread.run(Thread.java:745)
java.io.EOFException
        at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2598)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1318)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at sbt.React.react(ForkTests.scala:114)
        at sbt.ForkTests$$anonfun$mainTestTask$1$Acceptor$2$.run(ForkTests.scala:74)
        at java.lang.Thread.run(Thread.java:745)

Presumably a memory thing? Exit code 137 is 128 + 9, i.e. the forked JVM was killed with SIGKILL, which is what the kernel's OOM killer sends. I have 8 GB RAM available; Spark starts up and takes a lot, and then a Cassandra is started in the same JVM and easily takes another 1-2 GB; the computer even becomes unresponsive at times ... I think I need to limit some resources somewhere.
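
One way to limit those resources, as a sketch (sbt settings; the sizes are guesses for an 8 GB machine):

// Fork the test JVM and cap its heap so Spark + embedded Cassandra + driver
// stay within physical RAM; tune the numbers to the machine.
fork in Test := true
javaOptions in Test ++= Seq("-Xms512m", "-Xmx2g")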

allixender commented 9 years ago

TL;DR: an EmbeddedCassandra daemon can be started, and the CassandraAttributeStoreSpec test completes successfully. With the same config, the CassandraRasterCatalogSpec test fails because it can't connect to Cassandra. I don't get it.

So in hindsight this was apparently a problem with the Cassandra template https://github.com/geotrellis/geotrellis/blob/feature/cassandra/spark/src/test/resources/cassandra-default.yaml.template from an April commit https://github.com/geotrellis/geotrellis/commit/402d1ecc4150cb081476b7a8bc7de8db132f7fbb, where the template's native_transport_port: 9160 was set to the same port number as rpc_port: 9160 (the native transport default would be 9042).

Therefore the EmbeddedCassandra daemon would load, try really hard to bind the same port twice, and just die. However, this was not immediately visible in the logs or on the console.

Having a valid cassandra.yaml in place, the EmbeddedCassandra daemon starts up nicely in the tests.

Also, after updating to DataStax Spark Cassandra Connector version 2.1.2, the setup methods of this EmbeddedCassandra instance have slightly changed signatures. I have to get the host and the port properly to "find" the instance for the session in the tests. It would be an advantage to trim down some of the parameters, as the test instance does not need the performance tuning of a production instance. Commit https://github.com/allixender/geotrellis/commit/32df63f5fbe864e4f5921375e5aca71c6692b2a4

So the geotrellis.spark.io.cassandra.CassandraAttributeStoreSpec test now runs successfully.

But the geotrellis.spark.io.cassandra.CassandraRasterCatalogSpec would still fail, as this test actually tries to connect from within the SparkContext, which fails because it didn't receive the config:

Cause: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /161.65.43.142:9042 (com.datastax.driver.core.TransportException: [/161.65.43.142:9042]

(This is my networked IP address, instead of the provided Cassandra host and port 127.0.0.1:9042.) By the way, with the new YAML config and the getHost/getNativePort parsing, connecting on a different configured port number also works, which is nice if you have another Cassandra instance running locally.

UPDATE: this might actually not work; changing the native port fails, presumably because the EmbeddedCassandra we inherit from the DataStax spark-cassandra-connector integration tests only waits on the default port.

It doesn't make any sense, because both tests, and also the Scala main implementation (e.g. CassandraIngestCommand), only rely on the actual session object, which is available :angry:

So, how does the GeoTrellis Spark tests' SparkContext actually take and use the provided values?

def withSession(host: String, nativePort: Int, keySpace: String)(f: CassandraSession => Unit)(implicit sc: SparkContext): Unit = {
  // Note: sc.getConf returns a *copy* of the context's configuration; the set(...)
  // calls below reach the CassandraConnector built here, but not the SparkContext itself.
  val sparkConf = sc.getConf
  sparkConf.set("spark.cassandra.connection.host", host)
  sparkConf.set("spark.cassandra.connection.native.port", nativePort.toString)
  val hadoopConf = sc.hadoopConfiguration
  val connector = CassandraConnector(sparkConf)
  val session = new CassandraSession(connector, keySpace)
  session.execute(deleteKeyspaceCql)
  session.execute(createKeyspaceCql)
  f(session)
}

This is the shared session provider for both test specs, but only one of the tests always works and the other always fails, even when run independently with testOnly ?!

allixender commented 9 years ago

In CassandraAttributeStore.scala the session object is used directly for the QueryBuilder. In CassandraRasterCatalog.scala, however, the session object is actually not used. AFAIR the Cassandra session object is an underlying handle inside the spark-cassandra-connector, and for CassandraRDD usage in Spark it is usually not exposed, only used for explicit queries. But for CassandraRDD usage the SparkContext must be "like really active" and has to provide the connection details, and apparently that is not happening here in the tests. That's why the AttributeStore test works: all of its operations are executed on the explicit session object, which is successfully created in the EmbeddedCassandra object, because there a valid spark-cassandra-connector is created from the (at least transitively accessible) updated SparkConf. A minimal sketch of the RDD path follows.
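
The sketch (keyspace/table names hypothetical): the connector's implicits add cassandraTable to the SparkContext, and the connector builds its session from the conf the context was constructed with.

import com.datastax.spark.connector._ // adds sc.cassandraTable via SparkContextFunctions

// Only works if spark.cassandra.connection.host was on the conf the SparkContext
// was *built* with; setting it afterwards is invisible to this call.
val tiles = sc.cassandraTable("geotrellistest", "tiles") // hypothetical keyspace/table
println(tiles.count())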

allixender commented 9 years ago

Ha :triumph:

OK, the SparkConf update in the embedded Cassandra object didn't propagate into the SparkContext for the actual tests; it was only available as a config holder for the connector to create a session.

val testValue = sc.getConf.get("spark.cassandra.connection.host") shows the value is not available in the SparkConf the tests see:

[error] Uncaught exception when running geotrellis.spark.io.cassandra.CassandraAttributeStoreSpec: java.util.NoSuchElementException: spark.cassandra.connection.host
[trace] Stack trace suppressed: run last spark/test:testOnly for the full output.
[info] Run completed in 17 seconds, 112 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] No tests were executed.
[error] Error during tests:
[error]         geotrellis.spark.io.cassandra.CassandraAttributeStoreSpec
[error] (spark/test:testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 23 s, completed 5/06/2015 8:50:44 PM
spark > last spark/test:testOnly
[debug] Forking tests - parallelism = false
[debug] Create a single-thread test executor
[debug] Runner for org.scalatest.tools.Framework produced 1 initial tasks for 1 tests.
[debug]   Running TaskDef(geotrellis.spark.io.cassandra.CassandraAttributeStoreSpec, sbt.ForkMain$SubclassFingerscan@15ea0f2c, false, [SuiteSelector])
[error] Uncaught exception when running geotrellis.spark.io.cassandra.CassandraAttributeStoreSpec: java.util.NoSuchElementException: spark.cassandra.connection.host
sbt.ForkMain$ForkError: spark.cassandra.connection.host
        at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:167)
        at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:167)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.SparkConf.get(SparkConf.scala:167)
        at geotrellis.spark.io.cassandra.CassandraAttributeStoreSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(CassandraAttributeStoreSpec.scala:39)
        at geotrellis.spark.OnlyIfCanRunSpark$class.ifCanRunSpark(OnlyIfCanRunSpark.scala:46)
        at geotrellis.spark.io.cassandra.CassandraAttributeStoreSpec.ifCanRunSpark(CassandraAttributeStoreSpec.scala:22)
        at geotrellis.spark.io.cassandra.CassandraAttributeStoreSpec$$anonfun$1.apply$mcV$sp(CassandraAttributeStoreSpec.scala:31)
        at org.scalatest.SuperEngine.registerNestedBranch(Engine.scala:613)
        at org.scalatest.FunSpecLike$class.describe(FunSpecLike.scala:357)
        at org.scalatest.FunSpec.describe(FunSpec.scala:1626)
        at geotrellis.spark.io.cassandra.CassandraAttributeStoreSpec.<init>(CassandraAttributeStoreSpec.scala:30)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:383)
        at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:641)
        at sbt.ForkMain$Run$2.call(ForkMain.java:294)
        at sbt.ForkMain$Run$2.call(ForkMain.java:284)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
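
The direction of the fix follows from Spark's documented behavior that SparkContext.getConf returns a copy which cannot change the running context. A sketch (not the actual commit): put the connector settings on the conf before the tests' SparkContext is constructed.

import org.apache.spark.{ SparkConf, SparkContext }

// Sketch: the connector properties must be present at context-construction time
// for both the explicit session *and* the CassandraRDD path to see them.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("geotrellis-cassandra-tests")
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.cassandra.connection.native.port", "9042")
implicit val sc = new SparkContext(conf)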

allixender commented 9 years ago

Latest commit https://github.com/allixender/geotrellis/commit/9c4ef7f4cea786004035faf315a9e4319ab0cd0b

All Cassandra unit tests complete successfully (embedded).

allixender commented 9 years ago

With JDK 7 there might not be enough PermGen space provided to safely run the Cassandra / Cassandra Spark Connector / Spark / GeoTrellis Cassandra unit tests together.

And it fails only in CassandraRasterCatalogSpec.scala, where the Cassandra Spark Connector's CassandraRDD is actually used, not in CassandraAttributeStoreSpec.scala, where only the underlying native CQL session is used.

[info] Cassandra Raster Catalog with Spatial Rasters
1024
Internal error when running tests: java.lang.OutOfMemoryError: PermGen space
Exception in thread "Thread-1" java.io.EOFException
        at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2598)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1318)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
...

The actual OutOfMemoryError / PermGen space exception here:

15:54:21 EmbeddedCassandra: finally in EmbeddedCassandraSession
[info] CassandraRasterCatalogSpec:
[info] Cassandra Raster Catalog with Spatial Rasters
1024
[info] Exception encountered when attempting to run a suite with class name: geotrellis.spark.io.cassandra.CassandraRasterCatalogSpec *** ABORTED ***
[info]   java.lang.OutOfMemoryError: PermGen space
[info]   at java.lang.ClassLoader.defineClass1(Native Method)
[info]   at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
[info]   at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
[info]   at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
[info]   at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
[info]   at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
[info]   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
[info]   at java.security.AccessController.doPrivileged(Native Method)
[info]   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
[info]   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

Most interestingly, it runs fine under JDK 8 (Oracle), presumably because JDK 8 removed PermGen entirely in favor of the natively sized Metaspace.
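
Under JDK 7 the usual remedy is to raise PermGen for the forked test JVM, as a sketch (the size is a guess):

// sbt: give the forked test JVM more PermGen on JDK 7; JDK 8 replaced PermGen
// with Metaspace and merely warns that the flag is ignored.
javaOptions in Test += "-XX:MaxPermSize=512m"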