0%

转载自 https://www.cnblogs.com/skywang12345/p/3496101.html

ReentrantLock介绍

ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。

顾名思义,ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。

阅读全文 »

Spring Integration 对 Spring 编程模型进行了扩展,使得后者能够支持著名的“企业集成模式”。通过SI(Spring Integration)可以在基于Spring的应用中引入轻量级的“消息驱动模式”,并且支持“通过声明式的适配器”与外部系统进行集成。这些“适配器”相较于Spring对于“remoting(远程调用)”、“messaging(事件消息)”、“scheduling(任务调度)”方面的支持,提供了更高层次的一种抽象。SI的首要目标是:为“构建企业集成方案、维护系统间通信”提供一种简单模型,应用该模型所产出的代码是可维护、可测试的

阅读全文 »

转载自https://mp.weixin.qq.com/s/8aMz07rOF5LuclnBaI_p5g


今天又要给大家介绍一个 Spring Boot 中的组件–HandlerMethodReturnValueHandler。

在前面的文章中(如何优雅的实现 Spring Boot 接口参数加密解密?),松哥已经和大家介绍过如何对请求/响应数据进行预处理/二次处理,当时我们使用了 ResponseBodyAdvice 和 RequestBodyAdvice。其中 ResponseBodyAdvice 可以实现对响应数据的二次处理,可以在这里对响应数据进行加密/包装等等操作。不过这不是唯一的方案,今天松哥要和大家介绍一种更加灵活的方案–HandlerMethodReturnValueHandler,我们一起来看看下。

阅读全文 »

转载自Java多线程系列–“JUC锁”01之 框架

[TOC]

synchronized 同步锁

使用 synchronized 关键字进行同步,实现对竞争资源的互斥访问的锁。Java 1.0版本中就已经支持同步锁了。

同步锁的原理是,对于每一个对象,有且仅有一个同步锁;不同的线程能共同访问该同步锁。但是,在同一个时间点,该同步锁能且只能被一个线程获取到。这样,获取到同步锁的线程就能进行CPU调度,从而在CPU上执行;而没有获取到同步锁的线程,必须进行等待,直到获取到同步锁之后才能继续运行。这就是,多线程通过同步锁进行同步的原理!

java.util.concurrent 包中的锁

相比同步锁,JUC包中的锁的功能更加强大,它为锁提供了一个框架,该框架允许更灵活地使用锁,只是它的用法更难罢了。

JUC包中的锁,包括:Lock接口,ReadWriteLock接口,LockSupport阻塞原语,Condition条件,AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer三个抽象类,ReentrantLock独占锁,ReentrantReadWriteLock读写锁。由于CountDownLatch,CyclicBarrier和Semaphore也是通过AQS来实现的;因此,我也将它们归纳到锁的框架中进行介绍。

先看看锁的框架图,如下所示。

img

Lock

JUC包中的 Lock 接口支持那些语义不同(重入、公平等)的锁规则。所谓语义不同,是指锁可是有”公平机制的锁”、”非公平机制的锁”、”可重入的锁”等等。”公平机制”是指”不同线程获取锁的机制是公平的”,而”非公平机制”则是指”不同线程获取锁的机制是非公平的”,”可重入的锁”是指同一个锁能够被一个线程多次获取。

ReadWriteLock

ReadWriteLock 接口以和Lock类似的方式定义了一些读取者可以共享而写入者独占的锁。JUC包只有一个类实现了该接口,即 ReentrantReadWriteLock,因为它适用于大部分的标准用法上下文。但程序员可以创建自己的、适用于非标准要求的实现。

AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer

AbstractQueuedSynchronizer就是被称之为AQS的类,它是一个非常有用的超类,可用来定义锁以及依赖于排队阻塞线程的其他同步器;ReentrantLock,ReentrantReadWriteLock,CountDownLatch,CyclicBarrier和Semaphore等这些类都是基于AQS类实现的。AbstractQueuedLongSynchronizer 类提供相同的功能但扩展了对同步状态的 64 位的支持。两者都扩展了类 AbstractOwnableSynchronizer(一个帮助记录当前保持独占同步的线程的简单类)。

LockSupport

LockSupport提供“创建锁”和“其他同步类的基本线程阻塞原语”。
LockSupport的功能和”Thread中的Thread.suspend()和Thread.resume()有点类似”,LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程。但是park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。

Condition

Condition需要和Lock联合使用,它的作用是代替Object监视器方法,可以通过await(),signal()来休眠/唤醒线程。
Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用法上与使用 Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关联。为了避免兼容性问题,Condition 方法的名称与对应的 Object 版本中的不同。

ReentrantLock

ReentrantLock是独占锁。所谓独占锁,是指只能被独自占领,即同一个时间点只能被一个线程锁获取到的锁。ReentrantLock锁包括”公平的ReentrantLock”和”非公平的ReentrantLock”。”公平的ReentrantLock”是指”不同线程获取锁的机制是公平的”,而”非公平的  ReentrantLock”则是指”不同线程获取锁的机制是非公平的”,ReentrantLock是”可重入的锁”。
ReentrantLock的UML类图如下:

img

  1. ReentrantLock实现了Lock接口。
  2. ReentrantLock中有一个成员变量sync,sync是Sync类型;Sync是一个抽象类,而且它继承于AQS。
  3. ReentrantLock中有”公平锁类”FairSync和”非公平锁类”NonfairSync,它们都是Sync的子类。ReentrantReadWriteLock中sync对象,是FairSync与NonfairSync中的一种,这也意味着ReentrantLock是”公平锁”或”非公平锁”中的一种,ReentrantLock默认是非公平锁。

ReentrantReadWriteLock

ReentrantReadWriteLock是读写锁接口ReadWriteLock的实现类,它包括子类ReadLock和WriteLock。ReentrantLock是共享锁,而WriteLock是独占锁。
ReentrantReadWriteLock的UML类图如下:

img

  1. ReentrantReadWriteLock实现了ReadWriteLock接口。
  2. ReentrantReadWriteLock中包含sync对象,读锁readerLock和写锁writerLock。读锁ReadLock和写锁WriteLock都实现了Lock接口。
  3. 和”ReentrantLock”一样,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。Sync也包括”公平锁”FairSync和”非公平锁”NonfairSync。

CountDownLatch

CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
CountDownLatch的UML类图如下:

img

CountDownLatch包含了sync对象,sync是Sync类型。CountDownLatch的Sync是实例类,它继承于AQS。

CyclicBarrier

CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
CyclicBarrier的UML类图如下:

img

CyclicBarrier是包含了”ReentrantLock对象lock”和”Condition对象trip”,它是通过独占锁实现的。
CyclicBarrier和CountDownLatch的区别是:

  1. CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
  2. CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。

Semaphore

Semaphore是一个计数信号量,它的本质是一个”共享锁”。

信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。

Semaphore的UML类图如下:

img

和”ReentrantLock”一样,Semaphore包含了sync对象,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。Sync也包括”公平信号量”FairSync和”非公平信号量”NonfairSync。

[TOC]

1 理论

JVM-内存结构

JVM-类文件结构

JVM-类加载机制

JVM-分代收集理论

JVM-编译器优化

1.4 垃圾收集算法

1.5 垃圾收集器

2 工具

虚拟机性能监控故障处理工具

3 实战

3.1 大内存硬件上的程序部署策略

如果实际内存有16G,将-Xmx-Xms参数将Java 堆大小固定为12G,程序运行一段时间后,会常常不定期出现长时间失去响应,程序中涉及将磁盘中的文档提取到内存中,这些大对象在分配时直接进入了老年代,没有在 Minor GC 中被清理掉,很快就造成了内存不够的现象导致程序每隔几分钟出现十几秒的停顿。

解决办法是堆内存重新缩小到1.5G或者2G。

每一款Java虚拟机中的每一款垃圾收集器都有自己的应用目标与最适合的应用场景,如果在特定

场景中选择了不恰当的配置和部署方式,自然会事倍功半。目前单体应用在较大内存的硬件上主要的

部署方式有两种:

  1. 通过一个单独的Java虚拟机实例来管理大量的Java堆内存。

  2. 同时使用若干个Java虚拟机,建立逻辑集群来利用硬件资源。

控制Full GC频率的关键是老年代的相对稳定,这主要取决于应用中绝大多数对象能否符合“朝生夕灭”的原则,即大多数对象的生存时间不应当太长,尤其是不能有成批量的、长生存时间的大对象产 生,这样才能保障老年代空间的稳定。

如果要使用单个Java实例来管理大内存,还需考虑下面可能面临的问题:

  • 回收大块堆内存而导致的长时间停顿,自从G1收集器的出现,增量回收得到比较好的应用,

    这个问题有所缓解,但要到ZGC和Shenandoah收集器成熟之后才得到相对彻底地解决。

  • 大内存必须有64位Java虚拟机的支持,但由于压缩指针、处理器缓存行容量(Cache Line)等因素,64位虚拟机的性能测试结果普遍略低于相同版本的32位虚拟机。

  • 必须保证应用程序足够稳定,因为这种大型单体应用要是发生了堆内存溢出,几乎无法产生堆转储快照(要产生十几GB乃至更大的快照文件),哪怕成功生成了快照也难以进行分析;如果确实出了问题要进行诊断,可能就必须应用JMC这种能够在生产环境中进行的运维工具。

  • 相同的程序在64位虚拟机中消耗的内存一般比32位虚拟机要大,这是由于指针膨胀,以及数据类型对齐补白等因素导致的,可以开启(默认即开启)压缩指针功能来缓解。

3.2 集群间同步导致的内存溢出

被集群共享的数据要使用类似JBossCache这种非集中式的集群缓存来同步的话,可以允许读操作频繁,因为数据在本地内存有一份副本,读取的动作不会耗费多少资源,但不应当有过于频繁 的写操作,会带来很大的网络同步的开销。

3.3 堆外内存导致的溢出错误

1
2
3
4
5
6
[org.eclipse.jetty.util.log] handle failed java.lang.OutOfMemoryError: null 
at sun.misc.Unsafe.allocateMemory(Native Method)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:99)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:288)
at org.eclipse.jetty.io.nio.DirectNIOBuffer.<init>
……

直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是《Java虚拟机规范》中定义的内存区域。但是这部分内存也被频繁的使用,而且也可能导致OutOfMemory异常出现。

在JDK 1.4中新加入了NIO(New Input/Output)类,引入了一种基于通道(Channel)与缓冲区(Buffer)的I/O方式,它可以使用Native函数库直接分配堆外内存,然后通过一个存储在Java堆里面的DirectByteBuffer对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,因为避免了在Java堆和Native堆中来回复制数据。

显然,本机直接内存的分配不会受到Java堆大小的限制,但是,既然是内存,则肯定还是会受到本机总内存(包括物理内存、SWAP分区或者分页文件)大小以及处理器寻址空间的限制,一般服务 器管理员配置虚拟机参数时,会根据实际内存去设置-Xmx等参数信息,但经常忽略掉直接内存,使得 各个内存区域总和大于物理内存限制(包括物理的和操作系统级的限制),从而导致动态扩展时出现 OutOfMemoryError异常。

3.4 外部命令导致系统缓慢

3.5 服务器虚拟机进程崩溃

3.6 不恰当数据结构导致内存占用过大

3.7 由Windows虚拟内存导致的长时间停顿

3.8 由安全点导致长时间停顿

4 参考资源

JVM调优总结

1 背景

在老版本的ES(例如2.3版本)中, index的shard数量定好后,就不能再修改,除非重建数据才能实现。

从ES6.1开始,ES 支持可以在线操作扩大shard的数量(注意:操作期间也需要对index锁写)

从ES7.0开始,split时候,不再需要加参数 index.number_of_routing_shards

具体参考官方文档:

https://www.elastic.co/guide/en/elasticsearch/reference/7.5/indices-split-index.html

https://www.elastic.co/guide/en/elasticsearch/reference/6.1/indices-split-index.html

2 split的过程:

1、创建一个新的目标index,其定义与源index相同,但是具有更多的primary shard。

2、将segment从源index硬链接到目标index。(如果文件系统不支持硬链接,则将所有segment都复制到新索引中,这是一个非常耗时的过程。)

3、创建低级文件后,再次对所有文档进行哈希处理,以删除属于不同shard的documents

4、恢复目标索引,就像它是刚刚重新打开的封闭索引一样。

3 为啥ES不支持增量resharding?

从N个分片到N + 1个分片。增量重新分片确实是许多键值存储支持的功能。仅添加一个新的分片并将新的数据推入该新的分片是不可行的:这可能是一个索引瓶颈,并根据给定的_id来确定文档所属的分片,这对于获取,删除和更新请求是必需的,会变得很复杂。这意味着我们需要使用其他哈希方案重新平衡现有数据。

键值存储有效执行此操作的最常见方式是使用一致的哈希。当分片的数量从N增加到N + 1时,一致的哈希仅需要重定位键的1 / N。但是,Elasticsearch的存储单位(碎片)是Lucene索引。由于它们以搜索为导向的数据结构,仅占Lucene索引的很大一部分,即仅占5%的文档,将其删除并在另一个分片上建立索引通常比键值存储要高得多的成本。如上节所述,当通过增加乘数来增加分片数量时,此成本保持合理:这允许Elasticsearch在本地执行拆分,这又允许在索引级别执行拆分,而不是为需要重新索引的文档重新编制索引移动,以及使用硬链接进行有效的文件复制。

对于仅追加数据,可以通过创建新索引并将新数据推送到其中,同时添加一个别名来覆盖读取操作的新旧索引,从而获得更大的灵活性。假设旧索引和新索引分别具有M和N个分片,与搜索具有M + N个分片的索引相比,这没有开销。

4 索引能进行split的前提条件:

1、目标索引不能存在。

2、源索引必须比目标索引具有更少的primary shard。

3、目标索引中主shard的数量必须是源索引中主shard的数量的倍数。

4、处理拆分过程的节点必须具有足够的可用磁盘空间,以容纳现有索引的第二个副本。

5 操作

下面是具体的实验部分:

tips:实验机器有限,索引的replica都设置为0,生产上至少replica>=1

5.1 创建一个索引,2个主shard,没有副本

1
2
3
4
curl -s -X PUT “<http://localhost:9200/twitter?pretty”> -H ‘Content-Type: application/json’ -d’ { “settings”: { “index.number_of_shards”: 2, “index.number_of_replicas”: 0 }, “aliases”: { “my_search_indices”: {} } }’ 
# index.number_of_shards:主分片设定个数
# index.number_of_replicas:副本分片设定个数,一个副本就等于把整个索引备份1份
# aliases:设定索引别名”my_search_indices”

写入几条测试数据

1
2
3
curl -s –X PUT “<http://localhost:9200/my_search_indices/_doc/11?pretty”> –H ‘Content–Type: application/json’ -d ‘{ “id”: 11, “name”:“lee”, “age”:“23” }’ 

curl -s –X PUT “<http://localhost:9200/my_search_indices/_doc/22?pretty”> –H ‘Content–Type: application/json’ -d ‘{ “id”: 22, “name”:“amd”, “age”:“22” }’

查询数据

1
curl -s -XGET “<http://localhost:9200/my_search_indices/_search”> | jq .

5.2 对索引锁写,以便下面执行split操作

1
2
curl -s -X PUT “<http://localhost:9200/twitter/_settings?pretty”> -H ‘Content-Type: application/json’ -d ‘{ “settings”: { “index.blocks.write”: true } }’ 
# index.blocks.write:写入锁定,只能读,不能写

写数据测试,确保锁写生效

1
2
curl -s -X PUT “<http://localhost:9200/twitter/_doc/33?pretty”> -H ‘Content-Type: application/json’ -d ‘{ “id”: 33, “name”:“amd”, “age”:“33” }’ 
# 测试写入失败

取消 twitter 索引的alias

1
2
3
curl -s -X POST “<http://localhost:9200/_aliases?pretty”> -H ‘Content-Type: application/json’ -d ‘{ “actions” : [ { “remove” : { “index” : “twitter”, “alias” : “my_search_indices” } } ] }‘ 

curl -s -X GET “<http://localhost:9200/_cat/aliases”>

第二种方式:

1
2
3
# 取消索引别名 
curl -s -X DELETE “<http://localhost:9200/twitter/_alias/my_search_indices”>
curl -s -X GET“<http://localhost:9200/_cat/aliases”>

5.3 开始执行 split 切分索引的操作,调整后索引名称为new_twitter,且主shard数量为8

1
curl -s –X POST “<http://localhost:9200/twitter/_split/new_twitter?pretty”> –H ‘Content–Type: application/json’ -d ‘{ “settings”: { “index.number_of_shards”: 8, “index.number_of_replicas”: 0 } }’

对新的index添加alias

1
curl -s –X POST “<http://localhost:9200/_aliases?pretty”> –H ‘Content–Type: application/json’ -d ‘{ “actions” : [ { “add” : { “index” : “new_twitter”, “alias” : “my_search_indices” } } ] }’

第二种方式:

1
2
# 新建索引别名 
curl -s -X PUT “<http://localhost:9200/new_twitter/_alias/my_search_indices”>

结果:

1
2
3
4
5
{ 
"acknowledged" : true,
"shards_acknowledged” : true,
"index" : "new_twitter"
}

补充:

查看split的进度,可以使用 _cat/recovery 这个api, 或者在 cerebro 界面上查看。

5.4 查看新索引的数据,能正常查看

1
curl -s -XGET “<http://localhost:9200/my_search_indices/_search”> | jq .

查看split的进度,可以使用 _cat/recovery 这个api, 或者在 cerebro 界面上查看。

1
curl -s -X GET “<http://localhost:9200/_cat/recovery”>

对新索引写数据测试,可以看到失败的

1
curl -s -X PUT “localhost:9200/my_search_indices/_doc/33?pretty” -H ‘Content-Type: application/json’ -d ‘{ “id”: 33, “name”:“amd”, “age”:“33” }’ # 写入失败

打开索引的写功能

1
curl -s –X PUT “localhost:9200/my_search_indices/_settings?pretty” –H ‘Content–Type: application/json’ -d ‘{ “settings”: { “index.blocks.write”: false } }’

再次对新索引写数据测试,可以看到此时,写入是成功的

1
2
3
curl -s –X PUT “localhost:9200/my_search_indices/_doc/33?pretty” –H ‘Content–Type: application/json’ -d ‘{ “id”: 33, “name”:“amd”, “age”:“33” }’ 

curl -s –X PUT “localhost:9200/my_search_indices/_doc/44?pretty” –H ‘Content–Type: application/json’ -d ‘{ “id”: 44, “name”:“intel”, “age”:“4” }’

此时,老的那个索引还是只读的,我们确保新索引OK后,就可以考虑关闭或者删除老的 twitter索引了。

测试将新数据写入别名

1
2
curl -s –X PUT “localhost:9200/my_search_indices/_doc/44?pretty” –H ‘Content–Type: application/json’ -d ‘{ “id”: 44, “name”:“amd”, “age”:“44” }’ 
# 写入也是ok 的

删除索引

1
curl -s -X DELETE “<http://localhost:9200/new_twitter”>

总结

贴一张 生产环境执行后的index的截图,可以看到新的index的每个shard体积只有老index的一半,这样也就分摊了index的压力:

https://www.xiaohuait.com/wp-content/uploads/2020/09/774.jpg

ES集群核心概念

1)Cluster:集群

ES可以作为一个独立的单个搜索服务器。不过,为了处理大型数据集,实现容错和高可用性,ES可以运行在许多互相合作的服务器上。这些服务器的集合称为集群,集群内的节点的cluster.name相同。

2)Node:节点

形成集群的每个服务器称为节点。

ES 为分配不同的任务,定义了以下几个节点角色:Master,Data Node,Coordinating Node,Ingest Node

  • Master 节点:每个 ES 节点启动之前都会有个默认配置 node.master:true ,也就是说每个节点都有可能成为 Master 节点,这些节点被称作 Master-eligible nodes ,就是合格的有资格成为 Master 节点的节点。当然 Master 只能有一个,所以会通过选举的方法对这启动的节点选举,被选中的节点才会成为 Master 节点。 Master 节点主要是负责维护集群的状态,像所有节点的信息,所有的索引和它相关的 Mapping 关系,配置信息,分片的路由等。既然 Master 节点维护了这么重要的信息,玩意它挂了怎么办?挂了的话,将会对其他的有资格成为 Master 节点的节点重新选举出另一个 Master 节点,因此这就说明了其他 Master-eligible nodes 也会保存集群信息,但是只有 Master 节点有权限能够修改,试想如果其他节点也能修改的话,这将会导致数据不一致的问题。

  • Data Node 节点:数据节点,这个节点主要负责数据的存储,在数据扩展上起到了至关重要的作用。也就是说读写数据都会找到相应的 Data Node 节点。

  • Coordinating Node 节点:协调节点主要负责协调客户端的请求,将接收到的请求分发给合适的节点,并把结果汇集到一起。比如客户端请求查询某个索引的数据,协调节点将会把请求分发给保存相关的数据的 DataNode 节点,找到相应的分片,并把查询到的结果都汇集返回。并且每个节点都默认起到了 Coordinating Node 的职责。

  • Ingest Node节点: Ingest node 专门对索引的文档做预处理,发生在对真实文档建立索引之前。在建立索引对文档预处理之前,先定义一个管道(pipeline),管道里指定了一系列的处理器。每个处理器能够把文档按照某种特定的方式转换。比如在管道里定义一个从某个文档中移除字段的处理器,紧接着一个重命名字段的处理器。集群的状态也会被存储到配置的管道内。定义一个管道,简单的在索引或者bulk request(一种批量请求方法)操作上定义 pipeline 参数,这样 ingest node 就会知道哪个管道在使用。这个节点在使用过程中用的也不多,所以大概了解一下就行。

  • 文档读写原理

    https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/es_8_2.png

    https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/es_8_3.png

说明:

  • 一个节点可以充当一个或多个角色,默认三个角色都有。
  • 协调节点:一个节点只作为接收请求、转发请求到其他节点、汇总各个节点返回数据等功能的节点。就叫协调节点。

3)Index:索引

在 ES 中, 索引是一组文档的集合。索引的作用相当于图书的目录,可以根据目录中的页码快速找到所需的内容。当表中有大量记录时,若要对表进行查询,第一种搜索信息方式是全表搜索,是将所有记录一一取出,和查询条件进行一一对比,然后返回满足条件的记录,这样做会消耗大量数据库系统时间,并造成大量磁盘I/O操作;第二种就是在表中建立索引,然后在索引中找到符合查询条件的索引值,最后通过保存在索引中的ROWID(相当于页码)快速找到表中对应的记录。

4)Shard:分片

当有大量的文档时,由于内存的限制、磁盘处理能力不足、无法足够快的响应客户端的请求等,一个节点可能不够。

这种情况下,数据可以分为较小的分片。每个分片放到不同的服务器上。当你查询的索引分布在多个分片上时,ES会把查询发送给每个相关的分片,并将结果组合在一起,而应用程序并不知道分片的存在。即:这个过程对用户来说是透明的。

说明:

  • 创建索引的时候就确定好主分片的数量,除非重索引。
  • 分片对应的存储实体是索引。
  • 一个分片就是一个 Lucene 实例

5)路由

Elasticsearch 如何知道一个文档应该存放到哪个分片中呢?当我们创建文档时,它如何决定这个文档应当被存储在分片 1 还是分片 2 中呢?首先这肯定不会是随机的,否则将来要获取文档的时候我们就不知道从何处寻找了。实际上,这个过程是根据下面这个公式决定的:shard = hash(routing) % number_of_primary_shardsrouting 是一个可变值,唯一不可重复,默认是文档的 _id ,也可以设置成一个自定义的值。 routing 通过 hash 函数生成一个数字,然后这个数字再除以 number_of_primary_shards (主分片的数量)后得到余数 。这个分布在 0 到 number_of_primary_shards-1 之间的余数,就是我们所寻求的文档所在分片的位置。这就解释了为什么我们要在创建索引的时候就确定好主分片的数量 并且永远不会改变这个数量:因为如果数量变化了,那么所有之前路由的值都会无效,文档也再也找不到了。所有的文档 API( get 、 index 、 delete 、 bulk 、 update 以及 mget )都接受一个叫做 routing 的路由参数 ,通过这个参数我们可以自定义文档到分片的映射。一个自定义的路由参数可以用来确保所有相关的文档——例如所有属于同一个用户的文档——都被存储到同一个分片中。

6)Replia:副本

在创建某个索引之前,需要指定分配这个索引多少个分片?多少个副本?副本就这这个分片的备胎,当分片挂掉了,它的副本就会随时准备上位,因此副本也是个分片只不过不负责主要功能。

不仅仅如此,ES 如何能够提高数据吞吐量呢?增加副本个数就是个不错的选择,比如说读写分离,读数据的时候从副本上读,写数据的时候只用主分片去写。需要注意的是,主分片的个数实在建立索引之前要确定,建立完索引之后,是不能够进行修改的,除非重新建索引。因此在建索引之前,一定要合理的配置分片个数,副本个数的话后期是可以改动的。

为提高查询吞吐量或实现高可用性,可以使用分片副本。副本是一个分片的精确复制,每个分片可以有零个或多个副本。ES中可以有许多相同的分片,其中之一被选择更改索引操作,这种特殊的分片称为主分片。当主分片丢失时,如:该分片所在的数据不可用时,集群将副本提升为新的主分片。Elasticsearch 禁止同一个分片的主分片和副本分片在同一个节点上,所以如果是一个节点的集群是不能有副本的。

它在节点失败的情况下提供高可用性。由于这个原因,需要注意的是,副本分片永远不会分配到与主分片相同的节点上。

如何设置副本

启动 2 个 ES 节点,配置分片个数为 3,副本个数为 1(每个分片有一个副本)。如下图,蓝色的代表主分片,绿色的是副本,仔细一点不难发现,分片与其副本不在同一个节点内。这是非常合理的,因为副本本来就是主分片的备胎,当主分片节点挂了,另外一个节点的副本将会充当主分片,如果它们在同一个节点内,副本将发挥不到作用。

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/es_9_1.png

水平扩展原理

单个节点的容量是有限的,如果后期两个节点的容量不能够支持三个分片,那么另外启动一个节点就可以了,ES 会自动的重新规划分片,如下图:可以看到 A3 节点已经被自动的分配到 Node3 节点里面了,另外副本 B1 从 Node2 移动到 Node3 节点,B3 分片从 Node1 节点被分配到 Node2 节点。这里想一下,如果再启动一个节点呢?是的,再启动一个节点将不会对主分片起到任何作用,因为主分片不可以修改,只有三个,但是副本可以修改,能够起到扩容的作用。

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/es_9_2.png

转载自https://www.cnblogs.com/skywang12345/p/3514623.html

AtomicReference介绍和函数列表

AtomicReference是作用是对”对象”进行原子操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 使用 null 初始值创建新的 AtomicReference。
AtomicReference()
// 使用给定的初始值创建新的 AtomicReference。
AtomicReference(V initialValue)

// 如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
boolean compareAndSet(V expect, V update)
// 获取当前值。
V get()
// 以原子方式设置为给定值,并返回旧值。
V getAndSet(V newValue)
// 最终设置为给定值。
void lazySet(V newValue)
// 设置为给定值。
void set(V newValue)
// 返回当前值的字符串表示形式。
String toString()
// 如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
boolean weakCompareAndSet(V expect, V update)

源码(JDK1.8.0_202)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
/**
* An object reference that may be updated atomically. See the {@link
* java.util.concurrent.atomic} package specification for description
* of the properties of atomic variables.
* @since 1.5
* @author Doug Lea
* @param <V> The type of object referred to by this reference
*/
public class AtomicReference<V> implements java.io.Serializable {
private static final long serialVersionUID = -1848883965231344442L;

// 获取Unsafe对象,Unsafe的作用是提供CAS操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicReference.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

// volatile类型
private volatile V value;

/**
* Creates a new AtomicReference with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicReference(V initialValue) {
value = initialValue;
}

/**
* Creates a new AtomicReference with null initial value.
*/
public AtomicReference() {
}

/**
* Gets the current value.
*
* @return the current value
*/
public final V get() {
return value;
}

/**
* Sets to the given value.
*
* @param newValue the new value
*/
public final void set(V newValue) {
value = newValue;
}

/**
* Eventually sets to the given value.
*
* @param newValue the new value
* @since 1.6
*/
public final void lazySet(V newValue) {
unsafe.putOrderedObject(this, valueOffset, newValue);
}

/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* <p><a href="package-summary.html#weakCompareAndSet">May fail
* spuriously and does not provide ordering guarantees</a>, so is
* only rarely an appropriate alternative to {@code compareAndSet}.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
*/
public final boolean weakCompareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

/**
* Atomically sets to the given value and returns the old value.
*
* @param newValue the new value
* @return the previous value
*/
@SuppressWarnings("unchecked")
public final V getAndSet(V newValue) {
return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}

/**
* Atomically updates the current value with the results of
* applying the given function, returning the previous value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param updateFunction a side-effect-free function
* @return the previous value
* @since 1.8
*/
public final V getAndUpdate(UnaryOperator<V> updateFunction) {
V prev, next;
do {
prev = get();
next = updateFunction.apply(prev);
} while (!compareAndSet(prev, next));
return prev;
}

/**
* Atomically updates the current value with the results of
* applying the given function, returning the updated value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param updateFunction a side-effect-free function
* @return the updated value
* @since 1.8
*/
public final V updateAndGet(UnaryOperator<V> updateFunction) {
V prev, next;
do {
prev = get();
next = updateFunction.apply(prev);
} while (!compareAndSet(prev, next));
return next;
}

/**
* Atomically updates the current value with the results of
* applying the given function to the current and given values,
* returning the previous value. The function should be
* side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function
* is applied with the current value as its first argument,
* and the given update as the second argument.
*
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the previous value
* @since 1.8
*/
public final V getAndAccumulate(V x,
BinaryOperator<V> accumulatorFunction) {
V prev, next;
do {
prev = get();
next = accumulatorFunction.apply(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}

/**
* Atomically updates the current value with the results of
* applying the given function to the current and given values,
* returning the updated value. The function should be
* side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function
* is applied with the current value as its first argument,
* and the given update as the second argument.
*
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the updated value
* @since 1.8
*/
public final V accumulateAndGet(V x,
BinaryOperator<V> accumulatorFunction) {
V prev, next;
do {
prev = get();
next = accumulatorFunction.apply(prev, x);
} while (!compareAndSet(prev, next));
return next;
}

/**
* Returns the String representation of the current value.
* @return the String representation of the current value
*/
public String toString() {
return String.valueOf(get());
}

}

说明:AtomicReference的源码比较简单。它是通过”volatile”和”Unsafe提供的CAS函数实现”原子操作。(01) value是volatile类型。这保证了:当某线程修改value的值时,其他线程看到的value值都是最新的value值,即修改之后的volatile的值。(02) 通过CAS设置value。这保证了:当某线程池通过CAS函数(如compareAndSet函数)设置value时,它的操作是原子的,即线程在操作value时不会被中断。

AtomicReference示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// AtomicReferenceTest.java的源码
import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {

public static void main(String[] args){

// 创建两个Person对象,它们的id分别是101和102。
Person p1 = new Person(101);
Person p2 = new Person(102);
// 新建AtomicReference对象,初始化它的值为p1对象
AtomicReference ar = new AtomicReference(p1);
// 通过CAS设置ar。如果ar的值为p1的话,则将其设置为p2。
ar.compareAndSet(p1, p2);

Person p3 = (Person)ar.get();
System.out.println("p3 is "+p3);
System.out.println("p3.equals(p1)="+p3.equals(p1));
}
}

class Person {
volatile long id;
public Person(long id) {
this.id = id;
}
public String toString() {
return "id:"+id;
}
}

运行结果

1
2
p3 is id:102
p3.equals(p1)=false

结果说明:新建AtomicReference对象ar时,将它初始化为p1。紧接着,通过CAS函数对它进行设置。如果ar的值为p1的话,则将其设置为p2。最后,获取ar对应的对象,并打印结果。p3.equals(p1)的结果为false,这是因为Person并没有覆盖equals()方法,而是采用继承自Object.java的equals()方法;而Object.java中的equals()实际上是调用”==”去比较两个对象,即比较两个对象的地址是否相等。