6.3交换器(Exchangers)

一个exchanger提供一个同步的点,这个会擦除对象。每个在记录中出现的对象,都会去执行exchanger的exchange()的方法,匹配它的父线程,和接收返回父对象返回值。Exchanger在实际应用中颇有做用,像遗传算法和流水线设计。
     java.util.concurrent.Exchanger<V>继承exchanger的同步者。你能够执行Exchanger()的构造器初始化一个exchanger。你还能够执行下面的方法:
  V exchange(V x):在这个exchange的点上等待其它线程的到达(除非这个请求线程被打断),和转换对象给它,接受其它线程返回的对象。若是在这个exchange的点上已经有线程在等待,它会将这个线程加入线程列表中,和接受正在请求线程的对象。在其它线程,经过exchanger接受对象,那么当前线程会快速返回。若是请求的线程被打断,那么将会抛出InterruptedException。
  V exchange(V x, long timeout, TimeUnit unit):这个方法与前面的方法基本上是同样的,只是让你明确要等待时间多长。若是在等待期间超时,那么将会抛出TimeoutException。

Listing 6-3 运用Exchanger去擦除缓冲。java

package com.owen.thread.chapter6;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class ExchangerDemo
{

	final static Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
	final static DataBuffer initialEmptyBuffer = new DataBuffer();
	final static DataBuffer initialFullBuffer = new DataBuffer("I");

	public static void main(String[] args)
	{
		class FillingLoop implements Runnable
		{
			int count = 0;

			@Override
			public void run()
			{
				DataBuffer currentBuffer = initialEmptyBuffer;
				try

				{
					while (true)
					{
						addToBuffer(currentBuffer);
						if (currentBuffer.isFull())
						{
							System.out
									.println("filling thread wants to exchange");
							currentBuffer = exchanger.exchange(currentBuffer);
							System.out
									.println("filling thread receives exchange");
						}
					}
				} catch (InterruptedException ie)
				{
					System.out.println("filling thread interrupted");
				}
			}

			void addToBuffer(DataBuffer buffer)
			{
				String item = "NI" + count++;
				System.out.println("Adding: " + item);
				buffer.add(item);
			}
		}
		class EmptyingLoop implements Runnable
		{
			@Override
			public void run()
			{
				DataBuffer currentBuffer = initialFullBuffer;
				try
				{
					while (true)
					{
						takeFromBuffer(currentBuffer);
						if (currentBuffer.isEmpty())
						{
							System.out.println("emptying thread wants to "
									+ "exchange");
							currentBuffer = exchanger.exchange(currentBuffer);
							System.out.println("emptying thread receives "
									+ "exchange");
						}
					}
				}

				catch (InterruptedException ie)
				{
					System.out.println("emptying thread interrupted");
				}
			}

			void takeFromBuffer(DataBuffer buffer)
			{
				System.out.println("taking: " + buffer.remove());
			}
		}
		new Thread(new EmptyingLoop()).start();
		new Thread(new FillingLoop()).start();
	}
}

class DataBuffer
{
	private final static int MAXITEMS = 10;
	private final List<String> items = new ArrayList<>();

	DataBuffer()
	{
	}

	DataBuffer(String prefix)
	{
		for (int i = 0; i < MAXITEMS; i++)
		{
			String item = prefix + i;
			System.out.printf("Adding %s%n", item);
			items.add(item);
		}
	}

	synchronized void add(String s)
	{
		if (!isFull())
			items.add(s);
	}

	synchronized boolean isEmpty()
	{
		return items.size() == 0;
	}

	synchronized boolean isFull()
	{
		return items.size() == MAXITEMS;
	}

	synchronized String remove()
	{
		if (!isEmpty())
			return items.remove(0);
		return null;
	}
}

主线程建立一个exchanger和初始化一对缓存对象。以后,建立EmptyingLoop和FillingLoop的类,各自都继承Runnable实现线程,经过start就能够调用这个线程了(这里用的执行器(exector))。每个run()的方法都在添加或从缓存中移除。当缓存为充足或为空时,exchanger会去擦除缓存让填充或空继续出现。上面例子执行结果:算法

Adding I0
Adding I1
Adding I2
Adding I3
Adding I4
Adding I5
Adding I6
Adding I7
Adding I8
Adding I9
taking: I0
taking: I1
taking: I2
taking: I3
taking: I4
taking: I5
taking: I6
taking: I7
taking: I8
taking: I9
emptying thread wants to exchange
Adding: NI0
Adding: NI1
Adding: NI2
Adding: NI3
Adding: NI4
Adding: NI5
Adding: NI6
Adding: NI7
Adding: NI8
Adding: NI9
filling thread wants to exchange
filling thread receives exchange
emptying thread receives exchange
Adding: NI10
taking: NI0
Adding: NI11
taking: NI1
Adding: NI12缓存