共计 6647 个字符,预计需要花费 17 分钟才能阅读完成。
Magician-Concurrent
Magician-Concurrent 是一个并发编程工具包,当你需要并发执行某些代码的时候,不需要自己创建和管理线程,除此之外里面还提供了生产者与消费者模型
初始化配置
导入依赖
com.github.yuyenews
Magician-Concurrent
1.0.0
并发处理任务
MagicianConcurrent.getConcurrentTaskSync()
.setTimeout(1000) // 超时时间
.setTimeUnit(TimeUnit.MILLISECONDS) // 超时时间的单位
.add(() -> { // 添加一个任务
// 在这里可以写上任务的业务逻辑
}, (result, e) -> {
// 此任务处理后的回调
if(result.equals(ConcurrentTaskResultEnum.FAIL)){// 任务失败,此时 e 里面有详细的异常信息} else if(result.equals(ConcurrentTaskResultEnum.SUCCESS)) {// 任务成功,此时 e 是空的}
})
.add(() -> { // 添加一个任务
// 在这里可以写上任务的业务逻辑
}, (result, e) -> {
// 此任务处理后的回调
if(result.equals(ConcurrentTaskResultEnum.FAIL)){// 任务失败,此时 e 里面有详细的异常信息} else if(result.equals(ConcurrentTaskResultEnum.SUCCESS)) {// 任务成功,此时 e 是空的}
})
.start();
添加进去的任务会并发执行,但是在它们执行完之前,这整个代码块会同步等待在这,一直等到所有任务执行完或者超时才会继续往下走。
这里面的超时时间就是用来设置同步等待多久的。
- 如果设置为 0 表示一直等到所有任务完成为止
- 设置为大于 0 的时候,表示只等待这么久
并发处理 List,Set 等所有 Collection 类的集合里的元素
同步执行
// 假如有一个 List 需要并发处理里面的元素
List dataList = new ArrayList<>();
每个元素并发执行
// 只需要将他传入 syncRunner 方法即可
MagicianConcurrent.getConcurrentCollectionSync()
.syncRunner(dataList, data -> {
// 这里可以拿到 List 里的元素,进行处理
// List 里的元素是什么类型,这个 data 就是什么类型
System.out.println(data);
},
10, // 每组多少条元素
1, // 每组之间同步等待多久
TimeUnit.MINUTES // 等待的时间单位
);
这个方法会将传进去的集合分成若干组,每组的大小由参数指定。
这些组会排队执行,但是每一组在执行的时候都是并发的,里面的每一个元素都会由单独的线程去处理。
需要等一组处理完了,才会处理下一组,但是有时候我们不想这么死板的等待,所以可以设置一个超时时间,超过了这个期限就不等了,直接进行下一组,所以这里的最后两个参数就是用来设置这个期限的。
每一组并发执行
// 也可以用 syncGroupRunner 方法
MagicianConcurrent.getConcurrentCollectionSync()
.syncGroupRunner(dataList, data -> {
// 这里可以拿到每一组的元素,进行处理
// 这个 data 就是每一组 List,可以自己迭代处理
System.out.println(data);
},
10, // 每组多少条元素
1, // 每组之间同步等待多久
TimeUnit.MINUTES // 等待的时间单位
);
这个方法会将传进去的集合分成若干组,每组的大小由参数指定。
每一组由单独的线程处理。
会一直同步等待在这里,直到所有组都处理完了才会进行下一步,但是有时候我们不想这么死板的等待,所以可以设置一个超时时间,超过了这个期限就不等了,直接执行下一步。所以这里的最后两个参数就是用来设置这个期限的。
异步执行
其实就是将上面 [同步处理] 的代码放到了一个线程里,内部处理依然是上面 [同步处理] 的逻辑,但是这整个代码块将会异步执行,不需要等在这。所以个别相同的参数就不再重复解释了。
// 假如有一个 List 需要并发处理里面的元素
List dataList = new ArrayList<>();
每个元素并发执行
// 只需要将他传入 asyncRunner 方法即可
MagicianConcurrent.ConcurrentCollectionAsync(
1, // 核心线程数
1, // 最大线程数
1, // 线程空闲时间
TimeUnit.MINUTES // 空闲时间单位
.asyncRunner(dataList, data -> {
// 这里可以拿到 List 里的元素,进行处理
System.out.println(data);
},
10, // 每组多少条元素
1, // 每组之间同步等待多久
TimeUnit.MINUTES // 等待的时间单位
);
ConcurrentCollectionAsync 里的参数其实就是线程池的参数,除了上面这种写法,还可以这样写。
每调用一次 asyncRunner 都会占用一个线程,而这些线程都是由一个线程池在管理。
ConcurrentCollectionAsync concurrentCollectionAsync = MagicianConcurrent.ConcurrentCollectionAsync(
1, // 核心线程数
1, // 最大线程数
1, // 线程空闲时间
TimeUnit.MINUTES // 空闲时间单位
);
concurrentCollectionAsync.asyncRunner(dataList, data -> {
// 这里可以拿到 List 里的元素,进行处理
System.out.println(data);
},
10, // 每组多少条元素
1, // 每组之间同步等待多久
TimeUnit.MINUTES // 等待的时间单位
);
concurrentCollectionAsync.asyncRunner(dataList2, data -> {
// 这里可以拿到 List 里的元素,进行处理
System.out.println(data);
},
10, // 每组多少条元素
1, // 每组之间同步等待多久
TimeUnit.MINUTES // 等待的时间单位
);
concurrentCollectionAsync.asyncRunner(dataList3, data -> {
// 这里可以拿到 List 里的元素,进行处理
System.out.println(data);
},
10, // 每组多少条元素
1, // 每组之间同步等待多久
TimeUnit.MINUTES // 等待的时间单位
);
用这个方法可以管理线程池
// 关闭线程池
concurrentCollectionAsync.shutdown();
// 立刻关闭线程池
concurrentCollectionAsync.shutdownNow();
// 获取线程池
ThreadPoolExecutor threadPoolExecutor = concurrentCollectionAsync.getPoolExecutor();
每一组并发执行
// 也可以用 asyncGroupRunner 方法,每个参数的具体含义可以参考文档
MagicianConcurrent.ConcurrentCollectionAsync(
1, // 核心线程数
1, // 最大线程数
1, // 线程空闲时间
TimeUnit.MINUTES // 空闲时间单位
.asyncGroupRunner(dataList, data -> {
// 这里可以拿到 List 里的元素,进行处理
System.out.println(data);
},
10, // 每组多少条元素
1, // 每组之间同步等待多久
TimeUnit.MINUTES // 等待的时间单位
同上
并发处理所有 Map 类的集合里的元素
Map 的逻辑跟 Collection 一模一样,只不过是传入的集合变成了 Map,就不再累述了,感谢理解。
同步执行
每个元素并发执行
// 假如有一个 Map 需要并发处理里面的元素
Map dataMap = new HashMap<>();
// 只需要将他传入 syncRunner 方法即可
MagicianConcurrent.getConcurrentMapSync()
.syncRunner(dataMap, (key, value) -> {
// 这里可以拿到 Map 里的元素,进行处理
System.out.println(key);
System.out.println(value);
}, 10, 1, TimeUnit.MINUTES);
每一组并发执行
// 也可以用 syncGroupRunner 方法
MagicianConcurrent.getConcurrentMapSync()
.syncGroupRunner(dataMap, data -> {
// 这里可以拿到每一组 Map 进行处理
System.out.println(data);
}, 10, 1, TimeUnit.MINUTES);
异步执行
每个元素并发执行
// 假如有一个 Map 需要并发处理里面的元素
Map dataMap = new HashMap<>();
// 只需要将他传入 asyncRunner 方法即可
MagicianConcurrent.getConcurrentMapAsync(
1,
1,
1,
TimeUnit.MINUTES
).asyncRunner(dataMap, (key, value) -> {
// 这里可以拿到 Map 里的元素,进行处理
System.out.println(key);
System.out.println(value);
}, 10, 1, TimeUnit.MINUTES);
每一组并发执行
// 也可以用 asyncGroupRunner 方法
MagicianConcurrent.getConcurrentMapAsync(
1,
1,
1,
TimeUnit.MINUTES
).asyncGroupRunner(dataMap, data -> {
// 这里可以拿到每一组 Map 进行处理
System.out.println(data);
}, 10, 1, TimeUnit.MINUTES);
生产者与消费者
这是一个多对多的模型,多个生产者可以给多个消费者推送不同类型的数据,
// 创建一组生产者与消费者,而这样组可以创建无限个
// 每一组的生产者都只会把数据推送给同一组的消费者
MagicianConcurrent.getProducerAndConsumerManager()
.addProducer(new MagicianProducer() { // 添加一个生产者(可以添加多个)/**
* 设置 ID,必须全局唯一,默认是当前类的全名
* 如果采用默认值,可以不重写这个方法
* @return
*/
@Override
public String getId() {return super.getId();
}
/**
* 设置 producer 方法是否重复执行,默认重复
* 如果采用默认值,可以不重写这个方法
* @return
*/
@Override
public boolean getLoop() {return super.getLoop();
}
/**
* 设置 是否等消费者全部空闲了才继续生产下一轮数据,默认 false
* 如果采用默认值,可以不重写这个方法
* @return
*/
@Override
public boolean getAllFree() {return super.getAllFree();
}
/**
* 当生产者启动后,会自动执行这个方法,我们可以在这个方法里生产数据,并通过 publish 方法发布给消费者
*
* 这边举一个例子
* 假如我们需要不断地扫描某张表,根据里面的数据状态去执行一些业务逻辑
* 那么我们可以在这个方法里写一个查询的逻辑,然后将查询到数据发送给消费者
*/
@Override
public void producer() {
// 根据上面的例子,我们可以查询这张表里符合条件的数据
List