Open Bajix opened 4 years ago
Check out this example: https://github.com/RedisLabsModules/redismodule-rs/blob/master/examples/timer.rs
I already made timers that work when run by commands. I was hoping for a lazy solution so that there wouldn't need to be a sidekick service that has to run a startup command
use std::os::raw::c_int;
use itertools::Itertools;
use rayon::prelude::*;
use redis_module::raw as rawmod;
use std::{convert::TryFrom, time::Duration};
#[macro_use]
extern crate redis_module;
use redis_module::{
parse_float, parse_integer, parse_unsigned_integer, Context, NextArg, RedisError, RedisResult,
RedisValue,
};
/// Interval between two decay ticks, in milliseconds (5 seconds).
static DECAY_FREQUENCY: i32 = 5_000;
/// Time after which a score has decayed to half its value, in milliseconds
/// (7 days, written out as days * hours * minutes * seconds * millis).
static HALF_LIFE: i32 = 7 * 24 * 60 * 60 * 1_000;
/// Timer callback: applies one step of exponential decay to every score in the
/// sorted set `<entity_type>::scores`, then schedules the next tick.
///
/// Each score is multiplied by `2^(-DECAY_FREQUENCY / HALF_LIFE)`, so a score
/// that is never bumped halves after `HALF_LIFE` milliseconds worth of ticks.
///
/// This function never panics: a failed Redis call or a malformed reply entry
/// is logged and skipped, and the next tick is always scheduled. A panic here
/// would unwind out of a timer callback and take the whole server down.
///
/// * `ctx` - module context used to issue the Redis calls.
/// * `entity_type` - prefix of the sorted-set key; passed on to the next tick.
fn decay_tick(ctx: &Context, entity_type: String) {
    let zset_key = format!("{}::scores", entity_type);
    // The factor is identical for every member, so compute it once per tick.
    let decay_rate = 2_f64.powf(-DECAY_FREQUENCY as f64 / HALF_LIFE as f64);

    match ctx.call("ZRANGE", &[&zset_key, "0", "-1", "WITHSCORES"]) {
        Ok(RedisValue::Array(values)) => {
            // The WITHSCORES reply is flat: member, score, member, score, ...
            // A dangling trailing element (no matching score) is ignored.
            for pair in values.chunks_exact(2) {
                let (member, raw_score) = match (&pair[0], &pair[1]) {
                    (RedisValue::SimpleString(member), RedisValue::SimpleString(score)) => {
                        (member, score)
                    }
                    _ => {
                        ctx.log_warning(&format!(
                            "decay: unexpected reply entry in {}, skipping",
                            zset_key
                        ));
                        continue;
                    }
                };
                let score = match parse_float(raw_score) {
                    Ok(score) => score,
                    Err(err) => {
                        ctx.log_warning(&format!(
                            "decay: bad score {:?} for {} in {}: {:?}",
                            raw_score, member, zset_key, err
                        ));
                        continue;
                    }
                };
                let decayed = (score * decay_rate).to_string();
                // XX: only update members that still exist, never re-create one.
                if let Err(err) = ctx.call("ZADD", &[&zset_key, "XX", &decayed, member]) {
                    ctx.log_warning(&format!(
                        "decay: ZADD failed for {} in {}: {:?}",
                        member, zset_key, err
                    ));
                }
            }
        }
        // Any non-array reply means there is nothing to decay on this tick.
        Ok(_) => {}
        Err(err) => ctx.log_warning(&format!(
            "decay: ZRANGE failed for {}: {:?}",
            zset_key, err
        )),
    }

    queue_next_tick(ctx, &entity_type);
}
/// Arms a one-shot timer that runs `decay_tick` for `entity_type` after
/// `DECAY_FREQUENCY` milliseconds, and records the timer id in the
/// `decay::timers` hash under the entity type so it can be looked up later.
///
/// If recording the id fails, the failure is logged rather than panicking;
/// the timer itself stays armed.
///
/// * `ctx` - module context used to create the timer and issue `HSET`.
/// * `entity_type` - entity whose scores the timer will decay; an owned copy
///   is handed to the timer as its payload.
fn queue_next_tick(ctx: &Context, entity_type: &str) {
    let timer_id = ctx.create_timer(
        Duration::from_millis(DECAY_FREQUENCY as u64),
        decay_tick,
        entity_type.to_owned(),
    );
    let timer_id = timer_id.to_string();

    if let Err(err) = ctx.call("HSET", &["decay::timers", entity_type, &timer_id]) {
        ctx.log_warning(&format!(
            "decay: could not record timer {} for {}: {:?}",
            timer_id, entity_type, err
        ));
    }
}
/// Command handler for `trending.start_decay`: schedules the first decay tick
/// for the entity type named by the first argument.
///
/// The first element of `args` is skipped (presumably the command name);
/// a missing argument is returned as an error. Replies with integer `1`.
fn start_decay(ctx: &Context, args: Vec<String>) -> RedisResult {
    let target = args.into_iter().skip(1).next_string()?;
    queue_next_tick(ctx, &target);
    Ok(RedisValue::Integer(1))
}
/// Command handler for `trending.clear_decay`: stops the timer recorded in the
/// `decay::timers` hash for the entity type named by the first argument.
///
/// Replies with integer `1` when a recorded timer was stopped and `0` when the
/// hash lookup did not yield a string. Errors from the lookup, from parsing
/// the stored id, or from stopping the timer are returned to the caller.
fn clear_decay(ctx: &Context, args: Vec<String>) -> RedisResult {
    let mut remaining = args.into_iter().skip(1);
    let entity_type = remaining.next_string()?;

    let stopped = match ctx.call("HGET", &["decay::timers", &entity_type])? {
        RedisValue::SimpleString(raw_id) => {
            let id = parse_unsigned_integer(&raw_id)?;
            // The annotation selects the payload type the timer was created with.
            let _payload: String = ctx.stop_timer(id)?;
            1
        }
        _ => 0,
    };

    Ok(RedisValue::Integer(stopped))
}
/// Module initialisation hook referenced by `init:` in the `redis_module!`
/// invocation: schedules the first decay tick for the built-in entity types
/// and reports success.
///
/// NOTE(review): the accompanying report says queuing timers from here
/// crashes; the cause is not established by this file.
pub extern "C" fn init(raw_ctx: *mut rawmod::RedisModuleCtx) -> c_int {
    let ctx = Context::new(raw_ctx);
    for entity_type in ["posts", "pages"].iter() {
        queue_next_tick(&ctx, &String::from(*entity_type));
    }
    rawmod::REDISMODULE_OK as c_int
}
// Module declaration: registers the module under the name "trending"
// (version 1, no custom data types), names `init` as its init function, and
// exposes two commands backed by `start_decay` and `clear_decay`.
redis_module! {
name: "trending",
version: 1,
data_types: [],
init: init,
commands: [
// Each entry: command name, handler, then a string and three integers --
// presumably flags, first key, last key and key step; confirm against the
// redis_module macro documentation.
["trending.start_decay", start_decay, "", 1, 1, 1],
["trending.clear_decay", clear_decay, "", 1, 1, 1]
],
}
This specifically becomes problematic with the following lines and works otherwise:
queue_next_tick(&ctx, &String::from("posts"));
queue_next_tick(&ctx, &String::from("pages"));
This seems to crash when I queue a timer during init, but perhaps I'm just doing a bad init function?
@Bajix did you find the issue?
It looks like init is defined within the redis_module! macro but I don't see it documented anywhere. My module requires background timers, and so it would be ideal to start this using init, though this can also be done with a startup command