Azure / azure-sdk-for-cpp

This repository is for active development of the Azure SDK for C++. For consumers of the SDK we recommend visiting our versioned developer docs at https://azure.github.io/azure-sdk-for-cpp.
MIT License

global state cleanup #5552

Open levongh opened 5 months ago

levongh commented 5 months ago

Hello, I am using the Azure SDK for C++ in a multiprocessing context. As an initial step, the main process sends a couple of requests through the Azure SDK; then each subprocess creates a new blob client and uses it. Intermittently, the program crashes in m_cleanThread.join().

As far as I can see, m_cleanThread is responsible for the cleanup task of g_curlConnectionPool. Is there a common place where I can do pre-fork cleanup of this global state?
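
POSIX pthread_atfork is the standard place to register pre-fork and post-fork hooks. Below is a minimal sketch of that pattern. It assumes a hypothetical CleanupWorker singleton standing in for a component that owns a background thread like m_cleanThread. This is not the SDK's code, and it does not claim the SDK registers such handlers. The prepare handler joins the worker before fork(), and the parent and child handlers restart it afterwards. The sketch also assumes that no other thread holds a lock the child needs at the moment of fork().

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

#include <pthread.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical stand-in for a component that owns a background cleanup thread
// (similar in spirit to a connection pool's m_cleanThread). Not SDK code.
class CleanupWorker {
public:
    static CleanupWorker& Instance() {
        static CleanupWorker instance;
        return instance;
    }
    ~CleanupWorker() { Stop(); }

    void Start() {
        assert(!thread_.joinable()); // Stop() must run before a restart
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = false;
        }
        thread_ = std::thread([this] { Run(); });
    }

    void Stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        if (thread_.joinable()) thread_.join();
    }

    bool Running() const { return thread_.joinable(); }

private:
    void Run() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (!stop_) {
            // Periodic cleanup (e.g. closing idle connections) would go here.
            cv_.wait_for(lock, std::chrono::milliseconds(10));
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
    std::thread thread_;
};

namespace {
bool g_was_running = false;

// prepare: runs in the parent just before fork(). Joining here means the worker
// holds no mutex, and the child never inherits a joinable std::thread handle
// for a thread that does not exist in the child.
void PrepareFork() {
    g_was_running = CleanupWorker::Instance().Running();
    CleanupWorker::Instance().Stop();
}

// parent and child: run after fork() returns, once in each process.
void ResumeAfterFork() {
    if (g_was_running) CleanupWorker::Instance().Start();
}
} // namespace

void InstallForkHandlers() {
    static std::once_flag once;
    std::call_once(once, [] { pthread_atfork(&PrepareFork, &ResumeAfterFork, &ResumeAfterFork); });
}

// Usage: returns true if both parent and child own a running worker after fork().
bool ForkAndCheckWorkers() {
    InstallForkHandlers();
    CleanupWorker::Instance().Start();
    const pid_t pid = fork();
    if (pid < 0) return false;
    if (pid == 0) {
        const bool running = CleanupWorker::Instance().Running();
        CleanupWorker::Instance().Stop(); // joins a thread that really exists here
        _exit(running ? 0 : 1);
    }
    int status = 0;
    waitpid(pid, &status, 0);
    const bool parent_running = CleanupWorker::Instance().Running();
    CleanupWorker::Instance().Stop();
    return parent_running && WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
```

Restarting eagerly in the child is only one option. An alternative is to leave the worker stopped after fork() and start it lazily on first use, which avoids creating threads inside the atfork child handler.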


github-actions[bot] commented 5 months ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @EmmaZhu @Jinming-Hu @vinjiang.

ahsonkhan commented 5 months ago

Can you share a minimal, reproducible example (MRE) of an application that shows the error you are hitting? That would help us investigate the issue and answer your question.

levongh commented 4 months ago

@ahsonkhan I hope this is enough to share, as it is the core logic involved. Note: I do not keep any global state of my own.

Below is the code where I create an AzureReader, which holds an internal client used to send requests:

#include <azure/core/http/http_status_code.hpp>
#include <azure/core/http/curl_transport.hpp>
#include <azure/storage/blobs.hpp>
#include <azure/identity.hpp>

#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

std::unique_ptr<Azure::Storage::Blobs::BlobServiceClient> create_client(const std::string& blob_endpoint)
{
    Azure::Core::Http::CurlTransportOptions transport_opts;
    transport_opts.CAInfo = "/etc/ssl/cert.pem";
    transport_opts.SslVerifyPeer = true;

    // Send every request of this client through libcurl with the options above.
    Azure::Storage::Blobs::BlobClientOptions client_options;
    client_options.Transport.Transport = std::make_shared<Azure::Core::Http::CurlTransport>(transport_opts);

    // No credential: the endpoint must carry its own authorization (e.g. a SAS token) or allow anonymous reads.
    return std::make_unique<Azure::Storage::Blobs::BlobServiceClient>(blob_endpoint, client_options);
}

class AzureReader
{
public:
    AzureReader(const std::string& container_name, const std::string& blob_endpoint)
        : container_name_(container_name)
        , client_(create_client(blob_endpoint))
    {
    }

    // Downloads bytes [range.first, range.second) of the blob at `path`.
    std::vector<uint8_t> download(const std::string& path, std::pair<int64_t, int64_t> range)
    {
        // Range is a Nullable<HttpRange>: assign a value instead of calling Value() on an empty Nullable.
        Azure::Core::Http::HttpRange http_range;
        http_range.Offset = range.first;
        http_range.Length = range.second - range.first;

        Azure::Storage::Blobs::DownloadBlobOptions options;
        options.Range = http_range;

        auto response = client_->GetBlobContainerClient(container_name_).GetBlobClient(path).Download(options);

        auto& body = response.Value.BodyStream;
        std::vector<uint8_t> buffer(static_cast<size_t>(body->Length()));
        const size_t bytes_read = body->ReadToCount(buffer.data(), buffer.size());
        buffer.resize(bytes_read);
        return buffer;
    }

private:
    std::string container_name_;
    std::unique_ptr<Azure::Storage::Blobs::BlobServiceClient> client_;
};
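
One generic mitigation for objects the application itself owns is to tag them with the PID that created them and rebuild them the first time they are used in a different process. Below is a minimal sketch under these assumptions: PerProcess is a hypothetical helper (not part of the Azure SDK), an int and a counting factory stand in for BlobServiceClient and create_client(), and Get() is called from a single thread. The inherited copy is deliberately leaked so that its destructor never runs in the child. This only covers application-owned objects; it cannot reach SDK-internal globals such as g_curlConnectionPool.

```cpp
#include <functional>
#include <memory>
#include <utility>

#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper (not part of the Azure SDK): builds one T per process and
// rebuilds it the first time it is used after a fork().
// Not thread-safe; guard Get() with your own lock if several threads share it.
template <typename T>
class PerProcess {
public:
    explicit PerProcess(std::function<std::unique_ptr<T>()> factory)
        : factory_(std::move(factory)) {}

    T& Get() {
        const pid_t current = getpid();
        if (instance_ && owner_pid_ != current) {
            // Inherited from the parent: leak it on purpose so its destructor
            // (which might join threads that do not exist in this process)
            // never runs here.
            (void)instance_.release();
        }
        if (!instance_) {
            instance_ = factory_();
            owner_pid_ = current;
        }
        return *instance_;
    }

private:
    std::function<std::unique_ptr<T>()> factory_;
    std::unique_ptr<T> instance_;
    pid_t owner_pid_ = -1;
};

// Usage: returns true if the child built a fresh instance while the parent
// kept its original one.
bool ChildGetsFreshInstance() {
    static int builds = 0;
    PerProcess<int> holder([] { return std::make_unique<int>(++builds); });
    const int parent_value = holder.Get();
    const pid_t pid = fork();
    if (pid < 0) return false;
    if (pid == 0) {
        const int child_value = holder.Get(); // different PID, so the factory runs again
        _exit(child_value == parent_value + 1 ? 0 : 1);
    }
    int status = 0;
    waitpid(pid, &status, 0);
    const bool parent_kept = holder.Get() == parent_value;
    return parent_kept && WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
```

Leaking trades a small amount of memory in each child for never executing a parent-owned destructor in the wrong process.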

I also have a pybind11 module that exposes AzureReader creation, and I simply issue random read requests against files in the container.

Below are the Python-side calls:

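import mymodule
import multiprocessing
# multiprocessing.set_start_method("spawn", force=True)

# Module-level call: runs in the parent before the pool is created
# (and again in each worker when the "spawn" start method is used).
mymodule.something()

def iterate(inp):
    # Runs inside the pool worker process.
    mymodule.something()

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=1)

    a = list(zip([1]))

    pool.map_async(iterate, a)

    pool.close()
    pool.join()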
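
With the fork start method (the default on Linux for most Python 3 releases), the pool worker is a fork() of a parent that has already called mymodule.something() and so may already have SDK background threads. The standalone sketch below involves no SDK code; BackgroundThreadSurvivesFork and its ticker thread are hypothetical. It shows what a forked child inherits: the memory, including a joinable std::thread object, but not the thread itself.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

// Returns true if a thread started before fork() is still running in the child.
// POSIX fork() copies only the calling thread, so the expected result is false.
bool BackgroundThreadSurvivesFork() {
    std::atomic<bool> stop{false};
    std::atomic<long> ticks{0};
    std::thread ticker([&] {
        while (!stop.load()) {
            ticks.fetch_add(1);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    });
    while (ticks.load() == 0) {
        std::this_thread::yield(); // wait until the ticker has really started
    }

    const pid_t pid = fork();
    if (pid == 0) {
        // Child: memory (including the std::thread object `ticker`) was copied,
        // but no ticker thread runs here, so `ticks` stays frozen. Joining
        // `ticker` here would be undefined behavior; just observe and _exit().
        const long before = ticks.load();
        struct timespec delay{};
        delay.tv_nsec = 50L * 1000 * 1000; // 50 ms
        nanosleep(&delay, nullptr);
        _exit(ticks.load() != before ? 1 : 0);
    }

    int status = 0;
    if (pid > 0) waitpid(pid, &status, 0);
    stop.store(true);
    ticker.join();
    return pid > 0 && WIFEXITED(status) && WEXITSTATUS(status) == 1;
}
```

This is also why the commented-out set_start_method("spawn", force=True) line is a plausible workaround: spawn starts a fresh interpreter instead of forking the parent, so no half-copied thread state is inherited. With spawn, the module-level mymodule.something() call still runs again in each worker.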

levongh commented 3 months ago

Hi, pinging here so that this issue is not closed as stale.