harupy / mlflow

Open source platform for the machine learning lifecycle
https://mlflow.org
Apache License 2.0

a #74

Open harupy opened 1 year ago

harupy commented 1 year ago
```mermaid
sequenceDiagram
    participant Client
    participant Server
    participant RemoteStorage

    Client->>Server: Start multipart upload
    note over Server: Generate random ID (upload_id) and create directory with it
    Server->>Client: Return upload_id
    loop Upload Chunks
        Client->>Server: Upload chunk using upload ID
        note over Server: Save chunk with unique ID (chunk_id)
        Server->>Client: Return chunk_id
    end
    Client->>Server: Complete multipart upload using upload_id
    note over Server: Combine chunks into a file
    Server->>RemoteStorage: Send combined file to Remote Storage
```
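The "Upload Chunks" loop assumes the client has already split the file into fixed-size chunks. A minimal sketch of that client-side step (the helper name and chunk size are illustrative, not part of the proposal):

```python
def split_into_chunks(data: bytes, chunk_size: int) -> list[bytes]:
    """Split a payload into fixed-size chunks; the last chunk may be shorter."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

payload = b"x" * 25
chunks = split_into_chunks(payload, 10)
assert [len(c) for c in chunks] == [10, 10, 5]
assert b"".join(chunks) == payload
```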
harupy commented 1 year ago
```python
from fastapi import FastAPI, UploadFile, File, HTTPException
from uuid import uuid4
import os
import shutil

app = FastAPI()

# Note: FastAPI routes cannot match on query strings (S3 uses "POST /{path}?uploads"
# to initiate an MPU), so a distinct sub-path is used to start the upload instead.
@app.post("/mpu/{path}/uploads")
async def start_mpu(path: str):
    upload_id = str(uuid4())
    os.mkdir(upload_id)
    return {"upload_id": upload_id}

@app.put("/mpu/{path}")
async def upload_chunk(path: str, upload_id: str, part_number: int, file: UploadFile = File(...)):
    if not os.path.exists(upload_id):
        raise HTTPException(status_code=404, detail="Upload not found")

    # Store the chunk under a zero-padded part number so that lexicographic
    # order matches upload order when the chunks are combined.
    chunk_id = f"{part_number:08d}"
    chunk_data = await file.read()
    with open(f"{upload_id}/{chunk_id}", "wb") as f:
        f.write(chunk_data)

    return {"chunk_id": chunk_id}

@app.post("/mpu/{path}")
async def complete_mpu(path: str, upload_id: str):
    if not os.path.exists(upload_id):
        raise HTTPException(status_code=404, detail="Upload not found")

    # os.listdir returns entries in arbitrary order, so sort the zero-padded
    # chunk names to reassemble the file in upload order.
    combined_file_path = f"{upload_id}/combined"
    with open(combined_file_path, "wb") as combined_file:
        for chunk_file_name in sorted(os.listdir(upload_id)):
            if chunk_file_name != "combined":
                with open(f"{upload_id}/{chunk_file_name}", "rb") as chunk_file:
                    combined_file.write(chunk_file.read())

    # Here you would send the combined file to remote storage
    # For example, using boto3 for Amazon S3:
    # s3_client = boto3.client('s3')
    # s3_client.upload_file(combined_file_path, 'your_bucket', path)

    shutil.rmtree(upload_id)
    return {"message": "Upload completed"}

@app.delete("/mpu/{path}")
async def abort_mpu(path: str, upload_id: str):
    if not os.path.exists(upload_id):
        raise HTTPException(status_code=404, detail="Upload not found")

    shutil.rmtree(upload_id)
    return {"message": "Upload aborted"}
```
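One subtlety the chunk-combination step depends on: `os.listdir` gives no ordering guarantee, so chunk filenames must sort in upload order. Zero-padded part numbers do; bare decimal strings do not:

```python
part_numbers = [3, 1, 10, 2]

# Zero-padded names sort lexicographically in numeric order...
padded = sorted(f"{n:08d}" for n in part_numbers)
assert [int(name) for name in padded] == [1, 2, 3, 10]

# ...but bare decimal strings do not ("10" sorts before "2")
bare = sorted(str(n) for n in part_numbers)
assert bare == ["1", "10", "2", "3"]
```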
harupy commented 1 year ago
```mermaid
sequenceDiagram
    participant Client
    participant Server
    participant RemoteStorage as Remote Storage
    Client->>Server: Request with file size and path
    Server->>RemoteStorage: Generate presigned URLs for MPU operations
    RemoteStorage-->>Server: Presigned URLs
    Server-->>Client: Respond with presigned URLs
    Client->>RemoteStorage: Use presigned URLs to initiate MPU
    Client->>RemoteStorage: Upload chunks using presigned URLs
    Client->>RemoteStorage: Complete MPU using presigned URL
```
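A rough sketch of the server's "generate presigned URLs" step for S3, assuming boto3 as the storage client. The bucket/key names and the `plan_parts` helper are illustrative; `create_multipart_upload` and `generate_presigned_url("upload_part", ...)` are standard boto3 S3 client calls:

```python
import math

def plan_parts(file_size: int, part_size: int) -> list[tuple[int, int, int]]:
    """Return (part_number, start, end) byte ranges; part numbers are 1-based as in S3."""
    return [
        (i + 1, i * part_size, min((i + 1) * part_size, file_size))
        for i in range(math.ceil(file_size / part_size))
    ]

def presign_upload(bucket: str, key: str, file_size: int, part_size: int):
    """Initiate an S3 MPU and presign one upload_part URL per planned part."""
    import boto3  # imported lazily so the planning helper runs without AWS deps

    s3 = boto3.client("s3")
    upload_id = s3.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]
    urls = [
        s3.generate_presigned_url(
            "upload_part",
            Params={
                "Bucket": bucket,
                "Key": key,
                "UploadId": upload_id,
                "PartNumber": part_number,
            },
            ExpiresIn=3600,
        )
        for part_number, _, _ in plan_parts(file_size, part_size)
    ]
    return upload_id, urls

# 25 bytes at 10 bytes per part -> parts 1..3
assert plan_parts(25, 10) == [(1, 0, 10), (2, 10, 20), (3, 20, 25)]
```

The client then PUTs each byte range to its presigned URL and finally calls the completion endpoint, so chunk bytes never pass through the tracking server.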