当前位置: 首页>后端>正文

Java业务常见开发错误-线程安全工具

ThreadLocal没有清除

如果前面一个线程没有清除的话,后面一个线程会直接拿到上面一个线程的结果,污染当前线程。当然前提是使用了线程池。

@RestController
@RequestMapping("/concurrent/threadlocal")
public class ThreadLocalConcurrentTestController {
    private static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);

    @GetMapping("wrong")
    public Map<String, String> wrong(@RequestParam("userId") Integer userId) {
        String before = Thread.currentThread().getName() + ":" + currentUser.get();
        currentUser.set(userId);
        String after = Thread.currentThread().getName() + ":" + currentUser.get();
        Map<String, String> result = new HashMap<>();
        result.put("before", before);
        result.put("after", after);
        return result;
    }

    @GetMapping("right")
    public Map<String, String> right(@RequestParam("userId") Integer userId) {
        String before = Thread.currentThread().getName() + ":" + currentUser.get();
        currentUser.set(userId);
        try {
            String after = Thread.currentThread().getName() + ":" + currentUser.get();
            Map<String, String> result = new HashMap<>();
            result.put("before", before);
            result.put("after", after);
            return result;
        } finally {
            currentUser.remove();
        }
    }
}

初始化tomcat的连接是10,配置了最大连接数是1好像没有用,所以请求了11次,第11次的结果结果:

Java业务常见开发错误-线程安全工具,第1张
5.png

这个问题其实是有两点保护措施的

  • 在一个线程任务执行结束后在finally中手动将ThreadLocal变量清除
  • 每一个线程任务开始前不相信当前的ThreadLocal变量是干净的,也删除一次
    我们的系统中封装了一些ThreadLocal的变量,而且统一在Transaction结束的时候清空,我们自己其实并不需要再手写ThreadLocal了。而且对于分库这种系统最重要的业务逻辑,也会在使用之前就先清空一次ThreadLocal变量

ConcurrentHashMap

ConcurrentHashMap只做到了对于Map的读写是线程安全的,对于很多聚合方法其实是线程不安全的。

下面一段示例是,先向一个ConcurrentHashMap中插入900条数据,然后起10个线程,每一个线程执行的操作是获取当前状态下map的size和1000的差值,然后向map里添加对应数量的内容。

public class ConcurrentHashMapTestController {
    private static int THREAD_COUNT = 10;
    private static int ITEM_COUNT = 1000;

    private ConcurrentHashMap<String, Long> getData(int count) {
        return LongStream.rangeClosed(1, count)
                .boxed()
                .collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
                        (o1, o2) -> o1, ConcurrentHashMap::new));
    }

    @GetMapping("wrong")
    public String wrong() throws InterruptedException {
        ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
        log.info("init size:{}", concurrentHashMap.size());
        ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
            int gap = ITEM_COUNT - concurrentHashMap.size();
            log.info("gap size:{}", gap);
            concurrentHashMap.putAll(getData(gap));
        }));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        log.info("finish size:{}", concurrentHashMap.size());
        return "OK";
    }

    @GetMapping("right")
    public String right() throws InterruptedException {
        ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
        log.info("init size:{}", concurrentHashMap.size());

        ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
            synchronized (concurrentHashMap) {
                int gap = ITEM_COUNT - concurrentHashMap.size();
                log.info("gap size:{}", gap);
                concurrentHashMap.putAll(getData(gap));
            }
        }));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        log.info("finish size:{}", concurrentHashMap.size());
        return "OK";
    }
}

如果就是简单的多线程跑,看到结果其实最后添加到了1100+,正常情况应该是只有一个worker看到还剩100个,其它worker都应该是0,这说明其实size并不是一个线程安全方法


Java业务常见开发错误-线程安全工具,第2张
2.png

在锁住了map之后结果是正常的:


Java业务常见开发错误-线程安全工具,第3张
3.png

充分发挥并发工具类的特性

假设有这么一个场景,创建10个线程,每个线程循环1000万次,每次生成一个随机key。最后统计所有线程生成的随机key中,每一个key的出现次数。

这个地方肯定不能这么写:

public void count() {
    if (map.containsKey(key)) {
    map.put(key, map.get(key) + 1);
  } else {
    map.put(key, 1);
  }
}

就算用的是concurrentHashMap,也把应该的一个原子操作拆成了两步,这就和之前我们的redis锁失效一个道理。

想让它不出错,就在这个逻辑的外面锁住map对象,但是锁住map对象会导致很严重的性能问题。

这时候可以这么做:

public void count() {
    map.computeIfAbsent(key, new LongAdder()).increment();
}

都是原子性的操作,充分应用了多线程的性能,又保证了安全

认清并发工具的使用场

这里举的例子是CopyOnWriteArrayList,这个东西是一个线程安全的ArrayList,但是每次做add操作都会复制当前的List。所以当写操作非常多的时候,使用这个List就会耗费很多的资源

一共四个例子,前面两个例子对于我们的系统还是有一些参考的,后面就是简单看看就可以,暂时用不上。值得思考的是我们在使用一个新的库的时候,容易出现使用不当的问题。比如我要用ConrrentHashMap,我肯定不会看官方文档,并且认为里面的所有方法都是线程安全的,所以当我调用size的时候就有可能出问题。这种也不可能要求使用的时候都去看明白官方文档,只能说用的时候注意一下


https://www.xamrdz.com/backend/3cn1941835.html

相关文章: