分布式、微服务、高性能相关技术

注册中心

consul

consul 用于服务注册、发现,带健康检测

1docker run -d --name=cs1 -p 8510:8500 -p 8310:8300 -p 8312:8302 -p 8610:8600/udp consul consul agent -dev -client 0.0.0.0 
2
3# dns 
4dig @127.0.0.1 -p 8610 consul.service.consul SRV
5
6#mac 启动
7consul agent -dev

添加服务

https://www.consul.io/api-docs/agent/service#register-service

删除服务

https://www.consul.io/api-docs/agent/service#deregister-service

设置健康检查

https://www.consul.io/api-docs/agent/check

同一个服务注册多个实例

获取服务

https://www.consul.io/api-docs/agent/service#list-services

注册、注销服务,健康监测代码

 1import requests
 2
 3headers = {
 4    "content-type": "application/json"
 5}
 6
 7
 8# https://www.consul.io/api-docs/agent/check#grpcusetls
 9def registerGrpcCheck(name, id, address, port):
10    url = "http://127.0.0.1:8500/v1/agent/service/register"
11    rsp = requests.put(url, headers=headers, json={
12        "Name": name,
13        "ID": id,
14        "Tags": ["micro", "crmao", "user"],
15        "Address": address,
16        "Port": port,
17        "Check": {
18            "GRPC": f"{address}:{port}",
19            "GRPCUseTLS": False,
20            "Timeout": "5s",
21            "Interval": "5s",
22            "DeregisterCriticalServiceAfter": "5s",
23        }
24    })
25    if rsp.status_code == 200:
26        print("注册成功")
27
28    else:
29        print(rsp.text)
30        print(f"注册失败:{rsp}")
31
32
33def register(name, id, address, port):
34    url = "http://127.0.0.1:8500/v1/agent/service/register"
35    rsp = requests.put(url, headers=headers, json={
36        "Name": name,
37        "ID": id,
38        "Tags": ["micro", "crmao", "user"],
39        "Address": address,
40        "Port": port,
41    })
42    if rsp.status_code == 200:
43        print("注册成功")
44
45    else:
46        print(rsp.text)
47        print(f"注册失败:{rsp}")
48
49
50def registerAndCheck(name, id, address, port):
51    url = "http://127.0.0.1:8500/v1/agent/service/register"
52    rsp = requests.put(url, headers=headers, json={
53        "Name": name,
54        "ID": id,
55        "Tags": ["micro", "crmao", "user"],
56        "Address": address,
57        "Port": port,
58        "Check": {
59            "HTTP": f"http://{address}:{port}/health",
60            "Timeout": "5s",
61            "Interval": "5s",
62            "DeregisterCriticalServiceAfter": "5s",
63        }
64    })
65    if rsp.status_code == 200:
66        print("注册成功")
67
68    else:
69        print(rsp.text)
70        print(f"注册失败:{rsp}")
71
72
73def deregister(id):
74    url = f"http://127.0.0.1:8500/v1/agent/service/deregister/{id}"
75    rsp = requests.put(url, headers=headers)
76    if rsp.status_code == 200:
77        print("注销成功")
78
79    else:
80        print(rsp.text)
81        print(f"注销失败:{rsp}")
82
83
84if __name__ == "__main__":
85    registerGrpcCheck("micro", "user-srv", "127.0.0.1", 50052)
86    # deregister("go")
87    # registerAndCheck("micro", "go", "127.0.0.1", 8081)

配置中心

为什么需要配置中心

  1. 使用传统的配置文件方式,那么如果有20个实例,就得修改20个文件。
  2. 很多服务依赖同一个配置
  3. 生成环境及开发环境 配置需要隔离
  4. 多语言多框架,配置很难统一
  5. 配置实时推送,回滚
  6. 配置的权限问题

nacos安装

1docker run --name nacos-standalone -e MODE=standalone -e JVM_XMS=512m -e JVM_XMX=512m -e JVM_XMN=256m -p 8848:8848 -d nacos/nacos-server:latest

http://127.0.0.1:8848/nacos/

账号密码: nacos nacos

nacos术语

命名空间 :隔离配置集,一般用来区分 微服务

配置集(data id) : 相等于一个配置文件

(group) : 不同开发环境, 开发,测试,生成 怎么区分 , 用组来区分

go操作nacos

现在 github.com/nacos-group/nacos-sdk-go 有v2版本了。

 1package main
 2
 3import (
 4	"fmt"
 5	"github.com/nacos-group/nacos-sdk-go/clients"
 6	"github.com/nacos-group/nacos-sdk-go/common/constant"
 7	"github.com/nacos-group/nacos-sdk-go/vo"
 8)
 9
10func main() {
11	sc := []constant.ServerConfig{
12		{
13			IpAddr: "127.0.0.1",
14			Port:   8848,
15		},
16	}
17
18	cc := constant.ClientConfig{
19		NamespaceId:         "f59db538-cb44-45e0-96fd-3f4262e37e5b", //namespace id
20		TimeoutMs:           5000,
21		NotLoadCacheAtStart: true,
22		LogDir:              "/tmp/nacos/log",   
23		CacheDir:            "/tmp/nacos/cache",// 缓存目录
24		RotateTime:          "1h",
25		MaxAge:              3,
26		LogLevel:            "debug",
27	}
28
29	client, _ := clients.NewConfigClient(
30		vo.NacosClientParam{
31			ClientConfig:  &cc,
32			ServerConfigs: sc,
33		},
34	)
35
36	content, _ := client.GetConfig(vo.ConfigParam{
37		DataId: "go-user-web.json",
38		Group:  "dev"})
39
40	fmt.Println(content)
41	_ = client.ListenConfig(vo.ConfigParam{
42		DataId: "go-user-web.json",
43		Group:  "dev",
44		OnChange: func(namespace, group, dataId, data string) {
45			fmt.Println("group:" + group + ", dataId:" + dataId + ", data:" + data)
46		},
47	})
48
49	select {}
50}

连接池

连接池是指预先分配一批连接,并将它们放入一个缓冲区中循环使用,形成池化效应。

假如接口服务与 Redis 只有一个连接,平均每次请求 Redis 耗时 10ms,一秒钟能请求多少次呢? 一个连接一秒钟最多只能处理 100 次请求。 这显然在高并发系统中是不行的。

可能你会说,既然复用单个连接无法承载高并发,那就每次请求都新建连接嘛!想法很好,但现实很残酷。建立连接的时候,TCP 需要经历三次握手,假如网络延迟是 5 ms,三次握手就耗费 15ms,这比一次请求来回的时间都长了。其次,如果每次请求都建立连接,还需要考虑关闭连接,以免连接数过多压垮 Redis。而关闭连接的过程涉及 TCP 四次挥手,这又是一笔时间开销。

每次请求建立、关闭连接会导致请求延迟增加,还有可能把 Redis 压垮。 另外,如果高并发下频繁地建立、关闭连接,会导致操作系统耗费过多 CPU 用于分配、回收系统资源

如何设计连接池

通常,连接池有几个参数:最小连接数、空闲连接数、最大连接数。

拿redis链接池举例

最小连接数通常用于控制当前连接数的最小值,如果连接数小于最小值,遇到突发流量容易导致性能问题。

空闲连接数就是用于控制连接池中空闲连接的数量,如果超过这个值,意味着浪费资源,需要关闭多余连接;如果低于这个值,则可能无法应对突发流量,需要分配新的空闲连接。

有关空闲连接的分配,可以通过定时器来控制。设置时,要尽量保障服务向 Redis 发起请求的时候,有足够的空闲连接,这样可以减少建立连接的时间和资源开销。通常是由独立的线程定时检查空闲连接是否小于某个值,比如每隔 1 秒钟检查空闲连接数是否小于 2,是的话就新建一批空闲连接。

最大连接数通常用于控制系统中连接数不超过最大值,以免大量连接将 Redis 压垮。

服务需要发起一个 Redis 请求时,会先尝试从连接池中获取连接,如果获取不到,则会建立一个新的连接。

如何从连接池中获取连接,用完后又如何将连接放回到连接池中呢?

通常可以采用循环队列来保存空闲连接。使用的时候,可以从队列头部取出连接,用完后将空闲连接放到队列尾部。

在 Go 语言中,还有可以利用带缓冲区的 channel 来充当队列,这个简单点。

https://github.com/cr-mao/crgo/tree/main/infra/pool

分布式认证

  • jwt(可逆加解密, 可以使用refresh_token,access_token 用来刷新token)
  • token 存redis 、mysql

服务雪崩

各个服务直接相互调用,a调用b,b调用c ,c调用d

当d不可用,或者响应慢的时候,c服务抗请求能力下降,紧接着b也下降,a也下降, 慢慢的整个系统就奔榻了。

服务雪崩原因

  • 服务不可用
    • 硬件故障
    • 程序bug
    • 缓存击穿
    • 用户大量请求
  • 重试加大流量
  • 服务调用者也会不可用

服务雪崩应应对策略

  • 应用扩容
    • 升级机器、加机器
  • 流控
    • 关闭重试
    • 限流
  • 缓存
    • 缓存预加载
  • 服务降级
    • 服务接口拒绝服务
    • 页面拒绝服务
    • 延迟持久化
    • 随机拒绝服务

限流和熔断最终都会导致用户的体验降级

限流:流量2k,但是我的服务能力只有1k,所以这个时候多出来的流量怎么办?

  • a.拒绝;
  • b.排队等待;

用户体验

  • 用户体验不好:当前访问用户过多,请稍后重试
  • 用户体验降级:原本是访问流畅,下单流畅 -> 当前访问用户过多,请稍后重试

超时机制

解决服务雪崩,那么超时机制是必须的

timeout是为了保护服务,避免服务consumer因为服务provider 响应慢而响应慢, 是为了让comsumer服务保持原有的性能

重试

如果是provider偶尔抖动,因为超时而放弃了,那么多少会影响原来的业务。可以在超时后进行重试,可以重试考虑请求到其他服务节点去,因为原来provider已经响应慢了,可能因为重试,其性能问题会加剧

幂等

重试要考虑幂等性,对provider产生的影响应该是一样的。

常用方案:

  • 唯一索引

  • 新建用户,手机号唯一

  • token 机制

  • 表单token,进入页面生成token, 使用一次就删除 ,get和删除最好是原子性的

  • 悲观锁

    • mutex
    • mysql for update
  • 乐观锁

    • mysql 表增加版本号version字段,只有当前版本的能更新成功
  • 分布式锁 (常用)

  • select+insert

    • 一些后台任务,没有并发性的可以考虑
  • 对外接口

    • 如银联提供source+序列号,在数据库做唯一索引,防止多次付款

分布式锁

要解决的问题:

互斥性:任意客户端只能有一个获取到锁,不能同时多个客户端获取

安全性:锁只能被持有该锁的用户删除,不能被其他用户删除

死锁:当客户端因为某些原因宕机,而无法释放锁(1代码没释放锁,2断电,3网络出现问题)

容错:当部分节点宕机,客户端能获取到锁或释放锁

设置过期时间,解决以上问题,那么分布式锁超时问题(代码还没走完,key过期了)

分布式锁超时问题,考虑续租问题,用新的线程去续租,一般在2/3时间去尝试续租。

分布式锁解锁,自己才能解锁自己的锁,value用随机数,相等在能delete(lua script)

redis分布式锁

  • 优点
    • redis本身使用频繁,不需要维护其他第三方组件了
    • 简单
    • 性能高
  • 缺点
    • 依赖了第三方组件
    • 单机redis 宕机的可能性大
    • 集群redis中 锁有些问题(redlock)

redlock 大致原理

加锁大多数加锁成功即可。

释放锁,只是向所有机器发送指令,只释放自己加的锁

过载保护

所谓过载保护,是指负载超过系统的承载能力时,系统会自动采取保护措施,确保自身不被压垮。

每次春运抢票,12306 网站都会卡顿、瘫痪。后来它是怎么改进的呢?

  • 用户登录时添加图片验证,防止抢票软件自动登录
  • 当用户请求比较频繁的时候,系统出现提示“您的操作频率过快请稍后重试
  • 当流量过大时,系统会提示“系统繁忙,请稍后重试

其中的第 1 点和第 2 点能够限流,第 3 点可以熔断

为啥进行限流?

机器只能提供一定的流量访问能力。解决:1.拒绝。2排队等待

机器有限,机器成本或高峰请求(活动)只有一段时间, 只能提供一定的流量访问能力。解决:1.拒绝。2排队等待

  • 并发扛不住 ,增加机器的话,成本增加。
  • 用户体验不太好, 提示请稍后重试 和 你的服务器直接挂了
  • 用户体验下降了, 原本顺畅,现在是请稍后重试,服务被降级了

熔断的基本原理

在高可用设计中,除了流控外,对分布式系统调用链路中不稳定的资源(比如RPC服务等)进行熔断降级也是保障高可用的重要措施之一。现在的分布式架构中一个服务常常会调用第三方服务,这个第三方服务可能是另外的一个RPC接口、数据库,或者第三方 API 等等。例如,支付的时候,可能需要远程调用银联提供的 API;查询某个商品的价格,可能需要进行数据库查询。然而,除了自身服务外,依赖的外部服务的稳定性是不能绝对保证的。如果依赖的第三方服务出现了不稳定的情况,比如请求的响应时间变长,那么服务自身调用第三方服务的响应时间也会响应变长,也就是级联效应,服务自身的线程可能会产生堆积,最终可能耗尽业务自身的线程池,最终服务本身也变得不可用。

现代微服务架构基本都是分布式的,整个分布式系统由非常多的微服务组成。不同服务之间相互调用,组成复杂的调用链路。前面描述的问题在分布式链路调用中会产生放大的效果。整个复杂链路中的某一环如果不稳定,就可能会层层级联,最终可能导致整个链路全部挂掉。因此我们需要对不稳定的 弱依赖服务调用 进行 熔断降级,暂时切断不稳定的服务调用,避免局部不稳定因素导致整个分布式系统的雪崩。熔断降级作为保护服务自身的手段,通常在客户端(调用端)进行配置。

https://sentinelguard.io/zh-cn/docs/golang/circuit-breaking.html

最佳场景实践

熔断器一般用于应用对外部资源访问时的保护措施。这里简单描述一些场景:

  • 分布式系统中降级:假设存在应用A需要调用应用B的接口(特别是一些对接外部公司或者业务的接口时候),那么一般用于A调用B的接口时的防护;
  • 数据库慢调用的防护: 假设应用需要读/写数据库,但是该读写SQL存在潜在慢SQL的可能性,那么可以对该读写接口做防护,当接口不稳定时候(存在慢SQL),那么基于熔断器做降级。
  • 也可以是应用中任意弱依赖接口做降级防护(即自动降级后不影响业务核心链路)。

熔断就是在系统濒临崩溃的时候,立即中断服务,从而保障系统稳定避免崩溃。 它类似于电器中的“保险丝”或“断路器”,当电流过大的时候,“保险丝”会先被烧掉,断开电流,以免电路过热烧毁电器引起火灾。软件系统中的熔断也是如此,当系统满足某个判断条件时,就会拒绝处理请求,避免系统压力过大被压垮

nginx 负载均衡中,可以配置失败多少时间内如5次错误,则不发送请求给该服务器。这其实就是一种熔断措施。

熔断的判断条件

  • 比如 CPU 的使用率超过 90%
  • 比如请求错误率超过 5%
  • 比如请求延迟超过 1s

它们中的任意一个满足条件就会出现熔断。

有的如请求错误率可以直接观测,但是有的需要通过第三方监控系统。 调用方请求监控系统 。

比如A服务访问B服务,这时候B服务很慢(B服务压力过大,导致了出现不少请求错误),调用方很容易出现一个问题:每次调用都超时 结果这个时候数据库出现了问题,超时重试,导致网络2k的流量突然变成了3k,这让原本满负荷的B服务雪上加霜,B服务宕机 如果这时候有一种熔断机制(比较恰当的比喻如保险丝)

何时熔断:

  • a.发现了大部分请求很慢,50%请求都很慢
  • b.发现请求有50%都错误了
  • c.错误数量很多,比如1s出现了20个错误

限流的基本原理

限流的原理跟熔断有点类似,都是通过判断某个条件来确定是否执行某个策略。比如,如果并发请求达到 1000 QPS,那么系统会拒绝或者延迟处理后续请求。它的目的是确保系统高效、稳定地运行,确保请求能够快速处理的同时,保障系统不被流量压垮。

限流算法主要有:计数器限流、滑动窗口限流、令牌桶限流、漏桶限流。

计数器限流:固定窗口限流算法

滑动窗口限流: 滑动窗口限流算法是对计数器限流算法的优化。它的主要原理是将计数器限流算法中的一个周期拆分成很多等分,比如将 5 秒的周期拆成 5 个 1 秒,每次统计从当前时间开始过去 5 秒内的流量,每隔 1 秒往后滑动 1 秒。

令牌桶算法的基本原理是,使用一个定时器以恒定速度往桶里颁发令牌,桶满了则丢弃多余令牌。

漏桶限流算法: 漏桶算法的原理跟令牌桶有点相似,只不过漏桶算法采用“生产者-消费者”模型。在“生产者”一端,所有请求进队列,队列满了则丢弃请求。在“消费者”一端,以一定速度消费队列并处理请求。

Nginx按请求速率限速模块使用的是漏桶算法。

Sentinel

sentinel,hystrix组件,hystrix(已经不更新了,官方认为已经够好了)

Sentinel 和 Hystrix 的原则是一致的: 当调用链路中某个资源出现不稳定,例如,表现为 timeout,异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。

流控

  • direct

  • 预热(即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。)

    • 采用令牌桶 方式
 1package main
 2
 3import (
 4	sentinel "github.com/alibaba/sentinel-golang/api"
 5	"github.com/alibaba/sentinel-golang/core/hotspot"
 6	"github.com/gin-gonic/gin"
 7	"log"
 8)
 9
10func main()  {
11	//err := sentinel.Init("sentinel.yaml")
12	err := sentinel.InitDefault()
13	if err != nil {
14		log.Fatal(err)
15	}
16	//_,err=flow.LoadRules([]*flow.FlowRule{
17	//	{
18	//		Resource:"request",
19	//		MetricType:flow.QPS,
20	//		Count: 100,
21	//		ControlBehavior:flow.Reject,  //快速失败
22	//	},
23	//})
24	//
25	//if err != nil {
26	//	log.Fatal(err)
27	//}
28	_,err=hotspot.LoadRules([]*hotspot.Rule{
29		{
30			ID:"1",
31			Resource:"test",
32			MetricType:hotspot.QPS,
33			ControlBehavior:hotspot.Reject,
34			ParamIndex:        0,
35			Threshold:         3, //流控
36			MaxQueueingTimeMs: 0,
37			BurstCount:        10, //桶的容量, 开始是10 , 意味着第一秒可以 有13个请求
38			DurationInSec:     1,
39		},
40		{
41			ID:"2",
42			Resource:"test",
43			MetricType:hotspot.QPS,
44			ControlBehavior:hotspot.Reject,
45			ParamIndex:        1,
46			Threshold:         4,
47			MaxQueueingTimeMs: 0,
48			BurstCount:        0,
49			DurationInSec:     1,
50
51		},
52	})
53	if err != nil {
54		log.Fatal(err)
55	}
56
57
58	r:=gin.New()
59	r.Use(func(context *gin.Context) {
60		e,err:=sentinel.Entry("request")
61		if err!=nil{
62			context.AbortWithStatusJSON(400,gin.H{"error":"limit has bean reached"})
63		}else{
64			e.Exit()
65			context.Next()
66		}
67	})
68 
69	r.Use(func(context *gin.Context) {
70		if context.Request.URL.Path=="/prodlist"{
71			var t,size interface{}
72			if context.Query("type")!=""{
73				t=context.Query("type")
74			}
75			if context.Query("size")!=""{
76				size=context.Query("size")
77			}
78			
79			//请求有 type,size 参数 ,
80				e,err:=sentinel.Entry("test",sentinel.WithArgs(t,size))
81				if err!=nil{
82					context.AbortWithStatusJSON(400,gin.H{"error":"hot limit has bean reached"})
83				}else{
84					e.Exit()
85					context.Next()
86				}
87		}else{
88			context.Next()
89		}
90	})
91	r.Handle("GET","/prodlist", func(context *gin.Context) {
92		context.JSON(200,gin.H{"result":"prodlist"})
93	})
94
95	r.Run(":8080")
96}

分布式事务

分布式事务问题的由来

1try {
2    START TRANSACTION;
3    INSERT INTO `order_main_0`.`order_0` (`id`)VALUES(1);
4    INSERT INTO `order_main_0`.`order_0` (`id`)VALUES(2);
5    COMMIT;
6catch (Exception $e) {
7    ROLLBACK;
8}

但是上面这种普通事务, 只能适用于同一个数据库服务器。 现在我们做了数据库拆分。库A在服务器1上, 库B在服务器2上,上面这种方式,就不适用了。

分布式事务难点

ACID 特性在分布式环境下变得困难:

  1. 因为网络通信的不可靠,事务的原子性需要用多次日志和网络通信来保证。
  2. 存储节点的增加,放大了单个存储节点在事务过程中出现故障的风险。
  3. 用锁实现的事务隔离性,在故障或网络抖动时严重影响性能。

两阶段提交(2pc)

两阶段提交又称2PC(two-phase commit protocol),2pc是一个非常经典的强一致、中心化的原子提交协议。这 里所说的中心化是指协议中有两类节点:一个是中心化协调者节点(coordinator)和N个参与者节点 (partcipant)。

第一阶段:请求/表决阶段

既然称为两阶段提交,说明在这个过程中是大致存在两个阶段的处理流程。这个阶段被称之 为请求/表决阶段。是个什么意思呢? 就是在分布式事务的发起方在向分布式事务协调者(Coordinator)发送请求时,Coordinator首先会分别向参与者 (Partcipant)节点A、参与这节点(Partcipant)节点B分别发送事务预处理请求,称之为Prepare,有些资料也 叫"Vote Request"。 说的直白点就是问一下这些参与节点"这件事你们能不能处理成功了",此时这些参与者节点一般来说就会打开本地数 据库事务,然后开始执行数据库本地事务,但在执行完成后并不会立马提交数据库本地事务,而是先向Coordinator 报告说:“我这边可以处理了/我这边不能处理”。 如果所有的参与这节点都向协调者作了“Vote Commit”的反馈的话,那么此时流程就会进入第二个阶段了

第二阶段:提交/执行阶段(正常流程) 如果所有参与者节点都向协调者报告说“我这边可以处理”,那么此时协调者就会向所有参与者节点发送“全局提交确认 通知(global_commit)”,即你们都可以进行本地事务提交了,此时参与者节点就会完成自身本地数据库事务的提 交,并最终将提交结果回复“ack”消息给Coordinator,然后Coordinator就会向调用方返回分布式事务处理完成的结 果。

第二阶段:提交/执行阶段(异常流程)

相反,在第二阶段除了所有的参与者节点都反馈“我这边可以处理了”的情况外,也会有节点反馈说“我这边不能处 理”的情况发生,此时参与者节点就会向协调者节点反馈“Vote_Abort”的消息。此时分布式事务协调者节点就会向所 有的参与者节点发起事务回滚的消息(“global_rollback”),此时各个参与者节点就会回滚本地事务,释放资源, 并且向协调者节点发送“ack”确认消息,协调者节点就会向调用方返回分布式事务处理失败的结果。 以上就是两阶段提交的基本过程了,那么按照这个两阶段提交协议,分布式系统的数据一致性问题就能得到满足吗? 实际上分布式事务是一件非常复杂的事情,两阶段提交只是通过增加了事务协调者(Coordinator)的角色来通过2 个阶段的处理流程来解决分布式系统中一个事务需要跨多个服务节点的数据一致性问题。但是从异常情况上考虑,这 个流程也并不是那么的无懈可击。

假设如果在第二个阶段中Coordinator在接收到Partcipant的"Vote_Request"后挂掉了或者网络出现了异常,那么 此时Partcipant节点就会一直处于本地事务挂起的状态,从而长时间地占用资源。当然这种情况只会出现在极端情 况下,然而作为一套健壮的软件系统而言,异常Case的处理才是真正考验方案正确性的地方。

以下几点是XA-两阶段提交协议中会遇到的一些问题:

  • 性能问题。从流程上我们可以看得出,其最大缺点就在于它的执行过程中间,节点都处于阻塞状态。各个操作 数据库的节点此时都占用着数据库资源,只有当所有节点准备完毕,事务协调者才会通知进行全局提交,参与 者进行本地事务提交后才会释放资源。这样的过程会比较漫长,对性能影响比较大。

  • 协调者单点故障问题。事务协调者是整个XA模型的核心,一旦事务协调者节点挂掉,会导致参与者收不到提交 或回滚的通知,从而导致参与者节点始终处于事务无法完成的中间状态。

  • 丢失消息导致的数据不一致问题。在第二个阶段,如果发生局部网络问题,一部分事务参与者收到了提交消 息,另一部分事务参与者没收到提交消息,那么就会导致节点间数据的不一致问题。

以下php角色就是协调者,2个mysql 实例就是参与者

 1<?php
 2require "db.php";
 3$dbtest1 = new mysqli("127.0.0.1","root","123456","laravel_blog")or die("dbtest1 连接失败");
 4$dbtest2     = new mysqli("127.0.0.1","root","123456","laravel_blog_2")or die("dbtest2 连接失败");
 5
 6//为XA事务指定一个id,xid 必须是一个唯一值。
 7$xid = uniqid("");
 8
 9//两个库指定同一个事务id,表明这两个库的操作处于同一事务中
10$dbtest1->exec("XA START '$xid'");//准备事务1
11$dbtest2->exec("XA START '$xid'");//准备事务2
12
13
14try {
15    //$dbtest1
16    $return = $dbtest1->exec("UPDATE atest SET id=3 WHERE id=2") ;
17    echo "xa1:"; print_r($return);
18    if(!in_array($return,['0','1'])) {
19        throw new Exception("库1执行sql操作失败!");
20    }
21
22    //$dbtest2
23    $return = $dbtest2->exec("UPDATE atest_2 SET id2=3 WHERE id2=2") ;
24    echo "xa2:"; print_r($return);
25    if(!in_array($return,['0','1'])) {
26        throw new Exception("库2执行sql操作失败!");
27    }
28
29
30    //阶段1:$dbtest1提交准备就绪
31
32    $dbtest1->exec("XA END '$xid'");
33    $dbtest1->exec("XA PREPARE '$xid'");
34    //阶段1:$dbtest2提交准备就绪
35    $dbtest2->exec("XA END '$xid'");
36    $dbtest2->exec("XA PREPARE '$xid'");
37
38
39    //阶段2:提交两个库
40    $dbtest1->exec("XA COMMIT '$xid'");
41    $dbtest2->exec("XA COMMIT '$xid'");
42
43}
44
45catch (Exception $e) {
46
47    //阶段2:回滚
48    $dbtest1->exec("XA ROLLBACK '$xid'");
49    /*
50    上面这行代码是2pc中的xa事务,
51    update set a = a+1
52    如果是TCC,那么上面这行代码就变了,变成调用一个php接口,这个接口的作用就是把之前的操作给取消
53    update set a = a-1*/
54
55    $dbtest2->exec("XA ROLLBACK '$xid'");
56    die("Exception:".$e->getMessage());
57}
58
59echo "执行完毕";exit;
60/*
61$dbtest1->close();
62$dbtest2->close();*/
63
64?>

3pc

三阶段提交又称3PC,其在两阶段提交的基础上增加了CanCommit阶段,并引入了超时机制。一旦事务参与者迟迟 没有收到协调者的Commit请求,就会自动进行本地commit,这样相对有效地解决了协调者单点故障的问题。 但是性能问题和不一致问题仍然没有根本解决。

在阶段一中,如果所有的参与者都返回Yes的话,那么就会进入PreCommit阶段进行事务预提交。此时分布式事务协 调者会向所有的参与者节点发送PreCommit请求,参与者收到后开始执行事务操作,并将Undo和Redo信息记录到 事务日志中。参与者执行完事务操作后(此时属于未提交事务的状态),就会向协调者反馈“Ack”表示我已经准备好 提交了,并等待协调者的下一步指令。

否则,如果阶段一中有任何一个参与者节点返回的结果是No响应,或者协调者在等待参与者节点反馈的过程中超时 (2PC中只有协调者可以超时,参与者没有超时机制)。整个分布式事务就会中断,协调者就会向所有的参与者发 送“abort”请求。

相比较2PC而言,3PC对于协调者(Coordinator)和参与者(Partcipant)都设置了超时时间,而2PC只有协调者 才拥有超时机制。这解决了一个什么问题呢?这个优化点,主要是避免了参与者在长时间无法与协调者节点通讯(协 调者挂掉了)的情况下,无法释放资源的问题,因为参与者自身拥有超时机制会在超时后,自动进行本地commit从 而进行释放资源。而这种机制也侧面降低了整个事务的阻塞时间和范围。 另外,通过CanCommit、PreCommit、DoCommit三个阶段的设计,相较于2PC而言,多设置了一个缓冲阶段保 证了在最后提交阶段之前各参与节点的状态是一致的。 以上就是3PC相对于2PC的一个提高(相对缓解了2PC中的前两个问题),但是3PC依然没有完全解决数据不一致的 问题。

3pc解决了2阶段提交的前面2个问题, 但是后面那个问题依旧没有解决

补偿事务(TCC)

说起分布式事务的概念,不少人都会搞混淆,似乎好像分布式事务就是TCC。实际上TCC与2PC、3PC一样,只是分 布式事务的一种实现方案而已。

TCC(Try-Confirm-Cancel)又称补偿事务。其核心思想是:“针对每个操作都要注册一个与其对应的确认和补偿 (撤销操作)"。它分为三个操作:

  • Try阶段:主要是对业务系统做检测及资源预留。

  • Confirm阶段:确认执行业务操作。 通过调用确认接口

  • Cancel阶段:取消执行业务操作。 通过调用取消接口 TCC事务的处理流程与2PC两阶段提交类似,不过2PC通常都是在跨库的DB层面,而TCC本质上就是一个应用层面的 2PC,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据库操作的粒度, 使得降低锁冲突、提高吞吐量成为可能。

    而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外, 其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。为了满足一致性的要 求,confirm和cancel接口还必须实现幂等。

下单支付场景举例, 下单的时候扣库存, 生成出库单。

库存服务:

  • 库存表增加冻结字段。 (其中客户端订单服务调用端 取库存是根据库存数-冻结数,小于等于0 表示没货)
  • try阶段可以做点前置工作,好比增加库存冻结字段, 冻结库存数 +=卖的库存数
  • confirm 阶段: 执行扣减冻结的库存数,实际库存减去冻结的
  • cancel阶段, 回滚冻结字段。

出库单服务(当作是另外的服务,也许是和库存在一个服务中)

  • 出库单表增加状态值 好比 (1待支付, 2出库成功 3 出库失败)
  • try阶段 增加出库单记录,其中生成唯一号xxxSn (status=1)
  • confirm阶段 修改xxxSn的status=2
  • cancel阶段 修改xxxSn的status=3 .

订单服务中调用 它们的try接口,如果一方失败,则调用成功方的cancel接口。 都成功才能调用它们的confirm接口

(注意,cancel,confirm阶段其实调用也可能会失败,实际场景还是很复杂的)

需要找一款分布式事务框架,(事务管理器 ,事务日志)

对业务代码侵入性太强。 事务日志,会有性能损耗。需要通过锁来保证数据一致性,加锁导致性能不高。try,confirm,cancel都可能需要重试机制 和 幂等性操作。

基于本地消息表的最终一致性

  1. 增加消息日志表, 调用端业务 和 消息日志表 通过本地mysql事务完成一致性。
  2. 引入异步任务,消费这张消息日志表, 通过mq发送消息, mq消息到mq server了,confirm机制,则打标记已经发送成功。
  3. 消息日志对应到服务,收到消息进行消费,响应ack, 不然mq 会重发消息,在 消息日志对应到服务中需要实现 幂等性,避免重复消费问题。

https://blog.csdn.net/qq_45066628/article/details/119192794

最大努力通知

MQ会按照间隔如 10s, 1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔。

可能通知方需要增加主动查询接口,方便调用方排查问题。

链路追踪tracer

为什么需要链路追踪?

链路追踪用来监视和诊断基于微服务的分布式系统,用于服务依赖性分析,辅助性能优化

分布式系统,服务调用,通过打log,问题也难查找 ,不同服务都有自己的log ,也是要一层一层询问。

链路追踪核心:代码埋点,数据存储,展示查询

链路追踪常用组件

zipkin ,jaeger,sykwalking

jaeger介绍

Jaeger组成

Jaeger Client(已废弃,推荐opentelemetry):为不同语言实现了符合OpenTracing标准的SDK.应用程序通过API写入数据,client library把trace信息按照应用程序指定的采样策略传递给jaeger-agent

Agent:它是一个监听再UDP端口上接收span数据的网络守护进程,它会将数据批量发送给collector.它被设计成一个基础组件,部署到所有的宿主机上.Agent将client library和collector解耦,为client library屏蔽了路由和发现collector的细节

collector:接收jaeger-agent发送来的数据.然后将数据写入到后端存储.collector被设计成无状态的组件,因此可以同事运行任意数量的jaeger-collector

Data Store:后端存储被设计成一个可插拔的组件,支持将数据写入cassandra,elasticsearch

Query:接收查询请求,然后从后端存储系统中检索trace并通过UI进行展示.Query是无状态的,可以启动多个实例,把他们部署再nginx这样的负载均衡器后面

https://www.cnblogs.com/chopper-poet/p/10743141.html

https://cloud.tencent.com/developer/article/1494063

jaeger是golang开发的

jaeger:高扩展性,原生支持opentracing,可观察性

jaeger原理: 记录服务调用链,个调用所需时间

jaeger术语 span:

  • jaeger中的逻辑工作单元
  • 作名称,操作的开始时间和持续时间
  • 跨度可以嵌套并排序以建立因果关系模型

span包含的对象:

  • operation name | span name 操作名称
  • start timestamp
  • end timestamp
  • span tag :一组键值对构成的span 标签对象
  • span log :一组span 的日志集合
  • spanContext:span 上下文对象
    • 任何一个opentracing的实现都需要将当前调用链的状态(如trace和span的id),依赖一个独特的span 去跨进程传输
    • Baggage Items,Trace的随行数据,是一个键值对集合,它存在于trace中,也需要跨进程边界传输
  • 一个完整的请求路径可以理解为一个tracer, 每次调用操作可以理解为1个span。
  • Span间关系:一个Span可以与一个或者多个SpanContexts存在因果关系。OpenTracing目前定义了两种关系:ChildOf(父子) 和 FollowsFrom(跟随)

使用opentelemetry-go操作Jaeger

https://github.com/open-telemetry/opentelemetry-go/blob/main/example/jaeger/main.go

demo 示例

 1package main
 2
 3import (
 4	"context"
 5	"log"
 6	"time"
 7
 8	"go.opentelemetry.io/otel"
 9	"go.opentelemetry.io/otel/attribute"
10	"go.opentelemetry.io/otel/exporters/jaeger"
11	"go.opentelemetry.io/otel/sdk/resource"
12	tracesdk "go.opentelemetry.io/otel/sdk/trace"
13	semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
14)
15
16const (
17	service     = "trace-demo" // 服务名
18	environment = "production" // 环境
19	id          = 1            // id
20)
21
22// tracerProvider returns an OpenTelemetry TracerProvider configured to use
23// the Jaeger exporter that will send spans to the provided url. The returned
24// TracerProvider will also use a Resource configured with all the information
25// about the application.
26
27// tracerProvider 创建trace的提供者
28func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
29	// Create the Jaeger exporter
30	// 创建 Jaeger exporter
31	exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
32	if err != nil {
33		return nil, err
34	}
35	tp := tracesdk.NewTracerProvider(
36		// Always be sure to batch in production.
37		tracesdk.WithBatcher(exp),
38		// Record information about this application in a Resource.
39		tracesdk.WithResource(resource.NewWithAttributes(
40			semconv.SchemaURL,
41			semconv.ServiceNameKey.String(service),
42			attribute.String("environment", environment),
43			attribute.Int64("ID", id),
44		)),
45	)
46	return tp, nil
47}
48
49func main() {
50	tp, err := tracerProvider("http://127.0.0.1:14268/api/traces")
51	if err != nil {
52		log.Fatal(err)
53	}
54
55	// Register our TracerProvider as the global so any imported
56	// instrumentation in the future will default to using it.
57	// 设置全局的TracerProvider,方便后面使用
58	otel.SetTracerProvider(tp)
59
60	ctx, cancel := context.WithCancel(context.Background())
61	defer cancel()
62
63	// Cleanly shutdown and flush telemetry when the application exits.
64	// 优雅退出
65	defer func(ctx context.Context) {
66		// Do not make the application hang when it is shutdown.
67		ctx, cancel = context.WithTimeout(ctx, time.Second*5)
68		defer cancel()
69		if err := tp.Shutdown(ctx); err != nil {
70			log.Fatal(err)
71		}
72	}(ctx)
73
74	// trace 上报
75	tr := tp.Tracer("component-main")
76
77	ctx, span := tr.Start(ctx, "foo")
78	defer span.End()
79
80	// Context 向下传递
81	bar(ctx)
82}
83
84func bar(ctx context.Context) {
85	// Use the global TracerProvider.
86	// 使用 全局 TracerProvider
87	tr := otel.Tracer("component-bar")
88	_, span := tr.Start(ctx, "bar")
89	span.SetAttributes(attribute.Key("testset").String("value"))
90	defer span.End()
91
92	// Do bar...
93}

简单说明一下上报流程:

  • 1.创建tracerProvider,通过tracerProvider来连接 Jaeger
  • 2.创建Tracer ,推荐使用 全局的TracerProvider 来创建如:otel.Tracer(“component-bar”)
  • 3.创建span ctx, span := tr.Start(ctx, “foo”) 。
  • 4.调用span.End(),完成上报。 defer span.End()

TracerProvider接口: 以简单理解为Tracer提供者,只有一个方法Tracer 返回tracer

Tracer接口:Tracer is the creator of Spans.Tracer是span的创造者,也只提供一个对象span

Span is the individual component of a trace. It represents a single named and timed operation of a workflow that is traced. Span是trace的单个组成部分。 它代表了跟踪的工作流的单个命名和定时操作。

常用方法End:End completes the Span. The Span is considered complete and ready to be delivered through the rest of the telemetry pipeline after this method is called.完成span,此时Span被认为是完整的,已经准备好在此方法调用后通过剩下的 telemetry pipeline 传递。

简单理解下:调用end方法后,此时span是完整的,准备被上报。

更多opentelemetry-go操作其他分布式链路追踪组件可以看下方

https://github.com/open-telemetry/opentelemetry-go/tree/main/example