mtesseract / nakadi-client

Haskell Client Library for the Nakadi Event Broker

Make consumption function return Void #135

Closed fghibellini closed 4 years ago

fghibellini commented 5 years ago

Our Nakadi instance experienced data loss and as a result started reporting negative unconsumed event counts. When a consumption is started in this state, Nakadi returns a 200 with an empty body.

Currently this simply terminates the conduit @linesUnboundedAsciiC@. With this change it will instead throw a @ConsumptionStoppedException@.

I changed the return type to Void to signal that the consumption function should never terminate (in our codebase people simply wrapped it in an infinite loop). The Nakadi docs (https://nakadi.io/manual.html#consuming-events-from-a-subscription) only mention an "info" field returned in batches, which can "potentially contain the reason Nakadi closed the connection". No further information about Nakadi-initiated closing of connections is provided.
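For illustration, here is a minimal, self-contained sketch (not the library's actual API; consumeForever, serverClosedStream and the locally defined ConsumptionStoppedException are stand-ins for this example) of why a consumer that is never supposed to return is more honestly typed as returning Void than (): the only way out is an exception, and callers can use absurd instead of writing an unreachable continuation.

import Control.Exception (Exception, throwIO)
import Data.Void (Void, absurd)

data ConsumptionStoppedException = ConsumptionStoppedException
  deriving (Show)

instance Exception ConsumptionStoppedException

-- Stand-in for the streaming loop: it either keeps processing batches
-- forever or throws when the server ends the stream, so Void is the
-- honest return type.
consumeForever :: IO Void
consumeForever = do
  serverClosedStream <- pure True  -- placeholder for "Nakadi ended the stream"
  if serverClosedStream
    then throwIO ConsumptionStoppedException
    else consumeForever

main :: IO ()
main = consumeForever >>= absurd  -- no unreachable "what happens next?" branch needed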

fghibellini commented 5 years ago

To test the behaviour after fixing our Nakadi instance I used the following expressjs server:

const express = require('express')
const app = express()
const port = 9999

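// Events-streaming endpoint of the subscription: replies 200 with an empty
// body, mimicking the broken Nakadi instance described above.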
app.get('/subscriptions/33eee289-e57d-44af-9859-09501db0fcee/events', (req, res) => {
  res.header('X-Nakadi-StreamId', '123343')
  res.send('') // NOTICE EMPTY BODY
})

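// Subscription metadata endpoint.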
app.get('/subscriptions/33eee289-e57d-44af-9859-09501db0fcee', (req, res) => {
  res.header('Content-type', 'application/json')
  res.send(`
    {
      "owning_application": "order-service",
      "event_types": [
        "order.ORDER_RECEIVED"
      ],
      "consumer_group": "default",
      "read_from": "end",
      "id": "038fc871-1d2c-4e2e-aa29-1579e8f2e71f",
      "created_at": "2016-09-23T16:35:13.273Z"
    }
  `)
})

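// Partition statistics endpoint: reports a single partition with one unconsumed event.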
app.get('/event-types/order.ORDER_RECEIVED/partitions', (req, res) => {
  res.header('Content-type', 'application/json')
  res.send(`
    [
      {
        "partition": "1",
        "oldest_available_offset": "1",
        "newest_available_offset": "1",
        "unconsumed_events": 1
      }
    ]
  `)
})

app.listen(port, () => console.log(`Fake Nakadi listening on port ${port}!`))

mtesseract commented 5 years ago

Hi Filippo,

thank you very much for your contribution. I don't understand the situation entirely, so if you could elaborate a little bit, this would be very helpful for me. In particular I have the following questions:

When you say that you experienced data loss of "your" Nakadi instance, you are talking about data loss of a Nakadi server, I suppose?

If I understand you correctly, in your situation, the Nakadi server terminated a streaming request immediately with an empty body and you are suggesting a different behaviour for nakadi-client to deal better with such a scenario?

What is not clear to me is why exactly it is an advantage to have subscriptionProcessHandler throw an exception instead of returning unit when a streaming request is terminated. As I understand it, both approaches require the user of subscriptionProcess to deal with disconnects and wrap the subscription processing in a loop. I have the feeling that there is something here that I am missing.

[Side note: We could of course think about implementing a function that is even more high-level than subscriptionProcess and takes care of the reconnection logic for the user, simplifying the code on the side of Nakadi consumers. But this is not what this PR is about.]

Thanks, Moritz

fghibellini commented 5 years ago

@mtesseract

> When you say that you experienced data loss of "your" Nakadi instance, you are talking about data loss of a Nakadi server, I suppose?

Yes, the server.

> What is not clear to me is why exactly it is an advantage to have subscriptionProcessHandler throw an exception instead of returning unit when a streaming request is terminated. As I understand it, both approaches require the user of subscriptionProcess to deal with disconnects and wrap the subscription processing in a loop. I have the feeling that there is something here that I am missing.

The current return type is m (), which suggests the operation is expected to terminate. Above I linked the only point in the Nakadi docs that suggests such a subscription should actually terminate. The problem is the code this leads to - in our codebase we effectively have the following:

recovering recoveryPolicy exceptionHandlers $ \_ ->
  retrying retryPolicy (\retryStatus () -> pure True) $ \_ ->
    runResourceT $ runNakadiT nakadiConfiguration $ subscriptionProcess

You can see that the user has to wrap the call in two combinators - one to handle failure through exceptions and one to handle "pure" failures.

This not only looks confusing but also doesn't work nicely with the retry policies.

Also, in this specific case the user didn't feel the need to log anything for the "pure failure" ("this should never happen"), and since there is no directly accessible exception to log, it was pretty painful to track down.

My argument is that, given that the termination is not something the user wants to handle anyway (at least as long as Nakadi doesn't document the termination cases/scenarios, so they would at least know what to do in such a situation), it should be handled as an error.

Thus allowing us to have simply:

recovering recoveryPolicy exceptionHandlers
  $ \_ ->
    runResourceT $ runNakadiT nakadiConfiguration $ subscriptionProcess

With this code, any time Nakadi fails and recovering restarts the process, we simply log the exception, and the recovery policy has predictable behavior.
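
For reference, here is a rough sketch of what the recoveryPolicy and exceptionHandlers assumed above could look like (illustrative only, built on the retry package; the delay value and log message are made up, and it simply catches SomeException):

{-# LANGUAGE ScopedTypeVariables #-}

import Control.Exception (SomeException)
import Control.Monad.Catch (Handler (..))
import Control.Retry (RetryPolicyM, RetryStatus, constantDelay)

-- Wait one second between reconnection attempts.
recoveryPolicy :: RetryPolicyM IO
recoveryPolicy = constantDelay 1000000

-- Catch any exception (including the new ConsumptionStoppedException),
-- log it, and tell recovering to restart the consumption.
exceptionHandlers :: [RetryStatus -> Handler IO Bool]
exceptionHandlers =
  [ \_status -> Handler $ \(e :: SomeException) -> do
      putStrLn ("consumption failed, reconnecting: " <> show e)
      pure True  -- True = retry
  ]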

I have sent you a link to the actual code in Zalando's gchat. You can also see more about the server issues in the "nakadi users" channel.

mtesseract commented 4 years ago

Hi,

thanks for your elaboration. I have the feeling that I understand the issue now and I am fine with merging something along these lines.

Some minor points:

fghibellini commented 4 years ago

Done! Let me know if it's ok like this.

mtesseract commented 4 years ago

This looks good to me, thank you! One thing is missing: the tests are currently broken. I don't know why the Travis pipeline is no longer linked from the PR; I think it used to be. But I checked on https://travis-ci.org/mtesseract/nakadi-client and one can see there that the tests do not build at the moment. Do you want to fix this? I could also see if I have time to look into it.

fghibellini commented 4 years ago

I tried to fix it, and I think it should build fine now. ~~But GitHub complains about a merge conflict, so I rebased it on master and I think it's just stuck. Travis cannot pick up the PR either because of this.~~

mtesseract commented 4 years ago

Hmm. stream_limit.

I think your recent change to the tests causes the tests to not terminate?

nakadi-client Test Suite
  EventTypes
    BusinessEvents
      createAndDeleteEvent:                                       OK (0.68s)
      publishAndConsume:                                          
No output has been received in the last 10m0s, this potentially indicates a stalled build or something wrong with the build itself.
Check the details on how to adjust your build configuration on: https://docs.travis-ci.com/user/common-build-problems/#build-times-out-because-no-output-was-received
The build has been terminated

Let's think again about how to handle the stream_limit case. I would prefer to be able to keep it.

fghibellini commented 4 years ago

Could you describe its purpose? I removed it because there was no documentation about it, either in the code or in the Nakadi docs.

mtesseract commented 4 years ago

This is what we are talking about, right? -> https://nakadi.io/manual.html#/subscriptions/subscription_id/events_get*stream_limit

fghibellini commented 4 years ago

I see. Then there actually is a documented reason for Nakadi to close the connection.

I don't think it makes sense for this driver to keep count of the number of events and conditionally throw an exception, so I guess this PR can just be abandoned.

(Still, it's a weird decision for Nakadi to provide a way to get "the first N events" when it doesn't provide guarantees on the ordering across partitions. 🤷‍♂ )