petergdoyle / estreaming

create a twitter-like streaming app with spring-xd, mongodb, and node.js
2 stars 0 forks source link

The Redis Sink and LTRIM #3

Open artembilan opened 8 years ago

artembilan commented 8 years ago

See http://stackoverflow.com/questions/36116475/spring-xd-custom-redis-sink for more info.

This issue for the discussion and sharing solutions for testing and review.

artembilan commented 8 years ago

According to the existing solution with:

  <int-redis:store-outbound-channel-adapter
        id="redisListAdapter" collection-type="LIST" channel="input">

we have in the background:

else if (this.collectionType == CollectionType.LIST) {
    this.writeToList((RedisList<Object>) store, message);
}
...
list.add(payload);
...
public boolean add(E value) {
    listOps.rightPush(value);
    cap();
    return true;
}
...
private void cap() {
    if (capped) {
        listOps.trim(0, maxSize - 1);
    }
}

That looks like exactly what are you looking for: the maxSize retaining after RPUSH.

From here I'd be glad to see the JIRA capped option for the <int-redis:store-outbound-channel-adapter>.

From other hand that should be as some clue that we should fix our onSuccessExpression to this:

 <beans:property name="onSuccessExpression" value="#redisTemplate.boundListOps(${collection}).trim(0, 49)"/>

Let me know how that works for you!

artembilan commented 8 years ago

We even can figure out that with the RedisQueueOutboundChannelAdapter, but as I said you in the SO question, that is fully different queue-like component.

artembilan commented 8 years ago

If you would like to retain the newest elements in the list after RPUSH, you can use an LTRIM like: trim(-50, -1):

127.0.0.1:6379> rpush mylist 1 2 3 4 5
(integer) 5
127.0.0.1:6379> lrange mylist 0 -1
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
127.0.0.1:6379> ltrim mylist -3 -1
OK
127.0.0.1:6379> lrange mylist 0 -1
1) "3"
2) "4"
3) "5"
petergdoyle commented 8 years ago

I disappeared because we were have a bit of a blizzard here in Colorado and had to clear the snow.

Thanks for responding. I redeployed my redis-capped-sink module with the trim (-50,-1)

    <int-redis:store-outbound-channel-adapter
        id="redisListAdapter" collection-type="LIST" channel="input"
        key="${collection}" auto-startup="false">
        <int-redis:request-handler-advice-chain>
            <beans:bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                <!-- after the redisTemplate does a lpush into redis, we keep the collection bounded to 50 -->
                <beans:property name="onSuccessExpression" value="#redisTemplate.boundListOps(${collection}).trim(-50,-1)"/>
            </beans:bean>
        </int-redis:request-handler-advice-chain>
    </int-redis:store-outbound-channel-adapter>

But unfortunately again, it seems to have the effect of reversing the insertion order and knocking out the odd-number indexed items. As I continuously push new message through and check in the redis-cli as items are being pushed into the collection, I see this:

127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000001"
2) "0000000003"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000001"
2) "0000000003"
3) "0000000005"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000001"
2) "0000000003"
3) "0000000005"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000001"
2) "0000000003"
3) "0000000005"
4) "0000000007"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000001"
2) "0000000003"
3) "0000000005"
4) "0000000007"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000001"
2) "0000000003"
3) "0000000005"
4) "0000000007"
5) "0000000009"

I really like the idea of a "capped option for the int-redis:store-outbound-channel-adapter" if we cannot achieve this with the redisTemplate.boundListOps(${collection}).trim(-50,-1) parameters.

The idea for this use-case is to provide a stream buffer with redis that allows a streaming http server to establish connection to redis (like redis-stream) and allow clients to reposition themselves at a position the buffer should a disconnection occur and data would not be lost if a connection with an offset is reestablished. Otherwise if they disconnect, and the redis-stream uses a pop then we are indeed simulating a p2p queue rather than a capped buffer - so if I am off base with that idea, then please let me know, as I don't want to go down a bad path but maybe that is just a different architectural pattern other than the queue that has been implemented?

Thanks!

petergdoyle commented 8 years ago

and with #redisTemplate.boundListOps(${collection}).trim(0,49) we sort of have the same problem...

127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000002"
2) "0000000004"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000002"
2) "0000000004"
3) "0000000006"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000002"
2) "0000000004"
3) "0000000006"
4) "0000000008"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000002"
2) "0000000004"
3) "0000000006"
4) "0000000008"
5) "0000000010"
artembilan commented 8 years ago

That odd/even play looks really strange for me. How does it look without any trim()?

How about to the same just with direct RedisTemplate.boundListOps(listName) and some <service-activator> around that?

petergdoyle commented 8 years ago

with the

<beans:property name="onSuccessExpression" value="#redisTemplate.boundListOps(${collection}).trim()"/>

same thing

127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000002"
2) "0000000004"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000002"
2) "0000000004"
3) "0000000006"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000002"
2) "0000000004"
3) "0000000006"
4) "0000000008"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000002"
2) "0000000004"
3) "0000000006"
4) "0000000008"
5) "0000000010"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000002"
2) "0000000004"
3) "0000000006"
4) "0000000008"
5) "0000000010"
6) "0000000012"
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000002"
2) "0000000004"
3) "0000000006"
4) "0000000008"
5) "0000000010"
6) "0000000012"
7) "0000000014"

But if I just run the trim directly from the Unit Test as it builds...


    @Test
    public void test() {
        applicationContext.start();
        input.send(new GenericMessage<String>("hello"));
        assertEquals("hello", redisTemplate.boundListOps("mycollection").leftPop(5, TimeUnit.SECONDS));
        redisTemplate.boundListOps("car_echo_jms_to_redis").trim(0, 4);
    }

then we get the desired behavior (on the same list) as we trim down from index 0 to 4 inclusively and dropped off everything older than the top 4

127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000002"
2) "0000000004"
3) "0000000006"
4) "0000000008"
5) "0000000010"

but that is not the order we want anyway, it should be in descending order (i have created ascending numbers in the jms message generator) and we are building a FIFO queue

and if i modify the unit test code to call the trim after the leftPush as we are hoping the RedisQueueOutboundChannelAdapter is doing...

    @Test
    public void test() {
        applicationContext.start();
        input.send(new GenericMessage<String>("hello"));
        assertEquals("hello", redisTemplate.boundListOps("mycollection").leftPop(5, TimeUnit.SECONDS));

        String template = "%010d";
        for (int i = 0; i < 10; i++) {
            redisTemplate.boundListOps("car_echo_jms_to_redis").leftPush(String.format(template, i));
            redisTemplate.boundListOps("car_echo_jms_to_redis").trim(0, 4);
        }
    }

then we get the desired effect: the list is in the correct entry order and the last 5 items inserted are maintained and the older ones sluffed off - but this behavior is not happening through the RedisQueueOutboundChannelAdapter it would seem.

127.0.0.1:6379> del car_echo_jms_to_redis
(integer) 1
127.0.0.1:6379> lrange car_echo_jms_to_redis 0 -1
1) "0000000009"
2) "0000000008"
3) "0000000007"
4) "0000000006"
5) "0000000005"
artembilan commented 8 years ago

Hello, @petergdoyle !

Sorry for delay: we were busy with Spring AMQP 1.6 release.

Well, let me answer to your concerns if I understand them correctly!

  1. <int-redis:store-outbound-channel-adapter> is based on the RedisStoreWritingMessageHandler, where for LIST store it uses:
public boolean add(E value) {
    listOps.rightPush(value);
    cap();
    return true;
}

So, to "cat" for the oldest item we should use the reverse trim - LTRIM -50 -1

  1. The RedisQueueOutboundChannelAdapter is a MessageHandler for the <int-redis:queue-outbound-channel-adapter> and yes, its logic is leftPush() by default. That is for queue simulation because on the other side we have <int-redis:queue-inbound-channel-adapter> who performs rightPop() (by default).
  2. For the <int-redis:queue-outbound-channel-adapter> we also can configure <int-redis:request-handler-advice-chain> for the ExpressionEvaluatingRequestHandlerAdvice with desired direct LTRIM logic:
<beans:property name="onSuccessExpression" value="#redisTemplate.boundListOps(${collection}).trim(0, 49)"/>

to achieve this behavior:

127.0.0.1:6379> lpush list 1 2 3 4 5
(integer) 5
127.0.0.1:6379> lrange list 0 -1
1) "5"
2) "4"
3) "3"
4) "2"
5) "1"
127.0.0.1:6379> ltrim list 0 2
OK
127.0.0.1:6379> lrange list 0 -1
1) "5"
2) "4"
3) "3"
  1. Since you see in your XD stream some strange odd/even results, I think we should count something who POP data from that list. No?
  2. Finally as you noticed the code is pretty simple to bypass all those SI Redis components and use RedisTemplate directly from the custom service:
public void storeToCappedList(Integer payload) {
    redisTemplate.boundListOps("car_echo_jms_to_redis").leftPush(String.format(template, payload));
    redisTemplate.boundListOps("car_echo_jms_to_redis").trim(0, 4);
}
...
<service-activator input-channel="input" ref="myRedisService" method="storeToCappedList"/>

The collection can be configured on that service.

P.S. IMO we end up with the same logic and behavior. You should seek a guilty somewhere else.