harbby / sylph

Stream computing platform for bigdata
https://harbby.github.io/project/sylph/index.html
Apache License 2.0

When `tableEnv.registerDataStream` is used multiple times, specifying `proctime.proctime` or `rowtime.rowtime` will cause this error. #23

Closed harbby closed 5 years ago

harbby commented 5 years ago

see:

Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
    at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
    at DataStreamCalcRule$15.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
    at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
harbby commented 5 years ago

Hi, here is a short program that reproduces the problem.

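// Imports assume the Flink 1.6/1.7 package layout; the thread does not state the version.
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import javax.annotation.Nullable;

/**
 * https://issues.apache.org/jira/browse/FLINK-10999
 */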
public class FLINK_10999
{
    public static void main(String[] args)
            throws Exception
    {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        execEnv.setParallelism(2);
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnv);
        run(tableEnv);
    }

    public static DataStream<Row> getSource(StreamTableEnvironment tableEnv)
    {
        Table table = tableEnv.sqlQuery("SELECT * FROM (VALUES ('Bob', 1543052856000), ('Lucy', 1543054856000)) AS NameTable(name,event_time)");
        DataStream<Row> input = tableEnv.toAppendStream(table, Row.class);
        input.getTransformation().setOutputType(table.getSchema().toRowType());
        input = input.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Row>()
        {
            @Nullable
            @Override
            public Watermark getCurrentWatermark()
            {
                return new Watermark(System.nanoTime());
            }

            @Override
            public long extractTimestamp(Row element, long previousElementTimestamp)
            {
                return (long) element.getField(1);
            }
        });
        return input;
    }

    public static void run(StreamTableEnvironment tableEnv)
            throws Exception
    {
        DataStream<Row> inputStream = getSource(tableEnv);

        // first conversion: append a processing-time attribute (proctime.proctime)
        Table tb1 = tableEnv.fromDataStream(inputStream, "name,event_time,proctime.proctime");
        DataStream<Row> stream1 = tableEnv.toAppendStream(tb1, Row.class);
        stream1.getTransformation().setOutputType(tb1.getSchema().toRowType());

        // second conversion: append an event-time attribute (rowtime.rowtime)
        Table tb2 = tableEnv.fromDataStream(stream1, "name,proctime,rowtime.rowtime");
        tableEnv.toAppendStream(tb2, Row.class).print();

        System.out.println("**************************************************");
        System.out.println(tableEnv.execEnv().getExecutionPlan());
        System.out.println("*********************There are no errors here.*****************************");

        // the plan above prints fine; the error only appears once the job is executed
        tableEnv.execEnv().execute();
    }
}
XuQianJin-Stars commented 5 years ago

Hi ideal-hp. After tracing this, I found two problems:

  1. proctime and rowtime cannot be used together in one table; you can only use one of them.
  2. The type returned by rowtime and proctime is Timestamp, and you need to specify this type.

    public static DataStream<Row> getSource(StreamTableEnvironment tableEnv)
    {
        Table table = tableEnv.sqlQuery("SELECT * FROM (VALUES ('Bob', 1543052856000), ('Lucy', 1543054856000)) AS NameTable(name,event_time)");
        DataStream<Row> input = tableEnv.toAppendStream(table, Row.class);
        input.getTransformation().setOutputType(table.getSchema().toRowType());
        input = input.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Row>()
        {
            @Nullable
            @Override
            public Watermark getCurrentWatermark()
            {
                return new Watermark(System.nanoTime());
            }

            @Override
            public long extractTimestamp(Row element, long previousElementTimestamp)
            {
                return (long) element.getField(1);
            }
        });
        return input;
    }

    @Test
    public void run()
            throws Exception
    {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        execEnv.setParallelism(1);
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnv);

        DataStream<Row> inputStream = getSource(tableEnv);

        // add rowtime.rowtime
        Table tb1 = tableEnv.fromDataStream(inputStream, "name,event_time,rowtime.rowtime");
        DataStream<Row> stream1 = tableEnv.toAppendStream(tb1, Row.class);
        System.out.println(tb1.getSchema().toRowType().toString());

        // declare the output row type by hand: the time attribute is a java.sql.Timestamp
        TypeInformation[] fieldTypes = new TypeInformation[3];
        String[] fieldNames = new String[3];
        fieldTypes[0] = TypeInformation.of(new TypeHint<String>(){});
        fieldNames[0] = "name";
        fieldTypes[1] = TypeInformation.of(new TypeHint<Long>(){});
        fieldNames[1] = "event_time";
        fieldTypes[2] = TypeInformation.of(new TypeHint<Timestamp>(){});
        fieldNames[2] = "rowtime";
        RowTypeInfo outRow = new RowTypeInfo(fieldTypes, fieldNames);
        System.out.println(outRow.toString());
        stream1.getTransformation().setOutputType(outRow);

        // add a second rowtime attribute, w.rowtime
        Table tb2 = tableEnv.fromDataStream(stream1, "name,event_time,rowtime,w.rowtime");
        tableEnv.toAppendStream(tb2, Row.class).print();

        System.out.println("***************************************************************************");
        System.out.println(tableEnv.execEnv().getExecutionPlan());
        System.out.println("*********************There are no errors here.*****************************");

        // execute the job; this is the step where the original report failed
        tableEnv.execEnv().execute();
    }

    @Test
    public void run2()
            throws Exception
    {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        execEnv.setParallelism(1);
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnv);

        DataStream<Row> inputStream = getSource(tableEnv);

        // add proctime.proctime
        Table tb1 = tableEnv.fromDataStream(inputStream, "name,event_time,proctime.proctime");
        DataStream<Row> stream1 = tableEnv.toAppendStream(tb1, Row.class);
        System.out.println(tb1.getSchema().toRowType().toString());

        // declare the output row type by hand: the time attribute is a java.sql.Timestamp
        TypeInformation[] fieldTypes = new TypeInformation[3];
        String[] fieldNames = new String[3];
        fieldTypes[0] = TypeInformation.of(new TypeHint<String>(){});
        fieldNames[0] = "name";
        fieldTypes[1] = TypeInformation.of(new TypeHint<Long>(){});
        fieldNames[1] = "event_time";
        fieldTypes[2] = TypeInformation.of(new TypeHint<Timestamp>(){});
        fieldNames[2] = "proctime";
        RowTypeInfo outRow = new RowTypeInfo(fieldTypes, fieldNames);
        System.out.println(outRow.toString());
        stream1.getTransformation().setOutputType(outRow);

        // add a second proctime attribute, w.proctime
        Table tb2 = tableEnv.fromDataStream(stream1, "name,event_time,proctime,w.proctime");
        tableEnv.toAppendStream(tb2, Row.class).print();

        System.out.println("***************************************************************************");
        System.out.println(tableEnv.execEnv().getExecutionPlan());
        System.out.println("*********************There are no errors here.*****************************");

        // execute the job; this is the step where the original report failed
        tableEnv.execEnv().execute();
    }

harbby commented 5 years ago

Hi @XuQianJin-Stars, I found that this is the only way to raise this error:

        Table table = tableEnv.fromDataStream(inputStream, "name,proctime.proctime");
        DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
        stream.getTransformation().setOutputType(table.getSchema().toRowType());
        stream.print();
XuQianJin-Stars commented 5 years ago

@harbby Yes, the error is caused by a type mismatch when converting to Row.
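
The stack trace at the top of the thread shows the cast failing in `LongSerializer.copy`, called from `RowSerializer.copy`, which is consistent with a field that is declared as Long but holds a `java.sql.Timestamp` at runtime. The following is a rough, stdlib-only analogy of that situation and not Flink code: the class `DeclaredTypeMismatch`, the copier constants, and the helpers `copyRow` and `copySucceeds` are all hypothetical names made up for this sketch. Each field is copied by a function picked from the declared type, so a wrong declaration only fails when a row is actually copied.

```java
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

/** Stdlib-only analogy (not Flink code): fields are copied according to their DECLARED type. */
public class DeclaredTypeMismatch {

    // Hypothetical stand-ins for per-field serializers; each one casts to its declared type.
    static final UnaryOperator<Object> STRING_COPIER = value -> (String) value;
    static final UnaryOperator<Object> LONG_COPIER = value -> (Long) value;
    static final UnaryOperator<Object> TIMESTAMP_COPIER =
            value -> new Timestamp(((Timestamp) value).getTime());

    /** Copies a row field by field, using the copier declared for each position. */
    static Object[] copyRow(List<UnaryOperator<Object>> copiers, Object[] row) {
        Object[] copy = new Object[row.length];
        for (int i = 0; i < row.length; i++) {
            copy[i] = copiers.get(i).apply(row[i]);
        }
        return copy;
    }

    /** Returns false if copying fails because a value does not match its declared type. */
    static boolean copySucceeds(List<UnaryOperator<Object>> copiers, Object[] row) {
        try {
            copyRow(copiers, row);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The second field is a Timestamp at runtime, like a materialized time attribute.
        Object[] row = {"Bob", new Timestamp(1543052856000L)};

        // Declared as Long: the cast inside LONG_COPIER throws ClassCastException.
        System.out.println(copySucceeds(Arrays.asList(STRING_COPIER, LONG_COPIER), row));      // false

        // Declared as Timestamp: the declaration matches the runtime value.
        System.out.println(copySucceeds(Arrays.asList(STRING_COPIER, TIMESTAMP_COPIER), row)); // true
    }
}
```

Building the schema is cheap and never touches the data, which fits the observation in the reproduction that printing the execution plan succeeds and the error only shows up once the job runs.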

harbby commented 5 years ago

This has not been fixed yet.

yeezychao commented 3 years ago

Hello, I ran into the same problem as you when trying to turn a table that has a rowtime attribute into a stream. How did you solve it in the end? I am using Flink 1.12.1.

wanghui93 commented 3 years ago

Add me on WeChat, my friend: 17620356593

