apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[C++][S3] Crash on exit #36346

Closed kou closed 1 year ago

kou commented 1 year ago

Describe the bug, including details regarding any error messages, version, and platform.

Since #33858, arrow::FinalizeS3() calls neither RegionResolver::ResetDefaultInstance() nor Aws::ShutdownAPI(). This may cause a crash on exit in the "SubTreeFileSystem$create() with URI" R test: https://github.com/apache/arrow/blob/0344a2cdf6219708a25f39e580406e0ce692b61e/r/tests/testthat/test-filesystem.R#L154-L164

For example, it doesn't happen on the current main, but it does happen on #36230:

https://github.com/apache/arrow/actions/runs/5384835055/jobs/9793825156?pr=36230#step:6:33597

pure virtual method called
terminate called without an active exception
Aborted (core dumped)

I could reproduce this by running only that test (I commented out all other tests). Here is the backtrace for that case:

(gdb) bt
#0  0x00007fd53f20600b in raise () from /usr/lib/x86_64-linux-gnu/libc.so.6
#1  0x00007fd53f1e5859 in abort () from /usr/lib/x86_64-linux-gnu/libc.so.6
#2  0x00007fd53cd45911 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#3  0x00007fd53cd5138c in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#4  0x00007fd53cd513f7 in std::terminate() () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#5  0x00007fd53cd52155 in __cxa_pure_virtual () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#6  0x00007fd536724dc4 in Aws::Http::CurlHandleContainer::~CurlHandleContainer (
    this=0x559db1f2ac78, __in_chrg=<optimized out>)
    at /build/cpp/awssdk_ep-prefix/src/awssdk_ep/aws-cpp-sdk-core/source/http/curl/CurlHandleContainer.cpp:27
#7  0x00007fd5366e6464 in Aws::Http::CurlHttpClient::~CurlHttpClient (this=0x559db1f2ac10, 
    __in_chrg=<optimized out>)
    at /build/cpp/awssdk_ep-prefix/src/awssdk_ep/aws-cpp-sdk-core/include/aws/core/http/curl/CurlHttpClient.h:26
#8  0x00007fd5366dd5a7 in __gnu_cxx::new_allocator<Aws::Http::CurlHttpClient>::destroy<Aws::Http::CurlHttpClient> (this=0x559db1f2ac10, __p=0x559db1f2ac10)
    at /usr/include/c++/9/ext/new_allocator.h:152
#9  0x00007fd5366dd4f3 in std::allocator_traits<std::allocator<Aws::Http::CurlHttpClient> >::destroy<Aws::Http::CurlHttpClient> (__a=..., __p=0x559db1f2ac10)
    at /usr/include/c++/9/bits/alloc_traits.h:496
#10 0x00007fd5366dd31b in std::_Sp_counted_ptr_inplace<Aws::Http::CurlHttpClient, std::allocator<Aws::Http::CurlHttpClient>, (__gnu_cxx::_Lock_policy)2>::_M_dispose (this=0x559db1f2ac00)
    at /usr/include/c++/9/bits/shared_ptr_base.h:557
#11 0x00007fd5346cb074 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release (
    this=0x559db1f2ac00) at /usr/include/c++/9/bits/shared_ptr_base.h:155
#12 0x00007fd5346c7881 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count (
    this=0x559dada631b0, __in_chrg=<optimized out>)
    at /usr/include/c++/9/bits/shared_ptr_base.h:730
#13 0x00007fd5363cd5d2 in std::__shared_ptr<Aws::Http::HttpClient, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr (this=0x559dada631a8, __in_chrg=<optimized out>)
    at /usr/include/c++/9/bits/shared_ptr_base.h:1169
#14 0x00007fd5363cd5f2 in std::shared_ptr<Aws::Http::HttpClient>::~shared_ptr (
    this=0x559dada631a8, __in_chrg=<optimized out>) at /usr/include/c++/9/bits/shared_ptr.h:103
#15 0x00007fd5363cd736 in Aws::Client::AWSClient::~AWSClient (this=0x559dada63180, 
    __in_chrg=<optimized out>)
    at /build/cpp/awssdk_ep-prefix/src/awssdk_ep/aws-cpp-sdk-core/include/aws/core/client/AWSClient.h:97
#16 0x00007fd5363cd960 in Aws::Client::AWSXMLClient::~AWSXMLClient (this=0x559dada63180, 
    __in_chrg=<optimized out>)
    at /build/cpp/awssdk_ep-prefix/src/awssdk_ep/aws-cpp-sdk-core/include/aws/core/client/AWSXmlClient.h:44
#17 0x00007fd5363e15c6 in Aws::S3::S3Client::~S3Client (this=0x559dada63180, 
    __in_chrg=<optimized out>)
    at /build/cpp/awssdk_ep-prefix/src/awssdk_ep/aws-cpp-sdk-s3/source/S3Client.cpp:246
#18 0x00007fd535f9c7c2 in arrow::fs::(anonymous namespace)::S3Client::~S3Client (
    this=0x559dada63180, __in_chrg=<optimized out>)
    at /arrow/cpp/src/arrow/filesystem/s3fs.cc:547
#19 0x00007fd535f9de17 in __gnu_cxx::new_allocator<arrow::fs::(anonymous namespace)::S3Client>::destroy<arrow::fs::(anonymous namespace)::S3Client> (this=0x559dada63180, __p=0x559dada63180)
    at /usr/include/c++/9/ext/new_allocator.h:152
#20 0x00007fd535f9da55 in std::allocator_traits<std::allocator<arrow::fs::(anonymous namespace)::S3Client> >::destroy<arrow::fs::(anonymous namespace)::S3Client> (__a=..., __p=0x559dada63180)
    at /usr/include/c++/9/bits/alloc_traits.h:496
#21 0x00007fd535f9d37b in std::_Sp_counted_ptr_inplace<arrow::fs::(anonymous namespace)::S3Client, std::allocator<arrow::fs::(anonymous namespace)::S3Client>, (__gnu_cxx::_Lock_policy)2>::_M_dispose (this=0x559dada63170) at /usr/include/c++/9/bits/shared_ptr_base.h:557
#22 0x00007fd5346cb074 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release (
    this=0x559dada63170) at /usr/include/c++/9/bits/shared_ptr_base.h:155
#23 0x00007fd5346c7881 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count (
    this=0x559dabe990e8, __in_chrg=<optimized out>)
    at /usr/include/c++/9/bits/shared_ptr_base.h:730
#24 0x00007fd535f81ed0 in std::__shared_ptr<arrow::fs::(anonymous namespace)::S3Client, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr (this=0x559dabe990e0, __in_chrg=<optimized out>)
    at /usr/include/c++/9/bits/shared_ptr_base.h:1169
#25 0x00007fd535f81eec in std::shared_ptr<arrow::fs::(anonymous namespace)::S3Client>::~shared_ptr (this=0x559dabe990e0, __in_chrg=<optimized out>) at /usr/include/c++/9/bits/shared_ptr.h:103
#26 0x00007fd535f96756 in arrow::fs::(anonymous namespace)::RegionResolver::~RegionResolver (
    this=0x559dabe98c90, __in_chrg=<optimized out>)
    at /arrow/cpp/src/arrow/filesystem/s3fs.cc:795
#27 0x00007fd535f9d650 in std::_Sp_counted_ptr<arrow::fs::(anonymous namespace)::RegionResolver*, (__gnu_cxx::_Lock_policy)2>::_M_dispose (this=0x559db17f9990)
    at /usr/include/c++/9/bits/shared_ptr_base.h:377
#28 0x00007fd5346cb074 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release (
    this=0x559db17f9990) at /usr/include/c++/9/bits/shared_ptr_base.h:155
#29 0x00007fd5346c7881 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count (
    this=0x7fd538084a38 <arrow::fs::(anonymous namespace)::RegionResolver::instance_+8>, 
    __in_chrg=<optimized out>) at /usr/include/c++/9/bits/shared_ptr_base.h:730
#30 0x00007fd535f8252e in std::__shared_ptr<arrow::fs::(anonymous namespace)::RegionResolver, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr (
    this=0x7fd538084a30 <arrow::fs::(anonymous namespace)::RegionResolver::instance_>, 
    __in_chrg=<optimized out>) at /usr/include/c++/9/bits/shared_ptr_base.h:1169
#31 0x00007fd535f8254e in std::shared_ptr<arrow::fs::(anonymous namespace)::RegionResolver>::~shared_ptr (this=0x7fd538084a30 <arrow::fs::(anonymous namespace)::RegionResolver::instance_>, 
    __in_chrg=<optimized out>) at /usr/include/c++/9/bits/shared_ptr.h:103
#32 0x00007fd53f2098a7 in ?? () from /usr/lib/x86_64-linux-gnu/libc.so.6
#33 0x00007fd53f209a60 in exit () from /usr/lib/x86_64-linux-gnu/libc.so.6
#34 0x00007fd53f63e781 in ?? () from /usr/lib/R/lib/libR.so
#35 0x00007fd53f641140 in R_CleanUp () from /usr/lib/R/lib/libR.so
#36 0x00007fd53f53d0ee in run_Rmainloop () from /usr/lib/R/lib/libR.so
#37 0x0000559da998e09f in main ()
#38 0x00007fd53f1e7083 in __libc_start_main () from /usr/lib/x86_64-linux-gnu/libc.so.6
#39 0x0000559da998e0de in _start ()
#6  0x00007fd536724dc4 in Aws::Http::CurlHandleContainer::~CurlHandleContainer (
    this=0x559db1f2ac78, __in_chrg=<optimized out>)
    at /build/cpp/awssdk_ep-prefix/src/awssdk_ep/aws-cpp-sdk-core/source/http/curl/CurlHandleContainer.cpp:27

is here: https://github.com/aws/aws-sdk-cpp/blob/1fb97256a2ae7211a741fda0033ef1e18d29e2f0/aws-cpp-sdk-core/source/http/curl/CurlHandleContainer.cpp#L27

That line uses AWS_LOGSTREAM_INFO, which is defined here: https://github.com/aws/aws-sdk-cpp/blob/1fb97256a2ae7211a741fda0033ef1e18d29e2f0/aws-cpp-sdk-core/include/aws/core/utils/logging/LogMacros.h#L159-L168

It seems that Aws::Utils::Logging::GetLogSystem() returns an already-destroyed object in this context. Note that the destructor is called from exit() (#33 0x00007fd53f209a60 in exit () from /usr/lib/x86_64-linux-gnu/libc.so.6), so the order in which these objects are destroyed is not guaranteed.
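To illustrate the ordering problem, here is a minimal sketch, not Arrow or AWS SDK code. It assumes a hypothetical library-owned global (`g_log`, standing in for the log system) and a hypothetical `Resolver` whose destructor touches that global. An explicit `Finalize()` destroys the resolver while the global is still alive, instead of leaving both to exit-time destruction.

```cpp
#include <cassert>
#include <memory>

namespace demo {

// Hypothetical stand-in for a library-owned global such as the SDK log system.
struct LogSystem {
  int messages = 0;
};
std::unique_ptr<LogSystem> g_log;

// Hypothetical stand-in for an object whose destructor logs.
struct Resolver {
  // Records whether the destructor ran while the log system still existed.
  static bool destroyed_with_live_log;
  static std::shared_ptr<Resolver> instance_;

  ~Resolver() {
    destroyed_with_live_log = (g_log != nullptr);
    if (g_log) ++g_log->messages;  // safe only because we checked
  }

  static void ResetDefaultInstance() { instance_.reset(); }
};
bool Resolver::destroyed_with_live_log = false;
std::shared_ptr<Resolver> Resolver::instance_;

void Initialize() {
  g_log = std::make_unique<LogSystem>();
  Resolver::instance_ = std::make_shared<Resolver>();
}

// Explicit teardown: release dependents first, then the library state.
void Finalize() {
  Resolver::ResetDefaultInstance();
  g_log.reset();
}

}  // namespace demo
```

Calling `demo::Initialize()` followed by `demo::Finalize()` leaves nothing for exit-time destructors to do, so the outcome no longer depends on the order in which globals are torn down.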

Can we call RegionResolver::ResetDefaultInstance() and Aws::ShutdownAPI() from arrow::FinalizeS3() again? For example:

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index c3a6eb0ea..886405b52 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -2608,9 +2608,12 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
         ARROW_LOG(WARNING)
             << " arrow::fs::FinalizeS3 was not called even though S3 was initialized.  "
                "This could lead to a segmentation fault at exit";
-        RegionResolver::ResetDefaultInstance();
-        Aws::ShutdownAPI(aws_options_);
       }
+      // Don't let S3 be shutdown until all Arrow threads are done using it
+      ARROW_UNUSED(arrow::internal::GetCpuThreadPool()->Shutdown());
+      ARROW_UNUSED(io::internal::GetIOThreadPool()->Shutdown());
+      RegionResolver::ResetDefaultInstance();
+      Aws::ShutdownAPI(aws_options_);
     }
   }

@@ -2670,16 +2673,8 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
   std::atomic<bool> is_finalized_;
 };

-std::shared_ptr<AwsInstance> CreateAwsInstance() {
-  auto instance = std::make_shared<AwsInstance>();
-  // Don't let S3 be shutdown until all Arrow threads are done using it
-  arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
-  io::internal::GetIOThreadPool()->KeepAlive(instance);
-  return instance;
-}
-
 AwsInstance& GetAwsInstance() {
-  static auto instance = CreateAwsInstance();
+  static auto instance = std::make_shared<AwsInstance>();
   return *instance;
 }

With this patch, the crash no longer occurs in my environment.

@westonpace What do you think about this problem?

Component(s)

C++

pitrou commented 1 year ago

I agree that S3 shutdown should happen earlier than on final exit (otherwise an explicit shutdown wouldn't be needed at all).

I am unsure about the proposed solution, though.

pitrou commented 1 year ago

A potential solution is to:

But I don't know if that would work reliably in a multi-threaded context. Edit: C++17 has a shared-exclusive mutex (std::shared_mutex) that could be used for that: grab the mutex in shared mode for all S3 client calls, and in exclusive mode in FinalizeS3.
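A minimal sketch of the shared-exclusive mutex idea, under stated assumptions: `S3Guard`, `WithClient`, and `Finalize` are hypothetical names for illustration and are not part of Arrow. Client calls hold the mutex in shared mode, so they can run concurrently; finalization takes it in exclusive mode, so it waits for in-flight calls and blocks new ones.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>

namespace sketch {

class S3Guard {
 public:
  // Runs `op` while holding the mutex in shared mode.
  // Returns false without running `op` once finalization has happened.
  template <typename Op>
  bool WithClient(Op&& op) {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    if (finalized_) return false;
    op();
    return true;
  }

  // Holds the mutex in exclusive mode, so it cannot overlap with any
  // call that is currently inside WithClient().
  void Finalize() {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    finalized_ = true;
    // A real implementation would release clients and shut the SDK down here.
  }

 private:
  std::shared_mutex mutex_;
  bool finalized_ = false;  // guarded by mutex_
};

}  // namespace sketch
```

With this shape, a call such as `guard.WithClient([&] { /* use the client */ })` either completes before `Finalize()` proceeds or is rejected afterwards.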

kou commented 1 year ago

@westonpace Do you have any idea for this problem?

westonpace commented 1 year ago

@pitrou and I had some discussion of this ticket.

As @pitrou mentioned, we are currently calling finalize too late. We can't wait for the thread pools to shut down because that happens at program exit, and by then it is too late to call AWS shutdown.

Edit: C++17 has a shared-exclusive mutex (std::shared_mutex) that could be used for that: grab the mutex in shared mode for all S3 client calls, and in exclusive mode in FinalizeS3.

This approach sounds like the simplest solution.

kou commented 1 year ago

How about just using AwsInstance::is_finalized_ instead of introducing a shared-exclusive mutex?

```diff
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index c3a6eb0ea..2971c4314 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -398,12 +398,19 @@ namespace {
 Status CheckS3Initialized() {
   if (!IsS3Initialized()) {
     return Status::Invalid(
-        "S3 subsystem not initialized; please call InitializeS3() "
+        "S3 subsystem is not initialized; please call InitializeS3() "
         "before carrying out any S3-related operation");
   }
   return Status::OK();
 }

+Status CheckS3Finalized() {
+  if (IsS3Finalized()) {
+    return Status::Invalid("S3 subsystem is finalized");
+  }
+  return Status::OK();
+}
+
 // XXX Sanitize paths by removing leading slash?

 struct S3Path {
@@ -1008,6 +1015,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
         content_length_(size) {}

   Status Init() {
+    RETURN_NOT_OK(CheckS3Finalized());
+
     // Issue a HEAD Object to get the content-length and ensure any
     // errors (e.g. file not found) don't wait until the first Read() call.
     if (content_length_ != kNoSize) {
@@ -1099,6 +1108,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
       return 0;
     }

+    RETURN_NOT_OK(CheckS3Finalized());
+
     // Read the desired range of bytes
     ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
                           GetObjectRange(client_.get(), path_, position, nbytes, out));
@@ -1182,6 +1193,8 @@ class ObjectOutputStream final : public io::OutputStream {
   }

   Status Init() {
+    RETURN_NOT_OK(CheckS3Finalized());
+
     // Initiate the multi-part upload
     S3Model::CreateMultipartUploadRequest req;
     req.SetBucket(ToAwsString(path_.bucket));
@@ -1217,6 +1230,8 @@ class ObjectOutputStream final : public io::OutputStream {
       return Status::OK();
     }

+    RETURN_NOT_OK(CheckS3Finalized());
+
     S3Model::AbortMultipartUploadRequest req;
     req.SetBucket(ToAwsString(path_.bucket));
     req.SetKey(ToAwsString(path_.key));
@@ -1245,6 +1260,8 @@ class ObjectOutputStream final : public io::OutputStream {
   Future<> CloseAsync() override {
     if (closed_) return Status::OK();

+    RETURN_NOT_OK(CheckS3Finalized());
+
     if (current_part_) {
       // Upload last part
       RETURN_NOT_OK(CommitCurrentPart());
@@ -1307,6 +1324,8 @@ class ObjectOutputStream final : public io::OutputStream {
       return Status::Invalid("Operation on closed stream");
     }

+    RETURN_NOT_OK(CheckS3Finalized());
+
     const int8_t* data_ptr = reinterpret_cast(data);
     auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) {
       data_ptr += offset;
@@ -1359,6 +1378,7 @@ class ObjectOutputStream final : public io::OutputStream {
     if (closed_) {
       return Status::Invalid("Operation on closed stream");
     }
+    RETURN_NOT_OK(CheckS3Finalized());
     // Wait for background writes to finish
     std::unique_lock lock(upload_state_->mutex);
     return upload_state_->pending_parts_completed;
@@ -1367,6 +1387,7 @@ class ObjectOutputStream final : public io::OutputStream {
   // Upload-related helpers

   Status CommitCurrentPart() {
+    RETURN_NOT_OK(CheckS3Finalized());
     ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
     current_part_.reset();
     current_part_size_ = 0;
@@ -1379,6 +1400,8 @@ class ObjectOutputStream final : public io::OutputStream {

   Status UploadPart(const void* data, int64_t nbytes,
                     std::shared_ptr owned_buffer = nullptr) {
+    RETURN_NOT_OK(CheckS3Finalized());
+
     S3Model::UploadPartRequest req;
     req.SetBucket(ToAwsString(path_.bucket));
     req.SetKey(ToAwsString(path_.key));
@@ -1574,6 +1597,8 @@ struct TreeWalker : public std::enable_shared_from_this {
     S3Model::ListObjectsV2Request req;

     Status operator()(const Result& result) {
+      RETURN_NOT_OK(CheckS3Finalized());
+
       // Serialize calls to operation-specific handlers
       if (!walker->ok()) {
         // Early exit: avoid executing handlers if DoWalk() returned
@@ -1692,6 +1717,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this BucketExists(const std::string& bucket) {
+    RETURN_NOT_OK(CheckS3Finalized());
+
     S3Model::HeadBucketRequest req;
     req.SetBucket(ToAwsString(bucket));

@@ -1709,6 +1736,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this IsEmptyDirectory(
       const std::string& bucket, const std::string& key,
       const S3Model::HeadObjectOutcome* previous_outcome = nullptr) {
+    RETURN_NOT_OK(CheckS3Finalized());
+
     if (previous_outcome) {
       // Fetch the backend from the previous error
       DCHECK(!previous_outcome->IsSuccess());
@@ -1850,6 +1885,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this IsNonEmptyDirectory(const S3Path& path) {
+    RETURN_NOT_OK(CheckS3Finalized());
     S3Model::ListObjectsV2Request req;
     req.SetBucket(ToAwsString(path.bucket));
     req.SetPrefix(ToAwsString(path.key) + kSep);
@@ -1939,6 +1975,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this* out) {
+    RETURN_NOT_OK(CheckS3Finalized());
+
     FileInfoCollector collector(bucket, key, select);

     auto handle_error = [&](const AWSError& error) -> Status {
@@ -2027,6 +2065,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this> WalkForDeleteDirAsync(const std::string& bucket,
                                                            const std::string& key) {
+    RETURN_NOT_OK(CheckS3Finalized());
+
     auto state = std::make_shared();

     auto handle_results = [state](const std::string& prefix,
@@ -2064,6 +2104,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this DeleteObjectsAsync(const std::string& bucket,
                               const std::vector& keys) {
+    RETURN_NOT_OK(CheckS3Finalized());
+
     struct DeleteCallback {
       const std::string bucket;

@@ -2156,6 +2198,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this> ProcessListBuckets(
       const Aws::S3::Model::ListBucketsOutcome& outcome) {
+    RETURN_NOT_OK(CheckS3Finalized());
     if (!outcome.IsSuccess()) {
       return ErrorToStatus(std::forward_as_tuple("When listing buckets: "), "ListBuckets",
                            outcome.GetError());
@@ -2169,11 +2212,13 @@ class S3FileSystem::Impl : public std::enable_shared_from_this> ListBuckets() {
+    RETURN_NOT_OK(CheckS3Finalized());
     auto outcome = client_->ListBuckets();
     return ProcessListBuckets(outcome);
   }

   Future> ListBucketsAsync(io::IOContext ctx) {
+    RETURN_NOT_OK(CheckS3Finalized());
     auto self = shared_from_this();
     return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
         // TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
@@ -2187,6 +2232,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this(client_, fs->io_context(), path);
     RETURN_NOT_OK(ptr->Init());
@@ -2205,6 +2251,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this(client_, fs->io_context(), path, info.size());
@@ -2223,6 +2270,7 @@ S3FileSystem::~S3FileSystem() {}
 Result> S3FileSystem::Make(
     const S3Options& options, const io::IOContext& io_context) {
   RETURN_NOT_OK(CheckS3Initialized());
+  RETURN_NOT_OK(CheckS3Finalized());

   std::shared_ptr ptr(new S3FileSystem(options, io_context));
   RETURN_NOT_OK(ptr->impl_->Init());
@@ -2250,6 +2298,8 @@ S3Options S3FileSystem::options() const { return impl_->options(); }
 std::string S3FileSystem::region() const { return impl_->region(); }

 Result S3FileSystem::GetFileInfo(const std::string& s) {
+  RETURN_NOT_OK(CheckS3Finalized());
+
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
   FileInfo info;
   info.set_path(s);
@@ -2313,6 +2363,8 @@ Result S3FileSystem::GetFileInfo(const std::string& s) {
 }

 Result S3FileSystem::GetFileInfo(const FileSelector& select) {
+  RETURN_NOT_OK(CheckS3Finalized());
+
   ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));

   FileInfoVector results;
@@ -2383,6 +2435,8 @@ FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select)
 }

 Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
+  RETURN_NOT_OK(CheckS3Finalized());
+
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));

   if (path.key.empty()) {
@@ -2426,6 +2480,8 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
 }

 Status S3FileSystem::DeleteDir(const std::string& s) {
+  RETURN_NOT_OK(CheckS3Finalized());
+
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));

   if (path.empty()) {
@@ -2455,6 +2511,8 @@ Status S3FileSystem::DeleteDirContents(const std::string& s, bool missing_dir_ok
 }

 Future<> S3FileSystem::DeleteDirContentsAsync(const std::string& s,
                                               bool missing_dir_ok) {
+  RETURN_NOT_OK(CheckS3Finalized());
+
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
   if (path.empty()) {
@@ -2480,6 +2538,8 @@ Status S3FileSystem::DeleteRootDirContents() {
 }

 Status S3FileSystem::DeleteFile(const std::string& s) {
+  RETURN_NOT_OK(CheckS3Finalized());
+
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
   RETURN_NOT_OK(ValidateFilePath(path));

@@ -2506,6 +2566,8 @@ Status S3FileSystem::DeleteFile(const std::string& s) {
 }

 Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
+  RETURN_NOT_OK(CheckS3Finalized());
+
   // XXX We don't implement moving directories as it would be too expensive:
   // one must copy all directory contents one by one (including object data),
   // then delete the original contents.
@@ -2525,6 +2587,8 @@ Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
 }

 Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
+  RETURN_NOT_OK(CheckS3Finalized());
+
   ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
   RETURN_NOT_OK(ValidateFilePath(src_path));
   ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
@@ -2562,6 +2626,8 @@ Result> S3FileSystem::OpenOutputStream(
   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
   RETURN_NOT_OK(ValidateFilePath(path));

+  RETURN_NOT_OK(CheckS3Finalized());
+
   auto ptr = std::make_shared(impl_->client_, io_context(), path,
                               impl_->options(), metadata);
   RETURN_NOT_OK(ptr->Init());
@@ -2600,6 +2666,8 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {

   bool IsInitialized() { return !is_finalized_ && is_initialized_; }

+  bool IsFinalized() { return is_finalized_; }
+
   void Finalize(bool from_destructor = false) {
     bool expected = true;
     is_finalized_.store(true);
@@ -2608,9 +2676,9 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
         ARROW_LOG(WARNING)
             << " arrow::fs::FinalizeS3 was not called even though S3 was initialized.  "
                "This could lead to a segmentation fault at exit";
-        RegionResolver::ResetDefaultInstance();
-        Aws::ShutdownAPI(aws_options_);
       }
+      RegionResolver::ResetDefaultInstance();
+      Aws::ShutdownAPI(aws_options_);
     }
   }

@@ -2672,9 +2740,6 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {

 std::shared_ptr CreateAwsInstance() {
   auto instance = std::make_shared();
-  // Don't let S3 be shutdown until all Arrow threads are done using it
-  arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
-  io::internal::GetIOThreadPool()->KeepAlive(instance);
   return instance;
 }

@@ -2713,6 +2778,8 @@ Status EnsureS3Finalized() { return FinalizeS3(); }

 bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); }

+bool IsS3Finalized() { return GetAwsInstance().IsFinalized(); }
+
 // -----------------------------------------------------------------------
 // Top-level utility functions

```

kou commented 1 year ago

#36437 is for the AwsInstance::is_finalized_ approach.
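
For readers skimming the thread, the flag-based approach boils down to the pattern sketched below. This is an illustration under assumptions, not the code from the pull request: `Status` here is a tiny stand-in for arrow::Status, and `g_is_finalized`, `CheckFinalized`, `ListBuckets`, and `Finalize` are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <string>
#include <utility>

namespace flag_sketch {

// Minimal stand-in for arrow::Status; not the real class.
struct Status {
  bool ok;
  std::string message;
  static Status OK() { return {true, ""}; }
  static Status Invalid(std::string msg) { return {false, std::move(msg)}; }
};

// Set once by Finalize(); read by every guarded operation.
std::atomic<bool> g_is_finalized{false};

Status CheckFinalized() {
  if (g_is_finalized.load()) {
    return Status::Invalid("S3 subsystem is finalized");
  }
  return Status::OK();
}

// Every operation that would touch the S3 client starts with the check.
Status ListBuckets() {
  Status st = CheckFinalized();
  if (!st.ok) return st;
  // ... the S3 client would be used here ...
  return Status::OK();
}

void Finalize() { g_is_finalized.store(true); }

}  // namespace flag_sketch
```

One design trade-off worth keeping in mind: a flag check alone does not stop finalization from happening between the check and the use of the client, which is the window the shared-exclusive mutex discussed earlier would close.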