卡飞资源网

专业编程技术资源共享平台

基于 MySQL + Redis + RabbitMQ + Canal 保证消息一致性

下面是一个基于 MySQL + Redis + RabbitMQ + Canal 保证消息一致性的简化代码实现流程,按典型的电商场景(如“下单成功后发消息异步处理发货”)来设计

架构概览简述

  1. 下单服务写入 MySQL,发送消息到 RabbitMQ(通过事务消息机制)
  2. Canal 监听 binlog,读取下单成功数据
  3. 消费者服务消费 MQ 消息,处理业务逻辑(如扣减库存)并更新 Redis

1 下单服务(写数据库 + 发MQ消息)

// 模拟下单接口(MySQL + RabbitMQ 半事务实现)
func CreateOrder(db *sql.DB, mqChannel *amqp.Channel, order Order) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	// 插入订单数据
	_, err = tx.Exec("INSERT INTO orders(id, user_id, status) VALUES (?, ?, ?)", order.ID, order.UserID, "pending")
	if err != nil {
		tx.Rollback()
		return err
	}

	// 业务提交前确保写成功
	err = tx.Commit()
	if err != nil {
		return err
	}

	// 发送 MQ 消息(异步通知下游服务)
	msgBody, _ := json.Marshal(order)
	err = mqChannel.Publish(
		"order_exchange", // exchange
		"order.created",  // routing key
		false, false,
		amqp.Publishing{
			ContentType: "application/json",
			Body:        msgBody,
		},
	)
	return err
}

2 Canal 配置监听 orders 表变更

# instance config example
"tableRegex": "test_db\\.orders",
"rowFilterMode": true,
"async": false

3 消费服务监听 RabbitMQ 消息(完成业务逻辑)

func ConsumeOrderCreated(mqChannel *amqp.Channel, redisClient *redis.Client) {
	msgs, _ := mqChannel.Consume(
		"order_queue", "", true, false, false, false, nil,
	)

	for msg := range msgs {
		var order Order
		_ = json.Unmarshal(msg.Body, &order)

		// 业务处理,例如:更新 Redis 缓存、异步扣库存
		redisClient.Set(context.Background(), fmt.Sprintf("order:%d", order.ID), "created", 10*time.Minute)

		fmt.Println("Processed order:", order.ID)
	}
}

总结:一致性保障方式

环节

机制

说明

DB+MQ

Canal + binlog

确保消息发送不会遗漏

MQ 消费

手动 ack 或幂等操作

避免重复消费、消息丢失

Redis 更新

来自 MQ 消息触发

与 DB 状态解耦,确保最终一致性

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言