ThreadLocal没有清除
如果前面一个线程没有清除的话,后面一个线程会直接拿到上面一个线程的结果,污染当前线程。当然前提是使用了线程池。
@RestController
@RequestMapping("/concurrent/threadlocal")
public class ThreadLocalConcurrentTestController {
private static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);
@GetMapping("wrong")
public Map<String, String> wrong(@RequestParam("userId") Integer userId) {
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map<String, String> result = new HashMap<>();
result.put("before", before);
result.put("after", after);
return result;
}
@GetMapping("right")
public Map<String, String> right(@RequestParam("userId") Integer userId) {
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
try {
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map<String, String> result = new HashMap<>();
result.put("before", before);
result.put("after", after);
return result;
} finally {
currentUser.remove();
}
}
}
初始化tomcat的连接是10,配置了最大连接数是1好像没有用,所以请求了11次,第11次的结果结果:
这个问题其实是有两点保护措施的
- 在一个线程任务执行结束后在finally中手动将ThreadLocal变量清除
- 每一个线程任务开始前不相信当前的ThreadLocal变量是干净的,也删除一次
我们的系统中封装了一些ThreadLocal的变量,而且统一在Transaction结束的时候清空,我们自己其实并不需要再手写ThreadLocal了。而且对于分库这种系统最重要的业务逻辑,也会在使用之前就先清空一次ThreadLocal变量
ConcurrentHashMap
ConcurrentHashMap只做到了对于Map的读写是线程安全的,对于很多聚合方法其实是线程不安全的。
下面一段示例是,先向一个ConcurrentHashMap中插入900条数据,然后起10个线程,每一个线程执行的操作是获取当前状态下map的size和1000的差值,然后向map里添加对应数量的内容。
public class ConcurrentHashMapTestController {
private static int THREAD_COUNT = 10;
private static int ITEM_COUNT = 1000;
private ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
(o1, o2) -> o1, ConcurrentHashMap::new));
}
@GetMapping("wrong")
public String wrong() throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
log.info("init size:{}", concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size:{}", gap);
concurrentHashMap.putAll(getData(gap));
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size:{}", concurrentHashMap.size());
return "OK";
}
@GetMapping("right")
public String right() throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
log.info("init size:{}", concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
synchronized (concurrentHashMap) {
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size:{}", gap);
concurrentHashMap.putAll(getData(gap));
}
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size:{}", concurrentHashMap.size());
return "OK";
}
}
如果就是简单的多线程跑,看到结果其实最后添加到了1100+,正常情况应该是只有一个worker看到还剩100个,其它worker都应该是0,这说明其实size并不是一个线程安全方法
在锁住了map之后结果是正常的:
充分发挥并发工具类的特性
假设有这么一个场景,创建10个线程,每个线程循环1000万次,每次生成一个随机key。最后统计所有线程生成的随机key中,每一个key的出现次数。
这个地方肯定不能这么写:
public void count() {
if (map.containsKey(key)) {
map.put(key, map.get(key) + 1);
} else {
map.put(key, 1);
}
}
就算用的是concurrentHashMap,也把应该的一个原子操作拆成了两步,这就和之前我们的redis锁失效一个道理。
想让它不出错,就在这个逻辑的外面锁住map对象,但是锁住map对象会导致很严重的性能问题。
这时候可以这么做:
public void count() {
map.computeIfAbsent(key, new LongAdder()).increment();
}
都是原子性的操作,充分应用了多线程的性能,又保证了安全
认清并发工具的使用场
这里举的例子是CopyOnWriteArrayList,这个东西是一个线程安全的ArrayList,但是每次做add操作都会复制当前的List。所以当写操作非常多的时候,使用这个List就会耗费很多的资源
一共四个例子,前面两个例子对于我们的系统还是有一些参考的,后面就是简单看看就可以,暂时用不上。值得思考的是我们在使用一个新的库的时候,容易出现使用不当的问题。比如我要用ConrrentHashMap,我肯定不会看官方文档,并且认为里面的所有方法都是线程安全的,所以当我调用size的时候就有可能出问题。这种也不可能要求使用的时候都去看明白官方文档,只能说用的时候注意一下