0%

什么是FastDFS

  • FastDFS是一个开源的轻量级分布式文件系统,它对文件进行管理,功能包括:文件存储、文件同步、文件访问(文件上传、文件下载)等,解决了大容量存储和负载均衡的问题。特别适合以文件为载体的在线服务,如相册网站、视频网站等等。
  • FastDFS为互联网量身定制,充分考虑了冗余备份、负载均衡、线性扩容等机制,并注重高可用、高性能等指标,使用FastDFS很容易搭建一套高性能的文件服务器集群提供文件上传、下载等服务。
  • FastDFS服务端有两个角色:跟踪器(tracker)和存储节点(storage)。跟踪器主要做调度工作,在访问上起负载均衡的作用。
  • 存储节点存储文件,完成文件管理的所有功能:就是这样的存储、同步和提供存取接口,FastDFS同时对文件的metadata进行管理。所谓文件的meta data就是文件的相关属性,以键值对(key value)方式表示,如:width=1024,其中的key为width,value为1024。文件metadata是文件属性列表,可以包含多个键值对。
  • 跟踪器和存储节点都可以由一台或多台服务器构成。跟踪器和存储节点中的服务器均可以随时增加或下线而不会影响线上服务。其中跟踪器中的所有服务器都是对等的,可以根据服务器的压力情况随时增加或减少。
  • 为了支持大容量,存储节点(服务器)采用了分卷(或分组)的组织方式。存储系统由一个或多个卷组成,卷与卷之间的文件是相互独立的,所有卷的文件容量累加就是整个存储系统中的文件容量。一个卷可以由一台或多台存储服务器组成,一个卷下的存储服务器中的文件都是相同的,卷中的多台存储服务器起到了冗余备份和负载均衡的作用。
  • 在卷中增加服务器时,同步已有的文件由系统自动完成,同步完成后,系统自动将新增服务器切换到线上提供服务。
  • 当存储空间不足或即将耗尽时,可以动态添加卷。只需要增加一台或多台服务器,并将它们配置为一个新的卷,这样就扩大了存储系统的容量。
  • FastDFS中的文件标识分为两个部分:卷名和文件名,二者缺一不可。

– 摘自<百度百科>

阅读全文 »

前言

目前的数据同步,mappings映射会自动创建,但是分词不会,还是会使用默认的,而我们需要中文分词,这个时候就需要自定义模板功能来设置分词了。

查看Logstash默认模板

1
GET /_template/logstash

修改模板如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
{
"order": 0,
"version": 1,
"index_patterns": ["*"],
"settings": {
"index": {
"refresh_interval": "5s"
}
},
"mappings": {
"_default_": {
"dynamic_templates": [
{
"message_field": {
"path_match": "message",
"match_mapping_type": "string",
"mapping": {
"type": "text",
"norms": false
}
}
},
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"type": "text",
"norms": false,
"analyzer": "ik_max_word",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
],
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "keyword"
},
"geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
"location": {
"type": "geo_point"
},
"latitude": {
"type": "half_float"
},
"longitude": {
"type": "half_float"
}
}
}
}
}
},
"aliases": {}
}

新增如下配置,用于更新模板,设置中文分词

1
2
3
4
5
6
7
8
# 定义模板名称
template_name => "myik"
# 模板所在位置
template => "/usr/local/logstash-6.4.3/sync/logstash-ik.json"
# 重写模板
template_overwrite => true
# 默认为true,false关闭logstash自动管理模板功能,如果自定义模板,则设置为false
manage_template => false

重新运行Logstash进行同步

1
./logstash -f /usr/local/logstash/sync/logstash-db-sync.conf

logstash同步数据库配置

  1. 上传并解压logstash,路径为 /usr/local/logstash

  2. 创建文件名:logstash-db-sync.conf,后缀为conf,文件名随意,位置也随意,为方便起见,路径为/usr/local/logstash/sync

  3. 把数据库驱动拷贝至/usr/local/logstash/sync

  4. 配置内容如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    input {
    jdbc {
    # 设置 MySql/MariaDB 数据库url以及数据库名称
    jdbc_connection_string => "jdbc:mysql://192.168.1.6:3306/foodie-shop-dev?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true"
    # 用户名和密码
    jdbc_user => "root"
    jdbc_password => "root"
    # 数据库驱动所在位置,可以是绝对路径或者相对路径
    jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-5.1.41.jar"
    # 驱动类名
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 开启分页
    jdbc_paging_enabled => "true"
    # 分页每页数量,可以自定义
    jdbc_page_size => "10000"
    # 执行的sql文件路径
    statement_filepath => "/usr/local/logstash-6.4.3/sync/foodie-items.sql"
    # 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务
    schedule => "* * * * *"
    # 索引类型
    type => "_doc"
    # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
    use_column_value => true
    # 记录上一次追踪的结果值
    last_run_metadata_path => "/usr/local/logstash-6.4.3/sync/track_time"
    # 如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间
    tracking_column => "updated_time"
    # tracking_column 对应字段的类型
    tracking_column_type => "timestamp"
    # 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录
    clean_run => false
    # 数据库字段名称大写转小写
    lowercase_column_names => false
    }
    }
    output {
    elasticsearch {
    # es地址
    hosts => ["192.168.1.187:9200"]
    # 同步的索引名
    index => "foodie-items"
    # 设置_docID和数据相同。itemId与sql同步脚本中的itemId保持一致
    document_id => "%{itemId}"
    #document_id => "%{id}"
    }
    # 日志输出
    stdout {
    codec => json_lines
    }
    }
  5. sql同步脚本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    SELECT
    i.id as itemId,
    i.item_name as itemName,
    i.sell_counts as sellCounts,
    ii.url as imgUrl,
    tempSpec.price_discount as price,
    i.updated_time as updated_time
    FROM
    items i
    LEFT JOIN
    items_img ii
    on
    i.id = ii.item_id
    LEFT JOIN
    (SELECT item_id,MIN(price_discount) as price_discount from items_spec GROUP BY item_id) tempSpec
    on
    i.id = tempSpec.item_id
    WHERE
    ii.is_main = 1
    and
    i.updated_time >= :sql_last_value
    --:sql_last_value是记录的最后的一个值

启动logstatsh

1
./logstash -f /usr/local/logstash/sync/logstash-db-sync.conf

参考资源

Logstash使用介绍

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/logstash_1_1.png

1 Logstash

Logstash是elastic技术栈中的一个技术。它是一个数据采集引擎,可以从数据库采集数据到es中。我们可以通过设置自增id主键或者时间来控制数据的自动同步,这个id或者时间就是用于给logstash进行识别的

  • id:假设现在有1000条数据,Logstatsh识别后会进行一次同步,同步完会记录这个id为1000,以后数据库新增数据,那么id会一直累加,Logstatsh会有定时任务,发现有id大于1000了,则增量加入到es中
  • 时间:同理,一开始同步1000条数据,每条数据都有一个字段,为time,初次同步完毕后,记录这个time,下次同步的时候进行时间比对,如果有超过这个时间的,那么就可以做同步,这里可以同步新增数据,或者修改元数据,因为同一条数据的时间更改会被识别,而id则不会。

2 安装Logstash

官方链接:https://www.elastic.co/cn/downloads/past-releases/logstash-6-4-3

  • 注:使用Logstatsh的版本号与elasticsearch版本号需要保持一致

3 插件 logstash-input-jdbc

本插件用于同步,es6.x起自带,这个是集成在了 logstash中的。所以直接配置同步数据库的配置文件即可

4 创建索引

同步数据到es中,前提得要有索引,这个需要手动先去创建,名字随意。比如:foodie-items

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/5e01cbe1086da30502810316.jpg

5 JDK

记得安装JDK,java -version检查一下,如果没有安装,请安装一下。

springboot2.3.3中关于spring-data-elasticsearch-4.0.3的使用_阿良~的博客-CSDN博客

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<!-- spring boot 2.3.7 以上 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.7.RELEASE</version>
<relativePath/>
</parent>

<!-- 2.3.7.RELEASE 支持 ES 7.X -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>2.3.7.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

配置文件

1
2
3
4
spring:
elasticsearch:
rest:
uris: <http://localhost:9200>

对象映射

  • 首先要清楚注解的使用。

    • @Document:类注解,以指示该类是映射到数据库的候选对象。最重要的属性是:
      • indexName:用于存储此实体的索引的名称
      • type:映射类型。如果未设置,则使用小写的类的简单名称。(当前版本开始不推荐使用)
      • shards:索引的分片数。(经测试,分片数无法设置,应该是个BUG,推荐先创建索引设置好分片数和副本数)
      • replicas:索引的副本数。(经测试,分片数无法设置,应该是个BUG)
      • refreshIntervall:索引的刷新间隔。用于索引创建。默认值为“ 1s”。
      • indexStoreType:索引的索引存储类型。用于索引创建。默认值为“ fs”。
      • createIndex:配置是否在存储库引导中创建索引。默认值为true。
      • versionType:版本管理的配置。默认值为EXTERNAL。
    • @Id:字段注解,以标记用于标识目的的字段。
    • @Field:在字段级别应用并定义字段的属性,大多数属性映射到各自的Elasticsearch映射定义(以下列表不完整,请查看注释Javadoc以获取完整的参考):
      • name:字段名称,它将在Elasticsearch文档中表示,如果未设置,则使用Java字段名称。
      • type:字段类型,可以是文本,关键字,长整数,短整数,字节,双精度,浮点型,半浮点数,标度浮点数,日期,布尔值,二进制,整数等。(属性(FieldType)类型不灵活)
      • format和日期类型的pattern定义。必须为日期类型定义。
      • store:标记是否将原始字段值存储在Elasticsearch中,默认值为false。
      • analyzer,searchAnalyzer,normalizer用于指定自定义分析和正规化。
    • @GeoPoint:将字段标记为geo_point数据类型。如果字段是GeoPoint类的实例,则可以省略。
  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    import org.springframework.data.annotation.Id;
    import org.springframework.data.elasticsearch.annotations.Document;
    import org.springframework.data.elasticsearch.annotations.Field;
    import org.springframework.data.elasticsearch.annotations.FieldType;

    /**
    * 经测试,更改 shards,replicas的值无法更改
    */
    @Document(indexName = "stu", shards = 3, replicas = 2, createIndex = false)
    public class Stu {

    @Id
    private Long stuId;

    @Field(store = true)
    private String name;

    @Field(store = true)
    private Integer age;

    @Field(store = true)
    private Float money;

    @Field(store = true, type = FieldType.Keyword)
    private String sign;

    @Field(store = true)
    private String description;

    // 省略 getter、setter

    @Override
    public String toString() {
    return "Stu{" +
    "stuId=" + stuId +
    ", name='" + name + '\\'' +
    ", age=" + age +
    ", money=" + money +
    ", sign='" + sign + '\\'' +
    ", description='" + description + '\\'' +
    '}';
    }
    }

ElasticsearchRestTemplate 用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import com.fengxuechao.FoodieSearchApp;
import com.fengxuechao.es.pojo.Stu;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = FoodieSearchApp.class)
public class ESTest {

@Autowired
private ElasticsearchRestTemplate esTemplate;

/**
* 不建议使用 ElasticsearchRestTemplate 对索引进行管理(创建索引,更新映射,删除索引)
* 索引就像是数据库或者数据库中的表,我们平时是不会是通过java代码频繁的去创建修改删除数据库或者表的
* 我们只会针对数据做CRUD的操作
* 在es中也是同理,我们尽量使用 ElasticsearchRestTemplate 对文档数据做CRUD的操作
* 1. 属性(FieldType)类型不灵活
* 2. 主分片与副本分片数无法设置
*/

@Test
public void createIndexStu() {

Stu stu = new Stu();
stu.setStuId(1007L);
stu.setName("iron man");
stu.setAge(22);
stu.setMoney(1000.8f);
stu.setSign("I am iron man");
stu.setDescription("I have a spider man");

IndexQuery indexQuery = new IndexQueryBuilder().withObject(stu).build();
esTemplate.index(indexQuery, esTemplate.getIndexCoordinatesFor(stu.getClass()));
}

@Test
public void deleteIndexStu() {
esTemplate.indexOps(Stu.class).delete();
}

// ------------------------- 我是分割线 --------------------------------

@Test
public void updateStuDoc() {

Map<String, Object> sourceMap = new HashMap<>();
sourceMap.put("name", "spider man");
sourceMap.put("money", 99.8f);
sourceMap.put("age", 33);

IndexRequest indexRequest = new IndexRequest();
indexRequest.source(sourceMap);

UpdateQuery updateQuery = UpdateQuery.builder("1004")
.withDocument(Document.from(sourceMap))
.build();

// update stu set sign='abc',age=33,money=88.6 where docId='1002'

esTemplate.update(updateQuery, IndexCoordinates.of("stu"));
}

@Test
public void getStuDoc() {
Stu stu = esTemplate.get("1004", Stu.class);

System.out.println(stu);
}

@Test
public void deleteStuDoc() {
esTemplate.delete("1002", IndexCoordinates.of("stu"));
}

// ------------------------- 我是分割线 --------------------------------

@Test
public void searchStuDoc() {

Pageable pageable = PageRequest.of(0, 2);

NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.matchQuery("description", "spider man"))
.withPageable(pageable)
.build();
SearchHits<Stu> hits = esTemplate.search(query, Stu.class, esTemplate.getIndexCoordinatesFor(Stu.class));
System.out.println("检索后的总分页数目为:" + hits.getTotalHits());
for (SearchHit<Stu> hit : hits) {
System.out.println(hit.getContent());
}

}

@Test
public void highlightStuDoc() {

String preTag = "<font color='red'>";
String postTag = "</font>";

Pageable pageable = PageRequest.of(0, 10);

SortBuilder sortBuilder = new FieldSortBuilder("money")
.order(SortOrder.DESC);
SortBuilder sortBuilderAge = new FieldSortBuilder("age")
.order(SortOrder.ASC);

NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.matchQuery("description", "spider man"))
.withHighlightFields(new HighlightBuilder.Field("description")
.preTags(preTag)
.postTags(postTag))
.withSort(sortBuilder)
.withSort(sortBuilderAge)
.withPageable(pageable)
.build();

SearchHits<Stu> hits = esTemplate.search(query, Stu.class);

System.out.println("检索后的总分页数目为:" + hits.getTotalHits());
// 将
List<SearchHit<Stu>> list = hits.get()
.peek(stuSearchHit -> stuSearchHit.getContent().setDescription(stuSearchHit.getHighlightField("description").get(0))).collect(Collectors.toList());
for (SearchHit<Stu> hit : list) {
System.out.println(hit.getContent());
}

}
}

创建工程,引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<!--<version>2.1.5.RELEASE</version>-->
<version>2.2.2.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

配置yml

1
2
3
4
5
spring:
data:
elasticsearch:
cluster-name: es6
cluster-nodes: 192.168.1.187:9300

版本协调

目前springboot-data-elasticsearch中的es版本贴合为es-6.4.3,如此一来版本需要统一,把es进行降级。等springboot升级es版本后可以在对接最新版的7.4。

Netty issue fix

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
public class ESConfig {

/**
* 解决netty引起的issue
*/
@PostConstruct
void init() {
System.setProperty("es.set.netty.runtime.available.processors", "false");
}

}

附 elasticsearch6.4.3配置文件

elasticsearch.yml

1
2
3
4
5
6
cluster.name: es6
node.name: node0
path.data: /usr/local/elasticsearch-6.4.3/data
path.logs: /usr/local/elasticsearch-6.4.3/logs
network.host: 0.0.0.0
./elasticsearch

如果出现如下错误:

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/es-17_1.jpg

那么需要切换到root用户下去修改配置如下:

1
2
3
4
5
6
vim /etc/security/limits.conf
soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
vim /etc/sysctl.conf

https://raw.githubusercontent.com/littlefxc/littlefxc.github.io/images/images/es-17_2.jpg

别忘记 sysctl -p 刷新一下

最后再次启动OK

Don’t forget!

中文分词器也需要去配置一下噢别忘记!:)中文分词器的版本要记得使用6,而不是之前的7,版本一定要贴合噢~比如目前的所有版本都是统一为es-6.4.3,那么下载地址为:https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v6.4.3

什么是脑裂

如果发生网络中断或者服务器宕机,那么集群会有可能被划分为两个部分,各自有自己的master来管理,那么这就是脑裂。

脑裂解决方案

master主节点要经过多个master节点共同选举后才能成为新的主节点。就跟班级里选班长一样,并不是你1个人能决定的,需要班里半数以上的人决定。

解决实现原理:半数以上的节点同意选举,节点方可成为新的master。

  • discovery.zen.minimum_master_nodes=(N/2)+1
    • N为集群的中master节点的数量,也就是那些 node.master=true 设置的那些服务器节点总数。

ES 7.X

在最新版7.x中,minimum_master_node这个参数已经被移除了,这一块内容完全由es自身去管理,这样就避免了脑裂的问题,选举也会非常快。

前置操作

需要确认其它es节点中的data目录,一定要清空,不能有数据。

配置集群

修改elasticsearch.yml这个配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 配置集群名称,保证每个节点的名称相同,如此就能都处于一个集群之内了
cluster.name: es-cluster

# 每一个节点的名称,必须不一样
node.name: es-node1

# http端口(使用默认即可)
http.port: 9200

# 主节点,作用主要是用于来管理整个集群,负责创建或删除索引,管理其他非master节点(相当于企业老总)
node.master: true

# 数据节点,用于对文档数据的增删改查
node.data: true

# 集群列表
discovery.seed_hosts: ["192.168.1.184", "192.168.1.185", "192.168.1.186"]

# 启动的时候使用一个master节点
cluster.initial_master_nodes: ["es-node1"]

最后可以通过如下命令查看配置文件的内容:

1
more elasticsearch.yml | grep ^[^#]

1 概念

当Consul中注册的服务信息发生变化的时候,我们除了定时通过接口去查询最新的服务信息之外,consul还提供了watch机制,通过监控consul数据的变化,主动通知。

目前conusl watch支持两种通知方式:可执行程序Http接口

consul watch支持监控的数据类型:

  • services - 监控指定列表服务的可用性
  • service - 监控指定服务的实例
  • nodes - 监控节点
  • key - 监控特定的键值对
  • keyprefix - 监控consul配置中心的前缀
  • checks - 监控健康检查的值
  • event - 监控自定义用户事件

也就是说,consul支持监控服务、键值数据、节点信息,只要发生变化,就通知我们。

2 基本用法

我们以监控键值数据的变化为例子,介绍watch机制的用法。

watch监控的配置信息是跟consul的配置文件放在一起的,通过配置文件中的watches字段,设置监控信息。

说明:watches的配置,大致用法就是通过type,配置监控数据的类型,然后根据不同数据类型配置不同的参数,最后选择一种通知方式进行配置。

2.1 通过可执行程序通知

当我们监控的数据发生变化,consul agent会调用我们配置的可执行程序(命令、脚本等等),并且通过标准输入,以Json格式传入通知的参数, 我们只要在程序中根据参数处理业务即可。

例子:

下面是我们consul agent的配置,文件名是 /etc/consul/watches.json

1
2
3
4
5
6
7
8
9
10
11
{
"datacenter": "dc1",
"data_dir": "/var/consul",
"ui": true,
"watches": [{
"type": "key",
"key": "foo/bar/baz",
"handler_type": "script",
"args": ["/usr/bin/my-service-handler.sh", "-redis"]
}]
}

这个例子的意思,监控key=foo/bar/baz的键值数据,如果数据发生变化,就调用/usr/bin/my-service-handler.sh -redis 命令。

参数说明:

  • datacenter - 数据中心的名字

  • data_dir - consul数据存放目录

  • ui - 开启consul ui

  • watches - 配置监控信息,如果监控的数据发生变化,则根据配置执行通知。

    watches参数说明:

    • type - 监控的数据类型
    • key - 监控的键值数据的Key
    • handler_type - 通知类型,支持script和http
    • args - 配置通知类型为script的,执行命令,是一个数组,第一个元素是命令,后面第2个到第N个元素是命令的参数。

例子的watches配置的意思就是:

监控key=foo/bar/baz的键值数据,如果数据发生变化,就调用/usr/bin/my-service-handler.sh -redis 命令, 这个命令可以通过标准输入,接收变化的数据。

启动consul,就会加载watches配置。

2.2 通过Http接口通知

通过http接口通知数据变化,大体上配置跟上面一样,区别是多了一些http接口的配置参数。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"datacenter": "dc1",
"data_dir": "/Users/jogin/Documents/work/local/consul/data",
"ui": true,
"watches": [{
"type": "key",
"key": "foo/bar/baz",
"handler_type": "http",
"http_handler_config": {
"path": "<https://localhost:8000/watch>",
"method": "POST",
"header": {
"x-foo": ["bar", "baz"]
},
"timeout": "10s",
"tls_skip_verify": false
}
}]
}

这个例子的意思,就是当key=foo/bar/baz的键值数据发生变化,就通过https://localhost:8000/watch通知我们。

我们关注watches字段的配置信息,下面是参数说明:

  • type - watch监控类型是key

  • key - 监控foo/bar/baz这个key

  • handler_type - 通知类型, http

  • http_handler_config - 配置http通知信息。

    http_handler_config参数说明:

    • path - 通知Url
    • method - http请求方法
    • header - 自定义Http请求头,没有可以忽略
    • timeout - 超时时间,10秒
    • tls_skip_verify - 是否跳过tls验证

3 监控服务

上面介绍了watch的基本用法,我们也可以监控服务信息的变化,例如当有人注册新的服务或者服务不可用的时候,通知我们。

我们忽略掉,consul agent的配置,单独看watches的配置。

监控所有的服务的配置

1
2
3
4
5
6
7
8
9
{
"watches": [{
"type": "services",
"handler_type": "http",
"http_handler_config": {
...忽略...
}
}]
}

监控单个服务的配置

1
2
3
4
5
6
7
8
9
10
{
"watches": [{
"type": "service",
"service": "要监控的服务名",
"handler_type": "http",
"http_handler_config": {
...忽略...
}
}]
}

4 监控键值数据

前面介绍过基本Key的监控,其实我们还可以通过key的前缀,批量监控一批key,只要key的前缀相同,这些Key下面的数据发生变化,都会发送通知。

1
2
3
4
5
6
7
{
"watches": [{
"type": "keyprefix",
"prefix": "foo/",
"args": ["/usr/bin/my-service-handler.sh", "-redis"]
}]
}

监控类型为:keyprefix

通过prefix字段,配置key的前缀。

这个配置的意思就是:以foo/开头的Key, 数据发生变化,都会执行args参数,配置的命令。

5 监控节点

我们也可以监控consul集群节点的变化信息。

1
2
3
4
5
6
7
8
9
{
"watches": [{
"type": "nodes",
"handler_type": "http",
"http_handler_config": {
...忽略...
}
}]
}

没有其他额外的参数,type=nodes即可,当节点信息发生变化,会根据配置的方式通知。

参考资源

Consul by HashiCorp

ExplorerMan

Consul Watches监控服务变化

开发者头条

Consul 单机集群搭建