下面是一个基于 MySQL + Redis + RabbitMQ + Canal 保证消息一致性的简化代码实现流程,按典型的电商场景(如“下单成功后发消息异步处理发货”)来设计
架构概览简述
- 下单服务写入 MySQL,发送消息到 RabbitMQ(通过事务消息机制)
- Canal 监听 binlog,读取下单成功数据
- 消费者服务消费 MQ 消息,处理业务逻辑(如扣减库存)并更新 Redis
1 下单服务(写数据库 + 发MQ消息)
// 模拟下单接口(MySQL + RabbitMQ 半事务实现)
func CreateOrder(db *sql.DB, mqChannel *amqp.Channel, order Order) error {
tx, err := db.Begin()
if err != nil {
return err
}
// 插入订单数据
_, err = tx.Exec("INSERT INTO orders(id, user_id, status) VALUES (?, ?, ?)", order.ID, order.UserID, "pending")
if err != nil {
tx.Rollback()
return err
}
// 业务提交前确保写成功
err = tx.Commit()
if err != nil {
return err
}
// 发送 MQ 消息(异步通知下游服务)
msgBody, _ := json.Marshal(order)
err = mqChannel.Publish(
"order_exchange", // exchange
"order.created", // routing key
false, false,
amqp.Publishing{
ContentType: "application/json",
Body: msgBody,
},
)
return err
}
2 Canal 配置监听 orders 表变更
# instance config example
"tableRegex": "test_db\\.orders",
"rowFilterMode": true,
"async": false
3 消费服务监听 RabbitMQ 消息(完成业务逻辑)
func ConsumeOrderCreated(mqChannel *amqp.Channel, redisClient *redis.Client) {
msgs, _ := mqChannel.Consume(
"order_queue", "", true, false, false, false, nil,
)
for msg := range msgs {
var order Order
_ = json.Unmarshal(msg.Body, &order)
// 业务处理,例如:更新 Redis 缓存、异步扣库存
redisClient.Set(context.Background(), fmt.Sprintf("order:%d", order.ID), "created", 10*time.Minute)
fmt.Println("Processed order:", order.ID)
}
}
总结:一致性保障方式
环节 | 机制 | 说明 |
DB+MQ | Canal + binlog | 确保消息发送不会遗漏 |
MQ 消费 | 手动 ack 或幂等操作 | 避免重复消费、消息丢失 |
Redis 更新 | 来自 MQ 消息触发 | 与 DB 状态解耦,确保最终一致性 |