我用Rust制作了一个使用Redis进行批处理调度的库

的是什么?
你的目标是什么?
你希望实现什么?
你想要达到什么?

当启动批处理并进行处理时,希望确保冗余性并仅由一个进程在多台上启动和执行批处理。我们已经在PostgreSQL和DynamoDB上尝试过了,但这次我们将在Redis上尝试一下。

crate.io上的页面

我也会介绍以前在DynamoDB上做过同样事情的文章。

在AWS的ECS上执行多个批处理任务时需要互斥。

代码 (daima)

Redis意味着缓存数据库。

在Redis中,我们使用哈希来存储数据。通过指定哈希字段为批处理的名称,并将值设为批处理的启动时间来保存数据。
首先我们会检查该字段是否存在值,如果不存在则直接写入数据。
如果存在值的情况下,我们会判断是否已经经过了一定的时间,如果已经经过,则进行写入操作,但如果没有经过则无法执行该批处理任务。

由于在Rust代码中以原子方式执行上述处理非常困难,因此我们使用Redis内部的Lua脚本来执行处理。

local key = KEYS[1]
local field = KEYS[2]
local start_utsms = tonumber(ARGV[1])
local lock_milli_second = tonumber(ARGV[2])
local value = redis.call('HGET', key, field)
if value then
  local current_start_utsms = tonumber(value)
  if current_start_utsms + lock_milli_second < start_utsms then
    redis.call('HSET', key, field, start_utsms)
    return "1"
  end
else
  redis.call('HSET', key, field, start_utsms)
  return "1"
end

生锈

在Rust中,目前只是获取当前时间并调用Lua脚本。它返回一个布尔值来判断是否可以执行批处理,需要查看Lua的返回值。

用Rust调用Redis的Lua脚本示例可能很适合。通过Lua脚本,可以传递多个key和arg。在Lua脚本内部,可以通过KEYS和ARGV进行访问。由于key和arg之间几乎没有区别,所以没有特别需要注意的差别。

考试是呼叫方法的示例。

[package]
name = "redis-batches"
version = "0.1.2"
edition = "2021"

[dependencies]
once_cell = "1.15.0"
redis = { version = "0.22", features = ["script", "aio"]}

[features]
default = ["tokio-comp"]
tokio-comp = ["redis/tokio-comp"]
async-std-comp = ["redis/async-std-comp"]

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
use once_cell::sync::Lazy;
use redis::Script;
use std::time::SystemTime;

static LOCK_SCRIPT: Lazy<Script> =
    Lazy::new(|| Script::new(include_str!("./redis_scripts/batches_lock.lua")));

pub async fn lock(
    key: &str,
    field: &str,
    lock_milli_second: u128,
    redis_conn: &mut redis::aio::Connection,
) -> Result<bool, redis::RedisError> {
    let start_utsms = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis();
    let mut invocation = LOCK_SCRIPT.key(key);
    invocation.key(field);
    invocation.arg(start_utsms.to_string());
    invocation.arg(lock_milli_second.to_string());
    let res: Option<String> = invocation.invoke_async(redis_conn).await?;
    Ok(res.is_some())
}

#[cfg(test)]
mod tests {
    use crate::lock;

    #[tokio::test]
    async fn it_works() {
        let client = redis::Client::open("redis://127.0.0.1/").unwrap();
        let mut con = client.get_async_connection().await.unwrap();
        let res1 = lock("mutex", "test", 1000, &mut con).await.unwrap();
        assert_eq!(true, res1);
        let res2 = lock("mutex", "test", 1000, &mut con).await.unwrap();
        assert_eq!(false, res2);
        tokio::time::sleep(tokio::time::Duration::from_millis(1001)).await;
        let res3 = lock("mutex", "test", 1000, &mut con).await.unwrap();
        assert_eq!(true, res3);
        tokio::time::sleep(tokio::time::Duration::from_millis(1001)).await;
    }
}

总结

完成处理后考虑解除锁定,但意识到RAII,在Drop函数中调用async,但它无法正常工作。
如果是定期启动且执行时间较短的批处理任务,我认为即使不解锁也不会有问题。

广告
将在 10 秒后关闭
bannerAds