apache / apisix

The Cloud-Native API Gateway
https://apisix.apache.org/blog/
Apache License 2.0
14.45k stars 2.52k forks source link

bug: kafka-proxy Setting PubSubReq.CmdKafkaFetch.offset does not take effect #9779

Open robertluoxu opened 1 year ago

robertluoxu commented 1 year ago

Current Behavior

use kafka+apisix(kafka-proxy)+vue , kafka have 5 7offset,code set offset:5 ,Return the content of offset 0-7, expect to return the content of 5-7; in addition, cmdKafkaListOffset cannot get the value

let cmdKafkaFetch = {topic: "testTopic", partition: 0 , offset: 5}
let cmdKafkaListOffset = {topic: "testTopic", partition: 0 , timestamp: 0}
let pubSubReq = { sequence:1, cmd_kafka_fetch: cmdKafkaFetch, cmd_kafka_list_offset: cmdKafkaListOffset}

image

Expected Behavior

No response

Error Logs

No response

Steps to Reproduce

code

Environment

shreemaan-abhishek commented 1 year ago

Please share your route configuration and any relevant error logs.

robertluoxu commented 1 year ago

my router

{
  "uri": "/kafka",
  "name": "kafka",
  "methods": [
    "GET",
    "POST",
    "PUT",
    "DELETE",
    "PATCH",
    "HEAD",
    "OPTIONS",
    "CONNECT",
    "TRACE",
    "PURGE"
  ],
  "upstream_id": "467794562773943068",
  "status": 1
}

upstream:

{
  "nodes": [
    {
      "host": "ip",
      "port":  port,
      "weight": 1
    }
  ],
  "timeout": {
    "connect": 6,
    "send": 6,
    "read": 6
  },
  "type": "roundrobin",
  "scheme": "kafka",
  "pass_host": "pass",
  "name": "kafka-test",
  "keepalive_pool": {
    "idle_timeout": 60,
    "requests": 1000,
    "size": 320
  }
}

vue code:

created: function() {
    this.$options.sockets.onmessage = (res) => { 
        // eslint-disable-next-line no-console
        console.log('sockets.onmessage:' , res)
        res.data.arrayBuffer().then(buffer=>{
            // ArrayBuffer
            let s = new Uint8Array(buffer);
            let devcodePubsubRespdata  = decodePubSubResp(s);
            // eslint-disable-next-line no-console
            console.log('devcodePubsubRespdata', devcodePubsubRespdata)
            devcodePubsubRespdata.kafka_fetch_resp.messages.forEach(element => {
                // eslint-disable-next-line no-console
                console.log('kafka offset', element.offset.high)
                // eslint-disable-next-line no-console
                console.log('kafka timestamp', element.timestamp)
                // eslint-disable-next-line no-console
                console.log('kafka value', String.fromCharCode.apply(null,element.value))
            });
        })};
    // eslint-disable-next-line no-console
    this.$options.sockets.onerror = (data) => {console.log('sockets.onerror:' , data)}
    // eslint-disable-next-line no-console
    this.$options.sockets.onopen  = (data) => {console.log('sockets.onopen :' , data)}
    // eslint-disable-next-line no-console
    this.$options.sockets.onclose   = (data) => {console.log('sockets.onclose  :' , data)}
  },
  methods: {
    ws(){
        let cmdKafkaFetch = {topic: "testTopic", partition: 0 , offset: 5}
        let cmdKafkaListOffset = {topic: "testTopic", partition: 0 }
        let pubSubReq = { sequence:1,cmd_kafka_fetch: cmdKafkaFetch, cmd_kafka_list_offset: cmdKafkaListOffset}
        this.pubsubdata  = encodePubSubReq(pubSubReq);
        // eslint-disable-next-line no-console
        console.log('$socket.send', this.pubsubdata);
        this.$socket.send(this.pubsubdata);
    },
    decodeWs(){
        let result  = decodePubSubReq(this.pubsubdata);
        // eslint-disable-next-line no-console
        console.log('decodePubSubReq', result);
        // this.$socket.send('Hello WebSocket')
    },
  }

kafka : image

I want to get offset starting from 5, how to get it, how to use java-like @KafkaListener to consume kafka new messages

shreemaan-abhishek commented 1 year ago

I have no experience in working with Kafka so I will unassign myself.