spring-projects / spring-integration

Spring Integration provides an extension of the Spring programming model to support the well-known Enterprise Integration Patterns (EIP)
http://projects.spring.io/spring-integration/
Apache License 2.0

Add ability to limit connections in FtpSessionFactory [INT-2146] #6128

Closed spring-operator closed 11 years ago

spring-operator commented 13 years ago

Ivan Lagunov opened INT-2146 and commented

The destination FTP server (set up by ftp:outbound-channel-adapter) has a limit for concurrent FTP connections. I use task:executor to parallelize file transfers as I have a router and a number of different destination servers. Thus, it's unacceptable for me to limit concurrent connections on task:executor. As a result, I'm getting the following exceptions:

Caused by: org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received. Server closed connection.

It would be great to have a limit on the maximum number of concurrent FTP connections on the FtpSessionFactory side.


Affects: 2.0.5

1 votes, 1 watchers

spring-operator commented 13 years ago

Mark Fisher commented

This should most likely be passed through to the CachingSessionFactory. However, after a quick look there, I noticed that 'maxPoolSize' is not actually restricting the number of Sessions made available to the application. It will always create a new Session, but only stores up to 'maxPoolSize' within the Queue.
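
To make the behavior described above concrete, here is a minimal sketch. It assumes a plain queue-backed cache; `UnboundedCacheSketch` and its method names are hypothetical and do not reproduce the actual CachingSessionFactory source.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical illustration of "cache size is not a connection limit". */
class UnboundedCacheSketch<S> {
    private final BlockingQueue<S> cache;
    private final Supplier<S> factory;
    private final AtomicInteger created = new AtomicInteger();

    UnboundedCacheSketch(Supplier<S> factory, int maxPoolSize) {
        this.factory = factory;
        this.cache = new ArrayBlockingQueue<>(maxPoolSize);
    }

    /** Never blocks and never fails: an empty cache just means "open another session". */
    S getSession() {
        S session = cache.poll();
        if (session == null) {
            created.incrementAndGet();
            session = factory.get();
        }
        return session;
    }

    /** Keeps at most maxPoolSize idle sessions; returns false when a surplus session is dropped. */
    boolean release(S session) {
        return cache.offer(session);
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        UnboundedCacheSketch<Object> sketch = new UnboundedCacheSketch<>(Object::new, 2);
        for (int i = 0; i < 5; i++) {
            sketch.getSession(); // five sessions checked out, none returned yet
        }
        System.out.println("sessions created: " + sketch.createdCount());
    }
}
```

With a size of 2, five simultaneous callers still get five sessions; the size only caps how many idle sessions are kept for reuse.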

spring-operator commented 13 years ago

Mark Fisher commented

Enabling a timeout on the queue.poll() call used in CachingSessionFactory might be a good first step for this. It would also require that an Exception be thrown if no Session is available when that call finally returns. That would probably need to be enabled by a boolean flag though since the current implementation does not throw any Exceptions. The current behavior would essentially be "0" timeout and "allow-sessions-to-be-created-when-pool-is-exhausted" equal to TRUE. Obviously we need a better name for the boolean flag, but that's the general idea.
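
A rough sketch of that idea follows. The class name, the `waitTimeoutMillis` and `createWhenExhausted` names, and the eager pre-filling of the queue are all assumptions made to keep the example short; none of them come from the framework.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch: timed poll plus a flag that decides what happens on exhaustion. */
class TimedPollSketch<S> {
    private final BlockingQueue<S> idle = new LinkedBlockingQueue<>();
    private final Supplier<S> factory;
    private final long waitTimeoutMillis;      // assumed name
    private final boolean createWhenExhausted; // assumed name for the proposed boolean flag

    TimedPollSketch(Supplier<S> factory, int size, long waitTimeoutMillis, boolean createWhenExhausted) {
        this.factory = factory;
        this.waitTimeoutMillis = waitTimeoutMillis;
        this.createWhenExhausted = createWhenExhausted;
        for (int i = 0; i < size; i++) {
            idle.add(factory.get()); // eager fill is a simplification of this sketch
        }
    }

    S getSession() {
        S session;
        try {
            // A timeout of 0 returns immediately, like the plain poll() call
            session = idle.poll(waitTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a session", e);
        }
        if (session != null) {
            return session;
        }
        if (createWhenExhausted) {
            return factory.get(); // lenient mode: over-commit instead of failing
        }
        throw new IllegalStateException("No session available after " + waitTimeoutMillis + " ms");
    }

    void release(S session) {
        idle.offer(session);
    }

    public static void main(String[] args) {
        TimedPollSketch<Object> strict = new TimedPollSketch<>(Object::new, 1, 50, false);
        Object only = strict.getSession();
        try {
            strict.getSession();
        } catch (IllegalStateException e) {
            System.out.println("strict mode: " + e.getMessage());
        }
        strict.release(only);
    }
}
```

Passing a timeout of 0 with the flag set to true corresponds to the current behavior as described above; a positive timeout with the flag set to false gives a hard limit.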

spring-operator commented 13 years ago

Oleg Zhurakousky commented

On another note, I am thinking that even with the timeout it would still have the same problem, only deferred until the timeout expires. Let me try a different approach. If I know that my server has a limit of 10 concurrent connections, then I can never have more than 10 concurrent downloads or uploads, right? If so, then shouldn't the task-executor be tuned to have no more than 10 active threads for this case?
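
The executor-level limit can be illustrated with plain `java.util.concurrent`, outside of any Spring configuration. In this sketch the class and method names are hypothetical and a short sleep stands in for an FTP transfer; it only shows that a fixed pool of N threads caps concurrent work at N.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: a fixed-size executor bounds the number of simultaneous transfers. */
class BoundedExecutorSketch {

    /** Runs `tasks` simulated transfers on `threads` workers and returns the peak concurrency seen. */
    static int peakConcurrency(int threads, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                int now = active.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // stand-in for an FTP upload
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 50 queued transfers, but never more than 10 running at once
        System.out.println("peak concurrent transfers: " + peakConcurrency(10, 50));
    }
}
```

This works when a single executor feeds a single server. It does not help when the same executor serves several destinations with different limits, which is the situation described in the original report.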

spring-operator commented 13 years ago

Mark Fisher commented

If the behavior is not going to change, the name "maxPoolSize" probably needs to change since it is not really setting a "max", and I think the word "cache" should be used instead of "pool" here. The corresponding property from CachingConnectionFactory in Spring's JMS support would be "sessionCacheSize". That is a good alternative since it clearly states that your cache will have a certain size. If we create more Sessions than that, fine, but they won't be cached.

Other than that change, I do think we could consider documenting the ability to limit at the TaskExecutor level instead of in the FtpSessionFactory.

spring-operator commented 13 years ago

Mark Fisher commented

Moving to 2.1 RC1. Hopefully, this is just a documentation issue as discussed above.

spring-operator commented 13 years ago

Sylvain Mougenot commented

Good thing to correct the documentation issue. But the real point is that the use case is really common (I mean having a limited number of concurrent FTP connections). So is there a plan to have a "PoolingSessionFactory"? Or does it already exist and I don't know about it?

And relying on the number of threads in the task-executor configuration is surely not a good way to do it. Why? Because some of the logic involved may use different outputs or providers (e.g. different FTP servers).

So it may be useful to have this limitation applied from the provider (FTP) point of view rather than the processor point of view. This also looks much easier to set up (it does not mix the concerns of message processing and FTP configuration).

Anyway, I may have to code it. So do you have any advice?
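
One way to express a provider-side limit, independent of the executor, is a semaphore per destination. The sketch below is only an illustration of that idea: `PerHostLimiterSketch`, its methods, and the host names are hypothetical and are not part of Spring Integration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical per-destination connection limiter built on java.util.concurrent.Semaphore. */
class PerHostLimiterSketch {
    private final Map<String, Semaphore> permits = new ConcurrentHashMap<>();
    private final int maxPerHost;

    PerHostLimiterSketch(int maxPerHost) {
        this.maxPerHost = maxPerHost;
    }

    /** Waits up to timeoutMillis for a free slot on the given host; false means "still full". */
    boolean tryAcquire(String host, long timeoutMillis) {
        Semaphore semaphore = permits.computeIfAbsent(host, h -> new Semaphore(maxPerHost, true));
        try {
            return semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns a slot; call exactly once for every successful tryAcquire on the same host. */
    void release(String host) {
        Semaphore semaphore = permits.get(host);
        if (semaphore != null) {
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        PerHostLimiterSketch limiter = new PerHostLimiterSketch(2);
        System.out.println(limiter.tryAcquire("ftp-a.example", 10)); // slot 1 on ftp-a
        System.out.println(limiter.tryAcquire("ftp-a.example", 10)); // slot 2 on ftp-a
        System.out.println(limiter.tryAcquire("ftp-a.example", 10)); // ftp-a is full
        System.out.println(limiter.tryAcquire("ftp-b.example", 10)); // ftp-b has its own budget
    }
}
```

Each destination gets its own budget, so a shared executor can stay as large as the overall workload needs.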

spring-operator commented 13 years ago

Oleg Zhurakousky commented

Interesting

I guess in the end it's really not about min/max, but about simply limiting connections. For now you can probably still utilize task-executor, but we'll make sure that this issue makes it to RC1. I'll also rephrase it a little.

spring-operator commented 13 years ago

Gary Russell commented

I just independently implemented a simple connection pool for tcp in my sandbox (#5859) - although it's less capable, in that it doesn't support over-committing the pool, but it does block (with timeout) if the pool is exhausted.

We should consider a general mechanism that can be reused across endpoints.

spring-operator commented 13 years ago

Oleg Zhurakousky commented

I was able to reproduce the issue, so here are my thoughts. Limiting connections is easy. The real question is what to do when a thread requests a connection beyond the limit. If we throw an exception, the result would be identical to the behavior Ivan reported (just a different exception). I think the intent was to operate only within the limits of what is available, which means that if a connection is not available the caller should wait, but not indefinitely. That goes along with what Mark suggested originally: a time-to-wait-for-connection-to-become-available attribute... we obviously need a better name ;)

So that is what I am going to do.

Ivan/Sylvain, please comment if you had something else in mind.

spring-operator commented 13 years ago

Gary Russell commented

That's exactly what I did for TCP in my sandbox here...

https://github.com/garyrussell/sandbox/blob/master/spring-integration-ip-addons/src/main/java/org/springframework/integration/ip/addons/CachingTcpConnectionFactory.java

private volatile int poolSize = 2;
...
private volatile int availableTimeout = -1;
...
        if (connection == null) {
            long now = System.currentTimeMillis();
            int timeout = this.availableTimeout;
            while (connection == null) {
                if (timeout > 0) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Waiting for connection to become available");
                    }
                    connection = retrieveConnection(timeout);
                }
                timeout = this.availableTimeout - (int) (System.currentTimeMillis() - now);
                if (connection == null && timeout <= 0) {
                    throw new MessagingException("No connections available");
                }
            }
        }
        return connection;
...

    protected TcpConnection retrieveConnection(int timeout) throws Exception {
        TcpConnection connection;
        if (timeout > 0) {
            connection = this.available.poll(timeout,
                TimeUnit.MILLISECONDS);
        } else {
            connection = this.available.poll();
        }
        if (connection == null) {
            return null;
        }
... (additional code to check that the connection is healthy; if not, returns null).

It would be nice if we could factor out this (or similar) code so we can reuse it in both (and maybe more) places.
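
As a sketch of what such a factored-out helper might look like, the generic pool below blocks with a timeout and skips unhealthy items. Everything here (the `TimedPoolSketch` name, the `Predicate` health check, and the choice to discard unhealthy items without replacing them) is an assumption for illustration, not code from the sandbox or the framework.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical reusable pool: blocks up to a total timeout, then fails. */
class TimedPoolSketch<T> {
    private final BlockingQueue<T> available = new LinkedBlockingQueue<>();
    private final Predicate<T> healthy;

    TimedPoolSketch(Iterable<T> items, Predicate<T> healthy) {
        for (T item : items) {
            available.add(item);
        }
        this.healthy = healthy;
    }

    /** Waits up to timeoutMillis in total for a healthy item; unhealthy items are discarded. */
    T acquire(long timeoutMillis) {
        // A single deadline keeps the total wait bounded across retries
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            long remaining = deadline - System.nanoTime();
            T item;
            try {
                item = remaining > 0
                        ? available.poll(remaining, TimeUnit.NANOSECONDS)
                        : available.poll();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a connection", e);
            }
            if (item == null) {
                throw new IllegalStateException("No connections available");
            }
            if (healthy.test(item)) {
                return item;
            }
            // Unhealthy item: drop it and retry with whatever time is left
        }
    }

    void release(T item) {
        available.offer(item);
    }

    public static void main(String[] args) {
        TimedPoolSketch<String> pool =
                new TimedPoolSketch<>(Arrays.asList("conn-1", "conn-2"), c -> true);
        String connection = pool.acquire(100);
        System.out.println("acquired " + connection);
        pool.release(connection);
    }
}
```

A real implementation would probably replace a discarded item with a fresh connection so that the pool does not shrink over time; that part is left out here.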

spring-operator commented 13 years ago

Oleg Zhurakousky commented

Implemented and PR has been issued: https://github.com/SpringSource/spring-integration/pull/131

Here is an example of configuration that would limit connections (sessionCacheSize can be set as second constructor arg):

<bean id="ftpSessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    <property name="host" value="localhost"/>
    <property name="port" value="22"/>
</bean>

<bean id="cachingSessionFactory" class="org.springframework.integration.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="ftpSessionFactory"/>
    <constructor-arg value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>

<int-ftp:outbound-channel-adapter id="ftpOutbound"
    channel="ftpChannel"
    session-factory="cachingSessionFactory"/>

spring-operator commented 13 years ago

Ivan Lagunov commented

Thanks for resolving the issue! Waiting for a stable 2.1 release.

Regards, Ivan

spring-operator commented 13 years ago

Oleg Zhurakousky commented

2.1.RC1 should be out shortly (in a few days), and 2.1.GA sometime in November.