Open XiXiTan opened 2 months ago
The client that writes to Alluxio was also configured with the following settings (including RPC retry parameters), and the same error still occurs:
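```java
// Imports used by this snippet
import alluxio.client.file.FileSystem;
import alluxio.conf.AlluxioProperties;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.WritePType;

AlluxioProperties properties = new AlluxioProperties();
// Set the master to the IP address or hostname of your Alluxio master server
properties.set(PropertyKey.MASTER_HOSTNAME, this.masterHost);
properties.set(PropertyKey.MASTER_RPC_PORT, this.masterPort);
// User that operates the Alluxio cluster, usually the user that ran alluxio-start.sh
properties.set(PropertyKey.SECURITY_LOGIN_USERNAME, "alluxio");
// RPC retry parameters
properties.set(PropertyKey.USER_RPC_RETRY_BASE_SLEEP_MS, "30000ms");
properties.set(PropertyKey.USER_RPC_RETRY_MAX_SLEEP_MS, "2min");
properties.set(PropertyKey.USER_RPC_RETRY_MAX_DURATION, "5min");
// Build the configuration object and the file system client
configuration = new InstancedConfiguration(properties);
fileSystem = FileSystem.Factory.create(configuration);
writeOptions = CreateFilePOptions.newBuilder()
    .setWriteType(WritePType.CACHE_THROUGH)
    .buildPartial();
```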
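For reference, the three retry values above describe a capped exponential backoff. The sketch below is a hypothetical helper (`BackoffSchedule` is not an Alluxio class) that assumes plain doubling from the base sleep, capped at the max sleep, and stops once the next sleep would push the total past the max duration. Alluxio's own retry policy may differ in details such as jitter or how the final interval is handled, so treat this only as an estimate of how many RPC retries these settings allow.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Alluxio: sleeps of a capped, time-bounded exponential backoff. */
final class BackoffSchedule {
  private BackoffSchedule() {}

  static List<Duration> sleeps(Duration base, Duration maxSleep, Duration maxDuration) {
    if (base.isZero() || base.isNegative() || maxSleep.isZero() || maxSleep.isNegative()) {
      throw new IllegalArgumentException("sleep durations must be positive");
    }
    List<Duration> result = new ArrayList<>();
    Duration total = Duration.ZERO;
    Duration next = base.compareTo(maxSleep) > 0 ? maxSleep : base;
    // Add sleeps while the next one still fits inside the overall time budget.
    while (total.plus(next).compareTo(maxDuration) <= 0) {
      result.add(next);
      total = total.plus(next);
      // Double the sleep, but never exceed maxSleep.
      Duration doubled = next.multipliedBy(2);
      next = doubled.compareTo(maxSleep) > 0 ? maxSleep : doubled;
    }
    return result;
  }
}
```

Under these assumptions, `BackoffSchedule.sleeps(Duration.ofSeconds(30), Duration.ofMinutes(2), Duration.ofMinutes(5))` gives sleeps of 30 s, 60 s and 120 s (210 s in total); a fourth 120 s sleep would exceed the 5 min budget.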
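```java
import alluxio.AlluxioURI;
import alluxio.client.file.FileOutStream;
import alluxio.exception.AlluxioException;
import alluxio.exception.status.CancelledException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Keeps appending a timestamped line to a new Alluxio file for {@code duration} minutes.
 * Write errors are logged; on CancelledException the loop sleeps 30 s and then retries
 * the write on the same stream.
 */
public static void writeWithDurationConfig(String inputFile, Integer duration) {
  // Create a file that did not exist before and open an output stream to it
  AlluxioURI path = new AlluxioURI(inputFile);
  long startTime = System.currentTimeMillis();
  long endTime = startTime + duration * 60L * 1000L; // convert minutes to milliseconds
  SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  // Retry counter; declared locally here because the original snippet does not show its declaration
  int tryNum = 0;
  try (FileOutStream fileOutStream = fileSystem.createFile(path, writeOptions)) {
    while (System.currentTimeMillis() < endTime) {
      String data = "this is test " + dateFormat.format(new Date()) + "\n";
      try {
        fileOutStream.write(data.getBytes(StandardCharsets.UTF_8));
      } catch (IOException e) {
        if (e instanceof CancelledException) {
          System.out.println(tryNum + " 1 Failed to write data, retrying...");
          e.printStackTrace();
          tryNum++;
          try {
            Thread.sleep(30 * 1000L); // wait 30 s before writing again
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status and stop writing
            return;
          }
          System.out.println(tryNum + " 2 Failed to write data, retrying sleep finish...");
        } else {
          System.out.println(tryNum + " 3 Failed to write data, other...");
          e.printStackTrace();
        }
      }
    }
  } catch (IOException | AlluxioException e) {
    System.out.println("33 write file failed ... ");
    e.printStackTrace();
  }
}
```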
```
0 3 Failed to write data, other...
alluxio.exception.status.UnavailableException: io exception (GrpcDataWriter{request=type: UFS_FILE id: -1 tier: 0 create_ufs_file_options { ufs_path: "hdfs://hdfs-ha/kcde/mem_para/writetest_2024_09_03_14_48_15___1_1" owner: "alluxio" group: "" mode: 438 mount_id: 7337261102989507363 acl { owningUser: "alluxio" owningGroup: "" userActions { name: "" actions { actions: READ actions: WRITE } } groupActions { name: "" actions { actions: READ actions: WRITE } } otherActions { actions: READ actions: WRITE } isDefault: false isEmpty: false } } medium_type: "" pin_on_create: false space_to_reserve: 67108864 , address=WorkerNetAddress{host=ip, containerHost=, rpcPort=29999, dataPort=29999, webPort=30000, domainSocketPath=, tieredIdentity=TieredIdentity(node=ip, rack=null)}})
	at alluxio.exception.status.AlluxioStatusException.from(AlluxioStatusException.java:161)
	at alluxio.client.block.stream.GrpcBlockingStream.toAlluxioStatusException(GrpcBlockingStream.java:303)
	at alluxio.client.block.stream.GrpcBlockingStream.checkError(GrpcBlockingStream.java:284)
	at alluxio.client.block.stream.GrpcBlockingStream.send(GrpcBlockingStream.java:105)
	at alluxio.client.block.stream.GrpcDataMessageBlockingStream.sendDataMessage(GrpcDataMessageBlockingStream.java:103)
	at alluxio.client.block.stream.GrpcDataWriter.writeChunk(GrpcDataWriter.java:190)
	at alluxio.client.block.stream.BlockOutStream.updateCurrentChunk(BlockOutStream.java:255)
	at alluxio.client.block.stream.BlockOutStream.writeInternal(BlockOutStream.java:166)
	at alluxio.client.block.stream.BlockOutStream.write(BlockOutStream.java:136)
	at alluxio.client.file.AlluxioFileOutStream.writeInternal(AlluxioFileOutStream.java:289)
	at alluxio.client.file.AlluxioFileOutStream.write(AlluxioFileOutStream.java:233)
	at com.ksyun.xixi.WriteFile.writeDataWithRetry(WriteFile.java:150)
	at com.ksyun.xixi.WriteFile.writeWithDurationTry(WriteFile.java:138)
	at com.ksyun.xixi.AlluxioWriteFileDirectTest.main(AlluxioWriteFileDirectTest.java:34)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
	at io.grpc.Status.asRuntimeException(Status.java:535)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at alluxio.grpc.GrpcChannel$1$1$1.onClose(GrpcChannel.java:160)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
```
```
0 1 Failed to write data, retrying...
alluxio.exception.status.CancelledException: Failed to send request chunk { data: "this is test 2024-09-03 14:49:06\nthis is test 2024-09-03 14:49:06\nthis is test 2024-09-03 14:49:06\nthis is test 2024-09-03 14:49:06\nthis is test 2024-09-03 14:49:06\nthis is test 2024-09-03 14:49:06\nthis is test 2024-09-03 14:49:06\nthis is test 2024-09-03 14:49:06\nthis is test 2024-0 ... <truncated 1080061 characters> }: stream is already closed or cancelled. clientClosed: false clientCancelled: true serverClosed: false (GrpcDataWriter{request=type: UFS_FILE id: -1 tier: 0 create_ufs_file_options { ufs_path: "hdfs://hdfs-ha/kcde/mem_para/writetest_2024_09_03_14_48_15___1_1" owner: "alluxio" group: "" mode: 438 mount_id: 7337261102989507363 acl { owningUser: "alluxio" owningGroup: "" userActions { name: "" actions { actions: READ actions: WRITE } } groupActions { name: "" actions { actions: READ actions: WRITE } } otherActions { actions: READ actions: WRITE } isDefault: false isEmpty: false } } medium_type: "" pin_on_create: false space_to_reserve: 67108864 , address=WorkerNetAddress{host=ip, containerHost=, rpcPort=29999, dataPort=29999, webPort=30000, domainSocketPath=, tieredIdentity=TieredIdentity(node=ip, rack=null)}})
	at alluxio.client.block.stream.GrpcBlockingStream.send(GrpcBlockingStream.java:97)
	at alluxio.client.block.stream.GrpcDataMessageBlockingStream.sendDataMessage(GrpcDataMessageBlockingStream.java:103)
	at alluxio.client.block.stream.GrpcDataWriter.writeChunk(GrpcDataWriter.java:190)
	at alluxio.client.block.stream.BlockOutStream.updateCurrentChunk(BlockOutStream.java:255)
	at alluxio.client.block.stream.BlockOutStream.writeInternal(BlockOutStream.java:166)
	at alluxio.client.block.stream.BlockOutStream.write(BlockOutStream.java:136)
	at alluxio.client.file.AlluxioFileOutStream.writeInternal(AlluxioFileOutStream.java:289)
	at alluxio.client.file.AlluxioFileOutStream.write(AlluxioFileOutStream.java:233)
	at com.ksyun.xixi.WriteFile.writeDataWithRetry(WriteFile.java:150)
	at com.ksyun.xixi.WriteFile.writeWithDurationTry(WriteFile.java:138)
	at com.ksyun.xixi.AlluxioWriteFileDirectTest.main(AlluxioWriteFileDirectTest.java:34)
1 2 Failed to write data, retrying sleep finish...
```
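After adding the RPC retry parameters, the job still fails. Analyzing the exception stack traces after the worker goes down:

1) In the first error's stack trace, `alluxio.client.block.stream.GrpcBlockingStream.checkError(GrpcBlockingStream.java:284)` catches the exception and sets `mCanceled=true`.
2) Because of that change, every subsequent write repeatedly throws `stream is already closed or cancelled. clientClosed: false clientCancelled: true serverClosed: false`, so writing data keeps failing.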
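The behaviour described in this analysis can be reproduced with a small toy model. The class below is purely illustrative: `StickyCancelStream` and its methods are hypothetical and only mimic the reported symptom, not the actual code of `GrpcBlockingStream`. The first transport error sets a cancelled flag, and every later send fails with the "already cancelled" error even after the server is healthy again, which would explain why sleeping and then calling `write()` again on the same stream does not recover.

```java
import java.io.IOException;

/**
 * Toy model, not Alluxio code: after the first transport error the stream is
 * marked cancelled, and every later send fails even if the server is back.
 */
final class StickyCancelStream {
  private boolean serverUp = true;
  private boolean canceled = false;

  /** Simulates the worker going down or coming back. */
  void setServerUp(boolean up) {
    serverUp = up;
  }

  void send(String chunk) throws IOException {
    if (canceled) {
      // Mirrors the repeated "stream is already closed or cancelled" errors in the log
      throw new IOException("stream is already closed or cancelled");
    }
    if (!serverUp) {
      canceled = true; // the first error permanently cancels this stream
      throw new IOException("UNAVAILABLE: io exception");
    }
    // Otherwise the chunk would be delivered; the toy model does not store it.
  }

  boolean isCanceled() {
    return canceled;
  }
}
```

In this model, bringing the server back with `setServerUp(true)` does not help: only a new stream object would accept writes again.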
So, besides adding the RPC retry parameters, are there any other parameters I should configure?
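Is it by design in Alluxio 2.9.0.1 that any job reading from or writing to Alluxio fails as soon as a worker process dies? Is there no failover, i.e., moving the failed worker's tasks to a healthy worker so that service continues?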
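To make the failover question concrete, the sketch below shows one generic client-side approach: remember workers that recently failed and skip them for a cooldown period when choosing where to write. It is a hypothetical illustration (`FailoverWorkerPicker` and its methods are invented for this example) and does not describe how Alluxio 2.9.0.1 selects workers; it also cannot help when no other healthy worker is available.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of client-side failover: round-robin over workers,
 * skipping any worker that failed less than cooldownMs ago.
 */
final class FailoverWorkerPicker {
  private final List<String> workers;
  private final long cooldownMs;
  private final Map<String, Long> failedAt = new HashMap<>();
  private int next = 0;

  FailoverWorkerPicker(List<String> workers, long cooldownMs) {
    if (workers.isEmpty()) {
      throw new IllegalArgumentException("no workers");
    }
    this.workers = new ArrayList<>(workers);
    this.cooldownMs = cooldownMs;
  }

  /** Records that a worker failed at the given time. */
  void markFailed(String worker, long nowMs) {
    failedAt.put(worker, nowMs);
  }

  /** Returns the next eligible worker, or null if every worker is still in cooldown. */
  String pick(long nowMs) {
    for (int i = 0; i < workers.size(); i++) {
      String candidate = workers.get((next + i) % workers.size());
      Long failed = failedAt.get(candidate);
      if (failed == null || nowMs - failed >= cooldownMs) {
        next = (next + i + 1) % workers.size();
        return candidate;
      }
    }
    return null;
  }
}
```

For example, with workers `w1` and `w2` and a 10 s cooldown, marking `w1` failed makes `pick` return `w2` until the cooldown expires, after which `w1` becomes eligible again.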
**Alluxio Version:**
2.9.0.1

**Describe the bug**
After a worker goes down, writes fail to both memory and the UFS, even though the worker recovers quickly (within 10 s). Does the Alluxio client have any configuration for retrying while a worker is temporarily unavailable, so that the job can continue writing normally once the worker is available again?
**To Reproduce**
Alluxio is deployed in Kubernetes (k8s) containers.

1. Start a job that continuously writes to Alluxio.
2. While it runs, kill the Alluxio worker pod; the pod recovers on its own within at most 10 s.
3. Check the log of the writing job: it keeps reporting errors.

Error log:

```
alluxio.exception.status.UnavailableException: io exception (GrpcDataWriter{request=type: UFS_FILE id: -1 tier: 0 create_ufs_file_options { ufs_path: "hdfs://hdfs-ha/kcde/mem_para/writetest_2024_09_03_12_06_39___1_1" owner: "alluxio" group: "" mode: 438 mount_id: 7337261102989507363 acl { owningUser: "alluxio" owningGroup: "" userActions { name: "" actions { actions: READ actions: WRITE } } groupActions { name: "" actions { actions: READ actions: WRITE } } otherActions { actions: READ actions: WRITE } isDefault: false isEmpty: false } } medium_type: "" pin_on_create: false space_to_reserve: 67108864 , address=WorkerNetAddress{host=ip, containerHost=, rpcPort=29999, dataPort=29999, webPort=30000, domainSocketPath=, tieredIdentity=TieredIdentity(node=ip, rack=null)}})
	at alluxio.exception.status.AlluxioStatusException.from(AlluxioStatusException.java:161)
	at alluxio.client.block.stream.GrpcBlockingStream.toAlluxioStatusException(GrpcBlockingStream.java:303)
	at alluxio.client.block.stream.GrpcBlockingStream.checkError(GrpcBlockingStream.java:284)
	at alluxio.client.block.stream.GrpcBlockingStream.send(GrpcBlockingStream.java:105)
	at alluxio.client.block.stream.GrpcDataMessageBlockingStream.sendDataMessage(GrpcDataMessageBlockingStream.java:103)
	at alluxio.client.block.stream.GrpcDataWriter.writeChunk(GrpcDataWriter.java:190)
	at alluxio.client.block.stream.BlockOutStream.updateCurrentChunk(BlockOutStream.java:255)
	at alluxio.client.block.stream.BlockOutStream.writeInternal(BlockOutStream.java:166)
	at alluxio.client.block.stream.BlockOutStream.write(BlockOutStream.java:136)
	at alluxio.client.file.AlluxioFileOutStream.writeInternal(AlluxioFileOutStream.java:289)
	at alluxio.client.file.AlluxioFileOutStream.write(AlluxioFileOutStream.java:233)
	at com.ksyun.xixi.WriteFile.writeWithDurationConfig(WriteFile.java:190)
	at com.ksyun.xixi.AlluxioWriteFileDirectTest.main(AlluxioWriteFileDirectTest.java:37)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
	at io.grpc.Status.asRuntimeException(Status.java:535)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at alluxio.grpc.GrpcChannel$1$1$1.onClose(GrpcChannel.java:160)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
```
**Expected behavior**
After the Alluxio worker pod recovers, the job should be able to write data normally. Currently, the job still cannot write data even after the worker has recovered.
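One application-level way to approximate this behaviour, assuming the client cannot resume a stream once it has been cancelled, is to discard the failed stream and open a new one after a pause instead of calling `write()` again on the old one. The sketch below is hypothetical: `ReopeningWriter` and `StreamOpener` are invented names, and it uses a plain `java.io.OutputStream` rather than Alluxio's `FileOutStream`. The attempt number passed to the opener lets the caller choose a new target (for example a new file name), since reopening the same path may not be possible while the earlier incomplete file exists. Data already buffered in the failed stream may also be lost and would need to be rewritten by the caller.

```java
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical sketch: never retries on a failed stream; reopens a fresh one instead. */
final class ReopeningWriter implements AutoCloseable {
  /** Hypothetical factory; attempt starts at 0 and increases on every (re)open. */
  interface StreamOpener {
    OutputStream open(int attempt) throws IOException;
  }

  private final StreamOpener opener;
  private final int maxAttempts;
  private final long sleepMs;
  private OutputStream current;
  private int openCount;

  ReopeningWriter(StreamOpener opener, int maxAttempts, long sleepMs) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    this.opener = opener;
    this.maxAttempts = maxAttempts;
    this.sleepMs = sleepMs;
  }

  /** Writes data, opening a new stream (up to maxAttempts tries) whenever open or write fails. */
  void write(byte[] data) throws IOException, InterruptedException {
    IOException last = null;
    for (int i = 0; i < maxAttempts; i++) {
      try {
        if (current == null) {
          current = opener.open(openCount++);
        }
        current.write(data);
        return;
      } catch (IOException e) {
        last = e;
        discardCurrent(); // a failed stream is never reused
        if (i + 1 < maxAttempts) {
          Thread.sleep(sleepMs); // give the worker time to come back
        }
      }
    }
    throw last;
  }

  private void discardCurrent() {
    if (current != null) {
      try {
        current.close();
      } catch (IOException ignored) {
        // the stream is already broken; nothing more to do
      }
      current = null;
    }
  }

  @Override
  public void close() throws IOException {
    if (current != null) {
      OutputStream s = current;
      current = null;
      s.close();
    }
  }
}
```

With a 30 s `sleepMs` and a few attempts, a worker that comes back within 10 s would, under these assumptions, be reachable again by the time the new stream is opened.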
**Urgency**
Urgent. Because of this issue, restarting a worker makes production jobs that write to Alluxio fail.
**Are you planning to fix it**
No solution has been found yet.