mailgun / kafka-pixy

gRPC/REST proxy for Kafka
Apache License 2.0

gRPC GoLang quickstart guide #129

Closed by kchodnicki 6 years ago

kchodnicki commented 6 years ago

It would be great to see a gRPC quickstart guide for GoLang :)

hermanschaaf commented 6 years ago

Another 👍 from me. This is what it currently says in the quick-start-golang.md file:

If you need this, please create an issue and I will add the content in no time.

😄

horkhe commented 6 years ago

@hermanschaaf yeah, it did not turn out to be "no time" at all, but I finally did it. Please check it out and let me know if it is good enough.

hermanschaaf commented 6 years ago

Great, thanks @horkhe! I had to make some tweaks due to variable naming / syntax errors in the sample code, but got it working. Here is the modified version of RunConsumeNAck that works for me:


// RunConsumeNAck runs a consume-n-ack loop until ctx is done. Note that the
// inner gRPC calls intentionally do not use the provided context, because we
// want the in-flight request to finish gracefully. Otherwise the previously
// consumed message may not be acknowledged properly and will be consumed again.
func RunConsumeNAck(ctx context.Context, group, topic string, msgHandler func(msg []byte)) error {
    // Consume first message.
    var rs *pb.ConsRs
    var err error
    for {
        select {
        case <-ctx.Done():
            return nil
        default:
        }
        rs, err = client.ConsumeNAck(context.TODO(), &pb.ConsNAckRq{
            Topic: topic,
            Group: group,
            NoAck: true,
        })
        if err != nil {
            if status.Code(err) == codes.NotFound {
                // No message arrived within the long polling timeout; retry.
                continue
            }
            return errors.Wrap(err, "while consuming first")
        }
        break
    }
    msgHandler(rs.Message)
    // Run consume+ack loop.
    ackPartition := rs.Partition
    ackOffset := rs.Offset
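loop:
    for {
        select {
        case <-ctx.Done():
            // Leave the loop so the last consumed message is acked below.
            // Returning here would make the final Ack unreachable.
            break loop
        default:
        }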
        rs, err = client.ConsumeNAck(context.TODO(), &pb.ConsNAckRq{
            Topic:        topic,
            Group:        group,
            AckPartition: ackPartition,
            AckOffset:    ackOffset,
        })
        if err != nil {
            if status.Code(err) == codes.NotFound {
                // No message arrived within the long polling timeout; retry.
                continue
            }
            return errors.Wrap(err, "while consuming")
        }
        ackPartition = rs.Partition
        ackOffset = rs.Offset
        msgHandler(rs.Message)
    }
    // Ack the last consumed message.
    _, err = client.Ack(context.TODO(), &pb.AckRq{
        Topic:     topic,
        Group:     group,
        Partition: ackPartition,
        Offset:    ackOffset,
    })
    if err != nil {
        return errors.Wrap(err, "while acking last")
    }
    return nil
}
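
The shutdown behaviour described in the function comment can be tried without a running kafka-pixy instance. What follows is only a minimal sketch using the standard library: `fakeClient`, `runLoop`, and `demo` are hypothetical names invented for illustration and are not part of the kafka-pixy API. The sketch assumes that each consume call acks the previous offset, and that the loop must be left with a labeled `break` so the last consumed offset still gets acked once the context is cancelled.

```go
package main

import (
	"context"
	"fmt"
)

// fakeClient is a hypothetical stand-in for the kafka-pixy gRPC client.
// It hands out sequential offsets and records which offsets were acked.
type fakeClient struct {
	next  int64
	acked []int64
}

// consumeNAck acks ackOffset (when ack is true) and returns the next offset.
func (c *fakeClient) consumeNAck(ack bool, ackOffset int64) int64 {
	if ack {
		c.acked = append(c.acked, ackOffset)
	}
	off := c.next
	c.next++
	return off
}

// ack acknowledges a single offset without consuming anything.
func (c *fakeClient) ack(offset int64) {
	c.acked = append(c.acked, offset)
}

// runLoop consumes until ctx is done, then acks the last consumed offset.
func runLoop(ctx context.Context, c *fakeClient, handle func(offset int64)) {
	// The first message is consumed without acking anything.
	last := c.consumeNAck(false, 0)
	handle(last)
loop:
	for {
		select {
		case <-ctx.Done():
			// A bare break would only leave the select statement.
			break loop
		default:
		}
		last = c.consumeNAck(true, last)
		handle(last)
	}
	// Ack the last consumed offset so it is not delivered again.
	c.ack(last)
}

// demo cancels the context after three messages and returns the acked offsets.
func demo() []int64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	c := &fakeClient{}
	handled := 0
	runLoop(ctx, c, func(offset int64) {
		handled++
		if handled == 3 {
			cancel() // request shutdown after the third message
		}
	})
	return c.acked
}

func main() {
	fmt.Println(demo()) // prints [0 1 2]
}
```

All three consumed offsets end up acked, including the last one. If the `ctx.Done()` branch returned directly instead of breaking out of the loop, offset 2 would be handled but never acked.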

horkhe commented 6 years ago

@hermanschaaf thank you for the corrections. I made your changes in the PR.