apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI #7591

Closed soumilshah1995 closed 1 year ago

soumilshah1995 commented 1 year ago

Hello, I am trying out Kinesis Data Analytics with Apache Flink, and here is the issue I am facing. I am using the following ticket as a reference: https://github.com/apache/hudi/issues/3707

image

Here you can see my JAR files:

image

I am following the hello-world example at https://hudi.apache.org/docs/flink-quick-start-guide/

image

%flink.ssql(type=update)
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01');

Error Message

java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01')'.
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01')'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
    at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
    ... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
    at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$0(AbstractSessionClusterExecutor.java:83)
    at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144)
    ... 5 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    ... 1 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.211.130:8082
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
    ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.211.130:8082
Caused by: java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:829)

Packages

danny0405 commented 1 year ago

I see you are using streaming execution mode for a VALUES SQL statement. Did you try batch execution mode instead?

soumilshah1995 commented 1 year ago

Hey Danny, yes, I tried that yesterday and could not get it to work. I keep getting the same error message.

I also tried inserting data from a Kinesis stream, and I get the same error for that as well :D

soumilshah1995 commented 1 year ago

Here are the details again; I tried once more this morning.

Please note: this time I am on US-WEST-2; previously I was trying on US-EAST-1.

Kinesis Streams

image

Python Code to Dump Dummy Data

import datetime
import json
import os
import random
import time
import uuid

import boto3
from dotenv import load_dotenv

# Read the AWS region and credentials from a local .env file.
load_dotenv(".env")

# Create the Kinesis client once, outside the send loop.
kinesis_client = boto3.client('kinesis',
                              region_name=os.getenv("DEV_AWS_REGION_NAME"),
                              aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
                              aws_secret_access_key=os.getenv("DEV_SECRET_KEY")
                              )

def getReferrer():
    # Build one dummy stock event: id, ISO-8601 event time, ticker, and price.
    data = {}
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['uuid'] = str(uuid.uuid4())
    data['event_time'] = str_now

    data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
    price = random.random() * 100
    data['price'] = round(price, 2)
    return data

# Send one record to the stream every 3 seconds.
while True:
    data = json.dumps(getReferrer())
    print(data)

    res = kinesis_client.put_record(
        StreamName="stock-streams",
        Data=data,
        PartitionKey="1")
    time.sleep(3)

KDA

image

Settings Added JAR Files

image

%flink.ssql(type=update)

DROP TABLE if exists stock_table;

CREATE TABLE stock_table (
    uuid varchar,
    ticker VARCHAR,
    price DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH (
    'connector' = 'kinesis',
    'stream' = 'stock-streams',
    'aws.region' = 'us-west-2',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);
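The `event_time` column is declared as TIMESTAMP(3) and read with the ISO-8601 JSON timestamp format, while the producer script sends `isoformat()` output, which carries six fractional digits. As a minimal sketch, the producer could emit millisecond precision to mirror the column type. The helper name `iso_millis` is hypothetical, and nothing in this thread shows that the extra digits are related to the failure.

```python
import datetime


def iso_millis(moment: datetime.datetime) -> str:
    """Format a datetime as ISO-8601 with exactly three fractional digits."""
    # timespec="milliseconds" truncates (does not round) the microseconds.
    return moment.isoformat(timespec="milliseconds")


# Example with a fixed timestamp so the result is reproducible.
sample = datetime.datetime(2023, 1, 3, 12, 0, 1, 123456)
print(iso_millis(sample))
```

In the producer, `data['event_time'] = iso_millis(now)` would replace the plain `isoformat()` call.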

image

%flink.ssql(type=update)

DROP TABLE if exists stock_table_hudi;

CREATE TABLE stock_table_hudi(
    uuid varchar,
    ticker VARCHAR,
    price DOUBLE,
    event_time TIMESTAMP(3)
)
WITH (
  'connector' = 'hudi',
  'path' = 's3://soumil-dms-learn',
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);

image

Real Time Data

image

Inserting Data

%flink.ssql(type=update)

INSERT INTO stock_table_hudi 
SELECT  uuid, ticker, price, event_time as ts from stock_table;

Error Messages Same as above

ConnectException: Connection refused
java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi 
SELECT  uuid, ticker, price, event_time as ts from stock_table'.
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi 
SELECT  uuid, ticker, price, event_time as ts from stock_table'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
    at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
    ... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
    at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$0(AbstractSessionClusterExecutor.java:83)
    at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144)
    ... 5 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    ... 1 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.217.181:8082
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
    ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.217.181:8082
Caused by: java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:829)

image

soumilshah1995 commented 1 year ago

Hey guys, good morning. Any updates?

soumilshah1995 commented 1 year ago

Good evening, do we have any updates?

danny0405 commented 1 year ago

I don't see any Hudi frames in the error stack trace, so it is hard to locate the cause of the problem. There is a Connection refused error thrown from Netty; maybe the TaskManager has crashed. Can you check the TM log file for errors? Or the JM log.
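The same reading of the stack trace can be done mechanically. The sketch below is only an illustration: the function names `root_cause` and `mentions_package` are hypothetical, and it assumes the trace uses the usual Java `Caused by:` and `at ...` line layout seen in this thread. It returns the innermost cause and checks whether any frame belongs to a given package.

```python
import re

# Matches the first line of each nested cause in a Java stack trace.
CAUSE_PATTERN = re.compile(
    r"^Caused by: (?P<exception>[\w.$]+)(?:: (?P<message>.*))?$"
)


def root_cause(stack_trace: str):
    """Return (exception class, message) from the last 'Caused by:' line, or None."""
    last = None
    for line in stack_trace.splitlines():
        match = CAUSE_PATTERN.match(line.strip())
        if match:
            last = (match.group("exception"), match.group("message") or "")
    return last


def mentions_package(stack_trace: str, package: str) -> bool:
    """Check whether any stack frame ('at <package>...') belongs to the package."""
    return any(
        line.strip().startswith("at " + package)
        for line in stack_trace.splitlines()
    )
```

Run against the traces posted here, `root_cause` would report `java.net.ConnectException` with the message `Connection refused`, and `mentions_package(trace, "org.apache.hudi")` would be false, which matches the comment that no Hudi code appears in the failure.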

soumilshah1995 commented 1 year ago

Hi Danny, I do not have logs; I am using the AWS Kinesis managed service for Flink. I have provided all the necessary steps, so the team can replicate the error if needed. Please let me know if you need any further information.

soumilshah1995 commented 1 year ago

@davidshtian

davidshtian commented 1 year ago

@soumilshah1995 Have you tried the 1.13.2 version of the package, flink-s3-fs-hadoop-1.13.2.jar? KDA supports Apache Flink version 1.13.2. Thanks~
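A quick way to catch this kind of mismatch before uploading is to compare the version embedded in the JAR file name with the runtime version. This is a minimal sketch with hypothetical helper names (`jar_version`, `matches_runtime`). It assumes the file name ends in `-<version>.jar`, and it only makes sense for Flink's own artifacts, since the Hudi bundle carries Hudi's version rather than Flink's.

```python
import re


def jar_version(jar_name: str):
    """Extract the trailing version from a name such as 'artifact-1.13.2.jar'."""
    match = re.search(r"-(\d+(?:\.\d+)+)\.jar$", jar_name)
    return match.group(1) if match else None


def matches_runtime(jar_name: str, runtime_version: str) -> bool:
    """True when the JAR's version string equals the Flink runtime version."""
    return jar_version(jar_name) == runtime_version


# The runtime version is taken from the comment above (KDA on Flink 1.13.2).
for name in ("flink-s3-fs-hadoop-1.13.0.jar", "flink-s3-fs-hadoop-1.13.2.jar"):
    print(name, matches_runtime(name, "1.13.2"))
```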

soumilshah1995 commented 1 year ago

Here are the detailed steps I followed, so that anyone can replicate the issue.

@nsivabalan @davidshtian @danny0405

Step 1: Download and Upload the JAR into S3

https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.13.0
https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.12/0.10.1/hudi-flink-bundle_2.12-0.10.1.jar

image
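Direct download links for these JARs follow the standard Maven repository layout, so they can be built from the artifact coordinates. The sketch below uses a hypothetical helper `maven_jar_url`. The Hudi coordinates come from the link in this step, while the `flink-s3-fs-hadoop` 1.13.2 coordinates are an assumption based on the version suggested earlier in the thread.

```python
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(group_id: str, artifact_id: str, version: str) -> str:
    """Build the Maven Central URL of an artifact's JAR from its coordinates."""
    # Dots in the group id become path separators in the repository layout.
    group_path = group_id.replace(".", "/")
    return (
        f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/"
        f"{artifact_id}-{version}.jar"
    )


print(maven_jar_url("org.apache.hudi", "hudi-flink-bundle_2.12", "0.10.1"))
# Assumed coordinates for the Flink S3 filesystem plugin at version 1.13.2.
print(maven_jar_url("org.apache.flink", "flink-s3-fs-hadoop", "1.13.2"))
```

The downloaded files can then be uploaded to S3 as described in this step.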

Step 2: Create KDA Application

image

Provide Name to KDA APP

image

Select the Glue database

image

NOTE: I have given Admin access, so there should not be a problem with permissions.

Add the custom JARs we downloaded

image

image image

Step 3: Launch Zeppelin Notebooks

image

Step 4: Create a Kinesis Stream called stock-streams

image

Add some data into the stream

import datetime
import json
import os
import random
import time
import uuid

import boto3
from dotenv import load_dotenv

# Read the AWS credentials from a local .env file.
load_dotenv(".env")

# Create the Kinesis client once, outside the send loop.
kinesis_client = boto3.client('kinesis',
                              region_name='us-west-2',
                              aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
                              aws_secret_access_key=os.getenv("DEV_SECRET_KEY")
                              )

def getReferrer():
    # Build one dummy stock event: id, ISO-8601 event time, ticker, and price.
    data = {}
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['uuid'] = str(uuid.uuid4())
    data['event_time'] = str_now

    data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
    price = random.random() * 100
    data['price'] = round(price, 2)
    return data

# Send one record to the stream every 2 seconds and print the response.
while True:
    data = json.dumps(getReferrer())

    res = kinesis_client.put_record(
        StreamName="stock-streams",
        Data=data,
        PartitionKey="1")
    print(data, " " , res)
    time.sleep(2)

image

Step 5: Let's try hello world on Hudi

%flink.ssql(type=update)
DROP TABLE if exists stock_table;
CREATE TABLE stock_table (
    uuid varchar,
    ticker VARCHAR,
    price DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
    PARTITIONED BY (ticker)
WITH (
    'connector' = 'kinesis',
    'stream' = 'stock-streams',
    'aws.region' = 'us-west-2',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

image image

%flink.ssql(type=update)
DROP TABLE if exists stock_table_hudi;
CREATE TABLE stock_table_hudi(
    uuid varchar  ,
    ticker VARCHAR,
    price DOUBLE,
    event_time TIMESTAMP(3),
    PRIMARY KEY (`uuid`) NOT Enforced

)
WITH (
    'connector' = 'hudi',
    'path' = 's3://soumilshah-hudi-demos/tmp/',
    'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);

INSERT Into HUDI

%flink.ssql(type=update)
INSERT INTO stock_table_hudi
SELECT  uuid, ticker, price, event_time  from stock_table;

Error Messages

java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT  uuid, ticker, price, event_time  from stock_table'.
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT  uuid, ticker, price, event_time  from stock_table'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
    at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
    ... 14 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
    at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    ... 1 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
    ... 21 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/10.100.42.225:8082
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
    ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/10.100.42.225:8082
Caused by: java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:829)

Please let me know if you need any other details. I have listed all the steps needed to reproduce the problem, and I'm happy to hop on a call or meeting to walk through them. Email: shahsoumil519@gmail.com

danny0405 commented 1 year ago

Hello @umehrot2, sorry to bother you. Is there any chance you could help validate this? It seems this user is an AWS customer.

davidshtian commented 1 year ago

@soumilshah1995 I used version 1.13.2 of the package (flink-s3-fs-hadoop-1.13.2.jar, not flink-s3-fs-hadoop-1.13.0.jar) and it worked some of the time. The resources within KDA Studio do not seem very stable, but in my opinion it's still worth a try.

By the way, it's really hard to troubleshoot because the CloudWatch log stream is always spammed with errors like the one below, and nothing more useful shows what is going on under the hood.

{
    "applicationARN": "arn:aws:kinesisanalytics:us-east-1:xxx:application/kda-studio-david",
    "applicationVersionId": "19",
    "locationInformation": "org.apache.zeppelin.flink.JobManager$FlinkJobProgressPoller.run(JobManager.java:269)",
    "logger": "org.apache.zeppelin.flink.JobManager",
    "message": "Fail to poll flink job progress via rest api",
    "messageSchemaVersion": "1",
    "messageType": "ERROR",
    "threadName": "JobProgressPoller-Thread-paragraph_1673235629990_1003353402",
    "throwableInformation": "com.mashape.unirest.http.exceptions.UnirestException: java.lang.RuntimeException: java.lang.RuntimeException: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat com.mashape.unirest.http.HttpClientHelper.request(HttpClientHelper.java:143)\n\tat com.mashape.unirest.request.BaseRequest.asJson(BaseRequest.java:68)\n\tat org.apache.zeppelin.flink.JobManager$FlinkJobProgressPoller.run(JobManager.java:212)\nCaused by: java.lang.RuntimeException: java.lang.RuntimeException: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat com.mashape.unirest.http.HttpResponse.<init>(HttpResponse.java:106)\n\tat com.mashape.unirest.http.HttpClientHelper.request(HttpClientHelper.java:139)\n\t... 2 more\nCaused by: java.lang.RuntimeException: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat com.mashape.unirest.http.JsonNode.<init>(JsonNode.java:51)\n\tat com.mashape.unirest.http.HttpResponse.<init>(HttpResponse.java:95)\n\t... 3 more\nCaused by: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat org.json.JSONTokener.syntaxError(JSONTokener.java:433)\n\tat org.json.JSONArray.<init>(JSONArray.java:106)\n\tat org.json.JSONArray.<init>(JSONArray.java:145)\n\tat com.mashape.unirest.http.JsonNode.<init>(JsonNode.java:48)\n\t... 4 more\n"
}
davidshtian commented 1 year ago

Or lots of similar logs like this "RemoteTransportException: Connection unexpectedly closed by remote task manager '142.151.160.20/142.151.160.20:6121'. This might indicate that the remote task manager was lost. This may indicate that the taskmanagers are overloaded."

"applicationARN": "arn:aws:kinesisanalytics:us-east-1:xxx:application/kda-studio-david",
"applicationVersionId": "19",
"locationInformation": "org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:163)",
"logger": "org.apache.zeppelin.flink.FlinkSqlInterrpeter",
"message": "Fail to run sql:INSERT INTO t1 SELECT uuid, event_time, ticker,price from stock_table",
"messageSchemaVersion": "1",
"messageType": "ERROR",
"threadName": "ParallelScheduler-Worker-9",
"throwableInformation": "java.io.IOException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 3e6e3e07934084f5fafc542a4a91bc2c)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538
at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112
at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744
at org.apache.zeppelin.scheduler.Job.run(Job.java:172
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132
at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628
at java.base/java.lang.Thread.run(Thread.java:829
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 3e6e3e07934084f5fafc542a4a91bc2c
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:125
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$24(RestClusterClient.java:670
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478
... 3 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:123
... 23 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:222
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:212
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:203
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:718
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443
at jdk.internal.reflect.GeneratedMethodAccessor100.invoke(Unknown Source
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43
at java.base/java.lang.reflect.Method.invoke(Method.java:566
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172
at akka.actor.Actor.aroundReceive(Actor.scala:517
at akka.actor.Actor.aroundReceive$(Actor.scala:515
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592
at akka.actor.ActorCell.invoke(ActorCell.scala:561
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258
at akka.dispatch.Mailbox.run(Mailbox.scala:225
at akka.dispatch.Mailbox.exec(Mailbox.scala:235
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '142.151.160.20/142.151.160.20:6121'. This might indicate that the remote task manager was lost
at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81
at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354
at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1106
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74
at java.base/java.lang.Thread.run(Thread.java:829)
"
}
davidshtian commented 1 year ago

@soumilshah1995 For your reference, checkpointing needs to be enabled in order to write data to the Hudi sink on S3 (set it, then restart the interpreter and execute again).

%flink.conf
execution.checkpointing.interval 1000

I also added the option below to the Hudi table to disable the embedded timeline server. Otherwise, errors like "Caused by: org.apache.hudi.exception.HoodieRemoteException: Connect to 142.151.165.206:37851 [/142.151.165.206] failed: Connection timed out (Connection timed out)" are thrown (based on actual tests).

'hoodie.embed.timeline.server' = 'false'
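
Putting the two settings together, a minimal sketch of a Hudi sink paragraph might look like the one below. The table name, columns, and bucket path are placeholders chosen for illustration; only the `connector`, `path`, `table.type`, and `hoodie.embed.timeline.server` options come from this thread. The checkpoint interval still has to be set in a separate %flink.conf paragraph before the interpreter starts.

```sql
%flink.ssql(type=update)
-- Hypothetical sink table: the name, columns and S3 path are placeholders.
-- The embedded timeline server is disabled to avoid the
-- HoodieRemoteException "Connection timed out" error quoted above.
CREATE TABLE orders_hudi_demo (
    orderid VARCHAR PRIMARY KEY NOT ENFORCED,
    customer_id VARCHAR,
    ts TIMESTAMP(3),
    order_value DOUBLE,
    priority VARCHAR
)
WITH (
    'connector' = 'hudi',
    'path' = 's3a://<your-bucket>/<your-prefix>/',
    'table.type' = 'COPY_ON_WRITE',
    'hoodie.embed.timeline.server' = 'false'
);
```

With checkpointing enabled, Hudi commits the written data when a checkpoint completes, so nothing shows up under the table path until at least one checkpoint has finished.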

Thanks~

soumilshah1995 commented 1 year ago

Thank you @davidshtian. Here are my results based on your feedback.

I tried again with the following changes.

Attempt 3

Step 1: Made the Stream Again

image

Step 2: Uploaded the JARs to S3 as mentioned by David

image

Step 3: Creating KDA

image

Step 4: Executing Code

image

Same Error

java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT  uuid, ticker, price, event_time  from stock_table where uuid is NOT NULL'.
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT  uuid, ticker, price, event_time  from stock_table where uuid is NOT NULL'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
    at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
    ... 14 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
    at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    ... 1 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
    ... 21 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.189.165:8082
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
    ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.189.165:8082
Caused by: java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:829)
davidshtian commented 1 year ago

@soumilshah1995 I tried again and it worked, as shown below, for your reference. Thanks~

Step 1 – Kinesis Stream

image

Step 2 – Jars

image image

Step 3 – KDA Studio

image

Step 4 – Executing the code

image image image image image image image image image
davidshtian commented 1 year ago

From the logs java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.189.165:8082, it seems that either the Flink cluster was not started at all, or it might be a network issue.

image
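
One way to tell these two causes apart from a Hudi or JAR problem is to submit a trivial job that touches neither Hudi nor S3. The sketch below is only a suggestion: the table names are made up, and it assumes Flink's built-in `datagen` and `blackhole` connectors are available in the notebook (they ship with Flink itself and need no extra JARs).

```sql
%flink.ssql(type=update)
-- Hypothetical smoke test; smoke_source and smoke_sink are made-up names.
-- datagen produces random rows, blackhole discards whatever it receives.
CREATE TABLE smoke_source (
    id BIGINT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

CREATE TABLE smoke_sink (
    id BIGINT
) WITH (
    'connector' = 'blackhole'
);

-- This job is unbounded, so cancel it once it shows as running.
INSERT INTO smoke_sink SELECT id FROM smoke_source;
```

If this job also fails with "Connection refused" on port 8082, the Flink cluster behind the notebook is probably not reachable, and changing Hudi JARs or table options is unlikely to help until that is fixed.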
soumilshah1995 commented 1 year ago

I'm trying this today. Let's hope for the best.

soumilshah1995 commented 1 year ago

Dear @davidshtian

I wanted to take a moment to thank you for the help you provided me. Your assistance was extremely valuable and I appreciate your willingness to lend a hand. Your support is greatly appreciated and I'm thankful for your help.

I was able to resolve this issue. I am going to release a nice tutorial for the community on Flink and Hudi, and I hope it helps everyone.

Thanks again for your help.

Sincerely, shah

davidshtian commented 1 year ago

@soumilshah1995 Great~ Looking forward to the tutorial! I'm curious what the root cause was in your case. Thanks~

danny0405 commented 1 year ago

@soumilshah1995

I am going to release a nice tutorial for the community on Flink and Hudi

What great news! Welcome aboard the Hudi community. There are so many smart, warm people here, and I think you can make the community even better.

davidshtian commented 1 year ago

@soumilshah1995 Was it related to the s3 vs. s3a URI scheme?

soumilshah1995 commented 1 year ago

@davidshtian It would be hard to tell the exact root cause. I think the issue was with the JAR files. Here is a video for the community:

https://www.youtube.com/watch?v=8XS8egfrS_o

Code : https://github.com/soumilshah1995/kinesis-flink-labs

soumilshah1995 commented 1 year ago

@davidshtian What are the recommended settings for querying the data in Athena, i.e. Hive sync?

%flink.ssql(type=update)
DROP TABLE IF EXISTS orders;
CREATE TABLE orders (
    orderid VARCHAR PRIMARY KEY NOT ENFORCED,
    customer_id VARCHAR,
    ts TIMESTAMP(3),
    order_value DOUBLE,
    priority VARCHAR
)
WITH (
    'connector' = 'hudi',
    'path' = 's3a://soumilshah-hudi-demos/tmp/',
    'table.type' = 'COPY_ON_WRITE',
    'hoodie.embed.timeline.server' = 'false',
    'hive_sync.enable' = 'true',
    'hive_sync.mode' = 'hms'
);

@davidshtian I'm also curious why you used %ssql instead of %flink.ssql(type=update).

A follow-up question: KDA throws errors at times. Is that normal? It looks very unstable. What is your honest feedback on that?

davidshtian commented 1 year ago

@soumilshah1995 %ssql is also configured by default.

image

For Glue catalog sync, yes, hive_sync is the way to go, but it requires uploading a different build of the hudi-flink bundle JAR (and related JARs), as the Hudi docs explain:

hudi-flink-bundle module pom.xml sets the scope related to hive as provided by default. If you want to use hive sync, you need to use the profile flink-bundle-shade-hive during packaging.

'hive_sync.enable'='true',
'hive_sync.db'='<your db>',
'hive_sync.table' = '<your table>',
'hive_sync.mode' = 'hms',

I tried it, but the sync failed. I'm not sure whether it is due to this part:

image

So I created the table for Athena manually.
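
For a copy-on-write table, a manual Athena DDL usually follows the pattern shown in the Athena documentation for Hudi datasets. The sketch below is an assumption-laden example for the orders table from the question, not the exact statement used here: the S3 location is a placeholder, and the column types (especially `ts`) must match what Hudi actually wrote to the Parquet files.

```sql
-- Run this in Athena, not in the Flink notebook.
-- Hypothetical example: the location is a placeholder and the column types
-- are assumptions. Depending on how the writer encoded it, ts may need to
-- be declared as bigint instead of timestamp.
CREATE EXTERNAL TABLE orders (
    `_hoodie_commit_time` string,
    `_hoodie_commit_seqno` string,
    `_hoodie_record_key` string,
    `_hoodie_partition_path` string,
    `_hoodie_file_name` string,
    `orderid` string,
    `customer_id` string,
    `ts` timestamp,
    `order_value` double,
    `priority` string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://<your-bucket>/<your-prefix>/';
```

Note that Athena expects the `s3://` scheme in the location, while the Flink table path in this thread uses `s3a://`; both point at the same objects.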

Cool content on Youtube~ 👍

soumilshah1995 commented 1 year ago

You're right. I also created the table in Athena manually with a DDL statement.

soumilshah1995 commented 1 year ago

Since the issue has been resolved, I will close this. I'll reopen it if I find bugs. Thank you all.

Adi0000 commented 1 year ago

Hi @soumilshah1995, I am trying to insert into a Hudi table from a source table but I get the error below. I restarted multiple times and also tried increasing the KPUs to 12, but I get the same error. Can you please help? I am following your project: RDS -> DMS -> Kinesis -> Kinesis Data Analytics -> Hudi.

java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO cust_info_hudi (
    SELECT
        data.id,
        data.first_name,
        data.middle_name,
        data.last_name,
        data.about_me,
        data.phone_number,
        data.age,
        data.address,
        data.street,
        data.dist,
        data.city,
        data.state,
        data.country
    FROM
        source_cust_info
    )'.
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO cust_info_hudi (
    SELECT
        data.id,
        data.first_name,
        data.middle_name,
        data.last_name,
        data.about_me,
        data.phone_number,
        data.age,
        data.address,
        data.street,
        data.dist,
        data.city,
        data.state,
        data.country
    FROM
        source_cust_info
    )'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
    at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
    ... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
    at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    ... 1 more
soumilshah1995 commented 1 year ago

i am doing your project RDS -> DMS -> KINESI

Hey, this looks like a JAR or configuration issue. Can you follow the steps here: https://github.com/soumilshah1995/aws-dms-kinesis-flink-hudi

https://github.com/soumilshah1995/dynamodb-flink-kinesis