sshnet / SSH.NET

SSH.NET is a Secure Shell (SSH) library for .NET, optimized for parallelism.
http://sshnet.github.io/SSH.NET/
MIT License
3.95k stars 931 forks source link

Sftpclient uploadfile concurrency issues #640

Open srini77 opened 4 years ago

srini77 commented 4 years ago

I have a singleton sftpclient. Multiple threads use this instance and call upload file method asynchronously(using begin upload file). I’m getting the following errors “Message type 52 is not valid in the current context” “The requested operation cannot be performed because there is a file transfer in progress”

jimmytricks commented 3 years ago

Having a similar issue, anyone able to shed any light on this? Cheers

zybexXL commented 3 years ago

I use multiple sftpClient instances, one per thread. I define a maximum number of simultaneous uploads/downloads, and create the corresponding number of threads to handle the transfer queue. Each thread has its own sftpClient, each of them connects independently to the SFTP server. Each of them can only upload/download one file at a time.

vpetkovic commented 2 years ago

@zybexXL do you mind sharing your code that does what you have described?

zybexXL commented 2 years ago

I can't share my production code as it's internal to my company and it's a few thousands of lines of code, with a bunch of extra features - queuing, prioritization, retries, resume, zip/extract, progress reports, events, etc.

There are many ways of implementing something like that. Here's a proof of concept for you, with 4 parallel uploads:

using System;
using System.IO;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Renci.SshNet;

namespace SFTPthing
{
    class SftpUploader
    {
        SftpClient sftp;
        string _host; int _port; string _user; string _pass;

        public SftpUploader(string host, int port, string user, string pass)
        {
            _host = host;
            _port = port;
            _user = user;
            _pass = pass;
        }

        private bool Connect(bool reconnect = false)
        {
            try
            {
                if (!reconnect && sftp != null && sftp.IsConnected) return true;
                if (sftp != null)
                    sftp.Disconnect();
                sftp = new SftpClient(_host, _port, _user, _pass);
                sftp.Connect();
                return sftp.IsConnected;
            }
            catch { }
            Console.WriteLine("SFTP connection failed!");
            return false;
        }

        private void UploadQueue(ConcurrentQueue<string> fileQueue, string destFolder)
        {
            while (fileQueue.TryDequeue(out string path))
            {
                if (!Connect()) return;
                try
                {
                    using (FileStream file = File.OpenRead(path))
                    {
                        string dest = Path.Combine(destFolder, Path.GetFileName(path)).Replace('\\', '/');
                        Console.WriteLine($"Uploading: {path} to {dest}");
                        sftp.UploadFile(file, dest);
                        Console.WriteLine($"     Done: {path}");
                    }
                }
                catch { Console.WriteLine($"Uploader failed!"); }
            }
        }

        // upload a list of files using N parallel tasks/threads
        public static void UploadFiles(string[] files, string destFolder, string host, int port, string user, string pass, int maxThreads)
        {
            ConcurrentQueue<string> queue = new ConcurrentQueue<string>(files);
            List<Task> uploaders = new List<Task>();
            for (int i = 0; i < maxThreads; i++)
                uploaders.Add(Task.Run(() =>
                {
                    SftpUploader uploader = new SftpUploader(host, port, user, pass);
                    uploader.UploadQueue(queue, destFolder);
                }));

            Task.WaitAll(uploaders.ToArray());
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var files = Directory.GetFiles("c:\\temp\\upload");
            SftpUploader.UploadFiles(files, "/", "server", 22, "username", "password", 4);
            Console.WriteLine($"finished");
        }
    }
}
vpetkovic commented 2 years ago

Much appreciated for quick reply and POC is all I needed to see what I was doing wrong using ConcurrentQueue and even Parallel.ForEach that kept giving me message type 52 error no matter what. I already see what I overlooked. Cheers! 🙏

mvcprogrammer commented 2 years ago

Don't forget to disconnect and dispose the Sftpclient:

using System.Collections.Concurrent;
using Renci.SshNet;

namespace SftpUpload;

internal static class SftpDestinationConfig
{
    public static string Host => "192.168.1.127";
    public static int Port => 2222;
    public static string UserName => "user123";
    public static string Password => "pass123";
    public static string Folder => "";
    public static int MaxThreads => 4;
}

internal class SftpUploader
{
    private SftpClient? _sftp;

    public static void UploadFiles(IEnumerable<string> files)
    {
        var queue = new ConcurrentQueue<string?>(files);
        var uploaders = new List<Task>();

        for (var numConnections = 0; numConnections < SftpDestinationConfig.MaxThreads; numConnections++)
            uploaders.Add(Task.Run(() =>
            {
                SftpUploader uploader = new();
                uploader.UploadQueue(queue, SftpDestinationConfig.Folder);
            }));

        Task.WaitAll(uploaders.ToArray());
    }

    private void UploadQueue(ConcurrentQueue<string?> fileQueue, string destFolder)
    {
        while (fileQueue.TryDequeue(out var path))
        {
            if (!Connect()) 
                return;

            try
            {
                if (path == null) continue;

                using var file = File.OpenRead(path);
                var dest = Path.Combine(destFolder, Path.GetFileName(path)).Replace('\\', '/');
                Console.WriteLine($"Uploading: {path} to {dest}");
                _sftp?.UploadFile(file, dest);
                Console.WriteLine($"Upload Complete: {path}");
            }
            catch (Exception e)
            {
                Console.WriteLine($"Uploader failed: {e.Message ?? "Unknown Error"}");
            }
        }

        _sftp?.Disconnect();
       _sftp?.Dispose();
    }

    private bool Connect()
    {
        try
        {
            if (_sftp is { IsConnected: true }) return true;

            _sftp = new SftpClient(
                SftpDestinationConfig.Host, 
                SftpDestinationConfig.Port, 
                SftpDestinationConfig.UserName, 
                SftpDestinationConfig.Password);

            _sftp.Connect();
            return _sftp.IsConnected;
        }
        catch
        {
            Console.WriteLine("SFTP upload connection failed.");
            return false;
        }
    }
}
okarpov commented 11 months ago

have got the same error executing WGET -O file.zip http://.... right after SSH connection esteblished in C# multi-Task environment. Some VPSs run fine and other give the error. Less simultaneous tasks are there less VPSs getting the error