1. 缓存管理器实现
```rust
//! 缓存管理器实现
//!
//! 提供分布式 Redis 缓存的通用模式
use redis::{aio::ConnectionManager, AsyncCommands};
use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
/// 缓存管理器
pub struct CacheManager {
/// 配置
config: CacheConfig,
/// Redis 连接管理器
redis: Option,
/// 统计信息
stats: Arc>,
}
impl CacheManager {
/// 创建新的缓存管理器(带超时控制)
pub async fn new(config: CacheConfig) -> Result {
let redis = if config.enabled && config.redis.enabled {
// 创建 Redis 客户端
let client = redis::Client::open(config.redis.url.as_str())
.map_err(|e| CacheError::Connection(format!("Redis 连接失败: {}", e)))?;
// 超时控制(适应远程 Redis)
let timeout = Duration::from_secs(30);
match tokio::time::timeout(timeout, ConnectionManager::new(client)).await {
Ok(Ok(conn)) => Some(conn),
Ok(Err(e)) => return Err(CacheError::Connection(format!("Redis 连接失败: {}", e))),
Err(_) => return Err(CacheError::Timeout(format!("Redis 连接超时({}秒)", timeout.as_secs()))),
}
} else {
None
};
Ok(Self {
config,
redis,
stats: Arc::new(RwLock::new(CacheStats::new())),
})
}
/// 获取缓存
pub async fn get(&self, key: &str) -> Result
if !self.config.enabled {
return Ok(None);
}
// 增加请求计数
{
let mut stats = self.stats.write().await;
stats.total_requests += 1;
}
if let Some(mut redis) = self.redis.clone() {
match redis.get::<&str, Vec>(key).await {
Ok(bytes) if !bytes.is_empty() => {
// 更新统计
{
let mut stats = self.stats.write().await;
stats.redis_hits += 1;
}
self.deserialize(&bytes)
}
Ok(_) => {
let mut stats = self.stats.write().await;
stats.redis_misses += 1;
Ok(None)
}
Err(e) => {
log::warn!("Redis 读取失败: key={}, error={}", key, e);
let mut stats = self.stats.write().await;
stats.redis_misses += 1;
Ok(None) // 缓存失败不应影响业务
}
}
} else {
Ok(None)
}
}
/// 设置缓存(带 TTL)
pub async fn set(
&self,
key: &str,
value: &T,
ttl: Option,
) -> Result<(), CacheError> {
if !self.config.enabled {
return Ok(());
}
let bytes = self.serialize(value)?;
if let Some(mut redis) = self.redis.clone() {
let ttl_seconds = ttl.unwrap_or(self.config.default_ttl);
match redis.set_ex::<&str, Vec, ()>(key, bytes, ttl_seconds).await {
Ok(_) => log::debug!("Redis 写入: key={}, ttl={}s", key, ttl_seconds),
Err(e) => log::warn!("Redis 写入失败: key={}, error={}", key, e),
}
}
Ok(())
}
/// 删除缓存
pub async fn delete(&self, key: &str) -> Result<(), CacheError> {
if let Some(mut redis) = self.redis.clone() {
match redis.del::<&str, ()>(key).await {
Ok(_) => log::debug!("Redis 删除: {}", key),
Err(e) => log::warn!("Redis 删除失败: key={}, error={}", key, e),
}
}
Ok(())
}
/// 序列化
fn serialize(&self, value: &T) -> Result, CacheError> {
serde_json::to_vec(value).map_err(|e| {
CacheError::Serialization(format!("序列化失败: {}", e))
})
}
/// 反序列化
fn deserialize(&self, bytes: &[u8]) -> Result
if bytes.is_empty() {
return Ok(None);
}
match serde_json::from_slice(bytes) {
Ok(value) => Ok(Some(value)),
Err(e) => {
log::warn!("反序列化失败: {}", e);
Ok(None) // 损坏数据应跳过,不返回错误
}
}
}
}
/// Counters describing cache traffic.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of lookups counted.
    pub total_requests: u64,
    /// Number of lookups recorded as Redis hits.
    pub redis_hits: u64,
    /// Number of lookups recorded as Redis misses.
    pub redis_misses: u64,
}

impl CacheStats {
    /// Returns a statistics record with every counter at zero.
    pub fn new() -> Self {
        Self {
            total_requests: 0,
            redis_hits: 0,
            redis_misses: 0,
        }
    }

    /// Hit rate as a percentage: `redis_hits / total_requests * 100`.
    ///
    /// Returns `0.0` when no request has been counted, avoiding a division by zero.
    pub fn hit_rate(&self) -> f64 {
        match self.total_requests {
            0 => 0.0,
            total => self.redis_hits as f64 / total as f64 * 100.0,
        }
    }
}
/// Error type for cache operations.
///
/// `Display` output is generated by `thiserror` from the `#[error(...)]` attribute on each
/// variant.
#[derive(Debug, thiserror::Error)]
pub enum CacheError {
/// Connection failure; the payload is a human-readable description.
#[error("连接错误: {0}")]
Connection(String),
/// An operation timed out; the payload is a human-readable description.
#[error("超时错误: {0}")]
Timeout(String),
/// A (de)serialization failure; the payload is a human-readable description.
#[error("序列化错误: {0}")]
Serialization(String),
/// An error from the `redis` crate; `#[from]` lets `?` convert a `redis::RedisError`.
#[error("Redis 错误: {0}")]
Redis(#[from] redis::RedisError),
}
```
2. 缓存键设计模式
```rust
/// Cache key generator.
///
/// Design principles:
/// 1. Use a namespace prefix to avoid key collisions.
/// 2. Include a business identifier to make troubleshooting easier.
/// 3. Support versioning so cached data can be refreshed.
pub struct CacheKeyBuilder;
impl CacheKeyBuilder {
    /// Builds a namespaced key.
    ///
    /// Format: `{namespace}:{entity}:{id}`
    pub fn build(namespace: &str, entity: &str, id: impl std::fmt::Display) -> String {
        let id_text = id.to_string();
        [namespace, entity, id_text.as_str()].join(":")
    }

    /// Builds a key for a cached list result.
    ///
    /// Format: `{namespace}:{entity}:list:{query_hash}`, where `query_hash` is the first
    /// 8 lowercase hex characters of the SHA-256 digest of `query`.
    pub fn list_key(namespace: &str, entity: &str, query: &str) -> String {
        use sha2::{Digest, Sha256};
        // One-shot digest of the raw query bytes, rendered as lowercase hex.
        let hex_digest = format!("{:x}", Sha256::digest(query.as_bytes()));
        // Keep only a short prefix of the digest in the key.
        let short_hash = &hex_digest[..8];
        format!("{}:{}:list:{}", namespace, entity, short_hash)
    }

    /// Builds a glob pattern that matches keys under an entity.
    ///
    /// Format: `{namespace}:{entity}:*`
    pub fn pattern(namespace: &str, entity: &str) -> String {
        Self::build(namespace, entity, '*')
    }

    /// Builds a key that carries a version suffix.
    ///
    /// Format: `{namespace}:{entity}:{id}:v{version}`
    pub fn versioned(
        namespace: &str,
        entity: &str,
        id: impl std::fmt::Display,
        version: u64,
    ) -> String {
        let base_key = Self::build(namespace, entity, id);
        format!("{}:v{}", base_key, version)
    }
}
```
3. 批量缓存删除模式
```rust
impl CacheManager {
/// 批量删除缓存(支持模式匹配)
///
/// 使用 SCAN 命令遍历删除,避免 DEL 阻塞
pub async fn delete_pattern(&self, pattern: &str) -> Result {
let mut deleted_count = 0;
if let Some(redis) = &self.redis {
let mut cursor: u64 = 0;
loop {
let result: std::result::Result<(u64, Vec), redis::RedisError> =
redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(pattern)
.arg("COUNT")
.arg(100)
.query_async(&mut redis.clone())
.await;
match result {
Ok((new_cursor, keys)) => {
if !keys.is_empty() {
let del_result: std::result::Result<(), redis::RedisError> =
redis::cmd("DEL").arg(&keys).query_async(&mut redis.clone()).await;
if del_result.is_ok() {
deleted_count += keys.len();
}
}
cursor = new_cursor;
if cursor == 0 {
break;
}
}
Err(e) => {
log::warn!("Redis SCAN 失败: {}", e);
break;
}
}
}
log::info!("批量删除完成: pattern={}, deleted={}", pattern, deleted_count);
}
Ok(deleted_count)
}
}
```
---