Spring Boot Alibaba技术栈介绍与实践
SpringCloud Alibaba介绍
Spring Cloud Alibaba 致力于提供微服务开发的一站式解决方案。此项目包含开发分布式应用微服务的必需组件,方便开发者通过 Spring Cloud 编程模型轻松使用这些组件来开发分布式应用服务。
依托 Spring Cloud Alibaba,您只需要添加一些注解和少量配置,就可以将 Spring Cloud 应用接入阿里微服务解决方案,通过阿里中间件来迅速搭建分布式应用系统。
主要功能
服务限流降级
默认支持
WebServlet、WebFlux,OpenFeign、RestTemplate、Spring Cloud Gateway,Zuul,Dubbo和RocketMQ限流降级功能的接入,可以在运行时通过控制台实时修改限流降级规则,还支持查看限流降级Metrics监控。服务注册与发现
适配
Spring Cloud服务注册与发现标准,默认集成了Ribbon的支持。分布式配置管理
支持分布式系统中的外部化配置,配置更改时自动刷新。
消息驱动能力
基于
Spring Cloud Stream为微服务应用构建消息驱动能力。分布式事务
使用
@GlobalTransactional注解, 高效并且对业务零侵入地解决分布式事务问题。阿里云对象存储
阿里云提供的海量、安全、低成本、高可靠的云存储服务。支持在任何应用、任
何时间、任何地点存储和访问任意类型的数据。分布式任务调度
提供秒级、精准、高可靠、高可用的定时(基于
Cron表达式)任务调度服务。
同时提供分布式的任务执行模型,如网格任务。网格任务支持海量子任务均匀分配到所有Worker(schedulerx-client)上执行。阿里云短信服务
覆盖全球的短信服务,友好、高效、智能的互联化通讯能力,帮助企业迅速搭建客户触达通道。
组件
Sentinel
把流量作为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
Nacos
一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。
RocketMQ
一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。
Dubbo
Apache Dubbo是一款高性能Java RPC框架。Seata
阿里巴巴开源产品,一个易于使用的高性能微服务分布式事务解决方案。
Sentinel
阿里开源的一套用于服务容错的综合性解决方案。它以流量为切入点, 从流量控制、熔断降级、系统负载保护等多个维度来保护服务的稳定性。
Spring Cloud Gateway
Spring Cloud Gateway是Spring公司基于Spring 5.0,Spring Boot 2.0 和 Project Reactor 等技术开发的网关,它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式。
Spring Cloud Sleuth
主要功能就是在分布式系统中提供追踪解决方案。
RocketMQ
Rocketmq是一款分布式,队列模型的消息中间件,由阿里巴巴研发,借鉴参考了JMS规范的MQ实现,更参考了优秀的开源消息中间件KAFKA,并且结合阿里实际业务需求在天猫双十一的场景,实现业务削峰
Spring Cloud Alibaba全组件案例
Tips:下面的实践,是我在黑马官网找的视频中的实践,使用的是用户-商品-订单服务的案例。个人觉得这种结合案例讲解是很好的方法。
代码获取
Github地址:👉https://github.com/imxushuai/springcloud-alibaba👈
环境搭建
技术选型
JDK:1.8
Maven:3.3.9及以上
数据库:MySQL 5.7
Spring Cloud Alibaba:2.1.0.RELEASE(对标Spring Cloud的G版)
尽量保存版本一致,可以避免一些问题。
案例模块
| 模块名 | 描述 | 端口号 |
|---|---|---|
| springcloud-alibaba | 父工程 | |
| springcloud-alibaba-common | 公共模块,存放实体类和工具类等 | |
| springcloud-alibaba-user | 用户微服务 | 807x |
| springcloud-alibaba-product | 商品微服务 | 808x |
| springcloud-alibaba-order | 订单微服务 | 809x |
创建父工程
首先,创建一个maven项目
pom.xml
1 |
|
创建公共模块
创建一个maven项目
pom.xml
1 |
|
编写实体类
User(用户实体类)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24package com.springcloud.alibaba.common.entity;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
(name = "shop_user")
public class User {
(strategy = GenerationType.IDENTITY)
private Integer uid;
private String username;
private String password;
private String telephone;
}Product(商品实体类)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33package com.springcloud.alibaba.common.entity;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
(name = "shop_product")
public class Product {
(strategy = GenerationType.IDENTITY)
private Integer pid;
/**
* 商品名称
*/
private String pname;
/**
* 商品价格
*/
private Double pprice;
/**
* 商品库存
*/
private Integer stock;
}Order(订单实体类)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49package com.springcloud.alibaba.common.entity;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
(name = "shop_order")
public class Order {
(strategy = GenerationType.IDENTITY)
private Long oid;
/**
* 用户id
*/
private Integer uid;
/**
* 用户名
*/
private String username;
/**
* 商品id
*/
private Integer pid;
/**
* 商品名称
*/
private String pname;
/**
* 商品单价
*/
private Double pprice;
/**
* 购买数量
*/
private Integer number;
}
创建用户微服务
创建一个maven项目
pom.xml
1 |
|
编写Spring Boot启动类
1 | package com.springcloud.alibaba.user; |
编写application.yml配置文件
1 | server: |
创建商品微服务
创建一个maven项目
pom.xml
1 |
|
编写Spring Boot启动类
1 | package com.springcloud.alibaba.product; |
编写application.yml配置文件
1 | server: |
基础代码编写
dao
1
2
3
4
5
6
7package com.springcloud.alibaba.product.repository;
import com.springcloud.alibaba.common.entity.Product;
import org.springframework.data.jpa.repository.JpaRepository;
public interface ProductRepository extends JpaRepository<Product, Integer> {
}service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17package com.springcloud.alibaba.product.service;
import com.springcloud.alibaba.common.entity.Product;
import com.springcloud.alibaba.product.repository.ProductRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
public class ProductService {
private ProductRepository productRepository;
public Product findByPid(Integer pid) {
return productRepository.findById(pid).orElse(null);
}
}controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27package com.springcloud.alibaba.product.controller;
import com.alibaba.fastjson.JSON;
import com.springcloud.alibaba.common.entity.Product;
import com.springcloud.alibaba.product.service.ProductService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
4j
public class ProductController {
private ProductService productService;
("product/{pid}")
public Product product(@PathVariable Integer pid) {
Product product = productService.findByPid(pid);
log.info(">> 查询到商品: {}", JSON.toJSONString(product));
return product;
}
}
数据插入
数据库需要提前建好,启动项目后,
JPA会自动建表
运行下方SQL语句
1 | INSERT INTO shop_product VALUE(NULL,'小米','1000','5000'); |
测试API
使用浏览器或者PostMan调用:http://localhost:8081/product/1
创建订单微服务
创建一个maven项目
pom.xml
1 |
|
编写Spring Boot启动类
1 | package com.springcloud.alibaba.order; |
编写application.yml配置文件
1 | server: |
基础代码编写
dao
1
2
3
4
5
6
7package com.springcloud.alibaba.order.repository;
import com.springcloud.alibaba.common.entity.Order;
import org.springframework.data.jpa.repository.JpaRepository;
public interface OrderRepository extends JpaRepository<Order, Integer> {
}service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23package com.springcloud.alibaba.order.service;
import com.springcloud.alibaba.common.entity.Order;
import com.springcloud.alibaba.order.repository.OrderRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
public class OrderService {
private OrderRepository orderRepository;
public List<Order> findAll() {
return orderRepository.findAll();
}
public void save(Order order) {
orderRepository.save(order);
}
}controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55package com.springcloud.alibaba.order.controller;
import com.alibaba.fastjson.JSON;
import com.springcloud.alibaba.common.entity.Order;
import com.springcloud.alibaba.common.entity.Product;
import com.springcloud.alibaba.order.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import java.util.List;
4j
public class OrderController {
private OrderService orderService;
private RestTemplate restTemplate;
("order")
public List<Order> orderList() {
return orderService.findAll();
}
//准备买1件商品
("/order/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
log.info(">> 客户下单,这时候要调用商品微服务查询商品信息");
//通过restTemplate调用商品微服务
Product product = restTemplate.getForObject("http://localhost:8081/product/" + pid, Product.class);
if (product != null) {
log.info(">> 商品信息,查询结果: {}", JSON.toJSONString(product));
Order order = new Order();
order.setUid(1);
order.setUsername("测试用户");
order.setPid(product.getPid());
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderService.save(order);
return order;
}
throw new RuntimeException("购买失败!");
}
}
测试API
使用浏览器或者PostMan调用:http://localhost:8091/order/prod/1
Spring Cloud Alibaba实践
服务治理(Nacos Discovery)
什么是服务治理
服务治理是微服务架构中最核心最基本的模块。用于实现各个微服务的自动化注册与发现。
- 服务注册:在服务治理框架中,都会构建一个注册中心,每个服务单元向注册中心登记自己提供服
务的详细信息。并在注册中心形成一张服务的清单,服务注册中心需要以心跳的方式去监测清单中
的服务是否可用,如果不可用,需要在服务清单中剔除不可用的服务。 - 服务发现:服务调用方向服务注册中心咨询服务,并获取所有服务的实例清单,实现对具体服务实
例的访问。
通过上面的调用图会发现,除了微服务,还有一个组件是服务注册中心,它是微服务架构非常重要的一个组件,在微服务架构里主要起到了协调者的一个作用。注册中心一般包含如下几个功能:
服务发现:
服务注册:保存服务提供者和服务调用者的信息
服务订阅:服务调用者订阅服务提供者的信息,注册中心向订阅者推送提供者的信息服务配置:
配置订阅:服务提供者和服务调用者订阅微服务相关的配置
配置下发:主动将配置推送给服务提供者和服务调用者服务健康检测
检测服务提供者的健康情况,如果发现异常,执行服务剔除
常见的注册中心
Zookeeper
zookeeper是一个分布式服务框架,是Apache Hadoop的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。Eureka
Eureka是Springcloud Netflix中的重要组件,主要作用就是做服务注册和发现。但是现在已经闭
源Consul
Consul是基于GO语言开发的开源工具,主要面向分布式,服务化的系统提供服务注册、服务发现和配置管理的功能。Consul的功能都很实用,其中包括:服务注册/发现、健康检查、Key/Value存储、多数据中心和分布式一致性保证等特性。Consul本身只是一个二进制的可执行文件,所以安装和部署都非常简单,只需要从官网下载后,在执行对应的启动脚本即可。Nacos
Nacos是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。它是SpringCloud Alibaba组件之一,负责服务注册发现和服务配置,可以这样认为注册中心 + 配置中心。
Nacos
Nacos安装
预备环境准备
Nacos 依赖 Java 环境来运行。如果您是从代码开始构建并运行Nacos,还需要为此配置 Maven环境,请确保是在以下版本环境中安装使用:
下载最新的稳定版本
从该地址下载地址下载最新的稳定版本,我这里使用的是最新的稳定版:1.3.2
解压下载好的压缩包
启动Nacos
Linux
1
2cd nacos/bin # 进入nacos下的bin目录
./startup.sh -m standalone # 以单机模式启动Windows
1
2cd nacos/bin
./startup.cmd -m standalone
访问网页控制台
访问:http://your-host:8848/nacos
可以直接登录,账号密码:nacos/nacos
注册商品和订单微服务到Nacos
注册商品以及订单微服务到Nacos操作相同,如下:
在商品以及订单微服务中加入
Nacos-client依赖1
2
3
4<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>在商品以及订单微服务的启动类上添加
@EnableDiscoveryClient注解在商品以及订单微服务的
application.yml配置文件中配置Nacos的地址1
2
3
4
5spring:
cloud:
nacos:
discovery:
server-addr: 192.168.149.101:8848修改
OrderController代码逻辑,使用DiscoveryClient获取服务实例并从中获取服务IP地址和端口号进行远程调用1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30//准备买1件商品
("/order/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
log.info(">> 客户下单,这时候要调用商品微服务查询商品信息");
List<ServiceInstance> instances = discoveryClient.getInstances("service-product");
if (instances != null && instances.size() > 0) {
ServiceInstance serviceInstance = instances.get(0);
//通过restTemplate调用商品微服务
Product product = restTemplate.getForObject(
"http://" + serviceInstance.getHost() + ":" + serviceInstance.getPort() + "/product/" + pid, Product.class);
if (product != null) {
log.info(">> 商品信息,查询结果: {}", JSON.toJSONString(product));
Order order = new Order();
order.setUid(1);
order.setUsername("测试用户");
order.setPid(product.getPid());
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderService.save(order);
return order;
}
}
throw new RuntimeException("购买失败!");
}启动订单和商品微服务
查看
Nacos服务列表调用购买商品API
成功调用购买商品API
负载均衡
什么是负载均衡
负载均衡就是将负载(工作任务,访问请求)进行分摊到多个操作单元(服务器,组件)上进行执行。
根据负载均衡发生位置的不同,一般分为服务端负载均衡和客户端负载均衡。
服务端负载均衡指的是发生在服务提供者一方,比如常见的nginx负载均衡
而客户端负载均衡指的是发生在服务请求的一方,也就是在发送请求之前已经选好了由哪个实例处理请求。
Ribbon组件
使用Ribbon组件完成负载均衡调用
在
RestTemplate的JavaBean配置方法上加入@LoadBalanced注解1
2
3
4
5
public RestTemplate getRestTemplate() {
return new RestTemplate();
}修改代码,使用
RestTemplate以及服务名直接调用API1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25//准备买1件商品
("/order/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
log.info(">> 客户下单,这时候要调用商品微服务查询商品信息");
//通过restTemplate调用商品微服务
Product product = restTemplate.getForObject(
"http://product-service/product/" + pid, Product.class);
if (product != null) {
log.info(">> 商品信息,查询结果: {}", JSON.toJSONString(product));
Order order = new Order();
order.setUid(1);
order.setUsername("测试用户");
order.setPid(product.getPid());
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderService.save(order);
return order;
}
throw new RuntimeException("购买失败!");
}启动多个
Product服务调用购买商品API
Ribbon支持的负载均衡策略
Ribbon默认采用的是轮询
| 策略名 | 描述 | 实现说明 |
|---|---|---|
| BestAvailableRule | 选择一个最小的并发请求的server | 逐个考察Server,如果Server被tripped了,则忽略,在选择其中ActiveRequestsCount最小的server |
| AvailabilityFilteringRule | 过滤掉那些因为一直连接失败的被标记为circuit tripped的后端server,并过滤掉那些高并发的的后端server(activeconnections 超过配置的阈值) | 使用一个AvailabilityPredicate来包含过滤server的逻辑,其实就就是检查status里记录的各个server的运行状态 |
| WeightedResponseTimeRule | 根据相应时间分配一个weight,相应时间越长,weight越小,被选中的可能性越低 | 一个后台线程定期的从status里面读取评价响应时间,为每个server计算一个weight。Weight的计算也比较简单responsetime 减去每个server自己平均的responsetime是server的权重。当刚开始运行,没有形成statas时,使用roubine策略选择server |
| RetryRule | 对选定的负载均衡策略机上重试机制 | 在一个配置时间段内当选择server不成功,则一直尝试使用subRule的方式选择一个可用的server |
| RoundRobinRule | 轮询方式轮询选择server | 轮询index,选择index对应位置的server |
| RandomRule | 随机选择一个server | 在index上随机,选择index对应位置的server |
| ZoneAvoidanceRule | 复合判断server所在区域的性能和server的可用性选择server | 使用ZoneAvoidancePredicate和AvailabilityPredicate来判断是否选择某个server,前一个判断判定一个zone的运行性能是否可用,剔除不可用的zone(的所有server),AvailabilityPredicate用于过滤掉连接数过多的Server |
通过如下配置可以进行均衡负载策略的配置
1 | service-product: # 调用的提供者的名称 |
Feign服务调用
什么是Feign
Feign是Spring Cloud提供的一个声明式的伪Http客户端, 它使得调用远程服务就像调用本地服务一样简单, 只需要创建一个接口并添加一个注解即可。
Nacos很好的兼容了Feign, Feign默认集成了Ribbon, 所以在Nacos下使用Fegin默认就实现了负载均衡的效果。
基于Feign实现服务调用
加入
Feign依赖1
2
3
4
5<!--fegin组件-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>在订单微服务的启动类加上
@EnableFeignClients注解编写
FeignClient1
2
3
4
5
6
7
8
9
10
11
12
13
14package com.springcloud.alibaba.order.client;
import com.springcloud.alibaba.common.entity.Product;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
("service-product")
public interface ProductClient {
("product/{pid}")
Product findById(@PathVariable Integer pid);
}修改购买商品逻辑,调用编写的FeignClient中的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private ProductClient productClient;
//准备买1件商品
("/order/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
log.info(">> 客户下单,这时候要调用商品微服务查询商品信息");
//通过Feign客户端调用
Product product = productClient.findById(pid);
if (product != null) {
log.info(">> 商品信息,查询结果: {}", JSON.toJSONString(product));
Order order = new Order();
order.setUid(1);
order.setUsername("测试用户");
order.setPid(product.getPid());
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderService.save(order);
return order;
}
throw new RuntimeException("购买失败!");
}测试购买商品API
服务容错(Sentinel)
服务雪崩
在分布式系统中,由于网络原因或自身的原因,服务一般无法保证 100% 可用。如果一个服务出现了问题,调用这个服务就会出现线程阻塞的情况,此时若有大量的请求涌入,就会出现多条线程阻塞等待,进而导致服务瘫痪。
由于服务与服务之间的依赖性,故障会传播,会对整个微服务系统造成灾难性的严重后果,这就是服务故障的 “雪崩效应”。
雪崩发生的原因多种多样,有不合理的容量设计,或者是高并发下某一个方法响应变慢,亦或是某台机器的资源耗尽。我们无法完全杜绝雪崩源头的发生,只有做好足够的容错,保证在一个服务发生问题,不会影响到其它服务的正常运行。也就是"雪落而不雪崩"。
常见的容错方案
常见的容错思路有隔离、超时、限流、熔断、降级
隔离
它是指将系统按照一定的原则划分为若干个服务模块,各个模块之间相对独立,无强依赖。当有故障发生时,能将问题和影响隔离在某个模块内部,而不扩散风险,不波及其它模块,不影响整体的系统服务。常见的隔离方式有:线程池隔离和信号量隔离。
超时
在上游服务调用下游服务的时候,设置一个最大响应时间,如果超过这个时间,下游未作出反应,就断开请求,释放掉线程。
限流
限流就是限制系统的输入和输出流量已达到保护系统的目的。为了保证系统的稳固运行,一旦达到的需要限制的阈值,就需要限制流量并采取少量措施以完成限制流量的目的。
熔断
在互联网系统中,当下游服务因访问压力过大而响应变慢或失败,上游服务为了保护系统整体的可用性,可以暂时切断对下游服务的调用。这种牺牲局部,保全整体的措施就叫做熔断。
降级
降级其实就是为服务提供一个托底方案,一旦服务无法正常调用,就使用托底方案。
常用的容错组件
Hystrix
Hystrix是由Netflix开源的一个延迟和容错库,用于隔离访问远程系统、服务或者第三方库,防止级联失败,从而提升系统的可用性与容错性。Resilience4J
Resilicence4J一款非常轻量、简单,并且文档非常清晰、丰富的熔断工具,这也是Hystrix官方推荐的替代产品。不仅如此,Resilicence4j还原生支持Spring Boot 1.x/2.x,而且监控也支持和prometheus等多款主流产品进行整合。Sentinel
Sentinel是阿里巴巴开源的一款断路器实现,本身在阿里内部已经被大规模采用,非常稳定。
三个组件的对比表:
| Sentinel | Hystrix | Resilience4J | |
|---|---|---|---|
| 隔离策略 | 信号量隔离(并发线程数限流) | 线程池隔离/信号量隔离 | 信号量隔离 |
| 熔断降级策略 | 基于响应时间、异常比率、异常数 | 基于异常比率 | 基于异常比率、响应时间 |
| 实时统计实现 | 滑动窗口(LeapArray) | 滑动窗口(基于RxJava) | Ring Bit Buffer |
| 动态规则配置 | 支持多种数据源 | 支持多种数据源 | 有限支持 |
| 扩展性 | 多个扩展点 | 插件的形式 | 接口的形式 |
| 基于注解的支持 | 支持 | 支持 | 支持 |
| 限流 | 基于 QPS,支持基于调用关系的限流 | 有限的支持 | Rate Limiter |
| 流量整形 | 支持预热模式、匀速器模式、预热排队模式 | 不支持 | 简单的 Rate Limiter模式 |
| 系统自适应保护 | 支持 | 不支持 | 不支持 |
| 控制台 | 提供开箱即用的控制台,可配置规则、查看秒级监控、机器发现等 | 简单的监控查看 | 不提供控制台,可对接其它监控系统 |
什么是Sentinel
Sentinel(分布式系统的流量防卫兵) 是阿里开源的一套用于服务容错的综合性解决方案。它以流量为切入点, 从流量控制、熔断降级、系统负载保护等多个维度来保护服务的稳定性。
Sentinel 具有以下特征:
- 丰富的应用场景:
Sentinel承接了阿里巴巴近 10 年的双十一大促流量的核心场景, 例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。 - 完备的实时监控:
Sentinel提供了实时的监控功能。通过控制台可以看到接入应用的单台机器秒级数据, 甚至 500 台以下规模的集群的汇总运行情况。 - 广泛的开源生态:
Sentinel提供开箱即用的与其它开源框架/库的整合模块, 例如与SpringCloud、Dubbo、gRPC的整合。只需要引入相应的依赖并进行简单的配置即可快速地接入Sentinel。 - 完善的 SPI 扩展点:
Sentinel提供简单易用、完善的SPI扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。
Sentinel 分为两个部分:
- 核心库(Java 客户端)不依赖任何框架/库,能够运行于所有 Java 运行时环境,同时对
Dubbo或者Spring Cloud等框架也有较好的支持。 - 控制台(Dashboard)基于
Spring Boot开发,打包后可以直接运行,不需要额外的Tomcat等应用容器。
运行Sentinel
Sentinel提供一个轻量级的控制台, 它提供机器发现、单机资源实时监控以及规则管理等功能。
最新的正式版下载:https://github.com/alibaba/Sentinel/releases
直接下载jar包即可。
我这里由于想修改启动的配置,又不想使用参数,所以把jar包里面的配置文件扒出来了,修改了一些配置,然后把配置文件和jar包放在同级目录直接使用
java -jar启动了,Spring boot启动的时候,加载配置文件是有优先级的(想了解的哥们儿,可以自行百度哈),但实际生产环境需避免配置文件优先级,防止不必要的错误。
启动完毕后,访问: localhost:18080
我这里是修改过server.port的,默认是8080。默认的用户密码是
sentinel/sentinel
微服务集成Sentinel
直接引入sentinel的starter依赖即可。
然后通过配置sentinel的规则实现服务容错的各种策略。
1 | <dependency> |
修改application.yml,加入一下配置
1 | spring: |
测试Sentinel
新增一个限流的规则,测试Sentinel
登录
Sentinel控制台点击右侧的微服务名称,如果没有,需要任意调用一个接口,Sentinel就会加载到该微服务
选择 “流控规则” -> “新增流控规则”,弹出输入框,输入要限流的API以及相关参数,如图
这里我设置了
/sentinel/message1这个API,每秒只允许一个请求访问成功,若超过一个请求,则快速返回失败。测试效果
快速刷新,则会显示图上的效果。
Sentinel的概念
基本概念
资源
资源就是
Sentinel要保护的东西。它可以是Java应用程序中的任何内容,可以是一个服务,也可以是一个方法,甚至可以是一段代码。规则
规则就是用来定义如何保护资源。作用在资源之上,定义以什么样的方式来保护资源,主要包括流量控制规则、熔断降级规则以及系统保护规则
重要功能
Sentinel的主要功能就是荣作,主要体现:
流量控制
流量控制在网络传输中是一个常用的概念,它用于调整网络包的数据。任意时间到来的请求往往是随即不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。
Sentinel作为一个调配器,可以根据需要把随机的请求调整成可控的。熔断降级
当检测到调用链路中某个资源出现不稳定的表现,例如请求响应时间过长或异常请求比例升高的时候,则对这个资源的调用进行限制,让请求快速失败,避免影响到其它资源而导致雪崩。
Sentinel对这个问题采取了两种手段:通过并发线程数进行限制
Sentinel通过限制资源并发线程数量,来减少不稳定资源对其它资源的影响。当某个资源出现不稳定的情况下,例如响应时间过长,对资源的直接影响就是会造成线程数的逐步堆积。当线程数在特定资源上堆积到一定的数量之后,对该资源的新请求就会被拒绝。堆积的线程完成任务后才开始继续接受新的请求。通过响应时间对资源进行降级
除了对并发线程数进行控制以外。
Sentinel还可以通过响应时间来快速降级不稳定的资源。当依赖的资源出现响应时间过长后,所有对该资源的访问都会被拒绝,直到过了指定的时间窗口之后才重新恢复。
Sentinel和Hystrix的区别两者的原则是一致的, 都是当一个资源出现问题时, 让其快速失败, 不要波及到其它服务。但是在限制的手段上, 确采取了完全不一样的方法
Hystrix采用的是线程池隔离的方式, 优点是做到了资源之间的隔离, 缺点是增加了线程 切换的成本。Sentinel采用的是通过并发线程的数量和响应时间来对资源做限制。系统负载保护
Sentinel同时提供系统维度的自适应保护能力。当系统负载较高的时候,如果还持续让 请求进入可能会导致系统崩溃,无法响应。在集群环境下,会把本应这台机器承载的流量转发到其 它的机器上去。如果这个时候其它的机器也处在一个边缘状态的时候,Sentinel提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请 求。
Sentinel流控
流控规则
流量控制,其原理是监控应用流量的QPS(每秒查询率) 或并发线程数等指标,当达到指定的阈值时流量进行控制,以避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。
找到要设置流控规则的接口地址,点击流控按钮,打开如下图的设置界面:
- 资源名:唯一名称,默认是请求路径,可自定义
- 针对来源,默认default,不区分来源
- 阈值类型/单机阈值
- QPS:当前接口的每秒请求上限数量,达到设置的数量时则进行限流
- 线程数:当前接口的线程数达到阈值时则进行限流
- 是否集群:是否为集群环境
以上是基本的设置,还有流控模式和流控效果,下面详细讲解
流控模式
直接流控模式
直接流控模式是最简单的模式,当指定的接口达到限流条件时开启限流。上面案例使用的就是直接流控模式。
关联流控模式
关联流控模式指的是,当指定接口关联的接口达到限流条件时,开启对指定接口开启限流。选择关联流控模式时,会多一个输入框,让你输入关联资源名,当输入的关联资源名达到阈值时就会进行限流
链路流控模式
链路流控模式指的是,当从某个接口过来的资源达到限流条件时,开启限流。它的功能有点类似于针对来源配置项,区别在于:针对来源是针对上级微服务,而链路流控是针对上级接口,也就是说它的粒度更细。
流控效果
快速失败(默认)
直接失败,抛出异常。
Warm Up
它从开始阈值到最大QPS阈值会有一个缓冲阶段,一开始的阈值是最大QPS阈值的1/3,然后慢慢增长,直到最大阈值,适用于将突然增大的流量转换为缓步增长的场景。
排队等待
让请求以均匀的速度通过,单机阈值为每秒通过数量,其余的排队等待; 它还会让设置一个超时时间,当请求超过超时间时间还未处理,则会被丢弃。
Sentinel降级
降级规则就是设置当满足什么条件的时候,对服务进行降级。Sentinel提供了三个衡量条件:
降级规则
平均响应时间
当资源的平均响应时间超过阈值(以 ms 为单位)之后,资源进入准降级状态。如果接下来 1s 内持续进入 5 个请求,它们的 RT都持续超过这个阈值,那么在接下的时间窗口(以 s 为单位)之内,就会对这个方法进行服务降级。
异常比例
当资源的每秒异常总数占通过量的比值超过阈值之后,资源进入降级状态,即在接下的时间窗口(以 s 为单位)之内,对这个方法的调用都会自动地返回。异常比率的阈值范围是 [0.0,1.0]。
Sentinel热点
热点参数流控规则是一种更细粒度的流控规则, 它允许将规则具体到参数上。
热点规则
直接上示例
编写用于设置热点规则的接口
1
2
3
4
5("message3")
("message3")// 必须添加该注解, 否则热点规则不会生效
public String message3(String name, Integer age) {
return "Message 3. name = " + name + ",age = " + age;
}热点规则设置如下
预期达到的限流效果,当访问message3接口时,如果name参数不为空,则每秒访问数超过1,就会限流。而只传age参数的时候,API访问不会被限制
高级选项设置
可以在高级选项中设置具体的参数的值的阈值,比如:设置name=imxushuai时,阈值设置到1000。这样就可以达到对具体的参数值的限流
Sentinel授权
很多时候,我们需要根据调用来源来判断该次请求是否允许放行,这时候可以使用 Sentinel 的来源访问控制的功能。来源访问控制根据资源的请求来源(origin)限制资源是否通过。
设置指定的流控应用以及授权的类型,其中授权类型为白名单/黑名单,相信授权类型时很好理解的,但是什么是流控应用呢。
Sentinel提供了接口来让我们自定义授权的规则,非常的灵活,直接上代码示例
1 | package com.springcloud.alibaba.order.config; |
如代码中,我们可以从请求中获取任意的参数作为来进行自定义判断,达到我们想要的效果,上方给出的代码中,只是简单的从header中获取了名为serviceName的参数直接返回,这个返回值如果和我们设置的流控应用的值一致则会触发我们的授权规则,从而达到限流的效果。
Sentinel系统规则
系统保护规则是从应用级别的入口流量进行控制,从单台机器的总体 Load、RT、入口 QPS 、CPU使用率和线程数五个维度监控应用数据,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
- Load(仅对 Linux/Unix-like 机器生效):当系统 load1 超过阈值,且系统当前的并发线程数超过系统容量时才会触发系统保护。系统容量由系统的 maxQps minRt 计算得出。设定参考值一般是 CPU cores 2.5。
- RT:当单台机器上所有入口流量的平均 RT 达到阈值即触发系统保护,单位是毫秒。
- 线程数:当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。
- 入口 QPS:当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护。
- CPU使用率:当单台机器上所有入口流量的 CPU使用率达到阈值即触发系统保护。
自定义异常返回
在上面的测试中,我们会发现,当限流后的返回情况直接跳转到了一个错误页面,在真实的场景中,其实我们是需要返回特定的数据的,这就需要我们进行异常的自定义返回了。
Sentinel提供了各种异常的接口,我们可以选择需要自定义的异常类型进行自定义,异常类型如下:
异常接口:
BlockException包含了所有的Sentinel限流异常,即统一设置所有的限流异常返回
流控异常:
FlowException降级异常:
DegradeException参数限流异常:
ParamFlowException授权异常:
AuthorityException系统负载异常:
SystemBlockException
有了这些类型,我们既可以使用Spring boot的统一异常处理,去单独处理以上的异常,也可以使用Sentinel提供的接口类进行异常处理,下方我给出一个使用Sentinel提供的接口类的方式处理统一异常返回的代码示例
代码示例:
1 | package com.springcloud.alibaba.order.config; |
@SentinelResource注解
@SentinelResource的参数设置很多,我直接贴出源码类以及对应的含义和解释
我直接贴出的是
@SentinelResource的源码,如果看不懂注解类中的语法可以忽略代码,只看注释。
1 | (ElementType.METHOD) |
在@SentinelResource注解中我们可以定义当发现限流时进入的方法,我们可以在方法自定义我们自己的业务逻辑,从而达到我们想要的效果。
Sentinel规则持久化
这个就不做详细介绍了,主要就是指如何将存在Sentinel里的规则持久化到硬盘,避免每次重启Sentinel后之前的设置的限流规则就不在了,不需要特别记,到时候百度就行。(毕竟我不会的百度都会 ~)
Feign整合Sentinel
引入依赖
1
2
3
4<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>编写配置文件,打开
Feign对Sentinel的支持1
2
3
4feign:
sentinel:
# 开启sentinel支持
enabled: true编写容错类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17package com.springcloud.alibaba.order.client.fallback;
import com.springcloud.alibaba.common.entity.Product;
import com.springcloud.alibaba.order.client.ProductClient;
import org.springframework.stereotype.Component;
/**
* product service容错类
*/
public class ProductClientFallback implements ProductClient {
public Product findById(Integer pid) {
// 具体的容错业务逻辑, 我这里直接返回空的商品对象
return new Product();
}
}修改FiegnClient指定起容错类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15package com.springcloud.alibaba.order.client;
import com.springcloud.alibaba.common.entity.Product;
import com.springcloud.alibaba.order.client.fallback.ProductClientFallback;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
(value = "service-product", fallback = ProductClientFallback.class)
public interface ProductClient {
("product/{pid}")
Product findById(@PathVariable Integer pid);
}
以上就是Sentinel的基本用法了。
服务网关(Gateway)
在微服务架构中,一个系统会被拆分为很多个微服务。那么作为客户端要如何去调用这么多的微服务呢?如果没有网关的存在,我们只能在客户端记录每个微服务的地址,然后分别去调用。
这样的架构,会存在着诸多的问题:
- 客户端多次请求不同的微服务,增加客户端代码或配置编写的复杂性
- 认证复杂,每个服务都需要独立认证。
- 存在跨域请求,在一定场景下处理相对复杂。
这些问题可以借助API网关来解决
API网关,就是指系统的统一入口,它封装了应用程序的内部结构,为客户端提供统一服务,一些与业务本身功能无关的公共逻辑可以在这里实现,诸如认证、鉴权、监控、路由转发等等。
在业界比较流行的网关,有下面这些:
Ngnix+lua
使用nginx的反向代理和负载均衡可实现对api服务器的负载均衡及高可用。lua是一种脚本语言,可以来编写一些简单的逻辑, nginx支持lua脚本
Kong
基于Nginx+Lua开发,性能高,稳定,有多个可用的插件(限流、鉴权等等)可以开箱即用。 问题:只支持Http协议;二次开发,自由扩展困难;提供管理API,缺乏更易用的管控、配置方式。
Zuul
Netflix开源的网关,功能丰富,使用JAVA开发,易于二次开发 问题:缺乏管控,无法动态配置;依赖组件较多;处理Http请求依赖的是Web容器,性能不如Nginx
Spring Cloud Gateway
Spring公司为了替换Zuul而开发的网关服务,这是我们要使用的网关。
Gateway简介
Spring Cloud Gateway是Spring公司基于Spring 5.0,Spring Boot 2.0 和 Project Reactor 等技术开发的网关,它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式。它的目标是替代Netflix Zuul,其不仅提供统一的路由方式,并且基于 Filter 链的方式提供了网关基本的功能,例如:安全,监控和限流。
优点:
- 性能强劲:是第一代网关Zuul的1.6倍
- 功能强大:内置了很多实用的功能,例如转发、监控、限流等
- 设计优雅,容易扩展
缺点:
- 其实现依赖Netty与WebFlux,不是传统的Servlet编程模型,学习成本高
- 不能将其部署在Tomcat、Jetty等Servlet容器里,只能打成jar包执行
- 需要Spring Boot 2.0及以上的版本,才支持
基本使用
新建maven项目
引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud-alibaba</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>springcloud-alibaba-gateway</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--nacos客户端-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
</dependencies>
</project>编写启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15package com.springcloud.alibaba;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
}编写配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38server:
port: 7000
spring:
application:
name: api-gateway
cloud:
nacos:
discovery:
server-addr: 192.168.149.101:8848
gateway:
routes: # 路由数组[路由 就是指定当请求满足什么条件的时候转到哪个微服务]
# 当前路由的标识, 要求唯一
- id: product_route
# 请求要转发到的地址
uri: lb://service-product
# 路由的优先级,数字越小级别越高
order: 1
# 断言(就是路由转发要满足的条件)
predicates:
# 当请求路径满足Path指定的规则时,才进行路由转发
- Path=/product-serv/**
# 过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
filters:
# 转发之前去掉1层路径,如果为0则原样转发URL到微服务
- StripPrefix=1
- id: order_route
# 请求要转发到的地址
uri: lb://service-order
# 路由的优先级,数字越小级别越高
order: 1
# 断言(就是路由转发要满足的条件)
predicates:
# 当请求路径满足Path指定的规则时,才进行路由转发
- Path=/order-serv/**
# 过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
filters:
# 转发之前去掉1层路径
- StripPrefix=1测试访问效果
可以看到,成功从网关访问到了product微服务的API。
Gateway核心架构
路由(Route) 是 gateway 中最基本的组件之一,表示一个具体的路由信息载体。主要定义了下面的几个信息:
- id,路由标识符,区别于其他 Route。
- uri,路由指向的目的地 uri,即客户端请求最终被转发到的微服务。
- order,用于多个 Route 之间的排序,数值越小排序越靠前,匹配优先级越高。
- predicate,断言的作用是进行条件判断,只有断言都返回真,才会真正的执行路由。
- filter,过滤器用于修改请求和响应信息。
Gateway的运行流程:
- Gateway Client向Gateway Server发送请求
- 请求首先会被HttpWebHandlerAdapter进行提取组装成网关上下文
- 然后网关的上下文会传递到DispatcherHandler,它负责将请求分发给RoutePredicateHandlerMapping
- RoutePredicateHandlerMapping负责路由查找,并根据路由断言判断路由是否可用
- 如果过断言成功,由FilteringWebHandler创建过滤器链并调用
- 请求会一次经过PreFilter–微服务–PostFilter的方法,最终返回响应
断言
Predicate(断言, 谓词) 用于进行条件判断,只有断言都返回真,才会真正的执行路由。
断言就是说: 在什么条件下,才能继续执行
内置路由断言工厂
SpringCloud Gateway包括许多内置的断言工厂,所有这些断言都与HTTP请求的不同属性匹配。具体如下:
基于Datetime类型的断言工厂
此类型的断言根据时间做判断,主要有三个:
AfterRoutePredicateFactory: 接收一个日期参数,判断请求日期是否晚于指定日期
BeforeRoutePredicateFactory: 接收一个日期参数,判断请求日期是否早于指定日期
BetweenRoutePredicateFactory: 接收两个日期参数,判断请求日期是否在指定时间段内
示例:
After=2019-12-31T23:59:59.789+08:00[Asia/Shanghai]基于远程地址的断言工厂 RemoteAddrRoutePredicateFactory:接收一个IP地址段,判断请求主机地址是否在地址段中
示例:
RemoteAddr=192.168.1.1/24基于Cookie的断言工厂
CookieRoutePredicateFactory:接收两个参数,cookie 名字和一个正则表达式。 判断请求cookie是否具有给定名称且值与正则表达式匹配。
示例:
Cookie=chocolate, ch.基于Header的断言工厂
HeaderRoutePredicateFactory:接收两个参数,标题名称和正则表达式。 判断请求Header是否具有给定名称且值与正则表达式匹配。
示例:
Header=X-Request-Id, \d+基于Host的断言工厂
HostRoutePredicateFactory:接收一个参数,主机名模式。判断请求的Host是否满足匹配规则。
示例:
Host=**.testhost.org基于Method请求方法的断言工厂
MethodRoutePredicateFactory:接收一个参数,判断请求类型是否跟指定的类型匹配。
示例:
Method=GET基于Path请求路径的断言工厂
PathRoutePredicateFactory:接收一个参数,判断请求的URI部分是否满足路径规则。
示例:
Path=/foo/{segment}基于Query请求参数的断言工厂
QueryRoutePredicateFactory :接收两个参数,请求param和正则表达式, 判断请求参数是否具有给定名称且值与正则表达式匹配。
示例:
Query=baz, ba.基于路由权重的断言工厂
WeightRoutePredicateFactory:接收一个[组名,权重], 然后对于同一个组内的路由按照权重转发
示例:
1
2
3
4
5
6
7
8
9
10
11routes:
- id: weight_route1
uri: host1
predicates:
- Path=/product/**
- Weight=group3, 1
- id: weight_route2
uri: host2
predicates:
- Path=/product/**
- Weight= group3, 9
自定义路由断言工厂
新增自定义断言配置
1
2
3
4
5
6
7
8
9
10
11gateway:
routes:
- id: product_route
uri: lb://service-product
order: 1
predicates:
- Path=/product-serv/**
# 新增的自定义断言配置
- Age=20,80
filters:
- StripPrefix=1编写自定义断言类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52package com.springcloud.alibaba.predicates;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.gateway.handler.predicate.AbstractRoutePredicateFactory;
import org.springframework.cloud.gateway.handler.predicate.AfterRoutePredicateFactory;
import org.springframework.cloud.gateway.handler.predicate.BeforeRoutePredicateFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
/**
* Age自定义断言类
*
* 自定义断言类命名规范:配置项 + RoutePredicateFactory
*/
public class AgeRoutePredicateFactory extends AbstractRoutePredicateFactory<AgeRoutePredicateFactory.Config> {
public AgeRoutePredicateFactory() {
super(AgeRoutePredicateFactory.Config.class);
}
public List<String> shortcutFieldOrder() {
return Arrays.asList("minAge", "maxAge");
}
public Predicate<ServerWebExchange> apply(Config config) {
return (exchange) -> {
// 获取请求参数age
String ageString = exchange.getRequest().getQueryParams().getFirst("age");
if (StringUtils.isBlank(ageString)) {
return false;
}
// 判断age
int age = Integer.parseInt(ageString);
return age > config.minAge && age < config.maxAge;
};
}
static class Config {
private int minAge;
private int maxAge;
}
}测试会发现,当不传Age参数或者Age的参数值不在20-80之间时,就会直接跳转到错误页面。反之则正常返回数据
过滤器
在Gateway中, Filter的生命周期只有两个:Pre 和 Post
Pre
这种过滤器在请求被路由之前调用。我们可利用这种过滤器实现身份验证、在集群中选择请求的微服务、记录调试信息等。
Post这种过滤器在路由到微服务以后执行。这种过滤器可用来为响应添加标准的HTTP Header、收集统计信息和指标、将响应从微服务发送给客户端等。
Gateway 的Filter从作用范围可分为两种: GatewayFilter与GlobalFilter。
GatewayFilter
应用到单个路由或者一个分组的路由上。
GlobalFilter
应用到所有的路由上。
局部过滤器
Spring Gateway内置具备过滤器
在SpringCloud Gateway中内置了很多不同类型的网关路由过滤器。具体如下:
| 过滤器工厂类 | 作用 | 参数 |
| ————————— | ———————————————————— | ———————————————————— |
| AddRequestHeader | 为原始请求添加Header | Header的名称及值 |
| AddRequestParameter | 为原始请求添加请求参数 | 参数名称及值 |
| AddResponseHeader | 为原始响应添加Header | Header的名称及值 |
| DedupeResponseHeader | 剔除响应头中重复的值 | 需要去重的Header名称及去重策略 |
| Hystrix | 为路由引入Hystrix的断路器保护 | HystrixCommand的名称 |
| FallbackHeaders | 为fallbackUri的请求头中添加具体的异常信息 | Header的名称 |
| PrefixPath | 为原始请求路径添加前缀 | 前缀路径 |
| PreserveHostHeader | 为请求添加一个preserveHostHeader=true的属性,路由过滤器会检查该属性以决定是否要发送原始的Host | 无 |
| RequestRateLimiter | 用于对请求限流,限流算法为令牌桶 | keyResolver、rateLimiter、statusCode、denyEmptyKey、emptyKeyStatus |
| RedirectTo | 将原始请求重定向到指定的URL | http状态码及重定向的url |
| RemoveHopByHopHeadersFilter | 为原始请求删除IETF组织规定的一系列Header | 默认就会启用,可以通过配置指定仅删除哪些Header |
| RemoveRequestHeader | 为原始请求删除某个Header | Header名称 |
| RemoveResponseHeader | 为原始响应删除某个Header | Header名称 |
| RewritePath | 重写原始的请求路径 | 原始路径正则表达式以及重写后路径的正则表达式 |
| RewriteResponseHeader | 重写原始响应中的某个Header | Header名称,值的正则表达式,重写后的值 |
| SaveSession | 在转发请求之前,强制执行WebSession::save操作 | 无 |
| secureHeaders | 为原始响应添加一系列起安全作用的响应头 | 无,支持修改这些安全响应头的值 |
| SetPath | 修改原始的请求路径 | 修改后的路径 |
| SetResponseHeader | 修改原始响应中某个Header的值 | Header名称,修改后的值 |
| SetStatus | 修改原始响应的状态码 | HTTP 状态码,可以是数字,也可以是字符串 |
| StripPrefix | 用于截断原始请求的路径 | 使用数字表示要截断的路径的数量 |
| Retry | 针对不同的响应进行重试 | retries、statuses、methods、series |
| RequestSize | 设置允许接收最大请求包的大小。如果请求包大小超过设置的<值,则返回 413 Payload TooLarge | 请求包大小,单位为字节,默认值为5M |
| ModifyRequestBody | 在转发请求之前修改原始请求体内容 | 修改后的请求体内容 |
| ModifyResponseBody | 修改原始响应体的内容 | 修改后的响应体内容 |具体使用,建议百度。
自定义具备过滤器
自定义过滤器的方式和自定义断言工厂类方法类似
新增自定义过滤器配置
1
2
3
4
5
6
7
8
9
10
11gateway:
routes:
- id: product_route
uri: lb://service-product
order: 1
predicates:
- Path=/product-serv/**
filters:
- StripPrefix=1
# 新增自定义过滤器配置
- Log=true,false编写自定义过滤器类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46package com.springcloud.alibaba.filter;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* 自定义局部过滤器
*
* 过滤器命名规则:配置项 + GatewayFilterFactory
*/
4j
public class LogGatewayFilterFactory extends AbstractGatewayFilterFactory<LogGatewayFilterFactory.Config> {
public List<String> shortcutFieldOrder() {
return Arrays.asList("consoleLog", "cacheLog");
}
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
if (config.consoleLog) {
log.info("开启ConsoleLog....");
}
if (config.cacheLog) {
log.info("开启CacheLog....");
}
return chain.filter(exchange);
};
}
public static class Config {
private boolean consoleLog;
private boolean cacheLog;
}
}测试
注意观察控制台打印的日志。
全局过滤器
全局过滤器作用于所有路由, 无需配置。通过全局过滤器可以实现对权限的统一校验,安全性验证等功能。
Spring Gateway内置全局过滤器
自定义全局过滤器
内置的过滤器已经可以完成大部分的功能,但是对于企业开发的一些业务功能处理,还是需要我们自己编写过滤器来实现的,那么我们一起通过代码的形式自定义一个过滤器,去完成统一的权限校验。
开发中的鉴权逻辑:
- 当客户端第一次请求服务时,服务端对用户进行信息认证(登录)
- 认证通过,将用户信息进行加密形成token,返回给客户端,作为登录凭证
- 以后每次请求,客户端都携带认证的token
- 服务端对token进行解密,判断是否有效
我们自定义一个过滤器来完成类似操作
全局过滤器比局部过滤器药简单一些,直接定义过滤器类就OK
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41package com.springcloud.alibaba.filter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
4j
public class AuthGlobalFilterFactory implements GlobalFilter, Ordered {
/**
* 自定义的过滤器业务逻辑
*/
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String token = exchange.getRequest().getQueryParams().getFirst("token");
if (StringUtils.isBlank(token)) {// 鉴权失败
log.error("非法用户....");
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
// 鉴权成功, 执行一系列业务....
return chain.filter(exchange);
}
/**
* 过滤器的优先级, 值越小优先级越高
*
* @return orderValue
*/
public int getOrder() {
return 0;
}
}
网关限流
网关是所有请求的公共入口,所以可以在网关进行限流,而且限流的方式也很多,我们本次采用前面学过的Sentinel组件来实现网关的限流。Sentinel支持对SpringCloud Gateway、Zuul等主流网关进行限流。
从1.6.0版本开始,Sentinel提供了SpringCloud Gateway的适配模块,可以提供两种资源维度的限流:
- route维度:即在Spring配置文件中配置的路由条目,资源名为对应的routeId
- 自定义API维度:用户可以利用Sentinel提供的API来自定义一些API分组
Route维度限流
引入依赖
1
2
3
4<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-spring-cloud-gateway-adapter</artifactId>
</dependency>编写配置类
基于Sentinel 的Gateway限流是通过其提供的Filter来完成的,使用时只需注入对应的SentinelGatewayFilter实例以及 SentinelGatewayBlockExceptionHandler 实例即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88package com.springcloud.alibaba.config;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.SentinelGatewayFilter;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.BlockRequestHandler;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.exception.SentinelGatewayBlockExceptionHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.result.view.ViewResolver;
import javax.annotation.PostConstruct;
import java.util.*;
4j
public class GatewayRouteFilterConfiguration {
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer;
public GatewayRouteFilterConfiguration(ObjectProvider<List<ViewResolver>> viewResolversProvider, ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
}
/**
* 初始化限流过滤器
*/
(Ordered.HIGHEST_PRECEDENCE)// 最高优先级
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
}
/**
* 配置限流的异常处理器
*/
(Ordered.HIGHEST_PRECEDENCE)// 最高优先级
public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() {
return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
}
/**
* 初始化限流规则
*/
public void initGatewayRules() {
Set<GatewayFlowRule> rules = new HashSet<>();
rules.add(
new GatewayFlowRule("product_route") //资源名称,对应路由id
.setCount(1) // 限流阈值
.setIntervalSec(1) // 统计时间窗口,单位是秒,默认是 1 秒
);
GatewayRuleManager.loadRules(rules);
}
/**
* 自定义限流异常页面
*/
public void initBlockHandlers() {
BlockRequestHandler blockRequestHandler = (serverWebExchange, throwable) -> {
Map<String, Object> map = new HashMap<>();
map.put("code", 0);
map.put("message", "接口被限流了");
return ServerResponse.status(HttpStatus.OK).
contentType(MediaType.APPLICATION_JSON_UTF8).
body(BodyInserters.fromObject(map));
};
GatewayCallbackManager.setBlockHandler(blockRequestHandler);
}
}测试
多次刷新界面,可以看到被限流的返回值
API维度限流
自定义API分组是一种更细粒度的限流规则定义
1 | package com.springcloud.alibaba.config; |
在ProductController中新增接口
1 | ("product/api1/{string}") |
测试限流效果
- 正常的返回
链路追踪(Sleuth + Zipkin)
链路追踪简介
在大型系统的微服务化构建中,一个系统被拆分成了许多模块。这些模块负责不同的功能,组合成系统,最终可以提供丰富的功能。在这种架构中,一次请求往往需要涉及到多个服务。互联网应用构建在不同的软件模块集上,这些软件模块,有可能是由不同的团队开发、可能使用不同的编程语言来实现、有可能布在了几千台服务器,横跨多个不同的数据中心,也就意味着这种架构形式也会存在一些问题:
- 如何快速发现问题?
- 如何判断故障影响范围?
- 如何梳理服务依赖以及依赖的合理性?
- 如何分析链路性能问题以及实时容量规划?
分布式链路追踪(Distributed Tracing),就是将一次分布式请求还原成调用链路,进行日志记录,性能监控并将一次分布式请求的调用情况集中展示。比如各个服务节点上的耗时、请求具体到达哪台机器上、每个服务节点的请求状态等等。
常见的链路追踪技术有下面这些:
cat
由大众点评开源,基于Java开发的实时应用监控平台,包括实时应用监控,业务监控 。 集成方案是通过代码埋点的方式来实现监控,比如: 拦截器,过滤器等。 对代码的侵入性很大,集成成本较高。风险较大。
zipkin
由
Twitter公司开源,开放源代码分布式的跟踪系统,用于收集服务的定时数据,以解决微服务架构中的延迟问题,包括:数据的收集、存储、查找和展现。该产品结合spring-cloud-sleuth使用较为简单, 集成很方便, 但是功能较简单。pinpoint
Pinpoint是基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI功能强大,接入端无代码侵入。skywalking
SkyWalking是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI功能较强,接入端无代码侵入。目前已加入Apache孵化器。Sleuth
SpringCloud提供的分布式系统中链路追踪解决方案,一般结合zipkin一起使用,Seluth做链路追踪,zipkin做链路情况的可视化展示。
Sleuth简介
Spring Cloud Sleuth主要功能就是在分布式系统中提供追踪解决方案。它大量借用了Google Dapper的设计, 先来了解一下Sleuth中的术语和相关概念。
Trace
由一组
Trace Id相同的Span串联形成一个树状结构。为了实现请求跟踪,当请求到达分布式系统的入口端点时,只需要服务跟踪框架为该请求创建一个唯一的标识(即Trace Id),同时在分布式系统内部流转的时候,框架始终保持传递该唯一值,直到整个请求的返回。那么我们就可以使用该唯一标识将所有的请求串联起来,形成一条完整的请求链路。Span
代表了一组基本的工作单元。为了统计各处理单元的延迟,当请求到达各个服务组件的时候,也通过一个唯一标识(
Span Id)来标记它的开始、具体过程和结束。通过Span Id的开始和结束时间戳,就能统计该span的调用时间,除此之外,我们还可以获取如事件的名称。请求信息等元数据。Annotation
用它记录一段时间内的事件,内部使用的重要注释:
cs(Client Send)客户端发出请求,开始一个请求的生命
sr(Server Received)服务端接受到请求开始进行处理, sr-cs = 网络延迟(服务调用的时间)
ss(Server Send)服务端处理完毕准备发送到客户端,ss - sr = 服务器上的请求处理时间
cr(Client Reveived)客户端接受到服务端的响应,请求结束。 cr - sr = 请求的总时间
Sleuth基本使用
引入依赖
在父工程的pom文件中引入依赖
1
2
3
4
5<!--链路追踪 Sleuth-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>重启所有微服务,调用API后,观察控制台,就可以看到
Sleuth的日志打印
查看日志文件并不是一个很好的方法,当微服务越来越多日志文件也会越来越多,通过Zipkin可以将日志聚合,并进行可视化展示和全文检索。
Zipkin介绍
Zipkin 是 Twitter 的一个开源项目,它基于Google Dapper实现,它致力于收集服务的定时数据,以解决微服务架构中的延迟问题,包括数据的收集、存储、查找和展现。
我们可以使用它来收集各个服务器上请求链路的跟踪数据,并通过它提供的REST API接口来辅助我们查询跟踪数据以实现对分布式系统的监控程序,从而及时地发现系统中出现的延迟升高问题并找出系统性能瓶颈的根源。
除了面向开发的 API 接口之外,它也提供了方便的UI组件来帮助我们直观的搜索跟踪信息和分析请求链路明细,比如:可以查询某段时间内各用户请求的处理时间等。
Zipkin 提供了可插拔数据存储方式:In-Memory、MySql、Cassandra 以及 Elasticsearch。
上图是 Zipkin 的基础架构,它主要由 4 个核心组件构成:
- Collector:收集器组件,它主要用于处理从外部系统发送过来的跟踪信息,将这些信息转换为
Zipkin内部处理的Span格式,以支持后续的存储、分析、展示等功能。 - Storage:存储组件,它主要对处理收集器接收到的跟踪信息,默认会将这些信息存储在内存中,我们也可以修改此存储策略,通过使用其他存储组件将跟踪信息存储到数据库中。
- RESTful API:
API组件,它主要用来提供外部访问接口。比如给客户端展示跟踪信息,或是外接系统访问以实现监控等。 - Web UI:UI组件, 基于
API组件实现的上层应用。通过UI组件用户可以方便而有直观地查询和分析跟踪信息。
Zipkin分为两端,一个是 Zipkin服务端,一个是 Zipkin客户端,客户端也就是微服务的应用。 客户端会配置服务端的 URL 地址,一旦发生服务间的调用的时候,会被配置在微服务里面的 Sleuth 的监听器监听,并生成相应的 Trace 和 Span 信息发送给服务端。
Zipkin安装
- 下载ZipKin的jar包,访问:https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec,会自动开始下载。
- 运行jar包,
java -jar zipkin-server-2.12.9-exec.jar - 访问UI界面:
http://host:9411,默认端口为9411
Zipkin集成
引入依赖,,在父工程加入下列依赖
1
2
3
4
5<!--zipkin集成-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>在每个微服务加入下列配置
1
2
3
4
5
6
7spring:
zipkin:
base-url: http://192.168.149.101:9411/
discoveryClientEnabled: false #让nacos把它当成一个URL,而不要当做服务名
sleuth:
sampler:
probability: 1.0 #采样的百分比测试,访问URL:
http://localhost:7000/order-serv/order/prod/1,然后观察zipkin的UI界面注意:关闭之前gateway测试时加入的token过滤器
可以点击下方的请求,查看更详细的调用情况,里面会给出API途经的微服务的耗时情况,便于观察问题所在微服务。
ZipKin数据持久化
默认情况下,ZipKin将链路数据保存在内存中的,一旦ZipKin重启后,之前收集的链路数据将全部失效,这时我们就需要用到ZipKin的链路数据持久化了。
这里仅介绍将数据持久化到mysql数据库中
执行zipkin官方提供的见表语句,创建数据库表
注意:需要提前创建数据库,我这里创建的数据库名称为:zipkin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48CREATE TABLE IF NOT EXISTS zipkin_spans (
`trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
`trace_id` BIGINT NOT NULL,
`id` BIGINT NOT NULL,
`name` VARCHAR(255) NOT NULL,
`parent_id` BIGINT,
`debug` BIT(1),
`start_ts` BIGINT COMMENT 'Span.timestamp(): epoch micros used for endTs query and to implement TTL',
`duration` BIGINT COMMENT 'Span.duration(): micros used for minDuration and maxDuration query'
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
ALTER TABLE zipkin_spans ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `id`) COMMENT 'ignore insert on duplicate';
ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`, `id`) COMMENT 'for joining with zipkin_annotations';
ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTracesByIds';
ALTER TABLE zipkin_spans ADD INDEX(`name`) COMMENT 'for getTraces and getSpanNames';
ALTER TABLE zipkin_spans ADD INDEX(`start_ts`) COMMENT 'for getTraces ordering and range';
CREATE TABLE IF NOT EXISTS zipkin_annotations (
`trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
`trace_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.trace_id',
`span_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.id',
`a_key` VARCHAR(255) NOT NULL COMMENT 'BinaryAnnotation.key or Annotation.value if type == -1',
`a_value` BLOB COMMENT 'BinaryAnnotation.value(), which must be smaller than 64KB',
`a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if Annotation',
`a_timestamp` BIGINT COMMENT 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp',
`endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is null',
`endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint is null, or no IPv6 address',
`endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint is null',
`endpoint_service_name` VARCHAR(255) COMMENT 'Null when Binary/Annotation.endpoint is null'
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
ALTER TABLE zipkin_annotations ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `span_id`, `a_key`, `a_timestamp`) COMMENT 'Ignore insert on duplicate';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`, `span_id`) COMMENT 'for joining with zipkin_spans';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTraces/ByIds';
ALTER TABLE zipkin_annotations ADD INDEX(`endpoint_service_name`) COMMENT 'for getTraces and getServiceNames';
ALTER TABLE zipkin_annotations ADD INDEX(`a_type`) COMMENT 'for getTraces and autocomplete values';
ALTER TABLE zipkin_annotations ADD INDEX(`a_key`) COMMENT 'for getTraces and autocomplete values';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id`, `span_id`, `a_key`) COMMENT 'for dependencies job';
CREATE TABLE IF NOT EXISTS zipkin_dependencies (
`day` DATE NOT NULL,
`parent` VARCHAR(255) NOT NULL,
`child` VARCHAR(255) NOT NULL,
`call_count` BIGINT,
`error_count` BIGINT
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
ALTER TABLE zipkin_dependencies ADD UNIQUE KEY(`day`, `parent`, `child`);修改zipkin jar包的启动参数
1
2启动zipkin
java -jar zipkin-server-2.12.9-exec.jar --STORAGE_TYPE=mysql --MYSQL_HOST=192.168.149.1 --MYSQL_TCP_PORT=3306 --MYSQL_DB=zipkin --MYSQL_USER=root --MYSQL_PASS=123456再次访问API:
http://localhost:7000/order-serv/order/prod/1并观察数据库的变化,会发现链路数据已经存入了mysql中
消息队列(RocketMQ)
老实说,消息队列已经是个老生常谈的分布式组件了,我之前也有写过很详细的消息队列文章,只是用的消息队列产品不同而已。
所以消息队列的基础概念就不在这里做介绍了。
详情可见:
RocketMQ安装
下载RocketMq,直接访问官方网站:https://rocketmq.apache.org,首页上就可以下载到最新的正式版本,直接下载即可
注意:安装的时候下载
Binary包就可以了。上传下载的安装包到服务器并准备安装环境,由于rocketMQ是基于java语言开发的,所以需要提前在服务器安装好JDK
我这用的4.9.2,安装的JDK 1.8
解压上传到服务器的安装包
1
2
3
4
5使用unzip解压,若提示 command not found,则需要安装unzip
unzip
安装unzip的命令
yum install -y unzip启动NameServer
1
nohup ./bin/mqnamesrv &
(可选操作)修改broker和server服务启动参数
由于rocketMQ默认的JVM参数设置的内存占用比较高,所以可以视情况修改JVM参数调整内存占用
1
2
3
4
5
6
7
8
9修改runserver.sh,修改JVM参数
vim runserver.sh
修改runbroker.sh,修改JVM参数
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
修改为JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
若vim提示 command not fount,则需要安装vim或者使用vi
yum install -y vim启动broker
1
nohup ./bin/mqbroker -n localhost:9876 &
关闭命令
1
2
3
4关闭broker
./bin/mqshutdown broker
关闭namesrv
./bin/mqshutdown namesrv
RocketMQ控制台安装
下载安装包,访问:https://github.com/apache/rocketmq-externals/tags,下载对应系统的安装包
将下载的源码包打包成可执行jar包
1
2
3
4
5若没有MVN环境还需要安装maven环境
打包前,需要修改配置文件,设置其rocketMQ的nameServer地址
打包
mvn clean package -Dmaven.test.skip=true打包完成后,将target中的可执行jar包运行
1
java -jar rocketmq-console-ng-1.0.0.jar
案例
消息生产者: 订单微服务
在订单微服务中添加
RocketMQ依赖以及spring boot rocket starter1
2
3
4
5
6
7
8
9
10
11<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>修改配置文件
1
2
3
4rocketmq:
name-server: 192.168.149.101:9876 #rocketMQ服务的地址
producer:
group: shop-order # 生产者组修改下单的逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private RocketMQTemplate rocketMQTemplate;
//准备买1件商品
("/order/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
log.info(">> 客户下单,这时候要调用商品微服务查询商品信息");
// 通过Feign客户端调用
Product product = productClient.findById(pid);
if (product != null) {
log.info(">> 商品信息,查询结果: {}", JSON.toJSONString(product));
Order order = new Order();
order.setUid(1);
order.setUsername("测试用户");
order.setPid(product.getPid());
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderService.save(order);
// 下单完成, 发送消息到用户微服务
rocketMQTemplate.convertAndSend("order-topic", order);
return order;
}
throw new RuntimeException("购买失败!");
}下单完成后,发送订单消息到
RocketMQ
消息消费者: 用户微服务
在用户微服务中添加
RocketMQ依赖以及spring boot rocket starter1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16<!--nacos客户端-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>修改主类,加入
@@EnableDiscoveryClient注解修改配置文件,加入
nacos与RocketMQ相关配置1
2
3
4
5
6
7
8
9spring:
cloud:
nacos:
discovery:
server-addr: 192.168.149.101:8848
rocketmq:
name-server: 192.168.149.101:9876 #rocketMQ服务的地址
producer:
group: shop-order # 生产者组编写消息消费者类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18package com.springcloud.alibaba.user.service;
import com.alibaba.fastjson.JSON;
import com.springcloud.alibaba.common.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
4j
(consumerGroup = "shop-user", topic = "order-topic")
public class SmsService implements RocketMQListener<Order> {
public void onMessage(Order order) {
log.info("收到订单消息: {}", JSON.toJSONString(order));
}
}启动订单,产品,用户微服务,调用下单API并观察消息消费过程
配置中心(Nacos Config)
微服务架构下关于配置文件的一些问题:
- 配置文件相对分散。在一个微服务架构下,配置文件会随着微服务的增多变的越来越多,而且分散在各个微服务中,不好统一配置和管理。
- 配置文件无法区分环境。微服务项目可能会有多个环境,例如:测试环境、预发布环境、生产环境。每一个环境所使用的配置理论上都是不同的,一旦需要修改,就需要我们去各个微服务下手动维护,这比较困难。
- 配置文件无法实时更新。我们修改了配置文件之后,必须重新启动微服务才能使配置生效,这对一个正在运行的项目来说是非常不友好的。基于上面这些问题,我们就需要配置中心的加入来解决这些问题。
配置中心的思路是:
- 首先把项目中各种配置全部都放到一个集中的地方进行统一管理,并提供一套标准的接口。
- 当各个服务需要获取配置的时候,就来配置中心的接口拉取自己的配置。
- 当配置中心中的各种参数有更新的时候,也能通知到各个服务实时的过来同步最新的信息,使之动态更新。
基本使用
安装Nacos
引入Nacos Config依赖
1
2
3
4<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>添加
bootstrap.conf的配置文件1
2
3
4
5
6
7
8
9
10spring:
application:
name: service-order
cloud:
nacos:
config:
server-addr: 192.168.149.101:8848 #nacos中心地址
file-extension: yaml # 配置文件格式
profiles:
active: dev # 环境标识复制原有的配置文件在nacos中创建对应的配置文件,data id为:
service-order-dev.yaml修改原有的配置文件的名字为:
application.yml.bak,然后启动应用并观察能否成功运行
动态更新配置
实现了配置的远程存放,但是此时如果修改了配置,我们的程序是无法读取到的,因此,我们需要开启配置的动态刷新功能。
在nacos的配置文件中添加自定义的配置项,用于测试配置动态刷新
1
2app:
customer: test编写controller类,方便测试更新配置后查看是否动态刷新了本地配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18package com.springcloud.alibaba.order.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
public class TestConfigRefreshController {
("${app.customer}")
private String CUSTOMER;
("getCustomerConfig")
public String getCustomerConfig() {
return CUSTOMER;
}
}注意:
- 刷新配置的关键在于:
@RefreshScope注解。如果没有此注解,将不会自动刷新配置。 - 只有自定义的配置项需要手动加入
@RefreshScope才能应用自动刷新配置,类似数据库配置等,无需配置注解会自动刷新配置。
- 刷新配置的关键在于:
启动微服务,先调用查看CUSTOMER的值,然后修改nacos中的值再次查看。观察是否动态刷新。
配置共享
在日常的开发中基本上同样的一段配置,可能在很多微服务中或者同个微服务中的不同环境中用到,这个时候如果这一段配置在每个配置文件中都写一遍,当配置需要变更时,就必须每个配置文件都改一遍,这是非常的麻烦的。所以是否有办法将同一段配置在一个地方写好,其他配置文件都去引用这个配置文件呢?
在Nacos的配置中心中新建一个配置文件,命名任意并将公共的配置信息放入该配置文件
修改要引入该公共配置文件的
bootstrap.yml1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18spring:
application:
name: service-order
cloud:
nacos:
config:
server-addr: 192.168.149.101:8848 #nacos中心地址
file-extension: yaml # 配置文件格式
#指定共享配置,且支持动态刷新
ext-config:
- data-id: datasource.yaml
group: DEFAULT_GROUP
refresh: true
- data-id: common.yaml
group: DEFAULT_GROUP
refresh: true
profiles:
active: dev # 环境标识启动并观察是否按照设置的配置启动了微服务
分布式事务(Seata)
分布式的基础理论就不在这里做介绍了。
想了解的可以看下我的这篇文章:分布式事务
安装Seata
下载seata,访问:https://github.com/seata/seata/releases,下载对应系统的安装包
我这里用的版本为:seata 0.9.0
解压缩安装包
配置seata
修改registry.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
serverAddr = "192.168.149.101"
namespace = ""
cluster = "default"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "192.168.149.101"
namespace = ""
}
}在安装目录创建nacos-config.txt并粘贴下方内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.thread-factory.boss-thread-prefix=NettyBoss
transport.thread-factory.worker-thread-prefix=NettyServerNIOWorker
transport.thread-factory.server-executor-thread-prefix=NettyServerBizHandler
transport.thread-factory.share-boss-worker=false
transport.thread-factory.client-selector-thread-prefix=NettyClientSelector
transport.thread-factory.client-selector-thread-size=1
transport.thread-factory.client-worker-thread-prefix=NettyClientWorkerThread
transport.thread-factory.boss-thread-size=1
transport.thread-factory.worker-thread-size=8
transport.shutdown.wait=3
service.vgroup_mapping.service-order=default
service.vgroup_mapping.service-product=default
service.enableDegrade=false
service.disable=false
service.max.commit.retry.timeout=-1
service.max.rollback.retry.timeout=-1
client.async.commit.buffer.limit=10000
client.lock.retry.internal=10
client.lock.retry.times=30
client.lock.retry.policy.branch-rollback-on-conflict=true
client.table.meta.check.enable=true
client.report.retry.count=5
client.tm.commit.retry.count=1
client.tm.rollback.retry.count=1
store.mode=file
store.file.dir=file_store/data
store.file.max-branch-session-size=16384
store.file.max-global-session-size=512
store.file.file-write-buffer-cache-size=16384
store.file.flush-disk-mode=async
store.file.session.reload.read_size=100
store.db.datasource=dbcp
store.db.db-type=mysql
store.db.driver-class-name=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=123456
store.db.min-conn=1
store.db.max-conn=3
store.db.global.table=global_table
store.db.branch.table=branch_table
store.db.query-limit=100
store.db.lock-table=lock_table
recovery.committing-retry-period=1000
recovery.asyn-committing-retry-period=1000
recovery.rollbacking-retry-period=1000
recovery.timeout-retry-period=1000
transaction.undo.data.validation=true
transaction.undo.log.serialization=jackson
transaction.undo.log.save.days=7
transaction.undo.log.delete.period=86400000
transaction.undo.log.table=undo_log
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registry-type=compact
metrics.exporter-list=prometheus
metrics.exporter-prometheus-port=9898
support.spring.datasource.autoproxy=false需要修改的点:
添加事务组,和待会代码中配置的名称有关,需要和order-service和product-service一致
service.vgroup_mapping.service-order=default
service.vgroup_mapping.service-product=default
在业务数据库中创建数据表
1
2
3
4
5
6
7
8
9
10
11
12
13CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = INNODB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
执行脚本将
seata的配置导入nacos中1
2
3
4
5
6命令解析:-h -p 指定nacos的端口地址;-g 指定配置的分组,注意,是配置的分组;-t 指定命名空间id; -u -w指定nacos的用户名和密码,同样,这里开启了nacos注册和配置认证的才需要指定。
sh nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 0af6e97b-a684-4647-b696-7c6d42aecce7 -u nacos -w nacos
我这里就全部使用默认的配置,默认会将seata配置全部导入到public工作空间中
注意,如果是在windos环境中需要用能够运行sh命令的客户端,比如:git bash
./nacos-config.sh 192.168.149.101启动
seata1
2
3
4
5
6windows, 在bin目录中执行
seata-server.bat -p 9000 -m file
linux, 在bin目录中执行
./seata-server.sh -p 9000 -m file
# 可以使用 -p 指定seata运行的端口可以在
nacos的服务列表中看到serverAddr的服务,说明启动成功
微服务配置Seata
引入依赖,
order-service与product-service中都需要引入1
2
3
4<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>配置
seata,order-service与product-service中都需要配置order-service的bootstrap.yml
1
2
3
4
5spring:
cloud:
alibaba:
seata:
tx-service-group: service-order #要与配置文件中的vgroupMapping一致product-service的bootstrap.yml
1
2
3
4
5spring:
cloud:
alibaba:
seata:
tx-service-group: service-product #要与配置文件中的vgroupMapping一致
配置代理数据源
Seata 是通过代理数据源实现事务分支的,所以需要配置 io.seata.rm.datasource.DataSourceProxy 的Bean,且是 @Primary默认的数据源,否则事务不会回滚,无法实现分布式事务。
order-service和product-service都需要配置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23package com.springcloud.alibaba.order.config;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
public class DataSourceProxyConfig {
(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}复制
registry.conf文件到order-service和product-service的resources目录中配置完成
改造下单逻辑
在
product-service中的新增API用于扣减库存productService,新增方法
1
2
3
4
5
6
7
8
9public void reduceInventory(Integer pid, int num) {
Product product = this.findByPid(pid);
// 发生异常
int i = 1/0;
product.setStock(product.getStock() - num);
productRepository.save(product);
}productController,新增API
1
2
3
4("product/reduceInventory")
public void reduceInventory(@RequestParam Integer pid, @RequestParam int num) {
productService.reduceInventory(pid, num);
}
在
order-service中的ProductClient新增API1
2("product/reduceInventory")
public void reduceInventory(@RequestParam Integer pid, @RequestParam int num);order-service中的OrderService新增方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private ProductClient productClient;
private RocketMQTemplate rocketMQTemplate;
public Order createOrder(Integer pid) {
log.info(">> 客户下单,这时候要调用商品微服务查询商品信息");
// 通过Feign客户端调用
Product product = productClient.findById(pid);
if (product != null) {
log.info(">> 商品信息,查询结果: {}", JSON.toJSONString(product));
Order order = new Order();
order.setUid(1);
order.setUsername("测试用户");
order.setPid(product.getPid());
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
this.save(order);
// 减少库存
productClient.reduceInventory(pid, order.getNumber());
// 下单完成, 发送消息到用户微服务
rocketMQTemplate.convertAndSend("order-topic", order);
return order;
}
return null;
}在方法上加上
@GlobalTransactional后,分布式事务就会生效,测试时,可以先使用普通的@Transactional注解对比事务是否生效。修改
order-service中的OrderController的下单API1
2
3
4
5//准备买1件商品
("/order/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
return orderService.createOrder(pid);
}调用下单API观察分布式事务生效情况
结语
总结
Spring Cloud Alibaba在国内的使用率还算不错,去年也用Spring Cloud Alibaba完成了实战使用,用在了 四川防汛云平台(省级项目)上,总体来说Spring Cloud Alibaba还是比较好用的,技术栈也慢慢变得越来越成熟,社区也比较活跃。
感言
最近的一年,对比之前感觉自己有点懈怠了。主要是工作上比较忙,休息时间有点疲于学习了,导致这篇文章完结推迟了差不多半年。希望小伙伴们别像我这样,加油努力的学习,早日实现经济自由。冲冲冲!!!


































