lightbend / cloudflow

Cloudflow enables users to quickly develop, orchestrate, and operate distributed streaming applications on Kubernetes.
https://cloudflow.io
Apache License 2.0

Support for polymorphic ADTs as protocol between two Streamlets #203

Open wjglerum opened 4 years ago

wjglerum commented 4 years ago

We would like to use a polymorphic ADT as the message protocol between two Streamlets. Let's say for example I would like to design a SubscriptionProtocol with the following commands:
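A minimal sketch of such a protocol as a plain Scala ADT, assuming the two commands carry the same fields as the Avro records in the IDL that follows. The `describe` helper is hypothetical and only there to show the exhaustive match:

```scala
// The protocol as a sealed hierarchy: every command is a case class.
sealed trait SubscriptionProtocol
final case class SubscribeCmd(subscriberId: String, topicId: String) extends SubscriptionProtocol
final case class UnsubscribeCmd(subscriptionId: String, subscriberId: String) extends SubscriptionProtocol

object SubscriptionProtocol {
  // Hypothetical helper. Because the trait is sealed, the compiler can
  // warn when a newly added command is not handled here.
  def describe(cmd: SubscriptionProtocol): String = cmd match {
    case SubscribeCmd(subscriberId, topicId) =>
      s"subscribe $subscriberId to $topicId"
    case UnsubscribeCmd(subscriptionId, subscriberId) =>
      s"cancel $subscriptionId for $subscriberId"
  }
}
```

Downstream streamlets would ideally receive `SubscriptionProtocol` values and pattern match on them like this.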

And I would like to use this as the protocol between two Streamlets with Avro. avrohugger can be configured in our build.sbt to generate ADTs from Avro IDL files. Take the following example protocol:

@namespace("subscriptions.avro")
protocol SubscriptionProtocol {

  record SubscribeCmd {
    string subscriberId;
    string topicId;
  }

  record UnsubscribeCmd {
    string subscriptionId;
    string subscriberId;
  }
}

And the following sbt configuration for our datamodel module:

lazy val datamodel = appModule("datamodel")
  .enablePlugins(CloudflowLibraryPlugin)
  .settings(
    commonSettings,
    avroScalaSpecificCustomTypes in Compile :=
      avrohugger.format.SpecificRecord.defaultTypes.copy(protocol = avrohugger.types.ScalaADT),
    (sourceGenerators in Compile) += (avroScalaGenerateSpecific in Compile).taskValue
  )

This will generate the following code for us (note: simplified)

sealed trait SubscriptionProtocol extends org.apache.avro.specific.SpecificRecordBase with Product with Serializable
final case class SubscribeCmd(var subscriberId: String, var topicId: String) extends org.apache.avro.specific.SpecificRecordBase with SubscriptionProtocol { ... }
final case class UnsubscribeCmd(var subscriptionId: String, var subscriberId: String) extends org.apache.avro.specific.SpecificRecordBase with SubscriptionProtocol { ... }

However, when we use this type for the inlets and outlets of our Streamlets, it doesn't work:

class ExampleStreamlet extends AkkaServerStreamlet {
  val out = AvroOutlet[SubscriptionProtocol]("out")
  final override val shape = StreamletShape.withOutlets(out)

  final override def createLogic = HttpServerLogic.default(this, out)
}

The build fails with:

Streamlet 'com.example.ExampleStreamlet' could not be introspected. Its descriptor method threw an exception: null

So I was stuck on this approach.

Alternative approach

Next I took another approach, wrapping the different commands inside a generic event, which looks something like this:

@namespace("subscriptions.avro")
protocol SubscriptionProtocolV2 {

  enum EventType {
    SUBSCRIBE, UNSUBSCRIBE
  }

  record SubscribeCmdV2 {
    string subscriberId;
    string topicId;
  }

  record UnsubscribeCmdV2 {
    string subscriptionId;
    string subscriberId;
  }

  record SubscribeEvent {
    EventType eventType;
    union { SubscribeCmdV2, UnsubscribeCmdV2 } payload;
  }
}

This also fails to compile due to a limitation in the avrohugger plugin: it can't generate a union of different record types, only nullable fields. Also see https://github.com/julianpeeters/avrohugger/issues/116 and the error below:

[error] (datamodel / Compile / avroScalaGenerateSpecific) Unions beyond nullable fields are not supported

We could fix this by using GenericRecord instead; however, AvroInlet and AvroOutlet require a type that extends SpecificRecordBase. So we are stuck here too.

Workaround

We now use a temporary workaround by defining option fields for each command:

  record SubscribeEvent {
    EventType eventType;
    union { null, SubscribeCmdV2 } subscribeCmd;
    union { null, UnsubscribeCmdV2 } unsubscribeCmd;
  }

This generates the following Scala code:

object EventType extends Enumeration {
  type EventType = Value
  val SUBSCRIBE, UNSUBSCRIBE = Value
}
case class SubscribeCmdV2(subscriberId: String, topicId: String)
case class UnsubscribeCmdV2(subscriptionId: String, subscriberId: String)
case class SubscribeEvent(eventType: EventType.Value, subscribeCmd: Option[SubscribeCmdV2], unsubscribeCmd: Option[UnsubscribeCmdV2])

This works, but it isn't pretty and doesn't extend well: every new command adds another optional field to the event.
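Until ADTs are supported directly, the wrapper can at least be converted back into an ADT at the streamlet boundary. This is a minimal sketch, assuming the generated classes have the simplified shape shown above. The `Command` hierarchy and the `toCommand` helper are hypothetical names, not generated code:

```scala
// Simplified stand-ins for the generated classes (assumed shape).
object EventType extends Enumeration {
  type EventType = Value
  val SUBSCRIBE, UNSUBSCRIBE = Value
}
case class SubscribeCmdV2(subscriberId: String, topicId: String)
case class UnsubscribeCmdV2(subscriptionId: String, subscriberId: String)
case class SubscribeEvent(
  eventType: EventType.Value,
  subscribeCmd: Option[SubscribeCmdV2],
  unsubscribeCmd: Option[UnsubscribeCmdV2]
)

// Hypothetical domain ADT used by the business logic.
sealed trait Command
final case class Subscribe(subscriberId: String, topicId: String) extends Command
final case class Unsubscribe(subscriptionId: String, subscriberId: String) extends Command

object SubscribeEventConverter {
  // Accepts only events whose tag matches the single populated field;
  // anything else is reported as an error instead of being guessed at.
  def toCommand(event: SubscribeEvent): Either[String, Command] =
    (event.eventType, event.subscribeCmd, event.unsubscribeCmd) match {
      case (EventType.SUBSCRIBE, Some(cmd), None) =>
        Right(Subscribe(cmd.subscriberId, cmd.topicId))
      case (EventType.UNSUBSCRIBE, None, Some(cmd)) =>
        Right(Unsubscribe(cmd.subscriptionId, cmd.subscriberId))
      case _ =>
        Left(s"Inconsistent event: $event")
    }
}
```

The conversion makes the weakness of the workaround explicit: the wire format allows states (no payload, two payloads, mismatched tag) that a real union type would rule out.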

Question

How could we support polymorphic ADTs in Cloudflow? Thanks!

Also see the discussion on [Gitter](https://gitter.im/lightbend/cloudflow?at=5e7ba8efaf5fed7748602ca8)

SemanticBeeng commented 2 years ago

I relate to this request because the business logic should speak the ubiquitous language of the domain model and not Avro. Here's my take on the potential solution:

AvroInlet does indeed depend on SpecificRecordBase.

But Cloudflow itself does not. This can be seen by looking at CodecInlet and Codec, and at how the streamlets call Codec.

So Cloudflow has no direct dependency on SpecificRecordBase.

This allows us to implement avro <-> Scala serde using GenericRecords.
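To illustrate the idea, here is a minimal sketch of a codec for a sealed trait that does not touch SpecificRecordBase. `SimpleCodec` is a hypothetical stand-in with an encode/decode shape, not Cloudflow's actual `Codec` trait, and the tag-byte binary format is an assumption used to keep the example self-contained. A real implementation would serialize via Avro GenericRecords instead:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.util.Try

sealed trait SubscriptionProtocol
final case class SubscribeCmd(subscriberId: String, topicId: String) extends SubscriptionProtocol
final case class UnsubscribeCmd(subscriptionId: String, subscriberId: String) extends SubscriptionProtocol

// Hypothetical codec abstraction (assumed shape, not the Cloudflow API).
trait SimpleCodec[T] {
  def encode(value: T): Array[Byte]
  def decode(bytes: Array[Byte]): Try[T]
}

object SubscriptionCodec extends SimpleCodec[SubscriptionProtocol] {
  // Writes one tag byte identifying the subtype, then its fields.
  def encode(value: SubscriptionProtocol): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    value match {
      case SubscribeCmd(subscriberId, topicId) =>
        out.writeByte(0); out.writeUTF(subscriberId); out.writeUTF(topicId)
      case UnsubscribeCmd(subscriptionId, subscriberId) =>
        out.writeByte(1); out.writeUTF(subscriptionId); out.writeUTF(subscriberId)
    }
    out.flush()
    buffer.toByteArray
  }

  // Reads the tag byte and rebuilds the matching subtype; unknown tags
  // and truncated input surface as a Failure.
  def decode(bytes: Array[Byte]): Try[SubscriptionProtocol] = Try {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    in.readByte().toInt match {
      case 0 => SubscribeCmd(in.readUTF(), in.readUTF())
      case 1 => UnsubscribeCmd(in.readUTF(), in.readUTF())
      case other => throw new IllegalArgumentException(s"Unknown tag: $other")
    }
  }
}
```

The point of the sketch is only that the codec decides how subtypes are distinguished on the wire, so the inlet and outlet types can stay a plain sealed trait.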

This article has a very nice approach using avrohugger, avro4s, and shapeless (Coproducts and polymorphic functions): https://bitrock.it/blog/polymorphic-messages-in-kafka-streams.html

Avro doesn’t support inheritance between records, so any OOP strategy to have assets inherit properties from a common ancestor is unfortunately not viable.

@wjglerum @RayRoestenburg @michaelpnash please review and advise if this is agreeable.

Maybe we do not need to expect the Cloudflow main code base to handle this, because support can be implemented externally, in cloudflow-contrib for example.