0%

SpringEl + aop + jedis实现缓存管理

目的:使用 aop 配合 jedis 和 spring expression 实现缓存管理

切点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.lang.annotation.*;

/**
 * Pointcut marker used together with {@link CacheAspect}: methods carrying this
 * annotation have their JSON-serialized return value cached in Redis.
 * @author fengxuechao
 * @version 0.1
 * @date 2019/8/26
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface Cacheable {

/**
 * Prefix of the redis key. This is a SpEL expression, so a literal prefix
 * must be quoted, e.g. {@code "'token:'"}.
 * @return the key-prefix SpEL expression (empty by default)
 */
String prefix() default "";

/**
 * Redis key as a SpEL expression evaluated against the intercepted method's
 * arguments, e.g. {@code "#authRequest.appId+':'+#authRequest.secret"}.
 * @return the key SpEL expression
 */
String key();
}

切面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
 * Aspect implementing method-level caching in Redis (JedisCluster), keyed by
 * the SpEL expressions declared on {@link Cacheable}. Values are stored as
 * JSON and expire after the configured token TTL.
 *
 * @author fengxuechao
 * @version 0.1
 * @date 2019/9/2
 */
@Slf4j
@Aspect
@Component
public class CacheAspect {

    @Autowired
    private JedisCluster jedisCluster;

    @Autowired
    private AthenaIotProperties properties;

    /**
     * Mimics Spring Cache: return the cached value when present, otherwise
     * invoke the target method, cache its JSON-serialized result and return it.
     *
     * @param pjp the intercepted join point
     * @return the cached (deserialized) value, or the freshly computed result
     * @throws Throwable propagated from the target method
     */
    @Around("@annotation(Cacheable)")
    public Object cacheable(ProceedingJoinPoint pjp) throws Throwable {
        boolean info = log.isInfoEnabled();
        Method method = AopUtils.getMethod(pjp);
        Cacheable cacheable = method.getAnnotation(Cacheable.class);
        String key = getELString(cacheable, method, pjp.getArgs());
        int expireIn = properties.getTokenExpireIn();
        // Single GET instead of EXISTS followed by GET: the original two-step
        // check raced with key expiry — the key could vanish between the two
        // round-trips, making the later GET return null.
        String cachedValue = jedisCluster.get(key);
        if (cachedValue == null) {
            if (info) {
                log.info("缓存中没有 key:{}", key);
            }
            Object obj = pjp.proceed();
            String value = JSON.toJSONString(obj);
            jedisCluster.setex(key, expireIn, value);
            if (info) {
                log.info("缓存token:key={}, expireIn={}, value={}", key, expireIn, value);
            }
            return obj;
        }
        if (info) {
            log.info("缓存token:key={}, expireIn={}, value={}", key, expireIn, cachedValue);
        }
        return JSON.parseObject(cachedValue, method.getReturnType());
    }

    /**
     * Builds the Redis key by evaluating the annotation's prefix and key SpEL
     * expressions against the intercepted method's argument values.
     *
     * @param cacheable the annotation holding the SpEL expressions
     * @param method    the intercepted method
     * @param args      the runtime argument values
     * @return concatenation of the evaluated prefix and key
     */
    private String getELString(Cacheable cacheable, Method method, Object[] args) {
        ExpressionParser parser = new SpelExpressionParser();
        EvaluationContext context = new StandardEvaluationContext();
        // Resolve parameter names with Spring's discoverer; it returns null when
        // the class was compiled without debug information, so guard against it
        // (the original would NPE in that case).
        LocalVariableTableParameterNameDiscoverer nameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
        String[] paraNameArr = nameDiscoverer.getParameterNames(method);
        if (paraNameArr != null) {
            // Expose each argument to SpEL as #<paramName>
            for (int i = 0; i < paraNameArr.length; i++) {
                context.setVariable(paraNameArr[i], args[i]);
            }
        }
        String keyPrefix = parser.parseExpression(cacheable.prefix()).getValue(context, String.class);
        String key = parser.parseExpression(cacheable.key()).getValue(context, String.class);
        return MessageFormat.format("{0}{1}", keyPrefix, key);
    }

}

实现

1
2
3
4
5
6
7
8
9
/**
 * Fetches an access token from the remote auth service. The result is cached
 * by {@link Cacheable} under the key built from appId and secret.
 *
 * @param authRequest carries the token endpoint URL and the credentials
 * @return the access token extracted from the service response
 */
@Cacheable(prefix = "'token:'", key = "#authRequest.appId+':'+#authRequest.secret")
//@Cacheable(value = "token", key = "#p0.appId+':'+#p0.secret")
@Override
public AccessToken getAccessToken(AuthenticationRequest authRequest) {
    String tokenUrl = authRequest.getTokenUrl();
    // Typed (not raw) reference so the generic payload is deserialized
    // correctly and without unchecked warnings.
    ParameterizedTypeReference<ResultBean<AccessToken>> typeReference =
            new ParameterizedTypeReference<ResultBean<AccessToken>>() {};
    ResponseEntity<ResultBean<AccessToken>> entity = restTemplate.exchange(tokenUrl, HttpMethod.GET, null, typeReference);
    return entity.getBody().getData();
}

Java8-LocalDateTime记录

文中都使用的时区都是东8区,也就是北京时间。这是为了防止服务器设置时区错误时导致时间不对,如果您是其他时区,请自行修改。

关键类介绍

– Instant:它代表的是时间戳
– LocalDate:不包含具体时间的日期,比如2019-08-28。它可以用来存储生日,周年纪念日,入职日期等。
– LocalTime:它代表的是不含日期的时间
– LocalDateTime:它包含了日期及时间,不过还是没有偏移信息或者说时区。
– ZonedDateTime:这是一个包含时区的完整的日期时间,偏移量是以UTC/格林威治时间为基准的。

Java8新增的DateTimeFormatter与SimpleDateFormat的区别

两者最大的区别是,Java8的DateTimeFormatter是线程安全的,而SimpleDateFormat并不是线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;

/**
 * Demonstrates java.time parsing/formatting (DateTimeFormatter is thread-safe)
 * versus SimpleDateFormat confined to a ThreadLocal.
 */
public class Main {

    // SimpleDateFormat is not thread-safe; give each thread its own instance
    // so the formatter can be shared safely under concurrency.
    private static final ThreadLocal<DateFormat> threadLocal =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    public static void main(String[] args) {

        // Parse a date string
        String dateStr = "2016年10月25日";
        DateTimeFormatter parsePattern = DateTimeFormatter.ofPattern("yyyy年MM月dd日");
        LocalDate date = LocalDate.parse(dateStr, parsePattern);

        // Format the current date-time as a string
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter printPattern = DateTimeFormatter.ofPattern("yyyy年MM月dd日 hh:mm a");
        String nowStr = now.format(printPattern);
        System.out.println(nowStr);

        // SimpleDateFormat confined to a ThreadLocal
        System.out.println(format(new Date()));
    }

    /** Format with the calling thread's own SimpleDateFormat instance. */
    public static String format(Date date) {
        return threadLocal.get().format(date);
    }

}

LocalDateTime获取毫秒数

1
2
3
4
// Epoch seconds at UTC+8
Long second = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
// Epoch milliseconds at UTC+8
Long milliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();

LocalDateTime与String互转

1
2
3
4
5
6
7
8
// Format a date-time as a string
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
String dateTime = LocalDateTime.now(ZoneOffset.of("+8")).format(formatter);

// Parse a string into a date-time
// (renamed from `dateTime`: the original declared two variables with the same
// name in one snippet, which does not compile when pasted into one scope)
String dateTimeStr = "2018-07-28 14:11:15";
DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime parsedDateTime = LocalDateTime.parse(dateTimeStr, df);

Date与LocalDateTime互转

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Convert java.util.Date to java8 java.time.LocalDateTime, fixed at UTC+8
public static LocalDateTime dateConvertToLocalDateTime(Date date) {
    Instant instant = date.toInstant();
    return LocalDateTime.ofInstant(instant, ZoneOffset.of("+8"));
}

// Convert java8 java.time.LocalDateTime to java.util.Date, fixed at UTC+8
public static Date localDateTimeConvertToDate(LocalDateTime localDateTime) {
    Instant instant = localDateTime.atOffset(ZoneOffset.of("+8")).toInstant();
    return Date.from(instant);
}

/**
 * Verify that the Date -> LocalDateTime conversion preserves the instant
 * (both sides compared as epoch seconds at UTC+8).
 */
@Test
public void testDateConvertToLocalDateTime() {
Date date = DateUtils.parseDate("2018-08-01 21:22:22", DateUtils.DATE_YMDHMS);
LocalDateTime localDateTime = DateUtils.dateConvertToLocalDateTime(date);
// Same offset (+8) on both sides, so equal instants yield equal seconds
Long localDateTimeSecond = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
Long dateSecond = date.toInstant().atOffset(ZoneOffset.of("+8")).toEpochSecond();
Assert.assertTrue(dateSecond.equals(localDateTimeSecond));
}

kafka 单独设置某个topic的数据过期时间

kafka 默认存放7天的临时数据,如果遇到磁盘空间小,存放数据量大,可以设置缩短这个时间。

全局设置

修改 server.properties

1
2
log.retention.hours=72
log.cleanup.policy=delete

单独对某一个topic设置过期时间

但如果只有某一个topic数据量过大。
想单独对这个topic的过期时间设置短点:

1
./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name mytopic --entity-type topics --add-config retention.ms=86400000

retention.ms=86400000 为一天,单位是毫秒。

查看设置:

1
2
$ ./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name mytopic --entity-type topics
Configs for topics:mytopic are retention.ms=86400000

立即删除某个topic下的数据

1
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config cleanup.policy=delete

之前使用eureka时,注册服务的ID 是随机数,eureka上不会出现同一服务多实例的问题。但是,换上了 consul 作为注册中心后,却出现同一个服务拥有多个实例的问题,上次服务挂掉之后的实例还在注册中心上挂着,每次重启多一个实例。

有什么办法去解决这个问题?答案就是自定义spring cloud consul 的注册方法,使其唯一化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Consul service registry that overrides the generated service ID so that a
 * restarted service re-registers under the same ID ("name-address-port")
 * instead of accumulating duplicate instances in Consul.
 */
public class ServiceIdRegister extends ConsulServiceRegistry {

public ServiceIdRegister(ConsulClient client, ConsulDiscoveryProperties properties, TtlScheduler ttlScheduler, HeartbeatProperties heartbeatProperties) {
super(client, properties, ttlScheduler, heartbeatProperties);
}

@Override
public void register(ConsulRegistration reg) {
// Rebuild the id as "serviceName-ip-port" so it is stable across restarts.
// NOTE(review): id mixes reg.getService().getAddress() with reg.getPort() —
// confirm both resolve to the same instance's address/port on this
// spring-cloud-consul version.
reg.getService().setId(reg.getService().getName() + "-" + reg.getService().getAddress() + "-" + reg.getPort());
super.register(reg);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Registers the custom {@link ServiceIdRegister} (deterministic service IDs)
 * in place of the default ConsulServiceRegistry bean.
 */
@Configuration
@ConditionalOnConsulEnabled
public class ConsulConfig {

// Optional dependency: only present when TTL heartbeats are enabled
@Autowired(required = false)
private TtlScheduler ttlScheduler;

/**
 * Override the default registry bean with the ID-rewriting variant.
 *
 * @param consulClient Consul HTTP client
 * @param properties discovery properties
 * @param heartbeatProperties heartbeat configuration
 * @return the custom service registry
 */
@Bean
public ServiceIdRegister consulServiceRegistry(ConsulClient consulClient, ConsulDiscoveryProperties properties,
HeartbeatProperties heartbeatProperties) {
return new ServiceIdRegister(consulClient, properties, ttlScheduler, heartbeatProperties);
}

}

1
2
3
4
5
6
spring:
cloud:
consul:
discovery:
# 健康检查失败多长时间后,取消注册
health-check-critical-timeout: 30s

在配置文件中如上配置后可以使得服务下线后自动删除无效服务,而不必像很多的博客中写的那样专门写一个删除失效服务。

其它的配置属性解析:

  • spring.cloud.consul.host:配置consul地址
  • spring.cloud.consul.port:配置consul端口
  • spring.cloud.consul.discovery.enabled:启用服务发现
  • spring.cloud.consul.discovery.register:启用服务注册
  • spring.cloud.consul.discovery.deregister:服务停止时取消注册
  • spring.cloud.consul.discovery.prefer-ip-address:表示注册时使用IP而不是hostname
  • spring.cloud.consul.discovery.health-check-interval:健康检查频率
  • spring.cloud.consul.discovery.health-check-path:健康检查路径
  • spring.cloud.consul.discovery.health-check-critical-timeout:健康检查失败多长时间后,取消注册
  • spring.cloud.consul.discovery.instance-id:服务注册标识

在SpringBoot1.5.x下如何使RedisTokenStore集群化

在 spring boot 1.5.x 下 spring-boot-starter-data-redis 默认使用 jedis 作为客户端。

因为 JedisCluster 不支持集群的管道操作(pipeline),但是项目中又要用到 Redis 集群,这时候该怎么办呢?

现在,提供两种解决办法:

  1. 重写 RedisTokenStore, 用 RedisTemplateTokenStore
  2. 将 jedis 换掉,使用 spring boot 2.x 中默认的 redis 客户端 lettuce 来支持 Redis 集群(推荐)

解决办法 1:重写 RedisTokenStore

因为 JedisCluster 不支持管道操作:(源码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Excerpt from Spring Data Redis' JedisClusterConnection (quoted for reference):
// both pipeline operations throw, which is why the pipeline-based
// RedisTokenStore cannot work on top of a Jedis cluster connection.
public class JedisClusterConnection implements RedisClusterConnection {
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisConnection#openPipeline()
*/
@Override
public void openPipeline() {
throw new UnsupportedOperationException("Pipeline is currently not supported for JedisClusterConnection.");
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisConnection#closePipeline()
*/
@Override
public List<Object> closePipeline() throws RedisPipelineException {
throw new UnsupportedOperationException("Pipeline is currently not supported for JedisClusterConnection.");
}
}

因此可以使用 RedisTemplate 重写 RedisTokenStore,虽然会导致性能的损失,但至少能用不是吗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package com.fengxuechao.examples.auth.provider.token.store;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.security.oauth2.common.OAuth2AccessToken;
import org.springframework.security.oauth2.common.OAuth2RefreshToken;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
import org.springframework.security.oauth2.provider.token.AuthenticationKeyGenerator;
import org.springframework.security.oauth2.provider.token.DefaultAuthenticationKeyGenerator;
import org.springframework.security.oauth2.provider.token.TokenStore;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * A {@link TokenStore} backed by a plain {@link RedisTemplate} instead of the
 * pipeline-based connection API used by Spring's RedisTokenStore. This makes
 * it usable with JedisCluster (whose connection rejects pipelining), at the
 * cost of one round-trip per operation.
 *
 * <p>Key layout mirrors Spring's RedisTokenStore: access:, auth:,
 * auth_to_access:, refresh:, refresh_auth:, access_to_refresh:,
 * refresh_to_access:, client_id_to_access: (list), uname_to_access: (list).
 *
 * @author fengxuechao
 * @version 0.1
 * @date 2019/6/21
 */
public class RedisTemplateTokenStore implements TokenStore {

    private static final String ACCESS = "access:";
    private static final String AUTH_TO_ACCESS = "auth_to_access:";
    private static final String AUTH = "auth:";
    private static final String REFRESH_AUTH = "refresh_auth:";
    private static final String ACCESS_TO_REFRESH = "access_to_refresh:";
    private static final String REFRESH = "refresh:";
    private static final String REFRESH_TO_ACCESS = "refresh_to_access:";
    private static final String CLIENT_ID_TO_ACCESS = "client_id_to_access:";
    private static final String UNAME_TO_ACCESS = "uname_to_access:";

    private RedisTemplate<String, Object> redisTemplate;

    private AuthenticationKeyGenerator authenticationKeyGenerator = new DefaultAuthenticationKeyGenerator();

    public RedisTemplate<String, Object> getRedisTemplate() {
        return redisTemplate;
    }

    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void setAuthenticationKeyGenerator(AuthenticationKeyGenerator authenticationKeyGenerator) {
        this.authenticationKeyGenerator = authenticationKeyGenerator;
    }

    /**
     * Look up the token previously stored for this authentication; if the
     * stored authentication drifted (or disappeared), re-store to keep the
     * indexes consistent.
     */
    @Override
    public OAuth2AccessToken getAccessToken(OAuth2Authentication authentication) {
        String key = authenticationKeyGenerator.extractKey(authentication);
        OAuth2AccessToken accessToken = (OAuth2AccessToken) redisTemplate.opsForValue().get(AUTH_TO_ACCESS + key);
        if (accessToken != null) {
            OAuth2Authentication storedAuthentication = readAuthentication(accessToken.getValue());
            // Guard against a missing/expired stored authentication before
            // extracting its key (the original code would NPE here).
            if (storedAuthentication == null
                    || !key.equals(authenticationKeyGenerator.extractKey(storedAuthentication))) {
                // Keep the stores consistent (maybe the same user is represented by
                // this authentication but the details have changed)
                storeAccessToken(accessToken, authentication);
            }
        }
        return accessToken;
    }

    @Override
    public OAuth2Authentication readAuthentication(OAuth2AccessToken token) {
        return readAuthentication(token.getValue());
    }

    @Override
    public OAuth2Authentication readAuthentication(String token) {
        return (OAuth2Authentication) this.redisTemplate.opsForValue().get(AUTH + token);
    }

    @Override
    public OAuth2Authentication readAuthenticationForRefreshToken(OAuth2RefreshToken token) {
        return readAuthenticationForRefreshToken(token.getValue());
    }

    public OAuth2Authentication readAuthenticationForRefreshToken(String token) {
        return (OAuth2Authentication) this.redisTemplate.opsForValue().get(REFRESH_AUTH + token);
    }

    /**
     * Store the token, its authentication, and all lookup indexes; expirations
     * on the value keys follow the token's own expiry.
     */
    @Override
    public void storeAccessToken(OAuth2AccessToken token, OAuth2Authentication authentication) {
        this.redisTemplate.opsForValue().set(ACCESS + token.getValue(), token);
        this.redisTemplate.opsForValue().set(AUTH + token.getValue(), authentication);
        this.redisTemplate.opsForValue().set(AUTH_TO_ACCESS + authenticationKeyGenerator.extractKey(authentication), token);
        if (!authentication.isClientOnly()) {
            redisTemplate.opsForList().rightPush(UNAME_TO_ACCESS + getApprovalKey(authentication), token);
        }

        redisTemplate.opsForList().rightPush(CLIENT_ID_TO_ACCESS + authentication.getOAuth2Request().getClientId(), token);

        if (token.getExpiration() != null) {
            int seconds = token.getExpiresIn();
            redisTemplate.expire(ACCESS + token.getValue(), seconds, TimeUnit.SECONDS);
            redisTemplate.expire(AUTH + token.getValue(), seconds, TimeUnit.SECONDS);

            redisTemplate.expire(AUTH_TO_ACCESS + authenticationKeyGenerator.extractKey(authentication), seconds, TimeUnit.SECONDS);
            redisTemplate.expire(CLIENT_ID_TO_ACCESS + authentication.getOAuth2Request().getClientId(), seconds, TimeUnit.SECONDS);
            redisTemplate.expire(UNAME_TO_ACCESS + getApprovalKey(authentication), seconds, TimeUnit.SECONDS);
        }
        if (token.getRefreshToken() != null && token.getRefreshToken().getValue() != null) {
            this.redisTemplate.opsForValue().set(REFRESH_TO_ACCESS + token.getRefreshToken().getValue(), token.getValue());
            this.redisTemplate.opsForValue().set(ACCESS_TO_REFRESH + token.getValue(), token.getRefreshToken().getValue());
        }
    }

    private String getApprovalKey(OAuth2Authentication authentication) {
        String userName = authentication.getUserAuthentication() == null ? "" : authentication.getUserAuthentication()
                .getName();
        return getApprovalKey(authentication.getOAuth2Request().getClientId(), userName);
    }

    private String getApprovalKey(String clientId, String userName) {
        return clientId + (userName == null ? "" : ":" + userName);
    }

    @Override
    public void removeAccessToken(OAuth2AccessToken accessToken) {
        removeAccessToken(accessToken.getValue());
    }

    @Override
    public OAuth2AccessToken readAccessToken(String tokenValue) {
        return (OAuth2AccessToken) this.redisTemplate.opsForValue().get(ACCESS + tokenValue);
    }

    /**
     * Remove the access token and its indexes. The refresh token is left in
     * place — it's up to the caller to remove it.
     */
    public void removeAccessToken(String tokenValue) {
        OAuth2AccessToken removed = (OAuth2AccessToken) redisTemplate.opsForValue().get(ACCESS + tokenValue);
        // Don't remove the refresh token - it's up to the caller to do that
        OAuth2Authentication authentication = (OAuth2Authentication) this.redisTemplate.opsForValue().get(AUTH + tokenValue);

        this.redisTemplate.delete(AUTH + tokenValue);
        redisTemplate.delete(ACCESS + tokenValue);
        this.redisTemplate.delete(ACCESS_TO_REFRESH + tokenValue);

        if (authentication != null) {
            this.redisTemplate.delete(AUTH_TO_ACCESS + authenticationKeyGenerator.extractKey(authentication));

            String clientId = authentication.getOAuth2Request().getClientId();
            String unameKey = UNAME_TO_ACCESS + getApprovalKey(clientId, authentication.getName());
            String clientKey = CLIENT_ID_TO_ACCESS + clientId;
            if (removed != null) {
                // Remove THIS token from the index lists; the original used
                // leftPop, which discards an arbitrary (oldest) element and can
                // evict a different, still-valid token.
                redisTemplate.opsForList().remove(unameKey, 1, removed);
                redisTemplate.opsForList().remove(clientKey, 1, removed);
            } else {
                // Token value already expired; fall back to the original
                // best-effort cleanup.
                redisTemplate.opsForList().leftPop(unameKey);
                redisTemplate.opsForList().leftPop(clientKey);
            }
            // (original deleted AUTH_TO_ACCESS a second time here — redundant)
        }
    }

    @Override
    public void storeRefreshToken(OAuth2RefreshToken refreshToken, OAuth2Authentication authentication) {
        this.redisTemplate.opsForValue().set(REFRESH + refreshToken.getValue(), refreshToken);
        this.redisTemplate.opsForValue().set(REFRESH_AUTH + refreshToken.getValue(), authentication);
    }

    @Override
    public OAuth2RefreshToken readRefreshToken(String tokenValue) {
        return (OAuth2RefreshToken) this.redisTemplate.opsForValue().get(REFRESH + tokenValue);
    }

    @Override
    public void removeRefreshToken(OAuth2RefreshToken refreshToken) {
        removeRefreshToken(refreshToken.getValue());
    }

    public void removeRefreshToken(String tokenValue) {
        this.redisTemplate.delete(REFRESH + tokenValue);
        this.redisTemplate.delete(REFRESH_AUTH + tokenValue);
        this.redisTemplate.delete(REFRESH_TO_ACCESS + tokenValue);
    }

    @Override
    public void removeAccessTokenUsingRefreshToken(OAuth2RefreshToken refreshToken) {
        removeAccessTokenUsingRefreshToken(refreshToken.getValue());
    }

    private void removeAccessTokenUsingRefreshToken(String refreshToken) {
        String token = (String) this.redisTemplate.opsForValue().get(REFRESH_TO_ACCESS + refreshToken);
        // Also drop the refresh->access mapping itself; the original left it
        // behind as a dangling key.
        this.redisTemplate.delete(REFRESH_TO_ACCESS + refreshToken);
        if (token != null) {
            redisTemplate.delete(ACCESS + token);
        }
    }

    @Override
    public Collection<OAuth2AccessToken> findTokensByClientIdAndUserName(String clientId, String userName) {
        List<Object> result = redisTemplate.opsForList().range(UNAME_TO_ACCESS + getApprovalKey(clientId, userName), 0, -1);
        return toTokenCollection(result);
    }

    @Override
    public Collection<OAuth2AccessToken> findTokensByClientId(String clientId) {
        List<Object> result = redisTemplate.opsForList().range(CLIENT_ID_TO_ACCESS + clientId, 0, -1);
        return toTokenCollection(result);
    }

    /** Shared conversion for the two list-backed finders. */
    private Collection<OAuth2AccessToken> toTokenCollection(List<Object> result) {
        if (result == null || result.size() == 0) {
            return Collections.<OAuth2AccessToken>emptySet();
        }
        List<OAuth2AccessToken> accessTokens = new ArrayList<OAuth2AccessToken>(result.size());
        for (Object o : result) {
            accessTokens.add((OAuth2AccessToken) o);
        }
        return Collections.<OAuth2AccessToken>unmodifiableCollection(accessTokens);
    }
}

解决办法 2:使用 lettuce 替换 jedis

我们可以使用 Lettuce 来替代 jedis,况且 lettuce 也是 spring boot 2.x 中默认的 redis 客户端。

POM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<artifactId>jedis</artifactId>
<groupId>redis.clients</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- NOTE(review): the next two dependencies are ALTERNATIVE lettuce clients
     (io.lettuce:lettuce-core 5.x vs the legacy biz.paluch.redis:lettuce 4.x).
     Include only ONE of them; keeping both puts conflicting classes on the
     classpath. Prefer io.lettuce:lettuce-core. -->
<!-- lettuce 客户端 -->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.0.5.RELEASE</version>
<scope>compile</scope>
</dependency>
<!-- lettuce 客户端 -->
<dependency>
<groupId>biz.paluch.redis</groupId>
<artifactId>lettuce</artifactId>
<version>4.5.0.Final</version>
</dependency>
<!-- lettuce 连接池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

配置文件 application.yml

1
2
3
4
5
6
7
8
9
spring:
redis:
cluster:
nodes: 192.168.213.13:7001,192.168.213.14:7003,192.168.213.21:7006
max-redirects: 5
logging:
level:
root: info
com.fengxuechao.examples.auth: debug

配置 LettuceConnectionFactory 和 RedisTokenStore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.fengxuechao.examples.auth.config;

import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

/**
* Redis 配置
*
* @author fengxuechao
* @version 0.1
* @date 2019/6/24
*/
@EnableConfigurationProperties(RedisProperties.class)
@Configuration
public class RedisConfig {

/**
* 使用 lettuce 作为 redis 的连接池
*
* @param configuration
* @return
*/
@Bean
public LettuceConnectionFactory lettuceConnectionFactory(RedisClusterConfiguration configuration) {
return new LettuceConnectionFactory(configuration);
}

/**
* lettuce 集群配置
*/
@Bean
public RedisClusterConfiguration getClusterConfiguration(RedisProperties redisProperties) {
RedisProperties.Cluster clusterProperties = redisProperties.getCluster();
RedisClusterConfiguration config = new RedisClusterConfiguration(clusterProperties.getNodes());

if (clusterProperties.getMaxRedirects() != null) {
config.setMaxRedirects(clusterProperties.getMaxRedirects());
}
return config;
}

@Bean
public TokenStore tokenStore(LettuceConnectionFactory lettuceConnectionFactory) {
return new RedisTokenStore(lettuceConnectionFactory);
}
}

解决Elasticsearch-head插件链接不上服务

修改elasticsearch.yml文件

1
2
3
4
vim $ES_HOME$/config/elasticsearch.yml
# 增加如下字段
http.cors.enabled: true
http.cors.allow-origin: "*"

重启ES

filebeat

filebeat合并多行日志示例

本节中的示例包括以下内容:

  • 将Java堆栈跟踪日志组合成一个事件
  • 将C风格的日志组合成一个事件
  • 结合时间戳处理多行事件

Java堆栈跟踪

Java示例一

Java堆栈跟踪由多行组成,每一行在初始行之后以空格开头,如本例中所述:

1
2
3
4
Exception in thread "main" java.lang.NullPointerException
at com.example.myproject.Book.getTitle(Book.java:16)
at com.example.myproject.Author.getBookTitles(Author.java:25)
at com.example.myproject.Bootstrap.main(Bootstrap.java:14)

要将这些行整合到Filebeat中的单个事件中,请使用以下多行配置:

1
2
3
multiline.pattern: '^[[:space:]]'
multiline.negate: false
multiline.match: after

此配置将以空格开头的所有行合并到上一行。

Java示例二

下面是一个Java堆栈跟踪日志,稍微复杂的例子:

1
2
3
4
5
6
7
Exception in thread "main" java.lang.IllegalStateException: A book has a null property
at com.example.myproject.Author.getBookIds(Author.java:38)
at com.example.myproject.Bootstrap.main(Bootstrap.java:14)
Caused by: java.lang.NullPointerException
at com.example.myproject.Book.getId(Book.java:22)
at com.example.myproject.Author.getBookIds(Author.java:35)
... 1 more

要将这些行整合到Filebeat中的单个事件中,请使用以下多行配置:

1
2
3
multiline.pattern: '^[[:space:]]+(at|\.{3})\b|^Caused by:'
multiline.negate: false
multiline.match: after

此配置解释如下:

  • 将以空格开头的所有行合并到上一行
  • 并把以Caused by开头的也追加到上一行

C风格的日志

一些编程语言在一行末尾使用反斜杠 \ 字符,表示该行仍在继续,如本例中所示:

1
2
printf ("%10.10ld  \t %10.10ld \t %s\
%f", w, x, y, z );

要将这些行整合到Filebeat中的单个事件中,请使用以下多行配置:

1
2
3
multiline.pattern: '\\$'
multiline.negate: false
multiline.match: before

此配置将以\字符结尾的任何行与后面的行合并。

时间戳

来自Elasticsearch等服务的活动日志通常以时间戳开始,然后是关于特定活动的信息,如下例所示:

1
2
[2015-08-24 11:49:14,389][INFO ][env                      ] [Letha] using [1] data paths, mounts [[/
(/dev/disk1)]], net usable_space [34.5gb], net total_space [118.9gb], types [hfs]

要将这些行整合到Filebeat中的单个事件中,请使用以下多行配置:

1
2
3
multiline.pattern: '^\[[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after

此配置使用negate: true和match: after设置来指定任何不符合指定模式的行都属于上一行。

应用程序事件

有时您的应用程序日志包含以自定义标记开始和结束的事件,如以下示例:

1
2
3
[2015-08-24 11:49:14,389] Start new event
[2015-08-24 11:49:14,395] Content of processing something
[2015-08-24 11:49:14,399] End event

要在Filebeat中将其整合为单个事件,请使用以下多行配置:

1
2
3
4
multiline.pattern: 'Start new event'
multiline.negate: true
multiline.match: after
multiline.flush_pattern: 'End event'

此配置把指定字符串开头,指定字符串结尾的多行合并为一个事件。

1. 前言

首先明确一点,SpringBoot自带的ES模板,不建议使用,建议使用Rest Client。如果业务简单,且无特殊要求,可以使用SpringBoot的模板ElasticsearchRepository来搞定。这个非常简单,这里不作介绍

ElasticsearchRepository

  • 优点: 简单,SpringBoot无缝对接,配置简单
  • 缺点: 基于即将废弃的TransportClient, 不能支持复杂的业务

2. 使用 Spring 的 IOC 管理 ES 的连接客户端

步骤:

  1. 配置ES节点
  2. 配置Rest Client
  3. 配置Rest High Level Client
  4. 使用IOC注入

根据我从其他网站上查询的资料,Rest Client是长连接,而且内部有默认的线程池管理,因此一般无需自定义线程池管理连接。如果不对请指正。

基于以上结论。模仿 spring-boot-autoconfigure 先把连接点全部配置到配置文件中.

2.1. 配置 maven 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- 省略 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<elasticsearch.version>7.1.1</elasticsearch.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
</dependencies>

</project>

2.2. 编写配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.awifi.capacity.admin.statistic.elasticsearch;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.util.List;

/**
 * Configuration properties for Elasticsearch (prefix "elasticsearch").
 *
 * @author fengxuechao
 * @since 1.0.2
 */
@Data
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticsearchProperties {

// Cluster nodes as "host:port" entries (elasticsearch.hostAndPortList[n])
private List<String> hostAndPortList;

// Optional basic-auth credentials; basic auth is skipped when username is blank
private String username;

private String password;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package com.awifi.capacity.admin.statistic.elasticsearch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

/**
* ES 配置
*
* @author fengxuechao
* @since 1.1.0
*/
@Configuration
@ConditionalOnClass({Client.class, RestHighLevelClient.class})
@EnableConfigurationProperties(ElasticsearchProperties.class)
public class ElasticsearchConfiguration implements DisposableBean {

private static final Log logger = LogFactory.getLog(ElasticsearchConfiguration.class);

private Closeable closeable;

@Autowired
private ElasticsearchProperties properties;


/**
* 创建 Elasticsearch RestHighLevelClient
*
* @return
*/
@Bean("restHighLevelClient")
@ConditionalOnMissingBean
public RestHighLevelClient restHighLevelClient() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
List<HttpHost> list = createHttpHost();
HttpHost[] array = list.toArray(new HttpHost[list.size()]);
RestClientBuilder builder = RestClient.builder(array);
//es账号密码设置
if (StringUtils.hasText(properties.getUsername())) {
String username = properties.getUsername();
String password = properties.getPassword();
UsernamePasswordCredentials usernamePasswordCredentials = new UsernamePasswordCredentials(username, password);
credentialsProvider.setCredentials(AuthScope.ANY, usernamePasswordCredentials);
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {

/**
* 这里可以设置一些参数,比如cookie存储、代理等等
* @param httpClientBuilder
* @return
*/
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
}
// RestHighLevelClient实例需要Rest low-level client builder构建
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
closeable = restHighLevelClient;
return restHighLevelClient;
}

/**
* 读取配置文件es信息构建 HttpHost 列表
*
* @return
*/
private List<HttpHost> createHttpHost() {
List<String> hostAndPortList = properties.getHostAndPortList();
if (hostAndPortList.isEmpty()) {
throw new IllegalArgumentException("必须配置elasticsearch节点信息");
}
List<HttpHost> list = new ArrayList<>(hostAndPortList.size());
for (String s : hostAndPortList) {
String[] hostAndPortArray = s.split(":");
String hostname = hostAndPortArray[0];
int port = Integer.parseInt(hostAndPortArray[1]);
list.add(new HttpHost(hostname, port));
}
return list;
}


/**
* 当不再需要时,需要关闭高级客户端实例,以便它所使用的所有资源以及底层的http客户端实例及其线程得到正确释放。
* 通过close方法来完成,该方法将关闭内部的RestClient实例
*/
@Override
public void destroy() throws Exception {
if (this.closeable != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Closing Elasticsearch client");
}
try {
this.closeable.close();
} catch (NoSuchMethodError ex) {
// Earlier versions of Elasticsearch had a different method name
ReflectionUtils.invokeMethod(
ReflectionUtils.findMethod(Closeable.class, "close"),
this.closeable);
}
} catch (final Exception ex) {
if (logger.isErrorEnabled()) {
logger.error("Error closing Elasticsearch client: ", ex);
}
}
}
}
}

2.3. 在 application.properties 文件中配置 es 集群信息

application.properties

1
2
3
4
# ES 配置
elasticsearch.hostAndPortList[0]=192.168.200.19:9200
elasticsearch.username=
elasticsearch.password=

2.4. 单元测试

Redis 有序集合(sorted set)

Redis 有序集合和集合一样也是string类型元素的集合,且不允许重复的成员。

不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。

有序集合的成员是唯一的,但分数(score)却可以重复。

集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。 集合中最大的成员数为 2^32 - 1 (4294967295, 每个集合可存储40多亿个成员)。