0%

代码审查者应该关注哪些方面?

代码审查者应该关注以下方面:

  • 设计:代码是否经过精心设计并适合您的系统?
  • 功能:

概述

  • 服务调用链追踪是干什么的?

  • Sleuth 核心功能和体系结构?

    调用链路数据模型 - Trace,Span,Annotation

  • 链路追踪原理介绍

  • Zipkin 简介:搭建Zipkin服务端、Sleuth 集成 Zipkin

  • Sleuth 集成 ELK 实现日志搜索

链路追踪的基本功能

  • 分布式环境下链路追踪
  • Timing 信息
  • 定位链路
  • 信息收集和展示

Sleuth

Sleuth 的功能

Sleuth 的最核心功能就是提供链路追踪,在一个用户请求发起到结束的整个过程中,这个Request经过的所有服务都会被梳理出来:

上图是一个由用户 X 请求发起的,穿过多个服务的分布式系统,A、B、C、D、E 表示不同的子系统或处理过程。在这个图中, A 是前端,B、C 是中间层、D、E 是 C 的后端。这些子系统通过 rpc 协议连接,例如 gRPC。

一个简单实用的分布式链路追踪系统的实现,就是对服务器上每一次请求以及响应收集跟踪标识符(message identifiers)和时间戳(timestamped events)。

分布式服务的跟踪系统需要记录在一次特定的请求后系统中完成的所有工作的信息。用户请求可以是并行的,同一时间可能有大量的动作要处理,一个请求也会经过系统中的多个服务,系统中时时刻刻都在产生各种跟踪信息,必须将一个请求在不同服务中产生的追踪信息关联起来。

借助 Sleuth 的链路追踪能力,我们还可以完成一些其他的任务,比如说:

  1. 线上故障定位:结合Tracking ID寻找上下游链路中所有的日志信息(这一步还需要借助一些其他开源组件,后面会有这部分的Demo)
  2. 依赖分析梳理:梳理上下游依赖关系,理清整个系统中所有微服务之间的依赖关系
  3. 链路优化:比如说目前我们有三种途径可以导流到下单接口,通过对链路调用情况的统计分析,我们可以识别出转化率最高的业务场景,从而为以后的产品设计提供指导意见。
  4. 性能分析:梳理各个环节的时间消耗,找到性能瓶颈,为性能优化、软硬件资源调配指明方向

Sleuth 的设计理念

从上图我们可以看出,Sleuth 采用底层 Log 系统的方式实现业务埋点。

哪些数据需要埋点?

每一个微服务都有自己的Log组件(slf4j,lockback等各不相同),当我们集成了Sleuth之后,它便会将链路信息传递给底层Log组件,同时Log组件会在每行Log的头部输出这些数据,这个埋点动作主要会记录两个关键信息:

  • 链路ID: 当前调用链的唯一ID,在这次调用请求开始到结束的过程中,所有经过的节点都拥有一个相同的链路ID
  • 单元ID: 在一次链路调用中会访问不同服务器节点上的服务,每一次服务调用都相当于一个独立单元,也就是说会有一个独立的单元ID。同时每一个独立单元都要知道调用请求来自哪里(就是对当前服务发起直接调用的那一方的单元ID,我们记为Parent ID)

比如这里服务A是起始节点,所以它的Event ID(单元ID)和Trace ID(链路ID)相同,而服务B的前置节点就是A节点,所以B的Parent Event就指向A的Event ID。而C在B的下游,所以C的Parent就指向B。A、B和C三个服务都有同一个链路ID,但是各自有不同的单元ID。

数据埋点之前要解决的问题

看起来创建埋点数据是件很容易的事儿,但是想让这套方案在微服务集群环境下生效,我们还需要先解决两个核心问题:

  • Log系统集成:如何让埋点信息加入到业务Log中?
  • 埋点信息的传递:我们知道SpringCloud中的调用都是通过HTTP请求来传递的,那么上游调用方是如何将链路ID等信息传入到下游的呢?

Log 系统集成

我们需要把链路追踪信息加入到业务Log中,这些业务Log是我们研发人员写在具体服务里的,而不是Sleuth单独打印的log,因此Sleuth需要找到一个合适的切入点,让底层Log组件可以获取链路信息,并且我们的业务代码还不需要做任何改动。

如果有对Log框架做过深度定制的同学可能一下就能想到实现方式,就是使用MDC + Format Pattern的方式输出信息,我们先来看一下Log组件打印信息到文件的过程:

当我们使用log.info()打印日志的时候,Log组件会将“写入”动作封装成一个LogEvent事件,而这个事件的具体表现形式由 Log Format 和 MDC 共同控制,Format决定了Log的输出格式,而MDC决定了输出什么内容。

Log Format Pattern

Log组件定义了日志输出格式,这和我们平时使用“String.format”的方式差不多,集成了Sleuth后的Log输出格式是下面这个样子:

1
"%5p [sleuth-traceA,%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]"

我们可以发现上面有几个X开头的占位符,这就是我们需要写入Log的链路追踪信息了。至于这几个符号分别对应链路信息的哪部分,下文会详细介绍。

MDC

MDC是通过InheritableThreadLocal来实现的,它可以携带当前线程的上下文信息。它的底层是一个Map结构,存储了一系列Key-Value的值。Sleuth就是借助Spring的AOP机制,在方法调用的时候配置了切面,将链路追踪数据加入到了MDC中,这样在打印Log的时候,就能从MDC中获取这些值,填入到Log Format中的占位符里。

由于MDC基于InheritableThreadLocal而不是ThreadLocal实现,因此假如在当前线程中又开启了新的子线程,那么子线程依然会保留父线程的上下文信息。

源代码可以参考logback组件中LogEvent类的prepareForDeferredProcessing方法,了解MDC和Log Format是如何工作的。建议在打印log的地方打一个断点,本地启动项目后发起一次调用,然后一路跟进去一看便知。

调用链路数据模型 Trace,Span,Annotation

Span

它标识了 Sleuth 下面一个基本工作单元,每个单元都有一个独一无二的ID。比如服务A发起对服务B的调用,这个事件就可以看作一个独立单元,它生成了一个独立的ID。

Span 不单单只是一个ID,它还包含一些其他信息,比如事件戳,它标识了一个事件从开始到结束的时间,我们可以用这个信息来统计接口的执行时间。每个 Span 还有一系列特殊的“标记”,也就是接下来要介绍的 “Annotation”,它标识了这个 Span 在执行过程中发起的一些特殊事件。

Trace

实际场景中,我们需要知道某次请求调用的情况,所以只有spanid还不够,得为每次请求做个唯一标识,这样才能根据标识查出本次请求调用的所有服务,它就是从头到尾贯穿整个调用链的ID,我们叫它 Trace ID,不管调用链路中途访问了多少个服务节点,在每个节点的 log 中都会打印同一个 Trace ID。

Annotation

一个 Span 可以包含多个 Annotation,每个 Annotation 表示一个特殊事件,比如:

  • Client Sent简称cs,客户端发起调用请求到服务端。
  • Server Received简称sr,指服务端接收到了客户端的调用请求。
  • Server Sent简称ss,指服务端完成了处理,准备将信息返给客户端。
  • Client Received简称cr,指客户端接收到了服务端的返回信息。

每个 Annotation 同样有一个时间戳字段,这样我们就能计算一个 Span 内部每个事件的起始和结束时间。

用一张图表示 Trace、Span 和 Annotation 的关系:

上面的图中调用了两个接口Server 1和Service 2,整个调用过程的所有Span都有相同的Trace ID,但每一个Span都有独立的Span ID。其中Service 1对Service 2的调用分为两个Span,蓝色Span的时间跨度从调用发起直到调用结束,分别记录了4个特殊事件(对应客户端和服务端对Request和Response的传输)。绿色Span主要针对Service 2内部业务的处理,因此我们在Service 2中打印的日志将会带上绿色Span的ID。

服务节点间的ID传递

我们知道了Trace ID和Span ID,眼下的问题就是如何在不同服务节点之间传递这些ID。我想这一步大家很容易猜到是怎么做的,因为在Eureka的服务治理下所有调用请求都是基于HTTP的,那我们的链路追踪ID也一定是HTTP请求中的一部分。可是把ID加在HTTP哪里好呢?Body里可以吗?NoNoNo,一来GET请求压根就没有Body,二来加入Body还有可能影响后台服务的反序列化。那加在URL后面呢?似乎也不妥,因为某些服务组件对URL的长度可能做了限制(比如Nginx可以设置最大URL长度)。

那剩下的只有Header了!Sleuth正是通过Filter向Header中添加追踪信息,我们来看下面表格中Header Name和Trace Data的对应关系:

HTTP Header Name Trace Data 说明
X-B3-TraceId Trace ID 链路全局唯一ID
X-B3-SpanId Span ID 当前Span的ID
X-B3-ParentSpanId Parent Span ID 前一个Span的ID
X-Span-Export Can be exported for sampling or not 是否可以被采样

Zipkin

Zipkin 能干什么?

上文讲了 Sleuth 的最核心功能就是提供链路追踪,数据采样、日志埋点和Log系统集成,但是没有什么页面可以展示出来,没有信息汇聚的能力,不能够直观的对整个集群的调用链路进行分析。

Zipkin是一套分布式实时数据追踪系统,它主要关注的是时间维度的监控数据,比如某个调用链路下各个阶段所花费的时间,同时还可以从可视化的角度帮我们梳理上下游系统之间的依赖关系。

Zipkin 的核心功能

Zipkin的主要作用是收集Timing维度的数据,以供查找调用延迟等线上问题。所谓Timing其实就是开始时间+结束时间的标记,有了这两个时间信息,我们就能计算得出调用链路每个步骤的耗时。Zipkin的核心功能有以下两点:

  1. 数据收集: 聚合客户端数据
  2. 数据查找: 通过不同维度对调用链路进行查找

Zipkin分为服务端和客户端,服务端是一个专门负责收集数据、查找数据的中心Portal,而每个客户端负责把结构化的Timing数据发送到服务端,供服务端做索引和分析。这里我们重点关注一下“Timing数据”到底用来做什么,前面我们说过Zipkin主要解决调用延迟情况的线上排查,它通过收集一个调用链上下游所有工作单元的独立用时,Zipkin就能知道每个环节在服务总用时中所占的比重,再通过图形化界面的形式,让开发人员知道性能瓶颈出在哪里。

Zipkin提供了多种维度的查找功能用来检索Span的耗时,最直观的是通过Trace ID查找整个Trace链路上所有Span的前后调用关系和每阶段的用时,还可以根据Service Name或者访问路径等维度进行查找。

Zipkin 的组件

  • Collector:很多人以为Collector是一个客户端组件,其实它是Zipkin Server的守护进程,用来验证客户端发送来的链路数据,并在存储结构中建立索引。守护进程就是指一类用于执行特定任务的后台进程,它独立于Zipkin Server的控制终端,一直等待接收客户端数据。

  • Storage:Zipkin支持ElasticSearch和MySQL等存储介质用来保存链路信息

  • Search Engine:提供基于JSON API的接口来查找信息

  • Dashboard:一个大盘监控页面,后台调用Search Engine来获取展示信息。大家如果本地启动Zipkin会每次刷新主页后系统日志会打印Error信息,这个是Zipkin的一个小问题,直接跳过即可。

MySQL事务的4种隔离级别

1 简介

事务的4种隔离级别分别是读未提交(Read Uncommitted)、读已提交(Read Committed)、 可重复读(Repeatable Read)和串行化(Serializable)。

首先,在了解这4种隔离级别前就必须先要了解其前提,也就是事务,本文简单介绍一下关于事务。

之后,我们也要理解这4种隔离级别产生的原因和场景展现以及4种隔离级别是如何解决问题的。

2 什么是数据库事务?

事务由一个有限的数据库操作序列组成,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。

例如一个银行转账场景:

A转账B 100元,A的账号扣除100元,B的账号加上100块。假如中间出现任何异常,例如,在A的账号扣100元时,银行瘫痪,B的账号余额没有发生变化。这时候就需要事务来保证将A的钱还回去。

2.1 事务的四大特性(ACID)

  • 原子性:事务作为一个整体被执行,包含在其中的对数据库的操作要么全部都执行,要么都不执行。
  • 一致性:指在事务开始之前和事务结束以后,数据不会被破坏,假如A账户给B账户转10块钱,不管成功与否,A和B的总金额是不变的。
  • 隔离性:多个事务并发访问时,事务之间是相互隔离的,一个事务不应该被其他事务干扰,多个并发事务之间要相互隔离。
  • 持久性:表示事务完成提交后,该事务对数据库所作的操作更改,将持久地保存在数据库之中。

3 并发事务会导致的问题

  • 脏读:事务 A 读取了事务 B 更新的数据,然后 B 进行回滚操作,那么A读取的数据就是脏数据
  • 不可重复读:事务A多次读取同一数据,事务B在事务A多次读取的过程中,对数据做了更新并提交,导致事务A多次夺取同一数据时,结果不一致。
  • 幻读:系统管理员A将数据库中所有学生的成绩从具体分数改为ABCDE等级,但是系统管理员B就在这个时候插入了一条具体分数的记录,当系统管理员A改结束后发现还有一条记录没有改过来,就好像发生了幻觉一样,这就叫幻读。

💡 不可重复读的和幻读很容易混淆,不可重复读侧重于修改,幻读侧重于新增或删除。解决不可重复读的问题只需锁住满足条件的行,解决幻读需要锁表。

3.1 本文会使用到的 SQL 语句:

3.1.1 示例表结构

1
2
3
4
5
6
7
CREATE TABLE `account` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
`balance` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `un_name_idx` (`name`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.1.2 查询事务的默认隔离级别

1
2
3
4
5
6
7
mysql> select @@transaction_isolation;
+-------------------------+
| @@transaction_isolation |
+-------------------------+
| REPEATABLE-READ |
+-------------------------+
1 row in set (0.01 sec)

3.1.3 设置当前会话的事务隔离级别

1
2
mysql> set session transaction isolation level read uncommitted;
Query OK, 0 rows affected (0.00 sec)

4 事务的4种隔离级别和示例演示

事务隔离级别 脏读 不可重复读 幻读
读未提交(read-uncommitted)
不可重复读(read-committed)又叫读已提交
可重复读(repeatable-read)
串行化(serializable)

4.1 读未提交

事务A:

Untitled

事务B:

Untitled

💡 读未提交是隔离级别最低的,会造成脏读。

4.2 读已提交

为了避免脏读,数据库有了比读未提交更高的隔离级别,即读已提交

对于提交:当前事务只能读取其它事务已提交的数据,未提交事务的数据读取不到。

事务A:

Untitled

事务B:

Untitled

由此可以得出结论,隔离级别设置为已提交读(READ COMMITTED)
 时,已经不会出现脏读问题了,当前事务只能读取到其他事务提交的数据。但是,站在事务A的角度想想,存在其他问题吗?

提交读的隔离级别会有什么问题呢?

在同一个事务A里,相同的查询sql,读取同一条记录(id=1),读到的结果是不一样的,即不可重复读。所以,隔离级别设置为read committed的时候,还会存在不可重复读的并发问题。

4.3 可重复读

为了避免不可重复读的并发问题,我们将隔离级别设置为可重复读(REPEATABLEE READ),重复一下之前的操作。

事务A:

Untitled

事务B:

Untitled

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/mysql_transaction_5.png

到了这一步,可以发现事务隔离级别设置为可重复读,可以解决幻读问题。

那么可重复读真的是否已经解决了幻读问题呢?毕竟还剩个事务隔离级别呢。

RR隔离级别下,手动启动一个事务,进行select操作,他会生成一个快照,可以理解为将当前数据库的数据复制一份,在当前事务中,之后不管进行多少次select查询,都是在模板中去取数据,所以不管数据库中是否对数据进行了改变,都不会影响当前事务数据的读取,从而避免了幻读。这种普通的 select 操作,称为快照读

但是如果在当前事务中使用了下图语句进行当前读:

1
select * from account for update;

for update是进行当前读的操作,他会重新从数据库去加载当前的最新的数据,每执行一次加载一次,如果在此时,另外一个事务为数据库添加了一个事务,再进行查询,会发现查询的数据与之前相比多了或者少了,这也就是幻读现象。

如果你阅读到这里,去实操一下,会发现和我说的不一样,有一种上当的感觉。

其实不是的,这是因为上述都是在标准的可重复读下的情况,在innodb存储引擎中对可重复读进行了改造,为当前读加上了 Next-key Lock,也就是间隙锁和行锁的统称,行锁防止了别的事务修改或者删除,间隙锁防止了别的事务新增。也就是在进行上面的for update事务中,其他的事务不能对数据进行增删操作,执行会报错或者长时间处于等待状态。

💡 注意:如果A事务如果进行了快照读,然后通过B事务对数据就行增删,然后紧接着A事务进行当前读操作,两次读取数据不一致,不能算作幻读,因为幻读定义是同一个select语句,快照读和当前读的查询语句是不一样的.

小结

  1. 数据库的并发问题有:脏读、不可重复读和幻读;
  2. 事务隔离级别依次为:读未提交、读已提交、可重复读和串行化;
  3. 在标准的RR下并没有彻底解决幻读,但是在Mysql的innodb引擎中彻底解决了;
  4. innodb通过 Next-Key lock解决的幻读问题,其实也就是阻塞串行化了;
  5. 不能把快照读和当前读在一个事务中进行比较是否出现幻读,两者不是同一个select,不满足幻读的官方定义。

4.4 串行化

略,这部分我懒得放图了,因为结果和上面没啥差别。

相关性


文献引用


一文彻底读懂MySQL事务的四大隔离级别 - Jay_huaxiao - 博客园

MySQL的四种事务隔离级别

关于数据库隔离级别为RR(可重复读)下是否解决幻读问题_眉梢i的博客-CSDN博客_rr解决幻读

引言

分布式事务是一个复杂的问题,本文就基于 RocketMQ 来实现最终一种性方案的分布式事务的示例与测试。

概念

RocketMQ事务消息示例

整体的流程如上所示。

RocketMQ 事务消息的原理是基于两阶段提交和事务状态回查。

  • 半消息:是指暂时不能被消费的消息,半消息实际上被放在主题名为 RMQ_SYS_TRANS_HALF_TOPIC下,当 producer 对半消息进行二次确认后,也就是上图的第 4 步后,consumer 才可以消费。

  • 事务状态回查:如果上图的第 4 步,半消息提交因为种种原因(网络原因、producer崩溃)失败了,而导致 broker 不能收到 producer 的确认消息,那么 broker 就会定时扫描这些半消息,主动去确认。

    当然,这个定时机制也是可以配置的。

最重要的两个概念就介绍到这里啦,其它的就不啰嗦了。

业务流程:每增加一个订单,就增加相应的积分。

数据库

数据库有两个,一个包含订单表和事务日志表,另一个则只有订单积分表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- 本地业务
CREATE TABLE `orders` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`user_id` int unsigned NOT NULL,
`goods_name` varchar(255) NOT NULL COMMENT '商品名',
`total` int unsigned NOT NULL COMMENT '数量',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

-- 这张表专门用于事务状态回查
-- 当本地业务提交后,此表也插入一条记录,两者处于同一个事务中
-- 通过 RocketMQ事务ID 查询该表,如果返回记录,则证明本地事务已提交;如果未返回记录,则本地事务可能是未知状态或者是回滚状态。
CREATE TABLE `transaction_log` (
`id` varchar(32) NOT NULL COMMENT '事务ID',
`business` varchar(32) NOT NULL COMMENT '业务标识',
`foreign_key` varchar(32) NOT NULL COMMENT '对应业务表中的主键',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
1
2
3
4
5
6
7
8
-- 远端业务
CREATE TABLE `order_credits` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`user_id` int unsigned NOT NULL COMMENT '用户ID',
`order_id` int unsigned NOT NULL COMMENT '订单ID',
`total` int unsigned NOT NULL COMMENT '积分数量',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

核心代码

事务管理工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package com.fengxuechao.example.rocketmq;

import java.sql.*;

/**
* 事务管理工具类
*
* @author fengxuechao
* @date 2022/4/7
*/
public class TransactionUtil {

private static final ThreadLocal<Connection> connections = new ThreadLocal<>();

private TransactionUtil() {
}

/**
* 开启事务, jdbcUrl 要记得修改
*/
public static Connection startTransaction() {
Connection connection = connections.get();
if (connection == null) {
try {
connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/rocketmq?serverTimezone=GMT%2B8",
"root",
"12345678");
connection.setAutoCommit(false);
connections.set(connection);
} catch (SQLException e) {
e.printStackTrace();
}
}
return connection;
}

public static int execute(String sql, Object... args) throws SQLException {
PreparedStatement preparedStatement = createPreparedStatement(sql, args);
return preparedStatement.executeUpdate();
}

public static ResultSet select(String sql, Object... args) throws SQLException {
PreparedStatement preparedStatement = createPreparedStatement(sql, args);
preparedStatement.execute();
return preparedStatement.getResultSet();
}

private static PreparedStatement createPreparedStatement(String sql, Object[] args) throws SQLException {
Connection connection = startTransaction();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
if (args != null) {
for (int i = 0; i < args.length; i++) {
preparedStatement.setObject(i + 1, args[i]);
}
}
return preparedStatement;
}


/**
* 提交事务
*/
public static void commit() {
try (Connection connection = connections.get()) {
connection.commit();
connections.remove();
} catch (SQLException e) {
e.printStackTrace();
}
}

/**
* 回滚事务
*/
public static void rollback() {
try (Connection connection = connections.get()) {
connection.rollback();
connections.remove();
} catch (SQLException e) {
e.printStackTrace();
}
}
}

生产者业务代码

发送半消息。对应的是上图中的第 1 步。

注意点:如果发送事务消息,在这里我们的创建的实例必须是 TransactionMQProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.fengxuechao.example.rocketmq;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 消息事务生产者
*
* @author fengxuechao
* @date 2022/4/6
*/
public class TransactionProducer {

public static void main(String[] args) throws MQClientException, InterruptedException {

// 生产者事务监听器
TransactionListener transactionListener = new OrderTransactionListener();
TransactionMQProducer producer = new TransactionMQProducer();
ExecutorService executorService = new ThreadPoolExecutor(
2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000),
r -> new Thread(r, "client-transaction-msg-check-thread"));

producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setProducerGroup("producer_order_trans_group");
producer.start();

// 发送消息
String topic = "transaction-topic";
String tags = "trans-order";
Order order = new Order();
order.setId(1);
order.setUserId(1);
order.setGoodsName("小脆面");
order.setTotal(2);
String orderJson = JSON.toJSONString(order);
try {
byte[] orderBytes = orderJson.getBytes(RemotingHelper.DEFAULT_CHARSET);
Message msg = new Message(topic, tags, "order", orderBytes);
producer.sendMessageInTransaction(msg, null);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
producer.shutdown();
}
}

  • 半消息确认,执行本地事务。对应的是executeLocalTransaction这个方法,需要注意的是本地业务提交后,事务日志表也插入一条记录,两者处于同一个事务中。

  • 回查事务状态。对应的是checkLocalTransaction这个方法。

    • 在这里,我们通过事务ID查询transaction_log这张表,如果可以查询到结果,就提交事务消息;如果没有查询到,就返回未知状态。
    • 如果返回未知状态,broker 会以1分钟的间隔时间不断回查,直至达到事务回查最大检测数,如果超过这个数字还未查询到事务状态,则回滚此消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.fengxuechao.example.rocketmq;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.sql.ResultSet;
import java.sql.SQLException;

/**
* @author fengxuechao
* @date 2022/4/7
*/
public class OrderTransactionListener implements TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// RocketMQ 半消息发送成功,开始执行本地事务
System.out.println("执行本地事务");
TransactionUtil.startTransaction();
LocalTransactionState state;
try {
// 创建订单
System.out.println("创建订单");
String orderStr = new String(msg.getBody());
Order order = JSON.parseObject(orderStr, Order.class);
String sql = "insert into orders(id, user_id, goods_name, total) values(?, ?, ?, ?)";
int executeUpdates = TransactionUtil.execute(sql, order.getId(), order.getUserId(),
order.getGoodsName(), order.getTotal());
if (executeUpdates > 0) {
// 写入本地事务日志
System.out.println("写入本地事务日志");
String logSql = "insert into transaction_log(id, business, foreign_key) values(?, ?, ?)";
String business = msg.getKeys();
TransactionUtil.execute(logSql, msg.getTransactionId(), business, order.getId());
}
TransactionUtil.commit();
state = LocalTransactionState.COMMIT_MESSAGE;
} catch (SQLException e) {
TransactionUtil.rollback();
state = LocalTransactionState.ROLLBACK_MESSAGE;
System.out.println("本地事务异常,回滚");
e.printStackTrace();
}
return state;
}

/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务
System.out.printf("回查本地事务, transactionId = %s%n", msg.getTransactionId());
TransactionUtil.startTransaction();
String sql = "select id, business, foreign_key from transaction_log where id = ?";
try (ResultSet transactionLog = TransactionUtil.select(sql, msg.getTransactionId())) {
if (transactionLog == null) {
return LocalTransactionState.UNKNOW;
}
} catch (SQLException e) {
e.printStackTrace();
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}

消费者业务代码

这里是增加积分的阶段。

  • 需要注意的是幂等性消费,总的思路就是在执行业务前,必需确认该消息是否被处理过。可以使用 RocketMQ 事务消息的 ID,也可以使用订单ID。

  • 第二个需要注意的是消息一直不能成功消费。这个时候,我想到两种方式处理:

    • 在代码中设置消息重试次数,然后发送邮件或其他方式通知业务方人工处理
    • 或者等待消息达到最大重试次数,进入死信队列(主题:%DLQ% + 消费者组名称)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.fengxuechao.example.rocketmq;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.util.List;

/**
* @author fengxuechao
* @date 2022/4/7
*/
public class TransactionConsumer {

public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_order_trans_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("transaction-topic", "trans-order");
consumer.setMaxReconsumeTimes(3);
TransactionUtil.startTransaction();
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
// 多次消费消息处理仍然失败后,发送邮件,人工处理
if (msg.getReconsumeTimes() >= 3) {
// 发送邮件,人工处理
sendMail();
}

String orderStr = new String(msg.getBody(), StandardCharsets.UTF_8);
Order order = JSON.parseObject(orderStr, Order.class);
// 幂等性保持
String sql1 = "select * from order_credits where order_id = ?";
ResultSet rs = TransactionUtil.select(sql1, order.getId());
if (rs != null && rs.next()) {
System.out.println("积分已添加,订单已处理!");
} else {
// 增加积分
String sql2 = "insert into order_credits(user_id,order_id,total) values(?,?,?)";
TransactionUtil.execute(sql2, order.getUserId(), order.getId(), order.getTotal() * 2);
System.out.printf("订单(id=%s)添加积分%n", order.getId());
TransactionUtil.commit();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
TransactionUtil.rollback();
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

private void sendMail() { }
});

consumer.start();

System.out.println("Consumer Started.");
}
}

总体上,我的思路就是这样,希望大家一起讨论学习。

链接

引言

工欲善其事,必先利其器。

因为只有一台电脑,只能搭建伪集群来学习了,但是,本身又是个偷懒的人,启动伪集群 RocketMQ 的命令有点多,不想敲那么多的命令,顺便将搭建 RocketMQ 集群的部署方式记录一下。

RocketMQ 的部署方式有3种:

  • 2m-noslave:多 Master 模式,无 Slave。[双主模式]
    • 优点:配置简单,性能最高
    • 缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性
  • 2m-2s-sync:多 Master 多 Slave 模式,同步双写。[双主双从+同步模式]
    • 优点:服务可用性与数据可用性非常高
    • 缺点:性能比异步集群略低
  • 2m-2s-async:多 Master 多 Slave 模式,异步复制。[双主双从+异步模式]
    • 优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预
    • 缺点:Master宕机或磁盘损坏时会有少量消息丢失

本文主要记录一下同步双写模式的搭建步骤和一键启动脚本。

端口规划

首先,因为只有一台电脑,端口要进行规划,以避免端口占用的问题。

采用的方案就是“双主双从+同步模式”。

名称 端口
namesrv1 9876
Namesrv2 9877
brokera-master 10910
brokera-slave 10920
brokerb-master 10930
brokerb-slave 10940

下载

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip

配置文件

1
vim rocketmq/conf/2m-2s-sync/broker-a.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 集群名字
brokerClusterName=DefaultCluster
# broker 名字,可重复,master 的名字和 slave 的名字保持一致
brokerName=broker-a
# 0 表示 master, >0 表示 slave
brokerId=0
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# Broker 的角色
brokerRole=SYNC_MASTER
# 刷盘方式
flushDiskType=ASYNC_FLUSH
# Broker 对外服务的监听端口, 注意需改端口, 并且要和默认的10911相差5以上
listenPort=10910
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# nameServer地址,分号分割
namesrvAddr=localhost:9876;localhost:9877
# 存储路径
storePathRootDir=/usr/local/var/lib/rocketmq/broker-a
# commitLog 存储路径
storePathCommitLog=/usr/local/var/lib/rocketmq/broker-a/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/var/lib/rocketmq/broker-a/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/var/lib/rocketmq/broker-a/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/var/lib/rocketmq/broker-a/checkpoint
# abort 文件存储路径
abortFile=/usr/local/var/lib/rocketmq/broker-a/about

其它节点的配置文件类似只需修改一下 brokerIdbrokerNamelistenPortbrokerRolestore*这些参数即可。

修改 JVM 启动资源要素

还是因为本身只有一台机器,资源有限的原因。

1
vim rocketmq-4.9.3/bin/runbroker.sh

image-20220402143803612

1
vim rocketmq-4.9.3/bin/runserver.sh

image-20220402143514440

一键启动脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/bin/bash
export JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_251.jdk/Contents/Home"
export ROCKETMQ_HOME="/usr/local/rocketmq-4.9.3"
export ROCKETMQ_LOG_DIR="/usr/local/var/log/rocketmq"

if [ $1 == "startall" ] then
# 启动 namesrv1
nohup sh ${ROCKETMQ_HOME}/bin/mqnamesrv -c ${ROCKETMQ_HOME}/conf/2m-2s-sync/namesrv-1.properties > ${ROCKETMQ_LOG_DIR}/mqnamesrv1.log 2>&1 &

# 启动 namesrv2
nohup sh ${ROCKETMQ_HOME}/bin/mqnamesrv -c ${ROCKETMQ_HOME}/conf/2m-2s-sync/namesrv-2.properties > ${ROCKETMQ_LOG_DIR}/mqnamesrv2.log 2>&1 &

# 启动 broker-a
nohup sh ${ROCKETMQ_HOME}/bin/mqbroker -c ${ROCKETMQ_HOME}/conf/2m-2s-sync/broker-a.properties > ${ROCKETMQ_LOG_DIR}/broker-a.log 2>&1 &

# 启动 broker-a-s
nohup sh ${ROCKETMQ_HOME}/bin/mqbroker -c ${ROCKETMQ_HOME}/conf/2m-2s-sync/broker-a-s.properties > ${ROCKETMQ_LOG_DIR}/broker-a-s.log 2>&1 &

# 启动 broker-b
nohup sh ${ROCKETMQ_HOME}/bin/mqbroker -c ${ROCKETMQ_HOME}/conf/2m-2s-sync/broker-b.properties > ${ROCKETMQ_LOG_DIR}/broker-b.log 2>&1 &

# 启动 broker-b-s
nohup sh ${ROCKETMQ_HOME}/bin/mqbroker -c ${ROCKETMQ_HOME}/conf/2m-2s-sync/broker-b-s.properties > ${ROCKETMQ_LOG_DIR}/broker-b-s.log 2>&1 &

export NAMESRV_ADDR=localhost:9876,localhost:9877

echo "rocketmq 集群启动成功!"
elif[ $1 == "start" ] then
nohup ${ROCKETMQ_HOME}/bin/mqnamesrv &
nohup ${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876 &
export NAMESRV_ADDR=localhost:9876
echo "rocketmq 单机启动成功!"
elif [ $1 == "stop" ] then
# 先关闭 broker
sh ${ROCKETMQ_HOME}/bin/mqshutdown broker
# 再关闭 namesrv
sh ${ROCKETMQ_HOME}/bin/mqshutdown namesrv
echo "rocketmq 关闭成功!"
else
echo "rockermq_service.sh (startall|start|stop)"
fi

运行情况

image-20220402163857315

参考文档

官方中文部署文档

JDK8 JVM 垃圾收集日志打印参数

1
-Xms50m -Xmx50m -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+PrintGCCause -Xloggc:/Users/fengxuechao/WorkSpace/IdeaProjects/foodie/logs/order_gclog.log

JDK8 运行时参数

  • -XX:+TraceClassLoading:跟踪类加载情况

  • +XX:TraceBiasedLocking:跟踪偏向锁的情况

0 引言

好记性不如烂笔头,把常见的一些 MySQL 索引失效的问题记录下来,在工作中可以时时检查对比。

主要分为两个部分,explain 介绍和各种索引失效场景的模拟。

建表语句

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE `people` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '姓名',
`gender` tinyint unsigned DEFAULT NULL COMMENT '性别,0男1女',
`career` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '技能',
`skills` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '技能',
`birthday` date DEFAULT NULL COMMENT '出生日期',
`lifetime` int DEFAULT NULL COMMENT '寿命',
`gmt_create` timestamp NULL DEFAULT NULL COMMENT '入库时间',
PRIMARY KEY (`id`),
KEY `idx_career_skills_lifetime` (`career`,`skills`,`lifetime`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

explain 执行计划各属性解释说明

id

SELECT 识别符。这是 SELECT 的查询序列号,SQL 的执行顺序:

  1. id 相同时,执行顺序自上而下。
  2. 如果是子查询,id 会递增,id 值越大优先级越高,越先被执行。
  3. id 如果相同,可以认为是一组,自上而下顺序执行;再所有组中,优先级越高,越先被执行。

select_type

select_type 说明
SIMPLE 简单查询,不使用UNION或子查询
PRIMARY 最外层查询,查询中若包含任何复杂的子部分,最外层的 select 被标记为 PRIMARY
SUBQUERY 子查询中的第一个select,结果不依赖于外部查询
DEPENDENT SUBQUERY 子查询中的第一个select,结果依赖于外部查询
UNCACHEABLE SUBQUERY 一个子查询的结果不能被缓存,必需重新评估外链表的第一行
DERIVED 子查询,派生表的 select,from 子句的的子查询
UNION 联合,UNION 中的第二个或后面的SELECT语句
UNION RESULT 使用联合的结果
DEPENDENT UNION UNION中的第二个或后面的SELECT语句,取决于外面的查询

type

type 说明
ALL 全数据表扫描
index 全索引表扫描
RANGE 对索引列进行范围查询
INDEX_MERGE 合并索引,使用多个单列索引查询
REF 根据索引查找一个或多个值
EQ_REF 搜索时使用 primary key 或 unique 类型
CONST 常量,表最多有一个匹配行,因为仅有一行,在这行的列值可被优化器剩余部分认为是常数,const表很快,因为它们只读取一次。
SYSTEM 系统,表仅有一行(=系统表)。这是const联接类型的一个特例。
  • 性能:all < index < range < index_merge < ref_or_null < ref < eq_ref < system/const

  • 性能在 range 之下基本都可以进行调优

possible_keys

可能使用的索引

key

真实使用的索引

key_len

MySQL中使用索引字节长度

rows

mysql 预估为了找到所需的行而要读取的行数

filtered

按表条件过滤的行百分比

extra

extra 说明
Using index 此值表示mysql将使用覆盖索引,以避免访问表。
Using where mysql 将在存储引擎检索行后再进行过滤,许多where条件里涉及索引中的列,当(并且如果)它读取索引时,就能被存储引擎检验,因此不是所有带where子句的查询都会显示“Using where”。有时“Using where”的出现就是一个暗示:查询可受益于不同的索引。
Using temporary mysql 对查询结果排序时会使用临时表。常见于排序和分组查询group by,order by
Using filesort 当Query中包含 order by 操作,而且无法利用索引完成的排序操作称为“文件排序”。
Range checked for each record(index map: N) 没有好用的索引,新的索引将在联接的每一行上重新估算,N是显示在possible_keys列中索引的位图,并且是冗余的

1 不符合最左前缀原则

1
2
3
4
5
6
7
mysql> explain select * from people where `lifetime` = 23 and `skills` = '口才';
+----+-------------+--------+------------+------+---------------+------+---------+------+------+----------+-------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
+----+-------------+--------+------------+------+---------------+------+---------+------+------+----------+-------------+
| 1 | SIMPLE | people | NULL | ALL | NULL | NULL | NULL | NULL | 14 | 7.14 | Using where |
+----+-------------+--------+------------+------+---------------+------+---------+------+------+----------+-------------+
1 row in set, 1 warning (0.00 sec)

说明:

  • 最左前缀原则指的是从索引最左前列开始并且不跳过索引中的列

  • type 是 ALL, 表示查询语句是全表数据查询,where 的查询条件是 lifetime 和 skills,缺少 career 这个索引条件,无法命中索引 idx_career_skills_lifetime。

2 在索引列上有多余操作,如:函数、计算、类型转换

1
2
3
4
5
6
mysql> explain select * from people where left(career, 2) = '群众';
+----+-------------+--------+------------+------+---------------+------+---------+------+------+----------+-------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
+----+-------------+--------+------------+------+---------------+------+---------+------+------+----------+-------------+
| 1 | SIMPLE | people | NULL | ALL | NULL | NULL | NULL | NULL | 14 | 100.00 | Using where |
+----+-------------+--------+------------+------+---------------+------+---------+------+------+----------+-------------+

说明:

  • LEFT()函数是一个字符串函数,它返回具有指定长度的字符串的左边部分。
  • LEFT()函数的语法:LEFT(str,length);
    • str是要提取子字符串的字符串。
    • length是一个正整数,指定将从左边返回的字符数。
    • LEFT()函数返回str字符串中最左边的长度字符。
    • 如果strlength参数为NULL,则返回NULL值。
    • 如果length0或为负,则LEFT函数返回一个空字符串。
    • 如果length大于str字符串的长度,则LEFT函数返回整个str字符串。
    • 请注意,SUBSTRINGSUBSTR 函数也提供与LEFT函数相同的功能。
  • career 字段是有索引的,但是执行计划中看到的却没有命中,说明在索引字段上添加多余操作会使其失效

3 查询条件有!=><

4 like以通配符%为开头

5 使用来了or

6 使用了is null或者is not null

redis 常见问题

  1. 为什么使用 redis ?
    • 高性能、高并发
    • redis 走内存,天然支持高并发,单机轻松支持一秒十几万,是单机mysql性能的几十倍。
  2. redis 在项目中如何使用?

[toc]

引言

简单来说,回表就是 MySQL 要先查询到主键索引,然后再用主键索引定位到数据。

下面,对一些问题进行分析与回答:

  • 什么是聚簇索引?什么是非聚簇索引?
  • 为什么回表要先查到主键索引?
  • 主键索引和非主键索引有什么区别?
  • 如何避免回表?

聚簇索引和非聚簇索引是什么?

MySQL 的索引有不同的角度的分类方式,例如:按数据结构分、按逻辑角度分、按物理存储分。

其中,按物理存储分有两种索引:聚簇索引非聚簇索引

简单来说,聚簇索引是主键索引

主键索引之外的就是非聚簇索引,非聚簇索引又叫辅助索引或者二级索引。

主键索引和非主键索引有什么区别?

相同点:都使用的是 B+Tree 。

不同点:叶子节点存储的数据不同

  • 主键索引的叶子节点存储的是一行完整的数据
  • 非主键索引的叶子节点存储的是主键值。叶子节点不包含记录的全部数据,非主键的叶子节点除了用来排序的 key 还包含一个书签(bookmark),其中存储了聚簇索引的 key。

那么这两种索引在使用方面上有什么区别呢?

  1. 使用主键索引查询
    1
    2
    3
    # 主键索引的的叶子节点存储的是**一行完整的数据**
    # 所以只需搜索主键索引的 B+Tree 就可以轻松找到全部数据
    select * from user where id = 1;
  2. 使用非主键索引查询
    1
    2
    3
    4
    # 非主键索引的叶子节点存储的是**主键值**
    # 所以MySQL会先查询到 name 列的索引的 B+Tree,搜索得到对应的主键值
    # 然后再去搜索该主键值查询主键索引的 B+Tree 才可以找到对应的数据
    select * from user where name = 'Jack';

可以看出使用非主键索引要比主键索引多使用一次 B+Tree。

B-Tree 和 B+Tree 的简单理解

理解聚簇索引和非聚簇索引的关键在于 B+Tree 的理解。

用一幅图来表示,其它的就不再过多解释了:

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/KjXSSU.jpg

这里只是简单介绍一下 B-Tree 和 B+Tree 的区别:

  1. B+树中只有叶子节点会带有指向记录的指针,而B树则所有节点都带有,在内部节点出现的索引项不会再出现在叶子节点中。
  2. B+树中所有叶子节点都是通过指针连接在一起,而B树不会。

如何避免回表?

使用覆盖索引,所谓覆盖索引就是指索引中包含了查询中的所有字段,这种情况下就不需要再进行回表查询了。