akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka/current/

Monitoring connectors #463

Open svezfaz opened 6 years ago

svezfaz commented 6 years ago

Would Alpakka be the right place to put monitoring stages to augment streams with metrics? Something simple that would read like

val meter : Flow[T, T, NotUsed] = Monitoring.meter("meter-name")(ctx)
source.via(meter).runWith(sink)

or even as implicit combinators

val source: Source[T, M] = ???
source.metered("meter-name").runWith(sink)

Different backend contexts could be plugged in to support Kamon, Dropwizard (and others?).
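A minimal sketch of how the two snippets above might be implemented. `MeterContext` is a hypothetical backend abstraction invented here for illustration (the issue only names `ctx`); a Kamon or Dropwizard adapter would have to implement it.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

// Hypothetical backend abstraction (not part of Alpakka or Akka):
// records a single event against a named meter.
trait MeterContext {
  def mark(name: String): Unit
}

object Monitoring {
  // Pass-through flow that marks the named meter once per element.
  def meter[T](name: String)(ctx: MeterContext): Flow[T, T, NotUsed] =
    Flow[T].map { elem =>
      ctx.mark(name)
      elem
    }

  // Implicit combinator so a source reads as source.metered("meter-name").
  implicit class MeteredSource[T, M](val source: Source[T, M]) extends AnyVal {
    def metered(name: String)(implicit ctx: MeterContext): Source[T, M] =
      source.via(meter[T](name)(ctx))
  }
}
```

With `import Monitoring._` and an implicit `MeterContext` in scope, `source.metered("meter-name").runWith(sink)` keeps both the element type and the materialized value of the source.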

tg44 commented 6 years ago

Or just make flows with callbacks. Today I once again implemented a GraphStage with measurement logic and thought this could be a good place for it.

Some metric flow idea:

I think these types of flows could be good for load/acceptance testing, but they could also be used with MBeans, Kamon or any other metrics tool (because you can do whatever you want in the callbacks).

(If I have some free time, I'll start a PR with these.)
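As an illustration of the "flows with callbacks" idea, here is a small sketch. The object and method names (`CallbackFlows`, `onElement`, `onInterArrival`) are made up for this example and are not taken from the thread or from any library.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

object CallbackFlows {
  // Pass-through flow that invokes a user-supplied callback for every element.
  def onElement[T](callback: T => Unit): Flow[T, T, NotUsed] =
    Flow[T].map { elem =>
      callback(elem)
      elem
    }

  // Pass-through flow that reports the time in nanoseconds since the
  // previous element. For the first element the time is measured from
  // materialization of the stage.
  def onInterArrival[T](callback: Long => Unit): Flow[T, T, NotUsed] =
    Flow[T].statefulMapConcat[T] { () =>
      var last = System.nanoTime()
      elem => {
        val now = System.nanoTime()
        callback(now - last)
        last = now
        List(elem)
      }
    }
}
```

Because the callback is an ordinary function, the caller decides whether it updates an MBean, a Kamon instrument, or a test probe.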

huntc commented 6 years ago

I don't think Alpakka or Akka stream needs anything special to support instrumentation.

Here's an example of how I've been instrumenting streams. First, the instrumentation itself. I'm using OpenTracing for tracing and Dropwizard for metrics. Note that OpenTracing is morphing into more than just tracing and already supports the logging of structured data and metrics. Therefore, keeping what it means to represent an event in one place is recommended.

Note: Your Flow types will differ - the essential thing to consider is that a Span is carried through your stream when tracing. In addition, when you're not tracing and just want to, say, increment a metric, drop the begin/end terminology and represent a single event, e.g. downlinkRxEvent.

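import akka.NotUsed
import akka.stream.scaladsl.Flow
import com.codahale.metrics.MetricRegistry
import io.opentracing.{Span, Tracer}

class ServerInstrumentation(metricRegistry: MetricRegistry, tracer: Tracer) {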

  private val downlinkPacketsRxMetric = metricRegistry.counter("lora-server.downlink-packets-rx")
  def downlinkRxToAckBeginEvent[A]: Flow[A, (Span, A), NotUsed] =
    Flow.fromFunction[A, (Span, A)] { received =>
      val span = {
        val scope = tracer.buildSpan("downlink-rx-to-ack").startActive(false)
        try {
          scope.span()
        } finally {
          scope.close()
        }
      }
      downlinkPacketsRxMetric.inc()
      span -> received
    }

  private val downlinkPacketsTxMetric = metricRegistry.counter("lora-server.downlink-packets-tx")
  def downlinkRxToAckEndEvent[A]: Flow[(Span, A), A, NotUsed] =
    Flow.fromFunction[(Span, A), A] {
      case (span, received) =>
        downlinkPacketsTxMetric.inc()
        tracer.scopeManager().activate(span, true).close()
        received
    }
}

These flows can then be used without noise in your flow using via, for example:

Source
  .actorRef[UdpListener.ReceivedWithReplyTo](bufferSize, OverflowStrategy.dropHead)
  .via(serverInstrumentation.downlinkRxToAckBeginEvent)

HTH.

P.S. I'd happily take further advice on the naming of things - cc @viktorklang ;-)

tg44 commented 6 years ago

You don't need anything "special". Code like the one you posted gets written often, and in most cases something is wrong with it. For example, the code you showed us changes the element type of the subflow, which can lead to huge refactors just to use that instrumentation code. I did that before, I know that pain... If you make a bidi out of it, you can share this state without boxing it and sending it through the whole stream.

I think this repo and stream users could profit from some well-written measuring flows.

(And it's not about the size. Why do you use libs? I think you could build an HTTP client too if you needed to, but it's easier to use one well-tested tool than to invent it again and make the same mistakes the lib made at the beginning.)
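The bidi suggestion above could look roughly like the following sketch, which times a wrapped flow without changing its element types. `Timed` and `onNanos` are hypothetical names. The sketch assumes the wrapped flow emits exactly one output per input and preserves order. The queue is created once per call to `Timed(...)`, so the resulting flow should not be materialized several times concurrently.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import akka.NotUsed
import akka.stream.scaladsl.{BidiFlow, Flow}

object Timed {
  // Wraps `inner` so that callers still see a Flow[A, B, _].
  // Assumption: `inner` is 1:1 and order-preserving; otherwise the
  // start times in the queue no longer line up with the outputs.
  def apply[A, B, M](inner: Flow[A, B, M])(onNanos: Long => Unit): Flow[A, B, NotUsed] = {
    // State shared by both directions of the bidi; never sent through `inner`.
    val starts = new ConcurrentLinkedQueue[java.lang.Long]()

    val bidi = BidiFlow.fromFunctions[A, A, B, B](
      // Top side: remember when the element entered the wrapped flow.
      { a => starts.add(java.lang.Long.valueOf(System.nanoTime())); a },
      // Bottom side: pair the output with the oldest recorded start time.
      { b =>
        Option(starts.poll()).map(_.longValue).foreach { start =>
          onNanos(System.nanoTime() - start)
        }
        b
      }
    )
    bidi.join(inner)
  }
}
```

Usage would be along the lines of `source.via(Timed(myFlow)(nanos => histogram.update(nanos)))`, where `myFlow` and `histogram` stand in for whatever the application already has.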

huntc commented 6 years ago

I acknowledge the additional complexity.

Here’s an additional reference indicating that metadata such as spans should be passed along with an element: https://groups.google.com/forum/m/#!topic/akka-user/N69wUSnDB4Q

tg44 commented 6 years ago

I think we are talking about different topics :D

As you said, there is no need to add "monitoring" stages like some prebuilt Kamon, Dropwizard, or OpenMetrics integration. That would be too specific, maybe too restrictive, and more complex than writing it by hand for your exact needs. But I'm talking about measuring stages that could easily be used with, for example, the three previously mentioned tools. Something like this really easy stage. But sometimes we write more complex stages just to measure whether the upstream or the downstream is slower. Or we want to know periodically what the throughput of the stream was. I think these kinds of "utility" stages would be helpful.

So monitoring in my context means measuring the stream throughput, not the processing and data propagation.

(Btw, it would be interesting if we could pass (meta)data transparently between stages, but that is totally outside the scope of my original idea. Passing metadata is sometimes needed and good, but in some cases it's really big overkill if you just want to check something :) )
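One way the periodic throughput measurement mentioned above might be sketched with standard Akka Streams operators. `Throughput.every` and `report` are hypothetical names. The side branch merges element markers with timer ticks and emits the element count on each tick; the count of the final partial interval is not reported when the stream completes.

```scala
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}

object Throughput {
  // Pass-through flow that calls `report` once per `interval` with the
  // number of elements seen during that interval.
  def every[T](interval: FiniteDuration)(report: Long => Unit): Flow[T, T, NotUsed] = {
    val counting: Sink[T, NotUsed] =
      Flow[T]
        .map(_ => false) // false marks a stream element
        // true marks a timer tick; complete as soon as the elements complete
        .merge(Source.tick(interval, interval, true), eagerComplete = true)
        .statefulMapConcat[Long] { () =>
          var count = 0L
          isTick =>
            if (isTick) {
              val current = count
              count = 0L
              List(current) // emit the count for the interval that just ended
            } else {
              count += 1L
              List.empty[Long]
            }
        }
        .to(Sink.foreach[Long](report))

    // alsoTo sends every element to the counting branch and downstream.
    Flow[T].alsoTo(counting)
  }
}
```

The element type is untouched, so the stage can be added with `.via(Throughput.every[T](interval)(report))` and removed again without refactoring the surrounding stream.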

huntc commented 6 years ago

I've probably added complexity to this issue, but what I was trying to do was show that when considering instrumentation in general, there are those instruments that require a start and an end.

Where a start and end aren't required though, e.g. for the simple case of incrementing a counter like yours, I still think that using .via(someMetricCountingFlow) is straightforward. Perhaps it'd be great to have commonly used metric registries supported in Alpakka. For example, I could envisage a flow for supporting various Dropwizard metric instruments. I'd say that Alpakka is definitely the place for these types of integrations.
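A sketch of what such Dropwizard-backed flows might look like. `DropwizardFlows`, `counted` and `metered` are hypothetical names and not an existing Alpakka API; the Dropwizard calls used are `MetricRegistry.counter`, `Counter.inc`, `MetricRegistry.meter` and `Meter.mark`.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import com.codahale.metrics.MetricRegistry

object DropwizardFlows {
  // Pass-through flow that increments a Dropwizard Counter per element.
  def counted[T](registry: MetricRegistry, name: String): Flow[T, T, NotUsed] = {
    val counter = registry.counter(name)
    Flow[T].map { elem =>
      counter.inc()
      elem
    }
  }

  // Pass-through flow that marks a Dropwizard Meter per element,
  // which lets the registry derive event rates.
  def metered[T](registry: MetricRegistry, name: String): Flow[T, T, NotUsed] = {
    val meter = registry.meter(name)
    Flow[T].map { elem =>
      meter.mark()
      elem
    }
  }
}
```

Either flow slots into a stream as `.via(DropwizardFlows.counted(registry, "some-metric-name"))`, matching the `.via(someMetricCountingFlow)` shape described above.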

With the example you cited, I think that'd be better placed in akka-contrib-extra as it isn't an application integration per se. That said, I'd personally prefer to rely on well-tested, proven instruments that can be found in libraries such as Dropwizard.

HTH.

svezfaz commented 6 years ago

Partially covering the subject discussed above, I published a small library trying to address some of the monitoring issues around Akka Streams.

docs: https://svezfaz.github.io/akka-stream-checkpoint/
repo: https://github.com/svezfaz/akka-stream-checkpoint

if you're in the mood for feedback feel free to take a look :) /cc @tg44 @huntc @francisdb @tfayruzov

tg44 commented 6 years ago

@svezfaz this is exactly what I was missing. Looks good. Maybe I will try to use it in my new "mini" service at the weekend.

francisdb commented 6 years ago

@svezfaz I'll certainly have a go at using your lib; looks like you did a great job on the docs.

mohnishkodnani commented 6 years ago

+1 on having these utility stages. Would be really helpful.