Closed flo-mair closed 3 years ago
Hi @flo-mair, thank you for the CloudWatch logs.
It appears that the Python program itself is raising a user exception, and the application cannot start because of it.
These code samples were developed and meant to be run on a local machine, and I've created supplemental code to support running in a distributed KDA environment. I'm working to get that merged in; in the meantime, see if the code in this repository works better for you.
I got the same issue when running the official KDA guide. I re-uploaded the application with modified code (the S3Sink example) and got the errors below:
": org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.input_table'."
It seems to be a connection issue with the Kinesis Data Stream.
Hi, can you paste the entire stack trace that is in CloudWatch?
Also can you double check your IAM Permissions?
Thanks for the help~ Please refer to the CloudWatch logs and IAM policy below. Thanks.
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "kinesis.analytics.checkpoint.config.pause: 5000",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "akka.framesize: 157286400b",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "execution.attached: true",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "high-availability: zookeeper",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "execution.shutdown-on-attached-exit: false",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "pipeline.jars: file:/tmp/flink-web-a1f86c38-4b3b-4e62-9b6f-e337a5128429/flink-web-upload/8eedc3a8-4c90-4d30-9f48-ecdcbcd26022_code/StreamingFileSink/lib/amazon-kinesis-connector-flink-2.0.0.jar;file:/opt/amazon/opt/flink-python_2.12-1.11.1.jar",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "kinesis.analytics.jarrunoverride.job: true",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "security.ssl.internal.keystore: /etc/ssl-certs/flink-internal/internal.keystore",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "restart-strategy: fixed-delay",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "fs.s3a.connection.maximum: 1000",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "state.checkpoints.dir: s3://3db4bd0e0168751d35dc925c1fa9414b79d097b8/269b62d27ab2c7948705ae5ffd2ebc67-123456789012-1617154564870/checkpoints",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "state.backend.rocksdb.log.size: 67108864",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.logConfiguration(JarRunOverrideHandler.java:502)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "fs.s3a.endpoint: s3.amazonaws.com",
"threadName": "flink-rest-server-netty-worker-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:132)",
"logger": "org.apache.flink.client.ClientUtils",
"message": "Starting program (detached: true)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:90)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "--------------------------- Python Process Started --------------------------",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:354)",
"logger": "org.apache.flink.contrib.streaming.state.RocksDBStateBackend",
"message": "Using predefined options: DEFAULT.",
"threadName": "Thread-20",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.contrib.streaming.state.RocksDBStateBackend.configureOptionsFactory(RocksDBStateBackend.java:574)",
"logger": "org.apache.flink.contrib.streaming.state.RocksDBStateBackend",
"message": "Using default options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.files.open=65536, state.backend.rocksdb.logs.num=10, state.backend.rocksdb.log.size=67108864}}.",
"threadName": "Thread-20",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "hello 1",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "hello 2",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "hello 3",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "Traceback (most recent call last):",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": " File \"/tmp/flink-web-a1f86c38-4b3b-4e62-9b6f-e337a5128429/flink-web-upload/8eedc3a8-4c90-4d30-9f48-ecdcbcd26022_code/StreamingFileSink/getting-started.py\", line 155, in <module>",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": " main()",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": " File \"/tmp/flink-web-a1f86c38-4b3b-4e62-9b6f-e337a5128429/flink-web-upload/8eedc3a8-4c90-4d30-9f48-ecdcbcd26022_code/StreamingFileSink/getting-started.py\", line 141, in main",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": " .format(output_table_name, input_table_name))",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": " File \"/opt/amazon/opt/python/pyflink.zip/pyflink/table/table_environment.py\", line 543, in execute_sql",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": " File \"/opt/amazon/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py\", line 1286, in __call__",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": " File \"/opt/amazon/opt/python/pyflink.zip/pyflink/util/exceptions.py\", line 147, in deco",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": " File \"/opt/amazon/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py\", line 328, in get_return_value",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": ": org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.input_table'.",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "Table options are:",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "'aws.region'='us-east-1'",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "'connector'='kinesis'",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "'format'='json'",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "'json.timestamp-format.standard'='ISO-8601'",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "'scan.stream.initpos'='LATEST'",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "'sink.partitioner-field-delimiter'=';'",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "'sink.producer.collection-max-count'='100'",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "'stream'='ExampleInputStream'",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:774)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:746)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:236)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat java.base/java.lang.Thread.run(Thread.java:834)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/deser/BeanDeserializerModifier",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat software.amazon.kinesis.connectors.flink.util.KinesisConfigUtil.validateAwsConfiguration(KinesisConfigUtil.java:450)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat software.amazon.kinesis.connectors.flink.util.KinesisConfigUtil.validateConsumerConfiguration(KinesisConfigUtil.java:115)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat software.amazon.kinesis.connectors.flink.table.KinesisDynamicTableFactory.validateConsumerProperties(KinesisDynamicTableFactory.java:151)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat software.amazon.kinesis.connectors.flink.table.KinesisDynamicTableFactory.createDynamicTableSource(KinesisDynamicTableFactory.java:77)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\t... 30 more",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.deser.BeanDeserializerModifier",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "\t... 35 more",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:102)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "--------------------------- Python Process Exited ---------------------------",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:107)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "Run python process failed",
"throwableInformation": "java.lang.RuntimeException: Python process exits with code: 1\n\tat org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:104) ~[flink-python_2.12-1.11.1.jar:1.11.1]\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]\n\tat jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]\n\tat java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:294) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:203) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]\n\tat java.lang.Thread.run(Thread.java:834) [?:?]\n",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "ERROR"
}
{
"locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:206)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "Error occurred when trying to start the job",
"throwableInformation": "org.apache.flink.client.program.ProgramAbortException\n\tat org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:111) ~[?:?]\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]\n\tat jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]\n\tat java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:294) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:203) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]\n\t... 
6 more\nWrapped by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramAbortException\n\tat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]\n\tat java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) [?:?]\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) [?:?]\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]\n\tat java.lang.Thread.run(Thread.java:834) [?:?]\n",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "ERROR"
}
{
"locationInformation": "org.apache.flink.runtime.rest.handler.AbstractHandler.handleException(AbstractHandler.java:210)",
"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
"message": "Exception occurred in REST handler: Could not execute application.",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "ERROR"
}
{
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": 9,
"message": "org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:207)\n\tat java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)\n\tat java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramAbortException\n\tat java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)\n\tat java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)\n\t... 
6 more\nCaused by: org.apache.flink.client.program.ProgramAbortException\n\tat org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:111)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:294)\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:201)\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:203)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)\n\t... 6 more\n",
"messageType": "ERROR",
"messageSchemaVersion": "1",
"errorCode": "CodeError.InvalidApplicationCode"
}
{
"locationInformation": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache.lambda$scheduleRefresh$0(CredentialsCache.java:95)",
"logger": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache",
"message": "Refresher is getting an entry for key proxied-credentials-provider with timeToRefresh 1617193112007",
"threadName": "pool-5-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache$AWSCredentialsCacheBuilder$2.reload(CredentialsCache.java:299)",
"logger": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache",
"message": "Replacing old entry with key proxied-credentials-provider with expireAt = 1617195008000",
"threadName": "Thread-15",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache.scheduleRefresh(CredentialsCache.java:92)",
"logger": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache",
"message": "Scheduling refresh of key proxied-credentials-provider at 1617193532063",
"threadName": "Thread-15",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache.lambda$scheduleRefresh$0(CredentialsCache.java:95)",
"logger": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache",
"message": "Refresher is getting an entry for key proxied-credentials-provider with timeToRefresh 1617193114943",
"threadName": "pool-5-thread-1",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache$AWSCredentialsCacheBuilder$2.reload(CredentialsCache.java:299)",
"logger": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache",
"message": "Replacing old entry with key proxied-credentials-provider with expireAt = 1617195008000",
"threadName": "Thread-36",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache.scheduleRefresh(CredentialsCache.java:92)",
"logger": "com.amazonaws.kinesis.analytics.s3credsproxy.CredentialsCache",
"message": "Scheduling refresh of key proxied-credentials-provider at 1617193534961",
"threadName": "Thread-36",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ReadCode",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:GetObjectVersion"
],
"Resource": [
"arn:aws:s3:::<code-bucket>/*"
]
},
{
"Sid": "ListCloudwatchLogGroups",
"Effect": "Allow",
"Action": [
"logs:DescribeLogGroups"
],
"Resource": [
"arn:aws:logs:us-east-1:123456789012:log-group:*"
]
},
{
"Sid": "ListCloudwatchLogStreams",
"Effect": "Allow",
"Action": [
"logs:DescribeLogStreams"
],
"Resource": [
"arn:aws:logs:us-east-1:123456789012:log-group:/aws/kinesis-analytics/myapp:log-stream:*"
]
},
{
"Sid": "PutCloudwatchLogs",
"Effect": "Allow",
"Action": [
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:us-east-1:123456789012:log-group:/aws/kinesis-analytics/myapp:log-stream:kinesis-analytics-log-stream"
]
},
{
"Sid": "ReadInputStream",
"Effect": "Allow",
"Action": "kinesis:*",
"Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/ExampleInputStream"
},
{
"Sid": "WriteOutputStream",
"Effect": "Allow",
"Action": "kinesis:*",
"Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/ExampleOutputStream"
}
]
}
Hi @flo-mair, thank you for the CloudWatch logs.
It appears that the Python program itself has a user exception in it, and the application cannot start because of this.
These code samples were developed and meant to be run on a local machine, and I've created supplemental code to support running in a distributed KDA environment. Working to get that merged in, but see if the code in this repository works better for you.
Still same error with this code
Are you supplying the Kinesis Connector when bundling / deploying your application, and referencing it in the jarfile environment variable as described in the Configure The Application section here?
I see the error is showcasing the Table Options, but it's cutting off the output which shows the options for table sources which is unfortunate.
The Kinesis Source needs to be passed in via a jar file which can be downloaded and bundled with your application here: https://repo1.maven.org/maven2/software/amazon/kinesis/amazon-kinesis-connector-flink/2.0.0/amazon-kinesis-connector-flink-2.0.0.jar
Yes, I've zipped the jar file and specified the jarfile in kinesis.analytics.flink.run.options.
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/deser/BeanDeserializerModifier",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
{
"locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:98)",
"logger": "org.apache.flink.client.python.PythonDriver",
"message": "Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.deser.BeanDeserializerModifier",
"threadName": "Flink-DispatcherRestEndpoint-thread-2",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/myapp",
"applicationVersionId": "9",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
Tried again and still got the same error. Going through the error logs, is it something related to the jackson package?
We've identified the root cause as the maven central jar being provided does not include the shaded dependencies required to run the Flink Kinesis Connector.
Short Term: You can build an UBER jar manually to use within your pyflink application by building the project here and bundling the required dependencies.
Longer term: we will be publishing the same jar somewhere for ease of use and downloadability.
I'll leave the issue open until the short term solution is verified or the longer term solution is implemented.
Thanks for the info @jeremyber-aws will try this.
Hi, I was facing the same problem and it seems to be resolved by adding the dependency and packaging as @jeremyber-aws said.
<artifactSet combine.children="append">
<includes>
<include>com.amazonaws:*</include>
<include>com.google.protobuf:*</include>
<include>org.apache.httpcomponents:*</include>
<include>software.amazon.awssdk:*</include>
<include>software.amazon.eventstream:*</include>
<include>software.amazon.ion:*</include>
<include>org.reactivestreams:*</include>
<include>io.netty:*</include>
<include>com.typesafe.netty:*</include>
<include>com.fasterxml.jackson.core:*</include> <!-- Added -->
</includes>
</artifactSet>
My pyflink job is started on KDA 🎉
However, my job FAILED soon afterwards, and the following exception appeared on the Flink dashboard.
java.lang.NoClassDefFoundError: Could not initialize class software.amazon.kinesis.shaded.com.amazonaws.ClientConfiguration
My full code (including CDK) is here.
Glad the fix is working for you, @sambaiz!
In speaking with the development engineers, they included the following dependencies:
<includes>
<include>com.amazonaws:*</include>
<include>com.google.protobuf:*</include>
<include>org.apache.httpcomponents:*</include>
<include>software.amazon.awssdk:*</include>
<include>software.amazon.eventstream:*</include>
<include>software.amazon.ion:*</include>
<include>org.reactivestreams:*</include>
<include>io.netty:*</include>
<include>com.typesafe.netty:*</include>
<include>com.amazonaws:amazon-kinesis-producer</include>
<include>com.google.guava:*</include>
<include>commons-logging:*</include>
<include>com.fasterxml.jackson.core:*</include>
</includes>
Please see if you have any issues including these extra dependencies.
Thanks @jeremyber-aws! I have succeeded in running my job by adding some dependencies.
<includes>
<include>com.amazonaws:*</include>
<include>com.google.protobuf:*</include>
<include>org.apache.httpcomponents:*</include>
<include>software.amazon.awssdk:*</include>
<include>software.amazon.eventstream:*</include>
<include>software.amazon.ion:*</include>
<include>org.reactivestreams:*</include>
<include>io.netty:*</include>
<include>com.typesafe.netty:*</include>
<include>com.amazonaws:amazon-kinesis-producer</include>
<include>com.google.guava:*</include>
<include>commons-logging:*</include>
<include>com.fasterxml.jackson.core:*</include>
<!-- Added -->
<include>com.fasterxml.jackson.dataformat:*</include>
<include>joda-time:joda-time</include>
</includes>
When running the sample code from your repo @jeremyber-aws I still get an error in cloudwatch.
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:xxxxxxxxxxxxx:application/sample-app",
"applicationVersionId": 5,
"message": "org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:207)\n\tat java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)\n\tat java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramAbortException\n\tat java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)\n\tat java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)\n\t... 
6 more\nCaused by: org.apache.flink.client.program.ProgramAbortException\n\tat org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:111)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:294)\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:201)\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:203)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)\n\t... 6 more\n",
"messageType": "ERROR",
"messageSchemaVersion": "1",
"errorCode": "CodeError.InvalidApplicationCode"
}
Hi @flo-mair without any more context into the error (are there any more logs that are helpful?) I am wondering if you provided the environment variables required to run the application?
Search Cloudwatch for errors containing PythonDriver
which has helped me in debugging when my pyflink application has errors.
Hi all, thank you for your patience with this issue.
The Amazon Kinesis SQL Connector for Flink (v2.0.3) has been indexed into Maven and is recommended for use within your PyFlink applications reading from Kinesis.
You can find the jar here.
I will mark this issue as closed, as including this jar will resolve any issues within the thread.
@jeremyber-aws Hi~ I've tried the new Amazon Kinesis SQL Connector for Flink (v2.0.3) and the job ran successfully. However, when I changed the format from 'csv' to 'parquet', the job failed. Following the Flink guide, I also tried combining the jars, but still got the same errors.
Is there any extra configuration to make parquet format available? Thanks~
@davidshtian how did you combine the jar? You can include it in your application configuration properties at the jarfile
location. Ensure you're creating an uberjar including the flink-sql-parquet
jar as noted in your linked documentation.
@davidshtian how did you combine the jar? You can include it in your application configuration properties at the
jarfile
location. Ensure you're creating an uberjar including the flink-sql-parquet
jar as noted in your linked documentation.
@jeremyber-aws Hi~ Since I need to point jarfile at amazon-kinesis-sql-connector-flink-2.0.3.jar, I've tried the following:
Adding the flink-parquet_2.11 dependency to amazon-kinesis-sql-connector-flink-2.0.3.pom, and building with the mvn clean package -Dmaven.test.skip=true
command.
<dependencies>
  <!-- The connector module itself, pinned to the version of the enclosing project. -->
  <dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-connector-flink</artifactId>
    <version>${project.version}</version>
  </dependency>
  <!-- Parquet format support added on top of the connector. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.11</artifactId>
    <version>1.11.1</version>
  </dependency>
</dependencies>
Extract and combine jar files with unzip
and jar
commands.
# Scratch directory that will hold the unpacked contents of both jars.
$ mkdir tmp
# Unpack each jar into tmp/ from a subshell, so the cd does not affect the calling shell.
# NOTE(review): entries that share the same path in both jars (for example under META-INF/)
# end up as a single file rather than being merged; confirm this is acceptable.
$ (cd tmp; unzip -uo ../jar1.jar)
$ (cd tmp; unzip -uo ../jar2.jar)
# Re-pack everything under tmp/ into one archive named combined.jar.
$ jar -cvf combined.jar -C tmp .
I'm not sure whether I did this correctly, but the jobs still failed. Thanks~
Please see the following link on how to create UBER jars--the jar does not need to be named anything in particular.
Please see the following link on how to create UBER jars--the jar does not need to be named anything in particular.
@jeremyber-aws I really appreciate your help~ Following the link you provided, I tried many combinations to get the PyFlink KDA job running for "S3 + Parquet", and I finally got it working with the pom.xml below. Thanks~
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <!-- Uber-jar build: shades the Kinesis SQL connector, the Flink Parquet format
       and the Hadoop libraries listed below into one jar. -->
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.david.demo</groupId>
  <artifactId>pyflink-kinesis-parquet-uber</artifactId>
  <version>0.0.1</version>
  <packaging>jar</packaging>

  <name>pyflink-kinesis-parquet-uber</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- Hadoop libraries; commons-cli is excluded from each of them. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.8.3</version>
      <exclusions>
        <exclusion>
          <groupId>commons-cli</groupId>
          <artifactId>commons-cli</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.8.3</version>
      <exclusions>
        <exclusion>
          <groupId>commons-cli</groupId>
          <artifactId>commons-cli</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Parquet format for Flink 1.11.1. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-parquet_2.11</artifactId>
      <version>1.11.1</version>
    </dependency>

    <!-- Amazon Kinesis SQL connector for Flink. -->
    <dependency>
      <groupId>software.amazon.kinesis</groupId>
      <artifactId>amazon-kinesis-sql-connector-flink</artifactId>
      <version>2.0.3</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>pyflink-kinesis-parquet-uber</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <!-- Run the shade goal as part of the package phase. -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <!-- Writes the entries below into the shaded jar's manifest. -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <manifestEntries>
                    <Main-Class>com.david.demo.App</Main-Class>
                    <Build-Number>1.0</Build-Number>
                  </manifestEntries>
                </transformer>
                <!-- Combines META-INF/services files that several dependencies provide
                     under the same name, instead of keeping only one of them. -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
I did the same thing, but I got the error below: java.io.IOException: org.apache.flink.table.api.TableException: Can not find format factory.
I did the same thing, but I got the error below: java.io.IOException: org.apache.flink.table.api.TableException: Can not find format factory.
For the latest KDA Flink version (1.13.2), you could try the pom file below; the changes are to the flink-parquet_2.11 and flink-sql-connector-kinesis_2.12 dependencies.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <!-- Uber-jar build for Flink 1.13.2: shades the Flink Kinesis SQL connector,
       the Flink Parquet format and the Hadoop libraries listed below into one jar. -->
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.david.demo</groupId>
  <artifactId>pyflink-kinesis-parquet-uber</artifactId>
  <version>0.0.1</version>
  <packaging>jar</packaging>

  <name>pyflink-kinesis-parquet-uber</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- Hadoop libraries; commons-cli is excluded from each of them. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.8.3</version>
      <exclusions>
        <exclusion>
          <groupId>commons-cli</groupId>
          <artifactId>commons-cli</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.8.3</version>
      <exclusions>
        <exclusion>
          <groupId>commons-cli</groupId>
          <artifactId>commons-cli</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Parquet format for Flink 1.13.2.
         NOTE(review): this artifact carries the _2.11 Scala suffix while the Kinesis
         connector below carries _2.12; confirm whether the mix is intentional. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-parquet_2.11</artifactId>
      <version>1.13.2</version>
    </dependency>

    <!-- Flink Kinesis SQL connector. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sql-connector-kinesis_2.12</artifactId>
      <version>1.13.2</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>pyflink-kinesis-parquet-uber</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <!-- Run the shade goal as part of the package phase. -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <!-- Writes the entries below into the shaded jar's manifest. -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <manifestEntries>
                    <Main-Class>com.david.demo.App</Main-Class>
                    <Build-Number>1.0</Build-Number>
                  </manifestEntries>
                </transformer>
                <!-- Combines META-INF/services files that several dependencies provide
                     under the same name, instead of keeping only one of them. -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
I tried it with the S3 sink example using the Parquet data format, and it works well.
When running the tumbling window sample (python) the application does not start in KDA. Followed the instructions in the Docs. I see the following logs in CloudWatch: