spring-projects / spring-data-redis

Provides support to increase developer productivity in Java when using Redis, a key-value store. Uses familiar Spring concepts such as a template classes for core API usage and lightweight repository style data access.
https://spring.io/projects/spring-data-redis/
Apache License 2.0
1.76k stars 1.16k forks source link

Incompatibility with AWS ElastiCache Serverless 7.0 #2815

Closed fripoli closed 9 months ago

fripoli commented 10 months ago

We are trialing the migration from ElastiCache Redis to ElastiCache Serverless 7.0, both within AWS, and the startup fails with the following error:

"Caused by: io.lettuce.core.RedisCommandExecutionException: ERR unknown command 'psubscribe', with args beginning with: __keyevent@*__:expired"

The problem seems to be that the serverless version of AWS Elasticache does not support the psubscribe command as it can be seen here https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/SupportedCommands.html#RestrictedCommandsRedis

currently using version 3.1.6

also using the annotation on our configuration

@EnableRedisRepositories(enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.ON_STARTUP,
    keyspaceNotificationsConfigParameter = "")

any suggestions around it?

mp911de commented 9 months ago

If your Redis server doesn't support pattern subscriptions then please disable keyspace events via EnableKeyspaceEvents.OFF. Redis Repositories require subscriptions for index maintenance when using TTL. We cannot listen using channels as each key has its own unique identifier and we cannot know these upfront. Also, we would exhaust the subscription table by creating a subscription for each possible channel.

dreamstar-enterprises commented 4 days ago

I'm getting the same error with AWS Redis Cache Serverless: https://github.com/redisson/redisson/issues/3992

I don't have the annotation @EnableRedisRepositories, only the following - what should I do to turn off keyspaceevents

@Configuration @EnableWebFluxSecurity @EnableRedisIndexedWebSession @Order(Ordered.LOWEST_PRECEDENCE - 1) internal class BffSecurityConfig ( private val serverProperties: ServerProperties, ) {

/**
 * Establishes a Connection (factory) with Redis
 */
@Configuration
internal class RedisConnectionFactoryConfig(
    private val springDataProperties: SpringDataProperties,
    private val profileProperties: ProfileProperties
) {

    // REDDISSON
    @Bean
    fun redissonClient(): RedissonClient {
        val config = Config()

        if (profileProperties.active == "prod") {

            // Use cluster server mode configuration for production
            val clusterConfig = config.useClusterServers()
                .addNodeAddress(
                    "rediss://${springDataProperties.redis.host}:${springDataProperties.redis.port}"
                )
                .setScanInterval(2000) // Cluster state scan interval in milliseconds
                .setTimeout(60000) // Command timeout
                .setRetryAttempts(3) // Retry attempts for failed commands
                .setRetryInterval(1500) // 1.5 seconds between retry attempts
                .setConnectTimeout(60000) // 60 seconds
                .setMasterConnectionPoolSize(100)
                .setMasterConnectionMinimumIdleSize(10)
                .setIdleConnectionTimeout(10000)
                .setSubscriptionsPerConnection(5)
                .setSslEnableEndpointIdentification(true) // enable endpoint identification for SSL

            val redisPassword = springDataProperties.redis.password
            if (redisPassword.isNotBlank()) {
                clusterConfig.setPassword(redisPassword)
            }

            // Additional SSL configuration if needed
            clusterConfig
                .setSslTruststore(null) // Replace with actual truststore file path if necessary
                .setSslKeystore(null) // Replace with actual keystore file path if necessary
        } else {

            // Use single server mode configuration for non-production
            val singleServerConfig = config.useSingleServer()
                .setAddress("redis://${springDataProperties.redis.host}:${springDataProperties.redis.port}")
                .setTimeout(60000) // Command timeout
                .setRetryAttempts(3) // Retry attempts for failed commands
                .setRetryInterval(1500) // 1.5 seconds between retry attempts
                .setConnectTimeout(60000) // 60 seconds
                .setConnectionPoolSize(100)
                .setConnectionMinimumIdleSize(10)
                .setIdleConnectionTimeout(10000) // 10 seconds
                .setSubscriptionsPerConnection(5)
                .setSslEnableEndpointIdentification(false) // disable endpoint identification for SSL

            val redisPassword = springDataProperties.redis.password
            if (redisPassword.isNotBlank()) {
                singleServerConfig.setPassword(redisPassword)
            }

            // Additional SSL configuration if needed
            singleServerConfig
                .setSslTruststore(null) // Replace with actual truststore file if needed
                .setSslKeystore(null) // Replace with actual keystore file if needed
        }

        // Return the configured Redisson client
        return Redisson.create(config)
    }

    // reactive RedisConnectionFactory for key expiration event handling
    @Bean
    @Primary
    fun reactiveRedisConnectionFactory(redissonClient: RedissonClient): ReactiveRedisConnectionFactory {
        return RedissonConnectionFactory(redissonClient)
    }

    // LETTUCE
    // reactive RedisConnectionFactory for key expiration event handling
    /*
    @Bean
    @Primary
    fun reactiveRedisConnectionFactory(): ReactiveRedisConnectionFactory {

        // configure Redis standalone configuration
        val config = RedisStandaloneConfiguration()
        config.hostName = springDataProperties.redis.host
        config.port = springDataProperties.redis.port
        config.setPassword(RedisPassword.of(springDataProperties.redis.password))

        // create client options

        // Create SSL options if SSL is required
        val sslOptions = SslOptions.builder()
            .jdkSslProvider()  // Or use OpenSslProvider if you prefer
            .build()

        // Create timeout options
        val timeoutOptions = TimeoutOptions.builder()
            .fixedTimeout(Duration.ofSeconds(20))
            .timeoutCommands(true)
            .build()

        // cluster specific settings for optimal reliability.
        val clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
            .enablePeriodicRefresh(Duration.ofSeconds(5))
            .dynamicRefreshSources(false)
            .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(5))
            .enableAllAdaptiveRefreshTriggers().build()

        // create socket options
        val socketOptions = SocketOptions.builder()
            .keepAlive(SocketOptions.DEFAULT_SO_KEEPALIVE)
            .tcpNoDelay(SocketOptions.DEFAULT_SO_NO_DELAY)
            // time to wait for connection to be established, before considering it as a failed connection
            .connectTimeout(Duration.ofSeconds(60))
            .build()

        val mappingFunction: (HostAndPort) -> HostAndPort = { hostAndPort ->
            val host = springDataProperties.redis.host
            val addresses: Array<InetAddress> = try {
                DnsResolvers.JVM_DEFAULT.resolve(host)
            } catch (e: UnknownHostException) {
                e.printStackTrace()
                emptyArray() // Handle error and return an empty array
            }

            val cacheIP = addresses.firstOrNull()?.hostAddress ?: ""
            var finalAddress = hostAndPort

            if (hostAndPort.hostText == cacheIP) {
                finalAddress = HostAndPort.of(host, hostAndPort.port)
            }

            finalAddress
        }

        val resolver = MappingSocketAddressResolver.create(DnsResolvers.JVM_DEFAULT, mappingFunction)

        // customize thread pool size
        val clientResources = DefaultClientResources.builder()
            .ioThreadPoolSize(8)
            .computationThreadPoolSize(8)
            .socketAddressResolver(resolver)
            .build()

        val clusterClientOptionsBuilder = ClusterClientOptions.builder()
            .autoReconnect(true)
            .pingBeforeActivateConnection(true)
            .timeoutOptions(timeoutOptions)
            .socketOptions(socketOptions)
            .topologyRefreshOptions(clusterTopologyRefreshOptions)
            .validateClusterNodeMembership(true)
            .suspendReconnectOnProtocolFailure(true)
            .disconnectedBehavior(DEFAULT_DISCONNECTED_BEHAVIOR)
            .decodeBufferPolicy(DecodeBufferPolicies.ratio(0.5F))
            .requestQueueSize(1000)
            .maxRedirects(DEFAULT_MAX_REDIRECTS)
            .publishOnScheduler(true) //DEFAULT_PUBLISH_ON_SCHEDULER.
            .protocolVersion(ProtocolVersion.RESP3) // Use RESP3 Protocol to ensure AUTH command is used for handshake.

        // conditionally use sslOptions if profileProperties.active is 'prod'
        if (profileProperties.active == "prod") {
            clusterClientOptionsBuilder.sslOptions(sslOptions)
        }

        // build the clientClusterOptions configuration
        val clusterClientOptions = clusterClientOptionsBuilder.build()

        // configure connection pool settings
        fun buildLettucePoolConfig(): GenericObjectPoolConfig<Any> {
            val poolConfig = GenericObjectPoolConfig<Any>()
            poolConfig.maxTotal = 100
            poolConfig.maxIdle = 50
            poolConfig.minIdle = 10
            poolConfig.setMaxWait(Duration.ofSeconds(120))
            poolConfig.timeBetweenEvictionRuns = Duration.ofSeconds(120)
            poolConfig.minEvictableIdleTime = Duration.ofMinutes(5)
            poolConfig.testOnBorrow = true
            poolConfig.testWhileIdle = true
            poolConfig.testOnReturn = true
            poolConfig.blockWhenExhausted = true
            poolConfig.lifo = true
            return poolConfig
        }

        // create Lettuce client configuration with authentication details
        val clientConfigBuilder = LettucePoolingClientConfiguration.builder()
            // maximum time allowed for a Redis command to execute before the operation is considered timed out.
            .commandTimeout(Duration.ofSeconds(60))
            .clientResources(clientResources)
            .clientOptions(clusterClientOptions)
            .poolConfig(buildLettucePoolConfig())

        // conditionally enable SSL only if profileProperties.active is 'prod'
        if (profileProperties.active == "prod") {
            clientConfigBuilder.useSsl()
        }

        // build the clientConfig configuration
        val clientConfig = clientConfigBuilder.build()

        // create Lettuce connection factory
        return LettuceConnectionFactory(config, clientConfig).apply {
            afterPropertiesSet()
            validateConnection = false
            setShareNativeConnection(true)
        }
    }
    */
}