garyfeng / flink-playgrounds

Apache Flink Playgrounds
https://flink.apache.org/
Apache License 2.0

PyFlink: streamTable Required context properties mismatch. #5

Open garyfeng opened 4 years ago

garyfeng commented 4 years ago

With this Dockerfile, I tried to run the following Python script and got the errors shown below:

Python 3.7.3 (default, Dec 20 2019, 18:57:59)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>>
... from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>>> from pyflink.table import StreamTableEnvironment, CsvTableSink, DataTypes
>>> from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka
>>> from pyflink.table.window import Tumble
>>>
>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>> s_env.set_parallelism(1)
<pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment object at 0x7f3642d1e080>
>>> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>
>>> # Stream Table
... st_env = StreamTableEnvironment.create(s_env)
>>>
>>> # Set source Kafka table
... st_env \
...     .connect(  # declare the external system to connect to
...         Kafka()
...             .version("0.11")
...             .topic("input")
...             .start_from_earliest()
...             .property("zookeeper.connect", "zookeeper:2181")
...             .property("bootstrap.servers", "kafka:9092")
...     ) \
...     .with_format(  # declare a format for this system
...         Json()
...             .fail_on_missing_field(True)
...             .json_schema(
...             "{"
...             "  type: 'object',"
...             "  properties: {"
...             "    timestamp: {"
...             "      type: 'string'"
...             "    },"
...             "    page: {"
...             "      type: 'string'"
...             "    }"
...             "  }"
...             "}"
...         )
...     ) \
...     .with_schema(  # declare the schema of the table
...         Schema()
...             .field("timestamp", DataTypes.TIMESTAMP()).proctime()
...             .field("page", DataTypes.STRING())
...     ) \
...     .in_append_mode() \
...     .register_table_source("ClickEvent Source")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/apache_flink-1.10.dev0-py3.7.egg/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.7/dist-packages/py4j-0.10.8.1-py3.7.egg/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o44.registerTableSource.
: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
        at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.properties.bootstrap.servers=kafka:9092
connector.properties.zookeeper.connect=zookeeper:2181
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=input
connector.type=kafka
connector.version=0.11
format.fail-on-missing-field=true
format.json-schema={  type: 'object',  properties: {    timestamp: {      type: 'string'    },    page: {      type: 'string'    }  }}
format.property-version=1
format.type=json
schema.0.data-type=TIMESTAMP(3)
schema.0.name=timestamp
schema.0.proctime=true
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=page
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
        at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
        at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
        at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
        at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
        ... 13 more

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 34, in <module>
  File "/usr/local/lib/python3.7/dist-packages/apache_flink-1.10.dev0-py3.7.egg/pyflink/table/descriptors.py", line 1295, in register_table_source
    self._j_connect_table_descriptor.registerTableSource(name)
  File "/usr/local/lib/python3.7/dist-packages/py4j-0.10.8.1-py3.7.egg/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.7/dist-packages/apache_flink-1.10.dev0-py3.7.egg/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'

JARs under $FLINK_HOME/lib:

root@33ac860cb667:/opt/flink/bin# ls ../lib
flink-connector-kafka-base_2.11-1.10.0.jar  flink-table-blink_2.11-1.10.0.jar  slf4j-log4j12-1.7.15.jar
flink-dist_2.11-1.10.0.jar                  flink-table_2.11-1.10.0.jar
flink-json-1.10.0.jar                       log4j-1.2.17.jar
root@33ac860cb667:/opt/flink/bin#
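A note on the jar listing above: Flink discovers table factories through Java's `ServiceLoader` mechanism, so each connector jar declares its factories in `META-INF/services/org.apache.flink.table.factories.TableFactory`. Only `flink-connector-kafka-base_2.11-1.10.0.jar` is present under `lib/`, and my guess is that the version-specific Kafka factory ships in a separate jar (something like `flink-connector-kafka-0.11_2.11-1.10.0.jar`), which would explain why no `kafka` factory matched. A small sketch to check which factories a given jar actually registers (the helper name is mine, not part of Flink):

```python
# Sketch: list the TableFactory implementations a jar registers via Java SPI.
# Flink finds table factories through the service file
# META-INF/services/org.apache.flink.table.factories.TableFactory inside each jar.
import zipfile

SERVICE_ENTRY = "META-INF/services/org.apache.flink.table.factories.TableFactory"

def list_table_factories(jar_path):
    """Return the factory class names declared in the jar's SPI entry."""
    with zipfile.ZipFile(jar_path) as jar:
        if SERVICE_ENTRY not in jar.namelist():
            return []  # jar registers no table factories at all
        lines = jar.read(SERVICE_ENTRY).decode("utf-8").splitlines()
        # Drop blank lines and comments, keep the class names.
        return [ln.strip() for ln in lines if ln.strip() and not ln.startswith("#")]

# e.g. list_table_factories("/opt/flink/lib/flink-connector-kafka-base_2.11-1.10.0.jar")
```

If none of the jars in `lib/` list a Kafka `TableSourceFactory`, that would match the "Required context properties mismatch" error: the discovery only found the CSV factories from `flink-table`.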
garyfeng commented 4 years ago

Googling for `'connector.type' expects 'filesystem', but is 'kafka'` points me to https://issues.apache.org/jira/browse/FLINK-15552, where, during SQL/Kafka testing, they identified a class-loading issue that was supposedly fixed in v1.10.

One thing they mentioned is that defining the environment via YAML seemed to avoid this problem. See https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_sql_client.sh

The problem is that I don't know whether this is the same issue, and/or whether the environment can be defined via YAML in PyFlink.
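For reference, the SQL-client workaround mentioned above feeds Flink an environment file rather than building the descriptors in code. A sketch of what the equivalent of the source definition in this issue might look like in that YAML format (the table name is a placeholder, and whether PyFlink can consume such a file is exactly the open question):

```yaml
tables:
  - name: ClickEventSource        # placeholder name
    type: source-table
    update-mode: append
    connector:
      type: kafka
      version: "0.11"
      topic: input
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: zookeeper:2181
        bootstrap.servers: kafka:9092
    format:
      type: json
      fail-on-missing-field: true
    schema:
      - name: timestamp
        data-type: TIMESTAMP(3)
      - name: page
        data-type: STRING
```

This only sidesteps the descriptor API; it would still need the matching Kafka connector jar on the classpath to resolve the `kafka` factory.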