前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[JDK] SynchronousQueue 源码阅读【1】

[JDK] SynchronousQueue 源码阅读【1】

作者头像
架构探险之道
发布2019-07-25 16:27:48
5380
发布2019-07-25 16:27:48
举报
文章被收录于专栏:架构探险之道

[JDK] SynchronousQueue 源码阅读

java.util.concurrent.SynchronousQueue

翻译

类注释

  • P1
代码语言:javascript
复制
A {@linkplain BlockingQueue blocking queue} in which each insert
operation must wait for a corresponding remove operation by another
thread, and vice versa.  A synchronous queue does not have any
internal capacity, not even a capacity of one.  You cannot
{@code peek} at a synchronous queue because an element is only
present when you try to remove it; you cannot insert an element
(using any method) unless another thread is trying to remove it;
you cannot iterate as there is nothing to iterate.  The
<em>head</em> of the queue is the element that the first queued
inserting thread is trying to add to the queue; if there is no such
queued thread then no element is available for removal and
{@code poll()} will return {@code null}.  For purposes of other
{@code Collection} methods (for example {@code contains}), a
{@code SynchronousQueue} acts as an empty collection.  This queue
does not permit {@code null} elements.一个{@linkplain BlockingQueue阻塞队列},其中每个插入操作必须等待另一个线程执行相应的删除操作,反之亦然。同步队列没有任何内部容量,甚至没
有1个元素的容量。您不能{@code peek}查看同步队列,因为元素只有在您试图删除它时才会出现;除非另一个线程试图删除元素,否则不能插入元素(使用任
何方法);您不能迭代,因为没有要迭代的内容。队列的<em>head</em>是第一个队列插入线程试图添加到队列中的元素;如果没有这样排队的线程,那么没有元
素可用来删除,{@code poll()}将返回{@code null}。对于其他{@code Collection}方法(例如{@code contains}), {@code synchroniousqueue}
充当空集合。这个队列不允许{@code null}元素。<p>Synchronous queues are similar to rendezvous channels used in
CSP and Ada. They are well suited for handoff designs, in which an
object running in one thread must sync up with an object running
in another thread in order to hand it some information, event, or
task.同步队列类似于CSP和Ada中使用的会合通道。它们非常适合于切换设计,在这种设计中,在一个线程中运行的对象必须与在另一个线程中运行的对象同步,以便
为其传递一些信息、事件或任务。<p>This class supports an optional fairness policy for ordering
waiting producer and consumer threads.  By default, this ordering
is not guaranteed. However, a queue constructed with fairness set
to {@code true} grants threads access in FIFO order.这个类支持一个可选的公平策略来订购等待的生产者和消费者线程。默认情况下,不保证这个顺序的执行。但是,公平性地将设置为{@code true}的队列按照
FIFO顺序授予线程访问权。<p>This class and its iterator implement all of the
<em>optional</em> methods of the {@link Collection} and {@link
Iterator} interfaces.这个类及其迭代器实现了{@link Collection}和{@link iterator}接口的所有<em>optional</em>方法。<p>This class is a member of the
<a href="{@docRoot}/../technotes/guides/collections/index.html">
Java Collections Framework</a>.这个类是<a href="{@docRoot}/../technotes/guides/collections/index "的成员。
Java集合框架</a>。@since 1.5 @author Doug Lea和Bill Scherer以及Michael Scott @param <E>是本集合中所包含元素的类型@since 1.5
@author Doug Lea and Bill Scherer and Michael Scott
@param <E> the type of elements held in this collection从JDK1.5开始发布
  • P2
代码语言:javascript
复制
This class implements extensions of the dual stack and dual
queue algorithms described in "Nonblocking Concurrent Objects
with Condition Synchronization", by W. N. Scherer III and
M. L. Scott.  18th Annual Conf. on Distributed Computing,
Oct. 2004 (see also
http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
The (Lifo) stack is used for non-fair mode, and the (Fifo)
queue for fair mode. The performance of the two is generally
similar. Fifo usually supports higher throughput under
contention but Lifo maintains higher thread locality in common
applications.2004年10月,W. N. Scherer III 和 M. L. Scott 在一年一度的第18届分布式计算大会提出,(另见
http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html))
该类实现双堆栈和双队列算法的扩展,被描述为“具有条件同步的非阻塞并发对象”。(Lifo)堆栈用于非公平模式,(Fifo)队列用于公平模式。两者的性能大体
相似。Fifo通常在争用情况下支持更高的吞吐量,但Lifo在普通应用程序中维护更高的线程局部性。A dual queue (and similarly stack) is one that at any given
time either holds "data" -- items provided by put operations,
or "requests" -- slots representing take operations, or is
empty. A call to "fulfill" (i.e., a call requesting an item
from a queue holding data or vice versa) dequeues a
complementary node.  The most interesting feature of these
queues is that any operation can figure out which mode the
queue is in, and act accordingly without needing locks.双队列(和类似的堆栈)是在任意给定的队列时间内,要么通过put操作提供的项保存“数据”,或者通过take操作“请求”插槽中数据,或者是空的。一个对“完成”
状态的队列的调用(即,一个从持有数据的队列请求某个元素的调用操作,反之亦然)取消队列中一个互补的节点。其中最有趣的特点是队列的任何操作都可以确
定队列采用了哪种模式, 并相应地采取行动, 而不需要锁。Both the queue and stack extend abstract class Transferer
defining the single method transfer that does a put or a
take. These are unified into a single method because in dual
data structures, the put and take operations are symmetrical,
so nearly all code can be combined. The resulting transfer
methods are on the long side, but are easier to follow than
they would be if broken up into nearly-duplicated parts.队列和堆栈都扩展了抽象类Transferer,定义了执行put或take的单个方法传输。因为在双数据结构中,put和take操作是对称的,所以几乎所有代码都可以
组合在一起,所以它们被统一到一个方法中。由此产生的传输方法比较长,但是相比分解成几乎重复的几个部分的代码,会更容易执行。The queue and stack data structures share many conceptual
similarities but very few concrete details. For simplicity,
they are kept distinct so that they can later evolve
separately.队列和堆栈数据结构在概念上有许多相似之处,但很少有具体的细节。为了简单起见,它们是不同的,以便以后可以单独演化。The algorithms here differ from the versions in the above paper
in extending them for use in synchronous queues, as well as
dealing with cancellation. The main differences include:这里的算法与上面的版本的不同之处在于,它们可以扩展到同步队列中使用,也可以处理取消的操作。主要差异包括:1. The original algorithms used bit-marked pointers, but
the ones here use mode bits in nodes, leading to a number
of further adaptations.原生的算法使用位标记指针,但这里的算法在节点中使用模式位,这导致了许多进一步的调整。2. SynchronousQueues must block threads waiting to become
fulfilled.同步队列必须阻塞等待线程任务完成(取操作依赖数据放入,反之亦然)。3. Support for cancellation via timeout and interrupts,
including cleaning out cancelled nodes/threads
from lists to avoid garbage retention and memory depletion.通过超时和中断支持取消,包括从列表中清除取消的节点/线程,以避免垃圾残留和内存耗尽。Blocking is mainly accomplished using LockSupport park/unpark,
except that nodes that appear to be the next ones to become
fulfilled first spin a bit (on multiprocessors only). On very
busy synchronous queues, spinning can dramatically improve
throughput. And on less busy ones, the amount of spinning is
small enough not to be noticeable.阻塞主要是使用LockSupport park/unpark来完成的,除了看起来是下一个要完成的节点(仅在多处理器上)会先旋转一点。在非常繁忙的同步队列上,
旋转可以显著提高吞吐量。而在不太忙的情况下,旋转的幅度很小,不会引起注意。[LockSupport park/unpark](https://www.cnblogs.com/moonandstar08/p/5132012.html)
LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。
Java锁和同步器框架的核心AQS:AbstractQueuedSynchronizer,就是通过调用LockSupport.park()和LockSupport.unpark()实现线程的阻塞和唤醒
的。LockSupport很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继续执行;如果许可已经被占用,当前线
程阻塞,等待获取许可。LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到
“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。因为park() 和 unpark()有许可的存在;调用 park() 的线程和另一个试图将
其 unpark() 的线程之间的竞争将保持活性。Cleaning is done in different ways in queues vs stacks.  For
queues, we can almost always remove a node immediately in O(1)
time (modulo retries for consistency checks) when it is
cancelled. But if it may be pinned as the current tail, it must
wait until some subsequent cancellation. For stacks, we need a
potentially O(n) traversal to be sure that we can remove the
node, but this can run concurrently with other threads
accessing the stack.在队列和堆栈中以不同的方式进行清理。对于队列,当一个节点被取消时,我们几乎总是可以在O(1)时间内立即删除它(为一致性检查而进行模重试)。但如果它
可能被固定为当前尾部,则必须等待后续的取消操作。对于堆栈,我们需要一个潜在的O(n)遍历,以确保可以删除节点,但这可以与访问堆栈的其他线程同时运行。While garbage collection takes care of most node reclamation
issues that otherwise complicate nonblocking algorithms, care
is taken to "forget" references to data, other nodes, and
threads that might be held on to long-term by blocked
threads. In cases where setting to null would otherwise
conflict with main algorithms, this is done by changing a
node's link to now point to the node itself. This doesn't arise
much for Stack nodes (because blocked threads do not hang on to
old head pointers), but references in Queue nodes must be
aggressively forgotten to avoid reachability of everything any
node has ever referred to since arrival.虽然垃圾收集负责处理大多数节点回收问题,这些问题会使非阻塞算法复杂化,但是要注意“忘记”对数据、其他节点和线程的引用,这些引用可能会被阻塞的线
程长期持有。在设置为null会与主要算法冲突的情况下,可以将节点的链接更改为指向节点本身。对于堆栈节点来说,这种情况并不多见(因为阻塞的线程不会
挂在旧的头指针上),但是必须积极地忘记队列节点中的引用,以避免任何节点在到达后所引用的所有内容都是可访问的。
  • P3
代码语言:javascript
复制
/**
* Shared internal API for dual stacks and queues.   用于双堆栈和队列的共享内部API
*/abstract static class Transferer<E> {    /**
    * Performs a put or take.执行put或take操作
    * @param e if non-null, the item to be handed to a consumer;
    *          if null, requests that transfer return an item
    *          offered by producer.
     如果非空,元素被传递给使用者;如果为空,则令该次传输返回生产者提供的元素。
    * @param timed if this operation should timeout 该操作超时时间设置
    * @param nanos the timeout, in nanoseconds 超时时间,以纳秒为单位
    * @return if non-null, the item provided or received; if null,
    *         the operation failed due to timeout or interrupt --
    *         the caller can distinguish which of these occurred
    *         by checking Thread.interrupted.
     如果非空,则提供或接收元素;如果为null,则由于超时或中断而导致操作失败,调用者可以通过检查 thread.interrupted 和 timeout(以纳秒为单位)来区分发生了哪些操作
    */
   abstract E transfer(E e, boolean timed, long nanos);
}/** The number of CPUs, for spin control cpu的数量,用于自旋控制*/static final int NCPUS = Runtime.getRuntime().availableProcessors();/**
* The number of times to spin before blocking in timed waits.
* The value is empirically derived -- it works well across a
* variety of processors and OSes. Empirically, the best value
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
在定时等待中阻塞之前要旋转的次数。这个值是经验得出的——它在各种处理器和操作系统中都能很好地工作。根据经验,最佳值似乎不会随着cpu数量(超过2)的变化而变化,所以它只是一个常量。
[超时等待前的自旋次数]
*/static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;/**
* The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin.
在非定时等待中阻塞之前要旋转的次数。这比计时值大,因为非计时等待旋转更快,因为他们不需要检查每个旋转的时间。
[阻塞等待前的自旋次数]
*/static final int maxUntimedSpins = maxTimedSpins * 16;/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
旋转比计时停止快的纳秒数粗略估计就够了
[超时时间小于 1000 纳秒时,自旋比阻塞等待效率高]
*/static final long spinForTimeoutThreshold = 1000L;
  • P3
代码语言:javascript
复制
/** Dual stack 双栈 */static final class TransferStack<E> extends Transferer<E> {   /*
   * This extends Scherer-Scott dual stack algorithm, differing,
   * among other ways, by using "covering" nodes rather than
   * bit-marked pointers: Fulfilling operations push on marker
   * nodes (with FULFILLING bit set in mode) to reserve a spot
   * to match a waiting node.
   这扩展了Scherer-Scott双堆栈算法,不同于其他方法,使用“覆盖”节点而不是位标记指针:持有数据时在标记节点上执行push操作(在模式中设置了“FULFILLING”(持有数据)的状态位),保留一个位置来匹配等待节点。
   */  /* Modes for SNodes, ORed together in node fields 用于SNodes的模式,在节点字段中组合在一起 */
  /** Node represents an unfulfilled consumer 表示一个完成的消费者的节点*/
  static final int REQUEST    = 0;   /** Node represents an unfulfilled producer 表示一个完成的生产者的节点*/
  static final int DATA       = 1;   /** Node is fulfilling another unfulfilled DATA or REQUEST 表示一个持有另外一个未持有的数据(或请求)的节点,节点正在完成一个匹配的操作*/
  static final int FULFILLING = 2;   /** Returns true if m has fulfilling bit set. 如果m有填满了的位集,返回true*/
  static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }   /** Node class for TransferStacks. 用于传输堆栈的节点类。*/
  static final class SNode {       volatile SNode next;        // next node in stack 堆栈的下一个节点
      volatile SNode match;       // the node matched to this 与当前节点匹配的对偶节点
      volatile Thread waiter;     // to control park/unpark 控制线程启动和暂停 当前节点驻留的线程
      Object item;                // data; or null for REQUESTs 如果是数据节点则非空,如果是请求节点则为 null
      int mode; //当前节点的状态
      // Note: item and mode fields don't need to be volatile
      // since they are always written before, and read after,
      // other volatile/atomic operations.      SNode(Object item) {           this.item = item;
      }       // 参数原子更新 next 节点
      boolean casNext(SNode cmp, SNode val) {           return cmp == next &&               //比较obj的offset处内存位置中的值和期望的值,如果相同则更新。此更新是不可中断的。
              UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);               /*
              java不能直接访问操作系统底层,而是通过本地方法来访问。Unsafe类提供了硬件级别的原子操作,主要提供了以下功能:
              - 通过Unsafe类可以分配内存,可以释放内存;
              - 可以定位对象某字段的内存位置,也可以修改对象的字段值,即使它是私有的;
              - 挂起与恢复
              - CAS操作 是通过compareAndSwapXXX方法实现的
              [Java中Unsafe类详解](http://www.cnblogs.com/mickole/articles/3757278.html)
              */
      }       /**
       * Tries to match node s to this node, if so, waking up thread.
       * Fulfillers call tryMatch to identify their waiters.
       * Waiters block until they have been matched.
       尝试将节点s匹配到此节点,如果匹配,则唤醒线程。履行者会调用“tryMatch”来识别他们的等待线程。等待线程阻塞直到匹配。
       *
       * @param s the node to match
       * @return true if successfully matched to s
       */
      boolean tryMatch(SNode s) {           //尝试设置匹配节点为 s
          if (match == null &&
              UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {               //读取驻留线程
              Thread w = waiter;               if (w != null) {    // waiters need at most one unpark 等待线程需要至少启动一个
                  waiter = null;                   //唤醒驻留线程
                  LockSupport.unpark(w);
              }               // 匹配成功返回 true
              return true;
          }           return match == s;
      }       /**
       * Tries to cancel a wait by matching node to itself.
       */
      void tryCancel() {
          UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
      }       //当前节点已经被取消
      boolean isCancelled() {           return match == this;
      }       // Unsafe mechanics
      private static final sun.misc.Unsafe UNSAFE;       private static final long matchOffset;       private static final long nextOffset;       static {           try {
              UNSAFE = sun.misc.Unsafe.getUnsafe();
              Class<?> k = SNode.class;
              matchOffset = UNSAFE.objectFieldOffset
                  (k.getDeclaredField("match"));
              nextOffset = UNSAFE.objectFieldOffset
                  (k.getDeclaredField("next"));
          } catch (Exception e) {               throw new Error(e);
          }
      }
  }   /** The head (top) of the stack */
  volatile SNode head;   boolean casHead(SNode h, SNode nh) {       return h == head &&
          UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
  }   /**
   * Creates or resets fields of a node. Called only from transfer
   * where the node to push on stack is lazily created and
   * reused when possible to help reduce intervals between reads
   * and CASes of head and to avoid surges of garbage when CASes
   * to push nodes fail due to contention.
   创建或重置节点的字段。仅从transfer调用,其中要入栈的节点被延迟创建并在可能的情况下被重用,以帮助减少以CASes为头的读操作和一般的读操作之间的间隔,并避免在CASes在节点入栈时由于争用而失败时出现垃圾激增。
   */
  static SNode snode(SNode s, Object e, SNode next, int mode) {       if (s == null) s = new SNode(e);
      s.mode = mode;
      s.next = next;       return s;
  }   /**
    * Puts or takes an item. 放入或取一个元素
    */
    @SuppressWarnings("unchecked")     E transfer(E e, boolean timed, long nanos) {        /*
        * Basic algorithm is to loop trying one of three actions:
        * 基础算法是循环尝试三个动作之一:
        * 1. If apparently empty or already containing nodes of same
        *    mode, try to push node on stack and wait for a match,
        *    returning it, or null if cancelled.
        * 如果显然是空的或已经包含相同模式的节点,请尝试在堆栈上推入(push)该节点,并等待匹配后返回它,或者如果取消则为空。
        [如果堆栈为空或包含相同类型的节点,则新建节点并入栈,阻塞等待]
        * 2. If apparently containing node of complementary mode,
        *    try to push a fulfilling node on to stack, match
        *    with corresponding waiting node, pop both from
        *    stack, and return matched item. The matching or
        *    unlinking might not actually be necessary because of
        *    other threads performing action 3:
        * 如果显然包含对偶模式的节点,尝试将一个持有数据或请求的节点推入(push)堆栈,匹配与相应的等待节点,从堆栈弹出(pop),并返回匹配的项。匹配或取消链接可能不是必要的,因为其他线程执行动作3:
        [如果堆栈中包含对偶节点,则将一个对偶节点入栈,并同时弹出新增节点及与其匹配的节点。]
        * 3. If top of stack already holds another fulfilling node,
        *    help it out by doing its match and/or pop
        *    operations, and then continue. The code for helping
        *    is essentially the same as for fulfilling, except
        *    that it doesn't return the item.
          如果栈顶已经拥有另一个持有数据或请求的节点,通过执行匹配 和/或 弹出(pop)操作来帮助它,然后继续。除了不返回的元素,帮助的代码与持有数据或请求的代码本质上是相同的。
          [如果堆栈顶部已经持有对偶节点,则帮助它们弹出堆栈。]
        */       SNode s = null; // constructed/reused as needed
       int mode = (e == null) ? REQUEST : DATA; //计算节点模式:持有请求/持有数据       for (;;) {
           SNode h = head;//读取栈顶节点
           //1)堆栈为空或栈顶元素和新增节点模式一致
           if (h == null || h.mode == mode) {  // empty or same-mode
               //1-1)如果设置了超时时间,并且已经超时
               if (timed && nanos <= 0) {      // can't wait
                   // 头节点不为 null && 头节点已经被取消
                   if (h != null && h.isCancelled())                        // 尝试原子更新栈顶节点
                       casHead(h, h.next);     // pop cancelled node 替换当前匹配的栈顶节点为下一个节点
                   else
                       // 删除栈顶后返回 nul
                       return null;                // 1-2)尝试将新节点入栈
               } else if (casHead(h, s = snode(s, e, h, mode))) {                    // 尝试在指定的超时时间内等待对偶节点
                   SNode m = awaitFulfill(s, timed, nanos);                    // 线程超时或被其他线程中断
                   if (m == s) {               // wait was cancelled
                       // 清除新增节点
                       clean(s);                        return null;
                   }                     // 当前节点和对偶节点位于栈顶前两个位置,则弹出它们
                   if ((h = head) != null && h.next == s)
                       casHead(h, s.next);     // help s's fulfiller
                   //新值的是请求节点,则返回对偶节点的数据值
                   //新值的是数据节点,则返回其持有的数据
                   return (E) ((mode == REQUEST) ? m.item : s.item);
               }            //2)头节点不是 fulFill 节点   
           } else if (!isFulfilling(h.mode)) { // try to fulfill
               // 1)头节点已经被取消
               if (h.isCancelled())            // already cancelled
                   // 则重新写入头结点
                   casHead(h, h.next);         // pop and retry
               // 2)尝试写入一个 fulFill 节点到栈顶
               else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {                    for (;;) { // loop until matched or waiters disappear
                       // 读取匹配节点
                       SNode m = s.next;       // m is s's match
                       // 已经不存在等待节点【被其他线程帮助弹出了】
                       if (m == null) {        // all waiters are gone
                           // 写入头节点
                           casHead(s, null);   // pop fulfill node
                           s = null;           // use new node next time
                           break;              // restart main loop
                       }                        // 读取匹配节点的后置节点
                       SNode mn = m.next;                        // 如果仍处于匹配模式
                       if (m.tryMatch(s)) {                            // 同时弹出这两个节点
                           casHead(s, mn);     // pop both s and m
                           //新值的是请求节点,则返回对偶节点的数据值
                           //新值的是数据节点,则返回其持有的数据
                           return (E) ((mode == REQUEST) ? m.item : s.item);
                       } else                  // lost match
                           s.casNext(m, mn);   // help unlink
                   }
               }            //3)头结点是一个 fulFill 节点,则帮助将它和它的对偶节点出栈
           } else {                            // help a fulfiller
               SNode m = h.next;               // m is h's match
               if (m == null)                  // waiter is gone
                   casHead(h, null);           // pop fulfilling node
               else {
                   SNode mn = m.next;                    if (m.tryMatch(h))          // help match
                       casHead(h, mn);         // pop both h and m
                   else                        // lost match
                       h.casNext(m, mn);       // help unlink
               }
           }
       }
    }   /**
   * Spins/blocks until node s is matched by a fulfill operation.
   * 线程执行自旋或阻塞等待,直到堆栈中写入一个对偶节点、线程超时、被其他线程中断
   * @param s the waiting node 等待节点
   * @param timed true if timed wait 是否是超时模式
   * @param nanos timeout value 超时纳秒
   * @return matched node, or s if cancelled
    1)出现对偶节点,则返回对偶节点
    2)线程超时或被中断,则返回 s
   */
  SNode awaitFulfill(SNode s, boolean timed, long nanos) {       /*
       * When a node/thread is about to block, it sets its waiter
       * field and then rechecks state at least one more time
       * before actually parking, thus covering race vs
       * fulfiller noticing that waiter is non-null so should be
       * woken.
       *
       * When invoked by nodes that appear at the point of call
       * to be at the head of the stack, calls to park are
       * preceded by spins to avoid blocking when producers and
       * consumers are arriving very close in time.  This can
       * happen enough to bother only on multiprocessors.
       *
       * The order of checks for returning out of main loop
       * reflects fact that interrupts have precedence over
       * normal returns, which have precedence over
       * timeouts. (So, on timeout, one last check for match is
       * done before giving up.) Except that calls from untimed
       * SynchronousQueue.{poll/offer} don't check interrupts
       * and don't wait at all, so are trapped in transfer
       * method rather than calling awaitFulfill.
       */
      // 计算截止时间
      final long deadline = timed ? System.nanoTime() + nanos : 0L;       // 读取当前线程
      Thread w = Thread.currentThread();       //计算自旋次数
      int spins = (shouldSpin(s) ?
                   (timed ? maxTimedSpins : maxUntimedSpins) : 0);       for (;;) {           // 线程已经被中断
          if (w.isInterrupted())               //则尝试删除节点
              s.tryCancel();           //读取匹配节点
          SNode m = s.match;           if (m != null)               return m; // 如果存在匹配节点,则返回它 1)节点本身 2)对偶节点
          if (timed) {//如果是超时模式
              nanos = deadline - System.nanoTime();//计算剩余时间
              if (nanos <= 0L) {
                  s.tryCancel();// 已经超时,则尝试删除节点
                  continue;
              }
          }           if (spins > 0) // 1)尝试进行自旋
              spins = shouldSpin(s) ? (spins-1) : 0; //重新计算值
          else if (s.waiter == null) //2)节点的驻留线程为 null
              s.waiter = w; // establish waiter so can park next iter //写入自旋线程
          else if (!timed) //如果不是超时阻塞
              LockSupport.park(this);// 阻塞当前线程
          else if (nanos > spinForTimeoutThreshold) //超时时间 > 1000 纳秒
              LockSupport.parkNanos(this, nanos);//超时阻塞当前线程
      }
  }   /**
   * Returns true if node s is at head or there is an active
   * fulfiller.
   */
  boolean shouldSpin(SNode s) {
      SNode h = head;       return (h == s || h == null || isFulfilling(h.mode));
  }   /**
   * Unlinks s from the stack.
   */
  void clean(SNode s) {
      s.item = null;   // forget item
      s.waiter = null; // forget thread      /*
       * At worst we may need to traverse entire stack to unlink
       * s. If there are multiple concurrent calls to clean, we
       * might not see s if another thread has already removed
       * it. But we can stop when we see any node known to
       * follow s. We use s.next unless it too is cancelled, in
       * which case we try the node one past. We don't check any
       * further because we don't want to doubly traverse just to
       * find sentinel.
       */
      //读取后置节点
      SNode past = s.next;       //如果后置节点也被取消了
      if (past != null && past.isCancelled())           //更新终止节点
          past = past.next;       // Absorb cancelled nodes at head 从头部开始遍历,删除已经取消的节点
      // 1)直到发现一个未取消的节点 || 2)一直遍历到 past 为止
      SNode p;       while ((p = head) != null && p != past && p.isCancelled())
          casHead(p, p.next);       // Unsplice embedded nodes 发现了一个未取消的节点,从未取消的节点开始又执行一次清除操作
      while (p != null && p != past) {
          SNode n = p.next; //读取后置节点
          if (n != null && n.isCancelled()) //后置节点已经取消
              p.casNext(n, n.next);//则将其踢除
          else
              p = n;//处理下一个节点
      }
  }   // Unsafe mechanics
  private static final sun.misc.Unsafe UNSAFE;   private static final long headOffset;   static {       try {
          UNSAFE = sun.misc.Unsafe.getUnsafe();
          Class<?> k = TransferStack.class;
          headOffset = UNSAFE.objectFieldOffset
              (k.getDeclaredField("head"));
      } catch (Exception e) {           throw new Error(e);
      }
  }
}
  • P4
代码语言:javascript
复制
/**
 * Creates a {@code SynchronousQueue} with nonfair access policy. 创建一个非公平的同步队列
 */
public SynchronousQueue() {     this(false);
}/**
 * Creates a {@code SynchronousQueue} with the specified fairness policy.
 * fair==true,创建一个公平的同步队列,最先等待的最先释放
   fair==false,创建一个非公平的同步队列
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */public SynchronousQueue(boolean fair) {
  transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}/**
* Adds the specified element to this queue, waiting if necessary for
* another thread to receive it.
* 将目标元素添加到同步队列,如果当前没有消费者,则阻塞等待
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/public void put(E e) throws InterruptedException {    if (e == null) throw new NullPointerException();    if (transferer.transfer(e, false, 0) == null) {
       Thread.interrupted();        throw new InterruptedException();
   }
}/**
* Inserts the specified element into this queue, waiting if necessary
* up to the specified wait time for another thread to receive it.
* 尝试在指定的超时时间内将目标元素 e 传递给队列头部的一个 take 操作,传递成功则返回 true,否则返回 false。
* @return {@code true} if successful, or {@code false} if the
*         specified waiting time elapses before a consumer appears
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/public boolean offer(E e, long timeout, TimeUnit unit)
   throws InterruptedException {    if (e == null) throw new NullPointerException();    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)        return true;    //线程未被中断,则返回 false;否则抛出 InterruptedException 异常
   if (!Thread.interrupted())        return false;    throw new InterruptedException();
}/**
* Inserts the specified element into this queue, if another thread is
* waiting to receive it.
* 如果队列头部是一个 take 操作,则将当前元素传递给它,并返回 true,否则立刻返回 false
* @param e the element to add
* @return {@code true} if the element was added to this queue, else
*         {@code false}
* @throws NullPointerException if the specified element is null
*/public boolean offer(E e) {    if (e == null) throw new NullPointerException();    return transferer.transfer(e, true, 0) != null;
}/**
* Retrieves and removes the head of this queue, waiting if necessary
* for another thread to insert it.
* 移除并获取栈顶元素,如果无可用元素,则阻塞等待
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/public E take() throws InterruptedException {
   E e = transferer.transfer(null, false, 0);    if (e != null)        return e;    //返回元素为 null 表示线程被中断,则清除中断标识并抛出  InterruptedException 异常。
   Thread.interrupted();    throw new InterruptedException();
}/**
* Retrieves and removes the head of this queue, waiting
* if necessary up to the specified wait time, for another thread
* to insert it.
* 尝试移除并返回队列头部数据元素,如果不存在,则在指定的超时时间内阻塞等待其他线程插入数据.超时则返回 null。
* @return the head of this queue, or {@code null} if the
*         specified waiting time elapses before an element is present
* @throws InterruptedException {@inheritDoc}
*/public E poll(long timeout, TimeUnit unit) throws InterruptedException {
   E e = transferer.transfer(null, true, unit.toNanos(timeout));    if (e != null || !Thread.interrupted())        return e;    throw new InterruptedException();
}/**
* Retrieves and removes the head of this queue, if another thread
* is currently making an element available.
* 尝试移除并返回栈顶的数据元素,如果栈顶节点为一个数据节点,则尝试移除并返回队列头部的数据元素,否则返回 null
* @return the head of this queue, or {@code null} if no
*         element is available
*/public E poll() {    return transferer.transfer(null, true, 0);
}

REFRENCES

  • [Java并发包中的同步队列SynchronousQueue实现原理] (http://ifeve.com/java-synchronousqueue/)
  • [Scalable Synchronous Queues] (http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
  • [Javadoc of SynchronousQueue] (https://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html)
  • [Nonblocking Concurrent Data Structures with Condition Synchronization] (http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html)
  • [SynchronousQueue 源码分析-1] (http://www.likecs.com/show-43269.html)
  • [SynchronousQueue 源码分析-2] (https://yq.aliyun.com/articles/647326)
  • [[源码]解析 SynchronousQueue 上界,下界.. 数据保存和数据传递. 堵塞队列. 有无频繁await?] (http://www.cnblogs.com/zsychanpin/p/7010142.html)

Code Ex

Ex1:

  • 生产者
代码语言:javascript
复制
package com.example.concurrence.queue;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.TimeUnit;/**
* <p>
*
* </p>
*
* @author xiachaoyang
* @version V1.0
* @date 2019年01月17日 18:19
* @modificationHistory=========================逻辑或功能性重大变更记录
* @modify By: {修改人} 2019年01月17日
* @modify reason: {方法名}:{原因}
* ...
*/public class ThreadProducer implements Runnable {   SynchronousQueue<String> queue;    static int cnt = 0;    public ThreadProducer(SynchronousQueue<String> queue) {        this.queue = queue;
   }    /**
    * When an object implementing interface <code>Runnable</code> is used
    * to create a thread, starting the thread causes the object's
    * <code>run</code> method to be called in that separately executing
    * thread.
    * <p>
    * The general contract of the method <code>run</code> is that it may
    * take any action whatsoever.
    *
    * @see Thread#run()
    */
   @Override
   public void run() {        //offerOneElement();
       //putOneElement();
       offerOneElementWithTime();
   }    /**
    * 尝试在指定的超时时间内将目标元素 e 传递给队列头部的一个 take 操作,传递成功则返回 true,否则返回 false。
    */
   private void offerOneElementWithTime() {
       String name = "start offer name:" + "n-" + ++cnt;
       System.out.println(name);        boolean offer = false;        try {
           offer = queue.offer("n-" + cnt,3000,TimeUnit.MILLISECONDS);
           Thread.sleep(4000);
           System.out.println("offer is over:" + "n-" + cnt + "---- offer-result:" + offer);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }/*
start offer name:n-1
...超时>4s
offer is over:n-1---- offer-result:false
...
start offer name:n-2
...未超时<4s
take from queue : n-2
offer is over:n-2---- offer-result:true
*/
   }    /**
    * 如果队列头部是一个 take 操作,则将当前元素传递给它,并返回 true,否则立刻返回 false
    */
   private void offerOneElement() {
       String name = "start offer name:" + "n-" + ++cnt;
       System.out.println(name);        boolean offer = queue.offer("n-" + cnt);
       System.out.println("offer is over:" + "n-" + cnt + "---- offer-result:" + offer);/*
先offer后take
start offer name:n-1
offer is over:n-1---- offer-result:false 返回false表示offer操作没有take节点接收先take后offer
start offer name:n-2
take from queue : n-2
offer is over:n-2---- offer-result:true
*/
   }    /**
    * put 阻塞等待
    */
   private void putOneElement() {
       String name = "put name:" + "n-" + ++cnt;        try {
           System.out.println("start put!");
           queue.put(name);
           System.out.println("put over > " + name);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }/*
start put!
put over > put name:n-1
take from queue : put name:n-1
start put!
take from queue : put name:n-2
put over > put name:n-2
...
start put!
...
put over > put name:n-3
take from queue : put name:n-3
*/
   }
}
  • 消费者
代码语言:javascript
复制
package com.example.concurrence.queue;import java.util.concurrent.SynchronousQueue;import static com.example.concurrence.utils.Utils.standOutString;/**
* <p>
* xxxxxx
* </p>
*
* @author xiachaoyang
* @version V1.0
* @date 2019年01月17日 18:24
* @modificationHistory=========================逻辑或功能性重大变更记录
* @modify By: {修改人} 2019年01月17日
* @modify reason: {方法名}:{原因}
* ...
*/public class ThreadConsumer implements Runnable {    public ThreadConsumer(SynchronousQueue<String> queue) {        this.queue = queue;
   }   SynchronousQueue<String> queue;    @Override
   public void run() {
       String name;        try {
           name = queue.take();
           System.out.println("take from queue : " + name);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-01-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构探险之道 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • [JDK] SynchronousQueue 源码阅读
    • 翻译
      • REFRENCES
        • Code Ex
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档