我用Rust制作了一个使用Redis进行批处理调度的库
的是什么?
你的目标是什么?
你希望实现什么?
你想要达到什么?
当启动批处理并进行处理时,希望确保冗余性并仅由一个进程在多台上启动和执行批处理。我们已经在PostgreSQL和DynamoDB上尝试过了,但这次我们将在Redis上尝试一下。
crate.io上的页面
我也会介绍以前在DynamoDB上做过同样事情的文章。
在AWS的ECS上执行多个批处理任务时需要互斥。
代码 (daima)
Redis意味着缓存数据库。
在Redis中,我们使用哈希来存储数据。通过指定哈希字段为批处理的名称,并将值设为批处理的启动时间来保存数据。
首先我们会检查该字段是否存在值,如果不存在则直接写入数据。
如果存在值的情况下,我们会判断是否已经经过了一定的时间,如果已经经过,则进行写入操作,但如果没有经过则无法执行该批处理任务。
由于在Rust代码中以原子方式执行上述处理非常困难,因此我们使用Redis内部的Lua脚本来执行处理。
local key = KEYS[1]
local field = KEYS[2]
local start_utsms = tonumber(ARGV[1])
local lock_milli_second = tonumber(ARGV[2])
local value = redis.call('HGET', key, field)
if value then
local current_start_utsms = tonumber(value)
if current_start_utsms + lock_milli_second < start_utsms then
redis.call('HSET', key, field, start_utsms)
return "1"
end
else
redis.call('HSET', key, field, start_utsms)
return "1"
end
生锈
在Rust中,目前只是获取当前时间并调用Lua脚本。它返回一个布尔值来判断是否可以执行批处理,需要查看Lua的返回值。
用Rust调用Redis的Lua脚本示例可能很适合。通过Lua脚本,可以传递多个key和arg。在Lua脚本内部,可以通过KEYS和ARGV进行访问。由于key和arg之间几乎没有区别,所以没有特别需要注意的差别。
考试是呼叫方法的示例。
[package]
name = "redis-batches"
version = "0.1.2"
edition = "2021"
[dependencies]
once_cell = "1.15.0"
redis = { version = "0.22", features = ["script", "aio"]}
[features]
default = ["tokio-comp"]
tokio-comp = ["redis/tokio-comp"]
async-std-comp = ["redis/async-std-comp"]
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
use once_cell::sync::Lazy;
use redis::Script;
use std::time::SystemTime;
static LOCK_SCRIPT: Lazy<Script> =
Lazy::new(|| Script::new(include_str!("./redis_scripts/batches_lock.lua")));
pub async fn lock(
key: &str,
field: &str,
lock_milli_second: u128,
redis_conn: &mut redis::aio::Connection,
) -> Result<bool, redis::RedisError> {
let start_utsms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let mut invocation = LOCK_SCRIPT.key(key);
invocation.key(field);
invocation.arg(start_utsms.to_string());
invocation.arg(lock_milli_second.to_string());
let res: Option<String> = invocation.invoke_async(redis_conn).await?;
Ok(res.is_some())
}
#[cfg(test)]
mod tests {
use crate::lock;
#[tokio::test]
async fn it_works() {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_async_connection().await.unwrap();
let res1 = lock("mutex", "test", 1000, &mut con).await.unwrap();
assert_eq!(true, res1);
let res2 = lock("mutex", "test", 1000, &mut con).await.unwrap();
assert_eq!(false, res2);
tokio::time::sleep(tokio::time::Duration::from_millis(1001)).await;
let res3 = lock("mutex", "test", 1000, &mut con).await.unwrap();
assert_eq!(true, res3);
tokio::time::sleep(tokio::time::Duration::from_millis(1001)).await;
}
}
总结
完成处理后考虑解除锁定,但意识到RAII,在Drop函数中调用async,但它无法正常工作。
如果是定期启动且执行时间较短的批处理任务,我认为即使不解锁也不会有问题。