老师我现在有个任务,和您的例子有相似的地方,是从一个库里查询多张表的数据同步到另外一个库,就有双重for循环,最外层用与多张表的遍历,内层的for循环用于批量读取某一张表的数据,因为数据量可能在几万条,我想分批次读出来再同步到另一个数据库,昨天写的时候用的是futuretask,今天正好看到老师的文章就改成了CompletableFuture,还没有用异常处理的,后面我还要看看怎么加上异常处理的。其它的不知道我用的对不对,请老师看看:
// 初始化异步工具类,分别异步执行2个任务
CompletableFuture<List<PBSEnergyData>> asyncAquirePBSEnergyData = new CompletableFuture();
CompletableFuture<List<AXEEnergyData>> asyncSaveAxeEnergyData = new CompletableFuture();
// 初始化两个线程池, 分别用于2个任务 ,1个任务一个线程池,互不干扰
Executor aquirePBSEnergyDataExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Executor saveAxeEnergyDataExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
queryUtils.getTableNames().forEach(tableName -> {
int pageSize = queryUtils.getPageSize();
//查询该表有多少条数据,每${pageSize}条一次
int count = pbsEnergyService.getCount(tableName);
//总页数
int pages = count / pageSize;
int pageNum = 0;
final int pageNo = pageNum;
for(pageNum = 0; pageNum <= pages; pageNum++){
// 异步获取PBS数据库的数据并返回结果
asyncAquirePBSEnergyData
.supplyAsync(() -> {
查询数据库
return pbsEnergyDatas;
},aquirePBSEnergyDataExecutor)
// 任务2任务1,任务1返回的结果
.thenApply(pbsEnergyDatas -> asyncSaveAxeEnergyData.runAsync(()->{
List<AXEEnergyData> axeEnergyDatas = pbsEnergyDatas.stream().map(pbsEnergyData -> {
//进行类型转换
}).collect(Collectors.toList());
//批量保存
},saveAxeEnergyDataExecutor));
}
});
全部贴上去,超过字符数了,只能请老师凑合看了 :(
展开
作者回复: 有个地方需要注意:runAsync和supplyAsync都是静态方法。
线程池设置的太小了,这是个IO密集型的任务
thenApply里面的runAsync我觉得好像是没有必要,增加了复杂的了。
如果thenApply里面需要异步,可以用thenApplyAsync