alibaba / pemja

Apache License 2.0

Issue with Thread mode in Flink 1.16 and M1 #23

Open kphan102 opened 2 years ago

kphan102 commented 2 years ago

I am running Flink 1.16 on a Mac M1. Everything works as expected, apart from a few tweaks I had to make to get PyFlink 1.16 working on the M1. However, when I tested the job in thread mode, I got the following error:

2022-11-14 17:01:51
pemja.core.PythonException: <class 'TypeError'>: 'NoneType' object is not iterable
    at /usr/local/lib/python3.8/site-packages/pyflink/fn_execution/embedded/operations.process_element2(operations.py:140)
    at /usr/local/lib/python3.8/site-packages/pyflink/fn_execution/embedded/operations._output_elements(operations.py:57)
    at /usr/local/lib/python3.8/site-packages/pyflink/fn_execution/embedded/operations._process_elements_on_operation(operations.py:48)
    at /usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/embedded/operations.process_element_func2(operations.py:208)
    at /usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/embedded/operations.process_func(operations.py:111)
    at pemja.core.object.PyIterator.next(Native Method)
    at pemja.core.object.PyIterator.hasNext(PyIterator.java:40)
    at org.apache.flink.streaming.api.operators.python.embedded.AbstractTwoInputEmbeddedPythonFunctionOperator.processElement2(AbstractTwoInputEmbeddedPythonFunctionOperator.java:208)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:225)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:194)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Thread.java:829)
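A possible reading of this traceback: the innermost frame is `process_element2` in PyFlink's embedded (thread-mode) operations, reached from `AbstractTwoInputEmbeddedPythonFunctionOperator`, so the job uses a two-input operator on connected streams. A `TypeError: 'NoneType' object is not iterable` at that point is what Python raises when a runtime does `yield from` over a user function's result and that function returned `None`. For example, a `process_element2` that only updates state and never yields would return `None`. This cause is an assumption: the job's function code is not shown, and the maintainer's reply only says it looks like a bug. The sketch below is plain Python, not PyFlink code. `output_elements`, `tolerant_output_elements` and the two sample functions are hypothetical stand-ins that illustrate the failure pattern and a `None`-tolerant alternative.

```python
from typing import Any, Callable, Iterable, Iterator, Optional

UserFunc = Callable[[Any], Optional[Iterable[Any]]]


def output_elements(func: UserFunc, value: Any) -> Iterator[Any]:
    """Hypothetical stand-in for a runtime that forwards every element the
    user function produces. Fails if the function returns None."""
    yield from func(value)


def tolerant_output_elements(func: UserFunc, value: Any) -> Iterator[Any]:
    """Hypothetical variant that treats a None return as 'no output'."""
    result = func(value)
    if result is not None:
        yield from result


def updates_state_only(value: Any) -> None:
    # Side effects only (e.g. updating state); implicitly returns None.
    pass


def emits_if_positive(value: int) -> Iterator[int]:
    # A generator: even when it emits nothing, it is still iterable.
    if value > 0:
        yield value


print(list(output_elements(emits_if_positive, 1)))              # [1]
print(list(output_elements(emits_if_positive, -1)))             # []
print(list(tolerant_output_elements(updates_state_only, 1)))    # []

try:
    list(output_elements(updates_state_only, 1))
except TypeError as exc:
    print(exc)  # 'NoneType' object is not iterable
```

If this assumption holds, writing the user's `process_element2` as a generator that contains at least one `yield`, even when it sometimes emits nothing, sidesteps the `None` return. Whether the process-mode runtime tolerates `None` differently is not established here.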

Here are the relevant settings from my job:

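    from pyflink.common import Configuration
    from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
    from pyflink.util.java_utils import get_j_env_configuration

    env = StreamExecutionEnvironment.get_execution_environment()

    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

    # Wrap the environment's underlying Java configuration and switch the
    # Python runtime to thread mode (Python runs embedded in the JVM via PEMJA)
    env_config = Configuration(
        j_configuration=get_j_env_configuration(env._j_stream_execution_environment)
    )
    env_config.set_string("python.execution-mode", "thread")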

I am running the job with a parallelism of 2.

HuangXingBo commented 2 years ago

@kphan102 This looks like a bug in PyFlink 1.16.0. Could you help create a JIRA issue describing the problem in detail? Thx.