Spring WebFlux in Practice
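The previous sections used a promotion-campaign example to walk through microservice development with Spring Boot. This section reworks that promotion microservice with the Spring WebFlux framework.
The microservice-promotion project built earlier on Spring Boot is migrated to Spring WebFlux here, and the complete code is shown.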
(1) Add the required dependencies to pom.xml, including the spring-boot-starter-data-redis-reactive package. The complete file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example.microservice.promotion</groupId>
    <artifactId>microservice-promotion</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>microservice-promotion</name>
    <description>microservice-promotion project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <!-- A BOM with scope "import" only takes effect inside dependencyManagement -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2.2.5.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
            <version>2.2.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>2.2.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
            <version>2.2.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
(2) Edit the application.yml configuration file to set the server port and the application name:
server:
  port: 8081
spring:
  application:
    name: microservice-promotion
(3) Because the project integrates the Nacos and Sentinel middleware, the bootstrap.yml configuration file must also be edited:
spring:
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        ip: 127.0.0.1
        port: 80
        namespace: 40421527-56ff-410b-8ca8-e025aca9e946
        group: default
      config:
        server-addr: 127.0.0.1:8848
        file-extension: properties
        namespace: 40421527-56ff-410b-8ca8-e025aca9e946
        group: default
    sentinel:
      enabled: true
      transport:
        dashboard: 127.0.0.1:8888
        clientIp: 127.0.0.1
        port: 8719
      log:
        dir: /log/sentinel
      filter:
        enabled: false
management:
  endpoint:
    metrics:
      enabled: true
    prometheus:
      enabled: true
  endpoints:
    web:
      base-path: /
      exposure:
        include: health,info,status,prometheus
  metrics:
    export:
      prometheus:
        enabled: true
    tags:
      application: ${spring.application.name}
    web:
      server:
        request:
          autotime:
            enabled: true
            percentiles-histogram: on
            percentiles:
              - 0.9
              - 0.99
      client:
        request:
          autotime:
            enabled: true
            percentiles-histogram: on
            percentiles:
              - 0.9
              - 0.99
(4) This example uses the Log4j2 logging framework, configured as follows:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Properties>
        <Property name="LOG_HOME">/log</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%t] %c{1.} %msg%n"/>
        </Console>
        <RollingRandomAccessFile name="INFO_FILE" fileName="${LOG_HOME}/info.log"
                                 filePattern="${LOG_HOME}/info-%d{HH}-%i.log" immediateFlush="true">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%traceId] %-5p %c{1.} %msg%n"/>
            <Policies>
                <TimeBasedTriggeringPolicy/>
            </Policies>
            <DefaultRolloverStrategy max="1"/>
            <Filters>
                <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="NEUTRAL"/>
                <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="INFO_FILE"/>
        </Root>
    </Loggers>
</Configuration>
(5) Move the Redis configuration into Nacos. The Redis settings are as follows:
redis.promotion.host=127.0.0.1
redis.promotion.port=6379
redis.promotion.password=test
redis.promotion.maxTotal=2000
redis.promotion.maxIdle=100
redis.promotion.minIdle=40
redis.promotion.maxWaitMillis=3000
redis.promotion.timeBetweenEvictionRunsMillis=30000
redis.promotion.commandTimeout=3000
(6) Set up the Redis auto-configuration as follows.
Create the RedisProperties.java file with the following code:
package com.example.microservice.promotion.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "redis")
public class RedisProperties {

    private RedisInfo promotion;

    @Data
    public static class RedisInfo {
        protected int maxTotal = 2000;        // maximum number of connections
        protected int maxIdle = 100;          // maximum number of idle connections
        protected int minIdle = 40;           // minimum number of idle connections
        protected int maxWaitMillis = 3000;   // maximum wait time for a connection
        // interval between idle-connection eviction runs
        protected int timeBetweenEvictionRunsMillis = 30000;
        protected int commandTimeout = 3000;  // command execution timeout
        private String host;                  // Redis host
        private int port;                     // Redis port
        private String password;              // Redis password
    }
}
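To make the relationship between the keys in step (5) and the defaults in RedisInfo concrete, here is a minimal JDK-only sketch. It is an analogy rather than Spring's binder: the class RedisPropertiesSketch and its helper methods are hypothetical, and java.util.Properties stands in for @ConfigurationProperties. A key that is present overrides the default; a key that is absent leaves the default in place.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical illustration class; not part of the project.
final class RedisPropertiesSketch {

    // Parse "key=value" lines, as they would appear in the Nacos configuration.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    // Read redis.promotion.<name>, falling back to the default when the key is absent.
    static int intValue(Properties props, String name, int defaultValue) {
        String raw = props.getProperty("redis.promotion." + name);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = parse("redis.promotion.host=127.0.0.1\nredis.promotion.maxIdle=50\n");
        System.out.println(intValue(props, "maxIdle", 100));   // configured value wins
        System.out.println(intValue(props, "maxTotal", 2000)); // default is kept
    }
}
```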
Create the RedisAutoConfiguration.java file with the following code:
package com.example.microservice.promotion.config;

import java.time.Duration;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;

@ConditionalOnClass(LettuceConnectionFactory.class)
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
@ConditionalOnProperty("redis.promotion.host")
public class RedisAutoConfiguration {

    @Bean
    @RefreshScope
    public GenericObjectPoolConfig genericObjectPoolConfig(RedisProperties properties) {
        // Generic connection pool configuration
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        // Maximum number of connections
        genericObjectPoolConfig.setMaxTotal(properties.getPromotion().getMaxTotal());
        // Maximum number of idle connections
        genericObjectPoolConfig.setMaxIdle(properties.getPromotion().getMaxIdle());
        // Minimum number of idle connections
        genericObjectPoolConfig.setMinIdle(properties.getPromotion().getMinIdle());
        // Maximum wait time for a connection
        genericObjectPoolConfig.setMaxWaitMillis(properties.getPromotion().getMaxWaitMillis());
        // Validate a connection when it is borrowed from the pool
        genericObjectPoolConfig.setTestOnBorrow(true);
        // Validate a connection when it is returned to the pool
        genericObjectPoolConfig.setTestOnReturn(true);
        // Validate connections while they are idle
        genericObjectPoolConfig.setTestWhileIdle(true);
        // Interval between idle-connection eviction runs
        genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(
                properties.getPromotion().getTimeBetweenEvictionRunsMillis());
        return genericObjectPoolConfig;
    }

    @Bean
    @RefreshScope
    public LettuceClientConfiguration lettuceClientConfiguration(RedisProperties properties,
            GenericObjectPoolConfig genericObjectPoolConfig) {
        // Lettuce client configuration
        return LettucePoolingClientConfiguration.builder()
                .commandTimeout(Duration.ofMillis(properties.getPromotion().getCommandTimeout()))
                .shutdownTimeout(Duration.ZERO)
                .poolConfig(genericObjectPoolConfig)
                .build();
    }

    @Bean
    @RefreshScope
    public LettuceConnectionFactory lettuceConnectionFactory(RedisProperties properties,
            LettuceClientConfiguration lettuceClientConfiguration) {
        // Redis server configuration
        RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration(
                properties.getPromotion().getHost(), properties.getPromotion().getPort());
        redisConfiguration.setPassword(properties.getPromotion().getPassword());
        // Lettuce connection factory
        return new LettuceConnectionFactory(redisConfiguration, lettuceClientConfiguration);
    }

    @Bean(name = "redisTemplate")
    public ReactiveStringRedisTemplate reactiveStringRedisTemplate(
            LettuceConnectionFactory lettuceConnectionFactory) {
        // Reactive template with String keys and values
        return new ReactiveStringRedisTemplate(lettuceConnectionFactory, RedisSerializationContext.string());
    }
}
(7) Create the Sentinel aspect configuration:
package com.example.microservice.promotion.config;

import com.alibaba.csp.sentinel.annotation.aspectj.SentinelResourceAspect;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SentinelConfig {

    @Bean
    public SentinelResourceAspect sentinelResourceAspect() {
        // Register the aspect that processes @SentinelResource
        return new SentinelResourceAspect();
    }
}
(8) Create the model-layer object PromotionEntity:
package com.example.microservice.promotion.model;

import java.io.Serializable;

import lombok.Data;

@Data
public class PromotionEntity implements Serializable {

    private static final long serialVersionUID = 1L;

    // Promotion id
    private Integer id;
    // Promotion name
    private String name;
    // Promotion start time (epoch seconds)
    private Integer beginTime;
    // Promotion end time (epoch seconds)
    private Integer endTime;
    // Prize awarded by the promotion
    private String prize;
}
(9) Declare the common API status codes and the Redis key patterns. Add the Constant.java file with the following code:
package com.example.microservice.promotion.constants;

public class Constant {

    // Status code returned on success
    public static final String SUCCESS_CODE = "S00000";
    // Status code returned on failure
    public static final String ERROR_CODE = "F00001";
    // Message returned on success
    public static final String SUCCESS_MSG = "success";
    // Redis key pattern for a promotion: promotion:{id}
    public static final String REDIS_PROMOTION_KEY = "promotion:{0}";
    // Redis key pattern for a prize-claim record: promotion:{id}:{device}
    public static final String REDIS_PRIZE_KEY = "promotion:{0}:{1}";
}
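The two key patterns are filled in with java.text.MessageFormat. The sketch below shows the expansion in isolation; the class name RedisKeySketch and its helper methods are hypothetical. The id is converted with String.valueOf before formatting because MessageFormat applies locale-aware number formatting to numeric arguments, which can insert grouping separators into larger ids.

```java
import java.text.MessageFormat;

// Hypothetical illustration class; the patterns mirror those in Constant.
final class RedisKeySketch {

    static final String REDIS_PROMOTION_KEY = "promotion:{0}";
    static final String REDIS_PRIZE_KEY = "promotion:{0}:{1}";

    // Key of the hash that stores one promotion.
    static String promotionKey(Integer id) {
        return MessageFormat.format(REDIS_PROMOTION_KEY, String.valueOf(id));
    }

    // Key of the record that marks a prize as claimed by one device.
    static String prizeKey(Integer id, String device) {
        return MessageFormat.format(REDIS_PRIZE_KEY, String.valueOf(id), device);
    }

    public static void main(String[] args) {
        System.out.println(promotionKey(3));
        System.out.println(prizeKey(3, "device-a"));
    }
}
```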
(10) The PromotionPushController endpoint code is as follows:
package com.example.microservice.promotion.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.example.microservice.promotion.constants.Constant;
import com.example.microservice.promotion.service.BlockHandlerService;
import com.example.microservice.promotion.service.FallBackService;
import com.example.microservice.promotion.service.PromotionPushService;

import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Slf4j
@RestController
@RequestMapping("/api")
public class PromotionPushController {

    @Autowired
    private PromotionPushService promotionPushService;

    // Promotion delivery endpoint: /api/pushPromotion?id=xx
    @GetMapping("pushPromotion")
    @SentinelResource(value = "pushPromotion", entryType = EntryType.IN,
            blockHandler = "promotionPushBlockHandle", blockHandlerClass = {BlockHandlerService.class},
            defaultFallback = "defaultFallBack", fallbackClass = {FallBackService.class})
    public Mono<ResponseEntity<JSONObject>> pushPromotion(Integer id) {
        // Mono.defer converts an exception thrown by the service call into an error signal,
        // so onErrorResume covers both synchronous and asynchronous failures
        return Mono.defer(() -> promotionPushService.pushPromotion(id))
                .onErrorResume(e -> {
                    // Log the error together with its cause
                    log.error("push promotion error!", e);
                    return Mono.just(ResponseEntity.ok(errorBody("push promotion error!")));
                });
    }

    // Prize-claim endpoint: /api/getPrize?id=xx&device=xx
    @GetMapping("getPrize")
    @SentinelResource(value = "getPrize", entryType = EntryType.IN,
            blockHandler = "prizeBlockHandle", blockHandlerClass = {BlockHandlerService.class},
            defaultFallback = "defaultFallBack", fallbackClass = {FallBackService.class})
    public Mono<ResponseEntity<JSONObject>> getPrize(Integer id, String device) {
        return Mono.defer(() -> promotionPushService.getPrize(id, device))
                .onErrorResume(e -> {
                    // Log the error together with its cause
                    log.error("get prize error!", e);
                    return Mono.just(ResponseEntity.ok(errorBody("get prize error!")));
                });
    }

    // Build the common error response body
    private static JSONObject errorBody(String msg) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", Constant.ERROR_CODE);
        jsonObject.put("msg", msg);
        return jsonObject;
    }
}
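One point about error handling in endpoints that return a Mono: a try/catch around the call only sees exceptions thrown while the call itself runs, whereas a failure that happens later in the asynchronous pipeline arrives as an error signal and has to be handled by an operator on the pipeline (in Reactor, for example, onErrorResume). The sketch below illustrates the same distinction with the JDK's CompletableFuture as a stand-in for Mono; it is an analogy only, and the class AsyncErrorSketch and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class; CompletableFuture stands in for Mono.
final class AsyncErrorSketch {

    // Simulated asynchronous lookup that may fail after the method has already returned.
    static CompletableFuture<String> lookup(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("simulated Redis failure");
            }
            return "S00000";
        });
    }

    // The catch block never runs: lookup() returns normally and fails later.
    static CompletableFuture<String> guardedByTryCatch(boolean fail) {
        try {
            return lookup(fail);
        } catch (RuntimeException e) {
            return CompletableFuture.completedFuture("F00001");
        }
    }

    // Recovery is attached to the pipeline itself, so the late failure is handled.
    static CompletableFuture<String> recovered(boolean fail) {
        return lookup(fail).exceptionally(e -> "F00001");
    }

    public static void main(String[] args) {
        // true: the failure escaped the try/catch
        System.out.println(guardedByTryCatch(true).handle((v, e) -> e != null).join());
        // F00001: the failure was mapped to the error code
        System.out.println(recovered(true).join());
    }
}
```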
(11) The PromotionPushService code is as follows:
package com.example.microservice.promotion.service;

import java.text.MessageFormat;
import java.time.Duration;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ReactiveHashOperations;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;

import com.example.microservice.promotion.constants.Constant;
import com.example.microservice.promotion.model.PromotionEntity;

import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Service
@Slf4j
public class PromotionPushService {

    // Each Redis query may take at most 1s
    private static final Duration QUERY_TIMEOUT = Duration.ofMillis(1000);

    @Autowired
    private ReactiveStringRedisTemplate reactiveStringRedisTemplate;

    // Promotion delivery
    public Mono<ResponseEntity<JSONObject>> pushPromotion(Integer id) {
        // Build the promotion Redis key
        String key = MessageFormat.format(Constant.REDIS_PROMOTION_KEY, String.valueOf(id));
        // Query the promotion hash with ReactiveStringRedisTemplate; the pipeline is
        // subscribed once by the framework instead of being subscribed and blocked here
        ReactiveHashOperations<String, String, String> hashOperations =
                reactiveStringRedisTemplate.opsForHash();
        return hashOperations.entries(key)
                .collectMap(Entry::getKey, Entry::getValue)
                .timeout(QUERY_TIMEOUT)
                .map(map -> ResponseEntity.ok(toPromotionResult(id, map)));
    }

    // Convert the promotion hash into the response body
    private JSONObject toPromotionResult(Integer id, Map<String, String> map) {
        if (!map.isEmpty()) {
            Integer beginTime = Integer.valueOf(map.get("beginTime"));
            Integer endTime = Integer.valueOf(map.get("endTime"));
            Integer currentTime = (int) (System.currentTimeMillis() / 1000);
            // Deliver the promotion only while it is running
            if (currentTime >= beginTime && currentTime <= endTime) {
                // Assemble the PromotionEntity object
                PromotionEntity promotionEntity = new PromotionEntity();
                promotionEntity.setBeginTime(beginTime);
                promotionEntity.setEndTime(endTime);
                promotionEntity.setId(id);
                promotionEntity.setName(map.get("name"));
                promotionEntity.setPrize(map.get("prize"));
                log.info("push promotion success");
                return new JSONObject(promotionEntity);
            }
        }
        return errorResult("push promotion error!");
    }

    // Prize claim
    public Mono<ResponseEntity<JSONObject>> getPrize(Integer id, String device) {
        // Build the Redis key of the prize-claim record
        String key = MessageFormat.format(Constant.REDIS_PRIZE_KEY, String.valueOf(id), device);
        String promotionKey = MessageFormat.format(Constant.REDIS_PROMOTION_KEY, String.valueOf(id));
        ReactiveHashOperations<String, String, String> hashOperations =
                reactiveStringRedisTemplate.opsForHash();
        // Query the claim record; a prize that was already claimed is not issued again
        return reactiveStringRedisTemplate.opsForValue().get(key)
                .timeout(QUERY_TIMEOUT)
                .filter(StringUtils::isNotEmpty)
                .hasElement()
                .flatMap(claimed -> claimed
                        ? Mono.<String>empty()
                        : hashOperations.get(promotionKey, "prize").timeout(QUERY_TIMEOUT))
                .map(prize -> {
                    log.info("get prize success");
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("奖品", prize);
                    return ResponseEntity.ok(jsonObject);
                })
                .defaultIfEmpty(ResponseEntity.ok(errorResult("prize is exist!")));
    }

    // Build the common error response body
    private static JSONObject errorResult(String msg) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", Constant.ERROR_CODE);
        jsonObject.put("msg", msg);
        return jsonObject;
    }
}
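The delivery rule in pushPromotion reduces to a small pure check: the promotion hash must exist, and the current time must fall inside the closed interval from beginTime to endTime. The sketch below isolates that check so it can be exercised without Redis; the class PromotionWindowSketch and its methods are hypothetical. As a deliberate deviation, it uses long for epoch seconds, whereas the listing stores them in Integer, which cannot represent times after January 2038.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; field names follow the promotion hash.
final class PromotionWindowSketch {

    // True when the hash is present and nowSeconds lies within [beginTime, endTime].
    static boolean isActive(Map<String, String> fields, long nowSeconds) {
        if (fields == null || fields.isEmpty()) {
            return false;
        }
        long beginTime = Long.parseLong(fields.get("beginTime"));
        long endTime = Long.parseLong(fields.get("endTime"));
        return nowSeconds >= beginTime && nowSeconds <= endTime;
    }

    // Sample hash using the timestamps from the example response in this section.
    static Map<String, String> sample() {
        Map<String, String> fields = new HashMap<>();
        fields.put("beginTime", "1614822680");
        fields.put("endTime", "1617176808");
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(isActive(sample(), 1615000000L)); // inside the window
        System.out.println(isActive(sample(), 1617176809L)); // one second after the end
        System.out.println(isActive(Collections.emptyMap(), 1615000000L)); // no such promotion
    }
}
```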
(12) The rate-limiting (block handler) code is as follows:
package com.example.microservice.promotion.service;

import org.springframework.http.ResponseEntity;

import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.example.microservice.promotion.constants.Constant;

import cn.hutool.json.JSONObject;
import reactor.core.publisher.Mono;

// Common block handlers for rate limiting. Each handler is static, repeats the
// parameters of the protected method, and takes a trailing BlockException.
public final class BlockHandlerService {

    public static Mono<ResponseEntity<JSONObject>> promotionPushBlockHandle(Integer id,
            BlockException ex) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", Constant.ERROR_CODE);
        jsonObject.put("msg", "pushPromotion block!");
        return Mono.just(ResponseEntity.ok(jsonObject));
    }

    public static Mono<ResponseEntity<JSONObject>> prizeBlockHandle(Integer id, String device,
            BlockException ex) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", Constant.ERROR_CODE);
        jsonObject.put("msg", "get prize block!");
        return Mono.just(ResponseEntity.ok(jsonObject));
    }
}
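Sentinel locates a blockHandler by name and signature: when the handler lives in a separate blockHandlerClass it must be static, and its parameter list must be the protected method's parameters followed by a BlockException. The sketch below imitates that lookup with plain reflection so the rule can be tried without Sentinel on the classpath. It is a simplified illustration, not Sentinel's implementation: HandlerLookupSketch, its nested Handlers class, and the BlockedException stand-in are all hypothetical, and the return-type check is omitted.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;

// Hypothetical illustration class; BlockedException stands in for Sentinel's BlockException.
final class HandlerLookupSketch {

    static class BlockedException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    static class Handlers {
        // Matches the rule: original parameters plus the trailing exception.
        public static String withException(Integer id, BlockedException ex) {
            return "blocked";
        }

        // Does not match: the trailing exception parameter is missing.
        public static String withoutException(Integer id) {
            return "blocked";
        }
    }

    // True when holder declares a static method `name` taking originalParams + BlockedException.
    static boolean isValidHandler(Class<?> holder, String name, Class<?>[] originalParams) {
        Class<?>[] expected = Arrays.copyOf(originalParams, originalParams.length + 1);
        expected[originalParams.length] = BlockedException.class;
        try {
            Method method = holder.getDeclaredMethod(name, expected);
            return Modifier.isStatic(method.getModifiers());
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Class<?>[] original = {Integer.class};
        System.out.println(isValidHandler(Handlers.class, "withException", original));
        System.out.println(isValidHandler(Handlers.class, "withoutException", original));
    }
}
```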
(13) The degradation (fallback) code is as follows:
package com.example.microservice.promotion.service;

import org.springframework.http.ResponseEntity;

import com.example.microservice.promotion.constants.Constant;

import cn.hutool.json.JSONObject;
import reactor.core.publisher.Mono;

// Common fallback class for degradation
public final class FallBackService {

    public static Mono<ResponseEntity<JSONObject>> defaultFallBack(Throwable ex) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", Constant.ERROR_CODE);
        jsonObject.put("msg", "pushPromotion fallback!");
        return Mono.just(ResponseEntity.ok(jsonObject));
    }
}
(14) The MicroservicePromotionApplication code is as follows:
package com.example.microservice.promotion;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.EnableAspectJAutoProxy;

@SpringBootApplication(exclude = {RedisAutoConfiguration.class,
        RedisRepositoriesAutoConfiguration.class, RedisReactiveAutoConfiguration.class})
@EnableAspectJAutoProxy   // enable aspect support
@EnableDiscoveryClient    // enable service discovery
public class MicroservicePromotionApplication {

    public static void main(String[] args) {
        SpringApplication.run(MicroservicePromotionApplication.class, args);
    }
}
At this point, start the MicroservicePromotionApplication main class and access the promotion endpoint at http://localhost:8081/api/pushPromotion?id=3. The response is as follows:
{
id: 3,
name: "会员促销活动",
beginTime: 1614822680,
endTime: 1617176808,
prize: "3天免费会员"
}
Access the prize-claim endpoint at http://localhost/api/getPrize?id=3&device=3af57d0545766ec940d2c32a6567cc06aed. The response is as follows:
{
奖品: "3天免费会员"
}