首页>>后端>>java->Semaphore源码解析

Semaphore源码解析

时间:2023-11-30 本站 点击:1

Semaphore,信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可。

Semaphore可以用来控制同一时间对共享资源的访问次数上,也就是常说的限流。

首先通过一个小例子简单了解下信号量的用法

/***@ClassNameSemaphoreDemo*@Description:TODO*@Authorlirui*@Date2020/4/27*@VersionV1.0**/publicclassSemaphoreDemo{publicstaticvoidmain(String[]args)throwsInterruptedException{Semaphoresemaphore=newSemaphore(2);newThread(newRunnable(){@Overridepublicvoidrun(){try{semaphore.acquire();System.out.println("线程一获取通行证成功");}catch(InterruptedExceptione){e.printStackTrace();}finally{semaphore.release();System.out.println("线程一释放通行证成功");}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){try{System.out.println("线程二来抢了");semaphore.acquire();System.out.println("线程二获取通行证成功");}catch(InterruptedExceptione){e.printStackTrace();}finally{semaphore.release();System.out.println("线程二释放通行证成功");}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){try{System.out.println("线程三来抢了");semaphore.acquire(1);System.out.println("线程三获取通行证成功");}catch(InterruptedExceptione){e.printStackTrace();}finally{semaphore.release();System.out.println("线程三释放通行证成功");}}}).start();}}------结果---线程一获取通行证成功线程一释放通行证成功线程二来抢了线程二获取通行证成功线程二释放通行证成功线程三来抢了线程三获取通行证成功线程三释放通行证成功Processfinishedwithexitcode0

这里创建了三个线程,但信号量只设置了两个许可,因此同一个时刻只会有两个获取到执行权限,剩下一个会等获取到执行权限的线程执行完释放许可,然后才能拿到执行的许可。

类结构

Semaphore中包含了一个实现了AQS的同步器Sync,以及它的两个子类FairSync和NonFairSync,这说明Semaphore也是区分公平模式和非公平模式的。

Semaphore构造方法

publicSemaphore(intpermits){sync=newNonfairSync(permits);}publicSemaphore(intpermits,booleanfair){sync=fair?newFairSync(permits):newNonfairSync(permits);}

从源码可知,默认是实现的非公平模式,如果要实现公平模式,就在构造方法里fair传true。

静态内部类Sync源码

publicclassSemaphoreimplementsjava.io.Serializable{privatestaticfinallongserialVersionUID=-3222578661600680210L;/***AllmechanicsviaAbstractQueuedSynchronizersubclass*/privatefinalSyncsync;abstractstaticclassSyncextendsAbstractQueuedSynchronizer{privatestaticfinallongserialVersionUID=1192457210091910933L;//传入许可次数,放入state中Sync(intpermits){setState(permits);}//获取许可次数finalintgetPermits(){returngetState();}//非公平模式尝试获取许可finalintnonfairTryAcquireShared(intacquires){for(;;){intavailable=getState();intremaining=available-acquires;if(remaining<0||compareAndSetState(available,remaining))returnremaining;}}//释放许可protectedfinalbooleantryReleaseShared(intreleases){for(;;){//获取目前的许可数intcurrent=getState();//加上这次释放的许可intnext=current+releases;//看是否溢出if(next<current)//overflowthrownewError("Maximumpermitcountexceeded");//通过cas方式更新state值if(compareAndSetState(current,next))returntrue;}}

通过Sync类的源码可知

许可是在构造方法时传入的

许可存放在状态变量state中

当state值为0的时候,则无法再获取许可

释放一个许可时,state值加1

acquire()方法 获取许可

publicvoidacquire()throwsInterruptedException{sync.acquireSharedInterruptibly(1);}//AQS.acquireSharedInterruptiblypublicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{//当前线程是中断状态,抛出异常if(Thread.interrupted())thrownewInterruptedException();//尝试获取锁,返回值大于等于0代表获取到锁//返回值小于0代表没有获取锁,加入到等待队列if(tryAcquireShared(arg)<0)//1.加入阻塞队列//2.自旋不断尝试获取锁//3.被挂起//4.获取到锁之后唤醒下一个线程doAcquireSharedInterruptibly(arg);}//FairSync.tryAcquireSharedprotectedinttryAcquireShared(intacquires){for(;;){//判断当前节点是否是头结点的next节点if(hasQueuedPredecessors())//不是next节点返回-1return-1;//获取当前stateintavailable=getState();//计算剩余许可数量intremaining=available-acquires;//如果剩余许可数量小于0则返回//剩余许可数大于0并成功更新state值,返回剩余许可数if(remaining<0||compareAndSetState(available,remaining))returnremaining;}}

这里默认以公平模式为例进行讲解,acquire方法就是用来获取许可,如果没有获取许可就会加入阻塞队列进行等待,源码里的方法其实大部分我都在前面的文章都详细介绍了,所以这里就不会太细致的讲解。

首先会尝试获取许可,调用FairSync的tryAcquireShared方法去获取,这个方法首先会判断当前节点是否是头结点的next节点,如果满足,再判断将当前许可数减去传进来的许可值,如果大于0就说明可以获取到许可,小于0就获取不到许可

如果获取不到许可就会调用AQS的doAcquireSharedInterruptibly方法,这个方法我在前面文章有详细讲解过,我就不展开来讲了,如果有不了解的可以看下我前面几篇文章的讲解,这个方法主要就是将线程加入阻塞队列,自旋不断获取锁,获取不到会被挂起,等待被唤醒接着自旋获取锁,直到获取成功获取,这里获取成功后还会接着唤醒下一个等待的线程,你会发现这就是共享锁和独占锁的一个区别,再获取锁资源的时候独占锁获取锁之后不会去接着唤醒下一个等待线程,而共享锁会唤醒下一个等待线程。

release 方法 释放许可

publicvoidrelease(){sync.releaseShared(1);}//AQS.releaseShared()publicfinalbooleanreleaseShared(intarg){if(tryReleaseShared(arg)){//唤醒等待的线程doReleaseShared();returntrue;}returnfalse;}//Sync.tryReleaseShared()protectedfinalbooleantryReleaseShared(intreleases){for(;;){//获取目前的许可数intcurrent=getState();//加上这次释放的许可intnext=current+releases;//看是否溢出if(next<current)//overflowthrownewError("Maximumpermitcountexceeded");//通过cas方式更新state值if(compareAndSetState(current,next))returntrue;}}

release方法主要就是做两件事,将释放许可,然后唤醒等待的线程(只会唤醒一个)

首先调用Sync的tryReleaseShared方法将持有许可进行释放,就是当前许可数加上自己的一个许可值,然后更新state值。

接着调用doReleaseShared方法去唤醒下一个等待的线程,这个方法我也是在前面文章有详细讲解,这里就不在展开来讲。

总结

Semaphore,也叫信号量,通常用于控制同一时刻对共享资源的访问上,也就是限流场景

Semaphore的内部实现是基于AQS的共享锁来实现的

Semaphore初始化的时候需要指定许可的次数,许可的次数是存储在state中

获取一个许可时,则state值减1

释放一个许可时,则state值加1

可以动态减少n个许可

可以动态增加n个许可


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/5001.html