卡飞资源网

专业编程技术资源共享平台

配置热更新系统

整体设计概览

┌────────────┐           ┌────────────────┐         ┌────────────┐
│ 配置后台服务 │  --写入-->│ Redis 延时队列 ZSET │         │  微服务A/B/... │
└────────────┘           └────────────────┘         └────────────┘
     │                                │                     ▲
     ▼                                ▼                     │
定时扫描ZSET                          到时触发推送         NSQ 订阅配置Topic
     │                                │                     │
     └────> 发送配置消息到 NSQ Topic ─┴──── 通过 NSQ 消费并应用配置

实现核心思路

1. Redis 延时队列实现(使用 ZSET)

ZSET key:config:delayed:push

成员内容(value):配置 JSON 或配置 ID

score(时间戳):生效时间(Unix 秒/毫秒)

ZADD config:delayed:push 1717665600 '{"key":"switch","value":"on"}'

2. 后台服务:定时扫描 Redis,检查到期的任务并推送到 NSQ

func ScanAndPushExpiredConfigs() {
    now := time.Now().Unix()
    // 取出所有 score <= 当前时间 的任务
    cfgs, err := rdb.ZRangeByScore(ctx, "config:delayed:push", &redis.ZRangeBy{
        Min:    "0",
        Max:    fmt.Sprintf("%d", now),
        Offset: 0,
        Count: 100,
    }).Result()
    if err != nil {
        log.Println("ZRangeByScore err:", err)
        return
    }

    for _, cfgJson := range cfgs {
        go func(cfgStr string) {
            // 从 ZSET 中移除
            rdb.ZRem(ctx, "config:delayed:push", cfgStr)
            // 发消息到 NSQ
            SendToNSQ("config_push", []byte(cfgStr))
        }(cfgJson)
    }
}

可以用 time.Ticker 每 30s 调用一次 ScanAndPushExpiredConfigs()。

3. 推送配置到 NSQ


var nsqProducer *nsq.Producer

func InitNSQProducer() {
    config := nsq.NewConfig()
    p, err := nsq.NewProducer("127.0.0.1:4150", config)
    if err != nil {
        log.Fatal(err)
    }
    nsqProducer = p
}

func SendToNSQ(topic string, data []byte) {
    err := nsqProducer.Publish(topic, data)
    if err != nil {
        log.Println("Failed to publish:", err)
    }
}

4. 微服务端:监听 NSQ Topic 接收配置并更新内存

type AppConfig struct {
    Key   string
    Value string
}

func HandleConfigPush(message *nsq.Message) error {
    var cfg AppConfig
    json.Unmarshal(message.Body, &cfg)
    log.Printf("Received config: %+v", cfg)

    // 更新本地配置缓存,比如 atomic.Value 替换
    LoadConfig(&cfg)
    return nil
}

func InitNSQConsumer() {
    config := nsq.NewConfig()
    consumer, _ := nsq.NewConsumer("config_push", "channel1", config)
    consumer.AddHandler(nsq.HandlerFunc(HandleConfigPush))
    err := consumer.ConnectToNSQD("127.0.0.1:4150")
    if err != nil {
        log.Fatal(err)
    }
}

Redis 延时队列 VS NSQ 自身延时投递

点位

Redis 延时队列

NSQ 延时投递(无原生)

消息调度准确性

精确可控(ZSET score)

需应用层模拟

投递可靠性

可持久存储,支持幂等处理

NSQ 偏向实时流处理

调度灵活性

多条件筛选(score、tag等)

不支持

架构复杂度

需轮询 Redis 扫描

稍简,但功能受限

总结整体实现流程

  1. 后台收到配置变更(含 effect_time);
  2. 把配置序列化成 JSON,写入 Redis ZSET,score 为生效时间戳;
  3. 后台服务定时轮询 Redis;
  4. 到期配置被发送到 NSQ;
  5. 各个微服务监听 NSQ topic,收到推送后热更新内存配置。
控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言