confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

UDF fails when returning a Struct but works without error when returning a String #5394

Open tanmaybansal22 opened 4 years ago

tanmaybansal22 commented 4 years ago

**Describe the bug** When a custom UDF returns a STRUCT with optional fields, push queries fail. `DESCRIBE FUNCTION` shows no variations for the function.

**To Reproduce**

Steps to reproduce the behavior. Versions: Confluent Platform 5.5.0, ksqlDB 0.7.1.

1. `pom.xml` for the UDF:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.abc.ksqldb.udf</groupId>
    <artifactId>extract-stat</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Specify the repository for Confluent dependencies -->
    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <!-- Specify build properties -->
    <properties>
        <exec.mainClass>org.abc.ksqldb.udf.main</exec.mainClass>
        <java.version>1.8</java.version>
        <kafka.version>5.5.0-ccs</kafka.version>
        <kafka.scala.version>2.12</kafka.scala.version>
        <scala.version>${kafka.scala.version}.8</scala.version>
        <confluent.version>5.5.0</confluent.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <!-- Specify the ksqldb-udf dependency -->
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-udf</artifactId>
            <version>${confluent.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.json/json -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20190722</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                    <encoding>UTF-8</encoding>
                    <compilerArgs>
                        <arg>-parameters</arg>
                    </compilerArgs>
                </configuration>
            </plugin>

            <!-- Package all dependencies as one jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.2</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>${exec.mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>assemble-all</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
2. Create the file `src/main/java/org/abc/ksqldb/udf/ExtractData.java`:

```java
package org.abc.ksqldb.udf;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

import java.util.*;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.json.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@UdfDescription(name = "extractdata", description = "extract data")
public class ExtractData {

    // Field names of the returned Struct.
    private static final String E_ID = "E_ID";
    private static final String v_id = "v_id";
    private static final String U_ROLE = "U_ROLE";
    private static final String U_ID = "U_ID";
    private static final String TYPE = "TYPE";
    private static final String T_TT = "T_TT";
    private static final String T_FT = "T_FT";
    private static final String A_TS = "A_TS";

    private static final Logger logger = LoggerFactory.getLogger(ExtractData.class);

    // The two literals in `schema` concatenate to "... U_ID VARCHARTYPE VARCHAR ...",
    // because there is no comma between "U_ID VARCHAR" and "TYPE VARCHAR".
    @Udf(description = "extract teacher and students vc local stats",
            schema = "STRUCT<E_ID VARCHAR, v_id VARCHAR, U_ROLE VARCHAR, U_ID VARCHAR" +
                    "TYPE VARCHAR, T_TT BIGINT, T_FT BIGINT, A_TS VARCHAR  >"
    )
    public Struct extractdata(String id, String source, String ch_n,
                              String name, String c_id,
                              String data, String cc_id, String cc_key, String enc,
                              String extr, String ts) {

        String e_id = id;
        // This local variable shadows the v_id constant, so .field(v_id, ...) and
        // .put(v_id, v_id) below use the value of ch_n as the field name.
        String v_id = ch_n;
        String u_role = "u";
        String u_id = "i";
        String type = "t";
        Long t_tt = 0L;
        Long t_ft = 0L;
        String a_ts = ts;

        if (name.equals("EVENT")) {

            logger.info(data);

            // Connect schema of the returned Struct; the struct and all fields are optional.
            final Schema SCHEMA = SchemaBuilder.struct()
                    .optional()
                    .field(E_ID, Schema.OPTIONAL_STRING_SCHEMA)
                    .field(v_id, Schema.OPTIONAL_STRING_SCHEMA)
                    .field(U_ROLE, Schema.OPTIONAL_STRING_SCHEMA)
                    .field(U_ID, Schema.OPTIONAL_STRING_SCHEMA)
                    .field(TYPE, Schema.OPTIONAL_STRING_SCHEMA)
                    .field(T_TT, Schema.OPTIONAL_INT64_SCHEMA)
                    .field(T_FT, Schema.OPTIONAL_INT64_SCHEMA)
                    .field(A_TS, Schema.OPTIONAL_STRING_SCHEMA)
                    .build();

            Struct extractedData = new Struct(SCHEMA)
                    .put(E_ID, e_id)
                    .put(v_id, v_id)
                    .put(U_ID, u_id)
                    .put(U_ROLE, u_role)
                    .put(TYPE, type)
                    .put(T_TT, t_tt)
                    .put(T_FT, t_ft)
                    .put(A_TS, a_ts);

            return extractedData;
        }
        // Rows whose name is not "EVENT" produce NULL.
        return null;
    }
}
```


3. (Optional) Create a stream `test_stream`:

```sql
CREATE STREAM test_stream (id VARCHAR, source VARCHAR, ch_n VARCHAR, name VARCHAR, c_id VARCHAR, data VARCHAR, cc_id VARCHAR, cc_key VARCHAR, enc VARCHAR, extr VARCHAR, ts VARCHAR)
  WITH (kafka_topic = 'new_topic', partitions = 1, replicas = 1, value_format = 'json');
```


4. (Optional) Insert some data into the stream.
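One way to keep the `schema` string of the `ExtractData` UDF in step 2 in sync with the `Struct` field names is to build the string from the same constants. Concatenated `static final String` constants form a compile-time constant expression, so the result can still be used as an annotation value. The following is a minimal JDK-only sketch, not a confirmed fix: the annotation `@StructSchema` is a hypothetical stand-in for `@Udf(schema = ...)` so the example compiles without the ksqlDB jars, and the upper-case `V_ID` assumes ksqlDB upper-cases unquoted field names in the schema string.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class StructSchemaConstants {

    // Hypothetical stand-in for @Udf(schema = ...). Like any annotation
    // element, its value must be a compile-time constant.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface StructSchema {
        String value();
    }

    // One constant per field; the same constants would also be passed to
    // SchemaBuilder.field(...) and Struct.put(...). Upper-case names assume
    // ksqlDB upper-cases unquoted identifiers in the schema string.
    static final String E_ID = "E_ID";
    static final String V_ID = "V_ID";
    static final String U_ROLE = "U_ROLE";
    static final String U_ID = "U_ID";
    static final String TYPE = "TYPE";
    static final String T_TT = "T_TT";
    static final String T_FT = "T_FT";
    static final String A_TS = "A_TS";

    // Concatenating static final String constants is itself a constant
    // expression, so SCHEMA can be used as an annotation value. Each
    // separator sits on the same line as the field it follows.
    static final String SCHEMA = "STRUCT<"
            + E_ID + " VARCHAR, "
            + V_ID + " VARCHAR, "
            + U_ROLE + " VARCHAR, "
            + U_ID + " VARCHAR, "
            + TYPE + " VARCHAR, "
            + T_TT + " BIGINT, "
            + T_FT + " BIGINT, "
            + A_TS + " VARCHAR>";

    @StructSchema(SCHEMA)
    public static String extractdata() {
        return null; // body omitted in this sketch
    }

    public static void main(String[] args) throws NoSuchMethodException {
        // Read the annotation back to show the constant reached it intact.
        StructSchema annotation = StructSchemaConstants.class
                .getMethod("extractdata")
                .getAnnotation(StructSchema.class);
        System.out.println(annotation.value());
    }
}
```

Running `main` prints `STRUCT<E_ID VARCHAR, V_ID VARCHAR, U_ROLE VARCHAR, U_ID VARCHAR, TYPE VARCHAR, T_TT BIGINT, T_FT BIGINT, A_TS VARCHAR>`. Because one constant feeds both the schema string and the `Struct` calls, a renamed or re-cased field cannot drift between the two.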

**Expected behavior**
1. `DESCRIBE FUNCTION extractdata;` should return the function description along with its variations.
2. Running the push query
`SELECT extractdata(id, source, ch_n, name, c_id, data, cc_id, cc_key, enc, extr, ts) FROM test_stream EMIT CHANGES;`
should return the input parameters as-is, in the returned STRUCT.

**Actual behavior**

1. The function variations in the function description are empty.
2. Running the query fails with:

```
Function 'extractdata' does not accept parameters (STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING).
Valid alternatives are:
```

**Additional context**
When I change the return type to STRING and return a string, it works fine. As soon as I change it to STRUCT, it fails.

agavra commented 4 years ago

Hi @freakytanmay, I think the problem might be the lowercase `v_id` in the struct definition. Can you attach any ksqlDB server logs you have? If the function doesn't show up in `DESCRIBE FUNCTION`, there will probably be an associated error in the logs explaining why it failed to load.
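A check along these lines can also be run offline before deploying the jar. The following is a minimal JDK-only sketch, not part of ksqlDB: the class `StructSchemaLint` and its `check` method are hypothetical. It assumes a flat STRUCT (no nested types, no backtick-quoted names) and that ksqlDB upper-cases unquoted field names in the `schema` string, so `v_id` would be declared as `V_ID`. It compares the declared names with the names the Java code passes to `Struct.put`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

class StructSchemaLint {

    // Hypothetical helper. Assumes a flat STRUCT<...> with no nested types or
    // quoted names, and that unquoted names are upper-cased by ksqlDB.
    static List<String> check(String schema, List<String> javaFieldNames) {
        List<String> problems = new ArrayList<>();
        String trimmed = schema.trim();
        if (!trimmed.startsWith("STRUCT<") || !trimmed.endsWith(">")) {
            problems.add("not a STRUCT<...> type: " + trimmed);
            return problems;
        }
        String body = trimmed.substring("STRUCT<".length(), trimmed.length() - 1);

        // Collect declared field names; each entry should be exactly "NAME TYPE".
        List<String> declared = new ArrayList<>();
        for (String entry : body.split(",")) {
            String[] parts = entry.trim().split("\\s+");
            if (parts.length != 2) {
                problems.add("malformed field declaration: '" + entry.trim() + "'");
                continue;
            }
            declared.add(parts[0].toUpperCase(Locale.ROOT));
        }

        // Every name the Java code passes to Struct.put must be declared verbatim.
        for (String name : javaFieldNames) {
            if (!declared.contains(name)) {
                problems.add("no schema field named '" + name + "'");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // The schema string from the report, as the compiler concatenates it.
        String reported = "STRUCT<E_ID VARCHAR, v_id VARCHAR, U_ROLE VARCHAR, U_ID VARCHAR"
                + "TYPE VARCHAR, T_TT BIGINT, T_FT BIGINT, A_TS VARCHAR  >";
        List<String> javaFields = Arrays.asList(
                "E_ID", "v_id", "U_ROLE", "U_ID", "TYPE", "T_TT", "T_FT", "A_TS");
        for (String problem : check(reported, javaFields)) {
            System.out.println(problem);
        }
    }
}
```

Run against the schema string from the report, `main` prints four problems: `malformed field declaration: 'U_ID VARCHARTYPE VARCHAR'`, then `no schema field named 'v_id'`, `'U_ID'` and `'TYPE'`. Under the stated assumptions, both the missing comma and the lowercase `v_id` would therefore stop the declared schema from matching the `Struct` the method builds.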

big-andy-coates commented 4 years ago

Not the same as, but in a related code area to, https://github.com/confluentinc/ksql/issues/5364. Whoever picks this up should probably try to address both.

agavra commented 3 years ago

Removing P0, as we're awaiting more info on this one and #5364 has been closed.

big-andy-coates commented 3 years ago

FYI #5364 was only closed as a duplicate of #4961. It wasn't fixed.