twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Batch and retry uncommitted records with group consumer and kotel #684

Closed: yakirSalt closed this issue 2 months ago

yakirSalt commented 4 months ago

Hi,

I have two issues: one with batching and retrying records using a group consumer, and the second with getting kotel to work. I'm trying to understand how I can configure batch fetching with a group consumer and, for messages that are not committed manually because of some logical error (autocommit is disabled), retry the business logic.

Client configuration:

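opts := []kgo.Opt{
  kgo.SeedBrokers("localhost:9092"),
  kgo.ConsumerGroup(myGroupId),
  kgo.ConsumeTopics(myTopicName),
  kgo.DisableAutoCommit(), // offsets are committed manually in the loop below
}
client, err := kgo.NewClient(opts...)
if err != nil {
  log.Fatal(err)
}
defer client.Close()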

A simplified example of the consume loop:

for {
  fetches := client.PollFetches(ctx)
  if errs := fetches.Errors(); len(errs) > 0 {
    // fetch errors: log them and poll again
    log.Printf("fetch errors: %v", errs)
    continue
  }

  fetches.EachRecord(func(record *kgo.Record) {
    if _, err := businessLogic(record); err != nil {
      // error occurred; hoping to try this record again on the next PollFetches
      log.Printf("failed logic: %v", err)
      return
    }
    // logic finished successfully; commit so the same record is not processed again
    if err := client.CommitRecords(ctx, record); err != nil {
      log.Printf("commit failed: %v", err)
    }
  })
}
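Kafka stores a single committed offset per partition, so in a loop like the one above, committing a later record that succeeded also marks any earlier failed record in the same partition as consumed. A minimal, broker-free sketch of computing the offset that is actually safe to commit (one past the contiguous run of successes at the start of the batch); the `partition` and `result` types and `safeCommitOffsets` are hypothetical helpers, not franz-go API:

```go
package main

import "fmt"

// partition identifies a topic partition (hypothetical helper type).
type partition struct {
	Topic     string
	Partition int32
}

// result records whether processing one consumed record succeeded.
// Hypothetical type; a real consumer would derive it from *kgo.Record.
type result struct {
	Part   partition
	Offset int64
	OK     bool
}

// safeCommitOffsets returns, per partition, one past the last record of the
// contiguous run of successes that starts at the first record seen for that
// partition. A failure stops the run: nothing at or after it is committable.
// Partitions whose first record failed get no entry at all.
// Results are assumed to be in offset order within each partition.
func safeCommitOffsets(results []result) map[partition]int64 {
	out := make(map[partition]int64)
	blocked := make(map[partition]bool)
	for _, r := range results {
		if blocked[r.Part] {
			continue
		}
		if !r.OK {
			blocked[r.Part] = true
			continue
		}
		out[r.Part] = r.Offset + 1
	}
	return out
}

func main() {
	p := partition{Topic: "orders", Partition: 0}
	batch := []result{
		{p, 10, true},
		{p, 11, false}, // failed: offsets 11 and later must not be committed yet
		{p, 12, true},
	}
	fmt.Println(safeCommitOffsets(batch)[p]) // 11
}
```

A failed record then holds back its partition's commit until it is handled, which matches the "stay on the current record" advice given later in the thread.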

There are a few issues I encountered:

  1. Once records have been fetched, they are not returned again (even though they were never committed) unless I restart the client.
  2. I can't manage batch polling myself, for the same reason as point 1.

So I have two questions:

  1. Is there a way to poll messages in batches of a chosen size using a consumer group, and to re-iterate over messages that weren't committed, without re-creating the client? I've seen client.PollRecords with kgo.BlockRebalanceOnPoll and client.AllowRebalance, but I didn't really understand them, nor was I sure they're what I need.
  2. I tried to instrument the client using kotel, but I just couldn't make it work: my local Jaeger didn't receive any traces.

Here is the code I tried when implementing kotel, based on the example file examples/hooks_and_logging/plugin_kotel/main.go:

func jaegerTP() (*sdktrace.TracerProvider, error) {
    exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://localhost:14268/api/traces")))
    if err != nil {
        return nil, err
    }

    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exp),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String("my-service"),
            semconv.DeploymentEnvironmentKey.String("my-env"),
        )),
    )

    //otel.SetTracerProvider(tp) // tried with that too...

    return tp, err
}

func newClientWithTracing(topicName string) (*kgo.Client, *kotel.Tracer) {
    //tracerProvider, err := initTracerProvider() // tried with that too...
    tracerProvider, err := jaegerTP()
    if err != nil {
        panic(err)
    }
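    // This deferred Shutdown runs when newClientWithTracing returns, i.e. before
    // any records are consumed, so spans ended afterwards are never exported.
    defer func() {
        if err := tracerProvider.Shutdown(context.Background()); err != nil {
            log.Printf("Error shutting down tracer provider: %v", err)
        }
    }()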

    kotelTracer := newKotelTracer(tracerProvider)
    kotelService := newKotel(kotelTracer)
    opts := []kgo.Opt{
        kgo.SeedBrokers("localhost:9092"),
        kgo.ConsumerGroup("my-service"),
        kgo.ConsumeTopics(topicName),
        kgo.DisableAutoCommit(),
        kgo.WithHooks(kotelService.Hooks()...),
    }

    client, err := kgo.NewClient(opts...)
    if err != nil {
        panic(err)
    }
    return client, kotelTracer
}

func consumeMessages() {
  // code...
  fetches.EachRecord(func(msg *kgo.Record) {
    _, span := ktracer.WithProcessSpan(msg)
    log.Printf("received message: key=%s value=%s", string(msg.Key), string(msg.Value))
    span.End()
  })
  // code...
}

yakirSalt commented 4 months ago

I understand why retrying won't work with PollFetches: it only returns records that haven't been fetched yet, and blocks until new records arrive. So is there a way to re-process uncommitted records by polling the topic?

twmb commented 4 months ago
  1. No, and this is a common question I should probably move to a FAQ somewhere. This last came up in this repo here: https://github.com/twmb/franz-go/issues/440, but has come up in Slack/Discord as well. You can manually rewind with SetOffsets but it's finicky. My recommendation is to stay on the current record until you successfully process it or, if that's untenable, reproduce it to a dead letter queue for after-the-fact follow up.

  2. I don't use the kotel plugin myself, but I know others have used it successfully. I don't think I can answer this one. Tagging @brunsgaard to take a quick glance if he has time, but I know he's also busy.

yakirSalt commented 4 months ago

Hey @twmb, thanks for the clarification about question 1! Knowing it's not supported also helps.

yakirSalt commented 4 months ago

@twmb, is there anyone who can help me with kotel?

twmb commented 3 months ago

I'll also tag @yianni perhaps, but honestly, I'm not positive. kotel seems to work fine for other users, so I'd suspect it's something on the ingestion side, or in the configuration for how spans are sent?

yianni commented 3 months ago

Hi @yakirSalt! Quick note to confirm Jaeger and Kotel do work together. It seems the issue could be what @twmb pointed out. Maybe a quick config check could help? Cheers

twmb commented 2 months ago

This issue looks to be resolved; the kotel problem seems to be a configuration or setup issue.