richfitz / redux

:telephone_receiver::computer: Redis client for R
https://richfitz.github.io/redux

Add support for Redis 5.0.x X* commands #43

Open richfitz opened 3 years ago

richfitz commented 3 years ago

Omitted from #41 as the basic interface generated did not seem usable. This needs input from someone who is using these.

Note that these commands can still be called manually through $command():

con$command(list("XINFO", "STREAM", "mystream"))
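The same pattern should carry over to the other X* commands, since the command name and its arguments are just elements of one list. A minimal sketch, assuming a hypothetical helper `stream_cmd` (not part of redux) and a stand-in connection that only echoes its arguments so the snippet runs without a Redis server:

```r
# Hypothetical helper (not part of redux): build the argument list for a
# stream command and pass it to con$command().
stream_cmd <- function(con, command, ...) {
  con$command(c(list(command), list(...)))
}

# Stand-in connection that returns the arguments it was given, so the example
# runs without a server. With redux, use con <- redux::hiredis() instead.
fake_con <- list(command = function(args) args)

# Shows the list that would be sent for XLEN on the stream "mystream"
sent <- stream_cmd(fake_con, "XLEN", "mystream")
str(sent)
```

With a real connection, the return value is whatever Redis replies for that command, so any parsing is still left to the caller.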
acksmaggart commented 3 years ago

Hey @richfitz, first of all, thanks for the excellent package; it has been a huge leg up. Second, we find ourselves using the X* commands via con$command(), as you suggest. I don't have the bandwidth to be very involved, but in case it is helpful to you or to anyone else coming across this issue, here are some code snippets that implement rough versions of XADD, XACK, and XREADGROUP. They all take a redux connection as their first argument. Let me know if any other context or info would be helpful.

XREADGROUP <- function(r, group, consumer, streams, count=NULL, block=NULL, noack=FALSE){
  args <- list("XREADGROUP", "GROUP", group, consumer)
  if (!is.null(count)){
    args <- c(args, c("COUNT", count))
  }
  if (!is.null(block)){
    args <- c(args, c("BLOCK", block))
  } 
  if (noack){
    args <- c(args, "NOACK")
  }
  args <- c(args, "STREAMS")
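  # Stream names first, then the matching ids (e.g. ">") in the same order
  args <- c(args, names(streams))
  # Drop the names so only the bare ids are appended to the command
  args <- c(args, unname(streams))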
  redis_response <- r$command(args)
  if (is.null(redis_response)){
    # No new messages
    return(redis_response)
  }
  # redis_response is a list of streams:
    # each stream is a list with two items:
      # 1. A Stream Name
      # 2. A list of messages where each message is a list with two items:
        # 1. Message id
        # 2. A list of key-value pairs that alternates key, value, key, value...
  out <- list()
  for (stream_section_raw in redis_response){
    stream_section_out <- list()
    stream_name <- stream_section_raw[[1]]
    stream_section_out[["stream"]] <- stream_name
    messages <- list()
    # seq_along() avoids iterating over 1:0 when a stream has no messages
    for (i in seq_along(stream_section_raw[[2]])){
      message_raw <- stream_section_raw[[2]][[i]]
      # The payload is a list that alternates key, value, key, value...
      keys <- message_raw[[2]][c(TRUE, FALSE)]
      values <- message_raw[[2]][c(FALSE, TRUE)]
      # keys is a list of strings; flatten it to a character vector of names
      names(values) <- unlist(keys)
      message_out <- list(
        id=message_raw[[1]],
        payload=values
      )
      messages[[i]] <- message_out
    }
    stream_section_out[['messages']] <- messages
    out[[stream_name]] <- stream_section_out
  }
  return(out)
}

XADD <- function(r, stream, values, id="*", maxlength=NULL){
  args <- list("XADD", stream)
  if (!is.null(maxlength)){
    args <- c(args, "MAXLEN", "~", maxlength)
  }
  args <- c(args, id)
  for (field in names(values)){
    args <- c(args, field, values[[field]])
  }
  return(r$command(args))
}

XACK <- function(r, stream, group, ids){
  args <- list("XACK", stream, group)
  args <- c(args, ids)
  return(r$command(args))
}

### Usage
# Connect to Redis (assumes a server reachable with the redux defaults)
r <- redux::hiredis()
# Create the consumer group "r-worker" on the stream "r-test"
r$command(list("XGROUP", "CREATE", "r-test", "r-worker", "$", "MKSTREAM"))
# Add messages to the "r-test" stream
x1 <- XADD(r, "r-test", list("value1"="apples", "value2"="something"))
x2 <- XADD(r, "r-test", list("value1"="bananas", "value2"="something else"))
# Read messages from the "r-test" stream from consumer group "r-worker" as consumer "consumer1".
# More streams can be added as further name=id pairs, but the group must already exist on each one;
# otherwise Redis replies with a NOGROUP error.
y <- XREADGROUP(r, "r-worker", "consumer1", list("r-test"=">"))
print(y[['r-test']]$messages[[1]]$payload$value1) # Prints 'apples'
print(y[['r-test']]$messages[[2]]$payload$value1) # Prints 'bananas'
# Acknowledge the messages
z <- XACK(r, "r-test", "r-worker", c(x1, x2))
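
For anyone who wants to check the parsing step without a running server, here is a small sketch of the key/value conversion on a hand-built reply. The reply follows the nested shape described in the comments of XREADGROUP above; the message id is made up, and `pairs_to_named_list` is a hypothetical helper name rather than anything from redux.

```r
# Mock reply: one stream ("r-test") holding one message. The id is invented.
reply <- list(
  list(
    "r-test",
    list(
      list("1526919030474-55", list("value1", "apples", "value2", "something"))
    )
  )
)

# Turn a flat key, value, key, value... list into a named list.
pairs_to_named_list <- function(flat) {
  if (length(flat) == 0) return(list())
  keys <- unlist(flat[c(TRUE, FALSE)])   # odd positions are field names
  values <- flat[c(FALSE, TRUE)]         # even positions are field values
  names(values) <- keys
  values
}

# First message of the first stream: element 1 is the id, element 2 the payload
message_raw <- reply[[1]][[2]][[1]]
payload <- pairs_to_named_list(message_raw[[2]])
print(payload$value1)
```

The empty-payload guard is an extra assumption on my side; the original loop does not handle that case.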
skadaman commented 1 year ago

@MaxTaggart, thank you for the head start and the examples. @richfitz, if you still need help here, I would be interested in contributing, as I need to build a Redis Streams interface for communication between a web application and R models.