0%

转载自https://www.cnblogs.com/skywang12345/p/3514593.html

1 简介

AtomicInteger, AtomicLong和AtomicBoolean这3个基本类型的原子类的原理和用法相似。所以以AtomicLong对基本类型的原子类进行介绍。内容主要包括:

  • AtomicLong介绍和函数列表
  • AtomicLong源码分析(基于JDK1.8.0_251)
  • AtomicLong示例

2 AtomicLong介绍和函数列表

AtomicLong是作用是对长整形进行原子操作。

在32位操作系统中,64位的long 和 double 变量由于会被JVM当作两个分离的32位来进行操作,所以不具有原子性。而使用AtomicLong能让long的操作保持原子型。

2.1 函数列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 构造函数
AtomicLong()
// 创建值为initialValue的AtomicLong对象
AtomicLong(long initialValue)
// 以原子方式设置当前值为newValue。
final void set(long newValue)
// 获取当前值
final long get()
// 以原子方式将当前值减 1,并返回减1后的值。等价于“--num”
final long decrementAndGet()
// 以原子方式将当前值减 1,并返回减1前的值。等价于“num--”
final long getAndDecrement()
// 以原子方式将当前值加 1,并返回加1后的值。等价于“++num”
final long incrementAndGet()
// 以原子方式将当前值加 1,并返回加1前的值。等价于“num++”
final long getAndIncrement()
// 以原子方式将delta与当前值相加,并返回相加后的值。
final long addAndGet(long delta)
// 以原子方式将delta添加到当前值,并返回相加前的值。
final long getAndAdd(long delta)
// 如果当前值 == expect,则以原子方式将该值设置为update。成功返回true,否则返回false,并且不修改原值。
final boolean compareAndSet(long expect, long update)
// 以原子方式设置当前值为newValue,并返回旧值。
final long getAndSet(long newValue)
// 返回当前值对应的int值
int intValue()
// 获取当前值对应的long值
long longValue()
// 以 float 形式返回当前值
float floatValue()
// 以 double 形式返回当前值
double doubleValue()
// 最后设置为给定值。延时设置变量值,这个等价于set()方法,但是由于字段是volatile类型的,因此次字段的修改会比普通字段(非volatile字段)有稍微的性能延时(尽管可以忽略),所以如果不是想立即读取设置的新值,允许在“后台”修改值,那么此方法就很有用。如果还是难以理解,这里就类似于启动一个后台线程如执行修改新值的任务,原线程就不等待修改结果立即返回(这种解释其实是不正确的,但是可以这么理解)。
final void lazySet(long newValue)
// 如果当前值 == 预期值,则以原子方式将该设置为给定的更新值。JSR规范中说:以原子方式读取和有条件地写入变量但不 创建任何 happen-before 排序,因此不提供与除 weakCompareAndSet 目标外任何变量以前或后续读取或写入操作有关的任何保证。大意就是说调用weakCompareAndSet时并不能保证不存在happen-before的发生(也就是可能存在指令重排序导致此操作失败)。但是从Java源码来看,其实此方法并没有实现JSR规范的要求,最后效果和compareAndSet是等效的,都调用了unsafe.compareAndSwapInt()完成操作。
final boolean weakCompareAndSet(long expect, long update)

3 AtomicLong 源码分析

  • AtomicLong源码分析

AtomicLong的代码很简单,下面仅以incrementAndGet()为例,对AtomicLong的原理进行说明。

incrementAndGet()源码如下:

1
2
3
4
5
6
7
8
9
10
11
public final long incrementAndGet() {
for (;;) {
// 获取AtomicLong当前对应的long值
long current = get();
// 将current加1
long next = current + 1;
// 通过CAS函数,更新current的值
if (compareAndSet(current, next))
return next;
}
}

说明

(01) incrementAndGet()首先会根据get()获取AtomicLong对应的long值。该值是volatile类型的变量,get()的源码如下:

1
2
3
4
5
6
// value是AtomicLong对应的long值
private volatile long value;
// 返回AtomicLong对应的long值
public final long get() {
return value;
}

(02) incrementAndGet()接着将current加1,然后通过CAS函数,将新的值赋值给value。

compareAndSet()的源码如下:

1
2
3
public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

compareAndSet()的作用是更新AtomicLong对应的long值。它会比较AtomicLong的原始值是否与expect相等,若相等的话,则设置AtomicLong的值为update。

4 AtomicLong示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// LongTest.java的源码
import java.util.concurrent.atomic.AtomicLong;

public class LongTest {

public static void main(String[] args){

// 新建AtomicLong对象
AtomicLong mAtoLong = new AtomicLong();

mAtoLong.set(0x0123456789ABCDEFL);
System.out.printf("%20s : 0x%016X\n", "get()", mAtoLong.get());
System.out.printf("%20s : 0x%016X\n", "intValue()", mAtoLong.intValue());
System.out.printf("%20s : 0x%016X\n", "longValue()", mAtoLong.longValue());
System.out.printf("%20s : %s\n", "doubleValue()", mAtoLong.doubleValue());
System.out.printf("%20s : %s\n", "floatValue()", mAtoLong.floatValue());

System.out.printf("%20s : 0x%016X\n", "getAndDecrement()", mAtoLong.getAndDecrement());
System.out.printf("%20s : 0x%016X\n", "decrementAndGet()", mAtoLong.decrementAndGet());
System.out.printf("%20s : 0x%016X\n", "getAndIncrement()", mAtoLong.getAndIncrement());
System.out.printf("%20s : 0x%016X\n", "incrementAndGet()", mAtoLong.incrementAndGet());

System.out.printf("%20s : 0x%016X\n", "addAndGet(0x10)", mAtoLong.addAndGet(0x10));
System.out.printf("%20s : 0x%016X\n", "getAndAdd(0x10)", mAtoLong.getAndAdd(0x10));

System.out.printf("\n%20s : 0x%016X\n", "get()", mAtoLong.get());

System.out.printf("%20s : %s\n", "compareAndSet()", mAtoLong.compareAndSet(0x12345679L, 0xFEDCBA9876543210L));
System.out.printf("%20s : 0x%016X\n", "get()", mAtoLong.get());
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
get() : 0x0123456789ABCDEF
intValue() : 0x0000000089ABCDEF
longValue() : 0x0123456789ABCDEF
doubleValue() : 8.1985529216486896E16
floatValue() : 8.1985531E16
getAndDecrement() : 0x0123456789ABCDEF
decrementAndGet() : 0x0123456789ABCDED
getAndIncrement() : 0x0123456789ABCDED
incrementAndGet() : 0x0123456789ABCDEF
addAndGet(0x10) : 0x0123456789ABCDFF
getAndAdd(0x10) : 0x0123456789ABCDFF

get() : 0x0123456789ABCE0F
compareAndSet() : false
get() : 0x0123456789ABCE0F

转载自https://www.cnblogs.com/skywang12345/p/3514604.html

在”Java多线程系列–“ Java多线程JUC原子类之 AtomicLong原子类 “中介绍过,AtomicLong是作用是对长整形进行原子操作。而AtomicLongArray的作用则是对”长整形数组”进行原子操作。

AtomicLongArray函数列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 创建给定长度的新 AtomicLongArray。
AtomicLongArray(int length)
// 创建与给定数组具有相同长度的新 AtomicLongArray,并从给定数组复制其所有元素。
AtomicLongArray(long[] array)

// 以原子方式将给定值添加到索引 i 的元素。
long addAndGet(int i, long delta)
// 如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
boolean compareAndSet(int i, long expect, long update)
// 以原子方式将索引 i 的元素减1。
long decrementAndGet(int i)
// 获取位置 i 的当前值。
long get(int i)
// 以原子方式将给定值与索引 i 的元素相加。
long getAndAdd(int i, long delta)
// 以原子方式将索引 i 的元素减 1。
long getAndDecrement(int i)
// 以原子方式将索引 i 的元素加 1。
long getAndIncrement(int i)
// 以原子方式将位置 i 的元素设置为给定值,并返回旧值。
long getAndSet(int i, long newValue)
// 以原子方式将索引 i 的元素加1。
long incrementAndGet(int i)
// 最终将位置 i 的元素设置为给定值。
void lazySet(int i, long newValue)
// 返回该数组的长度。
int length()
// 将位置 i 的元素设置为给定值。
void set(int i, long newValue)
// 返回数组当前值的字符串表示形式。
String toString()
// 如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
boolean weakCompareAndSet(int i, long expect, long update)

源码分析

AtomicLongArray的代码很简单,下面仅以incrementAndGet()为例,对AtomicLong的原理进行说明。

incrementAndGet()源码如下:

1
2
3
public final long incrementAndGet(int i) {
return addAndGet(i, 1);
}

说明:incrementAndGet()的作用是以原子方式将long数组的索引 i 的元素加1,并返回加1之后的值。

addAndGet()源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public long addAndGet(int i, long delta) {
// 检查数组是否越界
long offset = checkedByteOffset(i);
while (true) {
// 获取long型数组的索引 offset 的原始值
long current = getRaw(offset);
// 修改long型值
long next = current + delta;
// 通过CAS更新long型数组的索引 offset的值。
if (compareAndSetRaw(offset, current, next))
return next;
}
}

说明:addAndGet()首先检查数组是否越界。如果没有越界的话,则先获取数组索引i的值;然后通过CAS函数更新i的值。

getRaw()源码如下:

1
2
3
private long getRaw(long offset) {
return unsafe.getLongVolatile(array, offset);
}

说明:unsafe是通过Unsafe.getUnsafe()返回的一个Unsafe对象。通过Unsafe的CAS函数对long型数组的元素进行原子操作。如compareAndSetRaw()就是调用Unsafe的CAS函数,它的源码如下:

compareAndSetRaw()源码如下:

1
2
3
private boolean compareAndSetRaw(long offset, long expect, long update) {
return unsafe.compareAndSwapLong(array, offset, expect, update);
}

用法示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TestAtomicArrayLong {

public static void main(String[] args) {
// 新建AtomicLongArray对象
long[] arrLong = new long[] {10, 20, 30, 40, 50};
AtomicLongArray ala = new AtomicLongArray(arrLong);

ala.set(0, 100);
for (int i=0, len=ala.length(); i<len; i++)
System.out.printf("get(%d) : %s\n", i, ala.get(i));

System.out.printf("%20s : %s\n", "getAndDecrement(0)", ala.getAndDecrement(0));
System.out.printf("%20s : %s\n", "decrementAndGet(1)", ala.decrementAndGet(1));
System.out.printf("%20s : %s\n", "getAndIncrement(2)", ala.getAndIncrement(2));
System.out.printf("%20s : %s\n", "incrementAndGet(3)", ala.incrementAndGet(3));

System.out.printf("%20s : %s\n", "addAndGet(100)", ala.addAndGet(0, 100));
System.out.printf("%20s : %s\n", "getAndAdd(100)", ala.getAndAdd(1, 100));

System.out.printf("%20s : %s\n", "compareAndSet()", ala.compareAndSet(2, 31, 1000));
System.out.printf("%20s : %s\n", "get(2)", ala.get(2));
}
}

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
get(0) : 100
get(1) : 20
get(2) : 30
get(3) : 40
get(4) : 50
getAndDecrement(0) : 100
decrementAndGet(1) : 19
getAndIncrement(2) : 30
incrementAndGet(3) : 41
addAndGet(100) : 199
getAndAdd(100) : 19
compareAndSet() : true
get(2) : 1000

Process finished with exit code 0

[TOC]

1. 前言

Alibaba Sentinel 基础部分主要分六个部分:

  • Sentinel 全景分析
  • Sentinel 极速入门
  • Sentinel 核心API
  • Sentinel 控制台使用
  • Sentinel 与 Spring 整合 AOP 分析
  • Sentinel 与 Dubbo 整合分析

2. Sentinel 全景分析

官方介绍

Sentinel 是什么?

Sentinel 是面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。

它可以做到以下几点:

  • 流量控制

  • 熔断降级

  • 系统保护

  • 服务安全

Sentinel 的特性

  • 丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
  • 完备的实时监控:Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。
  • 广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
  • 完善的 SPI 扩展点:Sentinel 提供简单易用、完善的 SPI 扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/sentinel的主要特性.png

部分是 sentinel-dashboard。

绿色部分是 sentinel-core,是 Sentinel 的核心库。

紫色部分表示 sentinel 可以持久化规则到配置中心。

蓝色部分 sentinel 的扩展点。

Sentinel 的生态

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/sentinel_2_6.png

Sentinel 的组成部分

  • 核心库:sentinel-core
  • 控制台:sentinel-dashboard
  • 第三方支持组件:如spring、dubbo、rocketmq等

3. Sentinel 极速入门

4. Sentinel 核心API

5. Sentinel 控制台使用

6. Sentinel 与 Spring 整合 AOP 分析

7. Sentinel 与 Dubbo 整合分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 构造函数
AtomicLong()
// 创建值为initialValue的AtomicLong对象
AtomicLong(long initialValue)
// 以原子方式设置当前值为newValue。
final void set(long newValue)
// 获取当前值
final long get()
// 以原子方式将当前值减 1,并返回减1后的值。等价于“--num”
final long decrementAndGet()
// 以原子方式将当前值减 1,并返回减1前的值。等价于“num--”
final long getAndDecrement()
// 以原子方式将当前值加 1,并返回加1后的值。等价于“++num”
final long incrementAndGet()
// 以原子方式将当前值加 1,并返回加1前的值。等价于“num++”
final long getAndIncrement()
// 以原子方式将delta与当前值相加,并返回相加后的值。
final long addAndGet(long delta)
// 以原子方式将delta添加到当前值,并返回相加前的值。
final long getAndAdd(long delta)
// 如果当前值 == expect,则以原子方式将该值设置为update。成功返回true,否则返回false,并且不修改原值。
final boolean compareAndSet(long expect, long update)
// 以原子方式设置当前值为newValue,并返回旧值。
final long getAndSet(long newValue)
// 返回当前值对应的int值
int intValue()
// 获取当前值对应的long值
long longValue()
// 以 float 形式返回当前值
float floatValue()
// 以 double 形式返回当前值
double doubleValue()
// 最后设置为给定值。延时设置变量值,这个等价于set()方法,但是由于字段是volatile类型的,
// 因此次字段的修改会比普通字段(非volatile字段)有稍微的性能延时(尽管可以忽略),
// 所以如果不是想立即读取设置的新值,允许在“后台”修改值,那么此方法就很有用。
// 如果还是难以理解,这里就类似于启动一个后台线程如执行修改新值的任务,原线程就不等待修改结果立即返回
// (这种解释其实是不正确的,但是可以这么理解)。
final void lazySet(long newValue)
// 如果当前值 == 预期值,则以原子方式将该设置为给定的更新值。
// JSR规范中说:以原子方式读取和有条件地写入变量但不 创建任何 happen-before 排序,
// 因此不提供与除 weakCompareAndSet 目标外任何变量以前或后续读取或写入操作有关的任何保证。
// 大意就是说调用weakCompareAndSet时并不能保证不存在happen-before的发生(也就是可能存在指令重排序导致此操作失败)。
// 但是从Java源码来看,其实此方法并没有实现JSR规范的要求,最后效果和compareAndSet是等效的,
// 都调用了unsafe.compareAndSwapInt()完成操作。
final boolean weakCompareAndSet(long expect, long update)


转载自https://www.cnblogs.com/skywang12345/p/3514604.html

在”Java多线程系列–“ Java多线程JUC原子类之 AtomicLong原子类 “中介绍过,AtomicLong是作用是对长整形进行原子操作。而AtomicLongArray的作用则是对”长整形数组”进行原子操作。

AtomicLongArray函数列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 创建给定长度的新 AtomicLongArray。
AtomicLongArray(int length)
// 创建与给定数组具有相同长度的新 AtomicLongArray,并从给定数组复制其所有元素。
AtomicLongArray(long[] array)

// 以原子方式将给定值添加到索引 i 的元素。
long addAndGet(int i, long delta)
// 如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
boolean compareAndSet(int i, long expect, long update)
// 以原子方式将索引 i 的元素减1。
long decrementAndGet(int i)
// 获取位置 i 的当前值。
long get(int i)
// 以原子方式将给定值与索引 i 的元素相加。
long getAndAdd(int i, long delta)
// 以原子方式将索引 i 的元素减 1。
long getAndDecrement(int i)
// 以原子方式将索引 i 的元素加 1。
long getAndIncrement(int i)
// 以原子方式将位置 i 的元素设置为给定值,并返回旧值。
long getAndSet(int i, long newValue)
// 以原子方式将索引 i 的元素加1。
long incrementAndGet(int i)
// 最终将位置 i 的元素设置为给定值。
void lazySet(int i, long newValue)
// 返回该数组的长度。
int length()
// 将位置 i 的元素设置为给定值。
void set(int i, long newValue)
// 返回数组当前值的字符串表示形式。
String toString()
// 如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
boolean weakCompareAndSet(int i, long expect, long update)

源码分析

AtomicLongArray的代码很简单,下面仅以incrementAndGet()为例,对AtomicLong的原理进行说明。

incrementAndGet()源码如下:

1
2
3
public final long incrementAndGet(int i) {
return addAndGet(i, 1);
}

说明:incrementAndGet()的作用是以原子方式将long数组的索引 i 的元素加1,并返回加1之后的值。

addAndGet()源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public long addAndGet(int i, long delta) {
// 检查数组是否越界
long offset = checkedByteOffset(i);
while (true) {
// 获取long型数组的索引 offset 的原始值
long current = getRaw(offset);
// 修改long型值
long next = current + delta;
// 通过CAS更新long型数组的索引 offset的值。
if (compareAndSetRaw(offset, current, next))
return next;
}
}

说明:addAndGet()首先检查数组是否越界。如果没有越界的话,则先获取数组索引i的值;然后通过CAS函数更新i的值。

getRaw()源码如下:

1
2
3
private long getRaw(long offset) {
return unsafe.getLongVolatile(array, offset);
}

说明:unsafe是通过Unsafe.getUnsafe()返回的一个Unsafe对象。通过Unsafe的CAS函数对long型数组的元素进行原子操作。如compareAndSetRaw()就是调用Unsafe的CAS函数,它的源码如下:

compareAndSetRaw()源码如下:

1
2
3
private boolean compareAndSetRaw(long offset, long expect, long update) {
return unsafe.compareAndSwapLong(array, offset, expect, update);
}

用法示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TestAtomicArrayLong {

public static void main(String[] args) {
// 新建AtomicLongArray对象
long[] arrLong = new long[] {10, 20, 30, 40, 50};
AtomicLongArray ala = new AtomicLongArray(arrLong);

ala.set(0, 100);
for (int i=0, len=ala.length(); i<len; i++)
System.out.printf("get(%d) : %s\\n", i, ala.get(i));

System.out.printf("%20s : %s\\n", "getAndDecrement(0)", ala.getAndDecrement(0));
System.out.printf("%20s : %s\\n", "decrementAndGet(1)", ala.decrementAndGet(1));
System.out.printf("%20s : %s\\n", "getAndIncrement(2)", ala.getAndIncrement(2));
System.out.printf("%20s : %s\\n", "incrementAndGet(3)", ala.incrementAndGet(3));

System.out.printf("%20s : %s\\n", "addAndGet(100)", ala.addAndGet(0, 100));
System.out.printf("%20s : %s\\n", "getAndAdd(100)", ala.getAndAdd(1, 100));

System.out.printf("%20s : %s\\n", "compareAndSet()", ala.compareAndSet(2, 31, 1000));
System.out.printf("%20s : %s\\n", "get(2)", ala.get(2));
}
}

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
get(0) : 100
get(1) : 20
get(2) : 30
get(3) : 40
get(4) : 50
getAndDecrement(0) : 100
decrementAndGet(1) : 19
getAndIncrement(2) : 30
incrementAndGet(3) : 41
addAndGet(100) : 199
getAndAdd(100) : 19
compareAndSet() : true
get(2) : 1000

Process finished with exit code 0

初识 RocketMQ

RocketMQ是一款分布式、队列模型的消息中间件,由阿里巴巴自主研发的一款适用于高并发、高可靠性、海量数据场景的消息中间件。早期开源2.x版本名为MetaQ;15年迭代3.x版本,更名为RocketMQ,16年开始贡献到Apache,经过1年多的孵化,最终成为Apache顶级的开源项目,更新非常频繁,社区活跃度也非常高;目前最新版本为4.5.1-release版本(2019-7-20日前)。RocketMQ参考借鉴了优秀的开源消息中间件Apache Kafka(这也是我们后面课程中重点要讲解的内容哦),其消息的路由、存储、集群划分都借鉴了Kafka优秀的设计思路,并结合自身的 “双十一” 场景进行了合理的扩展和API丰富。

优秀的能力与支持

接下来我们一起来看一下RocketMQ优秀的能力吧 ~

  • 支持集群模型、负载均衡、水平扩展能力
  • 亿级别的消息堆积能力
  • 采用零拷贝的原理、顺序写盘、随机读(索引文件)
  • 丰富的API使用
  • 代码优秀,底层通信框架采用Netty NIO框架
  • NameServer 代替 Zookeeper
  • 强调集群无单点,可扩展,任意一点高可用,水平可扩展
  • 消息失败重试机制、消息可查询
  • 开源社区活跃度、是否足够成熟(经过双十一考验)

专业术语

任何一种技术框架,都有 “她” 的专有名词,在你刚开始接触 “她” 的时候,一定要了解 “她” 的专业术语,这样能够更快速、更高效的和 “她” 愉快的玩耍…

  • Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息。
  • Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。
  • Push Consumer:Consumer的一种,需要向Consumer对象注册监听。
  • Pull Consumer:Consumer的一种,需要主动请求Broker拉取消息。
  • Producer Group:生产者集合,一般用于发送一类消息。
  • Consumer Group:消费者集合,一般用于接受一类消息进行消费。
  • Broker : MQ消息服务(中转角色,用于消息存储与生产消费转发)。

RocketMQ核心源码包及功能说明

如下图所示,我们要带小伙伴们一起来看看RocketMQ源码包的组成,这样更方便我们日后对他有一个深入的学习。

https://climg.mukewang.com/5df8401d0919472f03410264.png

https://climg.mukewang.com/5df8404409bd481303400274.png

  • rocketmq-broker 主要的业务逻辑,消息收发,主从同步, pagecache
  • rocketmq-client 客户端接口,比如生产者和消费者
  • rocketmq-common 公用数据结构等等
  • rocketmq-distribution 编译模块,编译输出等
  • rocketmq-example 示例,比如生产者和消费者
  • rocketmq-fliter 进行Broker过滤的不感兴趣的消息传输,减小带宽压力
  • rocketmq-logappender、rocketmq-logging日志相关
  • rocketmq-namesrv Namesrv服务,用于服务协调
  • rocketmq-openmessaging 对外提供服务
  • rocketmq-remoting 远程调用接口,封装Netty底层通信
  • rocketmq-srvutil 提供一些公用的工具方法,比如解析命令行参数
  • rocketmq-store 消息存储核心包
  • rocketmq-test 提供一些测试代码包
  • rocketmq-tools 管理工具,比如有名的mqadmin工具

集群架构模型

RocketMQ为我们提供了丰富的集群架构模型,包括单点模式、主从模式、双主模式、以及生产上使用最多的双主双从模式(或者说多主多从模式),在这里我们仅介绍一下经典的双主双从集群模型,如下图所示:

https://climg.mukewang.com/5df8407b0950a18b08540381.png

  • Producer集群就是生产者集群(他们在同一个生产者组 Producer Group)
  • Consumer集群就是消费者集群(他们在同一个消费者组 Consumer Group)
  • NameServer集群作为超轻量级的配置中心,只做集群元数据存储和心跳工作,不必保障节点间数据强一致性,也就是说NameServer集群是一个多机热备的概念。
  • 对于Broker而言,通常Master与Slave为一组服务,他们互为主从节点,通过NameServer与外部的Client端暴露统一的集群入口。Broker就是消息存储的核心MQ服务了。

集群架构思考

RocketMQ作为国内顶级的消息中间件,其性能主要依赖于天然的分布式Topic/Queue,并且其内存与磁盘都会存储消息数据,借鉴了Kafka的 “空中接力” 概念(这个我们后面学习Kafka的时候会详细的说明),所谓 “空中接力” 就是指数据不一定要落地,RocketMQ提供了同步/异步双写、同步/异步复制的特性。在真正的生产环境中应该选择符合自己业务的配置。下面针对于RocketMQ的高性能及其瓶颈在这里加以说明:

  • 架构思考:
    • RocketMQ目前本人在公司内部实际生产环境采用8M-8S的集群架构(8主8从)硬件单点Master为32C,96G内存,500G的SSD
    • 其主要瓶颈最终会落在IOPS上面,当高峰期来临的时候,磁盘读写能力是主要的性能瓶颈,每秒收发消息IOPS达到10W+ 消息,这也是公司内部主要的可靠性消息中间件
    • 在很多时候,我们的业务会有一些非核心的消息投递,后续会进行消息中间件的业务拆分,把不重要的消息(可以允许消息丢失、非可靠性投递的消息)采用Kafka的异步发送机制,借助Kafka强大的吞吐量和消息堆积能力来做业务的分流(当然RocketMQ的性能也足够好)。
    • 为什么瓶颈在IOPS? 根本原因还是因为云环境导致的问题,云环境的SSD物理存储显然和自建机房SSD会有不小的差距,这一点我们无论是从数据库的磁盘性能、还是搜索服务(ElasticSearch)的磁盘性能,都能给出准确的瓶颈点,单机IOPS达到1万左右就是云存储SSD的性能瓶颈,这个也解释了 “木桶短板原理” 的效应,在真正的生产中,CPU的工作主要在等待IO操作,高并发下 CPU资源接近极限,但是IOPS还是达不到我们想要的效果。

本节知识点回顾

Hi,小伙伴们,本节课我们通过简要的图文学习,带大家快速的过了一下RocketMQ,那么小伙伴们记住一定要在脑海里建立知识的结构体系,并串联起来!无论是现在,还是说未来,本神都希望小伙伴要按照下面的步骤进行回忆和复习:

  1. RocketMQ的前世今生 ?
  2. RocketMQ的专业术语?
  3. RocketMQ源码包的组成?
  4. RocketMQ的集群架构模型
  5. RocketMQ在真正生产环境中面临的瓶颈点以及解决方案

补充课外资料

为了方便爱学习的小伙伴们,本神特意加餐一波,提供RocketMQ的官方文档和相关软件包,以及对应的代码示例,辅助小伙伴们更好掌握RocketMQ。另外本神还在慕课网上的一门课程详细的讲解了RocketMQ的各个知识点以及实战应用,有想深入学习的小伙伴可以学习共勉。

  1. 官方文档
  2. RocketMQ4.3.0版本(Linux平台)
  3. RocketMQ代码示例
  4. 慕课网实战课程《RocketMQ核心技术精讲与高并发抗压实战》传送门:https://coding.imooc.com/class/292.html

修改storage配置文件

  • 修改该storage.cond配置文件

    https://climg.mukewang.com/5e0ef5c108c15a6116000535.jpg

1
2
3
4
5
6
7
8
9
10
11
# 修改组名
group_name=imooc
# 修改storage的工作空间
base_path=/usr/local/fastdfs/storage
# 修改storage的存储空间
store_path0=/usr/local/fastdfs/storage
# 修改tracker的地址和端口号,用于心跳
tracker_server=192.168.1.153:22122

# 后续结合nginx的一个对外服务端口号
http.server_port=8888
  • 创建目录
1
mkdir /usr/local/fastdfs/storage -p

启动storage

前提:必须首先启动tracker

1
/usr/bin/fdfs_storaged /etc/fdfs/storage.conf

检查进程如下:

1
ps -ef|grep storage

https://climg.mukewang.com/5e0ef61c08e61f1d16000147.jpg

测试上传

  • 修改的client.conf配置文件

    https://climg.mukewang.com/5e0ef62b089e938e15020712.jpg

    1
    2
    base_path=/usr/local/fastdfs/client
    tracker_server=192.168.1.153:22122
    1
    mkdir /usr/local/fastdfs/client
  • 测试:

1
2
wget <https://www.imooc.com/static/img/index/logo.png>
./fdfs_test /etc/fdfs/client.conf upload /home/logo.png

上传成功:

https://climg.mukewang.com/5e0ef64008dfab1e16000857.jpg