veeso / suppaftp

a super FTP/FTPS client library for Rust with support for both passive and active mode
Apache License 2.0
121 stars 31 forks source link

[QUESTION] - Potential timeout? Advice on correct usage. #71

Closed moranbw closed 4 months ago

moranbw commented 9 months ago

Hello, thanks for the great crate!

I am hoping you can help me with a strange issue some of my users are experiencing. I am essentially just looping through a list of files to download. Some users have reported that the download will begin, but eventually will stall out and not continue. In the attached logs, you can see the downloads start, but the user reported waiting for about 15 minutes with no response (the files are not large, and it is all on a LAN, so should not be network issue). I am doing so in batches of 10 via tokio::sync::Semaphore, but even when I was just looping through one by one, this issue seemed to pop up here and there.

Am I doing anything wrong in my code? At one point I was using the standard ftp_stream.retr_as_buffer method, but I switched to ftp_stream.retr_as_stream to see if that would help, but it does not seem to have made a difference. As you can tell from the logs, I never make it out of ap_ftp_copy by Ok or Err.

Any advice is welcome!

Code

//////////////////////////////////////////////////////////////////
        info!("Copying POS from AP...");
        progress_bar.set_message(format!("{} (Copying POS from AP...)", date_string.clone()));
        let pos_ap_path = Path::new(&parent_path).join("pos_ap");

        let permits = Arc::new(Semaphore::new(10));
        let futures = date.pos.ap.clone().into_iter().map(|pos| {
            let permits = Arc::clone(&permits);
            let pos_ap_path_clone = pos_ap_path.clone();
            let pos_clone: String = pos.clone();
            async move {
                let permit = match permits.acquire().await {
                    Ok(permit) => permit,
                    Err(e) => {
                        return Err(CommandError {
                            message: format!(
                                "Could not acquire permit to copy {}",
                                pos_clone.clone()
                            ),
                            cause: e.to_string(),
                        })
                    }
                };
                info!("[pos_ap] Copying {}", pos_clone);
                match ap_ftp_copy(pos_clone.as_str(), &pos_ap_path_clone).await {
                    Ok(message) => {                
                        info!("[pos_ap] Successfully copied {}", pos_clone);
                        drop(permit);
                        progress_bar.inc(1);
                        Ok(message)
                    }
                    Err(e) => return Err(e),
                }
            }
        });

        let results = future::join_all(futures).await;
        for result in results {
            match result {
                Ok(message) => debug!("{}", message),
                Err(e) => return Err(e),
            }
        }
//////////////////////////////////////////////////////////////////

pub async fn ap_ftp_copy(file: &str, dest: &PathBuf) -> Result<String, CommandError> {
    if async_std::fs::metadata(&dest).await.is_err() {
        debug!(" mkdir: {:?}", dest);
        match async_std::fs::create_dir_all(&dest).await {
            Ok(()) => (),
            Err(e) => {
                return Err(CommandError {
                    message: format!("Problem creating dirs for {:?}", dest),
                    cause: e.to_string(),
                })
            }
        };
    }

    let mut new_file = match async_std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(dest.join(file))
        .await
    {
        Ok(new_file) => new_file,
        Err(e) => {
            return Err(CommandError {
                message: format!("Problem creating file for {}", file),
                cause: e.to_string(),
            })
        }
    };

    let mut ftp_stream = match ap_ftp::connect_async().await {
        Ok(ftp_stream_async) => ftp_stream_async,
        Err(e) => return Err(e),
    };

    let mut file_stream = match ftp_stream.retr_as_stream(file).await {
        Ok(file_stream) => file_stream,
        Err(e) => {
            return Err(CommandError {
                message: format!("Problem creating stream for file {}", file),
                cause: e.to_string(),
            })
        }
    };

    let copy_response = match async_std::io::copy(&mut file_stream, &mut new_file).await {
        Ok(_bytes) => {
            format!("Successfully copied file: {}", file)
        }
        Err(e) => {
            return Err(CommandError {
                message: format!("Problem writing file {} from stream", file),
                cause: e.to_string(),
            })
        }
    };

    match ftp_stream.finalize_retr_stream(file_stream).await {
        Ok(()) => (),
        Err(e) => {
            return Err(CommandError {
                message: format!("Problem finalizing stream for file {}", file),
                cause: e.to_string(),
            })
        }
    }

    match ftp_stream.quit().await {
        Ok(()) => Ok(copy_response),
        Err(e) => {
            warn!("Could not close FTP session in discover: {}", e.to_string());
            return Ok(copy_response);
        }
    }
}

Log

ap_ftp.log