redis / redis-om-spring

Spring Data Redis extensions for better search, document models, and more

How to achieve high throughput using OM Library #383

Closed: dsinghal-nice closed this issue 7 months ago

dsinghal-nice commented 8 months ago

I have a requirement to use a Redis cache for storing event data received from various streams, and to fetch that data by scanning the keys and returning it in JSON format. The data will be stored for one day only, but there will be millions of entries, and fetch traffic will be around 3,000 requests per second. I am using a Redis server with only the RedisJSON module.

I have created a multi-threaded setup that first scans all the keys and then has multiple threads fire JSON.MGET commands to fetch the data. Even so, I am not able to get the latency below 100 ms; the SCAN command alone takes more than 1,000 ms to fetch all the keys. I am currently running this simulation on only 7,000 JSON objects, but in production there will be millions.

justTimTim commented 8 months ago

In this case you will scan all rows without any indexes.

From the JSON.MGET documentation, the time complexity is O(M*N) when the path evaluates to a single value, where M is the number of keys and N is the size of the value, and O(N1+N2+...+Nm) when the path evaluates to multiple values, where m is the number of keys and Ni is the size of the i-th key.

bsbodden commented 8 months ago

@dsinghal-nice Can you post a small example/benchmark of what you're doing? There hasn't been a strong focus on performance yet, but I'll be happy to see how we can improve your scenario.

dsinghal-nice commented 8 months ago

Yes, I tried that way, but when the number of users increases, the SCAN command slows down and even JSON.GET fetches the data slowly, pushing the latency as high as 10 seconds, which is not acceptable in any scenario.

justTimTim commented 8 months ago

> Yes, I tried that way, but when the number of users increases, the SCAN command slows down and even JSON.GET fetches the data slowly, pushing the latency as high as 10 seconds, which is not acceptable in any scenario.

Use FT.SEARCH.
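For illustration, a minimal sketch of that approach with Jedis 4.x, assuming RediSearch is available on the server (which, as noted further down, it is not on the OP's ElastiCache deployment); the index name, key prefix, and fields here are hypothetical:

import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.search.*;

public class SearchSketch {
    public static void main(String[] args) {
        JedisPooled jedis = new JedisPooled("localhost", 6379);

        // Hypothetical index over JSON documents stored under an "event:" key prefix.
        IndexDefinition def = new IndexDefinition(IndexDefinition.Type.JSON).setPrefixes("event:");
        Schema schema = new Schema()
                .addField(new Schema.Field(FieldName.of("$.teamId").as("teamId"), Schema.FieldType.TAG))
                .addField(new Schema.Field(FieldName.of("$.timestamp").as("timestamp"), Schema.FieldType.NUMERIC));
        jedis.ftCreate("events-idx", IndexOptions.defaultOptions().setDefinition(def), schema);

        // Server-side filtering replaces SCAN + JSON.MGET + client-side filtering.
        SearchResult res = jedis.ftSearch("events-idx", new Query("@teamId:{42}").limit(0, 100));
        // For JSON indexes the whole document is typically returned under the "$" property.
        res.getDocuments().forEach(doc -> System.out.println(doc.getId() + " -> " + doc.get("$")));
    }
}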

dsinghal-nice commented 8 months ago

@bsbodden Below is my use case: I am building a real-time dashboard that will be displayed to thousands of customers at the same time, with an automatic refresh every 7 seconds. Data is stored in Redis for one day only, and users can either filter the data on some criteria (for example, Team ID, date and time, etc.) or fetch the complete data set.
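
Since the data lives for only a day, each JSON document can simply be written with a one-day TTL. A minimal sketch with Jedis; the "event:" key prefix and EventData payload are placeholders, not the actual model:

import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.json.Path2;

public class EventWriter {
    // Placeholder POJO standing in for the real event payload.
    public static class EventData {
        public String id = "e1";
        public int teamId = 42;
        public long timestamp = System.currentTimeMillis();
    }

    public static void main(String[] args) {
        JedisPooled jedis = new JedisPooled("localhost", 6379);
        EventData event = new EventData();
        // Store the event as JSON under a hypothetical "event:" key prefix...
        jedis.jsonSetWithEscape("event:" + event.id, Path2.ROOT_PATH, event);
        // ...and enforce the one-day retention with a TTL instead of manual cleanup.
        jedis.expire("event:" + event.id, 24 * 60 * 60);
    }
}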

Below is the code I have for this. (Remember, I only have the RedisJSON module available, not RediSearch.)

This is the helper class I use for firing commands at Redis:

import java.util.ArrayList;
import java.util.List;

import org.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.json.Path2;
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;

import static redis.clients.jedis.json.Path2.ROOT_PATH;

public class CommandHelper {

    private static final Logger LOGGER = LoggerFactory.getLogger(CommandHelper.class);
    // ObjectMapper is thread-safe and expensive to build; share one instance.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final JedisPooled unifiedJedis;

    CommandHelper(JedisPooled jedisPooled) {
        this.unifiedJedis = jedisPooled;
    }

    public List<String> searchKeysWithPattern(String pattern) {
        String cursor = "0";
        ScanParams scanParams = new ScanParams();
        scanParams.match(pattern);
        // Note: COUNT is only a hint; Integer.MAX_VALUE asks the server to walk the
        // whole keyspace in a single blocking pass, which shows up as SCAN latency.
        scanParams.count(Integer.MAX_VALUE);
        List<String> keys = new ArrayList<>();
        do {
            ScanResult<String> searchResult = unifiedJedis.scan(cursor, scanParams);
            keys.addAll(searchResult.getResult());
            cursor = searchResult.getCursor();
        } while (!"0".equals(cursor));
        return keys;
    }

    public <T> List<T> getAllResultsUsingMGET(List<String> keys, Class<? extends T> clazz) {
        return this.getAllResultsUsingMGETAndPath(keys, clazz, ROOT_PATH);
    }

    public <T> List<T> getAllResultsUsingMGETAndPath(List<String> keys, Class<? extends T> clazz, Path2 path2) {
        String[] arrKeys = keys.toArray(new String[0]);
        List<T> result = new ArrayList<>();
        // JSON.MGET returns one JSONArray per key, each wrapping the value at the path.
        List<JSONArray> values = unifiedJedis.jsonMGet(path2, arrKeys);
        values.forEach(value -> {
            try {
                if (value != null && value.length() > 0) {
                    T convertedResult = MAPPER.readValue(value.get(0).toString(), clazz);
                    result.add(convertedResult);
                }
            } catch (JsonProcessingException e) {
                LOGGER.error("Parsing exception occurred", e);
                throw new RuntimeException(e);
            }
        });
        return result;
    }
}
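
The Aggregator class below also calls a commandHelper.executePipeline(keys) method that is not included in the snippet. A plausible reconstruction, assuming Jedis 4.x where JedisPooled.pipelined() returns a Pipeline that supports the RedisJSON commands; this is a guess at the missing method, not the OP's actual code:

// Inside CommandHelper; requires imports redis.clients.jedis.Pipeline and
// redis.clients.jedis.Response. Pipelines one JSON.GET per key so all replies
// come back in a single batched round trip instead of one network hop per key.
public List<JSONArray> executePipeline(List<String> keys) {
    List<Response<Object>> responses = new ArrayList<>(keys.size());
    try (Pipeline pipeline = unifiedJedis.pipelined()) {
        for (String key : keys) {
            responses.add(pipeline.jsonGet(key, Path2.ROOT_PATH));
        }
        pipeline.sync();
    }
    List<JSONArray> values = new ArrayList<>(keys.size());
    for (Response<Object> response : responses) {
        values.add((JSONArray) response.get());
    }
    return values;
}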

I use this helper class in my aggregation class, which takes a request object, validates it, scans Redis for keys matching the given keyPattern, fetches the objects themselves, and performs aggregation if required.

@Component
public class Aggregator<T, R> {

    private static final Logger LOGGER = LoggerFactory.getLogger(Aggregator.class);
    // ObjectMapper is thread-safe; share one instance rather than creating one per call.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Autowired
    CommandHelper commandHelper;

    @Autowired
    BaseRequestValidator validator;

    @Autowired
    ThreadPoolTaskExecutor redisThreadPoolExecutor;

    private static final int MAX_QUERY_LIMIT = 10000;

    private static final int MAX_KEYS_TO_PROCESS = 1000;

    public List<?> performAggregation(
            BaseRequest baseRequest,
            String keyPattern,
            Object sourceObject,
            R responseObject) {
        List<String> keys = null;
        Instant validatorStart = Instant.now();
        boolean isValidRequest = validator.validateRequest(baseRequest, sourceObject);
        Instant validatorEnd = Instant.now();
        Duration timeElapsed = Duration.between(validatorStart, validatorEnd);
        LOGGER.info(MessageFormat.format("Time taken for validation in milliseconds: {0}", timeElapsed.toMillis()));
        List<?> finalResult = null;
        if (isValidRequest) {
            // Flow will be to first scan and filter all records
            // Second, if there's a group by requirement then perform group by
            // Finally, if sorting needs to be performed then do that
            Instant fetchDataStart = Instant.now();
            keys = commandHelper.searchKeysWithPattern(keyPattern);
            LOGGER.info("keys from redis scan size ::: " + keys.size());
            Instant fetchDataEnd = Instant.now();
            timeElapsed = Duration.between(fetchDataStart, fetchDataEnd);
            LOGGER.info("Time taken for fetching all keys by in milliseconds: " + timeElapsed.toMillis());

            List<T> result;
            if (!keys.isEmpty()) {
                fetchDataStart = Instant.now();
                result = fetchDataFromRedisWithApplyAsync(keys, sourceObject, baseRequest);
                fetchDataEnd = Instant.now();
                timeElapsed = Duration.between(fetchDataStart, fetchDataEnd);

                LOGGER.info("Time taken for fetching all data by async method in milliseconds: " + timeElapsed.toMillis());
                int limit = baseRequest.getLimit() > 0 && baseRequest.getLimit() < MAX_QUERY_LIMIT ? baseRequest.getLimit(): MAX_QUERY_LIMIT;
                Instant aggregationStart = Instant.now();
                finalResult = aggregateResults(baseRequest, result, responseObject)
                        .stream()
                        .limit(limit)
                        .collect(Collectors.toList());
                Instant aggregationEnd = Instant.now();
                timeElapsed = Duration.between(aggregationStart, aggregationEnd);
                LOGGER.info("Time taken for aggregation only in milliseconds: " + timeElapsed.toMillis());
                LOGGER.info("After aggregation finished with limit the result is :: " + finalResult.size());
            } else {
                LOGGER.info("No keys found, process ends now");
            }
        }
        Instant aggregationEnd = Instant.now();
        timeElapsed = Duration.between(validatorStart, aggregationEnd);
        LOGGER.info("Total Time taken for processing all data in milliseconds: " + timeElapsed.toMillis());
        return finalResult;
    }

    private List<T> fetchDataFromRedisWithApplyAsync (List<String> keys, Object sourceObject, BaseRequest baseRequest) {
        List<T> result;
        if(!keys.isEmpty()) {
            if(keys.size() < MAX_KEYS_TO_PROCESS) {
                result = (List<T>) commandHelper.getAllResultsUsingMGET(keys, sourceObject.getClass());
            }
            else {
                result = new ArrayList<>();
                Instant splitStart1 = Instant.now();

                // Split keys into chunks of MAX_KEYS_TO_PROCESS; ceiling division
                // keeps the trailing partial chunk and the final key of each chunk.
                int chunkCount = (keys.size() + MAX_KEYS_TO_PROCESS - 1) / MAX_KEYS_TO_PROCESS;
                List<List<String>> subList = new ArrayList<>();
                for (int i = 0; i < chunkCount; i++) {
                    int from = i * MAX_KEYS_TO_PROCESS;
                    int to = Math.min(from + MAX_KEYS_TO_PROCESS, keys.size());
                    subList.add(keys.subList(from, to));
                }
                Instant splitEnd1 = Instant.now();
                Duration timeElapsed = Duration.between(splitStart1, splitEnd1);
                LOGGER.info("sublist created of size ::: " + subList.size() + " in milliseconds " + timeElapsed.toMillis());

                // convertJsonToObject itself returns a CompletableFuture, so use
                // thenComposeAsync to avoid double-wrapping the result.
                List<CompletableFuture<List<T>>> futures = subList.stream()
                        .map(keysList -> fetchJsonObjects(keysList)
                                .thenComposeAsync(responseData ->
                                        convertJsonToObject(responseData, sourceObject.getClass()),
                                        redisThreadPoolExecutor))
                        .collect(Collectors.toList());

                try {
                    // Wait for every chunk to complete, then gather the per-chunk lists.
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
                    for (CompletableFuture<List<T>> future : futures) {
                        result.addAll(future.get());
                    }
                } catch (Exception e) {
                    LOGGER.error("Exception occurred ::: ", e);
                    throw new RuntimeException(e);
                }
            }
        } else {
            result = new ArrayList<>();
        }
        return result;
    }

    private CompletableFuture<List<JSONArray>> fetchJsonObjects(List<String> keys) {
        return CompletableFuture.supplyAsync(() -> {
            return commandHelper.executePipeline(keys);
        }, redisThreadPoolExecutor).exceptionally(throwable -> {
            LOGGER.error("Exception occurred while fetching Redis JSON objects", throwable);
            // Return an empty list rather than null so downstream stages don't NPE.
            return Collections.emptyList();
        });
    }

    private CompletableFuture<List<T>> convertJsonToObject(List<JSONArray> jsonObjects, Class<?> clazz) {
        return CompletableFuture.supplyAsync(() -> {
            List<T> result = new ArrayList<>();
            jsonObjects.forEach(value -> {
                try {
                    if (value != null && value.length() > 0) {
                        @SuppressWarnings("unchecked")
                        T convertedResult = (T) MAPPER.readValue(value.get(0).toString(), clazz);
                        result.add(convertedResult);
                    }
                } catch (JsonProcessingException e) {
                    LOGGER.error("Parsing exception occurred", e);
                    throw new RuntimeException(e);
                }
            });
            return result;
        }, redisThreadPoolExecutor).exceptionally(throwable -> {
            LOGGER.error("Exception occurred while converting Redis JSON objects", throwable);
            // Return an empty list rather than null so callers don't NPE.
            return Collections.emptyList();
        });
    }

    private List<?> aggregateResults(BaseRequest baseRequest, List<T> result, R responseObject) {
        List<R> groupByRecords = null;
        if (baseRequest.getFilterFields() != null) {
            this.filterRecords(baseRequest, result);
            LOGGER.info("After filtering the result is :: " + result.size());
        }
        if (baseRequest.getGroupByFields() != null) {
            groupByRecords = this.doRecordsGrouping(baseRequest, result, responseObject);
            LOGGER.info("After grouping the result is :: " + groupByRecords.size());
        }
        if (baseRequest.getOrderByFields() != null) {
            if (groupByRecords != null) {
                this.sortRecords(baseRequest, groupByRecords);
                LOGGER.info("After sorting the result is :: " + groupByRecords.size());
            } else {
                this.sortRecords(baseRequest, result);
                LOGGER.info("After sorting the result is :: " + result.size());
            }
        }
        return groupByRecords != null ? groupByRecords : result;
    }

    private void filterRecords(BaseRequest baseRequest, List<T> result) {
        Instant filterStart = Instant.now();
        if (baseRequest.getFilterFields() != null) {
            // Remove a record only if it matches none of the requested filter values.
            result.removeIf(res -> {
                boolean remove = true;
                for (FilterFields filterField : baseRequest.getFilterFields()) {
                    for (String value : filterField.getValues()) {
                        Object obj = PropertyLocator.getPropertyValue(res, filterField.getFieldName());
                        // This logic will expand to all available data types.
                        remove = remove && !matchValue(obj, value);
                    }
                }
                return remove;
            });
            LOGGER.info("After removing elements the result is :: " + result.size());
        }
        Instant filterEnd = Instant.now();
        Duration timeElapsed = Duration.between(filterStart, filterEnd);
        LOGGER.info("Time taken for filtering in milliseconds: " + timeElapsed.toMillis());
    }

    private boolean matchValue(Object objValue, String matchWith) {
        if (objValue instanceof Integer) {
            return objValue.equals(Integer.valueOf(matchWith));
        } else if (objValue instanceof String) {
            return objValue.equals(matchWith);
        } else if (objValue instanceof Long) {
            return objValue.equals(Long.valueOf(matchWith));
        }
        return false;
    }

    private void sortRecords (BaseRequest baseRequest, List<?> result){
        Instant sortStart = Instant.now();
        if(baseRequest.getOrderByFields() != null) {
            Collections.sort(result, (one, two) -> {
                CompareToBuilder compToBuild = new CompareToBuilder();
                baseRequest.getOrderByFields().stream().forEachOrdered(sc -> {
                    String fieldName = sc.getFieldName();
                    String direction = sc.getSort();
                    // Sorting currently assumes Integer-valued sort fields.
                    Integer fv1 = (Integer) PropertyLocator.getPropertyValue(one, fieldName);
                    Integer fv2 = (Integer) PropertyLocator.getPropertyValue(two, fieldName);
                    if(direction.equalsIgnoreCase("ASC")) {
                        compToBuild.append(fv1, fv2);
                    }
                    if(direction.equalsIgnoreCase("DESC")) {
                        compToBuild.append(fv2, fv1);
                    }
                });
                return compToBuild.toComparison();
            });
        }
        Instant sortEnd = Instant.now();
        Duration timeElapsed = Duration.between(sortStart, sortEnd);
        LOGGER.info("Time taken for sorting by in milliseconds: "+ timeElapsed.toMillis());
    }

    private List<R> doRecordsGrouping(BaseRequest baseRequest, List<T> result, R responseClazz) {
        Instant groupByStart = Instant.now();
        // Build a composite key from the group-by fields and count occurrences.
        Map<String, Integer> groupingResponse = new HashMap<>();
        result.forEach(res -> {
            StringBuilder key = new StringBuilder();
            for (String field : baseRequest.getGroupByFields()) {
                key.append(PropertyLocator.getPropertyValue(res, field));
                key.append(",");
            }
            key.deleteCharAt(key.length() - 1);
            groupingResponse.merge(key.toString(), 1, Integer::sum);
        });
        List<R> responseList = new ArrayList<>();
        groupingResponse.forEach((groupKey, groupValue) -> {
            String[] keys = groupKey.split(",");
            R res;
            try {
                // Instantiate the response type via its no-arg constructor.
                res = (R) responseClazz.getClass().getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException(e);
            }
            for (int i = 0; i < keys.length; i++) {
                PropertyLocator.setPropertyValue(res, baseRequest.getGroupByFields()[i], keys[i]);
            }
            PropertyLocator.setPropertyValue(res, "groupByValue", groupValue);
            responseList.add(res);
        });
        Instant groupByEnd = Instant.now();
        Duration timeElapsed = Duration.between(groupByStart, groupByEnd);
        LOGGER.info("Time taken for grouping in milliseconds: " + timeElapsed.toMillis());
        return responseList;
    }
}
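
As an aside, the hand-rolled counting in doRecordsGrouping is equivalent to a stream collector. A fragment using the same result and baseRequest variables from that method, plus java.util.Arrays and java.util.stream.Collectors:

// Composite group-by key -> occurrence count, equivalent to the loop above.
Map<String, Long> counts = result.stream()
        .collect(Collectors.groupingBy(
                res -> Arrays.stream(baseRequest.getGroupByFields())
                        .map(f -> String.valueOf(PropertyLocator.getPropertyValue(res, f)))
                        .collect(Collectors.joining(",")),
                Collectors.counting()));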
dsinghal-nice commented 8 months ago

> Yes, I tried that way, but when the number of users increases, the SCAN command slows down and even JSON.GET fetches the data slowly, pushing the latency as high as 10 seconds, which is not acceptable in any scenario.
>
> Use FT.SEARCH.

I don't have RediSearch; I am actually developing this solution on ElastiCache for Redis.

bsbodden commented 8 months ago

@dsinghal-nice I don't see any usage of Redis OM in the code above; am I missing something? It looks like you are just using Jedis from within a Spring app. If so, this question is in the wrong repo :-) Use https://github.com/redis/jedis instead

dsinghal-nice commented 8 months ago

@bsbodden Yes, because I couldn't find any way to use the OM library: its methods currently rely on FT.SEARCH and the other FT.* commands, which are not available to me. Is there a way in Redis OM to translate this code, or do I just have to go with the low-level client?

bsbodden commented 8 months ago

@dsinghal-nice If you create a full-blown application with fake data, I can attempt to translate it to Redis OM and see if we can find ways around the scanning issues you are seeing, and whether search can improve things.
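
For reference, a rough sketch of what this model could look like in Redis OM Spring if RediSearch were available; the entity, fields, and repository below are illustrative only, not the OP's actual model:

import java.util.List;

import org.springframework.data.annotation.Id;

import com.redis.om.spring.annotations.Document;
import com.redis.om.spring.annotations.Indexed;
import com.redis.om.spring.repository.RedisDocumentRepository;

// Stored as a RedisJSON document and indexed via RediSearch by Redis OM Spring.
// (The one-day retention would be configured separately via a TTL.)
@Document
public class Event {
    @Id
    private String id;
    @Indexed
    private Integer teamId;
    @Indexed
    private Long timestamp;
    // getters and setters omitted
}

// Derived queries compile to FT.SEARCH instead of SCAN + JSON.MGET.
interface EventRepository extends RedisDocumentRepository<Event, String> {
    List<Event> findByTeamId(Integer teamId);
}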