apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Flink connector throws `Trying to access closed classloader` #3776

Closed yittg closed 2 years ago

yittg commented 2 years ago

Hi, I'm trying to test the unreleased Flink 1.13 connector on a Flink 1.13.1 cluster running on k8s.

The task is a simple one, like `INSERT OVERWRITE a SELECT * FROM b`.

When I submit the task for the first time, it works. But when I submit it a second time (running on the same TaskManager as the first run), the following exception is thrown, which is weird.

java.lang.IllegalStateException: Trying to access closed classloader

```
2021-12-20 11:53:11,697 INFO org.apache.flink.runtime.taskmanager.Task [] - IcebergFilesCommitter -> Sink: IcebergSink ice.t1.d (1/1)#0 (e74babbac1ebb51fe72958bd177ae534) switched from INITIALIZING to RUNNING.
2021-12-20 11:53:11,854 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committing dynamic partition overwrite with 1 data files and 0 delete files to table ice.t1.d
2021-12-20 11:53:11,946 WARN org.apache.flink.runtime.taskmanager.Task [] - IcebergFilesCommitter -> Sink: IcebergSink ice.t1.d (1/1)#0 (e74babbac1ebb51fe72958bd177ae534) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Unknown Source)
    at org.apache.iceberg.common.DynClasses$Builder.impl(DynClasses.java:68)
    at org.apache.iceberg.avro.GenericAvroReader$ReadBuilder.record(GenericAvroReader.java:85)
    at org.apache.iceberg.avro.GenericAvroReader$ReadBuilder.record(GenericAvroReader.java:72)
    at org.apache.iceberg.avro.AvroSchemaVisitor.visit(AvroSchemaVisitor.java:50)
    at org.apache.iceberg.avro.GenericAvroReader.initReader(GenericAvroReader.java:47)
    at org.apache.iceberg.avro.GenericAvroReader.setSchema(GenericAvroReader.java:53)
    at org.apache.iceberg.avro.ProjectionDatumReader.newDatumReader(ProjectionDatumReader.java:80)
    at org.apache.iceberg.avro.ProjectionDatumReader.setSchema(ProjectionDatumReader.java:69)
    at org.apache.iceberg.shaded.org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:133)
    at org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:130)
    at org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:122)
    at org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:66)
    at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
    at org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
    at org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:103)
    at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:87)
    at org.apache.iceberg.SnapshotProducer.newManifestReader(SnapshotProducer.java:378)
    at org.apache.iceberg.MergingSnapshotProducer$DataFileFilterManager.newManifestReader(MergingSnapshotProducer.java:716)
    at org.apache.iceberg.ManifestFilterManager.filterManifest(ManifestFilterManager.java:295)
    at org.apache.iceberg.ManifestFilterManager.lambda$filterManifests$0(ManifestFilterManager.java:186)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:405)
    at org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:71)
    at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:311)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
```
yittg commented 2 years ago

After some digging, I found the direct cause: when we build an `AvroIterable`, we don't set the class loader, so the default `Thread#getContextClassLoader` is used.

The task runs in a persistent, shared thread pool whose threads keep the context class loader of the first job, and the Flink TaskManager closes that class loader after it has been used for the first time.
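To illustrate the mechanism, here is a minimal sketch in plain Java, with no Flink or Iceberg involved. It assumes a JVM-wide static pool similar in spirit to the shared worker pool; `ContextLoaderDemo`, `SHARED_POOL`, and `loaderSeenByWorker` are hypothetical names for illustration only. A new thread inherits the context class loader of the thread that creates it, so the pooled worker keeps the loader of whichever job triggered its creation.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextLoaderDemo {
  // Hypothetical stand-in for a JVM-wide shared worker pool (not Iceberg's actual code).
  // The worker thread is created lazily, by whichever thread submits the first task.
  private static final ExecutorService SHARED_POOL =
      Executors.newFixedThreadPool(1, r -> {
        Thread t = new Thread(r, "shared-worker-0");
        t.setDaemon(true);
        return t;
      });

  /** Submits a task while jobLoader is the caller's context class loader and
   *  returns the context class loader that the pooled worker thread observed. */
  static ClassLoader loaderSeenByWorker(ClassLoader jobLoader) {
    Thread submitter = Thread.currentThread();
    ClassLoader previous = submitter.getContextClassLoader();
    submitter.setContextClassLoader(jobLoader);
    try {
      return SHARED_POOL.submit(() -> Thread.currentThread().getContextClassLoader()).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      submitter.setContextClassLoader(previous);
    }
  }

  public static void main(String[] args) {
    // Two empty loaders stand in for the user-code class loaders of two jobs.
    ClassLoader job1 = new URLClassLoader(new URL[0]);
    ClassLoader job2 = new URLClassLoader(new URL[0]);

    // The worker thread is created during the first submission and inherits job1's loader.
    System.out.println(loaderSeenByWorker(job1) == job1); // true
    // The second job reuses the same worker, which still carries job1's loader.
    System.out.println(loaderSeenByWorker(job2) == job1); // true
  }
}
```

If `job1` were a Flink user-code class loader that had already been closed, any class or resource lookup through the worker's context class loader during the second job would hit the closed loader, which matches the failure above.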

However, just setting the class loader manually is not enough, because other code may still use the context class loader, and that cannot be avoided. For example, the following stack trace is caused by `ServiceLoader#load` in the JDK:

at javax.xml.parsers.FactoryFinder.findServiceProvider

```
java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:159)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
    at java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1196)
    at java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1221)
    at java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1265)
    at java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300)
    at java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385)
    at javax.xml.parsers.FactoryFinder$1.run(FactoryFinder.java:287)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:283)
    at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:261)
    at javax.xml.parsers.SAXParserFactory.newInstance(SAXParserFactory.java:147)
    at org.jdom.input.JAXPParserFactory.createParser(JAXPParserFactory.java:125)
    at jdk.internal.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:566)
    at org.jdom.input.SAXBuilder.createParser(SAXBuilder.java:585)
    at org.jdom.input.SAXBuilder.build(SAXBuilder.java:460)
    at org.jdom.input.SAXBuilder.build(SAXBuilder.java:807)
    at com.aliyun.oss.internal.ResponseParsers.getXmlRootElement(ResponseParsers.java:1015)
    at com.aliyun.oss.internal.ResponseParsers.parseListObjects(ResponseParsers.java:1028)
    at com.aliyun.oss.internal.ResponseParsers$ListObjectsReponseParser.parse(ResponseParsers.java:562)
    at com.aliyun.oss.internal.ResponseParsers$ListObjectsReponseParser.parse(ResponseParsers.java:556)
    at com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:152)
    at com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:113)
    at com.aliyun.oss.internal.OSSBucketOperation.listObjects(OSSBucketOperation.java:421)
    at com.aliyun.oss.OSSClient.listObjects(OSSClient.java:445)
    at org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.listObjects(AliyunOSSFileSystemStore.java:434)
    at org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:273)
    at org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.create(AliyunOSSFileSystem.java:115)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
    at org.apache.iceberg.hadoop.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:85)
    at org.apache.iceberg.avro.AvroFileAppender.<init>(AvroFileAppender.java:51)
    at org.apache.iceberg.avro.Avro$WriteBuilder.build(Avro.java:198)
    at org.apache.iceberg.ManifestWriter$V1Writer.newAppender(ManifestWriter.java:277)
    at org.apache.iceberg.ManifestWriter.<init>(ManifestWriter.java:58)
    at org.apache.iceberg.ManifestWriter.<init>(ManifestWriter.java:34)
    at org.apache.iceberg.ManifestWriter$V1Writer.<init>(ManifestWriter.java:256)
    at org.apache.iceberg.ManifestFiles.write(ManifestFiles.java:117)
    at org.apache.iceberg.SnapshotProducer.newManifestWriter(SnapshotProducer.java:370)
    at org.apache.iceberg.MergingSnapshotProducer$DataFileFilterManager.newManifestWriter(MergingSnapshotProducer.java:711)
    at org.apache.iceberg.ManifestFilterManager.filterManifestWithDeletedFiles(ManifestFilterManager.java:383)
    at org.apache.iceberg.ManifestFilterManager.filterManifest(ManifestFilterManager.java:308)
    at org.apache.iceberg.ManifestFilterManager.lambda$filterManifests$0(ManifestFilterManager.java:186)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:405)
    at org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:71)
    at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:311)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:829)
```

So it looks like we should avoid sharing a thread pool across different jobs.
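A rough sketch of what "not sharing" could look like, again in plain Java and not taken from any actual patch: each job owns its pool and shuts it down when the job ends, so worker threads never outlive the class loader they inherited. `PerJobWorkerPool`, `executor()`, and `runJob` are hypothetical names used only for this illustration.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerJobWorkerPool implements AutoCloseable {
  private final ExecutorService pool;

  // Hypothetical per-job pool: worker threads are named after the job that owns them.
  PerJobWorkerPool(String jobId, int size) {
    this.pool = Executors.newFixedThreadPool(size, r -> {
      Thread t = new Thread(r, "worker-pool-" + jobId);
      t.setDaemon(true);
      return t;
    });
  }

  ExecutorService executor() {
    return pool;
  }

  @Override
  public void close() {
    // Let the worker threads exit so they stop referencing the job's class loader.
    pool.shutdown();
  }

  /** Simulates one job: installs jobLoader as the caller's context class loader,
   *  runs a task on a pool owned by this job, and returns the loader the worker saw. */
  static ClassLoader runJob(String jobId, ClassLoader jobLoader) {
    Thread submitter = Thread.currentThread();
    ClassLoader previous = submitter.getContextClassLoader();
    submitter.setContextClassLoader(jobLoader);
    try (PerJobWorkerPool workers = new PerJobWorkerPool(jobId, 1)) {
      return workers.executor()
          .submit(() -> Thread.currentThread().getContextClassLoader())
          .get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      submitter.setContextClassLoader(previous);
    }
  }

  public static void main(String[] args) {
    ClassLoader job1 = new URLClassLoader(new URL[0]);
    ClassLoader job2 = new URLClassLoader(new URL[0]);
    System.out.println(runJob("job-1", job1) == job1); // true
    System.out.println(runJob("job-2", job2) == job2); // true: a fresh worker inherits job2's loader
  }
}
```

The trade-off is that threads are created and torn down per job instead of being reused, which seems acceptable when the pool is only used for commit-time manifest work.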

yittg commented 2 years ago

To be clear, I encountered this issue after putting the Iceberg Flink runtime jar into the `lib` directory, as Flink suggests.

So I tested another scenario next: submitting the Iceberg jar as user code. As expected, it works well for the first several jobs, and then subsequent jobs fail with a Metaspace OOM:

java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...

You can also see many `iceberg-worker-pool-0` threads, because the worker pool cannot be released along with the loaded user classes.

So either we need to pass an `ExecutorService` to the tasks from the Flink context, ~or provide a mechanism to release these threads after the job completes, such as making use of `RuntimeContext#registerUserCodeClassLoaderReleaseHookIfAbsent`.~

Releasing a shared pool may not be a good choice.

yittg commented 2 years ago

How about fixing this issue simply, as in #3797?

yittg commented 2 years ago

@rdblue Could you help take a look at this issue?

yittg commented 2 years ago

@openinx Could you help take a look at this issue?

wuwangben commented 2 years ago

I have the same problem.

wuwangben commented 2 years ago

MetaSpace OOM

rdblue commented 2 years ago

I merged #3906. Ping me on the next one and I'll review that as well.