spring-projects / spring-data-redis

Provides support to increase developer productivity in Java when using Redis, a key-value store. Uses familiar Spring concepts such as a template classes for core API usage and lightweight repository style data access.
https://spring.io/projects/spring-data-redis/
Apache License 2.0
1.77k stars 1.17k forks source link

AWS Serverless Causing PubSub Issues - can this be turned off in Spring Boots Connection Factory? #3027

Closed dreamstar-enterprises closed 1 month ago

dreamstar-enterprises commented 1 month ago

On AWS Redis Cluster this works,

but when migrate to AWS Redis Serverless, I keep getting PubSub errors, like psubscribe, punsubscribe, or io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@75ed1223 was either previously returned or does not belong to this connection provider

Is there a way of explicity turning pub sub off if its not supported by AWS Redis Serverless?

Here is my connection factory code so far (ignore the Redisson code, as I've migrated to lettuce)

/**
 * Establishes a Connection (factory) with Redis
 */
@Configuration
@EnableRedisRepositories(
    enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.OFF,
    keyspaceNotificationsConfigParameter = ""
)
internal class RedisConnectionFactoryConfig(
    private val springDataProperties: SpringDataProperties,
    private val profileProperties: ProfileProperties
) {

    // REDDISSON
    /*
    @Bean
    fun redissonClient(): RedissonClient {
        val config = Config()

        if (profileProperties.active == "prod") {

            // Use cluster server mode configuration for production
            val clusterConfig = config.useClusterServers()
                .addNodeAddress(
                    "rediss://${springDataProperties.redis.host}:${springDataProperties.redis.port}"
                )
                .setScanInterval(2000) // Cluster state scan interval in milliseconds
                .setTimeout(60000) // Command timeout
                .setRetryAttempts(3) // Retry attempts for failed commands
                .setRetryInterval(1500) // 1.5 seconds between retry attempts
                .setConnectTimeout(60000) // 60 seconds
                .setMasterConnectionPoolSize(100)
                .setMasterConnectionMinimumIdleSize(10)
                .setIdleConnectionTimeout(10000)
                .setSubscriptionsPerConnection(5)
                .setSslEnableEndpointIdentification(true) // enable endpoint identification for SSL

            val redisPassword = springDataProperties.redis.password
            if (redisPassword.isNotBlank()) {
                clusterConfig.setPassword(redisPassword)
            }

            // Additional SSL configuration if needed
            clusterConfig
                .setSslTruststore(null) // Replace with actual truststore file path if necessary
                .setSslKeystore(null) // Replace with actual keystore file path if necessary
        } else {

            // Use single server mode configuration for non-production
            val singleServerConfig = config.useSingleServer()
                .setAddress("redis://${springDataProperties.redis.host}:${springDataProperties.redis.port}")
                .setTimeout(60000) // Command timeout
                .setRetryAttempts(3) // Retry attempts for failed commands
                .setRetryInterval(1500) // 1.5 seconds between retry attempts
                .setConnectTimeout(60000) // 60 seconds
                .setConnectionPoolSize(100)
                .setConnectionMinimumIdleSize(10)
                .setIdleConnectionTimeout(10000) // 10 seconds
                .setSubscriptionsPerConnection(5)
                .setSslEnableEndpointIdentification(false) // disable endpoint identification for SSL

            val redisPassword = springDataProperties.redis.password
            if (redisPassword.isNotBlank()) {
                singleServerConfig.setPassword(redisPassword)
            }

            // Additional SSL configuration if needed
            singleServerConfig
                .setSslTruststore(null) // Replace with actual truststore file if needed
                .setSslKeystore(null) // Replace with actual keystore file if needed
        }

        // Return the configured Redisson client
        return Redisson.create(config)
    }

    // reactive RedisConnectionFactory for key expiration event handling
    @Bean
    @Primary
    fun reactiveRedisConnectionFactory(redissonClient: RedissonClient): ReactiveRedisConnectionFactory {
        return RedissonConnectionFactory(redissonClient)
    }
    */

    // LETTUCE
    // reactive RedisConnectionFactory for key expiration event handling
    @Component
    internal class ClusterConfigurationProperties(
        springDataProperties: SpringDataProperties
    ) {
        /**
         * Get initial collection of known cluster nodes in format `host:port`.
         * @return
         */
        var nodes = listOf(
            "${springDataProperties.redis.host}:${springDataProperties.redis.port}",
        )
    }

    @Bean
    @Primary
    internal fun reactiveRedisConnectionFactory(
        clusterProperties: ClusterConfigurationProperties,
    ): ReactiveRedisConnectionFactory {

        // declare the config variable here
        val config: RedisConfiguration

        // determine configuration to use based on the environment
        if (profileProperties.active == "prod") {
            // configure Redis Cluster configuration for production
            val clusterConfig = RedisClusterConfiguration(clusterProperties.nodes)
            clusterConfig.setPassword(RedisPassword.of(springDataProperties.redis.password))
            config = clusterConfig
        } else {
            // configure Redis Standalone configuration for non-production
            val staticConfig = RedisStandaloneConfiguration()
            staticConfig.hostName = springDataProperties.redis.host
            staticConfig.port = springDataProperties.redis.port

            staticConfig.setPassword(RedisPassword.of(springDataProperties.redis.password))
            config = staticConfig
        }

        // create client options

        // Create SSL options if SSL is required
        val sslOptions = SslOptions.builder()
            .jdkSslProvider()  // Or use OpenSslProvider if you prefer
            .build()

        // Create timeout options
        val timeoutOptions = TimeoutOptions.builder()
            .fixedTimeout(Duration.ofSeconds(20))
            .timeoutCommands(true)
            .build()

        // cluster specific settings for optimal reliability.
        val clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
            .enablePeriodicRefresh(Duration.ofSeconds(5))
            .dynamicRefreshSources(false)
            .closeStaleConnections(true)
            .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(5))
            .enableAllAdaptiveRefreshTriggers().build()

        // create socket options
        val socketOptions = SocketOptions.builder()
            .keepAlive(SocketOptions.DEFAULT_SO_KEEPALIVE)
            .tcpNoDelay(SocketOptions.DEFAULT_SO_NO_DELAY)
            // time to wait for connection to be established, before considering it as a failed connection
            .connectTimeout(Duration.ofSeconds(60))
            .build()

        val mappingFunction: (HostAndPort) -> HostAndPort = { hostAndPort ->
            val host = springDataProperties.redis.host
            val addresses: Array<InetAddress> = try {
                DnsResolvers.JVM_DEFAULT.resolve(host)
            } catch (e: UnknownHostException) {
                e.printStackTrace()
                emptyArray() // Handle error and return an empty array
            }

            val cacheIP = addresses.firstOrNull()?.hostAddress ?: ""
            var finalAddress = hostAndPort

            if (hostAndPort.hostText == cacheIP) {
                finalAddress = HostAndPort.of(host, hostAndPort.port)
            }

            finalAddress
        }

        val resolver = MappingSocketAddressResolver.create(DnsResolvers.JVM_DEFAULT, mappingFunction)

        // customize thread pool size
        val clientResources = DefaultClientResources.builder()
            .ioThreadPoolSize(8)
            .computationThreadPoolSize(8)
            .socketAddressResolver(resolver)
            .build()

        val clusterClientOptionsBuilder = ClusterClientOptions.builder()
            .autoReconnect(true)
            .pingBeforeActivateConnection(true)
            .timeoutOptions(timeoutOptions)
            .socketOptions(socketOptions)
            .topologyRefreshOptions(clusterTopologyRefreshOptions)
            .validateClusterNodeMembership(true)
            .suspendReconnectOnProtocolFailure(true)
            .disconnectedBehavior(DEFAULT_DISCONNECTED_BEHAVIOR)
            .decodeBufferPolicy(DecodeBufferPolicies.ratio(0.5F))
            .requestQueueSize(1000)
            .maxRedirects(DEFAULT_MAX_REDIRECTS)
            .publishOnScheduler(true) //DEFAULT_PUBLISH_ON_SCHEDULER.
            .protocolVersion(ProtocolVersion.RESP3) // Use RESP3 Protocol to ensure AUTH command is used for handshake.

        // conditionally use sslOptions if profileProperties.active is 'prod'
        if (profileProperties.active == "prod") {
            clusterClientOptionsBuilder.sslOptions(sslOptions)
        }

        // build the clientClusterOptions configuration
        val clusterClientOptions = clusterClientOptionsBuilder.build()

        // configure connection pool settings
        fun buildLettucePoolConfig(): GenericObjectPoolConfig<Any> {
            val poolConfig = GenericObjectPoolConfig<Any>()
            poolConfig.maxTotal = 100
            poolConfig.maxIdle = 50
            poolConfig.minIdle = 10
            poolConfig.setMaxWait(Duration.ofSeconds(120))
            poolConfig.timeBetweenEvictionRuns = Duration.ofSeconds(120)
            poolConfig.minEvictableIdleTime = Duration.ofMinutes(5)
            poolConfig.testOnBorrow = true
            poolConfig.testWhileIdle = true
            poolConfig.testOnReturn = true
            poolConfig.blockWhenExhausted = true
            poolConfig.lifo = true
            return poolConfig
        }

        // create Lettuce client configuration with authentication details
        val clientConfigBuilder = LettucePoolingClientConfiguration.builder()
            .readFrom(REPLICA_PREFERRED)
            // maximum time allowed for a Redis command to execute before the operation is considered timed out.
            .commandTimeout(Duration.ofSeconds(60))
            .clientResources(clientResources)
            .clientOptions(clusterClientOptions)
            .poolConfig(buildLettucePoolConfig())

        // conditionally enable SSL only if profileProperties.active is 'prod'
        if (profileProperties.active == "prod") {
            clientConfigBuilder.useSsl()
        }

        // build the clientConfig configuration
        val clientConfig = clientConfigBuilder.build()

        // create Lettuce connection factory
        return LettuceConnectionFactory(config, clientConfig).apply {
            afterPropertiesSet()
            validateConnection = false
            setShareNativeConnection(true)
        }
    }
}
mp911de commented 1 month ago

AWS Serverless Redis doesn't allow pub/sub. Using

@EnableRedisRepositories(
    enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.OFF,
    keyspaceNotificationsConfigParameter = ""
)

,as you did, causes that Spring Data does not subscribe to any channels.

If any issues should persist, please attach the full stack trace.

dreamstar-enterprises commented 1 month ago

Hi Mark,

This is my point, I had the setting turned to off, see below, but on AWS Redis Serverless, I was still getting Pub/Sub errors With it turned off, there should be no errors right? I will re-test tonight and share he Serverless errors again, with the below connection factory. (Unless you can see / spot something already wrong) Will revert tonight with the specific AWS Redis Serverless errors I get.

:

/**
 * Establishes a Connection (factory) with Redis
 */
@Configuration
@EnableRedisRepositories(
    enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.OFF,
    keyspaceNotificationsConfigParameter = ""
)
internal class RedisConnectionFactoryConfig(
    private val springDataProperties: SpringDataProperties,
    private val profileProperties: ProfileProperties
) {

    // REDDISSON
    /*
    @Bean
    fun redissonClient(): RedissonClient {
        val config = Config()

        if (profileProperties.active == "prod") {

            // Use cluster server mode configuration for production
            val clusterConfig = config.useClusterServers()
                .addNodeAddress(
                    "rediss://${springDataProperties.redis.host}:${springDataProperties.redis.port}"
                )
                .setScanInterval(2000) // Cluster state scan interval in milliseconds
                .setTimeout(60000) // Command timeout
                .setRetryAttempts(3) // Retry attempts for failed commands
                .setRetryInterval(1500) // 1.5 seconds between retry attempts
                .setConnectTimeout(60000) // 60 seconds
                .setMasterConnectionPoolSize(100)
                .setMasterConnectionMinimumIdleSize(10)
                .setIdleConnectionTimeout(10000)
                .setSubscriptionsPerConnection(5)
                .setSslEnableEndpointIdentification(true) // enable endpoint identification for SSL

            val redisPassword = springDataProperties.redis.password
            if (redisPassword.isNotBlank()) {
                clusterConfig.setPassword(redisPassword)
            }

            // Additional SSL configuration if needed
            clusterConfig
                .setSslTruststore(null) // Replace with actual truststore file path if necessary
                .setSslKeystore(null) // Replace with actual keystore file path if necessary
        } else {

            // Use single server mode configuration for non-production
            val singleServerConfig = config.useSingleServer()
                .setAddress("redis://${springDataProperties.redis.host}:${springDataProperties.redis.port}")
                .setTimeout(60000) // Command timeout
                .setRetryAttempts(3) // Retry attempts for failed commands
                .setRetryInterval(1500) // 1.5 seconds between retry attempts
                .setConnectTimeout(60000) // 60 seconds
                .setConnectionPoolSize(100)
                .setConnectionMinimumIdleSize(10)
                .setIdleConnectionTimeout(10000) // 10 seconds
                .setSubscriptionsPerConnection(5)
                .setSslEnableEndpointIdentification(false) // disable endpoint identification for SSL

            val redisPassword = springDataProperties.redis.password
            if (redisPassword.isNotBlank()) {
                singleServerConfig.setPassword(redisPassword)
            }

            // Additional SSL configuration if needed
            singleServerConfig
                .setSslTruststore(null) // Replace with actual truststore file if needed
                .setSslKeystore(null) // Replace with actual keystore file if needed
        }

        // Return the configured Redisson client
        return Redisson.create(config)
    }

    // reactive RedisConnectionFactory for key expiration event handling
    @Bean
    @Primary
    fun reactiveRedisConnectionFactory(redissonClient: RedissonClient): ReactiveRedisConnectionFactory {
        return RedissonConnectionFactory(redissonClient)
    }
    */

    // LETTUCE
    // reactive RedisConnectionFactory for key expiration event handling
    @Component
    internal class ClusterConfigurationProperties(
        springDataProperties: SpringDataProperties
    ) {
        /**
         * Get initial collection of known cluster nodes in format `host:port`.
         * @return
         */
        var nodes = listOf(
            "${springDataProperties.redis.host}:${springDataProperties.redis.port}",
        )
    }

    @Bean
    @Primary
    internal fun reactiveRedisConnectionFactory(
        clusterProperties: ClusterConfigurationProperties,
    ): ReactiveRedisConnectionFactory {

        // declare the config variable here
        val config: RedisConfiguration

        // determine configuration to use based on the environment
        if (profileProperties.active == "prod") {
            // configure Redis Cluster configuration for production
            val clusterConfig = RedisClusterConfiguration(clusterProperties.nodes)
            clusterConfig.setPassword(RedisPassword.of(springDataProperties.redis.password))
            config = clusterConfig
        } else {
            // configure Redis Standalone configuration for non-production
            val staticConfig = RedisStandaloneConfiguration()
            staticConfig.hostName = springDataProperties.redis.host
            staticConfig.port = springDataProperties.redis.port

            staticConfig.setPassword(RedisPassword.of(springDataProperties.redis.password))
            config = staticConfig
        }

        // create client options

        // Create SSL options if SSL is required
        val sslOptions = SslOptions.builder()
            .jdkSslProvider()  // Or use OpenSslProvider if you prefer
            .build()

        // Create timeout options
        val timeoutOptions = TimeoutOptions.builder()
            .fixedTimeout(Duration.ofSeconds(20))
            .timeoutCommands(true)
            .build()

        // cluster specific settings for optimal reliability.
        val clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
            .enablePeriodicRefresh(Duration.ofSeconds(5))
            .dynamicRefreshSources(false)
            .closeStaleConnections(true)
            .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(5))
            .enableAllAdaptiveRefreshTriggers().build()

        // create socket options
        val socketOptions = SocketOptions.builder()
            .keepAlive(SocketOptions.DEFAULT_SO_KEEPALIVE)
            .tcpNoDelay(SocketOptions.DEFAULT_SO_NO_DELAY)
            // time to wait for connection to be established, before considering it as a failed connection
            .connectTimeout(Duration.ofSeconds(60))
            .build()

        val mappingFunction: (HostAndPort) -> HostAndPort = { hostAndPort ->
            val host = springDataProperties.redis.host
            val addresses: Array<InetAddress> = try {
                DnsResolvers.JVM_DEFAULT.resolve(host)
            } catch (e: UnknownHostException) {
                e.printStackTrace()
                emptyArray() // Handle error and return an empty array
            }

            val cacheIP = addresses.firstOrNull()?.hostAddress ?: ""
            var finalAddress = hostAndPort

            if (hostAndPort.hostText == cacheIP) {
                finalAddress = HostAndPort.of(host, hostAndPort.port)
            }

            finalAddress
        }

        val resolver = MappingSocketAddressResolver.create(DnsResolvers.JVM_DEFAULT, mappingFunction)

        // customize thread pool size
        val clientResources = DefaultClientResources.builder()
            .ioThreadPoolSize(8)
            .computationThreadPoolSize(8)
            .socketAddressResolver(resolver)
            .build()

        val clusterClientOptionsBuilder = ClusterClientOptions.builder()
            .autoReconnect(true)
            .pingBeforeActivateConnection(true)
            .timeoutOptions(timeoutOptions)
            .socketOptions(socketOptions)
            .topologyRefreshOptions(clusterTopologyRefreshOptions)
            .validateClusterNodeMembership(true)
            .suspendReconnectOnProtocolFailure(true)
            .disconnectedBehavior(DEFAULT_DISCONNECTED_BEHAVIOR)
            .decodeBufferPolicy(DecodeBufferPolicies.ratio(0.5F))
            .requestQueueSize(1000)
            .maxRedirects(DEFAULT_MAX_REDIRECTS)
            .publishOnScheduler(true) //DEFAULT_PUBLISH_ON_SCHEDULER.
            .protocolVersion(ProtocolVersion.RESP3) // Use RESP3 Protocol to ensure AUTH command is used for handshake.

        // conditionally use sslOptions if profileProperties.active is 'prod'
        if (profileProperties.active == "prod") {
            clusterClientOptionsBuilder.sslOptions(sslOptions)
        }

        // build the clientClusterOptions configuration
        val clusterClientOptions = clusterClientOptionsBuilder.build()

        // configure connection pool settings
        fun buildLettucePoolConfig(): GenericObjectPoolConfig<Any> {
            val poolConfig = GenericObjectPoolConfig<Any>()
            poolConfig.maxTotal = 100
            poolConfig.maxIdle = 50
            poolConfig.minIdle = 10
            poolConfig.setMaxWait(Duration.ofSeconds(120))
            poolConfig.timeBetweenEvictionRuns = Duration.ofSeconds(120)
            poolConfig.minEvictableIdleTime = Duration.ofMinutes(5)
            poolConfig.testOnBorrow = true
            poolConfig.testWhileIdle = true
            poolConfig.testOnReturn = true
            poolConfig.blockWhenExhausted = true
            poolConfig.lifo = true
            return poolConfig
        }

        // create Lettuce client configuration with authentication details
        val clientConfigBuilder = LettucePoolingClientConfiguration.builder()
            .readFrom(REPLICA_PREFERRED)
            // maximum time allowed for a Redis command to execute before the operation is considered timed out.
            .commandTimeout(Duration.ofSeconds(60))
            .clientResources(clientResources)
            .clientOptions(clusterClientOptions)
            .poolConfig(buildLettucePoolConfig())

        // conditionally enable SSL only if profileProperties.active is 'prod'
        if (profileProperties.active == "prod") {
            clientConfigBuilder.useSsl()
        }

        // build the clientConfig configuration
        val clientConfig = clientConfigBuilder.build()

        // create Lettuce connection factory
        return LettuceConnectionFactory(config, clientConfig).apply {
            afterPropertiesSet()
            validateConnection = false
            setShareNativeConnection(true)
        }
    }
}