Java 阻塞队列实现超时机制

阻塞队列概念

定义:阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。
BlockingQueue是个接口,实现有ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue。
本文使用ArrayBlockingQueue:有界的阻塞队列,内部实现是将对象放到一个数组中。jdk源码为:

1
2
3
4
5
6
7
8
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

参数capacity为数组容量;fair为true,插入或删除对象时,按FIFO顺序处理。

阻塞队列工具实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class BlockQueueUtil {
/**
* 阻塞队列超时时间,单位秒
*/
private static final int TIMEOUT = 5;
/**
* 全局MAP,一个UUID标识一个阻塞队列
*/
public static Map<UUID, BlockingQueue> msgHdMap = new ConcurrentHashMap<>();
/**
* 单例
*/
private static BlockQueueUtil bqInstance = null;
public static BlockQueueUtil getInstance() {
if (bqInstance == null) {
bqInstance = new BlockQueueUtil();
}
return bqInstance;
}
/**
* 新建阻塞队列,放入全局map中
* @return uuid
*/
public UUID getMsgMap() {
UUID uuid = UUID.randomUUID();
BlockingQueue<String> msgqueue = new ArrayBlockingQueue<String>(3, true);
msgHdMap.put(uuid, msgqueue);
return uuid;
}
/**
* 获取阻塞队列
* @param uuid
*/
public BlockingQueue getQueue(UUID uuid) {
return msgHdMap.get(uuid);
}
/**
* 入队
*/
public void queueIn(UUID uuid, String responseStr) {
BlockingQueue<String> msgqueue = msgHdMap.get(uuid);
msgqueue.add(responseStr);
}
/**
* 出队
* @param uuid
*/
public String queueOut(UUID uuid) {
BlockingQueue<String> msgqueue = msgHdMap.get(uuid);
String tac = null;
try {
tac = msgqueue.poll(TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// 超时
e.printStackTrace();
} finally {
// recycle map
msgHdMap.remove(uuid);
}
return tac;
}
}

ConcurrentHashMap的key为唯一标志uuid,value为阻塞队列。
上述getMsgMap方法创建容量为3的阻塞队列;出队方法queueOut调用poll方法(检索并删除此队列的头),超时时间设置为5秒。

调用

发起阻塞,等待事件:

1
2
3
4
5
6
7
8
BlockQueueUtil queueUtil = BlockQueueUtil.getInstance();
UUID uuid = queueUtil.getMsgMap();
String tac = queueUtil.queueOut(uuid);
if (tac == null) {
System.out.println("timeout....");
} else {
your normal logic balabala
}

初始,每个阻塞队列为空,执行出队操作,阻塞直到事件触发或者超时。
事件触发:

1
// 阻塞队列,入队 BlockQueueUtil.getInstance().queueIn(UUID.fromString(json.getString("uuid")), json.get("res").toString());

入队后,阻塞队列不为空,queueUtil.queueOut(uuid)被触发,执行后续操作your normal logic balabala。
阻塞队列的应用场景为线程间通信(如生产者消费者模式)或为网络通信(socket或http)。