vesoft-inc / nebula-flink-connector

Flink Connector for Nebula Graph

NebulaSinkFunction in nebula-flink-connector cannot recognize the RowKind of Rows coming from a Flink CDC source #104

Open fanqiejiang8 opened 2 weeks ago

fanqiejiang8 commented 2 weeks ago

The code in my own application is as follows:

```java
VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace(ennVertexConfig.getGraphSpace())
        .setTag(ennVertexConfig.getTagName())
        .setIdIndex(ennVertexConfig.getIndex()) // vid
        .setFields(ennVertexConfig.getFields())
        .setWriteMode(WriteModeEnum.INSERT)     // fixed once, for every record
        .setPositions(ennVertexConfig.getPositions())
        .setBatchSize(ennVertexConfig.getBatchSize())
        .build();
NebulaVertexBatchOutputFormat outputFormat =
        new NebulaVertexBatchOutputFormat(graphConnectionProvider, metaConnectionProvider, executionOptions);
```

This shows that the WriteMode is already fixed when the NebulaVertexBatchOutputFormat is created. For data from a Flink CDC source, however, the RowKind of a record is only known after it has been deserialized into a Row. As a result, when Flink CDC's MySqlSource is combined with NebulaSinkFunction, deleted rows cannot be handled.

The source code confirms this:

```java
@Override
public void addToBatch(Row record) {
    // record.getKind() is never consulted; every row is converted the same way
    NebulaEdge edge = converter.createEdge(record, executionOptions.getPolicy());
    if (edge == null) {
        return;
    }
    nebulaEdgeList.add(edge);
}

@Override
public void executeBatch(Session session) throws IOException {
    if (isBatchEmpty()) {
        return;
    }
    NebulaEdges nebulaEdges = new NebulaEdges(executionOptions.getLabel(),
            executionOptions.getFields(), nebulaEdgeList, executionOptions.getPolicy(),
            executionOptions.getPolicy());
    // generate the write ngql statement; the mode comes from the options, not from the rows
    String statement = null;
    switch (executionOptions.getWriteMode()) {
        case INSERT:
            statement = nebulaEdges.getInsertStatement();
            break;
        case UPDATE:
            statement = nebulaEdges.getUpdateStatement();
            break;
        case DELETE:
            statement = nebulaEdges.getDeleteStatement();
            break;
        default:
            throw new IllegalArgumentException("write mode is not supported");
    }
    executeStatement(session, statement);
    clearBatch();
}
```

My own test results are as follows:
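One way to picture what per-record handling would require is sketched below. This is a hypothetical, self-contained illustration, not the connector's API: `ChangeKind` is a stand-in mirroring the four values of Flink's `RowKind`, `Mode` mirrors the INSERT/UPDATE/DELETE values of `WriteModeEnum`, and `ModeAwareBatcher` and its string "statements" are invented placeholders for real nGQL produced by `NebulaEdges`/`NebulaVertices`. The mapping (skip `UPDATE_BEFORE`, write `UPDATE_AFTER` as an update) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: pick the write mode per record from its change kind,
 * instead of fixing one mode when the output format is built.
 * ChangeKind/Mode are stand-ins, not Flink's RowKind or the connector's WriteModeEnum.
 */
public class RowKindRoutingSketch {

    enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    enum Mode { INSERT, UPDATE, DELETE }

    /** Assumed mapping; returns null for records that produce no write. */
    static Mode modeFor(ChangeKind kind) {
        switch (kind) {
            case INSERT:
                return Mode.INSERT;
            case UPDATE_AFTER:
                return Mode.UPDATE;   // new image of an updated row
            case DELETE:
                return Mode.DELETE;
            case UPDATE_BEFORE:
            default:
                return null;          // old image of an update: nothing to write
        }
    }

    /** Batches consecutive records with the same mode; a mode change flushes first. */
    static final class ModeAwareBatcher {
        private final int batchSize;
        private final List<String> statements = new ArrayList<>(); // stands in for executeStatement(...)
        private final List<String> pendingVids = new ArrayList<>();
        private Mode pendingMode;

        ModeAwareBatcher(int batchSize) {
            this.batchSize = batchSize;
        }

        void add(ChangeKind kind, String vid) {
            Mode mode = modeFor(kind);
            if (mode == null) {
                return;
            }
            // Ending the batch on a mode change keeps statements in source order,
            // so an INSERT and a later DELETE of the same vid are not reordered.
            if (pendingMode != null && mode != pendingMode) {
                flush();
            }
            pendingMode = mode;
            pendingVids.add(vid);
            if (pendingVids.size() >= batchSize) {
                flush();
            }
        }

        void flush() {
            if (pendingVids.isEmpty()) {
                return;
            }
            // Placeholder text, not real nGQL
            statements.add(pendingMode + " " + String.join(",", pendingVids));
            pendingVids.clear();
            pendingMode = null;
        }

        List<String> statements() {
            return statements;
        }
    }

    public static void main(String[] args) {
        ModeAwareBatcher batcher = new ModeAwareBatcher(100);
        batcher.add(ChangeKind.INSERT, "1");
        batcher.add(ChangeKind.INSERT, "2");
        batcher.add(ChangeKind.DELETE, "1");
        batcher.add(ChangeKind.UPDATE_BEFORE, "2");
        batcher.add(ChangeKind.UPDATE_AFTER, "2");
        batcher.flush();
        System.out.println(batcher.statements()); // prints [INSERT 1,2, DELETE 1, UPDATE 2]
    }
}
```

Flushing whenever the mode changes is what keeps the change log's order intact. Splitting the stream into two separately configured sinks (one INSERT, one DELETE) would be simpler, but it gives up that ordering guarantee between the two sinks. The sketch also ignores updates that change the vid itself.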

[Screenshot of test results]

fanqiejiang8 commented 2 weeks ago

To add: when Flink CDC's MySqlSource is combined with NebulaSinkFunction, deleted rows cannot be handled.