0%

Java并发学习记录之ThreadLocal

JDK 版本:1.8


我们需要关注的,自然是 ThreadLocal 的 set() 方法和 get() 方法。

ThreadLocal 的 set() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadLocal<T> {

public void set(T value) {
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获得当前线程的 ThreadLocalMap
ThreadLocalMap map = getMap(t);
// 将值放入 ThreadLocalMap 中
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

// ...
}

在 set 时,首先获得当前线程对象,然后通过 getMap() 拿到线程的 ThreadLocalMap,并将值放入
ThreadLocalMap 中。

那么这个 ThreadLocalMap 和 Thread 有什么关系?

可以理解为一个 Map (虽然不是,但可以简单地理解为 HashMap),但是它是定义在 Thread 内部的成员。

1
2
3
4
5
6
public class Thread implements Runnable {
/* 与此线程相关的 ThreadLocal 值。这个 Map 由 ThreadLocal 类维护。 */
ThreadLocal.ThreadLocalMap threadLocals = null;

// ...
}

而设置到 ThreadLocal 中的值,就是写入 threadLocals 这个 Map。其中 key 为 ThreadLocal 当前对象,
value 就是我们需要的值。而 threads 本身就保存了自己所在线程的所有 “局部变量”,也就是一个 ThreadLocal 变量的集合。

而 get() 方法也是先取得当前线程的 ThreadLocalMap 对象。然后,通过将自己作为 key 取得内部的实际数据。

同时也可以看到在 Thread 的 exit() 方法中, Thread 类会进行一些清理工作,注意下述代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Thread implements Runnable {
/**
* 系统调用此方法是为了让线程在实际退出之前有机会进行清理。
*/
private void exit() {
if (group != null) {
group.threadTerminated(this);
group = null;
}
/* 侵略性地清除所有引用字段:参见bug 4006245 */
target = null;
/* 加快这些资源的释放 */
threadLocals = null;
inheritableThreadLocals = null;
inheritedAccessControlContext = null;
blocker = null;
uncaughtExceptionHandler = null;
}
}

因此如果我们使用线程池,那就意味着线程未必会退出。如果这样,将一些对象设置到 ThreadLocal 中,
可能会使系统出现内存泄漏(JVM无法回收你不再使用的对象)的可能。

但又要及时回收对象,就可以使用 ThreadLocal.remove(),告诉JVM,回收对象,防止内存泄漏。

那么,ThreadLocalMap 是什么鬼?

首先,通过前面的 ThreadLocal 的核心的 set() 方法分析,ThreadLocal 的 set 方法是通过代理给它的内部类ThreadLocalMap实现的。
于是对于 ThreadLocal 的分析就转换为对内部类 ThreadLocalMap 的分析。

对 ThreadLocalMap 的 set() 方法和相关属性/方法的分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
/**
* ThreadLocalMap是一个定制的哈希映射,只适合维护线程本地值。
* 在ThreadLocal类之外不导出任何操作。类是包私有的,允许在类线程中声明字段。
* 为了帮助处理非常大且长期存在的使用,哈希表项对键使用弱引用。
* 但是,由于没有使用引用队列,因此只有在表开始耗尽空间时才保证删除过时的条目。
*/
static class ThreadLocalMap {

/**
* 该类继承了WeakReference是方便垃圾回收,在底层map扩容之前进行entry的回收,
* 减少扩容的概率,提高性能
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

/**
* 初始容量
*/
private static final int INITIAL_CAPACITY = 16;

/**
* 底层数组
*/
private Entry[] table;

/**
* map中entry的个数
*/
private int size = 0;

/**
* 阈值,超过这个阈值之后就需要进行扩容
*/
private int threshold; // Default to 0

/**
* 阈值是底层数组长度的2/3
*/
private void setThreshold(int len) {
threshold = len * 2 / 3;
}

/**
* 计算下一个索引,hash算法定位失败的时候(也就是该索引位置存在元素)
*/
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}

/**
* 上一个位置索引,hash算法定位失败的时候(也就是该索引位置存在元素)
*/
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}

/**
* 根据 key 和 value 构建 ThreadLocalMap
*/
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
//初始化table
table = new Entry[INITIAL_CAPACITY];
//计算索引
// & (INITIAL_CAPACITY - 1) 这是取模的一种方式,对于2的幂作为模数取模,用此代替%(2^n)
// firstKey.threadLocalHashCode 其主要目的就是为了让哈希码能均匀的分布在2的n次方的数组里, 也就是Entry[] table中
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
//设置值
table[i] = new Entry(firstKey, firstValue);
size = 1;
//设置阈值
setThreshold(INITIAL_CAPACITY);
}

/**
* 根据父容器构造ThreadLocalMap
*
* @param parentMap the map associated with parent thread.
*/
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];
//复制遍历
for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
//该句相当于 hashcode % len但是&运算更加高效
int h = key.threadLocalHashCode & (len - 1);
//hash算法定位失败,找下一个索引
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}

/**
* ThreadLocalMap使用线性探测法来解决哈希冲突,线性探测法的地址增量di = 1, 2, … , m-1,
* 其中,i为探测次数。该方法一次探测下一个地址,直到有空的地址后插入,若整个空间都找不到空余的地址,则产生溢出。
* 假设当前table长度为16,也就是说如果计算出来key的hash值为14,如果table[14]上已经有值,并且其key与当前key不一致,
* 那么就发生了hash冲突,这个时候将14加1得到15,取table[15]进行判断,
* 这个时候如果还是冲突会回到0,取table[0],以此类推,直到可以插入。
*
*/
private void set(ThreadLocal<?> key, Object value) {

Entry[] tab = table;
int len = tab.length;
//计算索引。
int i = key.threadLocalHashCode & (len-1);

// 根据获取到的索引进行循环,如果当前索引上的table[i]不为空,在没有return的情况下,
// 就使用nextIndex()获取下一个(方法注释上提到到线性探测法)。
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//table[i]上key不为空,并且和当前key相同,更新value
if (k == key) {
e.value = value;
return;
}
//table[i]上的key为空,说明被回收了(上面的弱引用中提到过)。
//这个时候说明改table[i]可以重新使用,用新的key-value将其替换,并删除其他无效的entry
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
//找到为空的插入位置,插入值,在为空的位置插入需要对size进行加1操作
tab[i] = new Entry(key, value);
int sz = ++size;
/**
* cleanSomeSlots用于清除那些e.get()==null,也就是table[index] != null && table[index].get()==null
* 之前提到过,这种数据key关联的对象已经被回收,所以这个Entry(table[index])可以被置null。
* 如果没有清除任何entry,并且当前使用量达到了负载因子所定义(长度的2/3),那么进行rehash()
*/
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

/**
* 替换无效entry
*/
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;

/**
* 根据传入的无效entry的位置(staleSlot),向前扫描
* 一段连续的entry(这里的连续是指一段相邻的entry并且table[i] != null),
* 直到找到一个无效entry,或者扫描完也没找到
*/
int slotToExpunge = staleSlot;//之后用于清理的起点
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;

// 向后扫描一段连续的entry
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();

// 如果找到了key,将其与传入的无效entry替换,也就是与table[staleSlot]进行替换
if (k == key) {
e.value = value;

tab[i] = tab[staleSlot];
tab[staleSlot] = e;

//如果向前查找没有找到无效entry,则更新slotToExpunge为当前值i
if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}

// 如果向前查找没有找到无效entry,并且当前向后扫描的entry无效,则更新slotToExpunge为当前值i
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}

// 如果没有找到key,也就是说key之前不存在table中,就直接最开始的无效entry——tab[staleSlot]上直接新增即可
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);

// slotToExpunge != staleSlot,说明存在其他的无效entry需要进行清理。
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

/**
* 连续段清除
* 根据传入的staleSlot,清理对应的无效entry——table[staleSlot],
* 并且根据当前传入的staleSlot,向后扫描一段连续的entry(这里的连续是指一段相邻的entry并且table[i] != null),
* 对可能存在hash冲突的entry进行rehash,并且清理遇到的无效entry.
*
* @param staleSlot key为null,需要无效entry所在的table中的索引
* @return 返回下一个为空的solt的索引。
*/
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;

// 清理无效entry,置空
tab[staleSlot].value = null;
tab[staleSlot] = null;
//size减1,置空后table的被使用量减1
size--;

// Rehash until we encounter null
Entry e;
int i;
// 从staleSlot开始向后扫描一段连续的entry
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
//如果遇到key为null,表示无效entry,进行清理.
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
//如果key不为null,计算索引
int h = k.threadLocalHashCode & (len - 1);
/**
* 计算出来的索引——h,与其现在所在位置的索引——i不一致,置空当前的table[i]
* 从h开始向后线性探测到第一个空的slot,把当前的entry挪过去。
*/
if (h != i) {
tab[i] = null;

// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
//下一个为空的solt的索引。
return i;
}

/**
* 启发式的扫描清除,扫描次数由传入的参数n决定
*
* @param i 从i向后开始扫描(不包括i,因为索引为i的Slot肯定为null)
*
* @param n 控制扫描次数,正常情况下为 log2(n) ,
* 如果找到了无效entry,会将n重置为table的长度len,进行段清除。
*
* map.set()点用的时候传入的是元素个数,replaceStaleEntry()调用的时候传入的是table的长度len
*
* @return true if any stale entries have been removed.
*/
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}

/**
* Re-pack and/or re-size the table. First scan the entire
* table removing stale entries. If this doesn't sufficiently
* shrink the size of the table, double the table size.
*/
private void rehash() {
//全清理
expungeStaleEntries();

// threshold = 2/3 * len,所以threshold - threshold / 4 = 1en/2
// 这里主要是因为上面做了一次全清理所以size减小,需要进行判断。
// 判断的时候把阈值调低了。
if (size >= threshold - threshold / 4)
resize();
}

/**
* 扩容,扩大为原来的2倍(这样保证了长度为2的冥)
*/
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;

for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
//虽然做过一次清理,但在扩容的时候可能会又存在key==null的情况。
if (k == null) {
e.value = null; // Help the GC
} else {
//同样适用线性探测来设置值。
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
//设置新的阈值
setThreshold(newLen);
size = count;
table = newTab;
}

/**
* 全清理,清理所有无效entry
*/
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
//使用连续段清理
expungeStaleEntry(j);
}
}
}

从上面的分析,从 ThreadLocal 的 set() 着手分析再深入到 ThreadLocalMap 的 set() 方法。

同样的对于 ThreadLocalMap 中的 getEntry() 也从 ThreadLocal 的 get() 方法入手。

ThreadLocal 中的 get()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public T get() {
//同set方法类似获取对应线程中的ThreadLocalMap实例
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
//为空返回初始化值
return setInitialValue();
}
/**
* 初始化设值的方法,可以被子类覆盖。
*/
protected T initialValue() {
return null;
}

private T setInitialValue() {
//获取初始化值,默认为null(如果没有子类进行覆盖)
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
//不为空不用再初始化,直接调用set操作设值
if (map != null)
map.set(this, value);
else
//第一次初始化,createMap在上面介绍set()的时候有介绍过。
createMap(t, value);
return value;
}

ThreadLocalMap 中的 getEntry()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private ThreadLocal.ThreadLocalMap.Entry getEntry(ThreadLocal<?> key) {
//根据key计算索引,获取entry
int i = key.threadLocalHashCode & (table.length - 1);
ThreadLocal.ThreadLocalMap.Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}

/**
* 通过直接计算出来的key找不到对于的value的时候适用这个方法.
*/
private ThreadLocal.ThreadLocalMap.Entry getEntryAfterMiss(ThreadLocal<?> key, int i, ThreadLocal.ThreadLocalMap.Entry e) {
ThreadLocal.ThreadLocalMap.Entry[] tab = table;
int len = tab.length;

while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
//清除无效的entry
expungeStaleEntry(i);
else
//基于线性探测法向后扫描
i = nextIndex(i, len);
e = tab[i];
}
return null;
}

ThreadLocalMap中的remove()

同样的 remove() ,就是找到对应的table[],调用 weakrefrence 的 clear()清除引用,
然后再调用 expungeStaleEntry() 进行清除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void remove(ThreadLocal<?> key) {
ThreadLocal.ThreadLocalMap.Entry[] tab = table;
int len = tab.length;
//计算索引
int i = key.threadLocalHashCode & (len-1);
//进行线性探测,查找正确的key
for (ThreadLocal.ThreadLocalMap.Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
//调用weakrefrence的clear()清除引用
e.clear();
//连续段清除
expungeStaleEntry(i);
return;
}
}
}

参考引用

哈希表——线性探测法、链地址法、查找成功、查找不成功的平均长度

Java并发学习记录之volatile

首先来个感性认识:

  • volatile 关键字仅能实现对原始变量(如boolen、short、int、long等)赋值操作的原子性,
    但是复合操作如 i++则不保证。

volatile 和 synchronized 的区别

volatile 关键字解决的是内存可见性的问题,会使得所有对volatile变量的读写都会直接刷到主存,
即保证了变量的可见性。
这样就能满足一些对变量可见性有要求而对读取顺序没有要求的需求。

synchronized 关键字解决的是执行控制的问题,它会阻止其它线程获取当前对象的监控锁,
这样就使得当前对象中被synchronized关键字保护的代码块无法被其它线程访问,也就无法并发执行。
更重要的是,synchronized 还会创建一个内存屏障,内存屏障指令保证了所有CPU操作结果都会直接刷到主存中,
从而保证了操作的内存可见性,同时也使得先获得这个锁的线程的所有操作,
都 happens-before 于随后获得这个锁的线程的操作。

区别点

  1. volatile 本质是在告诉 JVM 当前变量在寄存器(工作内存)中的值是不确定的,需要从主存中读取;
    synchronized 则是锁定当前变量,只有当前线程可以访问该变量,其他线程被阻塞住。

  2. volatile 仅能使用在变量级别;synchronized 则可以使用在变量、方法、和类级别的。

  3. volatile 仅能实现变量的修改可见性,不能保证原子性;
    synchronized 则可以保证变量的修改可见性和原子性。

  4. volatile 不会造成线程的阻塞;synchronized 可能会造成线程的阻塞。

  5. volatile 标记的变量不会被编译器优化;synchronized 标记的变量可以被编译器优化

  6. volatile 关键字用于解决变量在多个线程之间的可见性,而 synchronized 关键字解决的是多个线程之间访问资源的同步性。

volatile 的可见性

下面是加了 volatile 关键字 和不加 volatile 关键字的区别,作图演示:

在这里插入图片描述

区别在于,volatile 修饰的成员变量在每次被线程访问时,
都强迫从主存(共享内存)中重读该成员变量的值。
而且,当成员变量发生变化时,强迫线程将变化值回写到主内存(共享内存)。
这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值,这样也就保证了同步数据的可见性。

验证 volatile 的可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.littlefxc.examples.base.thread;

/**
* @author fengxuechao
* @date 2019/2/21
**/
public class VolatileSample extends Thread {

private int number;

static boolean ready = true;

@Override
public void run() {
while (ready) {
number++;
}
System.out.println(ready);
System.out.println(number);
}

public static void main(String[] args) throws InterruptedException {
Thread t1 = new VolatileSample();
t1.start();
Thread.sleep(1000);
ready = false;
}
}

上面这段代码,我没有为 ready 变量添加 volatile 关键字,程序运行后陷入死循环。

ready 变量添加关键字 volatile 修饰后再次运行程序,及时停止,没有死循环。

验证结果:

在这里插入图片描述

Java 并发学习记录之线程间通信

volatile 和 synchronized 关键字

Java并发学习记录之volatile

Java并发学习记录之synchronized

等待/通知机制

Java并发学习记录之wait-notify机制

管道输入/输出流

管道输入/输出流和普通文件的输入/输出流或者网络输入、输出流不同之处在于管道输入/输出流主要用于线程之间的数据传输,
而且传输的媒介为内存。

管道输入/输出流主要包括下列两类的实现:

  • 面向字节: PipedOutputStream、 PipedInputStream
  • 面向字符: PipedWriter、 PipedReader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.littlefxc.examples.base.thread.pipe;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;

/**
* 管道输入/输出主要用于线程间的数据传输,传输的媒介是内存。具体实现:
* <br>面向字节:
* <ul>
* <li>PipedWriter</li>
* <li>PipedReader</li>
* </ul>
* <br>面向字符:
* <ul>
* <li>PipedOutputStream</li>
* <li>PipedInputStream</li>
* </ul>
*
* @author fengxuechao
* @date 2019/2/26
**/
public class Piped {

public static void main(String[] args) throws IOException {
PipedWriter writer = new PipedWriter();
PipedReader reader = new PipedReader();
// 将输出流和输入流进行必要的连接
writer.connect(reader);
Thread printThread = new Thread(new Print(reader), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
writer.write(receive);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
writer.close();
}
}

static class Print implements Runnable {

private PipedReader reader;

public Print(PipedReader reader) {
this.reader = reader;
}

@Override
public void run() {
int receive = 0;
try {
while ((receive = reader.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

运行该示例,输入一组字符串,可以看到 printThread 进行了原样输出。

在这里插入图片描述

Tips:
对于 Piped 类型的流,必须要进行绑定,也就是调用 connect(),否则会抛异常。

Thread.join() 的使用

在很多情况下,主线程生成并起动了子线程,如果子线程里要进行大量的耗时的运算,
主线程往往将于子线程之前结束,但是如果主线程处理完其他的事务后,需要用到子线程的处理结果,
也就是主线程需要等待子线程执行完成之后再结束,这个时候就要用到join()方法了。另外,
一个线程需要等待另一个线程也需要用到join()方法。

Thread类除了提供 join() 方法之外,还提供了 join(long millis)join(long millis, int nanos)
两个具有超时特性的方法。这两个超时方法表示,如果线程thread在指定的超时时间没有终止,
那么将会从该超时方法中返回。

重点看一下 join(long millis) 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.littlefxc.examples.base.thread;

/**
* @author fengxuechao
*/
public class JoinLongTest {

public static void main(String[] args) {
try {
MyThread threadTest = new MyThread();
threadTest.start();

threadTest.join(1000);// 主线成等待子线程1秒
// Thread.sleep(1000);
System.out.println("主线程结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

static public class MyThread extends Thread {

@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("子线程结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}

运行结果:

在这里插入图片描述

ThreadLocal

之前我们讲保证线程资源安全问题时,使用同步加锁的方式保证线程安全。还有一种办法就是隔离资源的做法。

所谓的隔离,即每个线程使用自己的局部资源。将资源隔离不让其它线程访问,
从被隔离资源的角度来说,能够访问它的只有当前线程。既然只有当前线程可以访问的数据,自然是线程安全的。
一个典型的例子就是 Servlet

ThreadLocal 的简单使用

首先来一个不使用 ThreadLocal 的类,然后在逐渐对其改造。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.littlefxc.examples.base.thread.threadlocal;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author fengxuechao
* @date 2019/2/26
**/
public class ThreadLocal1 {

private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
es.execute(new ParseDate(i));
}
es.shutdown();
}

private static class ParseDate implements Runnable {

int i = 0;

public ParseDate(int i) {
this.i = i;
}

@Override
public void run() {
try {
ParseDateWithSync
Date date = SDF.parse("2019-02-26 16:23:" + i % 60);
System.out.println(i + ":" + date);
// }
} catch (ParseException e) {
e.printStackTrace();
}
}
}
}

运行结果:

在这里插入图片描述

一般这种问题主要是因为 SimpleDateFormat 在多线程环境下,是线程不安全的,所以如果你在多线程环境中共享了SimpleDateFormat的实例,
比如你在类似日期类中定义了一个全局的 SimpleDateFormat 对象,这样子肯定会出现上述的报错

一种解决办法就是加锁,在上面代码中可以将注释去掉后再次运行,也就不会出现这个问题了。

但是我现在要使用保存线程局部变量的ThreadLocal对象来保存每一个线程的SimpleDateFormat对象,针对上述代码做出改变:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.littlefxc.examples.base.thread.threadlocal;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author fengxuechao
* @date 2019/2/26
**/
public class ParseDateWithThreadLocal {

static final String pattern = "yyyy-MM-dd HH:mm:ss";
private static final ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<SimpleDateFormat>();

public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
es.execute(new ParseDate(i));
}
es.shutdown();
}

private static class ParseDate implements Runnable {

int i = 0;

public ParseDate(int i) {
this.i = i;
}

@Override
public void run() {
try {
// 如果当前线程不持有 SimpleDateFormat 对象。那就新建并保存设置在当前线程中,如果已持有,则直接使用。
if (threadLocal.get()==null) {
threadLocal.set(new SimpleDateFormat(pattern));
}
Date date = threadLocal.get().parse("2019-02-26 16:23:" + i % 60);
System.out.println(i + ":" + date);
} catch (ParseException e) {
e.printStackTrace();
}
}
}
}

注意:从上面代码中也可以看出,为每一个线程分配一个对象的工作并不是由 ThreadLocal 来完成的,
而是需要开发人员在应用层面保证的。ThreadLocal 只是起到了一个容器的作用。

ThreadLocal的原理

ThreadLocal的原理

关键词 synchronized 的多种用法

  • 指定加锁对象:对给定对象加锁,进入同步代码前要获得给定对象的锁
  • 直接作用于实例方法:相当于对当前实例加锁,进入同步代码前要获得当前实例的锁
  • 直接作用于静态方法:相当于对当前类加锁,进入同步代码前要获得当前类的锁
阅读全文 »

分布式事务处理的XA规范(由 AP、RM、TM 组成):

  • 其中应用程序(Application Program ,简称 AP):AP 定义事务边界(定义事务开始和结束)并访问事务边界内的资源。
  • 资源管理器(Resource Manager,简称 RM):RM 管理计算机共享的资源,许多软件都可以去访问这些资源,资源包含比如数据库、文件系统、打印机服务器等。
  • 事务管理器(Transaction Manager ,简称TM):负责管理全局事务,分配事务唯一标识,监控事务的执行进度,并负责事务的提交、回滚、失败恢复等。
阅读全文 »

Logback-DBAppender

DBAppender以独立于Java编程语言的格式将日志事件插入到三个数据库表中。
这三个表是 logging_eventlogging_event_propertylogging_event_exception
在使用DBAppender之前,它们必须存在。Logback附带了创建表的SQL脚本。
它们可以在 logback-classic/src/main/java/ch/qos/logback/classic/db/script 文件夹下找到。
对于每个最流行的数据库系统,都有一个特定的脚本。如果您的特定类型的数据库系统缺少脚本,那么应该很容易以现有脚本为例编写一个脚本。
如果JDBC驱动程序支持JDBC 3.0规范中引入的getGeneratedKeys方法,假设您已经创建了如上所述的适当的数据库表,那么就不需要额外的步骤。
否则,必须有适合您的数据库系统的sql方言。目前,logback有H2、HSQL、MS SQL Server、MySQL、Oracle、PostgreSQL、SQLLite和Sybase等多种方言。

阅读全文 »

Log4j2-JDBCAppender

JDBCAppender使用标准JDBC将日志事件写入关系数据库表。可以将其配置为使用JNDI数据源或自定义工厂方法获取JDBC连接。
无论采用哪种方法,都必须有连接池作为支持。否则,日志记录性能将受到很大影响。

maven 依赖

通过使用 platform-bom 来管理依赖的版本问题。使用 druid 作为 JDBCAppender 的连接池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.spring.platform</groupId>
<artifactId>platform-bom</artifactId>
<version>Cairo-RELEASE</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>

日志配置文件:log4j2.xml

<JDBC></JDBC> 是 Log4j2 的 JDBCAppender 的标签。

ConnectionFactory 是一个实现 Druid 连接池的单例类, 通过 method 属性来获取 java.sql.Connection

Column 表示数据表 log4j2 的字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?xml version="1.0" encoding="UTF-8" ?>
<Configuration status="INFO">
<Appenders>
<Console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n"/>
</Console>
<!-- JDBCAppender -->
<JDBC name="jdbc" tableName="log4j2">
<ConnectionFactory class="com.littlefxc.examples.log4j2.ConnectionFactory" method="getDatabaseConnection"/>
<!-- 数据表 log4j2 中的字段 -->
<Column name="time" pattern="%d{yyyy-MM-dd HH:mm:ss.SSS}" />
<Column name="level" pattern="%level"/>
<Column name="logger" pattern="%logger"/>
<Column name="message" pattern="%message"/>
<Column name="exception" pattern="%ex{full}"/>
</JDBC>
</Appenders>
<Loggers>
<Logger name="com.littlefxc.examples.log4j2" level="debug" additivity="false">
<AppenderRef ref="jdbc"/>
</Logger>
<Root level="debug" additivity="false">
<AppenderRef ref="console"/>
</Root>
</Loggers>
</Configuration>

数据库文件:schema.sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

DROP TABLE IF EXISTS log4j2;
CREATE TABLE `log4j2` (
`id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT,
`time` char(23) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`level` char(5) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`logger` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`message` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`exception` varchar(1000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = MyISAM AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

ConnectionFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.littlefxc.examples.log4j2;

import com.alibaba.druid.pool.DruidDataSource;

import javax.sql.DataSource;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

/**
* Log4j2 ConnectionFactory
*
* @author fengxuechao
*/
public class ConnectionFactory {

private final DataSource dataSource;

private ConnectionFactory() {
Properties properties = new Properties();
String lineSeparator = File.separator;
String fileName = String.join(lineSeparator,
System.getProperty("user.dir"), "Log4j2-JDBCAppender", "src", "main", "resources", "db.properties");
try (InputStream stream = new FileInputStream(fileName)) {
properties.load(stream);
} catch (IOException e) {
e.printStackTrace();
}
this.dataSource = new DruidDataSource();
((DruidDataSource) this.dataSource).configFromPropety(properties);
}

public static Connection getDatabaseConnection() throws SQLException {
return Singleton.INSTANCE.dataSource.getConnection();
}

private static interface Singleton {
final ConnectionFactory INSTANCE = new ConnectionFactory();
}
}

ConnectionFactory 是一个实现 Druid 连接池的单例类。

db.properties

1
2
3
4
5
6
druid.url=jdbc:mysql://192.168.120.63:3306/learn?useSSL=false
druid.username=root
druid.password=123456
druid.driverClassName=com.mysql.jdbc.Driver
druid.maxActive=10
druid.minIdle=5

启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.littlefxc.examples.log4j2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author fengxuechao
* @date 2019/2/11
**/
public class App {

private static final Logger log = LoggerFactory.getLogger(App.class);

public static void main(String[] args) {
log.debug("This is debug");
log.info("This is info");
log.warn("This is warn");
log.error("This is error");
log.error("This is error", new RuntimeException("this is a exception"));
}
}

Spring SpEl 表达式语言 - 学习记录

SpEL是类似于OGNL和JSF EL的表达式语言,能够在运行时构建复杂表达式,存取对象属性、对象方法调用等。
所有的SpEL都支持XML和Annotation两种方式,格式:#{ expressoin }

Maven 依赖

因为 spring-context 依赖中包含 spring-expression,所以选择它。

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
</dependencies>

初始化

在后面的一系列的测试中,都有用到在 @Before 中的元素。我仅仅只是将一份这份单元测试类分解了而已。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class SpelTest {
ExpressionParser parser;

// 测试 XML 配置 SpEl
User user;

BeanFactory beanFactory;

// 测试注解配置 SpEl
FieldValueTestBean fieldValueTestBean;

MovieRecommender movieRecommender;

PropertyValueTestBean propertyValueTestBean;

StandardEvaluationContext context;

// 发明家列表
List<Inventor> inventorList = new ArrayList<>();

// 发明家特斯拉
Inventor inventorTesla;

// 发明家爱迪生
Inventor inventorEdlson;

// 发明家贝尔
Inventor inventorBell;

// 发明家数组
Inventor[] inventorArrays;

Society society;


/**
* 初始化Spring
*/
@Before
public void setUp() {
beanFactory = new ClassPathXmlApplicationContext("classpath:spring-root.xml");
{
// 允许 SpEl 表达式访问 IOC 容器中的 bean
// SpEL支持使用 "@" 符号来引用Bean, 在引用Bean时需要使用BeanResolver接口实现来查找Bean, Spring提供BeanFactoryResolver实现
context = new StandardEvaluationContext();
context.setBeanResolver(new BeanFactoryResolver(beanFactory));
}

// SpEl 解析器
parser = (ExpressionParser) beanFactory.getBean("parser");

// XML 配置 SpEl
user = (User) beanFactory.getBean("user");

// 注解配置 SpEl
fieldValueTestBean = (FieldValueTestBean) beanFactory.getBean("fieldValueTestBean");
movieRecommender = (MovieRecommender) beanFactory.getBean("movieRecommender");
propertyValueTestBean = (PropertyValueTestBean) beanFactory.getBean("propertyValueTestBean");

{
// 设置 SpEl 的 根对象
inventorTesla = new Inventor("尼古拉·特斯拉", "塞尔维亚裔美籍");
inventorTesla.setPlaceOfBirth(new PlaceOfBirth("利卡-塞尼县", "克罗地亚"));

inventorEdlson = new Inventor("托马斯·阿尔瓦·爱迪生", "美国");
inventorEdlson.setPlaceOfBirth(new PlaceOfBirth("米兰", "美国俄亥俄州"));

inventorBell = new Inventor("亚历山大·格拉汉姆·贝尔", "美国");
inventorBell.setPlaceOfBirth(new PlaceOfBirth("爱丁堡", "英国苏格兰"));

inventorList.add(inventorTesla);
inventorList.add(inventorEdlson);
inventorList.add(inventorBell);

// 数组和列表的内容通过使用方括号表示法获得
Inventor[] inventorArrays = new Inventor[3];
inventorArrays = inventorList.toArray(inventorArrays);

society = new Society();
society.getOfficers().put(Society.President, inventorEdlson);
society.getOfficers().put(Society.Advisors, inventorList);
}
}
}

1. 入门

支持执行字符串表达式

1
2
3
4
5
6
7
@Test
public void test1() {
ExpressionParser parser = new SpelExpressionParser();
Expression exp = parser.parseExpression("'Hello World'");
String message = (String) exp.getValue();
Assert.assertEquals("Hello World", message);
}

SpEL支持广泛的功能,例如调用方法,访问属性和调用构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void test2() {
ExpressionParser parser = new SpelExpressionParser();
// 调用方法
Expression exp = parser.parseExpression("'Hello World'.concat('!')");
String message = (String) exp.getValue();
Assert.assertEquals("Hello World!", message);

// 调用属性
exp = parser.parseExpression("'Hello World'.bytes");
byte[] bytesE = "Hello World".getBytes();
byte[] bytesA = (byte[]) exp.getValue();
Assert.assertArrayEquals(bytesE, bytesA);

// SpEL还通过使用标准点表示法(例如prop1.prop2.prop3)和属性值的设置来支持嵌套属性。也可以访问公共字段。
exp = parser.parseExpression("'Hello World'.bytes.length");
int lengthE = "Hello World".getBytes().length;
int lengthA = (int) exp.getValue();
Assert.assertEquals(lengthE, lengthA);

// 调用构造方法
exp = parser.parseExpression("new String('hello world').toUpperCase()");
String constructorA = exp.getValue(String.class);
Assert.assertEquals("HELLO WORLD", constructorA);
}

SpEL的更常见用法是提供针对特定对象实例(称为根对象)计算的表达式字符串

1
2
3
4
5
6
7
8
@Test
public void test3() {
ExpressionParser parser = new SpelExpressionParser();

Expression exp = parser.parseExpression("username == 'fxc'");
Boolean expValue = exp.getValue(user, Boolean.class);
Assert.assertEquals(true, expValue);
}

Spring Beans 中的 SpEl 表达式, Bean 的引用

Spring SpEL 可以在 XML 配置文件中使用,同样可以在注解配置中使用例如 @Value("#{expression}")

Spring 允许 SpEl 表达式访问 IOC 容器中的 bean,SpEL支持使用 @ 符号来引用Bean, 在引用 Bean 时需要使用 BeanResolver 接口实现来查找Bean, Spring提供 BeanFactoryResolver 实现

ClassPathXmlApplicationContext 实现默认会把 System.getProperties() 注册名为systemProperties 的 Bean,因此使用 @systemProperties 来引用该 Bean。

在 Spring 配置文件中使用 SpEl 表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<context:property-placeholder location="classpath:application.properties"/>

<context:component-scan base-package="com.littlefxc.examples.spel"/>

<bean id="parser" class="org.springframework.expression.spel.standard.SpelExpressionParser"/>

<!-- 同样也适用于 Java Config, 例如 @Value("#{systemProperties['user.country']}") -->
<bean id="user" class="com.littlefxc.examples.spel.User">
<property name="username" value="fxc"/>
<!--<property name="uuid" value="#{T(java.util.UUID).randomUUID().toString()}"/>-->
<property name="uuid" value="${user.uuid}"/>
<property name="place" value="#{systemProperties['user.country']}"/>
<property name="age" value="${user.age}"/>
</bean>

</beans>

application.properties

1
2
user.uuid=#{T(java.util.UUID).randomUUID().toString()}
user.age=18

在注解中用法1:在成员属性上使用 SpEL 表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.littlefxc.examples.spel;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
* @author fengxuechao
* @date 2019/1/24
**/
@Component
public class FieldValueTestBean {

@Value("#{ systemProperties['user.country'] }")
private String defaultLocale;

// 省略 setter getter

}

在注解中用法2:在构造方法上使用 SpEL 表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class MovieRecommender {

private String movieName;
private String defaultLocale;

@Autowired
public MovieRecommender(@Value("#{ systemProperties['user.country'] }") String defaultLocale) {
this.defaultLocale = defaultLocale;
}

// 省略 setter getter
}

在注解中用法3:在属性设置方法上使用 SpEL 表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.littlefxc.examples.spel;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class PropertyValueTestBean {

private String defaultLocale;

public String getDefaultLocale() {
return this.defaultLocale;
}

@Value("#{ systemProperties['user.country'] }")
public void setDefaultLocale(String defaultLocale) {
this.defaultLocale = defaultLocale;
}

}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void test4() {
// 允许 SpEl 表达式访问 IOC 容器中的 bean
// SpEL支持使用 "@" 符号来引用Bean, 在引用Bean时需要使用BeanResolver接口实现来查找Bean, Spring提供BeanFactoryResolver实现
// ClassPathXmlApplicationContext 实现默认会把"System.getProperties()"注册为"systemProperties"Bean,因此我们使用 "@systemProperties"来引用该Bean
Properties result1 = parser.parseExpression("@systemProperties").getValue(context, Properties.class);
System.out.println(result1.getProperty("user.dir", "环境变量中没有该属性"));

// XML 配置
User userActual = parser.parseExpression("@user").getValue(context, User.class);
Assert.assertSame(user, userActual);

// 注解配置
Assert.assertSame(movieRecommender, parser.parseExpression("@movieRecommender").getValue(context, MovieRecommender.class));
Assert.assertSame(propertyValueTestBean, parser.parseExpression("@propertyValueTestBean").getValue(context, PropertyValueTestBean.class));
}

2. 字面量表达式

SpEL支持的字面量包括:字符串、数字类型(int、long、float、double)、布尔类型、null类型。

类型 示例
字符串 String str1 = parser.parseExpression(“‘Hello World!’”).getValue(String.class);
String str2 = parser.parseExpression(“\“Hello World!\“”).getValue(String.class);
数字类型 int int1 = parser.parseExpression(“1”).getValue(Integer.class);
long long1 = parser.parseExpression(“-1L”).getValue(long.class);
float float1 = parser.parseExpression(“1.1”).getValue(Float.class);
double double1 = parser.parseExpression(“1.1E+2”).getValue(double.class);
int hex1 = parser.parseExpression(“0xa”).getValue(Integer.class);
long hex2 = parser.parseExpression(“0xaL”).getValue(long.class);
布尔类型 boolean true1 = parser.parseExpression(“true”).getValue(boolean.class);
boolean false1 = parser.parseExpression(“false”).getValue(boolean.class);
null类型 Object null1 = parser.parseExpression(“null”).getValue(Object.class);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void test5() {
ExpressionParser parser = new SpelExpressionParser();

// evals to "Hello World"
String helloWorld = (String) parser.parseExpression("'Hello World'").getValue();
Assert.assertEquals("Hello World", helloWorld);

// 浮点型字面量
double avogadrosNumber = (Double) parser.parseExpression("6.0221415E+23").getValue();
Assert.assertEquals(Double.valueOf("6.0221415E+23"), avogadrosNumber, Double.MIN_VALUE);

// evals to 2147483647
int maxValue = (Integer) parser.parseExpression("0x7FFFFFFF").getValue();
Assert.assertEquals(2147483647, maxValue);

boolean trueValue = (Boolean) parser.parseExpression("true").getValue();
Assert.assertTrue(trueValue);

Object nullValue = parser.parseExpression("null").getValue();
Assert.assertNull(nullValue);
}

3. 集合的表达式(Properties, Arrays, Lists, Maps, and Indexers)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 集合的表达式
*
* @see java.util.Properties
* @see java.util.Arrays
* @see java.util.List
* @see java.util.Map
* Index 索引
*/
@Test
public void test6() {
// 属性名称的第一个字母不区分大小写
// 数组和列表的内容通过使用方括号表示法获得
int year = (Integer) parser.parseExpression("Birthdate.Year = 1856").getValue(context);
String city = (String) parser.parseExpression("placeOfBirth.City").getValue(context);
Assert.assertEquals(1856, year);
Assert.assertEquals("利卡-塞尼县", city);

// 列表
context.setRootObject(inventorList);
String birthCity = parser
.parseExpression("[0].placeOfBirth.City")
.getValue(context, String.class);
Assert.assertEquals("利卡-塞尼县", birthCity);

// 字典
context.setRootObject(society);
// 改变字典的值的某个属性
parser.parseExpression("Officers['president'].placeOfBirth.City")
.setValue(context, "随便");
// 获取字典的值的某个属性
birthCity = (String) parser
.parseExpression("officers['president'].placeOfBirth.City")
.getValue(context);
Assert.assertEquals("随便", birthCity);
}

4. 内联列表( Inline Lists )

可以使用{}符号直接在表达式中表示列表.{}本身表示一个空列表。出于性能原因,如果列表本身完全由固定的文字组成,那么将创建一个常量列表来表示表达式(而不是在每个表达式上构建一个新的列表)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 内联列表(Inline Lists)
*/
@Test(expected = UnsupportedOperationException.class)
public void test7_1() {
// 将返回不可修改的空List
List list = parser.parseExpression("{}").getValue(List.class);

// 对于字面量列表也将返回不可修改的List
List list2 = parser.parseExpression("{1,2,3,4}").getValue(List.class);
// 不会进行不可修改处理
list2.set(0, 2);
}

/**
* 内联列表(Inline Lists)
*/
@Test
public void test7_2() {
// 对于列表中只要有一个不是字面量表达式,将只返回原始List
List<List<Integer>> list = parser.parseExpression("{{1+2, 2+4},{3, 4+4}}").getValue(List.class);
// 操作的不是原始列表
list.get(0).set(0, 1);
Integer value = parser.parseExpression("{{1+2, 2+4},{3, 4+4}}[0][0]").getValue(Integer.class);
Assert.assertEquals(1, list.get(0).get(0).intValue());
Assert.assertEquals(3, value.intValue());
}

5. 内联字典( Inline Maps )

您可以使用熟悉的Java语法来构建数组,也可以提供一个初始化器来在构建时填充数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 内联字典(Inline Maps)
*/
@Test
public void test8() {
// evaluates to a Java map containing the two entries
Map inventorInfo = (Map) parser.parseExpression("{name:'Nikola',dob:'10-July-1856'}").getValue(context);
Assert.assertEquals("Nikola", inventorInfo.get("name"));

// 字典中的字典
Map mapOfMaps = (Map) parser
.parseExpression("{name:{first:'Nikola',last:'Tesla'},dob:{day:10,month:'July',year:1856}}")
.getValue(context);
Map tesla = (Map) mapOfMaps.get("name");
Assert.assertEquals("Tesla", tesla.get("last"));

// 空字典 {:}
Map emptyMap = (Map) parser.parseExpression("{:}").getValue(context);
Assert.assertEquals(0, emptyMap.size());
}

{:}本身表示一个空映射。出于性能原因,如果映射本身由固定的文字或其他嵌套的常量结构(列表或映射)组成,则创建常量映射来表示表达式(而不是在每个表达式上构建新的映射)。

6. 数组构造(Array Construction)

您可以使用熟悉的Java语法来构建数组,也可以提供一个初始化器来在构建时填充数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 数组创建(Array Construction):相似的 Java 数组语法
*/
@Test
public void test9() {
int[] numbers1 = (int[]) parser.parseExpression("new int[4]").getValue(context);
Assert.assertArrayEquals(new int[4], numbers1);

// Array with initializer
int[] numbers2 = (int[]) parser.parseExpression("new int[]{1,2,3}").getValue(context);
Assert.assertArrayEquals(new int[]{1, 2, 3}, numbers2);

// Multi dimensional array
int[][] numbers3 = (int[][]) parser.parseExpression("new int[4][5]").getValue(context);
Assert.assertArrayEquals(new int[4][5], numbers3);
}

在构造多维数组时,无法初始化,也就是 "new int[4][5]" 后面无法添加{}

8. 方法调用

您可以使用典型的Java编程语法来调用方法。您还可以对文字调用方法。还支持变量参数。下面的示例展示了如何调用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 方法调用
*/
@Test
public void test10() {
// string literal, evaluates to "bc"
String bc = parser.parseExpression("'abc'.substring(1, 3)").getValue(String.class);
Assert.assertEquals("bc", bc);

society.getMembers().add(inventorTesla);
context.setRootObject(society);
// evaluates to true
boolean isMember = parser.parseExpression("isMember('尼古拉·特斯拉')")
.getValue(context, Boolean.class);
Assert.assertTrue(isMember);
}

9. 运算符

9.1. 关系运算符

关系运算符(等于、不等于、小于、小于或等于、大于、大于或等于)通过使用标准运算符表示法得到支持。下面的清单显示了一些操作符示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 运算符:关系运算符
*/
@Test
public void test11() {
// 关系运算符(等于、不等于、小于、小于或等于、大于、大于或等于、instanceof、正则表达式)
// lt (<) gt (>) le (<=) ge (>=) eq (==) ne (!=) div (/) mod (%) not (!) 可以在XML文档中替换相应的符号
// evaluates to true
boolean flag = parser.parseExpression("2 == 2").getValue(Boolean.class);
Assert.assertTrue(flag);

// evaluates to false
flag = parser.parseExpression("2 < -5.0").getValue(Boolean.class);
Assert.assertFalse(flag);

// evaluates to true
flag = parser.parseExpression("'black' < 'block'").getValue(Boolean.class);
Assert.assertTrue(flag);

// evaluates to false
flag = parser.parseExpression(
"'xyz' instanceof T(Integer)").getValue(Boolean.class);
Assert.assertFalse(flag);

// evaluates to true
flag = parser.parseExpression(
"'5.00' matches '^-?\\d+(\\.\\d{2})?$'").getValue(Boolean.class);
Assert.assertTrue(flag);

//evaluates to false
flag = parser.parseExpression(
"'5.0067' matches '^-?\\d+(\\.\\d{2})?$'").getValue(Boolean.class);
Assert.assertFalse(flag);
}

每个符号运算符也可以指定为纯字母等价的。这避免了使用的符号对嵌入表达式的文档类型具有特殊意义的问题(例如在XML文档中).

  • lt (<)
  • gt (>)
  • le (<=)
  • ge (>=)
  • eq (==)
  • ne (!=)
  • div (/)
  • mod (%)
  • not (!)

9.2. 逻辑运算符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 运算符:逻辑运算符
*/
@Test
public void test12() {
society.getMembers().add(inventorTesla);
society.getMembers().add(inventorEdlson);
context.setRootObject(society);

// -- AND --

// evaluates to false
boolean flagAnd = parser.parseExpression("true and false").getValue(Boolean.class);
Assert.assertFalse(flagAnd);

// evaluates to true
String expression = "isMember('尼古拉·特斯拉') and isMember('托马斯·阿尔瓦·爱迪生')";
flagAnd = parser.parseExpression(expression).getValue(context, Boolean.class);
Assert.assertTrue(flagAnd);

// -- OR --

// evaluates to true
boolean flagOr = parser.parseExpression("true or false").getValue(Boolean.class);
Assert.assertTrue(flagOr);

// evaluates to true
expression = "isMember('尼古拉·特斯拉') or isMember('托马斯·阿尔瓦·爱迪生')";
flagOr = parser.parseExpression(expression).getValue(context, Boolean.class);
Assert.assertTrue(flagOr);

// -- NOT --

// evaluates to false
boolean flagNot = parser.parseExpression("!true").getValue(Boolean.class);
Assert.assertFalse(flagNot);

// -- AND and NOT --
expression = "isMember('尼古拉·特斯拉') and !isMember('托马斯·阿尔瓦·爱迪生')";
boolean flagAndNot = parser.parseExpression(expression).getValue(context, Boolean.class);
Assert.assertFalse(flagAndNot);
}

9.3. 赋值运算符

若要设置属性,请使用赋值运算符(=)。这通常在对setValue的调用中完成,但也可以在对getValue的调用中完成。下面的代码展示了使用赋值运算符的两种方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 运算符:赋值运算符
*/
@Test
public void test14() {
Inventor inventor = new Inventor();

parser.parseExpression("Name").setValue(context, inventor, "Alexander Graham");

// alternatively
String bell = parser.parseExpression(
"Name = 'Alexander Graham Bell'").getValue(context, inventor, String.class);

Assert.assertEquals("Alexander Graham Bell", bell);
}

10. 构造函数

您可以通过使用新的操作符来调用构造函数。除了基本类型(int、float等)和字符串之外,应该对所有类型使用全限定类名。下面的例子展示了如何使用新的操作符来调用构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 类类型表达式:使用"T(Type)"来表示java.lang.Class实例,"Type"必须是类全限定名,"java.lang"包除外,即该包下的类可以不指定包名;
* 使用类类型表达式还可以进行访问类静态方法及类静态字段。
*/
@Test
public void test15() {
Inventor einstein = parser.parseExpression(
"new com.littlefxc.examples.spel.Inventor('爱因斯坦', '德国')")
.getValue(Inventor.class);
Assert.assertEquals(new Inventor("爱因斯坦", "德国").getName(), einstein.getName());

context.setRootObject(society);
//create new inventor instance within add method of List
parser.parseExpression("Members.add(new com.littlefxc.examples.spel.Inventor('爱因斯坦', '德国'))").getValue(context);
}

11. 变量定义及引用

可以使用#variableName语法引用表达式中的变量。变量是通过在EvaluationContext实现上使用setVariable方法设置的。下面的例子展示了如何使用变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 变量定义及引用
* <p>
* 变量定义通过 EvaluationContext 接口的 setVariable(variableName, value) 方法定义;<br>
* 在表达式中使用 "#variableName" 引用;<br>
* 除了引用自定义变量,SpEL还允许引用根对象及当前上下文对象,使用 "#root" 引用根对象,使用 "#this" 引用当前上下文对象;<br>
* </p>
*/
@Test
public void test16() {
Inventor tesla = new Inventor("Nikola Tesla", "Serbian");

EvaluationContext context = SimpleEvaluationContext.forReadWriteDataBinding().build();
context.setVariable("newName", "Mike Tesla");

parser.parseExpression("Name = #newName").getValue(context, tesla);// "Mike Tesla"
// parser.parseExpression("#root.Name = #newName").getValue(context, tesla);// "Mike Tesla"
Assert.assertEquals("Mike Tesla", tesla.getName());

// create an array of integers
List<Integer> primes = new ArrayList<>();
primes.addAll(Arrays.asList(2, 3, 5, 7, 11, 13, 17));

// create parser and set variable 'primes' as the array of integers
context.setVariable("primes", primes);

// 通过使用诸如 (using selection ?{...}) 这样的选择表达式,选择列表中所有大于10的数字
// evaluates to [11, 13, 17]
List<Integer> primesGreaterThanTen = (List<Integer>) parser
.parseExpression("#primes.?[#this>10]").getValue(context);

System.out.println(primesGreaterThanTen);
}

12. 自定义函数

您可以通过注册可在表达式字符串中调用的用户定义函数来扩展SpEL。 该功能通过EvaluationContext注册。 以下示例显示如何注册用户定义的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 自定义函数
* <p>
* 目前只支持类静态方法注册为自定义函数;<br>
* SpEL使用StandardEvaluationContext的registerFunction方法进行注册自定义函数,
* 其实完全可以使用setVariable代替,两者其实本质是一样的;
* </p>
*/
@Test
public void test17() throws NoSuchMethodException {
Method parseInt = Integer.class.getDeclaredMethod("parseInt", String.class);
// 自定义函数推荐用 context.registerFunction("fnName", fn)
context.registerFunction("parseInt", parseInt);
context.setVariable("parseInt2", parseInt);
Boolean bool = parser.parseExpression("#parseInt('3') == #parseInt2('3')").getValue(context, Boolean.class);
Assert.assertTrue(bool);
}

13. 三目运算及Elivis运算表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 三目运算及Elivis运算表达式
* <br>
* 三目运算符 "表达式1?表达式2:表达式3"用于构造三目运算表达式,如"2>1?true:false"将返回true;
* <br>
* Elivis运算符 "表达式1?:表达式2" 从Groovy语言引入用于简化三目运算符的,
* 当 表达式1 为 非null 时则返回 表达式1,
* 当 表达式1 为 null 时则返回 表达式2,
* 简化了三目运算符方式 "表达式1?表达式1:表达式2",如 "null?:false" 将返回 false,而 "true?:false" 将返回true;
*/
@Test
public void test18() {
int int1 = parser.parseExpression("true and false ? 1 : 0").getValue(Integer.class);
Assert.assertEquals(0, int1);

// 如果是 null, 返回 false
Boolean bool1 = parser.parseExpression("null ?: false").getValue(Boolean.class);
Assert.assertFalse(bool1);

// true 不是 null, 返回 true
Boolean bool2 = parser.parseExpression("true ?: false").getValue(Boolean.class);
Assert.assertTrue(bool2);

// 稍微复杂点
Inventor tesla = new Inventor("Nikola Tesla", "Serbian");
String name = parser.parseExpression("Name?:'Elvis Presley'").getValue(context, tesla, String.class);
Assert.assertEquals("Nikola Tesla", name); // Nikola Tesla

tesla.setName(null);
name = parser.parseExpression("Name?:'Elvis Presley'").getValue(context, tesla, String.class);
Assert.assertEquals("Elvis Presley", name); // Elvis Presley

// 可以把Elivis运算表达式作为 默认值 例如 : @Value("#{systemProperties['pop3.port'] ?: 25}")
}

14. 安全的导航操作符

安全导航操作符用于避免NullPointerException并来自Groovy语言。 通常,在引用对象时,可能需要在访问对象的方法或属性之前验证它是否为null。 为避免这种情况,安全导航操作符返回null而不是抛出异常。 以下示例显示如何使用安全导航运算符:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 安全的导航操作符:
* 安全导航操作符用于避免NullPointerException,它来自Groovy语言。
* 通常,当您有一个对象的引用时,您可能需要在访问该对象的方法或属性之前验证它是否为null。为了避免这种情况,安全导航操作符返回null,而不是抛出异常。
*/
@Test
public void test19() {
Inventor tesla = new Inventor("Nikola Tesla", "Serbian");
tesla.setPlaceOfBirth(new PlaceOfBirth("Smiljan"));

String city = parser.parseExpression("PlaceOfBirth?.City").getValue(context, tesla, String.class);
Assert.assertEquals("Smiljan", city); // Smiljan

tesla.setPlaceOfBirth(null);
city = parser.parseExpression("PlaceOfBirth?.City").getValue(context, tesla, String.class);
Assert.assertNull(city); // null - 没有抛出空指针异常(NullPointerException)!!!
}

15. 集合选择(Collection Selection)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 集合选择(Collection Selection):
* 选择表达式允许通过从源集合的条目中进行选择,将源集合转换为另一个集合。
* 选择表达式通过使用形如 ".?[selectionExpression]" 的语法。它筛选集合并返回包含原始元素子集的新集合。
*/
@Test
public void test20() {
society.getMembers().addAll(inventorList);
context.setRootObject(society);

// 从列表中选出国籍是美国的元素,然后组成新的列表
List<Inventor> list = (List<Inventor>) parser
.parseExpression("Members.?[Nationality == '塞尔维亚裔美籍']")
.getValue(context);
Assert.assertEquals(1, list.size()); // 只有特斯拉是塞尔维亚裔美籍

// 从原字典中选出值大于27的元素,然后组成新的字典
Map<String, Integer> map = new HashMap<>();
map.put("a", 26);
map.put("b", 27);
map.put("c", 28);
context.setVariable("map", map);
Map newMap = (Map) parser.parseExpression("#map.?[value<27]").getValue(context);
Assert.assertEquals(1, newMap.size());
}

16. 集合投影(Collection Projection)

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 集合投影:
* 投影让集合驱动子表达式的求值,结果是一个新的集合。
* 投影的语法是 ".![projectionExpression]"。例如,假设我们有一个发明家列表,但是想要他们出生的城市列表。实际上,我们想要的是“出生地点”。
*/
@Test
public void test21() {
society.getMembers().addAll(inventorList);
context.setRootObject(society);

List<String> placesOfBirth = (List) parser.parseExpression("Members.![placeOfBirth.city]").getValue(context);
MatcherAssert.assertThat(placesOfBirth, IsIterableContainingInOrder.contains("利卡-塞尼县", "米兰", "爱丁堡"));
}

17. 引用

Spring Expression Language (SpEL)

https://jinnianshilongnian.iteye.com/blog/1418309

学习使用Spring batch从CSV文件读取记录,并使用 StaxEventItemWriter 输出经过处理的记录转换为 XML 的数据。

spring-batch-reference-model.png

  1. JobLauncher: 顾名思义,该领域对象就是Job的启动器,其作用就是绑定一组JobParameters到Job上,然后运行该Job。

  2. Job: 定义,配置批处理任务的领域对象,该对象的作用,第一是做Step的容器,配置该批处理任务需要的Step,以及他们之间的逻辑关系。第二是配置该批处理任务的特征,比方说名字,是否可重启,是否对JobParameters进行验证以及验证规则等。

  3. Step: 定义批处理任务中一个对立的逻辑任务处理单元。基本上的业务逻辑处理代码都是封装在Step中的。Step有2种实现形式,一种是Tasklet形式的,这种形式非常自由,开发人员只需要实现Tasklet接口,其中的逻辑完全有自己决定,另一种是Chunk-Oriented形式的,这种形式定义了一个Step的流程必须是“读-处理(可选)-写”,当然Spring Batch也对每一个步骤提供了接口ItemReader, ItemProcessor,ItemWriter还有很多常用的默认实现(读文件,读数据库,写文件,写数据库等等)。 每一个Step只能由一个Tasklet或者一个Chunk构成。

  4. JobRepository: 该领域对象会为Spring Batch的运维数据提供一种持久化机制。其为所有的运维数据的提供CRUD的操作接口,并为所有的操作提供事务支持。

项目概述

在这个应用程序中,我们将执行以下任务:

  1. 使用 FlatFileItemReader 从CSV文件读取交易记录
  2. 使用 CustomItemProcessor 进行项目的业务处理。当 ItemReader 读取一个项目,而 ItemWriter 写入它们时,
    ItemProcessor 提供一个转换或应用其他业务处理的访问点。
  3. 使用 StaxEventItemWriter 获取 CustomItemProcessor 的处理结果,并将它转换成 XML 类型数据作为最终输出。
  4. 使用 MyBatisBatchItemWriter 获取 CustomItemProcessor 的处理结果,并将它转换成 XML 类型数据作为最终输出。
  5. 查看MySQL

工程结构

在这里插入图片描述

Maven 依赖

sqlite-jdbcmysql-connector-java 可以选择其中一个。
当选择其中一种时,同时也要在 applicationContext.xml 文件中做出相应的改动。

改动:

  • 依赖的版本由 platform-bom 来统一管理
  • 添加 mybatis, mybatis-spring
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.littlefxc.example</groupId>
<artifactId>Spring-CSV-to-DB</artifactId>
<version>1.0-snapshot</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>5.0.9.RELEASE</spring.version>
<spring.batch.version>4.0.1.RELEASE</spring.batch.version>
<sqlite.version>3.8.11.2</sqlite.version>
<mysql.version>5.1.47</mysql.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.spring.platform</groupId>
<artifactId>platform-bom</artifactId>
<version>Cairo-RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- MySQL database driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- spring的XML文件处理依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

</project>

applicationContext.xml

我们将使用 FlatFileItemReader 读取 CSV 文件。
我们将使用它的标准配置,包括 DefaultLineMapperDelimitedLineTokenizerBeanWrapperFieldSetMapper 类。
为了在XML文件中输出记录,我们将使用 StaxEventItemWriter 作为标准编写器。

改动:

  • 将输出XML变为输出到mysql
  • Spring Batch 持久层框架由 spring-jdbc 改为 mybatis, mybatis-spring

当然,原来的输出 itemWriter 去掉注释后,仍然起作用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

<context:property-placeholder location="classpath:application.properties"/>
<context:component-scan base-package="com.littlefxc.examples.batch"/>

<!-- 1. 数据库脚本:这里是为了方便起见保证每次重启程序数据库都是最新的(生产环境中不要这么做!!!) -->
<jdbc:initialize-database>
<jdbc:script location="${batch.schema-drop}"/>
<jdbc:script location="${batch.schema-create}"/>
<jdbc:script location="${project.schema-drop}"/>
<jdbc:script location="${project.schema-create}"/>
</jdbc:initialize-database>

<!-- 2. 连接数据库:可以选择其它的数据源实现 -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource"
p:driverClassName="${jdbc.driver-class-name}"
p:url="${jdbc.url}"
p:username="${jdbc.username}"
p:password="${jdbc.password}"/>

<!-- 3. 事务管理 -->
<!--<bean id="transactionManager" class="org.springframework.batch.support.transactionRecord.ResourcelessTransactionManager"/>-->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"
p:dataSource-ref="dataSource"/>

<!-- 3.1 申明式事务 -->
<tx:annotation-driven/>

<!-- 3.2 mybatis 配置 -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"
p:dataSource-ref="dataSource"
p:typeAliasesPackage="${mybatis.type-aliases-package}"
p:configLocation="${mybatis.configuration}"/>

<!-- 3.3 mybatis dao 配置 -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"
p:basePackage="com.littlefxc.examples.batch.dao"
p:sqlSessionFactoryBeanName="sqlSessionFactory"/>

<!-- 4. 为JobLauncher,Job和Step实现提供CRUD操作 -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
p:dataSource-ref="dataSource"
p:transactionManager-ref="transactionManager"
p:databaseType="mysql"/>

<!-- 5. JobLauncher表示一个简单的接口,用于使用给定的 JobParameter 启动作业 -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"
p:jobRepository-ref="jobRepository"/>

<!-- 6. batch 输入 -->
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
<!-- 输入资源 -->
<property name="resource" value="input/record.csv"/>
<!-- 不读取第一行数据 -->
<property name="linesToSkip" value="1"/>
<!-- 将输入资源转化为对象 -->
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="username,user_id,transaction_date,transaction_amount"/>
</bean>
</property>
<property name="fieldSetMapper" ref="recordFieldSetMapper"/>
</bean>
</property>
</bean>

<!-- 7. batch的处理器 -->
<bean id="itemProcessor" class="com.littlefxc.examples.batch.service.CustomItemProcessor"/>

<!-- 8. batch 输出:输出为xml -->
<!--<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" value="file:xml/output.xml"/>
<property name="marshaller">
&lt;!&ndash; xml 与 对象的转换器 &ndash;&gt;
<bean class="org.springframework.oxm.jaxb.Jaxb2Marshaller"
p:packagesToScan="com.littlefxc.examples.batch.model"/>
</property>
<property name="rootTagName" value="transactionRecord"/>
</bean>-->
<!-- 8. batch 输出:输出到mysql -->
<!--<bean id="itemWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter"
p:dataSource-ref="dataSource" p:sql="${batch.sql}">
<property name="itemSqlParameterSourceProvider">
<bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
</property>
</bean>-->
<!-- 8. batch 输出:使用 mybatis 输出到mysql -->
<bean id="itemWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter">
<property name="sqlSessionFactory" ref="sqlSessionFactory"/>
<!-- 与 mybatis 关联的 dao 层的接口名:com.littlefxc.examples.batch.dao.TransactionRecordDao.insertTransactionRecord -->
<property name="statementId" value="insertTransactionRecord"/>
</bean>

<!-- 9. 配置batch的输入(6)、处理器(7)、输出(8) -->
<!-- commit-interval:提交事务之前将处理的项目数。 -->
<batch:job id="firstBatchJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" commit-interval="2"/>
</batch:tasklet>
</batch:step>
</batch:job>

</beans>

RecordFieldSetMapper

ItemReader 的属性,作用是将 FieldSet 转换为对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.littlefxc.examples.batch.service;

import com.littlefxc.examples.batch.model.TransactionRecord;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import java.text.ParseException;
import java.text.SimpleDateFormat;

/**
* 将 FieldSet 转换为对象
* @author fengxuechao
* @date 2019/1/4
**/
public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {

public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
Transaction transactionRecord = new Transaction();

transactionRecord.setUsername(fieldSet.readString("username"));
transactionRecord.setUserId(fieldSet.readInt("user_id"));
transactionRecord.setAmount(fieldSet.readDouble("transaction_amount"));
String dateString = fieldSet.readString("transaction_date");
try {
transactionRecord.setTransactionDate(dateFormat.parse(dateString));
} catch (ParseException e) {
e.printStackTrace();
}
return transactionRecord;
}
}

CustomItemProcessor

自定义实现接口 ItemProcessor, 作为 ItemReaderItemWriter 的转换点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.littlefxc.examples.batch.service;

import com.littlefxc.examples.batch.model.TransactionRecord;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import java.text.ParseException;
import java.text.SimpleDateFormat;

/**
* 将读取到的数据集合转换为对象
* @author fengxuechao
* @date 2019/1/4
**/
public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {

public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
Transaction transactionRecord = new Transaction();

transactionRecord.setUsername(fieldSet.readString("username"));
transactionRecord.setUserId(fieldSet.readInt("user_id"));
transactionRecord.setAmount(fieldSet.readDouble("transaction_amount"));
String dateString = fieldSet.readString("transaction_date");
try {
transactionRecord.setTransactionDate(dateFormat.parse(dateString));
} catch (ParseException e) {
e.printStackTrace();
}
return transactionRecord;
}
}

模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.littlefxc.examples.batch.model;

import lombok.Data;

import javax.xml.bind.annotation.XmlRootElement;
import java.util.Date;

/**
* @author fengxuechao
*/
@Data
@XmlRootElement(name = "transactionRecord")
public class Transaction {

private String username;

private int userId;

private Date transactionDate;

private double amount;
}

record.csv

1
2
3
devendra, 1234, 31/10/2015, 10000
john , 2134, 3/12/2015 , 12321
robin , 2134, 2/02/2015 , 23411

启动程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.littlefxc.examples.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext();
context.setConfigLocations("classpath:spring-context.xml");
context.refresh();

JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean("firstBatchJob");
System.out.println("Starting the batch job");
try {
JobExecution execution = jobLauncher.run(job, new JobParameters());
System.out.println("Job Status : " + execution.getStatus());
System.out.println("Job completed");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Job failed");
}
}
}

验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<?xml version="1.0" encoding="UTF-8"?>
<transactionRecord>
<transactionRecord>
<amount>10000.0</amount>
<transactionDate>2015-10-31T00:00:00+08:00</transactionDate>
<userId>1234</userId>
<username>devendra</username>
</transactionRecord>
<transactionRecord>
<amount>12321.0</amount>
<transactionDate>2015-12-03T00:00:00+08:00</transactionDate>
<userId>2134</userId>
<username>john</username>
</transactionRecord>
<transactionRecord>
<amount>23411.0</amount>
<transactionDate>2015-02-02T00:00:00+08:00</transactionDate>
<userId>2134</userId>
<username>robin</username>
</transactionRecord>
</transactionRecord>

附录:

application.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
batch.schema-drop=org/springframework/batch/core/schema-drop-mysql.sql
batch.schema-create=org/springframework/batch/core/schema-mysql.sql
batch.sql=INSERT INTO transaction_record (user_id, username, transaction_date, amount) VALUES (:userId, :username, :transactionDate, :amount)

jdbc.url=jdbc:mysql://192.168.120.63:3306/batch?useSSL=false
jdbc.username=root
jdbc.password=123456
jdbc.driver-class-name=com.mysql.jdbc.Driver

# 自定义数据库删除脚本
project.schema-drop=classpath:schema-drop.sql
# 自定义数据库创建脚本
project.schema-create=classpath:schema.sql

# Mybatis Config
mybatis.configuration=classpath:mybatis-config.xml
mybatis.type-aliases-package=com.littlefxc.examples.batch.model
mybatis.mapper.base-package=com.littlefxc.examples.batch.dao

mybatis-config.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">

<configuration>
<settings>
<setting name="cacheEnabled" value="true"/>
<setting name="lazyLoadingEnabled" value="true"/>
<setting name="multipleResultSetsEnabled" value="true"/>
<setting name="useColumnLabel" value="true"/>
<setting name="useGeneratedKeys" value="false"/>
<setting name="autoMappingBehavior" value="PARTIAL"/>
<setting name="autoMappingUnknownColumnBehavior" value="WARNING"/>
<setting name="defaultExecutorType" value="SIMPLE"/>
<setting name="defaultStatementTimeout" value="25"/>
<setting name="defaultFetchSize" value="100"/>
<setting name="safeRowBoundsEnabled" value="false"/>
<setting name="mapUnderscoreToCamelCase" value="false"/>
<setting name="localCacheScope" value="SESSION"/>
<setting name="jdbcTypeForNull" value="OTHER"/>
<setting name="lazyLoadTriggerMethods" value="equals,clone,hashCode,toString"/>
</settings>
</configuration>

schema.sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/*
Navicat Premium Data Transfer

Source Server : localhost
Source Server Type : MySQL
Source Server Version : 50722
Source Host : localhost:3306
Source Schema : batch

Target Server Type : MySQL
Target Server Version : 50722
File Encoding : 65001

Date: 31/01/2019 10:27:20
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for transaction_record
-- ----------------------------
# DROP TABLE IF EXISTS `transaction_record`;
CREATE TABLE `transaction_record` (
`username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`user_id` bigint(20) NOT NULL,
`transaction_date` datetime(6) NOT NULL,
`amount` double(11, 0) NOT NULL,
PRIMARY KEY (`username`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

schema-drop.sql

1
DROP TABLE IF EXISTS `transaction_record`;