一般的生产者消费者模型中,生产者和消费者都是尽可能快地处理任务。但在工作中,我遇到了一种情况,需要每个消费者尽可能多地解决一批任务,这样可以打包处理,降低I/O频次。
我当时用的方法是在消费者端给BlockingQueue加锁。后来想想这种方法多余了。
这篇文章一是讨论一下这种方法,作个反思,二来作为新博客的第一篇文章,起个开头。
模拟当时的解决方法
用来解决生产者消费者问题的BlockingQueue:
private static final BlockingQueue<Task> taskBlockingQueue = new ArrayBlockingQueue<>(100);
生产者部分没有什么区别,直接往队列里添加任务:
private void produce(int taskId) {
try {
taskBlockingQueue.put(new Task(taskId));
System.out.println(String.format("生产者%d\t添加任务%d", id, taskId));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
消费者部分在打包的过程中都对阻塞队列加锁,不允许其他消费者获取任务:
private static final Lock packageLock = new ReentrantLock();
消费者需要在指定时间内打包,超时则退出这轮消费。
private void consume() {
try {
if (packageLock.tryLock(5, TimeUnit.MILLISECONDS)) {
try {
doPackage();
} finally {
packageLock.unlock();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doPackage() {
long start = System.currentTimeMillis();
long end;
int packageNum = 0;
for (int i = 0; i < consumerPackageSize; i++) {
doConsume();
packageNum++;
end = System.currentTimeMillis();
if (end - start > packageTime) {
break;
}
}
end = System.currentTimeMillis();
System.out.println(String.format("消费者%d\t打包%d个\t耗时%d", id, packageNum, end - start));
}
private void doConsume() {
Task task = null;
try {
task = taskBlockingQueue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (task == null) {
return;
}
System.out.println(String.format("消费者%d\t完成任务%d", id, task.getId()));
}
整个完整的测试类:
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Junious
* @date 2019/02/25
**/
public class ProducerAndConsumerTest {
private static final int producerNumber = 5;
private static final int consumerNumber = 5;
private static final BlockingQueue<Task> taskBlockingQueue = new ArrayBlockingQueue<>(100);
private static final Lock packageLock = new ReentrantLock();
private static final int consumerPackageSize = 20;
private static final int packageTime = 2000;
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(
new Thread(() -> System.out.println
("queue size:" + taskBlockingQueue.size()))
);
ProducerAndConsumerTest test = new ProducerAndConsumerTest();
test.init();
}
private void init() {
//add producers
for (int i = 0; i < producerNumber; i++) {
Thread t = new Thread(new Producer(i));
t.start();
}
//add consumers
for (int i = 0; i < consumerNumber; i++) {
Thread t = new Thread(new Consumer(i));
t.start();
}
}
@AllArgsConstructor
@Data
class Producer implements Runnable {
private int id;
@Override
public void run() {
Random random = new Random();
while (true) {
int taskId = random.nextInt(1000) + 1;
produce(taskId);
try {
Thread.sleep(random.nextInt(300) + 400);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
private void produce(int taskId) {
try {
taskBlockingQueue.put(new Task(taskId));
System.out.println(String.format("生产者%d\t添加任务%d", id, taskId));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@AllArgsConstructor
@Data
class Consumer implements Runnable {
private int id;
@Override
public void run() {
Random random = new Random();
while (true) {
consume();
try {
Thread.sleep(random.nextInt(300) + 400);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
private void consume() {
try {
if (packageLock.tryLock(5, TimeUnit.MILLISECONDS)) {
try {
doPackage();
} finally {
packageLock.unlock();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doPackage() {
long start = System.currentTimeMillis();
long end;
int packageNum = 0;
for (int i = 0; i < consumerPackageSize; i++) {
doConsume();
packageNum++;
end = System.currentTimeMillis();
if (end - start > packageTime) {
break;
}
}
end = System.currentTimeMillis();
System.out.println(String.format("消费者%d\t打包%d个\t耗时%d", id, packageNum, end - start));
}
private void doConsume() {
Task task = null;
try {
task = taskBlockingQueue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (task == null) {
return;
}
System.out.println(String.format("消费者%d\t完成任务%d", id, task.getId()));
}
}
}
@AllArgsConstructor
@Data
class Task {
private int id;
}
截取一段测试结果:
生产者0 添加任务545
生产者2 添加任务943
生产者1 添加任务359
生产者3 添加任务97
生产者4 添加任务705
消费者2 完成任务545
消费者2 完成任务359
消费者2 完成任务943
消费者2 完成任务97
消费者2 完成任务705
生产者2 添加任务32
消费者2 完成任务32
生产者4 添加任务488
消费者2 完成任务488
生产者1 添加任务691
消费者2 完成任务691
消费者2 完成任务3
生产者3 添加任务3
生产者0 添加任务815
消费者2 完成任务815
消费者2 完成任务290
生产者1 添加任务290
消费者2 完成任务408
生产者2 添加任务408
消费者2 完成任务873
生产者3 添加任务873
消费者2 打包20个 耗时1165
生产者0 添加任务852
消费者1 完成任务852
消费者1 完成任务743
生产者4 添加任务743
生产者0 添加任务114
消费者1 完成任务114
生产者1 添加任务454
消费者1 完成任务454
消费者1 完成任务920
生产者2 添加任务920
生产者3 添加任务847
消费者1 完成任务847
生产者4 添加任务905
消费者1 完成任务905
生产者3 添加任务698
消费者1 完成任务698
生产者1 添加任务372
消费者1 完成任务372
生产者0 添加任务568
消费者1 完成任务568
生产者2 添加任务419
消费者1 完成任务419
生产者4 添加任务417
消费者1 完成任务417
消费者1 打包20个 耗时1295
生产者3 添加任务888
消费者0 完成任务888
生产者1 添加任务189
消费者0 完成任务189
生产者2 添加任务892
消费者0 完成任务892
生产者4 添加任务375
消费者0 完成任务375
生产者0 添加任务723
消费者0 完成任务723
生产者3 添加任务543
消费者0 完成任务543
消费者0 完成任务205
生产者1 添加任务205
生产者2 添加任务657
消费者0 完成任务657
生产者0 添加任务549
消费者0 完成任务549
生产者4 添加任务812
消费者0 完成任务812
生产者3 添加任务737
消费者0 完成任务737
消费者0 打包20个 耗时1208
生产者2 添加任务784
消费者4 完成任务784
生产者1 添加任务252
消费者4 完成任务252
生产者0 添加任务622
消费者4 完成任务622
生产者4 添加任务524
消费者4 完成任务524
生产者3 添加任务73
消费者4 完成任务73
生产者2 添加任务491
消费者4 完成任务491
生产者0 添加任务225
消费者4 完成任务225
生产者1 添加任务207
消费者4 完成任务207
生产者4 添加任务326
消费者4 完成任务326
生产者2 添加任务983
消费者4 完成任务983
生产者0 添加任务865
消费者4 完成任务865
生产者3 添加任务347
消费者4 完成任务347
消费者4 打包20个 耗时1318
可以看到每次打包只有一个消费者在进行消费,其实相当于只有一个消费者线程,等于没有使用并发。
当消费者任务很耗时时:
private void doConsume() {
Task task = null;
try {
task = taskBlockingQueue.poll(100, TimeUnit.MILLISECONDS);
//模拟耗时
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (task == null) {
return;
}
System.out.println(String.format("消费者%d\t完成任务%d", id, task.getId()));
}
这时候在中止时可以看到队列有10到30不等的Task暂留。模拟耗时越长,暂留的越多,也就是相当于性能越差。
解决方案
实际上不需要加锁,设定一个超时时间即可。
@AllArgsConstructor
@Data
class Consumer2 implements Runnable {
private int id;
@Override
public void run() {
Random random = new Random();
while (true) {
try {
consume();
Thread.sleep(random.nextInt(300) + 400);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
private void consume() throws InterruptedException {
long start = System.currentTimeMillis();
ArrayList<Task> tasks = new ArrayList<>();
long end;
int packageNum = 0;
for (int i = 0; i < consumerPackageSize; i++) {
//模拟打包任务
Task task = taskBlockingQueue.poll(packageTime, TimeUnit.MILLISECONDS);
if (task == null) {
continue;
}
Thread.sleep(200);
tasks.add(task);
packageNum++;
end = System.currentTimeMillis();
if (end - start > packageTime) {
break;
}
}
end = System.currentTimeMillis();
System.out.println(String.format("消费者%d\t打包%d个\t耗时%d\t%s", id, packageNum, end - start, tasks));
}
}
生产者2 添加任务539
生产者0 添加任务319
生产者1 添加任务655
生产者4 添加任务843
生产者3 添加任务788
生产者0 添加任务716
生产者3 添加任务176
生产者4 添加任务735
生产者2 添加任务7
生产者1 添加任务283
生产者4 添加任务466
生产者3 添加任务486
生产者0 添加任务649
生产者2 添加任务373
生产者1 添加任务158
生产者0 添加任务532
生产者4 添加任务914
生产者1 添加任务734
生产者3 添加任务571
生产者2 添加任务114
生产者1 添加任务340
生产者3 添加任务670
生产者0 添加任务482
生产者2 添加任务298
生产者4 添加任务598
消费者0 打包5个 耗时2213 [Task(id=655), Task(id=716), Task(id=466), Task(id=532), Task(id=340)]
消费者4 打包5个 耗时2280 [Task(id=319), Task(id=176), Task(id=486), Task(id=914), Task(id=670)]
消费者1 打包5个 耗时2328 [Task(id=788), Task(id=735), Task(id=649), Task(id=734), Task(id=482)]
消费者3 打包5个 耗时2351 [Task(id=843), Task(id=7), Task(id=373), Task(id=571), Task(id=298)]
消费者2 打包5个 耗时2400 [Task(id=539), Task(id=283), Task(id=158), Task(id=114), Task(id=598)]
生产者3 添加任务928
生产者0 添加任务360
生产者1 添加任务724
生产者2 添加任务539
生产者4 添加任务926
生产者3 添加任务206
生产者0 添加任务596
生产者1 添加任务841
生产者4 添加任务834
生产者2 添加任务340
生产者3 添加任务585
生产者1 添加任务500
生产者4 添加任务532
生产者0 添加任务800
生产者2 添加任务914
生产者3 添加任务202
生产者1 添加任务850
生产者0 添加任务506
生产者1 添加任务785
生产者2 添加任务633
生产者4 添加任务182
生产者3 添加任务154
生产者0 添加任务13
生产者2 添加任务880
消费者3 打包5个 耗时2199 [Task(id=724), Task(id=841), Task(id=532), Task(id=506), Task(id=13)]
生产者4 添加任务214
消费者0 打包5个 耗时2217 [Task(id=539), Task(id=834), Task(id=800), Task(id=785), Task(id=880)]
生产者1 添加任务789
生产者3 添加任务786
消费者4 打包6个 耗时2583 [Task(id=928), Task(id=926), Task(id=340), Task(id=914), Task(id=633), Task(id=214)]
生产者0 添加任务468
生产者2 添加任务189
消费者1 打包6个 耗时2597 [Task(id=360), Task(id=206), Task(id=585), Task(id=202), Task(id=182), Task(id=789)]
消费者2 打包5个 耗时2465 [Task(id=596), Task(id=500), Task(id=850), Task(id=154), Task(id=786)]
生产者4 添加任务812
生产者0 添加任务239
生产者3 添加任务671
生产者1 添加任务730
生产者2 添加任务124
生产者4 添加任务679
生产者0 添加任务320
生产者2 添加任务917
生产者1 添加任务986
生产者3 添加任务557
生产者0 添加任务415
生产者4 添加任务559
生产者2 添加任务880
生产者1 添加任务920
生产者3 添加任务502
生产者0 添加任务679
生产者4 添加任务823
生产者2 添加任务594
生产者1 添加任务336
生产者3 添加任务502
生产者0 添加任务453
生产者4 添加任务360
消费者0 打包6个 耗时2249 [Task(id=468), Task(id=239), Task(id=679), Task(id=415), Task(id=679), Task(id=453)]
消费者3 打包6个 耗时2320 [Task(id=189), Task(id=671), Task(id=320), Task(id=559), Task(id=823), Task(id=360)]
生产者2 添加任务823
生产者3 添加任务857
生产者1 添加任务395
生产者0 添加任务937
生产者4 添加任务817
消费者2 打包4个 耗时2264 [Task(id=917), Task(id=880), Task(id=594), Task(id=823)]
消费者4 打包6个 耗时2622 [Task(id=812), Task(id=730), Task(id=986), Task(id=920), Task(id=336), Task(id=857)]
消费者1 打包5个 耗时2408 [Task(id=124), Task(id=557), Task(id=502), Task(id=502), Task(id=395)]
这时候中止可以看到队列几乎没有Task暂留
当设置消费者消费时间为1000ms时,运行一段时间队列就满了,这时候是当增加消费者线程数即可让任务处理跟上生产者的生产速度。