0%

[TOC]

1 Kafka 核心概念详解

1.1 Kafka(MQ) 的应用场景

1.1.1 Kafka(MQ)之异步化、服务解耦、削峰填谷

  • 异步化

    kafka_1_1

  • 服务解耦、削峰填谷

    kafka_1_2

阅读全文 »

[TOC]

1. 整体功能概述

rabbitmq_24_1

rabbitmq_24_2

2. 基础组件模块划分

1
2
3
4
5
rabbit-parent
- rabbit-api 对外提供统一的API接口
- rabbit-commmon 公共包
- rabbit-core-producer 核心包
- rabbit-task es-job

Github 地址

3. 可靠性消息投递

rabbitmq_19_13

rabbitmq_24_4

4. 思维导图

rabbitmq_xmind.png

rabbitmq_23_3

rabbitmq_23_4

rabbitmq_23_5

rabbitmq_23_6

rabbitmq_23_8

生产者关键代码

application.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
server.servlet.context-path=/
server.port=8001

spring.rabbitmq.addresses=192.168.11.71:5672,192.168.11.72:5672,192.168.11.71:5673
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

## 使用启用消息确认模式
spring.rabbitmq.publisher-confirms=true

## 设置return消息模式,注意要和mandatory一起去配合使用
##spring.rabbitmq.publisher-returns=true
##spring.rabbitmq.template.mandatory=true

spring.application.name=rabbit-producer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

RabbitSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import java.util.Map;
import java.util.UUID;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;


@Component
public class RabbitSender {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 这里就是确认消息的回调监听接口,用于确认消息是否被broker所收到
*/
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
/**
* @param CorrelationData 作为一个唯一的标识
* @param ack broker 是否落盘成功
* @param cause 失败的一些异常信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("消息ACK结果:" + ack + ", correlationData: " + correlationData.getId());
}
};

/**
* 对外发送消息的方法
* @param message 具体的消息内容
* @param properties 额外的附加属性
* @throws Exception
*/
public void send(Object message, Map<String, Object> properties) throws Exception {

MessageHeaders mhs = new MessageHeaders(properties);
Message<?> msg = MessageBuilder.createMessage(message, mhs);

rabbitTemplate.setConfirmCallback(confirmCallback);

// 指定业务唯一的iD
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

MessagePostProcessor mpp = new MessagePostProcessor() {

@Override
public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message)
throws AmqpException {
System.err.println("---> post to do: " + message);
return message;
}
};

rabbitTemplate.convertAndSend("exchange-1",
"springboot.rabbit",
msg, mpp, correlationData);

}
}

消费者关键代码

application.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
server.servlet.context-path=/
server.port=8002

spring.rabbitmq.addresses=192.168.11.71:5672,192.168.11.72:5672,192.168.11.71:5673
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

## 表示消费者消费成功消息以后需要手工的进行签收(ack),默认为auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=1


## 作业:
## 最好不要在代码里写死配置信息,尽量使用这种方式也就是配置文件的方式
## 在代码里使用 ${} 方式进行设置配置: ${spring.rabbitmq.listener.order.exchange.name}
spring.rabbitmq.listener.order.exchange.name=order-exchange
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.key=order.*

spring.application.name=rabbit-producer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

RabbitReceive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
public class RabbitReceive {

/**
* 组合使用监听
* @RabbitListener @QueueBinding @Queue @Exchange
* @param message
* @param channel
* @throws Exception
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1", durable = "true"),
exchange = @Exchange(name = "exchange-1",
durable = "true",
type = "topic",
ignoreDeclarationExceptions = "true"),
key = "springboot.*"
)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
// 1. 收到消息以后进行业务端消费处理
System.err.println("-----------------------");
System.err.println("消费消息:" + message.getPayload());

// 2. 处理成功之后 获取deliveryTag 并进行手工的ACK操作, 因为我们配置文件里配置的是 手工签收
// spring.rabbitmq.listener.simple.acknowledge-mode=manual
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}

rabbitmq_22_1

rabbitmq_22_2

rabbitmq_22_3

rabbitmq_22_4

rabbitmq_22_5

rabbitmq_22_7

Sender4DLXExchange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Sender4DLXExchange {


public static void main(String[] args) throws Exception {

//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_dlx_exchange";
String routingKey = "group.bfxy";
//5 发送

Map<String, Object> headers = new HashMap<String, Object>();

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
// TTL
.expiration("6000")
.headers(headers).build();

String msg = "Hello World RabbitMQ 4 DLX Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , props , msg.getBytes());

}

}

Receiver4DLXtExchange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Receiver4DLXtExchange {

public static void main(String[] args) throws Exception {


ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("192.168.11.71");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();
//4 声明正常的 exchange queue 路由规则
String queueName = "test_dlx_queue";
String exchangeName = "test_dlx_exchange";
String exchangeType = "topic";
String routingKey = "group.*";
// 声明 exchange
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);


// 注意在这里要加一个特殊的属性arguments: x-dead-letter-exchange
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
//arguments.put("x-dead-letter-routing-key", "dlx.*");
//arguments.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, false, false, false, arguments);
channel.queueBind(queueName, exchangeName, routingKey);


//dlx declare:
channel.exchangeDeclare("dlx.exchange", exchangeType, true, false, false, null);
channel.queueDeclare("dlx.queue", false, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");


// durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 循环获取消息
while(true){
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}

rabbitmq_21_1

rabbitmq_21_2

rabbitmq_21_3

rabbitmq_21_4

rabbitmq_21_5

rabbitmq_21_6

rabbitmq_21_7

Sender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Sender {


public static void main(String[] args) throws Exception {

//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
// connectionFactory.setHost("192.168.11.71");
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String queueName = "test001";
//参数: queue名字,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除, 其他参数
channel.queueDeclare(queueName, true, false, false, null);

for(int i = 0; i < 5;i++) {
String msg = "Hello World RabbitMQ " + i;
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("flag", i);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
channel.basicPublish("", queueName , props , msg.getBytes());
}
}

}

Receiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Receiver {

public static void main(String[] args) throws Exception {


ConnectionFactory connectionFactory = new ConnectionFactory() ;

// connectionFactory.setHost("192.168.11.71");
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

String queueName = "test001";
//durable 是否持久化消息
channel.queueDeclare(queueName, true, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);

// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
// 循环获取消息
while(true){
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
Thread.sleep(1000);

if((Integer)delivery.getProperties().getHeaders().get("flag") == 0) {
//throw new RuntimeException("异常");
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
} else {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
}

IO多路复用

I/O是指网络I/O,多路指多个TCP连接(即socket或者channel),复用指复用一个或几个线程。意思说一个或一组线程处理多个TCP连接。最大优势是减少系统开销小,不必创建过多的进程/线程,也不必维护这些进程/线程。 IO多路复用使用两个系统调用(select/poll/epoll和recvfrom),blocking IO只调用了recvfrom;select/poll/epoll 核心是可以同时处理多个connection,而不是更快,所以连接数不高的话,性能不一定比多线程+阻塞IO好,多路复用模型中,每一个socket,设置为non-blocking,阻塞是被select这个函数block,而不是被socket阻塞的。

select机制

基本原理: 客户端操作服务器时就会产生这三种文件描述符(简称fd):writefds(写)、readfds(读)、和exceptfds(异常)。select会阻塞住监视3类文件描述符,等有数据、可读、可写、出异常 或超时、就会返回;返回后通过遍历fdset整个数组来找到就绪的描述符fd,然后进行对应的IO操作。

优点:几乎在所有的平台上支持,跨平台支持性好

缺点:由于是采用轮询方式全盘扫描,会随着文件描述符FD数量增多而性能下降。 每次调用 select(),需要把 fd 集合从用户态拷贝到内核态,并进行遍历(消息传递都是从内核到用户空间) 默认单个进程打开的FD有限制是1024个,可修改宏定义,但是效率仍然慢。

poll机制

基本原理与select一致,也是轮询+遍历;唯一的区别就是poll没有最大文件描述符限制(使用链表的方式存储fd)。

epoll机制

基本原理:没有fd个数限制,用户态拷贝到内核态只需要一次,使用时间通知机制来触发。通过epoll_ctl注册fd,一旦fd就绪就会通过callback回调机制来激活对应fd,进行相关的io操作。epoll之所以高性能是得益于它的三个函数 1)epoll_create()系统启动时,在Linux内核里面申请一个B+树结构文件系统,返回epoll对象,也是一个fd 2)epoll_ctl() 每新建一个连接,都通过该函数操作epoll对象,在这个对象里面修改添加删除对应的链接fd, 绑定一个callback函数 3)epoll_wait() 轮训所有的callback集合,并完成对应的IO操作

优点:没fd这个限制,所支持的FD上限是操作系统的最大文件句柄数,1G内存大概支持10万个句柄 效率提高,使用回调通知而不是轮询的方式,不会随着FD数目的增加效率下降 内核和用户空间mmap同一块内存实现(mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间)

例子:100万个连接,里面有1万个连接是活跃,我们可以对比 select、poll、epoll 的性能表现 select:不修改宏定义默认是1024,l则需要100w/1024=977个进程才可以支持 100万连接,会使得CPU性能特别的差。 poll: 没有最大文件描述符限制,100万个链接则需要100w个fd,遍历都响应不过来了,还有空间的拷贝消耗大量的资源。 epoll: 请求进来时就创建fd并绑定一个callback,主需要遍历1w个活跃连接的callback即可,即高效又不用内存拷贝。

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/redis_thread_model1.png

Redis线程模型的组成


  1. 多个socket
  2. IO多路复用程序
  3. scocket队列
  4. 文件事件分配器
  5. 事件处理器(连接应答处理器,命令请求处理器,命令回复处理器)

多个 socket 可能会并发产生不同的操作,每个操作对应不同的文件事件,但是 IO 多路复用程序会监听多个 socket,会将 socket 产生的事件放入队列中排队,事件分派器每次从队列中取出一个事件,把该事件交给对应的事件处理器进行处理。

流程及原理


redis_thread_model

  1. 客户端socket01请求redis的server scoket建立连接,此时server socket生成AE_READABLE事件,IO多路复用程序监听到server socket产生的事件,并将该事件压入队列。

    文件事件分派器从队列中拉取事件交给连接应答处理器,处理器同时生成一个与客户端通信的socket01,并将该scoket01的AE_READABLE事件与命令请求处理器关联

  2. 此时客户端scoket01发送一个”set key value“的请求,redis的scoket01接收到AE_READABLE事件,IO多路复用程序监听到事件,将事件压入队列,文件分派器取到事件,由于scoket01已经

    和命令请求处理器关联,所以命令请求处理器开始”set key value”,完毕后会将redis的scoket01的AE_WRITABLE事件关联到命令回复处理器

  3. 如果此时客户端准备好接收返回结果了,向redis中的socket01发起询问请求,那么 redis 中的 socket01 会产生一个 AE_WRITABLE 事件,同样压入队列中,事件分派器找到相关联的命令回复处理器,由命令回复处理器对 socket01 输入本次操作的一个结果,比如 ok,之后解除 socket01 的 AE_WRITABLE 事件与命令回复处理器的关联。

这样便完成了redis的一次通信。

文件事件分配器


  1. Redis 基于 Reactor 模式开发了自己的网络事件处理器: 这个处理器被称为文件事件处理器(file event handler)
  2. 文件事件处理器使用 I/O 多路复用(multiplexing)程序来同时监听多个套接字, 并根据套接字目前执行的任务来为套接字关联不同的事件处理器。
  3. 当被监听的套接字准备好执行连接应答(accept)、读取(read)、写入(write)、关闭(close)等操作时, 与操作相对应的文件事件就会产生, 这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。
  4. 文件事件处理器以单线程方式运行, 但通过使用 I/O 多路复用程序来监听多个套接字, 文件事件处理器既实现了高性能的网络通信模型, 又可以很好地与 redis 服务器中其他同样以单线程方式运行的模块进行对接, 这保持了 Redis 内部单线程设计的简单性

为什么redis使用单线程模型还能保证高性能?


纯内存访问

redis 将所有数据放在内存中,内存的响应时长大约为 100 纳秒,这是 redis 的 QPS 过万的重要基础。

非阻塞式IO

  • 什么是阻塞式 IO

    当我们调用 Scoket 的读写方法,默认它们是阻塞的。

    read() 方法要传递进去一个参数 n,表示读取这么多字节后再返回,如果没有读够 n 字节线程就会阻塞,直到新的数据到来或者连接关闭了, read 方法才可以返回,线程才能继续处理。

    write() 方法会首先把数据写到系统内核为 Scoket 分配的写缓冲区中,当写缓存区满溢,即写缓存区中的数据还没有写入到磁盘,就有新的数据要写道写缓存区时,write() 方法就会阻塞,直到写缓存区中有空闲空间。

  • 什么是非阻塞式 IO

    非阻塞 IO 在 Scoket 对象上提供了一个选项Non_Blocking ,当这个选项打开时,读写方法不会阻塞,而是能读多少读多少,能写多少写多少。

    能读多少取决于内核为 Scoket 分配的读缓冲区的大小,能写多少取决于内核为 Scoket 分配的写缓冲区的剩余空间大小。读方法和写方法都会通过返回值来告知程序实际读写了多少字节数据。

    有了非阻塞 IO 意味着线程在读写 IO 时可以不必再阻塞了,读写可以瞬间完成然后线程可以继续干别的事了。

IO多路复用技术

IO多路复用技术

  • 白话举例:模拟一个tcp服务器处理30个客户socket

    假设你是一个老师,让30个学生解答一道题目,然后检查学生做的是否正确,你有下面几个选择:

    1. 第一种选择:按顺序逐个检查,先检查A,然后是B,之后是C、D。。。这中间如果有一个学生卡主,全班都会被耽误。这种模式就好比,你用循环挨个处理socket,根本不具有并发能力。
    2. 第二种选择:你创建30个分身,每个分身检查一个学生的答案是否正确。 这种类似于为每一个用户创建一个进程或者线程处理连接。
    3. 第三种选择,你站在讲台上等,谁解答完谁举手。这时C、D举手,表示他们解答问题完毕,你下去依次检查C、D的答案,然后继续回到讲台上等。此时E、A又举手,然后去处理E和A。。。 这种就是IO复用模型(Linux下的select、poll和epoll就是干这个的。将用户socket对应的fd注册进epoll,然后epoll帮你监听哪些socket上有消息到达,这样就避免了大量的无用操作。此时的socket应该采用非阻塞模式。这样,整个过程只在调用select、poll、epoll这些调用的时候才会阻塞,收发客户消息是不会阻塞的,整个进程或者线程就被充分利用起来,这就是事件驱动,所谓的reactor模式。)

单线程避免了线程切换和竞态产生的消耗

单线程能带来几个好处:

  • 第一,单线程可以简化数据结构和算法的实现。并发数据结构实现不但困难而且开发测试比较麻烦
  • 第二,单线程避免了线程切换和竞态产生的消耗,对于服务端开发来说,锁和线程切换通常是性能杀手。

单线程的问题:对于每个命令的执行时间是有要求的。如果某个命令执行过长,会造成其他命令的阻塞,所以 redis 适用于那些需要快速执行的场景。

参考资源


Redis线程模型

Redis线程模型

Redis 线程模型_好记性不如烂笔头-CSDN博客

Redis 单线程模型介绍

了解redis的单线程模型工作原理?一篇文章就够了

Redis 命令参考 - Redis 命令参考

Redis的数据类型 - string

string 字符串

string: 最简单的字符串类型键值对缓存,也是最基本的

key相关

keys *:查看所有的key (不建议在生产上使用,有性能影响)

type key:key的类型

string类型

get/set/del:查询/设置/删除set rekey data:设置已经存在的key,会覆盖setnx rekey data:设置已经存在的key,不会覆盖

set key value ex time:设置带过期时间的数据expire key:设置过期时间ttl:查看剩余时间,-1永不过期,-2过期

append key:合并字符串strlen key:字符串长度

incr key:累加1decr key:类减1incrby key num:累加给定数值decrby key num:累减给定数值

getrange key start end:截取数据,end=-1 代表到最后setrange key start newdata:从start位置开始替换数据

mset:连续设值mget:连续取值msetnx:连续设置,如果存在则不设置

其他

select index:切换数据库,总共默认16个flushdb:删除当前下边db中的数据flushall:删除所有db中的数据

Redis的数据类型 - hash

hash

hash:类似map,存储结构化数据结构,比如存储一个对象(不能有嵌套对象)

使用

hset key property value:> hset user name imooc> 创建一个user对象,这个对象中包含name属性,name值为imooc

hget user name:获得用户对象中name的值

hmset:设置对象中的多个键值对> hset user age 18 phone 139123123hmsetnx:设置对象中的多个键值对,存在则不添加> hset user age 18 phone 139123123

hmget:获得对象中的多个属性> hmget user age phone

hgetall user:获得整个对象的内容

hincrby user age 2:累加属性hincrbyfloat user age 2.2:累加属性

hlen user:有多少个属性

hexists user age:判断属性是否存在

hkeys user:获得所有属性hvals user:获得所有值

hdel user:删除对象

Redis的数据类型 - list

list

list:列表,[a, b, c, d, …]

使用

lpush userList 1 2 3 4 5:构建一个list,从左边开始存入数据rpush userList 1 2 3 4 5:构建一个list,从右边开始存入数据lrange list start end:获得数据

lpop:从左侧开始拿出一个数据rpop:从右侧开始拿出一个数据

pig cow sheep chicken duck

llen list:list长度lindex list index:获取list下标的值

lset list index value:把某个下标的值替换

linsert list before/after value:插入一个新的值

lrem list num value:删除几个相同数据

ltrim list start end:截取值,替换原来的list

Redis的数据类型 - zset

sorted set:

sorted set:排序的set,可以去重可以排序,比如可以根据用户积分做排名,积分作为set的一个数值,根据数值可以做排序。set中的每一个memeber都带有一个分数

使用

zadd zset 10 value1 20 value2 30 value3:设置member和对应的分数

zrange zset 0 -1:查看所有zset中的内容zrange zset 0 -1 withscores:带有分数

zrank zset value:获得对应的下标zscore zset value:获得对应的分数

zcard zset:统计个数zcount zset 分数1 分数2:统计个数

zrangebyscore zset 分数1 分数2:查询分数之间的member(包含分数1 分数2)zrangebyscore zset (分数1 (分数2:查询分数之间的member(不包含分数1 和 分数2)zrangebyscore zset 分数1 分数2 limit start end:查询分数之间的member(包含分数1 分数2),获得的结果集再次根据下标区间做查询

zrem zset value:删除member

[toc]

1. Redis的持久化机制 - RDB

1.1 什么是RDB

RDB:每隔一段时间,把内存中的数据写入磁盘的临时文件,作为快照,恢复的时候把快照文件读进内存。如果宕机重启,那么内存里的数据肯定会没有的,那么再次启动redis后,则会恢复。

1.2 备份与恢复

内存备份 –> 磁盘临时文件临时文件 –> 恢复到内存

1.3 RDB优劣势

  • 优势
    1. 每隔一段时间备份,全量备份
    2. 灾备简单,可以远程传输
    3. 子进程备份的时候,主进程不会有任何io操作(不会有写入修改或删除),保证备份数据的的完整性
    4. 相对AOF来说,当有更大文件的时候可以快速重启恢复
  • 劣势
    1. 发生故障是,有可能会丢失最后一次的备份数据
    2. 子进程所占用的内存比会和父进程一模一样,如会造成CPU负担
    3. 由于定时全量备份是重量级操作,所以对于实时备份,就无法处理了。

1.4 RDB的配置

  1. 保存位置,可以在redis.conf自定义:/user/local/redis/working/dump.rdb

  2. 保存机制:

    1
    2
    3
    4
    save 900 1
    save 300 10
    save 60 10000
    save 10 3
    1
    2
    3
    4
    5
    如果1个缓存更新,则15分钟后备份
    * 如果10个缓存更新,则5分钟后备份
    * 如果10000个缓存更新,则1分钟后备份
    * 演示:更新3个缓存,10秒后备份
    * 演示:备份dump.rdb,删除重启
  3. stop-writes-on-bgsave-error
    ◦ yes:如果save过程出错,则停止写操作
    ◦ no:可能造成数据不一致

  4. rdbcompression
    ◦ yes:开启rdb压缩模式
    ◦ no:关闭,会节约cpu损耗,但是文件会大,道理同nginx

  5. rdbchecksum
    ◦ yes:使用CRC64算法校验对rdb进行数据校验,有10%性能损耗
    ◦ no:不校验

1.5 总结

RDB适合大量数据的恢复,但是数据的完整性和一致性可能会不足。

2 Redis的持久化机制 - AOF

RDB会丢失最后一次备份的rdb文件,但是其实也无所谓,其实也可以忽略不计,毕竟是缓存,丢了就丢了,但是如果追求数据的完整性,那就的考虑使用AOF了。

2.1 AOF特点

  1. 以日志的形式来记录用户请求的写操作。读操作不会记录,因为写操作才会存存储。
  2. 文件以追加的形式而不是修改的形式。
  3. redis的aof恢复其实就是把追加的文件从开始到结尾读取执行写操作。

2.2 优势

  1. AOF更加耐用,可以以秒级别为单位备份,如果发生问题,也只会丢失最后一秒的数据,大大增加了可靠性和数据完整性。所以AOF可以每秒备份一次,使用fsync操作。
  2. 以log日志形式追加,如果磁盘满了,会执行 redis-check-aof 工具
  3. 当数据太大的时候,redis可以在后台自动重写aof。当redis继续把日志追加到老的文件中去时,重写也是非常安全的,不会影响客户端的读写操作。
  4. AOF 日志包含的所有写操作,会更加便于redis的解析恢复。

2.3 劣势

  1. 相同的数据,同一份数据,AOF比RDB大
  2. 针对不同的同步机制,AOF会比RDB慢,因为AOF每秒都会备份做写操作,这样相对与RDB来说就略低。 每秒备份fsync没毛病,但是如果客户端的每次写入就做一次备份fsync的话,那么redis的性能就会下降。
  3. AOF发生过bug,就是数据恢复的时候数据不完整,这样显得AOF会比较脆弱,容易出现bug,因为AOF没有RDB那么简单,但是呢为了防止bug的产生,AOF就不会根据旧的指令去重构,而是根据当时缓存中存在的数据指令去做重构,这样就更加健壮和可靠了。

2.4 AOF的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# AOF 默认关闭,yes可以开启
appendonly no

# AOF 的文件名
appendfilename "appendonly.aof"

# no:不同步
# everysec:每秒备份,推荐使用
# always:每次操作都会备份,安全并且数据完整,但是慢性能差
appendfsync everysec

# 重写的时候是否要同步,no可以保证数据安全
no-appendfsync-on-rewrite no

# 重写机制:避免文件越来越大,自动优化压缩指令,会fork一个新的进程去完成重写动作,新进程里的内存数据会被重写,此时旧的aof文件不会被读取使用,类似rdb
# 当前AOF文件的大小是上次AOF大小的100% 并且文件体积达到64m,满足两者则触发重写
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

2.5 到底采用RDB还是AOF呢?

  1. 如果你能接受一段时间的缓存丢失,那么可以使用RDB
  2. 如果你对实时性的数据比较care,那么就用AOF
  3. 使用RDB和AOF结合一起做持久化,RDB做冷备,可以在不同时期对不同版本做恢复,AOF做热备,保证数据仅仅只有1秒的损失。当AOF破损不可用了,那么再用RDB恢复,这样就做到了两者的相互结合,也就是说Redis恢复会先加载AOF,如果AOF有问题会再加载RDB,这样就达到冷热备份的目的了。