当前位置: 首页>后端>正文

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四)

鑻嶇┕涔嬭竟锛屾旦鐎氫箣鎸氾紝鐪版仸涔嬬編锛?鎮熷績鎮熸€э紝鍠勫鍠勭粓锛屾儫鍠勬儫閬擄紒 鈥斺€?鏈濇Э銆婃湞妲垮叜骞磋銆?/p>

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第1张

鍐欏湪寮€澶?/h2>
Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第2张

鍦ㄥ苟鍙戠紪绋嬮鍩燂紝鏈変袱澶ф牳蹇冮棶棰橈細涓€涓槸浜掓枼锛屽嵆鍚屼竴鏃跺埢鍙厑璁镐竴涓嚎绋嬭闂叡浜祫婧愶紱鍙︿竴涓槸鍚屾锛屽嵆绾跨▼涔嬮棿濡備綍閫氫俊銆佸崗浣溿€?lt;br />涓昏鍘熷洜鏄紝瀵逛簬澶氱嚎绋嬪疄鐜板疄鐜板苟鍙戯紝涓€鐩翠互鏉ワ紝澶氱嚎绋嬮兘瀛樺湪2涓棶棰橈細

  • 绾跨▼涔嬮棿鍐呭瓨鍏变韩锛岄渶瑕侀€氳繃鍔犻攣杩涜鎺у埗锛屼絾鏄姞閿佷細瀵艰嚧鎬ц兘涓嬮檷锛屽悓鏃跺鏉傜殑鍔犻攣鏈哄埗涔熶細澧炲姞缂栫▼缂栫爜闅惧害
  • 杩囧绾跨▼閫犳垚绾跨▼涔嬮棿鐨勪笂涓嬫枃鍒囨崲锛屽鑷存晥鐜囦綆涓?/li>

鍥犳锛屽湪骞跺彂缂栫▼棰嗗煙涓紝涓€鐩存湁涓€涓緢閲嶈鐨勮璁″師鍒欙細 鈥?涓嶈閫氳繃鍐呭瓨鍏变韩鏉ュ疄鐜伴€氫俊锛岃€屽簲璇ラ€氳繃閫氫俊鏉ュ疄鐜板唴瀛樺叡浜€傗€?lt;br />绠€鍗曟潵璇达紝灏辨槸灏藉彲鑳介€氳繃娑堟伅閫氫俊锛岃€屼笉鏄唴瀛樺叡浜潵瀹炵幇杩涚▼鎴栬€呯嚎绋嬩箣闂寸殑鍚屾銆?/p>

鍏冲仴鏈

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第3张

<br />鏈枃鐢ㄥ埌鐨勪竴浜涘叧閿瘝璇互鍙婂父鐢ㄦ湳璇紝涓昏濡備笅锛?/p>

  • 骞跺彂(Concurrent): 鍦ㄦ搷浣滅郴缁熶腑锛屾槸鎸囦竴涓椂闂存涓湁鍑犱釜绋嬪簭閮藉浜庡凡鍚姩杩愯鍒拌繍琛屽畬姣曚箣闂达紝涓旇繖鍑犱釜绋嬪簭閮芥槸鍦ㄥ悓涓€涓鐞嗘満涓婅繍琛屻€?/li>
  • 骞惰(Parallel): 褰撶郴缁熸湁涓€涓互涓奀PU鏃讹紝褰撲竴涓狢PU鎵ц涓€涓繘绋嬫椂锛屽彟涓€涓狢PU鍙互鎵ц鍙︿竴涓繘绋嬶紝涓や釜杩涚▼浜掍笉鎶㈠崰CPU璧勬簮锛屽彲浠ュ悓鏃惰繘琛屻€?/li>
  • 淇″彿閲?Semaphore): 鏄湪澶氱嚎绋嬬幆澧冧笅浣跨敤鐨勪竴绉嶈鏂斤紝鏄彲浠ョ敤鏉ヤ繚璇佷袱涓垨澶氫釜鍏抽敭浠g爜娈典笉琚苟鍙戣皟鐢紝涔熸槸浣滅郴缁熺敤鏉ヨВ鍐冲苟鍙戜腑鐨勪簰鏂ュ拰鍚屾闂鐨勪竴绉嶆柟娉曘€?/li>
  • 淇″彿閲忔満鍒?Semaphores)锛?鐢ㄦ潵瑙e喅鍚屾/浜掓枼鐨勯棶棰樼殑锛屽畠鏄?965骞?鑽峰叞瀛﹁€?Dijkstra鎻愬嚭浜嗕竴绉嶅崜鏈夋垚鏁堢殑瀹炵幇杩涚▼浜掓枼涓庡悓姝ョ殑鏂规硶銆?/li>
  • 绠$▼(Monitor) : 涓€鑸槸鎸囩鐞嗗叡浜彉閲忎互鍙婂鍏变韩鍙橀噺鐨勬搷浣滆繃绋嬶紝璁╁畠浠敮鎸佸苟鍙戠殑涓€绉嶆満鍒躲€?/li>
  • 浜掓枼(Mutual Exclusion)锛氫竴涓叕鍏辫祫婧愬悓涓€鏃跺埢鍙兘琚竴涓繘绋嬫垨绾跨▼浣跨敤锛屽涓繘绋嬫垨绾跨▼涓嶈兘鍚屾椂浣跨敤鍏叡璧勬簮銆傚嵆灏辨槸鍚屼竴鏃跺埢鍙厑璁镐竴涓嚎绋嬭闂叡浜祫婧愮殑闂銆?/li>
  • 鍚屾(Synchronization)锛氫袱涓垨涓や釜浠ヤ笂鐨勮繘绋嬫垨绾跨▼鍦ㄨ繍琛岃繃绋嬩腑鍗忓悓姝ヨ皟锛屾寜棰勫畾鐨勫厛鍚庢搴忚繍琛屻€傚嵆灏辨槸绾跨▼涔嬮棿濡備綍閫氫俊銆佸崗浣滅殑闂銆?/li>
  • 瀵硅薄姹?Object Pool): 鎸囩殑鏄竴娆℃€у垱寤哄嚭 N 涓璞★紝涔嬪悗鎵€鏈夌殑绾跨▼閲嶅鍒╃敤杩?N 涓璞★紝褰撶劧瀵硅薄鍦ㄨ閲婃斁鍓嶏紝涔熸槸涓嶅厑璁稿叾浠栫嚎绋嬩娇鐢ㄧ殑, 涓€鑸寚淇濆瓨瀹炰緥瀵硅薄鐨勫鍣ㄣ€?/li>

鍩烘湰姒傝堪

鍦↗ava棰嗗煙涓紝鎴戜滑鍙互灏嗛攣澶ц嚧鍒嗕负鍩轰簬Java璇硶灞傞潰(鍏抽敭璇?瀹炵幇鐨勯攣鍜屽熀浜嶫DK灞傞潰瀹炵幇鐨勯攣銆?/p>

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第4张

鍦↗ava棰嗗煙涓? 灏ゅ叾鏄湪骞跺彂缂栫▼棰嗗煙锛屽浜庡绾跨▼骞跺彂鎵ц涓€鐩存湁涓ゅぇ鏍稿績闂锛氬悓姝ュ拰浜掓枼銆傚叾涓細

  • 浜掓枼(Mutual Exclusion)锛氫竴涓叕鍏辫祫婧愬悓涓€鏃跺埢鍙兘琚竴涓繘绋嬫垨绾跨▼浣跨敤锛屽涓繘绋嬫垨绾跨▼涓嶈兘鍚屾椂浣跨敤鍏叡璧勬簮銆傚嵆灏辨槸鍚屼竴鏃跺埢鍙厑璁镐竴涓嚎绋嬭闂叡浜祫婧愮殑闂銆?/li>
  • 鍚屾(Synchronization)锛氫袱涓垨涓や釜浠ヤ笂鐨勮繘绋嬫垨绾跨▼鍦ㄨ繍琛岃繃绋嬩腑鍗忓悓姝ヨ皟锛屾寜棰勫畾鐨勫厛鍚庢搴忚繍琛屻€傚嵆灏辨槸绾跨▼涔嬮棿濡備綍閫氫俊銆佸崗浣滅殑闂銆?/li>

閽堝瀵逛簬杩欎袱澶ф牳蹇冮棶棰橈紝鍒╃敤绠$▼鏄兘澶熻В鍐冲拰瀹炵幇鐨勶紝鍥犳鍙互璇达紝绠$▼鏄苟鍙戠紪绋嬬殑涓囪兘閽ュ寵銆?lt;br />铏界劧锛孞ava鍦ㄥ熀浜庤娉曞眰闈?synchronized 鍏抽敭瀛?瀹炵幇浜嗗绠$▼鎶€鏈?浣嗘槸浠庝娇鐢ㄦ柟寮忓拰鎬ц兘涓婃潵璇达紝鍐呯疆閿?synchronized 鍏抽敭瀛?鐨勭矑搴︾浉瀵硅繃澶э紝涓嶆敮鎸佽秴鏃跺拰涓柇绛夐棶棰樸€?lt;br />涓轰簡寮ヨˉ杩欎簺闂锛屼粠JDK灞傞潰瀵瑰叾鈥滈噸澶嶉€犺疆瀛愨€濓紝鍦↗DK鍐呴儴瀵瑰叾閲嶆柊璁捐鍜屽畾涔夛紝鐢氳嚦瀹炵幇浜嗘柊鐨勭壒鎬с€?lt;br />鍦↗ava棰嗗煙涓紝浠嶫DK婧愮爜鍒嗘瀽鏉ョ湅锛屽熀浜嶫DK灞傞潰瀹炵幇鐨勯攣澶ц嚧涓昏鍙互鍒嗕负浠ヤ笅4绉嶆柟寮忥細

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第5张
  • 鍩轰簬Lock鎺ュ彛瀹炵幇鐨勯攣锛欽DK1.5鐗堟湰鎻愪緵鐨凴eentrantLock绫?/li>
  • 鍩轰簬ReadWriteLock鎺ュ彛瀹炵幇鐨勯攣锛欽DK1.5鐗堟湰鎻愪緵鐨凴eentrantReadWriteLock绫?/li>
  • 鍩轰簬AQS鍩虹鍚屾鍣ㄥ疄鐜扮殑閿侊細JDK1.5鐗堟湰鎻愪緵鐨勫苟鍙戠浉鍏崇殑鍚屾鍣⊿emaphore锛孋yclicBarrier浠ュ強CountDownLatch绛?/li>
  • 鍩轰簬鑷畾涔堿PI鎿嶄綔瀹炵幇鐨勯攣锛欽DK1.8鐗堟湰涓彁渚涚殑StampedLock绫?/li>

浠庨槄璇绘簮鐮佷笉闅惧彂鐜帮紝鍦↗ava SDK 骞跺彂鍖呬富瑕侀€氳繃AbstractQueuedSynchronizer(AQS)瀹炵幇澶氱嚎绋嬪悓姝ユ満鍒剁殑灏佽涓庡畾涔夛紝鑰岄€氳繃Lock 鍜?Condition 涓や釜鎺ュ彛鏉ュ疄鐜扮绋嬶紝鍏朵腑 Lock 鐢ㄤ簬瑙e喅浜掓枼闂锛孋ondition 鐢ㄤ簬瑙e喅鍚屾闂銆?/p>

涓€.AQS鍩虹鍚屾鍣ㄥ熀鏈悊璁?/h2>

鍦↗ava棰嗗煙涓?鍚屾鍣ㄦ槸涓撻棬涓哄绾跨▼骞跺彂璁捐鐨勫悓姝ユ満鍒讹紝涓昏鏄绾跨▼骞跺彂鎵ц鏃剁嚎绋嬩箣闂撮€氳繃鏌愮鍏变韩鐘舵€佹潵瀹炵幇鍚屾锛屽彧鏈夊綋鐘舵€佹弧瓒宠繖绉嶆潯浠舵椂绾跨▼鎵嶅線涓嬫墽琛岀殑涓€绉嶅悓姝ユ満鍒躲€?/p>

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第6张

<br />涓€涓爣鍑嗙殑AQS鍚屾鍣ㄤ富瑕佹湁鍚屾鐘舵€佹満鍒讹紝绛夊緟闃熷垪锛屾潯浠堕槦鍒楋紝鐙崰妯″紡锛屽叡浜ā寮忕瓑浜斿ぇ鏍稿績瑕佺礌缁勬垚銆?lt;br />鍦↗ava棰嗗煙涓紝JDK鐨凧UC(java.util.concurrent.)鍖呬腑鎻愪緵浜嗗悇绉嶅苟鍙戝伐鍏凤紝浣嗘槸澶ч儴鍒嗗悓姝ュ伐鍏风殑瀹炵幇鍩轰簬AbstractQueuedSynchronizer绫诲疄鐜帮紝鍏跺唴閮ㄧ粨鏋勪富瑕佸涓嬶細

  • 鍚屾鐘舵€佹満鍒?Synchronization Status)锛氫富瑕佺敤浜庡疄鐜伴攣(Lock)鏈哄埗锛屾槸鎸囧悓姝ョ姸鎬侊紝鍏惰姹傚浜庣姸鎬佺殑鏇存柊蹇呴』鍘熷瓙鎬х殑
  • 绛夊緟闃熷垪(Wait Queue)锛氫富瑕佺敤浜庡瓨鏀剧瓑寰呯嚎绋嬭幏鍙栧埌鐨勯攣璧勬簮锛屽苟涓旀妸绾跨▼缁存姢鍒颁竴涓狽ode(鑺傜偣)閲岄潰鍜岀淮鎶や竴涓潪闃诲鐨凜HL Node FIFO(鍏堣繘鍏堝嚭)闃熷垪锛屼富瑕佹槸閲囩敤鑷棆閿?CAS鎿嶄綔鏉ヤ繚璇佽妭鐐规彃鍏ュ拰绉婚櫎鐨勫師瀛愭€ф搷浣溿€?/li>
  • 鏉′欢闃熷垪(Condition Queue)锛氱敤浜庡疄鐜伴攣鐨勬潯浠舵満鍒讹紝涓€鑸富瑕佹槸鎸囨浛鎹⑩€滅瓑寰?閫氱煡鈥濆伐浣滄満鍒讹紝涓昏鏄€氳繃ConditionObject瀵硅薄瀹炵幇Condition鎺ュ彛鎻愪緵鐨勬柟娉曞疄鐜般€?/li>
  • 鐙崰妯″紡(Exclusive Mode)锛氫富瑕佺敤浜庡疄鐜扮嫭鍗犻攣锛屼富瑕佹槸鍩轰簬闈欐€佸唴閮ㄧ被Node鐨勫父閲忔爣蹇桬XCLUSIVE鏉ユ爣璇嗚鑺傜偣鏄嫭鍗犳ā寮?/li>
  • 鍏变韩妯″紡(Shared Mode)锛氫富瑕佺敤浜庡疄鐜板叡浜攣锛屼富瑕佹槸鍩轰簬闈欐€佸唴閮ㄧ被Node鐨勫父閲忔爣蹇桽HARED鏉ユ爣璇嗚鑺傜偣鏄叡浜ā寮?/li>
鎴戜滑鍙互寰楀埌涓€涓瘮杈冮€氱敤鐨勫苟鍙戝悓姝ュ伐鍏峰熀纭€妯″瀷锛屽ぇ鑷村寘鍚涓嬪嚑涓唴瀹癸紝鍏朵腑锛?lt;br />
Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第7张
  • 鏉′欢鍙橀噺(Conditional Variable)锛?鍒╃敤绾跨▼闂村叡浜殑鍙橀噺杩涜鍚屾鐨勪竴绉嶅伐浣滄満鍒?/li>
  • 鍏变韩鍙橀噺((Shared Variable))锛氫竴鑸寚瀵硅薄瀹炰綋瀵硅薄鐨勬垚鍛樺彉閲忓拰灞炴€?/li>
  • 闃诲闃熷垪(Blocking Queue)锛氬叡浜彉閲?Shared Variable)鍙婂叾瀵瑰叡浜彉閲忕殑鎿嶄綔缁熶竴灏佽
  • 绛夊緟闃熷垪(Wait Queue)锛氭瘡涓潯浠跺彉閲忛兘瀵瑰簲鏈変竴涓瓑寰呴槦鍒?Wait Queue),鍐呴儴闇€瑕佸疄鐜板叆闃熸搷浣?Enqueue)鍜屽嚭闃熸搷浣?Dequeue)鏂规硶
  • 鍙橀噺鐘舵€佹弿杩版満(Synchronization Status)锛氭弿杩版潯浠跺彉閲忓拰鍏变韩鍙橀噺涔嬮棿鐘舵€佸彉鍖栵紝鍙堝彲浠ョО鍏朵负鍚屾鐘舵€?/li>
  • 宸ヤ綔妯″紡(Operation Mode)锛?绾跨▼璧勬簮鍏锋湁鎺掍粬鎬э紝鍥犳瀹氫箟鐙崰妯″紡鍜屽叡浜ā寮忎袱绉嶅伐浣滄ā寮?/li>

缁间笂鎵€杩帮紝鏉′欢鍙橀噺鍜岀瓑寰呴槦鍒楃殑浣滅敤鏄В鍐崇嚎绋嬩箣闂寸殑鍚屾闂锛涘叡浜彉閲忎笌闃诲闃熷垪鐨勪綔鐢ㄦ槸瑙e喅绾跨▼涔嬮棿鐨勪簰鏂ラ棶棰樸€?/p>

浜? JDK鏄惧紡閿佺粺涓€姒傚康妯″瀷

鍦ㄥ苟鍙戠紪绋嬮鍩燂紝鏈変袱澶ф牳蹇冮棶棰橈細涓€涓槸浜掓枼锛屽嵆鍚屼竴鏃跺埢鍙厑璁镐竴涓嚎绋嬭闂叡浜祫婧愶紱鍙︿竴涓槸鍚屾锛屽嵆绾跨▼涔嬮棿濡備綍閫氫俊銆佸崗浣溿€?/p>

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第8张

缁煎悎Java棰嗗煙涓殑骞跺彂閿佺殑鍚勭瀹炵幇涓庡簲鐢ㄥ垎鏋愭潵鐪嬶紝涓€鎶婇攣鎴栬€呬竴绉嶉攣锛屽熀鏈笂閮戒細鍖呭惈浠ヤ笅鍑犱釜鏂归潰锛?/p>

  • 閿佺殑鍚屾鍣ㄥ伐浣滄満鍒讹細涓昏鏄€冭檻鍏变韩妯″紡杩樻槸鐙韩妯″紡锛屾槸鍚︽敮鎸佽秴鏃舵満鍒讹紝浠ュ強鏄惁鏀寔瓒呮椂鏈哄埗锛?/li>
  • 閿佺殑鍚屾鍣ㄥ伐浣滄ā寮忥細涓昏鏄熀浜嶢QS鍩虹鍚屾鍣ㄥ皝瑁呭唴閮ㄥ悓姝ュ櫒锛屾槸鍚﹁€冭檻鍏钩/闈炲叕骞虫ā寮忥紵
  • 閿佺殑鐘舵€佸彉閲忔満鍒讹細 涓昏閿佺殑鐘舵€佽缃紝鏄惁鍏变韩鐘舵€佸彉閲忥紵
  • 閿佺殑闃熷垪灏佽瀹氫箟锛氫富瑕佹槸鎸囩瓑寰呴槦鍒楀拰鏉′欢闃熷垪锛屾槸鍚﹂渶瑕佹潯浠堕槦鍒楁垨鑰呯瓑寰呴槦鍒楀畾涔夛紵
  • 閿佺殑搴曞眰瀹炵幇鎿嶄綔锛?涓昏鏄寚搴曞眰CL閿佸拰CAS鎿嶄綔锛屾槸鍚﹂渶瑕佽€冭檻鑷棆閿佹垨鑰匔AS鎿嶄綔瀹炰緥瀵硅薄鏂规硶锛?/li>
  • 閿佺殑缁勫悎瀹炵幇鏂伴攣锛?涓昏鏄熀浜庣嫭鍗犻攣鍜屽叡浜攣锛屾槸鍚﹁€冭檻瀵瑰簲API鑷畾涔夋搷浣滃疄鐜帮紵

缁间笂鎵€杩帮紝澶ц嚧鍙互鏍规嵁涓婅堪杩欎簺鏂瑰悜锛屾垜浠究鍙互娓呮馃墣锔忕煡閬揓ava棰嗗煙涓悇绉嶉攣瀹炵幇鐨勫熀鏈悊璁烘椂鍜屽疄鐜版€濇兂銆?/p>

鍏?CountDownLatch(闂攣)鐨勮璁′笌瀹炵幇

鍦↗ava棰嗗煙涓紝CountDownLatch(闂攣)鏄拡瀵逛簬Java澶氱嚎绋嬪苟鍙戞帶鍒朵腑鍊掕鏁板櫒鐨勫叿浣撴暟閲忥紝涓昏鏄噰鐢ㄩ€掑噺璁℃暟鏂瑰紡鐨勫€掕鏁板櫒鎬濇兂鍜屽熀浜嶢QS鍩虹鍚屾鍣ㄦ潵瀹炵幇鐨勪竴绉嶅悓姝ュ櫒宸ュ叿绫汇€?/p>

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第9张

CountDownLatch(闂攣)鏄疛ava澶氱嚎绋嬪苟鍙戜腑鏈€甯歌鐨勪竴绉嶅悓姝ュ櫒锛屼粠閿佺殑鎬ц川涓婃潵鐪嬶紝灞炰簬鍏变韩閿侊紝鍏跺姛鑳界浉褰撲簬涓€涓绾跨▼鐜涓嬬殑鍊掓暟闂ㄩ棭銆?lt;br />CountDownLatch閫氳繃瀹氫箟涓€涓€掕鏁板櫒锛屽湪骞跺彂鐜涓嬬敱绾跨▼杩涜閫掑噺1鎿嶄綔锛屽綋璁℃暟鍊煎彉涓?涔嬪悗锛岃await鏂规硶闃诲鐨勭嚎绋嬪皢浼氬敜閱掋€?lt;br />閫氳繃CountDownLatch鍙互瀹炵幇绾跨▼闂寸殑璁℃暟鍚屾銆?/p>

1. 璁捐鎬濇兂

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第10张

涓€鑸潵璇达紝閫氳繃瀹氫箟涓€涓€掕鏁板櫒锛屼负浜嗚鏌愪釜绾跨▼鎴栬€呭涓嚎绋嬪湪鏌愪釜杩愯鑺傜偣涓婄瓑寰匩涓潯浠堕兘婊¤冻鍚庯紝鎵嶈鎵€鏈夌殑绾跨▼缁х画寰€涓嬫墽琛岋紝鍏朵腑鍊掕鏁板櫒鐨勬暟閲忓垯涓篘锛屾瘡婊¤冻涓€涓潯浠讹紝鍊掕鏁板櫒灏变緷娆¢€愭笎閫掑噺1锛岀洿鍒癗-1=0鐨勬椂锛屾墍鏈夌瓑寰呯殑绾跨▼鎵嶅線涓嬬户缁墽琛屻€?lt;br />CountDownLatch绫绘渶鏃╂槸鍦↗DK1.5鐗堟湰鎻愪緵鐨勶紝浠庤璁℃€濇兂涓婃潵鐪嬶紝涓昏鍖呮嫭鍊掕鏁板櫒鐨勫悓姝ュ櫒锛屾帶鍒堕樆濉炵瓑寰呯殑鏂规硶锛屽€掕鏁板櫒鐨勯€掑噺鎿嶄綔鏂规硶绛?涓牳蹇冭绱犮€傚叾涓細

  • 鍊掕鏁板櫒鐨勫悓姝ュ櫒锛氬熀浜嶢QS鍩虹鎶借薄闃熷垪鍚屾鍣ㄥ皝瑁呭唴缃疄鐜颁竴涓潤鎬佺殑鍐呯疆鍚屾绫?涓昏鐢ㄤ簬璁剧疆鍊掕鏁板櫒鐨勫垵濮嬪€间互鍙婂畾鍒禔QS鍩虹鍚屾鍣ㄧ殑鑾峰彇鍜岄噴鏀惧叡浜攣銆?/li>
  • 鍊掕鏁板櫒鐨勫垵濮嬪€硷細 涓€鑸湪鏋勫缓CountDownLatch绫绘椂鎸囧畾锛岃〃绀虹殑鏄渶瑕佺瓑寰呮潯浠剁殑涓暟锛屽嵆灏辨槸鍊掕鏁板櫒鐨勫叿浣撶殑璧勬簮鏁伴噺Source(N)銆?/li>
  • 鎺у埗绾跨▼闃诲绛夊緟鐨勬柟娉曪細瀹氫箟涓€涓帶鍒剁嚎绋嬮樆濉炵瓑寰呯殑鏂规硶锛屽綋鍊掕鏁板櫒鐨勫叿浣撶殑璧勬簮鏁伴噺 Source(N)>0鏃讹紝璋冪敤鏂规硶浣垮叾绾跨▼杩涘叆闃诲绛夊緟鐘舵€併€?/li>
  • 鍊掕鏁板櫒鐨勯€掑噺鎿嶄綔鏂规硶锛氬畾涔変竴涓€掕鏁板櫒鐨勯€掑噺鎿嶄綔鏂规硶锛岃皟鐢ㄦ柟娉曞氨浼氭妸鍊掕鏁板櫒閫掑噺1锛屽綋鍊掕鏁板櫒鐨勫叿浣撶殑璧勬簮鏁伴噺 Source(N)-1=0鏃讹紝鎵€鏈夌瓑寰呯殑绾跨▼鎵嶅線涓嬬户缁墽琛屻€?/li>

绠€鍗曟潵璇达紝CountDownLatch涓昏鏄鏌愪釜绾跨▼鎴栬€呭涓嚎绋嬶紝绛夊緟鍏朵粬绾跨▼瀹屾垚鏌愪欢浜嬫儏鎴栬€呮煇涓换鍔$粨鏉熶箣鍚庢墠鑳界户缁墽琛屻€?/p>

2. 鍩烘湰瀹炵幇

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第11张

鍦–ountDownLatch绫荤殑JDK1.8鐗堟湰涓紝瀵逛簬CountDownLatch鐨勫熀鏈疄鐜板涓嬶細


public class CountDownLatch {

    private final Sync sync;

    /**
     * CountDownLatch閿?鏋勯€犱竴涓€掕鏁板櫒
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * CountDownLatch閿?鍩轰簬AQS瀹氫箟鏀寔鍚屾鍣ㄥ疄鐜?
     */
    private  static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860 L;

        //......鍏朵粬鏂规硶浠g爜
    }

    /**
     * CountDownLatch閿?绾跨▼绛夊緟鏂规硶
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * CountDownLatch閿?鍊掕鏁板櫒閫掑噺鎿嶄綔
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    //... 鍏朵粬浠g爜
}
  • 鍊掕鏁板櫒鍚屾鍣細鍩轰簬AQS鍩虹瀹氫箟鏀寔鍚屾鍣ㄥ疄鐜颁竴涓潤鎬佺鏈夊寲鐨勫悓姝ュ櫒Sync绫伙紝鍏朵腑瀹氫箟浜嗚幏鍙栧拰閲婃斁鍏变韩閿佺殑涓や釜鏂规硶
  • 绾跨▼绛夊緟鏂规硶锛氫富瑕佹槸鎻愪緵浜嗕竴涓猘wait()鏂规硶锛屽叾鏈川鏄皟鐢ㄧ殑鏄疉QS鍩虹鍚屾鍣ㄤ腑鐨刟cquireSharedInterruptibly(int arg)鏂规硶锛屽惁鍒檛hrows InterruptedException寮傚父
  • 鍊掕鏁板櫒閫掑噺鎿嶄綔鏂规硶锛?涓昏鏄彁渚涗簡涓€涓猚ountDown()鏂规硶锛屽叾鏈川鏄皟鐢ㄧ殑鏄疉QS鍩虹鍚屾鍣ㄤ腑鐨剅eleaseShared(int arg) 鏂规硶

2.1 鍩轰簬AQS鍚屾鍣ㄥ皝瑁呴潤鎬佸唴閮⊿ync鎶借薄绫?/h4>

        /**
     * CountDownLatch閿?鍩轰簬AQS鍚屾鍣ㄥ皝瑁呬竴涓唴閮ㄧ殑鍚屾鍣?
     */
private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /**
     * CountDownLatch閿?鑾峰彇鍏变韩閿佹柟娉?
     */
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) 1 : -1;
        }

        /**
     * CountDownLatch閿?閲婃斁鍏变韩閿佹柟娉?
     */
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
                
    }
  • 瀹炵幇鏂瑰紡锛?涓昏鍩轰簬AQS灏佽鐨勫唴閮ㄩ潤鎬佹娊璞ync鍚屾绫诲疄鐜帮紝浣跨敤鐨凙QS鐨勫叡浜ā寮?/li>
  • 涓昏鏂规硶锛?涓昏瀹氬埗閫傞厤鎻愪緵浜唗ryAcquireShared()鍜宼ryReleaseShared()鏂规硶锛屽嵆灏辨槸tryAcquireShared()鐢ㄤ簬鑾峰彇鍏变韩閿侊紝tryReleaseShared()鏂规硶鐢ㄤ簬閲婃斁鍏变韩閿侊紝鍏朵腑锛?
    • 鑾峰彇鍏变韩閿乼ryAcquireShared()鏂规硶锛氶鍏堣幏鍙栫姸鎬佸彉閲弒tatus锛岃繖閲屾槸鎸囧€掕鏁板櫒涓殑鏁伴噺锛屽綋status=0鏃讹紝杩斿洖鍊?1锛岃〃绀鸿幏鍙栭攣鎴愬姛锛涘惁鍒欙紝status !=0 鏃讹紝杩斿洖鍊?-1锛岃〃绀鸿幏鍙栧叡浜攣澶辫触杩涜鍏ラ槦銆?/li>
    • 閲婃斁鍏变韩閿乼ryReleaseShared()鏂规硶锛?閫氳繃鑷棆鏉ュ疄鐜伴€掑噺鎿嶄綔锛屽叾涓細鑾峰彇鐘舵€佸彉閲弒tatus锛屽皢鍏堕€掑噺1鍚庝娇鐢╟ompareAndSetState(c, nextc)鏂规硶閫氳繃CAS淇敼鐘舵€佸€?/li>
  • 閿佽幏鍙栨柟寮忥細 涓昏鏄埄鐢╣etCount()鏉ヨ幏鍙栧€掕鏁板櫒涓殑鏁伴噺锛屽悓鏃惰繕鍙互鍒╃敤鏋勯€犳柟娉曟寚瀵间竴涓€掕鏁板櫒涓殑鏁伴噺銆?/li>

3. 鍏蜂綋瀹炵幇

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第12张

public class CountDownLatch {

  private final Sync sync;

  /**
* CountDownLatch閿?鍩轰簬AQS鍩虹鍚屾鍣ㄥ疄鐜颁竴涓唴閮ㄥ悓姝ュ櫒
*/
  private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374 L;

    Sync(int count) {
      setState(count);
    }

    int getCount() {
      return getState();
    }

    protected int tryAcquireShared(int acquires) {
      return (getState() == 0) 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
      // Decrement count; signal when transition to zero
      for (;;) {
        int c = getState();
        if (c == 0)
          return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
          return nextc == 0;
      }
    }
  }


  /**
* CountDownLatch閿?鏋勯€犱竴涓€掕鏁板櫒
*/
  public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
  }

  /**
* CountDownLatch閿?鍩轰簬AQS瀹氫箟鏀寔鍚屾鍣ㄥ疄鐜?
*/
  private static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860 L;

    //......鍏朵粬鏂规硶浠g爜
  }

  /**
* CountDownLatch閿?绾跨▼绛夊緟鏂规硶
*/
  public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }

  /**
* CountDownLatch閿?杩斿洖褰撳墠璁℃暟鍣?
*/
  public long getCount() {
    return sync.getCount();
  }

  /**
* CountDownLatch閿?绾跨▼绛夊緟鏂规硶(鏀寔瓒呮椂鏈哄埗)
*/
  public boolean await(long timeout, TimeUnit unit)
  throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  }

  /**
* CountDownLatch閿?鍊掕鏁板櫒閫掑噺鎿嶄綔
*/
  public void countDown() {
    sync.releaseShared(1);
  }
}
  • 鍊掕鏁板垵濮嬪€硷細閫氳繃鏋勯€犳柟娉旵ountDownLatch(int count)鎸囧畾涓€涓€掕鏁板櫒鐨勫垵濮嬪€硷紝鍏跺繀椤诲ぇ浜?锛屽惁鍒欎細throw new IllegalArgumentException("count < 0")
  • 绾跨▼绛夊緟鏂规硶锛?涓昏鎻愪緵浜哸wait() 鏂规硶鍜宎wait(long timeout, TimeUnit unit)鏂规硶锛屽叾涓細
    • 鏃犲弬鏁癮wait() 鏂规硶锛?涓€鑸粯璁ょ殑鏂规硶锛屽叾鏈川鏄皟鐢ˋQS鍚屾鍣ㄤ腑鐨刟cquireSharedInterruptibly()鏂规硶锛屼富瑕佽〃绀烘敮鎸佷腑鏂満鍒?/li>
    • 鏈夊弬鏁癮wait(long timeout, TimeUnit unit)鏂规硶锛?鏄敤浜庡疄鐜拌秴鏃舵満鍒讹紝鍏舵湰璐ㄦ槸璋冪敤AQS鍚屾鍣ㄤ腑鐨則ryAcquireSharedNanos(int arg, long nanosTimeout)鏂规硶
  • 鍊掕鏁伴€掑噺鎿嶄綔鏂规硶锛氫富瑕佹槸countDown() 鏂规硶锛?鍏舵湰璐ㄦ槸璋冪敤AQS鍚屾鍣ㄤ腑鐨剅eleaseShared(int arg) 鏂规硶锛屾牳蹇冨疄鐜版槸AQS鍩虹鍚屾鍣ㄧ殑doReleaseShared鏂规硶銆?/li>
  • 鍏朵粬鏂规硶锛?涓昏鏄痝etCount() 鏂规硶锛岀敤鏉ヨ幏鍙栧€掕鏁颁釜鏁帮紝鍏舵湰璐ㄦ槸璋冪敤AQS鍚屾鍣ㄤ腑getCount()鏂规硶锛屾潵鑾峰彇鐘舵€佸彉閲?/li>

缁间笂鎵€杩帮紝浠庝竴瀹氭剰涔変笂璁诧紝CountDownLatch鏄竴绉嶅叡浜攣锛屽睘浜嶢QS鍩虹鎶借薄闃熷垪鍚屾鍣ㄤ腑鍏变韩妯″紡瀛靛寲鐨勪骇鐗╋紝娌℃湁鏀寔鍏钩妯″紡涓庨潪鍏钩妯″紡鐨勫疄鐜般€?/p>

涓?CyclicBarrier(寰幆灞忛殰)鐨勮璁′笌瀹炵幇

鍦↗ava棰嗗煙涓紝CyclicBarrier(寰幆灞忛殰)鏄拡瀵逛簬Java澶氱嚎绋嬪苟鍙戞帶鍒朵腑鍊掕鏁板櫒鐨勭嚎绋嬫暟閲忥紝涓昏鏄噰鐢ㄩ€掑噺璁℃暟鏂瑰紡鐨勫€掕鏁板櫒鎬濇兂鍜屽熀浜嶢QS鍩虹鍚屾鍣ㄥ疄鐜扮殑ReentrantLock閿佹潵瀹炵幇鐨勪竴绉嶅悓姝ュ櫒宸ュ叿绫汇€?/p>

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第13张

CyclicBarrier(寰幆灞忛殰)鏄疛ava涓€氳繃瀵圭嚎绋嬮瀹氫箟璁剧疆涓€涓睆闅滐紝鍙湁褰撳埌杈惧睆闅滅殑绾跨▼鏁伴噺鍒拌揪鎸囧畾鐨勬渶澶у睆闅滄椂锛屽睆闅滄墠浼氳杩欎簺绾跨▼閫氳繃鎵ц銆?lt;br />浠庝竴瀹氭剰涔変笂鏉ヨ锛岃繖閲岀殑灞忛殰鏈川涓婅繕鏄竴涓€掕鏁板櫒锛屽€掕鏁板櫒鐨勬渶澶ч檺搴︽敮鎸佺殑鏁伴噺灏辨槸鎴戜滑涓虹嚎绋嬭缃睆闅滃ぇ灏忥紝鍏跺伐浣滃師鐞嗕笌CountDownLatch(闂攣)绫讳技锛岄兘鏄€氳繃璁╃嚎绋嬮樆濉炵瓑寰呮椂锛屽€掕鏁板櫒鎵ц閫掑噺1杩愮畻銆?lt;br />浣嗘槸涓嶤ountDownLatch涓嶅悓鏄紝CyclicBarrier(寰幆灞忛殰)鏄熀浜嶳eentrantLock(鍙噸鍏ラ攣)鏉ュ疄鐜扮殑锛屾洿鍑嗙‘鐨勮锛孋yclicBarrier鏄ReentrantLock鐨勫簲鐢ㄥ疄渚嬨€?/p>

1. 璁捐鎬濇兂

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第14张

涓€鑸潵璇达紝閫氳繃瀹氫箟涓€涓€掕鏁板櫒锛屼负浜嗚鏌愪釜绾跨▼鎴栬€呭涓嚎绋嬪湪鏌愪釜杩愯鑺傜偣涓婄害鏉烴涓嚎绋嬶紝闇€瑕佽鎸囧畾鏁伴噺鐨勭嚎绋嬪叡鍚屽埌杈炬煇涓€涓妭鐐逛箣鍚庯紝杩欎簺绾跨▼鎵嶄細涓€璧疯鎵ц銆?lt;br />CyclicBarrier(寰幆灞忛殰)鏈€鏃╂槸鍦↗DK1.5鐗堟湰涓彁渚涚殑锛屼粠璁捐鎬濇兂涓婃潵鐪嬶紝涓昏鍖呮嫭鍊掕鏁板櫒鐨勬渶澶у睆闅滐紝鎺у埗闃诲绛夊緟鐨勬柟娉曪紝鍊掕鏁板櫒鐨勯€掑噺鎿嶄綔鏂规硶锛屽拰瑙﹀彂鐐圭嚎绋嬩换鍔$瓑4涓牳蹇冭绱犮€傚叾涓細

  • 鍊掕鏁板櫒鐨勫悓姝ュ櫒锛?涓昏鍩轰簬ReentrantLock鏉ュ疄鐜版帶鍒剁嚎绋嬪璞★紝鍏舵湰璐ㄨ繕鏄熀浜嶢QS鍩虹鍚屾鍣ㄥ疄鐜般€?/li>
  • 鍊掕鏁板櫒鐨勬渶澶у睆闅滄暟閲忥細涓€鑸槸鍦ㄦ瀯寤篊yclicBarrier(寰幆灞忛殰)瀵硅薄鏄瀹氫箟璁剧疆锛岃〃绀洪渶瑕佸湪鏌愪釜杩愯鑺傜偣涓婄害鏉熺殑绾跨▼鏁伴噺銆?/li>
  • 鎺у埗绾跨▼闃诲绛夊緟鐨勬柟娉曪細瀹氫箟涓€涓柟娉曪紝浣垮緱瀹炵幇闃诲绾跨▼璁╁叾杩涘叆绛夊緟鐘舵€併€?/li>
  • 鍊掕鏁板櫒鐨勯€掑噺鎿嶄綔鏂规硶锛氬畾涔変竴涓柟娉曪紝浣垮緱璁╁€掕鏁板櫒杩涜閫掑噺1杩愮畻锛岀洿鍒拌揪鍒板睆闅滄椂锛岀瓑寰呯殑绾跨▼鎵嶇户缁墽琛屻€?/li>
  • 瑙﹀彂鐐圭嚎绋嬩换鍔★細涓€鑸寚鐨勬槸褰撴寚瀹氭暟閲忕殑绾跨▼杈惧埌璁剧疆鐨勫睆闅滄椂锛屾墠浼氬幓瑙﹀彂鎵ц鐨勪换鍔°€?/li>

绠€鍗曟潵璇达紝CyclicBarrier(寰幆灞忛殰)鏄澶氫釜绾跨▼浜掔浉绛夊緟锛岀洿鍒拌揪鍒颁竴涓悓姝ョ殑杩愯鑺傜偣銆傚啀缁х画涓€璧锋墽琛屻€?/p>

2. 鍩烘湰瀹炵幇

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第15张

鍦–yclicBarrier绫荤殑JDK1.8鐗堟湰涓紝瀵逛簬CountDownLatch鐨勫熀鏈疄鐜板涓嬶細


public class CyclicBarrier {

    /** CyclicBarrier閿佲€斿睆闅渓ock瀹炰綋 */
    private final ReentrantLock lock = new ReentrantLock();

    /** CyclicBarrier閿佲€斿睆闅滄潯浠堕槦鍒?*/
    private final Condition trip = lock.newCondition();

    /**  CyclicBarrier閿佲€斿睆闅滄渶澶у€?*/
    private final int parties;

    /**  CyclicBarrier閿佲€斿睆闅滆Е鍙戠嚎绋嬩换鍔$洰鏍?*/
    private final Runnable barrierCommand;

    /**  CyclicBarrier閿佲€斿綋鍓嶈鏁板櫒鐨勬渶澶у€煎睆闅滃疄渚?*/
    private Generation generation = new Generation();

    /**  CyclicBarrier閿佲€斿綋鍓嶈鏁板櫒鐨勬渶澶у€煎睆闅滃疄渚?*/
    private int count;

    /**  CyclicBarrier閿佲€斿睆闅滃疄渚?*/
    private static class Generation {
        boolean broken = false;
    }

    /**  CyclicBarrier閿佲€旀瀯閫犱竴涓睆闅滃疄渚?涓嶅甫瑙﹀彂浠诲姟鐨? */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**  CyclicBarrier閿佲€旀瀯閫犱竴涓睆闅滃疄渚?甯﹁Е鍙戜换鍔$殑) */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**  CyclicBarrier閿佲€旀棤鍙傛暟鏋勯€犱竴涓瓑寰呮柟娉?榛樿妯″紡) */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    /**  CyclicBarrier閿佲€旀湁鍙傛暟鏋勯€犱竴涓瓑寰呮柟娉?鏀寔瓒呮椂鏈哄埗) */
    public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
    BrokenBarrierException,
    TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**  CyclicBarrier閿佲€旀洿鏂扮姸鎬佸彉閲?*/
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**  CyclicBarrier閿佲€旈樆濉炲睆闅?*/
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    //...鍏朵粬浠g爜
}
  • 棰勫畾涔夎缃睆闅滄渶澶у€硷細 涓昏鏄€氳繃鍙橀噺parties鏉ュ疄鐜伴瀹氫箟璁剧疆灞忛殰鏈€澶у€?/li>
  • 璁剧疆褰撳墠灞忛殰鏁伴噺锛氫富瑕佹槸閫氳繃鍙橀噺count鏉ュ疄鐜?/li>
  • 鎺у埗绾跨▼鐨勫璞″疄渚嬶細 涓昏鏄€氳繃ReentrantLock鍜孋ondition鏉ユ帶鍒剁嚎绋嬮棿閫氫俊
  • 瑙﹀彂鐩爣浠诲姟瀵硅薄锛?涓昏鏄€氳繃Runable鏉ュ畾涔塨arrierCommand鍙橀噺
  • 鎻愪緵浜嗕袱涓瀯閫犳柟娉曪細閮介渶瑕侀瀹氫箟鎸囧畾灞忛殰鏈€澶у€紁arties锛屽叾涓竴涓渶瑕佷紶鍏arrierAction瑙﹀彂鐐逛换鍔?/li>
  • 绾跨▼闃诲绛夊緟鏂规硶锛氫富瑕佹彁渚涗簡2涓猘wait()鏂规硶锛屽叾涓細
    • 鏃犲弬鏁癮wait()鏂规硶锛氶粯璁ゅ鐞嗘柟寮忥紝涓嶆敮鎸佽秴鏃舵満鍒讹紝鍏舵牳蹇冨鐞嗛€昏緫鍦╠owait(boolean timed, long nanos)鏂规硶涓疄鐜?/li>
    • 鏈夊弬鏁癮wait()鏂规硶锛氭寚瀹氬弬鏁板鐞嗭紝鏀寔瓒呮椂鏈哄埗锛屽叾鏍稿績澶勭悊閫昏緫鍦╠owait(boolean timed, long nanos)鏂规硶涓疄鐜?/li>
  • 灞忛殰璁剧疆鍏冲仴鏂规硶锛氫富瑕佹槸breakBarrier() 鏉ュ疄鐜帮紝鍏朵腑锛?
    • 閫氱煡鍒拌揪灞忛殰鐨勬墍鏈夌嚎绋嬶細涓昏鏄€氳繃Condition涓殑signalAll()鏉ラ€氱煡灞忛殰涓墍鏈夌嚎绋嬪凡缁忔弧瓒虫潯浠?/li>
    • 灞忛殰璁剧疆锛氶粯璁ら瀹氫箟璁剧疆灞忛殰鏈€澶у€间笌璁剧疆褰撳墠灞忛殰鏁扮浉鍚岋紝涓昏璁剧疆count = parties
    • 鏇存柊灞忛殰鐘舵€侊細涓昏鏄€氳繃generation.broken = true鏉ュ疄鐜?/li>
  • 鏇存柊灞忛殰鐨勭姸鎬侊細涓昏鏄彁渚涗簡nextGeneration() 鏂规硶锛岃〃绀哄凡缁忓埌杈鹃瀹氫箟璁剧疆灞忛殰鏈€澶у€硷紝鍏朵腑锛?
    • 閫氱煡鍒拌揪灞忛殰鐨勬墍鏈夌嚎绋嬶細涓昏鏄€氳繃Condition涓殑signalAll()鏉ラ€氱煡灞忛殰涓墍鏈夌嚎绋嬪凡缁忔弧瓒虫潯浠?/li>
    • 鍑嗗涓嬩竴杞睆闅滆缃細鎰忓懗鐫€棰勫畾涔夎缃睆闅滄渶澶у€间笌璁剧疆褰撳墠灞忛殰鏁扮浉鍚岋紝涓昏璁剧疆count = parties
    • 閲嶇疆灞忛殰鐘舵€侊細涓昏鏄€氳繃generation = new Generation()鏉ュ疄鐜?/li>
涓€鑸潵璇达紝鍋囪鎴戜滑鍏佽鎺у埗鐨勬渶澶х嚎绋嬫暟閲忎负N锛岄瀹氫箟璁剧疆灞忛殰鏈€澶у€间负Parties(N), 褰撳墠灞忛殰鐨勭嚎绋嬫暟閲忎负Current(N) ,褰撳墠灞忛殰涓殑绛夊緟绾跨▼鏁伴噺涓篧aiting(N),閭d箞鎴戜滑浼氬緱鍒颁竴涓绠楀叕寮忥細<br />

2.1 鏋勯€燝eneration灞忛殰瀹炰緥鏍囪

private static class Generation {
    boolean broken = false;
}

涓昏鏄瀯閫犱簡涓€涓潤鎬佺鏈夊寲鐨凣eneration绫伙紝鍏朵腑瀹氫箟浜嗕竴涓猙roken鍙橀噺鏉ヤ綔涓哄睆闅滄爣璁帮紝榛樿鍒濆鍊间负false锛岃〃绀鸿繕娌¤揪鍒板睆闅滄渶澶у€笺€?/p>

2.1 绾跨▼闃诲绛夊緟鏍稿績dowait鏂规硶

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
    TimeoutException {

    // [1].瀹炰緥鍖栨瀯寤篟eentrantLock鐨勫璞?
    final ReentrantLock lock = this.lock;

    // [2].閫氳繃lock()鑾峰彇閿佹垨鑰呰鍔犻攣鎿嶄綔
    lock.lock();

    try {

        // [3].瀹炰緥鍖栨瀯寤篏eneration灞忛殰瀹炰緥瀵硅薄
        final Generation g = generation;

        // [4].鍒ゆ柇Generation灞忛殰瀹炰緥鏍囪鐘舵€?
        if (g.broken)
            throw new BrokenBarrierException();

        // [5].鍒ゆ柇Thread鏄寘鍚腑鏂爣蹇椾綅
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // [6].瀵瑰€掕鏁板櫒鐨勫睆闅滄暟閲忛€掑噺1杩愮畻
        int index = --count;

        // [7].渚濇嵁缁撴灉index == 0琛ㄧず褰撳墠鎸囧畾鐨勭嚎绋嬫暟閲忓埌杈惧睆闅滄渶澶у€硷紝闇€瑕佽Е鍙慠unnable浠诲姟
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;

                // 杩涜涓嬩竴杞睆闅滆缃?
                nextGeneration();

                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // [7].鑷棆鎿嶄綔

        for (;;) {
            try {
                // 鍒ゆ柇鏄惁瓒呮椂
                if (!timed)

                    trip.await();
                else if (nanos > 0L)
                    // 杩涜涓嬩竴杞睆闅滆缃?
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // 鏄惁鍙戠敓绾跨▼涓柇
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            // 濡傛灉绛夊緟鏃堕棿瓒呰繃鎸囧畾瓒呮椂鏃堕棿锛宼hrow new TimeoutException
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {

        // 鏈€鍚庨噴鏀鹃攣鎿嶄綔
        lock.unlock();
    }
}
  • 鍔犻攣鎿嶄綔锛?瀹炰緥鍖栨瀯寤篟eentrantLock鐨勫璞★紝閫氳繃lock()鏂规硶杩涜鍔犻攣鎿嶄綔
  • 鍒ゆ柇灞忛殰瀹炰緥鏍囪鐘舵€侊細瀹炰緥鍖栨瀯寤篏eneration瀹炰緥鏍囪锛屽垽鏂睆闅滃疄渚嬫爣璁扮姸鎬佹槸鍚︿竴鑷达紝濡傛灉涓嶄竴鑷村垯throw new BrokenBarrierException();
  • 鍒ゆ柇褰撳墠绾跨▼鏄惁琚腑鏂細 鍒ゆ柇Thread鏄寘鍚腑鏂爣蹇椾綅锛屽鏋滀腑鏂璽hrow new InterruptedException()骞惰皟鐢╞reakBarrier()閲嶆柊璁剧疆灞忛殰
  • 灞忛殰鍊掕鏁板櫒閫掑噺杩愮畻锛氬鍊掕鏁板櫒鐨勫睆闅滄暟閲忛€掑噺1杩愮畻锛屽嵆灏辨槸瀵瑰綋鍓嶅€掕鏁板櫒鐨勫綋鍓嶅€煎噺鍘?
  • 瑙﹀彂鑺傜偣绾跨▼浠诲姟锛?褰撳墠鍊掕鏁板櫒鐨勫綋鍓嶅€间负0鏃讹紝闇€瑕佽Е鍙慠unnable浠诲姟锛屽苟璋冪敤nextGeneration鏂规硶寮€鍚笅涓€杞搷浣滐紱鍚﹀垯锛屽綋鍓嶅€掕鏁板櫒鐨勫綋鍓嶅€间笉涓?鏃讹紝璋冪敤awaitNanos(nanos)鏂规硶杩涘叆绛夊緟鐘舵€?/li>
  • 鑷棆鎿嶄綔鍒ゆ柇瓒呮椂锛?濡傛灉浣跨敤浜嗚秴鏃跺弬鏁帮紝璋冪敤awaitNanos(nanos)鏂规硶杩涘叆绛夊緟鐘舵€侊紝鍏朵腑濡傛灉鍙戠敓涓柇鍒欒皟鐢═hread.currentThread().interrupt()璁剧疆涓柇鏍囪銆傚鏋滅瓑寰呮椂闂?gt; 鎸囧畾瓒呮椂鏃堕棿锛屾姏鍑簍hrow new TimeoutException()寮傚父
  • 閲婃斁閿侊細 閫氳繃unlock()鏂规硶杩涜瑙i攣鎿嶄綔锛屽苟閲婃斁閿?/li>

3. 鍏蜂綋瀹炵幇

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第16张

鍦–yclicBarrier绫荤殑JDK1.8鐗堟湰涓紝瀵逛簬CyclicBarrier鐨勫叿浣撳疄鐜板涓嬶細

public class CyclicBarrier {

    /** CyclicBarrier閿佲€斿睆闅渓ock瀹炰綋 */
    private final ReentrantLock lock = new ReentrantLock();

    /** CyclicBarrier閿佲€斿睆闅滄潯浠堕槦鍒?*/
    private final Condition trip = lock.newCondition();

    /**  CyclicBarrier閿佲€斿睆闅滄渶澶у€?*/
    private final int parties;

    /**  CyclicBarrier閿佲€斿睆闅滆Е鍙戠嚎绋嬩换鍔$洰鏍?*/
    private final Runnable barrierCommand;

    /**  CyclicBarrier閿佲€斿綋鍓嶈鏁板櫒鐨勬渶澶у€煎睆闅滃疄渚?*/
    private Generation generation = new Generation();

    /**  CyclicBarrier閿佲€斿綋鍓嶈鏁板櫒鐨勬渶澶у€煎睆闅滃疄渚?*/
    private int count;

    /**  CyclicBarrier閿佲€斿睆闅滃疄渚?*/
    private static class Generation {
        boolean broken = false;
    }

    /**  CyclicBarrier閿佲€旀瀯閫犱竴涓睆闅滃疄渚?涓嶅甫瑙﹀彂浠诲姟鐨? */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**  CyclicBarrier閿佲€旀瀯閫犱竴涓睆闅滃疄渚?甯﹁Е鍙戜换鍔$殑) */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**  CyclicBarrier閿佲€旀棤鍙傛暟鏋勯€犱竴涓瓑寰呮柟娉?榛樿妯″紡) */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    /**  CyclicBarrier閿佲€旀湁鍙傛暟鏋勯€犱竴涓瓑寰呮柟娉?鏀寔瓒呮椂鏈哄埗) */
    public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
    BrokenBarrierException,
    TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**  CyclicBarrier閿佲€旀洿鏂扮姸鎬佸彉閲?*/
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**  CyclicBarrier閿佲€旈樆濉炲睆闅?*/
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**  CyclicBarrier閿佲€旈樆濉炲睆闅?*/
    private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
    TimeoutException {

        // [1].瀹炰緥鍖栨瀯寤篟eentrantLock鐨勫璞?
        final ReentrantLock lock = this.lock;

        // [2].閫氳繃lock()鑾峰彇閿佹垨鑰呰鍔犻攣鎿嶄綔
        lock.lock();

        try {

            // [3].瀹炰緥鍖栨瀯寤篏eneration灞忛殰瀹炰緥瀵硅薄
            final Generation g = generation;

            // [4].鍒ゆ柇Generation灞忛殰瀹炰緥鏍囪鐘舵€佹槸鍚︿负true
            if (g.broken)
                throw new BrokenBarrierException();

            // [5].鍒ゆ柇Thread鏄寘鍚腑鏂爣蹇椾綅
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // [6].瀵瑰€掕鏁板櫒鐨勫睆闅滄暟閲忛€掑噺1杩愮畻
            int index = --count;

            // [7].渚濇嵁缁撴灉index == 0琛ㄧず褰撳墠鎸囧畾鐨勭嚎绋嬫暟閲忓埌杈惧睆闅滄渶澶у€硷紝闇€瑕佽Е鍙慠unnable浠诲姟
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;

                    // 杩涜涓嬩竴杞睆闅滆缃?
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // [7].鑷棆鎿嶄綔

            for (;;) {
                try {
                    // 鍒ゆ柇鏄惁瓒呮椂
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                // 濡傛灉绛夊緟鏃堕棿瓒呰繃鎸囧畾瓒呮椂鏃堕棿锛宼hrow new TimeoutException
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {

            // 鏈€鍚庨噴鏀鹃攣鎿嶄綔
            lock.unlock();
        }
    }


    /**  CyclicBarrier閿佲€旇幏鍙栧綋鍓嶇瓑灞忛殰绛夊緟鏁伴噺 */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

    /**  CyclicBarrier閿佲€旇幏鍙栧綋鍓嶇瓑灞忛殰鏁伴噺 */
    public int getParties() {
        return parties;
    }

    /**  CyclicBarrier閿佲€斿垽鏂綋鍓嶅睆闅?*/
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**  CyclicBarrier閿佲€旈噸缃睆闅滄暟閲?*/
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

}

涓昏闇€瑕佹敞鎰忓涓嬪嚑涓柟娉曪紝閮芥槸鍩轰簬ReentrantLock鏉ュ疄鐜板姞閿佸拰瑙i攣鎿嶄綔鐨勶紝鍏朵腑锛?/p>

  • getNumberWaiting()鏂规硶锛?鑾峰彇褰撳墠灞忛殰涓瓑寰呯殑绾跨▼鏁伴噺
  • reset() 鏂规硶锛氬綋涓€杞睆闅滄搷浣滅粨鏉燂紝闇€瑕侀噸缃睆闅滀腑鏈€澶х嚎绋嬫暟閲?/li>
  • isBroken() 鏂规硶锛氬垽鏂槸鍚﹀埌杈惧睆闅滄渶澶у€?/li>

缁间笂鎵€杩帮紝浠庝竴瀹氭剰涔変笂璁诧紝CyclicBarrier鏄竴绉嶅彲閲嶅叆閿侊紝灞炰簬ReentrantLock鐨勫簲鐢ㄥ疄渚嬶紝鍏朵腑鍔犻攣鍜岃В閿佹搷浣滈兘鏄嫭鍗犳ā寮忕殑銆?/p>

鍏?Semaphore(淇″彿閲?鐨勮璁′笌瀹炵幇

鍦↗ava棰嗗煙涓紝Semaphore(淇″彿閲?鏄拡瀵逛簬Java澶氱嚎绋嬪苟鍙戞帶鍒朵腑瀹炵幇瀵瑰叕鍏辫祫婧愮殑绾跨▼鏁伴噺杩涜骞跺彂鍚屾椂璁块棶鎺у埗锛屼富瑕佹槸閲囩敤鎸囧畾涓€涓渶澶ц鍙暟鐨勬€濇兂鍜屽熀浜嶢QS鍩虹鍚屾鍣ㄦ潵瀹炵幇鐨勪竴绉嶅悓姝ュ櫒宸ュ叿绫汇€?/p>

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第17张

Semaphore鍙互鐢ㄦ潵鎺у埗鍦ㄥ悓涓€鏃跺埢璁块棶鍏变韩璧勬簮鐨勭嚎绋嬫暟閲忥紝閫氳繃鍗忚皟鍚勪釜绾跨▼浠ヤ繚璇佸叡浜祫婧愮殑鍚堢悊浣跨敤銆?lt;br />Semaphore缁存姢浜嗕竴缁勮櫄鎷熻鍙紝瀹冪殑鏁伴噺鍙互閫氳繃鏋勯€犲櫒鐨勫弬鏁版寚瀹氥€?lt;br />绾跨▼鍦ㄨ闂叡浜祫婧愬墠锛屽繀椤昏皟鐢⊿emaphore鐨刟cquire()鏂规硶鑾峰緱璁稿彲锛屽鏋滆鍙暟閲忎负0锛岃绾跨▼灏变竴鐩撮樆濉炪€?lt;br />绾跨▼鍦ㄨ闂叡浜祫婧愬悗锛屽繀椤昏皟鐢⊿emaphore鐨剅elease()鏂规硶閲婃斁璁稿彲銆?/p>

1. 璁捐鎬濇兂

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第18张

涓€鑸潵璇达紝閫氳繃瀹氫箟涓€涓€掕鏁板櫒锛屼负浜嗘帶鍒舵渶澶歂涓嚎绋嬪悓鏃惰闂叕鍏辫祫婧愶紝鍏惰鏁板櫒鐨勬渶澶у€糓ax(N)鏄璁稿彲鐨勬渶澶歂涓嚎绋嬫暟閲忥紝鍗冲氨鏄鍙殑鏈€澶у€糔銆?lt;br />Semaphore绫绘渶鏃╂槸鍦↗DK1.5鐗堟湰鎻愪緵鐨勶紝浠庤璁℃€濇兂涓婃潵鐪嬶紝涓昏鍖呮嫭鍊掕鏁板櫒鐨勬渶澶ц鍙暟锛屽悓姝ュ櫒宸ヤ綔妯″紡锛岃幏鍙栭攣鏂规硶锛岄噴鏀鹃攣鏂规硶绛?涓牳蹇冭绱犮€傚叾涓細

  • 鍚屾鍣ㄥ伐浣滄ā寮忥細鍩轰簬AQS鍩虹鎶借薄闃熷垪鍚屾鍣ㄥ皝瑁呭唴缃疄鐜颁竴涓潤鎬佺殑鍐呯疆鍚屾鍣ㄦ娊璞$被锛岀劧鍚庡熀浜庤繖涓娊璞$被鍒嗗埆瀹炵幇浜嗗叕骞冲悓姝ュ櫒鍜岄潪鍏钩鍚屾鍣紝鐢ㄦ潵鎸囧畾鍜屾弿杩板悓姝ュ櫒宸ヤ綔妯″紡鏄叕骞虫ā寮忚繕鏄潪鍏钩妯″紡銆?/li>
  • 鍏钩/闈炲叕骞虫ā寮忥細涓昏鎻忚堪鐨勬槸澶氫釜绾跨▼鍦ㄥ悓鏃惰幏鍙栭攣鏃舵槸鍚︽寜鐓у厛鍒板厛寰楃殑椤哄簭鑾峰彇閿侊紝濡傛灉鏄垯涓哄叕骞虫ā寮忥紝鍚﹀垯涓洪潪鍏钩妯″紡銆?/li>
  • 鑾峰彇閿佹柟娉曪細涓昏瀹氫箟浜嗕竴涓猯ock()鏂规硶鏉ヨ幏鍙栭攣锛岃〃绀哄亣濡傞攣宸茬粡琚叾浠栫嚎绋嬪崰鏈夋垨鎸佹湁锛屽叾褰撳墠鑾峰彇閿佺殑绾跨▼鍒欒繘鍏ョ瓑寰呯姸鎬併€?/li>
  • 閲婃斁閿佹柟娉曪細涓昏瀹氫箟浜嗕竴涓猽nlock()鏂规硶鏉ラ噴鏀鹃攣锛岃〃绀哄亣濡傞攣宸茬粡琚叾浠栫嚎绋嬫斁寮冩垨閲婃斁锛屽叾褰撳墠鑾峰彇閿佺殑绾跨▼鍒欒幏寰楄閿併€?/li>

2. 鍩烘湰瀹炵幇

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第19张

鍦↗DK1.8鐗堟湰涓紝瀵逛簬Semaphore鐨勫熀鏈疄鐜板涓嬶細


public class Semaphore implements java.io.Serializable {

    private static final long serialVersionUID = -3222578661600680210 L;

    /**
     * Semaphore閿? 灏佽鍚屾鍣?
     */
    private final Sync sync;

    /**
     * Semaphore閿? 灏佽鍚屾鍣?
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //....鍏朵粬浠g爜
    }

    /**
     * Semaphore閿? 鏋勯€犱竴涓护鐗岃鍙?榛樿闈炲叕妯″紡)
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Semaphore閿? 鏋勯€犱竴涓护鐗岃鍙?鍙€夊叕骞?闈炲叕妯″紡)
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * Semaphore閿? 鑾峰彇閿佹柟娉?榛樿涓€涓笖鍙腑鏂満鍒?
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
        
    /**
     * Semaphore閿? 鑾峰彇閿佹柟娉?鍙€夋寚瀹氬涓笖鍙腑鏂満鍒?
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * Semaphore閿? 鑾峰彇閿佹柟娉?榛樿澶氫釜涓斾笉鍙腑鏂満鍒?
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    
    /**
     * Semaphore閿? 鑾峰彇閿佹柟娉?鎸囧畾澶氫釜涓斾笉鍙腑鏂満鍒?
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /**
     * Semaphore閿?閲婃斁閿佹柟娉?榛樿涓€涓?
     */
    public void release() {
        sync.releaseShared(1);
    }

    /**
     * Semaphore閿?閲婃斁閿佹柟娉?鍙€夋寚瀹氬涓?
     */
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

}
  • 鍐呴儴鍚屾鍣細鍩轰簬AQS鍩虹鍚屾鍣ㄥ皝瑁呭拰瀹氫箟浜嗕竴涓潤鎬佸唴閮⊿ync鎶借薄绫伙紝鍏朵腑鎶借薄浜嗕竴涓唴缃攣lock()鏂规硶
  • 鍚屾鍣ㄥ伐浣滄ā寮忥細涓昏鎻愪緵浜?2涓瀯閫犳柟娉曪紝鍏朵腑鏃犲弬鏁版瀯閫犳柟娉曡〃绀虹殑鏄粯璁ょ殑宸ヤ綔妯″紡锛屾湁鍙傛暟鏋勯€犳柟娉曚富瑕佷緷鎹弬鏁版潵瀹炵幇鎸囧畾鐨勫伐浣滄ā寮?/li>
  • 鑾峰彇閿佹柟娉曪細 涓昏鎻愪緵浜?涓熀浜巃cquire鏂规硶锛岀敤浜庤幏鍙栭攣鍏变韩閿侊紝鍏朵腑锛?
    • 鏃犲弬鏁癮cquire()鏂规硶锛氳幏鍙栧叡浜攣鐨勪竴鑸ā寮忥紝榛樿鎸囧畾涓€涓鍙拰鏀寔鍙腑鏂満鍒?/li>
    • 鏈夊弬鏁癮cquire()鏂规硶锛氳幏鍙栧叡浜攣鐨勬寚瀹氭ā寮忥紝鍙€夋寚瀹氬涓鍙笖鏀寔鍙腑鏂満鍒?/li>
    • 鏃犲弬鏁癮cquireUninterruptibly()鏂规硶锛氳幏鍙栧叡浜攣鐨勬寚瀹氭ā寮忥紝榛樿鎸囧畾涓€涓鍙笖涓嶆敮鎸佸彲涓柇鏈哄埗
  • 閲婃斁閿佹柟娉曪細 涓昏鏄彁渚涗簡2涓猺elease()鏂规硶鐢ㄤ簬閲婃斁閿佸叡浜攣锛屽叾涓細
    • 鏃犲弬鏁皉elease()鏂规硶锛氶噴鏀惧叡浜攣鐨勪竴鑸ā寮忥紝榛樿鎸囧畾涓€涓鍙拰鏀寔鍙腑鏂満鍒?/li>
    • 鏈夊弬鏁皉elease()鏂规硶锛氶噴鏀惧叡浜攣鐨勬寚瀹氭ā寮忥紝鍙€夋寚瀹氬涓鍙笖鏀寔鍙腑鏂満鍒?/li>

2.1 鍩轰簬AQS鍚屾鍣ㄥ皝瑁呴潤鎬佸唴閮⊿ync鎶借薄绫?/h4>

    /**
     * Semaphore閿? 鍩轰簬AQS鍩虹鍚屾鍣ㄥ皝瑁呭悓姝ュ櫒
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933 L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        /**
         * Semaphore閿? 闈炲叕骞虫ā寮忚幏鍙栧叡浜攣
         */
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        /**
         * Semaphore閿? 閲婃斁鍏变韩閿?
         */
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        /**
         * Semaphore閿? 鑷棆+compareAndSetState閫氳繃CAS鎿嶄綔璁$畻鎿嶄綔浠ょ墝璁稿彲鏁?
         */
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        /**
         * Semaphore閿? 鑷棆+compareAndSetState閫氳繃CAS鎿嶄綔閲嶇疆浠ょ墝璁稿彲鏁?
         */
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
  • 瀹炵幇鏂瑰紡锛氫富瑕佹槸鍩轰簬AQS鍩虹鍚屾鍣ㄥ皝瑁呬簡涓€涓潤鎬佺殑鐨凷ync鎶借薄绫伙紝閫氳繃鏋勯€犳柟娉曟寚瀹氫竴涓渶澶х殑浠ょ墝璁稿彲鏁伴噺
  • 涓昏鏂规硶锛氫富瑕佹槸鐪嬪叡浜攣鐨勮幏鍙杗onfairTryAcquireShared()鏂规硶鍜岄噴鏀鹃攣tryReleaseShared()鏂规硶锛屽叾涓細
    • 鑾峰彇閿乶onfairTryAcquireShared()鏂规硶锛氶潪鍏钩妯″紡涓嬭幏鍙栧叡浜攣锛屽埄鐢ㄨ嚜鏃?compareAndSetState()鏂规硶閫氳繃CAS鎿嶄綔锛屼繚璇佸苟鍙戜慨鏀逛护鐗岃鍙暟閲?/li>
    • 閲婃斁閿乼ryReleaseShared(i)鏂规硶锛?鍏钩/闈炲叕骞虫ā寮忎笅閲婃斁鍏变韩閿侊紝鍒╃敤鑷棆+compareAndSetState()鏂规硶閫氳繃CAS鎿嶄綔閲婃斁锛屼細鎶婇噴鏀剧殑浠ょ墝璁稿彲鏁伴噺澧炲姞鍒板綋鍓嶅墿浣欑殑浠ょ墝璁稿彲鏁伴噺涓€?/li>
  • 浠ょ墝璁稿彲鎿嶄綔鏂规硶锛氫富瑕佹彁渚涗簡drainPermits() 鏂规硶 鍜宺educePermits() 鏂规硶锛屽叾涓細
    • drainPermits() 鏂规硶锛氫富瑕佹槸鍒╃敤鑷棆+compareAndSetState()鏂规硶閫氳繃CAS鎿嶄綔閲嶇疆浠ょ墝璁稿彲鏁?/li>
    • reducePermits() 鏂规硶锛氫富瑕佹槸鑷棆+compareAndSetState)鏂规硶閫氳繃CAS鎿嶄綔閫掑噺璁$畻鎿嶄綔浠ょ墝璁稿彲鏁?/li>
  • 鑾峰彇閿佹柟寮忥細浠ょ墝璁稿彲鏁伴噺QS鍩虹鍚屾鍣ㄧ姸鎬佸彉閲忓搴旓紝閫氳繃getPermits() 鏂规硶鏉ヨ幏鍙栦护鐗岃鍙暟閲忥紝鏈川鏄皟鐢ˋQS鍩虹鍚屾鍣ㄤ腑鐨刧etState()鏉ヨ幏鍙栫姸鎬佸彉閲忋€?/li>
鐗瑰埆鎸囧嚭鐨勬槸锛岃繖閲岀殑闈炲叕骞虫ā寮忎富瑕佹弿杩扮殑鏄紝鍦ㄤ护鐗岃鍙暟閲忓厑璁哥殑鎯呭喌涓嬶紝璁╂墍鏈夌嚎绋嬭繘琛岃嚜鏃嬫搷浣滐紝鍏跺疄灏辨槸涓嶅叧蹇冪嚎绋嬪埌鏉ョ殑椤哄簭锛屽皢鍏ㄩ儴绾跨▼鏀惧埌涓€璧峰幓鍙備笌绔炰簤浠ょ墝璁稿彲銆?lt;br />鍏朵腑锛屼富瑕佽繕鍒╃敤compareAndSetState鏂规硶鏉ヨ繘琛孋AS鎿嶄綔锛屼繚璇佷慨鏀逛护鐗岃鍙暟閲忕殑鍘熷瓙鎬ф搷浣溿€?lt;br />涓€鑸潵璇达紝鍋囪鎴戜滑鍏佽鎺у埗鐨勬渶澶х嚎绋嬫暟閲忎负N锛屽墿浣欎护鐗岃鍙暟閲忎负Remanent(N), 褰撳墠鍙敤浠ょ墝璁稿彲鏁伴噺涓篊urrent(N) , 娑堣€椾护鐗岃鍙暟閲忎负Reduction(N)锛岄偅涔堟垜浠細寰楀埌涓€涓绠楀叕寮忥細<br />

<br />鍗冲氨鎰忓懗鐫€锛屽墿浣欎护鐗岃鍙暟閲忕瓑浜庡綋鍓嶅彲鐢ㄤ护鐗岃鍙暟閲忎笌娑堣€椾护鐗岃鍙暟閲忎箣宸€?lt;br />鐢辨鍙锛屽湪鍏钩/闈炲叕骞虫ā寮忎笅锛屾垜浠浜庡浜庤幏鍙栭攣鍜岄噴鏀鹃攣鏃讹紝瀵逛簬鍓╀綑浠ょ墝璁稿彲鏁伴噺Remanent(N)璁$畻閮芥弧瓒充互涓嬪叕寮忥細

  • 棣栧厛锛屽湪绾跨▼鍦ㄨ闂叡浜祫婧愬墠锛屾垜浠彲浠ュ厑璁哥殑鏈€澶у€间负Available(N),鑷棆鑾峰彇閿佺殑鏁伴噺涓篈cquires(N)锛岄偅涔堟垜浠湪鑾峰彇閿佹椂锛?/li>
  • 鍏舵锛屽湪绾跨▼鍦ㄨ闂叡浜祫婧愬悗锛岃嚜鏃嬮噴鏀鹃攣鐨勬暟閲忎负Releases(N)锛岄偅涔堟垜浠湪閲婃斁閿佹椂锛?/li>

<br />褰撶劧锛岄渶瑕佹敞鎰忕殑鐨勪竴涓棶棰橈紝灏辨槸褰撳墿浣欎护鐗岃鍙暟閲廟emanent(N) < 0鏃讹紝琛ㄧず褰撳墠绾跨▼浼氳繘鍏ラ樆濉炵瓑寰呯姸鎬併€?/p>

2.2 鍩轰簬Sync鎶借薄绫诲皝瑁匜airSync鍏钩鍚屾鍣?/h4>

    /**
     * Semaphore閿? 鍩轰簬Sync鎶借薄绫诲皝瑁匜airSync鍏钩鍚屾鍣?
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944 L;

        /**
         * Semaphore閿? Semaphore閿? 閫氳繃鏋勯€犳柟娉曟寚瀹氫护鐗岃鍙?
         */
        FairSync(int permits) {
            super(permits);
        }

        /**
         * Semaphore閿? Semaphore閿? 鍏钩妯″紡閲婃斁鍏变韩閿?
         */
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
  • 瀹炵幇鏂瑰紡锛?涓昏鏄湪鍩轰簬闈欐€佸唴閮⊿ync鎶借薄绫绘潵瀹炵幇锛屾瀯閫犱簡涓€涓彲鎸囧畾澶у皬鐨勭殑浠ょ墝璁稿彲
  • 涓昏鏂规硶锛?涓昏鏄彁渚涗簡涓€涓猼ryAcquireShared鏂规硶锛屽叾涓埄鐢╤asQueuedPredecessors()鏉ヤ繚璇佸叕骞虫€?/li>
  • 宸ヤ綔鏈哄埗锛?閫氳繃鍩轰簬AQS鍩虹鍚屾鍣ㄤ腑鐨勭瓑寰呴槦鍒楁潵瀹炵幇鍏钩鏈哄埗

闇€瑕佹敞鎰忕殑鏄紝鍦ㄦ湭杈惧埌鏈€澶х殑浠ょ墝璁稿彲鏁伴噺鏃讹紝鎵€鏈夌嚎绋嬮兘涓嶄細杩涘叆绛夊緟闃熷垪涓€?/p>

2.3 鍩轰簬Sync鎶借薄绫诲皝瑁匩onfairSync闈炲叕骞冲悓姝ュ櫒


    /**
     * Semaphore閿? 鍩轰簬Sync鎶借薄绫诲皝瑁匩onfairSync闈炲叕骞冲悓姝ュ櫒
     */
    static final class NonfairSync extends Sync {

        private static final long serialVersionUID = -2694183684443567898 L;

        /**
         * Semaphore閿? Semaphore閿? 閫氳繃鏋勯€犳柟娉曟寚瀹氫护鐗岃鍙?
         */
        NonfairSync(int permits) {
            super(permits);
        }


        /**
         * Semaphore閿? Semaphore閿? 闈炲叕骞虫ā寮忛噴鏀惧叡浜攣
         */
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
  • 瀹炵幇鏂瑰紡锛?涓昏鏄湪鍩轰簬闈欐€佸唴閮⊿ync鎶借薄绫绘潵瀹炵幇锛屾瀯閫犱簡涓€涓彲鎸囧畾澶у皬鐨勭殑浠ょ墝璁稿彲
  • 涓昏鏂规硶锛?涓昏鏄彁渚涗簡涓€涓猼ryAcquireShared鏂规硶锛屽叾涓富瑕佹槸璋冪敤浜嗛潤鎬佸唴閮⊿ync鎶借薄绫籲onfairTryAcquireShared鏂规硶銆?/li>
  • 宸ヤ綔鏈哄埗锛?閫氳繃鑷棆鎿嶄綔璁╂墍鏈夌嚎绋嬬珵浜夎幏鍙栦护鐗岃鍙紝鏈川杩樻槸閲囩敤浜咥QS鍩虹鍚屾鍣ㄤ腑闂叆绛栫暐鍒版墦鐮村叕骞崇殑

3. 鍏蜂綋瀹炵幇

Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第20张

鍦↗DK1.8鐗堟湰涓紝瀵逛簬Semaphore鐨勫叿浣撳疄鐜板涓嬶細


public class Semaphore implements java.io.Serializable {

    private static final long serialVersionUID = -3222578661600680210 L;

    /**
     * Semaphore閿? 灏佽鍚屾鍣?
     */
    private final Sync sync;


    /**
     * Semaphore閿? 鍩轰簬AQS鍩虹鍚屾鍣ㄥ皝瑁呭悓姝ュ櫒
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933 L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        /**
         * Semaphore閿? 闈炲叕骞虫ā寮忚幏鍙栧叡浜攣
         */
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        /**
         * Semaphore閿? 閲婃斁鍏变韩閿?
         */
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        /**
         * Semaphore閿? 鑷棆+compareAndSetState閫氳繃CAS鎿嶄綔璁$畻鎿嶄綔浠ょ墝璁稿彲鏁?
         */
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        /**
         * Semaphore閿? 鑷棆+compareAndSetState閫氳繃CAS鎿嶄綔閲嶇疆浠ょ墝璁稿彲鏁?
         */
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }


    /**
     * Semaphore閿? 鍩轰簬Sync鎶借薄绫诲皝瑁匜airSync鍏钩鍚屾鍣?
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944 L;

        /**
         * Semaphore閿? Semaphore閿? 閫氳繃鏋勯€犳柟娉曟寚瀹氫护鐗岃鍙?
         */
        FairSync(int permits) {
            super(permits);
        }

        /**
         * Semaphore閿? Semaphore閿? 鍏钩妯″紡閲婃斁鍏变韩閿?
         */
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    /**
     * Semaphore閿? 鍩轰簬Sync鎶借薄绫诲皝瑁匩onfairSync闈炲叕骞冲悓姝ュ櫒
     */
    static final class NonfairSync extends Sync {

        private static final long serialVersionUID = -2694183684443567898 L;

        /**
         * Semaphore閿? Semaphore閿? 閫氳繃鏋勯€犳柟娉曟寚瀹氫护鐗岃鍙?
         */
        NonfairSync(int permits) {
            super(permits);
        }


        /**
         * Semaphore閿? Semaphore閿? 闈炲叕骞虫ā寮忛噴鏀惧叡浜攣
         */
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Semaphore閿? 鏋勯€犱竴涓护鐗岃鍙?榛樿闈炲叕妯″紡)
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Semaphore閿? 鏋勯€犱竴涓护鐗岃鍙?鍙€夊叕骞?闈炲叕妯″紡)
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * Semaphore閿? 鑾峰彇閿佹柟娉?榛樿涓€涓笖鍙腑鏂満鍒?
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Semaphore閿? 鑾峰彇閿佹柟娉?鍙€夋寚瀹氬涓笖鍙腑鏂満鍒?
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * Semaphore閿? 鑾峰彇閿佹柟娉?榛樿澶氫釜涓斾笉鍙腑鏂満鍒?
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    
    /**
     * Semaphore閿? 鑾峰彇閿佹柟娉?鎸囧畾澶氫釜涓斾笉鍙腑鏂満鍒?
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /**
     * Semaphore閿?閲婃斁閿佹柟娉?榛樿涓€涓?
     */
    public void release() {
        sync.releaseShared(1);
    }

    /**
     * Semaphore閿?閲婃斁閿佹柟娉?鍙€夋寚瀹氬涓?
     */
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }


    /**
     * Semaphore閿?灏濊瘯鑾峰彇閿佹柟娉?榛樿涓€涓?
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * Semaphore閿?灏濊瘯鑾峰彇閿佹柟娉?鍙€夋寚瀹氬涓?
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /**
     * Semaphore閿?灏濊瘯鑾峰彇閿佹柟娉?鍙€夋寚瀹氬涓苟涓旀敮鎸佽秴鏃舵満鍒?
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    /**
     * Semaphore閿?灏濊瘯鑾峰彇閿佹柟娉?榛樿涓€涓苟涓旀敮鎸佽秴鏃舵満鍒?
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Semaphore閿?缁熻鍙互浠ょ墝璁稿彲鏁?
     */
    public int availablePermits() {
        return sync.getPermits();
    }

    /**
     * Semaphore閿?閲嶇疆浠ょ墝璁稿彲鏁?
     */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * Semaphore閿?閫掑噺璁$畻浠ょ墝璁稿彲鏁?
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    /**
     * Semaphore閿?鍒ゆ柇鏄惁鍏钩妯″紡
     */
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
     * Semaphore閿?鍒ゆ柇闃熷垪涓槸鍚﹀瓨鍦ㄧ嚎绋嬪璞?
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * Semaphore閿?鑾峰彇闃熷垪闀垮害
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    /**
     * Semaphore閿?鑾峰彇闃熷垪鐨勭嚎绋嬪璞?
     */
    protected Collection < Thread > getQueuedThreads() {
        return sync.getQueuedThreads();
    }

}
  • 淇″彿閲忓悓姝ュ櫒锛?涓昏鏄彁渚涗簡2涓瀯閫犳柟娉曟潵瀹炵幇浠ょ墝璁稿彲鐨勭鐞嗭紝鍏朵腑锛?
    • 榛樿闈炲叕骞虫ā寮忥細渚濇嵁鎸囧畾浼犲叆鐨勪护鐗岃鍙暟閲弍ermits鐩存帴瀹炰緥鍖朜onfairSync闈炲叕骞冲悓姝ュ櫒
    • 鍙€夊叕骞?闈炲叕骞虫ā寮忥細渚濇嵁鎸囧畾浼犲叆鐨勪护鐗岃鍙暟閲弍ermits鍜屽叕骞虫爣璁癴air鏉ュ疄渚嬪寲NonfairSync闈炲叕骞冲悓姝ュ櫒鍜孎airSync鍏钩鍚屾鍣紝鍏朵腑锛屽綋fair=true鏃讹紝鏄叕骞冲钩妯″紡锛屽惁鍒欎负闈炲叕骞虫ā寮?/li>
  • 鏀寔鍙腑鏂満鍒讹細涓昏鏄彁渚涗簡2涓猘cquire()鏂规硶鏉ヨ幏鍙栭攣锛屽叾涓細
    • 鏃犲弬鏁癮cquire()鏂规硶锛氫竴鑸ā寮忚幏鍙栧叡浜攣锛屼富瑕佹槸鍩轰簬AQS鍩虹鍚屾鍣ㄤ腑鐨刟cquireSharedInterruptibly(int arg)鏉ュ疄鐜帮紝鍏舵牳蹇冮€昏緫鏄痙oAcquireSharedInterruptibly(int arg)鏉ユ搷绾点€?/li>
    • 鏈夊弬鏁癮cquire()鏂规硶锛氫緷鎹寚瀹氫紶鍏ョ殑浠ょ墝璁稿彲鏁伴噺permits鏉ュ垽鏂紝褰損ermits< 0鏃讹紝鐩存帴throw new IllegalArgumentException()锛涘惁鍒欙紝鐩存帴璋冪敤acquireSharedInterruptibly(permits)鏂规硶銆?/li>
  • 鏀寔涓嶅彲涓柇鏈哄埗锛氫富瑕佹槸鎻愪緵浜?涓猘cquireUninterruptibly() 鏂规硶锛屽叾涓細
    • 鏃犲弬鏁癮cquireUninterruptibly() 鏂规硶锛氫竴鑸ā寮忚幏鍙栧叡浜攣锛屼富瑕佹槸鍩轰簬AQS鍩虹鍚屾鍣ㄤ腑acquireShared(int arg)鏂规硶鏉ュ疄鐜帮紝鍏舵牳蹇冮€昏緫鏄痙oAcquireShared(int arg) 鏉ユ搷绾点€?/li>
    • 鏈夊弬鏁癮cquireUninterruptibly() 鏂规硶锛氫緷鎹寚瀹氫紶鍏ョ殑浠ょ墝璁稿彲鏁伴噺permits鏉ュ垽鏂紝褰損ermits< 0鏃讹紝鐩存帴throw new IllegalArgumentException()锛涘惁鍒欙紝鐩存帴璋冪敤acquireShared(int arg)鏂规硶銆?/li>
  • 闈炲叕骞虫ā寮忚幏鍙栭攣鏂瑰紡锛?涓昏鎻愪緵浜?涓猼ryAcquire() 鏂规硶锛屽叾涓細
    • 鏃犲弬鏁皌ryAcquire() 鏂规硶锛氶潪鍏钩妯″紡灏濊瘯鑾峰彇鍏变韩閿侊紝鐩存帴璋冪敤鐨勬槸闈炲叕骞冲悓姝ュ櫒涓殑nonfairTryAcquireShared(int acquires) 鏂规硶銆?/li>
    • 鏈夊弬鏁皌ryAcquire() 鏂规硶锛氫緷鎹寚瀹氫紶鍏ョ殑浠ょ墝璁稿彲鏁伴噺permits鏉ュ垽鏂紝褰損ermits< 0鏃讹紝鐩存帴throw new IllegalArgumentException()锛涘惁鍒欙紝鐩存帴璋冪敤nonfairTryAcquireShared(int acquires) 鏂规硶銆?/li>
  • 鍏钩妯″紡鑾峰彇閿佹柟寮忥細涓昏鎻愪緵浜?涓猼ryAcquire() 鏂规硶锛屾敮鎸佽秴鏃舵満鍒躲€傚叾涓細
    • 鏃犲弬鏁皌ryAcquire() 鏂规硶锛氬叕骞虫ā寮忓皾璇曡幏鍙栧叡浜攣锛屼緷鎹寚瀹氫紶鍏ョ殑浠ょ墝璁稿彲鏁伴噺permits鏉ュ垽鏂紝褰損ermits< 0鏃讹紝鐩存帴throw new IllegalArgumentException()锛涘惁鍒欙紝鐩存帴璋冪敤鐨勬槸AQS鍩虹鍚屾鍣ㄤ腑鐨則ryAcquire(int permits, long timeout, TimeUnit unit)鏂规硶锛屽叾鏍稿績閫昏緫鏄痶ryAcquireSharedNanos(int arg, long nanosTimeout)鏉ユ搷绾点€?/li>
    • 鏈夊弬鏁皌ryAcquire() 鏂规硶锛氬叕骞虫ā寮忓皾璇曡幏鍙栧叡浜攣锛岄粯璁ゆ敮鎸佷竴涓鍙紝鐩存帴璋冪敤鐨勬槸AQS鍩虹鍚屾鍣ㄤ腑鐨則ryAcquire(1锛宭ong timeout, TimeUnit unit)鏂规硶锛屽叾鏍稿績閫昏緫鏄痶ryAcquireSharedNanos(int arg, long nanosTimeout)鏉ユ搷绾点€?/li>
  • 閲婃斁閿佹搷浣滄柟寮忥細涓昏鎻愪緵浜?涓猺elease()鏂规硶,鍏朵腑锛?
    • 鏃犲弬鏁皉elease() 鏂规硶锛氬叕骞?闈炲叕骞虫ā寮忕ず鑼冮攣鎿嶄綔锛岄粯璁ゆ敮鎸佷竴涓鍙紝涓昏鏄洿鎺ヨ皟鐢ˋQS鍩虹鍚屾鍣ㄤ腑鐨剅eleaseShared(int arg) 鏂规硶
    • 鏈夊弬鏁皉elease() 鏂规硶锛氬叕骞?闈炲叕骞虫ā寮忕ず鑼冮攣鎿嶄綔锛屼緷鎹寚瀹氫紶鍏ョ殑浠ょ墝璁稿彲鏁伴噺permits鏉ュ垽鏂紝褰損ermits< 0鏃讹紝鐩存帴throw new IllegalArgumentException()锛涘惁鍒欙紝涓昏鏄洿鎺ヨ皟鐢ˋQS鍩虹鍚屾鍣ㄤ腑鐨剅eleaseShared(int arg) 鏂规硶
  • 浠ょ墝璁稿彲鎿嶄綔鏂规硶锛氫富瑕佹彁渚涗簡availablePermits() 鏂规硶锛宺educePermits(int reduction)鏂规硶 浠ュ強drainPermits() 鏂规硶锛屽叾涓細
    • availablePermits() 鏂规硶锛氳幏鍙栧彲鐢ㄧ殑浠ょ墝璁稿彲鏁伴噺锛屼富瑕佹槸璋冪敤鍐呴儴鍚屾鍣ㄤ腑getPermits()鏂规硶銆?/li>
    • reducePermits()鏂规硶锛氳绠楀墿浣欏彲鐢ㄤ护鐗岃鍙暟閲忥紝渚濇嵁鎸囧畾浼犲叆鐨勪护鐗岃鍙暟閲弐eduction鏉ュ垽鏂紝褰搑eduction< 0鏃讹紝鐩存帴throw new IllegalArgumentException()锛涘惁鍒欙紝璋冪敤鍐呴儴鍚屾鍣ㄤ腑reducePermits()鏂规硶銆?/li>
    • drainPermits() 鏂规硶锛氶噸缃彲鐢ㄤ护鐗岃鍙暟閲忥紝涓昏鏄皟鐢ㄥ唴閮ㄥ悓姝ュ櫒涓璬rainPermits()鏂规硶銆?/li>
  • 闃熷垪鎿嶄綔鏂规硶锛氫富瑕佹彁渚涗簡hasQueuedThreads()鏂规硶锛実etQueuedThreads() 鏂规硶浠ュ強getQueueLength() 鏂规硶锛屽叾涓細
    • hasQueuedThreads()鏂规硶锛氫富瑕佹槸鐢ㄤ簬鑾峰彇闃熷垪涓槸鍚﹀瓨鍦ㄧ瓑寰呰幏鍙栦护鐗岃鍙殑绾跨▼瀵硅薄锛屼富瑕佹槸鐩存帴浣跨敤AQS鍩虹鍚屾鍣ㄧ殑hasQueuedThreads()鏉ュ疄鐜般€?/li>
    • getQueuedThreads() 鏂规硶锛氫富瑕佹槸鐢ㄤ簬鑾峰彇闃熷垪涓瓑寰呰幏鍙栦护鐗岃鍙殑绾跨▼瀵硅薄锛屼富瑕佹槸鐩存帴浣跨敤AQS鍩虹鍚屾鍣ㄧ殑getQueuedThreads()鏉ュ疄鐜般€?/li>
    • getQueueLength() 鏂规硶锛氫富瑕佹槸鐢ㄤ簬鑾峰彇闃熷垪涓瓑寰呰幏鍙栦护鐗岃鍙殑鏁伴噺锛屼富瑕佹槸鐩存帴浣跨敤AQS鍩虹鍚屾鍣ㄧ殑getQueueLength()鏉ュ疄鐜般€?/li>

缁间笂鎵€杩帮紝浠庝竴瀹氭剰涔変笂璁诧紝Semaphore鏄竴绉嶅叡浜攣锛屽睘浜嶢QS鍩虹鎶借薄闃熷垪鍚屾鍣ㄤ腑鍏变韩妯″紡瀛靛寲鐨勪骇鐗╋紝鏀寔鍏钩妯″紡涓庨潪鍏钩妯″紡锛岄粯璁ゆ槸浣跨敤闈炲叕骞虫ā寮忋€?/p>

鍐欏湪鏈€鍚?/h2>
Java并发编程解析 - 基于JDK源码解析Java领域中并发锁之同步器Semaphore,CyclicBarrier以及CountDownLatch等的设计思想与实现原理 (四),第21张

閫氳繃瀵笿ava棰嗗煙涓紝JDK鍐呴儴鎻愪緵鐨勫悇绉嶉攣鐨勫疄鐜版潵鐪嬶紝涓€鐩村洿缁曠殑鏍稿績涓昏杩樻槸鍩轰簬AQS鍩虹鍚屾鍣ㄦ潵瀹炵幇鐨勶紝浣嗘槸AQS鍩虹鍚屾鍣ㄤ笉鏄竴绉嶉潪瀹冧笉鍙殑鎶€鏈爣鍑嗚鑼冿紝鏇村鐨勫彧鏄竴濂楁妧鏈弬鑰冩寚鍗椼€?/p>

浣嗘槸锛屽疄闄呬笂锛孞ava瀵逛簬閿佺殑瀹炵幇涓庤繍鐢ㄨ繙杩滀笉姝㈣繖浜涳紝杩樻湁鐩镐綅鍣?Phaser)鍜屼氦鎹㈠櫒(Exchanger),浠ュ強鍦↗ava JDK1.8鐗堟湰涔嬪墠骞跺彂瀹瑰櫒ConcurrentHashMap涓娇鐢ㄧ殑鍒嗘閿?Segment)銆?/p>

涓嶈鏄綍绉嶅疄鐜板拰搴旂敤锛屽湪Java骞跺彂缂栫▼棰嗗煙鏉ヨ锛岄兘鏄洿缁曠嚎绋嬪畨鍏ㄩ棶棰樼殑瑙掑害鍘昏€冭檻鐨勶紝鍙槸閽堝浜庡悇绉嶅悇鏍风殑涓氬姟鍦烘櫙鍋氱殑鍏蜂綋鐨勫疄鐜般€?/p>

涓€瀹氭剰涔変笂鏉ヨ锛屽绾跨▼鍔犻攣鍙槸骞跺彂缂栫▼鐨勫疄鐜版柟寮忎箣涓€锛岀浉瀵逛簬瀹為檯搴旂敤鏉ヨ锛孞ava棰嗗煙涓殑閿侀兘鍙槸涓€绉嶅崟涓€搴旂敤鐨勯攣锛屽彧鏄粰鎴戜滑鎺屾彙Java骞跺彂缂栫▼鎻愪緵涓€绉嶆€濇兂娌★紝涓夎█涓よ涔熶笉鍙兘璇﹀敖銆?/p>

鍒版涓烘锛岃繖绠楁槸瀵逛簬Java棰嗗煙涓苟鍙戦攣鐨勬渶缁堢珷锛屾枃涓〃杩板潎涓轰釜浜虹湅娉曞拰涓汉鐞嗚В锛屽鏈変笉鍒颁箣澶勶紝蹇樿璋呰В涔熻缁欎簣鎵硅瘎鎸囨銆?/p>

鏈€鍚庯紝鎶€鏈爺绌朵箣璺换閲嶈€岄亾杩滐紝鎰挎垜浠啲鐨勬瘡涓€涓€氬锛岄兘鎾戝緱璧锋垜浠兂鍦ㄨ繖鏉¤矾涓婅蛋涓嬪幓鐨勫媷姘旓紝鏈潵浠嶇劧鍙湡锛屼笌鍚勪綅绋嬪簭缂栫▼鍚涘叡鍕夛紒

鐗堟潈澹版槑锛氭湰鏂囦负鍗氫富鍘熷垱鏂囩珷锛岄伒寰浉鍏崇増鏉冨崗璁紝濡傝嫢杞浇鎴栬€呭垎浜闄勪笂鍘熸枃鍑哄閾炬帴鍜岄摼鎺ユ潵婧愩€?/p>


https://www.xamrdz.com/backend/39g1936024.html

相关文章: