spring-projects / spring-data-redis

Provides support to increase developer productivity in Java when using Redis, a key-value store. Uses familiar Spring concepts such as a template classes for core API usage and lightweight repository style data access.
https://spring.io/projects/spring-data-redis/
Apache License 2.0
1.77k stars 1.17k forks source link

ObjectHashMapper not able to resolve type alias [DATAREDIS-1179] #1753

Closed spring-projects-issues closed 4 years ago

spring-projects-issues commented 4 years ago

Kevin Madhu opened DATAREDIS-1179 and commented

Currently, I'm writing an object into Redis stream using ObjectRecord and when I try to read the same entry from the stream using another project with the target type as a class with the same properties, the program just crashes throwing an Exception.

 

java.lang.IllegalArgumentException: Value must not be null!java.lang.IllegalArgumentException: Value must not be null! at org.springframework.util.Assert.notNull(Assert.java:198) at org.springframework.data.redis.connection.stream.Record.of(Record.java:81) at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147) at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:132) at org.springframework.data.redis.core.StreamObjectMapper.map(StreamObjectMapper.java:158) at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:377) at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$2(DefaultStreamMessageListenerContainer.java:232) at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:138) at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:123) at java.lang.Thread.run(Thread.java:748)
2020-07-07 18:32:05.270  WARN 32376 --- [scoveryClient-0] c.netflix.discovery.TimedSupervisorTask  : task supervisor timed out
java.util.concurrent.TimeoutException: null at java.util.concurrent.FutureTask.get(FutureTask.java:205) at com.netflix.discovery.TimedSupervisorTask.run(TimedSupervisorTask.java:68) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at java.util.concurrent.FutureTask.run(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

 

The code could be found in the following links under the multitenancy branches. https://github.com/muellners/finscale-core https://github.com/muellners/finscale-identity

 

The classes in these projects relevant to this issue are:

1) org.muellners.finscale.core.config.RedisConfiguration

https://github.com/muellners/finscale-core/blob/multitenancy/src/main/kotlin/org/muellners/finscale/core/config/RedisConfiguration.kt

 

@Configuration
class RedisConfiguration() {
    private val log = LoggerFactory.getLogger(javaClass)    @Bean
    fun redisConnectionFactory(): LettuceConnectionFactory {
        log.info("Configuring Redis connection")
        return LettuceConnectionFactory()
    }
}

 

2) org.muellners.finscale.core.service.impl.TenantServiceImpl

https://github.com/muellners/finscale-core/blob/multitenancy/src/main/kotlin/org/muellners/finscale/core/service/impl/TenantServiceImpl.kt

 

override fun save(tenantDTO: TenantDTO): TenantDTO {
    log.debug("Request to save Tenant : {}", tenantDTO)        var tenant = tenantMapper.toEntity(tenantDTO)
    tenant = tenantRepository.save(tenant)
    val tenantDTO = tenantMapper.toDto(tenant)
    val tenantRecord: ObjectRecord<String, TenantDTO> = StreamRecords.newRecord()
        .ofObject(tenantDTO).withStreamKey("core:tenants")
    this.redisTemplate.opsForStream<String, TenantDTO>().add(tenantRecord)        return tenantDTO
}

 

3) org.muellners.finscale.identity.config.RedisConfiguration

https://github.com/muellners/finscale-identity/blob/multitenancy/src/main/kotlin/org/muellners/finscale/identity/config/RedisConfiguration.kt

 

class RedisConfiguration() {
    private val log = LoggerFactory.getLogger(javaClass)    @Bean
    fun redisConnectionFactory(): LettuceConnectionFactory {
        log.info("Configuring Redis connection")
        return LettuceConnectionFactory()
    }
}

 

 

4) org.muellners.finscale.identity.config.RedisStreamConfiguration

https://github.com/muellners/finscale-identity/blob/multitenancy/src/main/kotlin/org/muellners/finscale/identity/config/RedisStreamConfiguration.kt

 

@Configuration
class RedisStreamConfiguration(
    private val streamListener: StreamListener<String, ObjectRecord<String, TenantDTO>>,
    private val redisTemplate: StringRedisTemplate
) {
    val streamName = "core:tenants" // Should be made a constant provided by the finscale-multitenancy lib project
    val consumerGroupName = "identity-service" // Should be made some kind of project constant    @PostConstruct
    fun createTenantConsumerGroup() {
        redisTemplate.execute(
            RedisCallback { connection: RedisConnection ->
                val result = connection.execute(CommandType.XGROUP.name,
                    CommandKeyword.CREATE.name.toByteArray(), streamName.toByteArray(),
                    consumerGroupName.toByteArray(), ReadOffset.latest().offset.toByteArray(),
                    "MKSTREAM".toByteArray())                result
            } as RedisCallback<Any>)
    }    @Bean
    fun tenantSubscription(redisConnectionFactory: RedisConnectionFactory): Subscription {
        val options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
            .builder()
            .pollTimeout(Duration.ofSeconds(1))
            .targetType(TenantDTO::class.java)
            .build()        val listenerContainer = StreamMessageListenerContainer
            .create(redisConnectionFactory, options)
        val subscription = listenerContainer.receiveAutoAck(
            Consumer.from(consumerGroupName, InetAddress.getLocalHost().hostName),
            StreamOffset.create(streamName, ReadOffset.lastConsumed()),
            this.streamListener)        listenerContainer.start()
        return subscription
    }
}

 

 

5) org.muellners.finscale.identity.service.TenantConsumerService

https://github.com/muellners/finscale-identity/blob/multitenancy/src/main/kotlin/org/muellners/finscale/identity/service/TenantConsumerService.kt

 

@Service
class TenantConsumerService(
    private val redisTemplate: RedisTemplate<String, String>
) : StreamListener<String, ObjectRecord<String, TenantDTO>> {    override fun onMessage(message: ObjectRecord<String, TenantDTO>) {
        println("New tenant created!!!")
        println(message.value)
    }
}

 

 Something I've noted is that when I replaced 

ObjectRecord<String, TenantDTO>>

with

Map<String, String>>

everything worked. So, it's got something to do with the mapper to an object I guess.

Inside org.springframework.data.redis.connection.stream.MapRecord, there's this function which is called

default <OV> ObjectRecord<S, OV> toObjectRecord(HashMapper<? super OV, ? super K, ? super V> mapper) {
      return Record.<S, OV> of((OV) mapper.fromHash((Map)
   getValue())).withId(getId())
   .withStreamKey(getStream());
}

In this, mapper.fromHash((Map) getValue() returns null


Affects: 2.3.1 (Neumann SR1)

Attachments:

Referenced from: pull request https://github.com/spring-projects/spring-data-redis/pull/548

Backported to: 2.3.2 (Neumann SR2)

spring-projects-issues commented 4 years ago

Christoph Strobl commented

The setup seems rather complex which makes it hard to narrow down the cause of the issue. Can you craft a minimal sample reproducing the error? Something like a failing integration test maybe?

spring-projects-issues commented 4 years ago

Kevin Madhu commented

Christoph Strobl Please take a look at this project. I've removed all the unnecessary dependencies and code and narrowed it down to the issue. It only needs Redis to be running.

"GET /trigger" request triggers the addition of record to the Redis stream.  https://github.com/kevin-madhu/spring-data-redis-example

spring-projects-issues commented 4 years ago

Kevin Madhu commented

Christoph Strobl Please take a look at this project. I've removed all the unnecessary dependencies and code and narrowed it down to the issue. It only needs Redis to be running.

"GET /trigger" request triggers the addition of record to the Redis stream.  https://github.com/kevin-madhu/spring-data-redis-example

spring-projects-issues commented 4 years ago

Christoph Strobl commented

As it seems the @TypeAlias is only used when writing the entity but not on read. Can you please remove it for now while we investigate the issue in more detail. 

spring-projects-issues commented 4 years ago

Kevin Madhu commented

Christoph Strobl I've commented it out now

spring-projects-issues commented 4 years ago

Kevin Madhu commented

Christoph Strobl Thanks for the fix :)