tumblr / colossus

I/O and Microservice library for Scala
Apache License 2.0

Fixes to Pipe filterScan, CircuitBreaker Error suppression #504

Closed DanSimon closed 7 years ago

DanSimon commented 7 years ago

This fixes two main issues:

  1. Pipe.filterScan was not correctly triggering push signals. When a pipe is full, agents trying to push to it get a PushResult.Full containing a signal that they can map on to learn when the pipe clears up and is pushable again. filterScan is a method on BufferedPipe that traverses the buffered items and removes any that don't pass a filter function; ServiceClient uses it on its pending and sent buffers. The issue was that when filterScan freed space in a full buffer, the push signals were not triggered. This led to a bug where ServiceClient would not properly resume sending requests if it disconnected and reconnected while the buffer was full.

  2. CircuitBreaker now suppresses all closes/terminations. This type mutably wraps a Pipe and suppresses its error states. In particular, the goal is for two CircuitBreakers to link together so that, even if the underlying pipes terminate/close, the link between the CircuitBreakers is not severed. This is used, for example, to connect ServiceClient to its underlying Controller. In general, this prevents things like disconnects from accidentally severing these links. However, we still need to detect when the underlying Pipe terminates/closes, so a new onBreak method has been added to deal with such states. This is used, for example, in StreamTranscodingController to close the connection when a stream hits an error.
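The push-signal behavior from item 1 can be sketched roughly as follows. This is a simplified illustration, not the Colossus implementation: `Signal`, `SketchPipe`, and this cut-down `PushResult` are hypothetical stand-ins, and the real `BufferedPipe` internals may differ. The point it shows is that any operation that frees space in a full buffer, including `filterScan`, should trigger the signal handed out in `PushResult.Full`.

```scala
import scala.collection.mutable

// Hypothetical one-shot signal: callbacks run once when triggered.
final class Signal {
  private val callbacks = mutable.ArrayBuffer.empty[() => Unit]
  def onTrigger(cb: () => Unit): Unit = callbacks += cb
  def trigger(): Unit = {
    val pending = callbacks.toList
    callbacks.clear()
    pending.foreach(cb => cb())
  }
}

// Simplified stand-in for the push result described in the PR.
sealed trait PushResult
object PushResult {
  case object Ok extends PushResult
  final case class Full(signal: Signal) extends PushResult
}

// Hypothetical bounded pipe; NOT the Colossus BufferedPipe.
final class SketchPipe[T](capacity: Int) {
  private val buffer = mutable.Queue.empty[T]
  private val pushSignal = new Signal

  def size: Int = buffer.size

  // A full pipe hands the caller a signal to wait on.
  def push(item: T): PushResult =
    if (buffer.size >= capacity) PushResult.Full(pushSignal)
    else { buffer.enqueue(item); PushResult.Ok }

  // Removes every buffered item for which `keep` returns false.
  def filterScan(keep: T => Boolean): Unit = {
    val wasFull = buffer.size >= capacity
    buffer.dequeueAll(item => !keep(item))
    // The step the fix is about: wake blocked pushers if space opened up.
    if (wasFull && buffer.size < capacity) pushSignal.trigger()
  }
}
```

Without the final `trigger()` call in `filterScan`, a producer that received `Full` would keep waiting even though the buffer has room, which matches the stalled-client symptom described above.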

Issue 2 is leading me to believe that we need a different interface than pipes to handle the message passing between connection handler layers: something with the same back-pressure semantics but without the notion of termination/closed, which only makes sense for actual streams of data. For now, though, these changes will get us the functionality we want.
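To make the CircuitBreaker idea concrete, here is a minimal sketch under stated assumptions. `SketchSink`, `ListSink`, and `SketchBreaker` are hypothetical names, and the `onBreak` callback shape (taking a `Throwable`) is an assumption; the PR text does not give the actual signature. The sketch shows a wrapper that survives termination of the sink it wraps, reports the break through a callback, and can later be pointed at a replacement.

```scala
// Hypothetical minimal sink; NOT the Colossus Sink/Pipe API.
trait SketchSink[T] {
  def push(item: T): Boolean // false when the item cannot be accepted
  def onTerminated(cb: Throwable => Unit): Unit
}

// In-memory sink that can be terminated, used only for illustration.
final class ListSink[T] extends SketchSink[T] {
  private var items = Vector.empty[T]
  private var dead = false
  private var listeners = List.empty[Throwable => Unit]

  def received: Vector[T] = items
  def push(item: T): Boolean =
    if (dead) false else { items :+= item; true }
  def onTerminated(cb: Throwable => Unit): Unit = listeners ::= cb
  def terminate(reason: Throwable): Unit =
    if (!dead) { dead = true; listeners.foreach(cb => cb(reason)) }
}

// Wraps a replaceable sink. Termination of the wrapped sink does not
// terminate the breaker; it is reported through `onBreak` instead.
final class SketchBreaker[T](onBreak: Throwable => Unit) {
  private var current: Option[SketchSink[T]] = None

  def set(sink: SketchSink[T]): Unit = {
    current = Some(sink)
    sink.onTerminated { reason =>
      // Ignore terminations from sinks that were already replaced.
      if (current.contains(sink)) {
        current = None
        onBreak(reason)
      }
    }
  }

  // With no sink attached, pushes are refused rather than failing.
  def push(item: T): Boolean = current.exists(_.push(item))
}
```

Callers holding the breaker never observe a terminated state; they only see pushes being refused until a new sink is attached, while the owner decides in `onBreak` what to do, for example closing the connection.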

codecov-io commented 7 years ago

Codecov Report

Merging #504 into develop will decrease coverage by 0.22%. The diff coverage is 83.33%.

@@             Coverage Diff             @@
##           develop     #504      +/-   ##
===========================================
- Coverage    84.89%   84.67%   -0.22%     
===========================================
  Files           98       98              
  Lines         4264     4288      +24     
  Branches       346      348       +2     
===========================================
+ Hits          3620     3631      +11     
- Misses         644      657      +13

| Impacted Files | Coverage Δ |
|---|---|
| ...in/scala/colossus/streaming/StreamTranscoder.scala | 84.21% <100%> (-4.68%) :x: |
| ...ossus/src/main/scala/colossus/streaming/Pipe.scala | 91.66% <100%> (+0.07%) :white_check_mark: |
| ...ossus/src/main/scala/colossus/streaming/Sink.scala | 79.41% <100%> (ø) :white_check_mark: |
| ...main/scala/colossus/streaming/CircuitBreaker.scala | 84.61% <79.31%> (-5.39%) :x: |
| ...n/scala/colossus/controller/OutputController.scala | 88.09% <0%> (-9.53%) :x: |
| ...rc/main/scala/colossus/controller/Controller.scala | 75% <0%> (-8.34%) :x: |
| ...sus/src/main/scala/colossus/streaming/Source.scala | 84.72% <0%> (-1.39%) :x: |


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update d7c6a2a...bc6b4b0.