Semaphore同步工具类之信号量介绍
什么是Semaphore
- Semaphore是JUC包中的一个很简单的工具类,用来实现多线程下对于资源的同一时刻的访问线程数限制
- Semaphore中存在一个【许可】的概念,即访问资源之前,先要获得许可,如果当前许可数量为0,那么线程阻塞,直到获得许可
- Semaphore内部使用AQS实现,由抽象内部类Sync继承了AQS。因为Semaphore天生就是共享的场景,所以其内部实际上类似于共享锁的实现。
- Semaphore机制是提供给线程抢占式获取许可,所以他可以实现公平或者非公平,类似于ReentrantLock。
- Semaphore提供两个构造方法,用来传入许可数量以及公平或者非公平:
1
2
3
4
5
6public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore的使用场景
- 限流:并发环境(例如有1000个线程)下只允许100个线程访问数据库某资源
- 亦例如实际的,停车场只有10个车位,目前有15个汽车要来停车,多出的5个需要等其他车辆离开之后才能进行停车
Semaphore源码解读
分为公平与非公平
获取许可的非公平的实现
在抽象类Sync中实现了非公平的消耗“许可”的方法。1
2
3
4
5
6
7
8
9final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
首先获取当前许可数量
判断消耗许可之后的剩余数量是否>=0
是的话执行
compareAndSetState(available, remaining)
设置许可之后返回否则返回的负数会使得其在
doAcquireSharedInterruptibly
中等待许可并挂起,直到被唤醒(这步骤在AQS中实现,如下)
1 | public final void acquireSharedInterruptibly(int arg) |
获取许可的公平实现
首先会在获取许可之前,判断hasQueuedPredecessors()
,是否有线程在等待队列中等待许可,有的话直接返回-1,这个底层实现在AQS中已经实现好了。接下来剩下的操作就和非公平的基本一致了。
1 | static final class FairSync extends Sync { |
许可的释放
许可的释放对于公平和非公平的实现都是一致的,定义在Sync类中。因为是共享式的,释放的时候没有像ReentrantLock一样去判断是否是当前线程来释放许可。释放许可也是采用原子操作将需要释放的许可加回去就完成了。
一旦线程调用releaseShared
释放许可成功,就会同时调用doReleaseShared
方法,其中会对阻塞的线程进行环型,下面是tryReleaseShared
的源码。
1 | protected final boolean tryReleaseShared(int releases) { |
减少许可数量以及将剩余许可数量都取走
Semaphore还提供了几个额外的操作许可的方法
减少许可数量
1
2
3
4
5
6
7
8
9
10final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}取走剩余全部许可
1
2
3
4
5
6
7final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
实际使用信号量的代码实例
如下:使用信号量做了一个限流的功能。
在1000个线程并发访问的情况下,每次限制只有100个线程能够获取到资源
1 | public class SemaphoreStudy { |