nielsbasjes / yauaa

Yet Another UserAgent Analyzer
https://yauaa.basjes.nl
Apache License 2.0

Unable to import the Flink table UDF in pyflink #1563

Open jmengoni opened 1 day ago

jmengoni commented 1 day ago

Please read this to ensure this is really a bug: https://yauaa.basjes.nl/developer/reportingissues/#these-are-not-bugs

Describe the bug
The Flink table UDF cannot be registered from pyflink: registering a Java UDF by class name requires a public no-argument constructor, which AnalyzeUseragentFunction does not have.

Component where the bug happens
  [ ] Core analyzer
  [x] UDF : Flink/Beam/Nifi/...
  [ ] Other

To Reproduce
Steps or code fragment to reproduce the behavior:

  1. Import version 7.28.1 of the yauaa and yauaa-flink-table libraries.
  2. Register the Java UDF in your pyflink code:

     table_env.create_java_temporary_function("parse_user_agent", "nl.basjes.parse.useragent.flink.table.AnalyzeUseragentFunction")

  3. When running the Flink job, the following error is thrown:

Py4JJavaError: An error occurred while calling o9.createTemporaryFunction.
: org.apache.flink.table.api.ValidationException: Function class 'nl.basjes.parse.useragent.flink.table.AnalyzeUseragentFunction' must have a public default constructor.
    at org.apache.flink.table.functions.UserDefinedFunctionHelper.validateInstantiation(UserDefinedFunctionHelper.java:491)
    at org.apache.flink.table.functions.UserDefinedFunctionHelper.validateClass(UserDefinedFunctionHelper.java:387)
    at org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:234)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.createTemporaryFunction(TableEnvironmentImpl.java:442)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)
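
For completeness, a minimal self-contained sketch of the failing call (the jar path is a placeholder; point it at wherever the yauaa-flink-table jar actually lives):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_batch_mode().build())

# Placeholder path: the yauaa-flink-table jar must be on the classpath.
t_env.get_config().get_configuration().set_string(
    'pipeline.jars', 'file:///path/to/yauaa-flink-table.jar')

# Raises Py4JJavaError / ValidationException: the function class has no
# public default constructor, so Flink cannot instantiate it by class name.
t_env.create_java_temporary_function(
    "parse_user_agent",
    "nl.basjes.parse.useragent.flink.table.AnalyzeUseragentFunction")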

Expected behavior
The job should not fail and the UDF should be available for use in the Flink Table API.

Additional context
I don't know why a no-argument constructor was not provided, but I believe one could be added that uses DEFAULT_PARSE_CACHE_SIZE and an empty desiredFields list, defaulting to all fields.

jmengoni commented 1 day ago

@nielsbasjes I created a PR that should solve this issue: https://github.com/nielsbasjes/yauaa/pull/1564.

nielsbasjes commented 1 day ago

Awesome!

2 questions:

jmengoni commented 23 hours ago
  1. I haven't run it locally myself, but there are 2 options that I know of: running a standalone pyflink script, or running the same thing from a Zeppelin notebook (both shown below).

For the first option, after making the yauaa jars available to pyflink, the following code should let you run a quick test:

from pyflink.table import EnvironmentSettings, TableEnvironment

# use_blink_planner() applies to Flink 1.13; later versions use the blink planner by default.
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(environment_settings=env_settings)
t_env.get_config().get_configuration().set_string('parallelism.default', '1')

# Make the yauaa jars available, e.g. (placeholder path):
# t_env.get_config().get_configuration().set_string(
#     'pipeline.jars', 'file:///path/to/yauaa-flink-table.jar')

t_env.create_java_temporary_function("parse_user_agent", "nl.basjes.parse.useragent.flink.table.AnalyzeUseragentFunction")

# A print sink to inspect the parsed result.
t_env.execute_sql("""
        CREATE TABLE print (
          ua MAP<STRING,STRING>
        ) WITH (
          'connector' = 'print'
        )
    """)

# Note: Flink SQL string literals use single quotes; double quotes denote identifiers.
# wait() blocks the script until the job finishes.
t_env.execute_sql("""
        INSERT INTO print
        SELECT parse_user_agent('your_user_agent_string')
    """).wait()

For the second option, in a Zeppelin notebook (which is what AWS Managed Flink Studio uses), two paragraphs are enough, plus possibly a configuration paragraph shown after them:

Paragraph 1:

%pyflink

st_env.create_java_temporary_function("parse_user_agent", "nl.basjes.parse.useragent.flink.table.AnalyzeUseragentFunction")

Paragraph 2:

%flink.ssql(type=update)

select parse_user_agent('your_user_agent_string');
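
Depending on the setup, the yauaa jars may still need to be made available to the interpreter. In a plain Zeppelin install one way is a configuration paragraph executed before the others; the Maven coordinate below is my assumption based on the version named in this issue:

Paragraph 0:

%flink.conf

flink.execution.packages nl.basjes.parse.useragent:yauaa-flink-table:7.28.1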

I have unfortunately not had time to test these solutions, but in theory they should work. Flink 1.13 is old, but it behaves the same as newer versions regarding pyflink Java UDFs. Finally, if you have access to an AWS account where you can spare some credit (careful, it's not cheap if you keep it running), you can also run the notebook above in a Managed Flink Studio notebook (that's how I tested).

  2. I'm afraid that wouldn't be possible with the way pyflink is implemented. So the only solutions I'm aware of are to create a new Java UDF with the required parameters, to use a Python UDF with an equivalent Python library instead (far less performant, see the sketch below), or to switch to Flink in Java.
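
For illustration, a rough sketch of the Python UDF route, reusing the t_env from the script above. Assumptions: the ua-parser package and its user_agent_parser.Parse API; the map keys are arbitrary names mimicking yauaa's, and this extracts far fewer fields than yauaa does:

from pyflink.table import DataTypes
from pyflink.table.udf import udf
from ua_parser import user_agent_parser  # assumption: pip install ua-parser

@udf(result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_user_agent_py(ua: str):
    # Parse with ua-parser and return a map shaped like the yauaa UDF output.
    parsed = user_agent_parser.Parse(ua)
    return {
        "AgentName": parsed["user_agent"]["family"],
        "OperatingSystemName": parsed["os"]["family"],
        "DeviceName": parsed["device"]["family"],
    }

# No Java classpath needed, but every row crosses the Python/JVM boundary,
# which is where the performance penalty comes from.
t_env.create_temporary_function("parse_user_agent_py", parse_user_agent_py)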
nielsbasjes commented 22 hours ago

Thanks, this is the info I need.