整体设计概览
┌────────────┐ ┌────────────────┐ ┌────────────┐
│ 配置后台服务 │ --写入-->│ Redis 延时队列 ZSET │ │ 微服务A/B/... │
└────────────┘ └────────────────┘ └────────────┘
│ │ ▲
▼ ▼ │
定时扫描ZSET 到时触发推送 NSQ 订阅配置Topic
│ │ │
└────> 发送配置消息到 NSQ Topic ─┴──── 通过 NSQ 消费并应用配置
实现核心思路
1. Redis 延时队列实现(使用 ZSET)
ZSET key:config:delayed:push
成员内容(value):配置 JSON 或配置 ID
score(时间戳):生效时间(Unix 秒/毫秒)
ZADD config:delayed:push 1717665600 '{"key":"switch","value":"on"}'
2. 后台服务:定时扫描 Redis,检查到期的任务并推送到 NSQ
func ScanAndPushExpiredConfigs() {
now := time.Now().Unix()
// 取出所有 score <= 当前时间 的任务
cfgs, err := rdb.ZRangeByScore(ctx, "config:delayed:push", &redis.ZRangeBy{
Min: "0",
Max: fmt.Sprintf("%d", now),
Offset: 0,
Count: 100,
}).Result()
if err != nil {
log.Println("ZRangeByScore err:", err)
return
}
for _, cfgJson := range cfgs {
go func(cfgStr string) {
// 从 ZSET 中移除
rdb.ZRem(ctx, "config:delayed:push", cfgStr)
// 发消息到 NSQ
SendToNSQ("config_push", []byte(cfgStr))
}(cfgJson)
}
}
可以用 time.Ticker 每 30s 调用一次 ScanAndPushExpiredConfigs()。
3. 推送配置到 NSQ
var nsqProducer *nsq.Producer
func InitNSQProducer() {
config := nsq.NewConfig()
p, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Fatal(err)
}
nsqProducer = p
}
func SendToNSQ(topic string, data []byte) {
err := nsqProducer.Publish(topic, data)
if err != nil {
log.Println("Failed to publish:", err)
}
}
4. 微服务端:监听 NSQ Topic 接收配置并更新内存
type AppConfig struct {
Key string
Value string
}
func HandleConfigPush(message *nsq.Message) error {
var cfg AppConfig
json.Unmarshal(message.Body, &cfg)
log.Printf("Received config: %+v", cfg)
// 更新本地配置缓存,比如 atomic.Value 替换
LoadConfig(&cfg)
return nil
}
func InitNSQConsumer() {
config := nsq.NewConfig()
consumer, _ := nsq.NewConsumer("config_push", "channel1", config)
consumer.AddHandler(nsq.HandlerFunc(HandleConfigPush))
err := consumer.ConnectToNSQD("127.0.0.1:4150")
if err != nil {
log.Fatal(err)
}
}
Redis 延时队列 VS NSQ 自身延时投递
点位 | Redis 延时队列 | NSQ 延时投递(无原生) |
消息调度准确性 | 精确可控(ZSET score) | 需应用层模拟 |
投递可靠性 | 可持久存储,支持幂等处理 | NSQ 偏向实时流处理 |
调度灵活性 | 多条件筛选(score、tag等) | 不支持 |
架构复杂度 | 需轮询 Redis 扫描 | 稍简,但功能受限 |
总结整体实现流程
- 后台收到配置变更(含 effect_time);
- 把配置序列化成 JSON,写入 Redis ZSET,score 为生效时间戳;
- 后台服务定时轮询 Redis;
- 到期配置被发送到 NSQ;
- 各个微服务监听 NSQ topic,收到推送后热更新内存配置。