outworkers / phantom

Schema safe, type-safe, reactive Scala driver for Cassandra/Datastax Enterprise
http://outworkers.github.io/phantom/
Apache License 2.0

Integrating Phantom with Opentracing #733

Open wsargent opened 7 years ago

wsargent commented 7 years ago

I'm looking to see how to integrate the java-cassandra-driver for opentracing with Phantom.

The Java API expects to wrap Cluster, and takes an Initializer:

// Instantiate tracer
Tracer tracer = ...

// Instantiate Cluster Builder
Cluster.Builder builder = Cluster.builder().addContactPoints("127.0.0.1").withPort(9142);

// Instantiate Tracing Cluster
Cluster cluster = new TracingCluster(builder, tracer);

https://github.com/opentracing-contrib/java-cassandra-driver#usage

But there doesn't seem to be an accessible way to do this from Phantom. The closest I've found is to extend the SessionProvider:

class OpentracingSessionProvider(space: KeySpace, builder: ClusterBuilder) extends DefaultSessionProvider(space, builder) with Traceable {
  override lazy val cluster: Cluster = new TracingCluster(builder(Cluster.builder), tracer)
}

But from there, it doesn't seem practical to swap out the SessionProvider instantiation. I can't map it through the Cluster.Builder as the API expects the end result.
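
To make the mismatch concrete, here is a minimal, self-contained sketch. It uses stand-in types (`StubBuilder`, `StubCluster`, `TracingStubCluster`) rather than the real driver classes, and the `ClusterFactory` hook and `SketchSessionProvider` are hypothetical names, not part of Phantom. The only shapes taken from the code in this issue are that a `ClusterBuilder` maps a builder to a builder, and that `TracingCluster` is constructed from the finished builder plus a tracer.

```scala
object TracingHookSketch {
  // Stand-ins for the driver types; these are NOT com.datastax.driver.core classes.
  final case class StubBuilder(hosts: List[String] = Nil, port: Int = 9042) {
    def addContactPoint(host: String): StubBuilder = copy(hosts = hosts :+ host)
    def withPort(p: Int): StubBuilder = copy(port = p)
  }

  class StubCluster(val config: StubBuilder) {
    def describe: String = s"cluster(${config.hosts.mkString(",")}:${config.port})"
  }

  // Mirrors the shape of `new TracingCluster(builder, tracer)`:
  // it consumes the finished builder instead of returning another builder.
  class TracingStubCluster(config: StubBuilder, val tracerName: String)
      extends StubCluster(config) {
    override def describe: String = s"traced[$tracerName](${super.describe})"
  }

  // Same shape as Phantom's ClusterBuilder: builder in, builder out.
  // A function of this type can never hand back a wrapping cluster.
  type ClusterBuilder = StubBuilder => StubBuilder

  // Hypothetical extra hook: decides how the finished builder becomes a cluster.
  type ClusterFactory = StubBuilder => StubCluster

  val defaultFactory: ClusterFactory = b => new StubCluster(b)

  def tracingFactory(tracerName: String): ClusterFactory =
    b => new TracingStubCluster(b, tracerName)

  // A provider that accepts the factory instead of hard-coding `.build`.
  class SketchSessionProvider(builder: ClusterBuilder,
                              factory: ClusterFactory = defaultFactory) {
    lazy val cluster: StubCluster = factory(builder(StubBuilder()))
  }

  val contactPoints: ClusterBuilder = _.addContactPoint("127.0.0.1").withPort(9142)

  val plain = new SketchSessionProvider(contactPoints)
  val traced = new SketchSessionProvider(contactPoints, tracingFactory("demo-tracer"))
}
```

With a hook of that second shape, swapping in a tracing cluster would be a one-argument change at the call site rather than a copy of the whole connection stack. Whether Phantom should expose it this way is an open design question.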

So where I am right now is...

package sbux.ucp.simple.user.cassandra

import com.datastax.driver.core.{Cluster, Session, _}
import com.outworkers.phantom.connectors.{
  ClusterBuilder,
  KeySpaceCQLQuery,
  SessionAugmenterImplicits,
  SessionProvider
}
import com.outworkers.phantom.dsl._
import io.opentracing.Tracer
import org.slf4j.LoggerFactory

import scala.concurrent.blocking
import scala.util.control.{NoStackTrace, NonFatal}
import scala.util.{Failure, Success, Try}

class OpentracingSessionProvider(tracer: Tracer,
                                 val space: KeySpace,
                                 builder: ClusterBuilder,
                                 autoinit: Boolean = true,
                                 keyspaceQuery: Option[KeySpaceCQLQuery] = None,
                                 errorHandler: Throwable => Throwable = identity)
    extends SessionProvider {

  val logger = LoggerFactory.getLogger(this.getClass)

  val cluster: Cluster = builder(Cluster.builder).build

  def defaultKeyspaceCreationQuery(session: Session, keySpace: String): String = {
    s"CREATE KEYSPACE IF NOT EXISTS $keySpace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};"
  }

  /**
   * Initializes the keySpace with the given name on
   * the specified Session.
   */
  protected[this] def initKeySpace(session: Session, space: String): Session = blocking {
    val query =
      keyspaceQuery.map(_.queryString).getOrElse(defaultKeyspaceCreationQuery(session, space))
    logger.info(s"Automatically initialising keyspace $space with query $query")
    session.execute(query)
    session
  }

  /**
   * Creates a new Session for the specified keySpace.
   */
  protected[this] def createSession(keySpace: String): Session = {
    Try {
      val session = blocking {
        cluster.connect
      }

      if (autoinit) {
        initKeySpace(session, keySpace)
      } else {
        logger.info(s"Auto-init set to false, keyspace $space is not being auto-created.")
        session
      }
    } match {
      case Success(value) => value
      case Failure(NonFatal(err)) => throw errorHandler(err)
    }
  }

  val session: Session = createSession(space.name)
}

class OpentracingCassandraConnection(tracer: Tracer,
                                     name: String,
                                     clusterBuilder: ClusterBuilder,
                                     autoinit: Boolean,
                                     keyspaceFn: Option[KeySpaceCQLQuery] = None,
                                     errorHandler: Throwable => Throwable = identity) { outer =>

  import scala.collection.JavaConverters._

  lazy val provider = new OpentracingSessionProvider(
    tracer,
    KeySpace(name),
    clusterBuilder,
    autoinit,
    keyspaceFn,
    errorHandler
  )

  /**
   * The Session associated with this keySpace.
   */
  lazy val session: Session = provider.session

  def cassandraVersions: Set[VersionNumber] = {
    session.getCluster.getMetadata.getAllHosts.asScala
      .map(_.getCassandraVersion)
      .toSet[VersionNumber]
  }

  def cassandraVersion: Option[VersionNumber] = {
    val versions = cassandraVersions

    if (versions.nonEmpty) {

      val single = versions.headOption

      if (versions.size == 1) {
        single
      } else {

        if (single.forall(item => versions.forall(item ==))) {
          single
        } else {
          throw new RuntimeException(
            s"Illegal single version comparison. You are connected to clusters of different versions. " +
              s"Available versions are: ${versions.mkString(", ")}"
          ) with NoStackTrace
        }
      }
    } else {
      throw new RuntimeException(
        "Could not extract any versions from the cluster, versions were empty")
    }
  }

  /**
   * Trait that can be mixed into `CassandraTable`
   * instances.
   */
  trait Connector
      extends com.outworkers.phantom.connectors.Connector
      with SessionAugmenterImplicits {

    lazy val provider: OpentracingSessionProvider = outer.provider

    lazy val keySpace: String = outer.name

    implicit val space: KeySpace = KeySpace(outer.name)

    def cassandraVersion: Option[VersionNumber] = outer.cassandraVersion

    def cassandraVersions: Set[VersionNumber] = outer.cassandraVersions
  }
}

object OpentracingContactPoints {

  /**
   * A keyspace builder based on the specified
   * contact points, all running on the default port.
   */
  def apply(tracer: Tracer, hosts: Seq[String]): OpentracingKeySpaceBuilder =
    new OpentracingKeySpaceBuilder(tracer, _.addContactPoints(hosts: _*))

  /**
   * A keyspace builder based on the specified
   * contact points, all running on the specified port.
   */
  def apply(tracer: Tracer, hosts: Seq[String], port: Int): OpentracingKeySpaceBuilder =
    new OpentracingKeySpaceBuilder(tracer, _.addContactPoints(hosts: _*).withPort(port))
}

class OpentracingKeySpaceBuilder(tracer: Tracer, clusterBuilder: ClusterBuilder) {

  /**
   * Specify an additional builder to be applied when creating the Cluster instance.
   * This hook exposes the underlying Java API of the builder API of the Cassandra
   * driver.
   */
  def withClusterBuilder(builder: ClusterBuilder): OpentracingKeySpaceBuilder =
    new OpentracingKeySpaceBuilder(tracer, clusterBuilder andThen builder)

  /**
   * Disables the heartbeat for the current builder.
   * This is designed for local instantiations of connectors or test environments.
   * @return A new cluster builder, with the heartbeat interval set to 0(disabled).
   */
  def noHeartbeat(): OpentracingKeySpaceBuilder = {
    new OpentracingKeySpaceBuilder(tracer,
                                   clusterBuilder andThen (_.withPoolingOptions(
                                     new PoolingOptions().setHeartbeatIntervalSeconds(0))))
  }

  /**
   * Creates and can initialise a keyspace with the given name.
   * @param name The name of the keyspace, case sensitive by default.
   * @param autoinit Whether or not to automatically initialise the keyspace before the session is created.
   * @param query The builder to use when producing the keyspace query.
   * @return
   */
  def keySpace(
      name: String,
      autoinit: Boolean = true,
      query: Option[KeySpaceCQLQuery] = None,
      errorHandler: Throwable => Throwable = identity
  ): OpentracingCassandraConnection = {
    new OpentracingCassandraConnection(tracer, name, clusterBuilder, autoinit, query, errorHandler)
  }

  /**
   * Creates and can initialise a keyspace with the given name.
   * This will automatically initialise the keyspace by default, as we consider
   * passing a specific keyspace query indicates clear intent you want this to happen.
   * @param name The name of the keyspace, case sensitive by default.
   * @param query The builder to use when producing the keyspace query.
   * @return
   */
  @deprecated("Simply pass in a keySpace query, the keyspace is not required", "2.8.5")
  def keySpace(
      name: String,
      query: KeySpaceCQLQuery
  ): OpentracingCassandraConnection = {
    new OpentracingCassandraConnection(tracer, name, clusterBuilder, true, Some(query))
  }

  /**
   * Creates and can initialise a keyspace with the given name.
   * This will automatically initialise the keyspace by default, as we consider
   * passing a specific keyspace query indicates clear intent you want this to happen.
   * @param query The builder to use when producing the keyspace query.
   * @return
   */
  def keySpace(
      query: KeySpaceCQLQuery
  ): OpentracingCassandraConnection = {
    new OpentracingCassandraConnection(tracer, query.keyspace, clusterBuilder, true, Some(query))
  }
}

alexflav23 commented 7 years ago

Hi @wsargent,

There's probably a far easier way to expose this from within phantom. Let us think of a way to include it in the next release. It sounds like all you need is access to that session initialiser, so we can decouple that implementation and let you deal with it from within the phantom builder itself.

Regards,

wsargent commented 7 years ago

Thanks! I know that working with a type-safe builder pattern can be involved. What I've done in the past is to use a trait with an abstract type member Self:

https://github.com/playframework/play-ws/blob/master/play-ws-standalone/src/main/scala/play/api/libs/ws/StandaloneWSRequest.scala#L14

so implementations can return the subtype:

https://github.com/playframework/play-ws/blob/master/play-ahc-ws-standalone/src/main/scala/play/api/libs/ws/ahc/StandaloneAhcWSRequest.scala#L44

and not the super type like you'd normally get. (There was a reason I didn't use this.type, but I'm afraid I don't remember it right now).
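
As a rough illustration of that pattern, here is a minimal sketch. `BuilderLike`, `PlainBuilder`, and `TracingBuilder` are hypothetical names made up for this example; they are not taken from play-ws or Phantom, and the options are plain strings to keep it self-contained.

```scala
object SelfTypeBuilderSketch {
  // Fluent methods are declared once here and return the abstract `Self`,
  // so each implementation gets its own concrete type back.
  trait BuilderLike {
    type Self <: BuilderLike

    def options: List[String]

    // Each implementation says how to produce a fresh copy of itself.
    protected def copyWith(options: List[String]): Self

    def withOption(option: String): Self = copyWith(options :+ option)

    def noHeartbeat(): Self = withOption("heartbeat=0")
  }

  final class PlainBuilder(val options: List[String] = Nil) extends BuilderLike {
    type Self = PlainBuilder
    protected def copyWith(options: List[String]): PlainBuilder =
      new PlainBuilder(options)
  }

  final class TracingBuilder(val tracerName: String, val options: List[String] = Nil)
      extends BuilderLike {
    type Self = TracingBuilder
    protected def copyWith(options: List[String]): TracingBuilder =
      new TracingBuilder(tracerName, options)

    // Only available on the subtype.
    def describe: String = s"$tracerName: ${options.mkString(", ")}"
  }

  // The annotation type-checks only because the inherited methods return
  // TracingBuilder here, not the BuilderLike supertype.
  val traced: TracingBuilder =
    new TracingBuilder("demo-tracer").noHeartbeat().withOption("port=9142")
}
```

One general limitation of `this.type` is that it is the singleton type of the receiver, so a method with that return type can only return `this`. That does not fit builders that return fresh copies, though it may or may not be the reason alluded to above.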

alexflav23 commented 7 years ago

@wsargent Where is the actual line where the tracer gets used? This has now become top priority and we can deliver the functionality as part of phantom, but I can't really tell from the above code where it interacts with Cassandra. I'm guessing that's inside OpentracingCassandraConnection, but that particular code is not in what you pasted above.

Please let me know if you can so we can push this to prod quickly for you.

wsargent commented 7 years ago

The TracingCluster is part of the opentracing-cassandra-driver, so I've been mapping everything out through there. OpentracingCassandraConnection is my rendering of https://github.com/outworkers/phantom/blob/develop/phantom-connectors/src/main/scala/com/outworkers/phantom/connectors/CassandraConnection.scala

alexflav23 commented 5 years ago

Hi @wsargent This is becoming a priority on our part soon. However, I don't really like all the open tracing code, because it's got things that work around query strings, which is necessary in the raw driver but not so much in Phantom, because we actually have complex ADTs to model all CQL data-structures and things like that. Query generation is an afterthought from the DSL structures, so we wouldn't need most of the code found in the example.
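
A rough sketch of what deriving trace metadata from the query structure, rather than from the rendered query string, could look like. The `Query` ADT below is a toy stand-in and not Phantom's actual query model. Apart from `db.type`, the tag names are hypothetical, and no real tracer is involved.

```scala
object AdtTracingSketch {
  // Toy query ADT; Phantom's real structures are far richer than this.
  sealed trait Query { def table: String }
  final case class Select(table: String, columns: List[String]) extends Query
  final case class Insert(table: String, columns: List[String]) extends Query
  final case class Delete(table: String, whereColumn: String) extends Query

  // Span metadata comes straight from the structure; no CQL text is parsed.
  def spanTags(query: Query): Map[String, String] = {
    val operation = query match {
      case _: Select => "SELECT"
      case _: Insert => "INSERT"
      case _: Delete => "DELETE"
    }
    Map(
      "db.type" -> "cassandra",
      "db.operation" -> operation, // hypothetical tag name
      "db.table" -> query.table    // hypothetical tag name
    )
  }

  // Rendering to a string is a separate, later step.
  def render(query: Query): String = query match {
    case Select(table, columns) =>
      s"SELECT ${columns.mkString(", ")} FROM $table"
    case Insert(table, columns) =>
      val placeholders = columns.map(_ => "?").mkString(", ")
      s"INSERT INTO $table (${columns.mkString(", ")}) VALUES ($placeholders)"
    case Delete(table, whereColumn) =>
      s"DELETE FROM $table WHERE $whereColumn = ?"
  }
}
```

In this shape the tracing layer never needs to inspect the output of `render`, which is the part of the raw-driver approach described above as unnecessary.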

It may take a while, but if you could invest some time to at least sanity-check the output in follow-up PRs, I'm sure we can get this out. I know it's already been more than a while since we last discussed this.

wsargent commented 5 years ago

@alexflav23 if you can point me at them I'll take a look when I have time. Fair warning, it may be a couple of weeks.