Optimizing ContainerClient Usage and Memory Management in Azure Blob Storage Operations for a FastAPI Application on Kubernetes. #38332
Background:
When an API is called, we create and simultaneously execute multiple asynchronous tasks. Each task downloads and uploads various files to Azure Storage by calling functions such as download_blobs_by_pattern and upload_files_to_azure (see the "Related codes" section).
After all tasks complete, a tracemalloc snapshot of the memory status shows that aiohttp retains a significant amount of memory.
e.g., .venv/lib/python3.12/site-packages/aiohttp/connector.py:1098: size=5447 KiB, count=482, average=11.3 KiB
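For reference, this is roughly how we capture that snapshot; the grouping key and the number of statistics printed are illustrative:
```python
import tracemalloc

tracemalloc.start()

# ... the API call runs its upload/download tasks here ...

# Group live allocations by source line; entries such as
# aiohttp/connector.py surface near the top of this list.
snapshot = tracemalloc.take_snapshot()
for stat in snapshot.statistics("lineno")[:10]:
    print(stat)
```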
Question:
We have the following questions:
1. In all functions related to Azure Storage, we are creating ContainerClient objects using async with. Is there anything we should change in this approach?
e.g.,
```python
async with ContainerClient.from_connection_string(azure_connection_string, azure_container_name) as container_client:
    ...  # file upload & download to Azure Storage
```
2. Is it possible, and recommended, to create a single ContainerClient object when the Python app starts, and then share that object across all tasks that upload and download files to Azure Storage on every API call?
Additionally, in that case, would there be any issues with continuing to use the same ContainerClient object if the network connection is temporarily lost and then restored? (The shared-client pattern we mean is sketched below.)
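This is the pattern we have in mind, using a FastAPI lifespan handler; the connection string, container name, and endpoint are placeholders, and the sketch is only illustrative:
```python
from contextlib import asynccontextmanager

from azure.storage.blob.aio import ContainerClient
from fastapi import FastAPI, Request

@asynccontextmanager
async def lifespan(app: FastAPI):
    # One client per process; its underlying aiohttp session and
    # connection pool are reused by every request handler.
    client = ContainerClient.from_connection_string(
        "<azure_connection_string>", "<azure_container_name>"  # placeholders
    )
    app.state.container_client = client
    try:
        yield
    finally:
        # Close the transport exactly once, at application shutdown.
        await client.close()

app = FastAPI(lifespan=lifespan)

@app.get("/blobs/{name}")
async def read_blob(name: str, request: Request):
    client: ContainerClient = request.app.state.container_client
    downloader = await client.download_blob(name)
    data = await downloader.readall()
    return {"name": name, "size": len(data)}
```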
Related codes:
```python
import asyncio
import logging
import os
import re
from typing import List, Optional

import aiofiles
from azure.core.exceptions import AzureError
from azure.storage.blob.aio import ContainerClient

logger = logging.getLogger(__name__)

# The methods below are classmethods excerpted from our storage helper class.

@classmethod
async def upload_files_to_azure(
    cls, azure_connection_string: str, azure_container_name: str, azure_dir_path: str, local_file_paths: list
):
    async with ContainerClient.from_connection_string(azure_connection_string, azure_container_name) as container_client:
        if not await container_client.exists():
            await container_client.create_container()
        total_files = len(local_file_paths)
        success_count = 0
        failed_count = 0
        successed_files = []
        failed_files = []
        blob_files = []

        async def upload_file(file_path):
            nonlocal success_count, failed_count
            file_name = os.path.basename(file_path)
            blob_path = os.path.join(azure_dir_path, file_name).replace("\\", "/")
            try:
                async with aiofiles.open(file_path, "rb") as file:
                    # upload_blob raises on failure; its return value is a
                    # BlobClient, so a falsy check would never trigger.
                    await container_client.upload_blob(
                        blob_path, file, length=os.path.getsize(file_path), overwrite=True
                    )
                success_count += 1
                successed_files.append(file_path)
                blob_files.append(blob_path)
            except Exception as e:
                logger.info(f"Error uploading {file_name}: {str(e)}")
                failed_count += 1
                failed_files.append((file_path, str(e)))

        tasks = [upload_file(file_path) for file_path in local_file_paths]
        await asyncio.gather(*tasks)
        return total_files, success_count, successed_files, failed_count, failed_files, blob_files
```
```python
@classmethod
async def download_blobs_by_pattern(
    cls,
    azure_connection_string: str,
    azure_container_name: str,
    directory_path: str,
    file_pattern: str,
    local_directory: str,
    half_download: bool = False,
) -> Optional[List[str]]:
    downloaded_files = []
    try:
        async with ContainerClient.from_connection_string(azure_connection_string, azure_container_name) as container_client:
            full_prefix = os.path.join(directory_path, file_pattern).replace("\\", "/")
            os.makedirs(local_directory, exist_ok=True)
            blob_list = [blob async for blob in container_client.list_blobs(name_starts_with=full_prefix)]

            def get_page_number(blob):
                match = re.search(r"page_(\d+)", blob.name)
                return int(match.group(1)) if match else 0

            sorted_blob_list = sorted(blob_list, key=get_page_number)
            # When half_download is set, only the first half of the sorted blobs is fetched.
            total_count = len(sorted_blob_list) if not half_download else len(sorted_blob_list) // 2
            for current_index, blob in enumerate(sorted_blob_list):
                if half_download and current_index >= total_count:
                    break
                local_file_path = os.path.join(local_directory, os.path.basename(blob.name))
                success, _ = await cls.download_blob(container_client, blob.name, local_file_path)
                if not success:
                    logger.error(f"Failed to download: {blob.name}")
                    return None
                downloaded_files.append(local_file_path)
                logger.info(f"Downloaded: {local_file_path}")
            return downloaded_files
    except Exception as e:
        logger.error(f"Error occurred while downloading blobs: {str(e)}")
        return None
```
```python
@classmethod
async def download_blob(cls, container_client, azure_blob_path, local_file_path):
    try:
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
        logger.info(f"Downloading blob to {local_file_path}")
        stream_downloader = await container_client.download_blob(azure_blob_path)
        async with aiofiles.open(local_file_path, "wb") as file:
            async for chunk in stream_downloader.chunks():
                await file.write(chunk)
        logger.info(f"Successfully downloaded: {azure_blob_path}")
        return True, (azure_blob_path, "")
    except IOError as e:
        logger.error(f"IO Error occurred while downloading {azure_blob_path}: {str(e)}")
        return False, (azure_blob_path, str(e))
    except AzureError as e:
        logger.error(f"Azure Error occurred while downloading {azure_blob_path}: {str(e)}")
        return False, (azure_blob_path, str(e))
    except Exception as e:
        logger.error(f"Unexpected error occurred while downloading {azure_blob_path}: {str(e)}")
        return False, (azure_blob_path, str(e))
```
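For context, this is roughly how the helpers above are fanned out per API call, as described in "Background"; the class name StorageHelper, the endpoint, the constants, and all paths are hypothetical placeholders:
```python
# Hypothetical endpoint illustrating the concurrent fan-out; StorageHelper,
# CONN_STR, CONTAINER, and the paths are placeholders, not our real names.
@app.post("/process")
async def process():
    upload_result, download_result = await asyncio.gather(
        StorageHelper.upload_files_to_azure(
            CONN_STR, CONTAINER, "results/run-1", ["/tmp/a.json", "/tmp/b.json"]
        ),
        StorageHelper.download_blobs_by_pattern(
            CONN_STR, CONTAINER, "documents/doc-1", "page_", "/tmp/pages"
        ),
    )
    return {"uploaded": upload_result[1], "downloaded": len(download_result or [])}
```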
Environment:
- Execution Environment: Kubernetes container (Linux)
- Python: 3.12.3
- Python Framework: FastAPI
- Azure SDK: azure-storage-blob==12.22.0