prestodb / presto

The official home of the Presto distributed SQL query engine for big data
http://prestodb.io
Apache License 2.0

kafka plugin failed in some cases #8364

Closed icanfly closed 5 years ago

icanfly commented 7 years ago

Hi, I have run into a problem (possibly a bug) in the presto-kafka module.

I checked out the master branch, compiled it, and ran it.

My test case is:

  1. Send some messages (JSON strings as the message body and Long numbers as the key) to the Kafka topic presto_test. The source code is below:
package com.zbj.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;

/**
 * @author luopeng@zbj.com
 */
public class MsgProducer extends Thread {

    // Message body: a JSON object with two integer fields
    private static final String TEMPLATE = "{\"f1\":%d,\"f2\":%d}";

    private final KafkaProducer<Long, String> producer;
    private final String topic;
    private final int msgCount;
    private final Random rand = new Random();

    public MsgProducer(String topic, int msgCount) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.31.245:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "MsgProducer");
        // The key is serialized as an 8-byte big-endian long, the value as a UTF-8 string
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        this.producer = new KafkaProducer<>(properties);
        this.topic = topic;
        this.msgCount = msgCount;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < msgCount; i++) {
                long key = rand.nextLong();
                String msg = String.format(TEMPLATE, rand.nextInt(), rand.nextInt());
                producer.send(new ProducerRecord<>(topic, key, msg));
            }
        } finally {
            // close() blocks until all buffered records have been sent,
            // so there is no need to sleep while the client finishes sending
            producer.close();
        }
    }

    public static void main(String[] args) {
        new MsgProducer("presto_test", 1000).start();
    }
}

Message key: a random long number. Message body: a JSON string with two fields, each holding an int.

  2. Configure the Presto Kafka plugin for the table. The config files look like this:

kafka.properties:

connector.name=kafka
kafka.nodes=bdata.kafka01.zbj:9092,bdata.kafka02.zbj:9092,bdata.kafka03.zbj:9092
kafka.table-names=presto.test
kafka.hide-internal-columns=true

And presto.test.json:

{
    "tableName": "test",
    "schemaName": "presto",
    "topicName": "presto_test",
    "key": {
        "dataFormat": "raw",
        "fields": [
            {
                "name": "kafka_key",
                "type": "BIGINT",
                "dataFormat":"LONG",
                "hidden": "false"
            }
        ]
    },
    "message": {
        "dataFormat": "json",
        "fields": [
            {
                "name": "f1",
                "dataFormat": "_default",
                "mapping":"f1",
                "type": "BIGINT",
                "hidden": "false"
            },
            {
                "name": "f2",
                "dataFormat": "_default",
                "mapping":"f2",
                "type": "BIGINT",
                "hidden": "false"
            }
        ]
    }
}
  3. Restart the Presto server and run some SQL queries against Kafka.

In the third step, I ran a simple query:

select * from kafka.presto.test where kafka_key > 0;

and got the following error:

2017-06-26T15:16:04.216+0800    ERROR   remote-task-callback-2  com.facebook.presto.execution.StageStateMachine Stage 20170626_071604_00001_bmazi.1 failed
java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:506)
    at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:412)
    at com.facebook.presto.decoder.raw.RawFieldDecoder$RawValueProvider.getLong(RawFieldDecoder.java:197)
    at com.facebook.presto.kafka.KafkaRecordSet$KafkaRecordCursor.getLong(KafkaRecordSet.java:258)
    at com_facebook_presto_$gen_CursorProcessor_45.project_0(Unknown Source)
    at com_facebook_presto_$gen_CursorProcessor_45.process(Unknown Source)
    at com.facebook.presto.operator.ScanFilterAndProjectOperator.getOutput(ScanFilterAndProjectOperator.java:232)
    at com.facebook.presto.operator.Driver.processInternal(Driver.java:378)
    at com.facebook.presto.operator.Driver.processFor(Driver.java:301)
    at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:622)
    at com.facebook.presto.execution.TaskExecutor$PrioritizedSplitRunner.process(TaskExecutor.java:534)
    at com.facebook.presto.execution.TaskExecutor$Runner.run(TaskExecutor.java:670)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
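The exception can be reproduced outside Presto. Kafka's LongSerializer writes the key as 8 big-endian bytes (also ByteBuffer's default byte order), so each key buffer holds exactly one long. ByteBuffer.getLong() is a relative read: it returns the 8 bytes at the current position and then advances the position by 8. The first read therefore returns the key, and a second read on the same buffer has no bytes left and throws the same java.nio.BufferUnderflowException seen in the trace. A minimal sketch using only java.nio (the class name RelativeReadDemo is illustrative):

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class RelativeReadDemo {

    // Returns true if a second relative getLong() on the same 8-byte buffer fails
    static boolean secondReadUnderflows(long key) {
        // Encode the key as 8 big-endian bytes, like an 8-byte long serializer
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES).putLong(key);
        buffer.flip(); // position = 0, limit = 8: ready for reading

        long first = buffer.getLong(); // succeeds, position is now 8
        if (first != key) {
            throw new AssertionError("unexpected value: " + first);
        }
        try {
            buffer.getLong(); // 0 bytes remaining
            return false;
        } catch (BufferUnderflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondReadUnderflows(42L)); // prints true
    }
}
```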

By contrast, the following queries run fine:

select * from kafka.presto.test;
select * from kafka.presto.test where f1 > 0;
select f1,kafka_key from kafka.presto.test where f1 > 0;

The query fails only when both the projection and the WHERE clause reference the kafka_key field (the Kafka key field, decoded with the raw data format).

After debugging the code, I found the following:

  1. The RawValueProvider class holds the value as a ByteBuffer, so its getLong() method cannot be called repeatedly the way a getter on a standard SQL ResultSet can. ByteBuffer.getLong() is a relative read that advances the buffer's position, so it is not idempotent: repeated calls do not return the same result. In my case, getLong() appears to be called more than once for each row (presumably once for the filter and once for the projection).

  2. I think the value should be cached so that it can be read repeatedly. I took the idea from the Hive plugin implementation, made a small change along those lines, and it appears to work.

  3. I will open a PR with the change soon, and I hope it is the right fix for this problem.
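A minimal sketch of the caching approach from item 2, assuming a simplified provider that wraps the key's ByteBuffer. CachingLongValueProvider is a hypothetical class, not Presto's actual RawFieldDecoder API and not the code from the PR. It decodes lazily on the first getLong() call and returns the cached value afterwards, so calling it several times for the same row yields the same result. The sketch also shows a stateless alternative: ByteBuffer's absolute getLong(int index), which reads at a given index without moving the position.

```java
import java.nio.ByteBuffer;

// Hypothetical provider; illustrates caching, not Presto's real decoder classes
public class CachingLongValueProvider {

    private final ByteBuffer value;
    private boolean decoded;
    private long cached;

    CachingLongValueProvider(ByteBuffer value) {
        this.value = value;
    }

    // Safe to call any number of times per row: the buffer is read only once
    long getLong() {
        if (!decoded) {
            cached = value.getLong();
            decoded = true;
        }
        return cached;
    }

    // Stateless alternative: an absolute read leaves the buffer position unchanged
    static long readWithoutAdvancing(ByteBuffer value) {
        return value.getLong(value.position());
    }

    public static void main(String[] args) {
        ByteBuffer key = ByteBuffer.allocate(Long.BYTES).putLong(7L);
        key.flip();

        CachingLongValueProvider provider = new CachingLongValueProvider(key);
        System.out.println(provider.getLong()); // prints 7
        System.out.println(provider.getLong()); // prints 7 again, no underflow
    }
}
```

Caching keeps the existing relative-read decoding unchanged and only guards against repeated calls; the absolute read avoids the extra state but assumes the value always starts at the buffer's current position.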

stale[bot] commented 5 years ago

This issue has been automatically marked as stale because it has not had any activity in the last 2 years. If you feel that this issue is important, just comment and the stale tag will be removed; otherwise it will be closed in 7 days. This is an attempt to ensure that our open issues remain valuable and relevant so that we can keep track of what needs to be done and prioritize the right things.