aws-samples / pyflink-getting-started

MIT No Attribution

[Bug?] Cannot invoke "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.createObjectNode()" because "this.mapper" is null #27

Open alucard001 opened 7 months ago

alucard001 commented 7 months ago

The problem:

  1. I run pyflink-examples/GettingStarted/getting-started.py; it waits for incoming data with no errors.
  2. I run python stock.py in another terminal; it is sending data without errors.
  3. After a short while (about 2-3 seconds), the following error occurs. The trace is long, so I keep only the last part here; the full log is at the end of this question.

Please note the final "Caused by":

Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.createObjectNode()" because "this.mapper" is null
        at org.apache.flink.formats.json.JsonRowDataSerializationSchema.serialize(JsonRowDataSerializationSchema.java:99)
        at org.apache.flink.formats.json.JsonRowDataSerializationSchema.serialize(JsonRowDataSerializationSchema.java:43)
        at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkElementConverter.apply(KinesisStreamsSinkElementConverter.java:56)
        at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkElementConverter.apply(KinesisStreamsSinkElementConverter.java:34)
        at org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:342)
        at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.base/java.lang.Thread.run(Thread.java:1583)
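
From reading the trace, my guess (and it is only a guess) is that flink-json initializes the Jackson ObjectMapper inside the serialization schema's open() method, so serialize() dereferences a null mapper whenever open() is never invoked, for example when the sink's element converter comes from a different Flink version. A minimal Python sketch of that failure shape (the class and names are illustrative, not Flink's actual API):

import json

class JsonSerializationSchema:
    def __init__(self):
        self.mapper = None   # Flink sets this up later, in open()

    def open(self):
        self.mapper = json   # stand-in for Jackson's ObjectMapper

    def serialize(self, row):
        # If open() was skipped, this raises on None -- the Python
        # analogue of the Java NPE quoted above.
        return self.mapper.dumps(row)

schema = JsonSerializationSchema()
# schema.open()  # <-- the call that apparently never happens
schema.serialize({"ticker": "AAPL", "price": 1.0})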

My Conda Environment:

name: pyflink
channels:
  - conda-forge
  - defaults
dependencies:
  - libcxx=16.0.6=h4653b0c_0
  - libzlib=1.2.13=h53f4e23_5
  - openjdk=21.0.2=hbeb2e11_0

Package info:

aiofiles                           23.2.1
aiohttp                            3.9.1
aiosignal                          1.3.1
altair                             5.2.0
annotated-types                    0.6.0
anyio                              3.7.1
apache-beam                        2.48.0
apache-flink                       1.19.0
apache-flink-libraries             1.19.0
attrs                              23.1.0
avro-python3                       1.10.2
beautifulsoup4                     4.12.3
black                              22.12.0
boto3                              1.34.74
botocore                           1.34.74
Brotli                             1.1.0
bs4                                0.0.2
build                              1.1.1
CacheControl                       0.14.0
certifi                            2023.11.17
cffi                               1.16.0
cfgv                               3.4.0
charset-normalizer                 3.3.2
cleo                               2.1.0
click                              8.1.7
cloudpickle                        2.2.1
colorama                           0.4.6
contourpy                          1.2.0
coverage                           7.3.3
crashtest                          0.4.1
crcmod                             1.7
cycler                             0.12.1
dataclasses-json                   0.5.14
Deprecated                         1.2.14
dill                               0.3.1.1
dirtyjson                          1.0.8
diskcache                          5.6.3
distlib                            0.3.8
distro                             1.8.0
dnspython                          2.4.2
docopt                             0.6.2
dulwich                            0.21.7
email-validator                    2.1.0.post1
fastapi                            0.110.0
fastavro                           1.9.4
fasteners                          0.19
fastjsonschema                     2.19.1
ffmpy                              0.3.1
filelock                           3.13.1
find_libpython                     0.4.0
fonttools                          4.46.0
frozenlist                         1.4.1
fsspec                             2023.12.2
gradio                             4.19.2
gradio_client                      0.10.1
greenlet                           3.0.2
grpcio                             1.60.0
grpcio-tools                       1.60.0
h11                                0.14.0
h2                                 4.1.0
hdfs                               2.7.3
hpack                              4.0.0
httpcore                           1.0.2
httplib2                           0.22.0
httptools                          0.6.1
httpx                              0.25.2
huggingface-hub                    0.19.4
hyperframe                         6.0.1
identify                           2.5.33
idna                               3.6
importlib_metadata                 7.0.2
importlib-resources                6.1.1
iniconfig                          2.0.0
injector                           0.21.0
installer                          0.7.0
itsdangerous                       2.1.2
jaraco.classes                     3.3.1
Jinja2                             3.1.2
jmespath                           1.0.1
joblib                             1.3.2
jsonschema                         4.20.0
jsonschema-specifications          2023.11.2
keyring                            24.3.1
kiwisolver                         1.4.5
llama_cpp_python                   0.2.53
llama-index-core                   0.10.14.post1
llama-index-embeddings-huggingface 0.1.4
llama-index-llms-llama-cpp         0.1.3
llama-index-readers-file           0.1.6
llama-index-vector-stores-qdrant   0.1.3
llamaindex-py-client               0.1.13
llvmlite                           0.42.0
markdown-it-py                     3.0.0
MarkupSafe                         2.1.3
marshmallow                        3.20.1
matplotlib                         3.8.2
mdurl                              0.1.2
more-itertools                     10.2.0
mpmath                             1.3.0
msgpack                            1.0.7
multidict                          6.0.4
mypy                               1.7.1
mypy-extensions                    1.0.0
nest-asyncio                       1.5.8
networkx                           3.2.1
nltk                               3.8.1
nodeenv                            1.8.0
numba                              0.59.0
numpy                              1.24.4
objsize                            0.6.1
openai                             1.5.0
openai-whisper                     20231117
orjson                             3.9.15
packaging                          23.2
pandas                             2.1.4
pathspec                           0.12.1
pemja                              0.4.1
pexpect                            4.9.0
Pillow                             10.1.0
pip                                24.0
pkginfo                            1.10.0
platformdirs                       4.1.0
pluggy                             1.3.0
poetry                             1.8.2
poetry-core                        1.9.0
poetry-plugin-export               1.7.0
portalocker                        2.8.2
pre-commit                         2.21.0
private-gpt                        0.4.0         /Users/alucard/privateGPT
proto-plus                         1.23.0
protobuf                           4.23.4
ptyprocess                         0.7.0
py4j                               0.10.9.7
pyarrow                            11.0.0
pycparser                          2.21
pydantic                           2.5.2
pydantic_core                      2.14.5
pydantic-extra-types               2.2.0
pydantic-settings                  2.1.0
pydot                              1.4.2
pydub                              0.25.1
pyflink                            1.0
Pygments                           2.17.2
pymongo                            4.6.3
PyMuPDF                            1.23.25
PyMuPDFb                           1.23.22
pyparsing                          3.1.1
pypdf                              4.0.2
pyproject_hooks                    1.0.0
PySocks                            1.7.1
pytest                             7.4.3
pytest-asyncio                     0.21.1
pytest-cov                         3.0.0
python-dateutil                    2.8.2
python-dotenv                      1.0.0
python-multipart                   0.0.9
pytz                               2023.3.post1
PyYAML                             6.0.1
qdrant-client                      1.7.3
rapidfuzz                          3.6.2
referencing                        0.32.0
regex                              2023.10.3
requests                           2.31.0
requests-toolbelt                  1.0.0
rich                               13.7.0
rpds-py                            0.14.1
ruamel.yaml                        0.18.6
ruamel.yaml.clib                   0.2.8
ruff                               0.2.2
s3transfer                         0.10.1
safetensors                        0.4.1
semantic-version                   2.10.0
setuptools                         69.0.2
shellingham                        1.5.4
six                                1.16.0
sniffio                            1.3.0
soupsieve                          2.5
SQLAlchemy                         2.0.27
starlette                          0.36.3
sympy                              1.12
tenacity                           8.2.3
tiktoken                           0.5.2
tokenizers                         0.15.2
tomli                              2.0.1
tomlkit                            0.12.0
toolz                              0.12.0
torch                              2.1.2
tqdm                               4.66.1
transformers                       4.38.2
trove-classifiers                  2024.3.3
typer                              0.9.0
types-PyYAML                       6.0.12.12
typing_extensions                  4.9.0
typing-inspect                     0.9.0
tzdata                             2023.3
ujson                              5.9.0
urllib3                            1.26.18
uvicorn                            0.24.0.post1
uvloop                             0.19.0
virtualenv                         20.25.0
watchdog                           4.0.0
watchfiles                         0.21.0
websockets                         11.0.3
wheel                              0.42.0
wrapt                              1.16.0
xattr                              1.1.0
yarl                               1.9.4
zipp                               3.17.0
zstandard                          0.22.0

Full Java error log:

(pyflink) MacBook-Pro:pyflink mymacbook$ python getting-started.py
Traceback (most recent call last):
  File "/Users/alucard/xdclass/pyflink/getting-started.py", line 167, in <module>
    main()
  File "/Users/alucard/xdclass/pyflink/getting-started.py", line 160, in main
    table_result.wait()
  File "/Users/alucard/Library/Enthought/Canopy_64bit/User/lib/python3.11/site-packages/pyflink/table/table_result.py", line 76, in wait
    get_method(self._j_table_result, "await")()
  File "/Users/alucard/Library/Enthought/Canopy_64bit/User/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/Users/alucard/Library/Enthought/Canopy_64bit/User/lib/python3.11/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/Users/alucard/Library/Enthought/Canopy_64bit/User/lib/python3.11/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o101.await.
: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:122)
        at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:85)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
        at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:85)
        at org.apache.flink.table.api.internal.InsertResultProvider.isFirstRowReady(InsertResultProvider.java:71)
        at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:109)
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
        ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
        at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1287)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
        at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
        at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629)
        at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34)
        at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
        at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
        at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
        at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
        at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
        at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
        at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
        at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
        ... 5 more
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.createObjectNode()" because "this.mapper" is null
        at org.apache.flink.formats.json.JsonRowDataSerializationSchema.serialize(JsonRowDataSerializationSchema.java:99)
        at org.apache.flink.formats.json.JsonRowDataSerializationSchema.serialize(JsonRowDataSerializationSchema.java:43)
        at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkElementConverter.apply(KinesisStreamsSinkElementConverter.java:56)
        at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkElementConverter.apply(KinesisStreamsSinkElementConverter.java:34)
        at org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:342)
        at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.base/java.lang.Thread.run(Thread.java:1583)

My application_properties.json and Python code, in case they are helpful:

[
    {
        "PropertyGroupId": "kinesis.analytics.flink.run.options",
        "PropertyMap": {
            "python": "getting-started.py",
            "jarfile": "lib/flink-sql-connector-kinesis-1.15.2.jar"
        }
    },
    {
        "PropertyGroupId": "consumer.config.0",
        "PropertyMap": {
            "input.stream.name": "ExampleInputStream",
            "flink.stream.initpos": "LATEST",
            "aws.region": "ap-east-1"
        }
    },
    {
        "PropertyGroupId": "producer.config.0",
        "PropertyMap": {
            "output.stream.name": "ExampleOutputStream",
            "shard.count": "1",
            "aws.region": "ap-east-1"
        }
    }
]
# -*- coding: utf-8 -*-
"""
getting-started.py
~~~~~~~~~~~~~~~~~~~
This module:
    1. Creates a table environment
    2. Creates a source table from a Kinesis Data Stream
    3. Creates a sink table writing to a Kinesis Data Stream
    4. Inserts the source table data into the sink table
"""

from pyflink.table import EnvironmentSettings, TableEnvironment
import os
import json

# Set IS_LOCAL to True
os.environ["IS_LOCAL"] = "True"
os.environ["AWS_ACCESS_KEY_ID"] = "123456789"
os.environ["AWS_SECRET_ACCESS_KEY"] = "123456789"

# 1. Creates a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

statement_set = table_env.create_statement_set()

APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # on kda

is_local = (
    True if os.environ.get("IS_LOCAL") else False
)  # set this env var in your local environment

if is_local:
    # only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.15.2.jar",
    )

def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))

def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]

def create_source_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )

def create_sink_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.batch.max-size' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region
    )

def create_print_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              WITH (
                'connector' = 'print'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )

def main():
    # Application Property Keys
    input_property_group_key = "consumer.config.0"
    producer_property_group_key = "producer.config.0"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    output_stream_key = "output.stream.name"
    output_region_key = "aws.region"

    # tables
    input_table_name = "input_table"
    output_table_name = "output_table"

    # get application properties
    props = get_application_properties()

    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, producer_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    output_stream = output_property_map[output_stream_key]
    output_region = output_property_map[output_region_key]

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_source_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(
        create_sink_table(output_table_name, output_stream, output_region, stream_initpos)
        # It will work nicely if I use `create_print_table`
        # create_print_table(output_table_name, output_stream, output_region, stream_initpos)
    )

    # 4. Inserts the source table data into the sink table
    table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                         .format(output_table_name, input_table_name))

    if is_local:
        table_result.wait()
    else:
        # get job status through TableResult
        print(table_result.get_job_client().get_job_status())

if __name__ == "__main__":
    main()

Can you please tell me what's wrong with it?

Thank you very much.

jeremyber-aws commented 4 months ago

Looks like a mismatch in Apache Flink dependencies:

apache-flink                       1.19.0
apache-flink-libraries             1.19.0

Can you use the correct version here and try again?

alucard001 commented 4 months ago

> Looks like a mismatch in Apache Flink dependencies:
>
> apache-flink                       1.19.0
> apache-flink-libraries             1.19.0
>
> Can you use the correct version here and try again?

Thanks. As my package list shows, I am already on version 1.19.0. If you mean I should use a different version, may I ask which version is the correct one?

Thank you.

jeremyber-aws commented 4 months ago

Sorry, I should have been more specific. You're using the 1.15.2 version of the Kinesis connector with the 1.19 version of Apache Flink. Could you use the 1.15 version of Apache Flink and see if this error still occurs?
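
For anyone who lands here: one way to surface the mismatch before submitting the job is to compare the PyFlink runtime version against the connector jar named in pipeline.jars. A rough sketch, assuming a pip-installed PyFlink that reports its version in pyflink.version, and the jar path used in this thread:

import re
from pyflink.version import __version__ as flink_version  # e.g. "1.19.0"

CONNECTOR_JAR = "lib/flink-sql-connector-kinesis-1.15.2.jar"

# First "major.minor" in the jar name, e.g. "1.15"
jar_minor = re.search(r"(\d+\.\d+)", CONNECTOR_JAR).group(1)
runtime_minor = ".".join(flink_version.split(".")[:2])

if jar_minor != runtime_minor:
    raise RuntimeError(
        f"Connector jar targets Flink {jar_minor} but PyFlink is "
        f"{runtime_minor}; align them, e.g. pip install "
        f"apache-flink=={jar_minor}.2, or use the connector jar built "
        "for your Flink version."
    )

Either direction should work: downgrade apache-flink and apache-flink-libraries to 1.15.x to match the jar, or keep Flink 1.19 and switch to a Kinesis connector jar built against your Flink version.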