并行化资源池队列3——紧密相关的同步化队列

紧密相关的同步化队列
java

如今来看一种紧密相关的同步方式。一个或多个生产者线程生产数据,并由一个或多个消费者线程按照先进先出的次序来获取。可是,生产者与消费者之间必须相互回合,即向队列中放入一个元素的生产者应阻塞直到该元素被另一个消费者取出,反之亦然。其实现的伪代码描述以下:node

publicclassSynchronousQueue<T>{缓存

      T item=null;数据结构

      booleanenqueuing;并发

      Lock lock;ui

      Condition condition;atom

      publicvoidenq(T value){spa

         lock.lock();线程

         try{设计

            while(enqueuing){

               condition.await();

            }

            enqueuing=true;

            item=value;

            condition.signalAll();

            while(item!=null)

               condition.await();

            enqueuing=false;

            condition.signalAll();

         }finally{

            lock.unlock();

         }

      }

      public T deq(){

         lock.lock();

         try{

            while(item==null)

               condition.await();

            T t=item;

            item=null;

            condition.signalAll();

            return t;

         }finally{

            lock.unlock();

         }

      }

   }

    这是一种基于管程的同步队列实现。因为这个队列设计很是简单,因此他的同步代价很高。在每一个线程可能唤醒另外一个线程的时间点,不管是入队者仍是出队者都会唤醒全部的等待线程,从而唤醒的次数是等待线程数目的平方。尽管可使用条件对象来减小唤醒次数,但因为仍须要阻塞每次调用,因此开销很大。

    为了减小同步队列的同步开销,咱们考虑另一种同步队列的实现,在该实现中将入队与出队分两步完成。如出队从一个空队列删除元素的过程为,第一步,他将一个保留对象放入队列,表示该出队者这在等待一个准备与之会合的入队者。而后,出队者在这个保留对象的flag标志上自旋等待。第二步,当一个入队者发现该保留时,他经过存放一个元素并设置保留对象的flag来通知出队者完成保留。一样,入队者也可以经过建立本身的保留对象,并在保留对象的flag标志上自旋来等待会合同伴。在任意时刻,队列自己或者包含出队者的保留对象或者包含入队者的保留对象,或者为空。所以操做队列的线程共有4种交互对象,即入队者线程、入队者保留、出队者线程、出队者保留,因为须要进行紧密的同步,所以他们之间的对应关系以下:

入队者线程------------>出队者保留;

出队者线程------------>入队者保留;

    即入队者线程要协助处理出队者保留,一样出队者线程要协助处理入队者保留。这种结构称为双重数据结构,其核心是方法是经过两个步骤来生效的:保留和完成。该结构具备不少很好的性质。首先,正在等待的线程能够在一个本地缓存标志上自旋,而这时可扩展性的基础。其次,他很天然的保证了公平性。保留按照他们到达的次序来排队,从而保证请求也按照一样的次序完成,所以这种结构是能够线性化的,由于每一个部分方法调用在他完成时是能够排序的。

    该队列结构能够用节点组成的链表来实现,其中节点或者表示一个等待出队的元素或者表示一个等待完成的保留,由节点的Type域指定。任什么时候候,全部的队列节点都应具有相同的类型,即或者所有是等待出队的元素,或者所有是等待完成的保留。当一个元素入队时,节点的item域存放该元素;当元素出队时,节点的item域被从新设置为null。当一个保留入队时,节点的item域为null;当保留被一个入队者完成时,节点的item域被从新设置为一个元素。

    所以首先定义一个Java枚举,来表示节点的类型,而后定义链表的节点元素,在整个实现过程当中,依然基于Java的原子化操做来实现并发控制,因此源代码以下所示:

publicenumNodeType {

 ITEM,RESERVATION;

}

其中,ITEM表明节点元素,RESERVATION表明保留对象。

import java.util.concurrent.atomic.AtomicReference;

publicclassSynNode<E> {

  volatileNodeType type;//节点类型

//节点元素,元素值为Java泛化类型

  volatileAtomicReference<E> item;

  volatileAtomicReference<SynNode<E>> next;

  SynNode(E eitem,NodeType etype){

      item=newAtomicReference<E>(eitem);

      next=newAtomicReference<SynNode<E>>(null);

      type=etype;

  }

}

接下来定义队列的主体实现,其中主要包括:入队和出对操做。

import java.util.concurrent.atomic.AtomicReference;

publicclassSynchronousDualQueue<E> {

  privateAtomicReference<SynNode<E>> head,tail;//头尾哨兵节点

  publicSynchronousDualQueue(){

//初始化空队列,建立一个具备任意值的节点,并让头尾指针指向该节点

     SynNode<E> snode=new SynNode<E>(null,NodeType.ITEM);

     head=newAtomicReference<SynNode<E>>(snode);

     tail=newAtomicReference<SynNode<E>>(snode);

  }

//入队操做

  publicvoidenq(E item){

//建立将被入队的新节点

      SynNode<E> offer=newSynNode<E>(item,NodeType.ITEM);

      while(true){

//获取头尾哨兵节点

         SynNode<E> t=tail.get();

         SynNode<E> h=head.get();

//检验队列是否为空或者是否包含已入队元素的出队保留

         if(t==h||t.type==NodeType.ITEM){

//读取tail节点的后继

            SynNode<E> n=t.next.get();

//判断读取的tail值是一致的,即tail的状态不会被并发线程更改

            if(t==tail.get()){

//若是tail域没有指向队列的最后一个节点,则尝试推动tail,并从新开始

               if(n!=null){

//尝试推动tail

                   tail.compareAndSet(t,n);

               }

//若是tail域就是队列最后一个元素,那么尝试将tail的后继指向新增节点,//即将新增节点挂接到队尾

                 elseif(t.next.compareAndSet(n,offer)){

//若是成功挂接,那么尝试推动tail域,使其指向新增节点

                   tail.compareAndSet(t,offer);

//自旋等待,等待一个出队者经过设置该节点的item域为null来通知该元素已//经出队。这是由于这是一个紧密同步的队列,入队元素必须等待使用它的出队//

                   while(offer.item.get()==item);

//一旦出队成功,就尝试移动头指针,以便进行清理。这是由于出队是从头节点//出队。经过将新增节点设置为head哨兵节点,实现对被使用节点的清理

                   h=head.get();

                   if(offer==h.next.get()){

                      head.compareAndSet(h,offer);

                   }

                   return;

               }

            }

         }

//若是存在正在等待完成的出队者的保留,那么就找出一个保留并完成它

         else{

//读取head的后继节点

           SynNode<E> node=h.next.get();

//判断读到的值是一致的,即运行过程当中状态的一致性不能被并发线程所破坏,//主要经过尾指针是否一致、头指针是否一致、以及首节点是否为空来判断,这//些状态均可能在运行中被并发线程改变,所以只要有一个状态被改变,即认为//总体状态被破坏了,须要从新开始

           if(t!=tail.get()||h!=head.get()||node==null){

              continue;

           }

//若是状态一致,那么尝试着将节点的item域从null改成要入队的元素。由于//出队者保留等待的是一个入队者,因此要将item域由null置为入队元素的//item

           boolean success=node.item.compareAndSet(null,item);

//无论上一步是否成功,都尝试推动head

           head.compareAndSet(h,node);

//若是推动head成功,该方法返回,不然重试

           if(success){

              return;

           }

         }

      }

  }

//出队操做,其操做逻辑与入队基本类似,在此再也不赘述

public E deq(){

      E result=null;

      while(true){

         SynNode<E> h=head.get();

         SynNode<E> t=tail.get();

         if(h==t || h.type==NodeType.ITEM){

            SynNode<E> n=h.next.get();

            if(h==head.get()){

               if(n==null){

                   tail.compareAndSet(t,new SynNode<E>(null,NodeType.RESERVATION));

               }elseif(head.compareAndSet(h,n)){

                   while(n.item.get()==null);

                   t=tail.get();

                   if(n==t.next.get()){

                      result=n.item.get();

                      tail.compareAndSet(t,n);

                   }

                   return result;

               }

            }

         }

         else{

            SynNode<E> node=t.next.get();

            result=node.item.get();

            if(t!=tail.get()||h!=head.get()||node==null){

               continue;

            }

            boolean success=node.item.compareAndSet(result,null);

            tail.compareAndSet(t,node);

            if(success){

               return result;

            }

         }

      }

  }

}