durch / rust-s3

Rust library for interfacing with S3 API compatible services
MIT License

Upload and download with Axum, streaming it #319

Closed. frederikhors closed this issue 1 year ago

frederikhors commented 1 year ago

I'm trying to write a small axum router that uploads and downloads files.

I would like to stream those files directly to the cloud, but I don't know what to use for streaming.

In Go (my primary language) I'm using http.ResponseWriter with io.Copy.

But in Rust I'm having real difficulty understanding what to use. This is the code:

- Cargo.toml:

    [dependencies]
    axum = { version = "0.5.17", features = ["multipart"] }
    futures = "0.3.25"
    tokio = { version = "1.21.2", features = ["full"] }
    tokio-util = { version = "0.7.4", features = ["io"] }
    rust-s3 = "0.32.3"


- main.rs:
```rust
use axum::{
    body::{boxed, StreamBody},
    extract::{Multipart, Query},
    http::StatusCode,
    response::Response,
    routing::{on, MethodFilter},
    Router,
};
use futures::TryStreamExt;
use s3::{creds::Credentials, Bucket};
use std::{io, net::SocketAddr, pin::Pin};
use tokio::io::{AsyncRead, AsyncWrite, BufWriter};
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() {
    let router = Router::new()
        .route("/upload", on(MethodFilter::POST, upload))
        .route("/download/*key", on(MethodFilter::GET, download));

    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));

    axum::Server::bind(&addr)
        .serve(router.into_make_service())
        .await
        .unwrap();
}

pub async fn upload(mut multipart: Multipart) -> Result<Response, StatusCode> {
    while let Some(field) = multipart.next_field().await.unwrap() {
        let filename = if let Some(filename) = field.file_name() {
            filename.to_string()
        } else {
            continue;
        };

        let bucket = Bucket::new(
            "test",
            "us-east-1".parse().unwrap(),
            Credentials::default().unwrap(),
        )
        .unwrap();

        let body_with_io_error = field.map_err(|err| io::Error::new(io::ErrorKind::Other, err));

        let body_reader = StreamReader::new(body_with_io_error);

        futures::pin_mut!(body_reader);

        put_file(bucket, &filename, body_reader);

        return Ok(Response::builder()
            .status(StatusCode::CREATED)
            .body(boxed("OK".to_string()))
            .unwrap());
    }

    Err(StatusCode::INTERNAL_SERVER_ERROR)
}

async fn put_file(
    bucket: Bucket,
    filename: &str,
    mut reader: Pin<&mut (dyn AsyncRead + Send)>,
) -> Result<(), ()> {
    bucket
        .put_object_stream(&mut reader, filename)
        .await
        .unwrap();

    Ok(())
}

pub async fn download(Query(params): Query<Vec<(String, String)>>) -> Result<Response, StatusCode> {
    let filename = params[0].1.to_string();

    let bucket = Bucket::new(
        "test",
        "us-east-1".parse().unwrap(),
        Credentials::default().unwrap(),
    )
    .unwrap();

    // I DON'T KNOW HOW TO START HERE! HELP!!! :smile:
    // What should I use here?

    let writer = BufWriter::new();
    // let writer = ReaderStream::new(reader);
    // futures::pin_mut!(writer);

    get_file(bucket, &filename, writer).await.unwrap();

    let response = Response::builder()
        .body(boxed(StreamBody::new(writer)))
        .unwrap();

    Ok(response)
}

async fn get_file(
    bucket: Bucket,
    filename: &str,
    mut writer: Pin<&mut (dyn AsyncWrite + Send)>,
) -> Result<(), ()> {
    bucket.get_object_stream(filename, &mut writer).await?;

    Ok(())
}
```

QUESTIONS

  1. The upload function works but I don't know what these lines mean:

    let body_with_io_error = field.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
    let body_reader = StreamReader::new(body_with_io_error);
    futures::pin_mut!(body_reader);
  2. The download function doesn't work because I don't know how to create the writer that get_object_stream() needs.


With your help, I can open a PR with examples and docs as soon as I figure out how to do it.

Thanks!

s3rius commented 1 year ago

Hi! I just saw your question. I had almost the same problem and solved it this way: link to my project

I created a GetObject command by hand, got the response, and redirected its stream straight into my response body.

    let key = self.get_s3_key(file_info);
    let command = Command::GetObject;
    let s3_request = Reqwest::new(&self.bucket, &key, command);
    let s3_response = s3_request.response().await?;
    let mut response = HttpResponseBuilder::new(actix_web::http::StatusCode::OK);
    Ok(response.streaming(s3_response.bytes_stream()))

s3_response.bytes_stream() returns a Stream<Item = Result<Bytes, Err>>, which fits actix's response.streaming. I guess axum has an almost identical signature.

Hope it helps.

frederikhors commented 1 year ago

Thanks. I think I'll close this because I'm not using rust-s3 anymore. Thanks again!