线程间的通讯

 

线程间的通讯java

wait/notifydom

等待,通知机制的实现ide

wait()是Object类的方法,在调用wait()方法以前,线程必须得到该对象的对象级别锁,只能在同步中调用wait()方法this

notify()也要在同步方法或者同步块中调用,调用以前也要得到锁。spa

 

public class Test1 {线程

         public static void main(String[] args) {对象

 

                   try {队列

                            String newString = new String("");ip

                            newString.wait();资源

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

 

         }

}

结果:

java.lang.IllegalMonitorStateException

         at java.lang.Object.wait(Native Method)

         at java.lang.Object.wait(Object.java:503)

         at MoreThread.TestWaitAndNotify.Test1.main(Test1.java:8)

结果缘由:没有对象监视器,没有同步锁

public class Test2 {

         public static void main(String[] args) {

                   try {

                            String lock = new String();

                            System.out.println("syn before");

 

                            synchronized (lock) {

                                     System.out.println("syn begin");

                                     lock.wait();

                                     System.out.println("wait end");

                            }

                            System.out.println("syn end");

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

结果:程序一直处于等待状态。

notify方法

 

public class MyThread1 extends Thread {

         private Object lock;

 

         public MyThread1(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   try {

                            synchronized (lock) {

                                     System.out.println("开始 wait time=" + System.currentTimeMillis());

                                     lock.wait();

                                     System.out.println("结束 wait time=" + System.currentTimeMillis());

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

public class MyThread2 extends Thread {

         private Object lock;

 

         public MyThread2(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   synchronized (lock) {

                            System.out.println("开始 wait time=" + System.currentTimeMillis());

                            lock.notify();

                            System.out.println("结束 wait time=" + System.currentTimeMillis());

                   }

         }

}

public class Test3 {

         public static void main(String[] args) {

                   try {

                            Object lock = new Object();

                            MyThread1 t1 = new MyThread1(lock);

                            t1.start();

                            Thread.sleep(3000);

                            MyThread2 t2 = new MyThread2(lock);

                            t2.start();

                   } catch (Exception e) {

                            e.printStackTrace();

                  }

         }

}

结果:开始 wait time=1482979301067

开始 wait time=1482979304068

结束 wait time=1482979304068

结束 wait time=1482979304068

 

 

例子:在list中添加5个元素public class MyList {

 

         private static List list = new ArrayList();

 

         public static void add() {

                   list.add("anystring");

         }

 

         public static int size() {

                   return list.size();

         }

 

}

 

public class ThreadA extends Thread {

         private Object lock;

 

         public ThreadA(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   try {

                            synchronized (lock) {

                                     if (MyList.size() != 5) {

                                               System.out.println("wait begin " + System.currentTimeMillis());

                                               lock.wait();

                                               System.out.println("wait end" + System.currentTimeMillis());

 

                                     }

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class ThreadB extends Thread {

         private Object lock;

 

         public ThreadB(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   try {

                            synchronized (lock) {

                                     for (int i = 0; i < 10; i++) {

                                               MyList.add();

                                               if (MyList.size() == 5) {

                                                        lock.notify();

                                                        System.out.println("已经发出通知");

                                               }

                                               System.out.println("添加了" + (i + 1) + "个元素!");

                                               Thread.sleep(1000);

                                     }

 

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

public class Run {

         public static void main(String[] args) {

                   try {

                            Object lock = new Object();

                            ThreadA a = new ThreadA(lock);

                            a.start();

                            Thread.sleep(50);

                            ThreadB b = new ThreadB(lock);

                            b.start();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

结果:wait begin 1482980039417

添加了1个元素!

添加了2个元素!

添加了3个元素!

添加了4个元素!

已经发出通知

添加了5个元素!

添加了6个元素!

添加了7个元素!

添加了8个元素!

添加了9个元素!

添加了10个元素!

wait end1482980049467

 

说明:notify()方法执行后并非当即释放锁,notify方法能够唤醒一个因调用了wait操做而处于柱塞状态中的线程,使其进入就绪状态。被从新唤醒的线程会试图从新得到临界区的控制权,也就是s锁,并继续执行临界区 内wait以后的代码。

 notify()方法能够随机唤醒等待队列中等待同一个共享资源的“一个”线程,并使该线程退出等待队列,进入可运行状态,也就是notufy()方法仅通知“一个”线程。

notifyAll()方法可使全部正在等待队列中等待同一共享资源的“所有”线程从等待状态中退出,进入可运行状态。此时,优先级最高的那个线程最早执行,但也有多是随机执行。

 

方法wait()锁释放与notify()锁不释放

例子:

public class Service {

         public void testMethod(Object lock) {

                   try {

                            synchronized (lock) {

                                     System.out.println("begin wait()");

                                     lock.wait();

                                     System.out.println("end wait()");

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

public class ThreadA extends Thread {

 

         private Object lock;

 

         public ThreadA(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   Service service = new Service();

                   service.testMethod(lock);

         }

}

 

 

public class ThreadB extends Thread {

 

         private Object lock;

 

         public ThreadB(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   Service service = new Service();

                   service.testMethod(lock);

         }

 

}

 

public class Test {

         public static void main(String[] args) {

                   Object lock = new Object();

                   ThreadA a = new ThreadA(lock);

                   a.start();

                   ThreadB b = new ThreadB(lock);

                   b.start();

         }

 

}

 

结果:两个线程都打印

begin wait()

begin wait()

说明wait时释放了锁

 

例子:notify方法不是当即释放锁

public class Service {

         public void testMethod(Object lock) {

                   try {

                            synchronized (lock) {

                                     System.out.println("begin wait()");

                                     lock.wait();

                                     System.out.println("end wait()");

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         public void synNotifyMethod(Object lock) {

                   try {

                            synchronized (lock) {

                                     System.out.println("begin notify() ThreadName=" + Thread.currentThread().getName() + "time ="

                                                        + System.currentTimeMillis());

                                     lock.notify();

                                     Thread.sleep(5000);

                                     System.out.println("end notify() ThreadName=" + Thread.currentThread().getName() + "time="

                                                        + System.currentTimeMillis());

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

 

         }

}

 

public class ThreadA extends Thread {

         private Object lock;

 

         public ThreadA(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         public void run() {

                   Service service = new Service();

                   service.testMethod(lock);

         }

}

 

public class NotifyThread extends Thread {

         private Object lock;

 

         public NotifyThread(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   Service service = new Service();

                   service.synNotifyMethod(lock);

                   super.run();

         }

 

}

 

 

public class synNotifyMethodThread extends Thread {

         private Object lock;

 

         public synNotifyMethodThread(Object lock) {

                   super();

                   this.lock = lock;

         }

         @Override

         public void run() {

                   Service service=new Service();

                   service.synNotifyMethod(lock);

         }

}

 

public class Test {

         public static void main(String[] args) {

                   Object lock = new Object();

                   ThreadA a = new ThreadA(lock);

                   a.start();

                   NotifyThread notifyThread = new NotifyThread(lock);

                   notifyThread.start();

                   synNotifyMethodThread c = new synNotifyMethodThread(lock);

                   c.start();

         }

}

 

 

结果:begin wait()

begin notify() ThreadName=Thread-2time =1482989890728

end notify() ThreadName=Thread-2time=1482989895728

end wait()

begin notify() ThreadName=Thread-1time =1482989895728

end notify() ThreadName=Thread-1time=1482989900728

 

必须执行完notify()所在的同步synchronized代码块后才会释放

 

 

当线程处于wait状态时,调用interrupt方法会出现InterruptedException

例子:

public class Service {

         public void testMethod(Object lock) {

                   try {

                            synchronized (lock) {

                                     System.out.println("begin wait()");

                                     lock.wait();

                                     System.out.println(" end wait()");

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                            System.out.println("出现了异常,由于wait状态的线程被interrupt了!");

                   }

         }

}

 

public class ThreadA extends Thread {

         private Object lock;

 

         public ThreadA(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   Service service = new Service();

                   service.testMethod(lock);

         }

} public class Test {

         public static void main(String[] args) {

                   try {

                            Object lock = new Object();

                            ThreadA a = new ThreadA(lock);

                            a.start();

                            Thread.sleep(5000);

                            a.interrupt();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

结果:异常产生

执行完同步就会释放锁

异常终止也会释放锁

 

只通知一个线程

调用notify()一次只随机通知一个线程进行唤醒;

屡次调用notify能够唤醒多个等待中的线程

public class NotifyThread extends Thread {

         private Object lock;

 

         public NotifyThread(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   synchronized (lock) {

                            lock.notify();

                            lock.notify();

                            lock.notify();

 

                   }

         }

}

 

唤醒全部线程,为了唤醒全部线程,可使用notifyAll()方法。

 

方法wait(long):

功能是等待某一个时间是否有线程对锁进行唤醒,若是超过这个时间则自动唤醒。

 

 

通知过早

会打乱程序的正常运行逻辑

正常状况下的例子

public class MyRun {

          private String lock = new String("");

          private Runnable runnableA = new Runnable() {

                   public void run() {

                            try {

                                      synchronized (lock) {

                                               System.out.println("begin wait");

                                               lock.wait();

                                               System.out.println("end wait");

                                      }

                            } catch (Exception e) {

                                      e.printStackTrace();

                            }

                   };

          };

          private Runnable runnableB = new Runnable() {

                   public void run() {

                            synchronized (lock) {

                                      System.out.println("begin notify");

                                      lock.notify();

                                      System.out.println("end notify");

                            }

                   };

          };

 

          public static void main(String[] args) {

                   MyRun run = new MyRun();

                   Thread a = new Thread(run.runnableA);

                   a.start();

                   Thread b = new Thread(run.runnableB);

                   b.start();

 

          }

}

 

结果:

begin wait

begin notify

end notify

end wait

 

用状态标识控制程序执行流程;

public class MyRun {

         private String lock = new String("");

         private boolean isFirstRunB=false;

        

         private Runnable runnableA = new Runnable() {

                   public void run() {

                            try {

                                     synchronized (lock) {

                                               while(isFirstRunB==false){

                                                        System.out.println("begin wait");

                                                        lock.wait();

                                                        System.out.println("end wait");

                                               }

                                              

                                     }

                            } catch (Exception e) {

                                     e.printStackTrace();

                            }

                   };

         };

         private Runnable runnableB = new Runnable() {

                   public void run() {

                            synchronized (lock) {

                                     System.out.println("begin notify");

                                     lock.notify();

                                     System.out.println("end notify");

                                     isFirstRunB=true;

                            }

                   };

         };

 

         public static void main(String[] args) throws InterruptedException {

                   MyRun run = new MyRun();

                   Thread a = new Thread(run.runnableA);

                   a.start();

                   Thread.sleep(100);

                   Thread b = new Thread(run.runnableB);

                   b.start();

 

         }

}

等待wait的条件发生了变化

wait条件发生了变化,也容易形成程序逻辑的混乱;

例子

public class ValueObject {

         public static List list = new ArrayList();

 

}

 

public class ThreadSubtract extends Thread {

         private Subtract r;

 

         public ThreadSubtract(Subtract r) {

                   super();

                   this.r = r;

         }

 

         @Override

         public void run() {

                   r.substract();

         }

}

 

public class ThreadAdd extends Thread {

         private Add p;

 

         public ThreadAdd(Add p) {

                   super();

                   this.p = p;

 

         }

 

         public void run() {

                   p.add();

         }

}

 

 

public class Subtract {

         private String lock;

 

         public Subtract(String lock) {

                   super();

                   this.lock = lock;

         }

 

         public void substract() {

                   try {

                            synchronized (lock) {

                                     if (ValueObject.list.size() == 0) {

                                               System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());

                                               lock.wait();

                                               System.out.println("wait end THreadName=" + Thread.currentThread().getName());

                                     }

                                     ValueObject.list.remove(0);

                                     System.out.println("listsize=" + ValueObject.list.size());

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

 

public class Add {

         private String lock;

         public Add(String lock){

                   super();

                   this.lock=lock;

         }

         public void add(){

                   synchronized(lock){

                            ValueObject.list.add("anyString");

                            lock.notifyAll();

                   }

         }

 

}

 

public class Run {

         public static void main(String[] args) throws InterruptedException {

                   String lock = new String("");

                   Add add = new Add(lock);

                   Subtract subtract = new Subtract(lock);

 

                   ThreadSubtract subtract1Thread = new ThreadSubtract(subtract);

                   subtract1Thread.setName("subtract1Thread");

                   subtract1Thread.start();

 

                   ThreadSubtract subtract2Thread = new ThreadSubtract(subtract);

                   subtract2Thread.setName("subtract2Thread");

                   subtract2Thread.start();

                   Thread.sleep(1000);

                   ThreadAdd addThread = new ThreadAdd(add);

                   addThread.setName("addThread");

                   addThread.start();

         }

 

}

 

结果:增长了一个值,但第二次移除的时候已经为空,没有对应的index:0

wait begin ThreadName=subtract1Thread

wait begin ThreadName=subtract2Thread

wait end THreadName=subtract2Thread

listsize=0

wait end THreadName=subtract1Thread

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0

         at java.util.ArrayList.rangeCheck(ArrayList.java:604)

         at java.util.ArrayList.remove(ArrayList.java:445)

         at MoreThread.waitOld.Subtract.substract(Subtract.java:19)

         at MoreThread.waitOld.ThreadSubtract.run(ThreadSubtract.java:13)

 

 

修改程序中条件,使程序再也不继续向前执行remove方法,也就不会产生向下溢出。

 

 

生产者消费者模式的实现

 

等待/通知最经典的就是生产者消费者模式:

1.单个生产者和消费者:

例子:

 

 

public class C {

         private String lock;

 

         public C(String lock) {

                   super();

                   this.lock = lock;

         }

 

         public void getValue() {

                   try {

                            synchronized (lock) {

                                     if (ValueObject.value.equals("")) {

                                               lock.wait();

                                     }

                                     System.err.println("get的值是" + ValueObject.value);

                                     ValueObject.value = "";

                                     lock.notify();

                            }

 

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

 

public class P {

         private String lock;

 

         public P(String lock) {

                   super();

                   this.lock = lock;

         }

 

         public void setValue() {

                   try {

                            synchronized (lock) {

                                     if (!ValueObject.value.equals("")) {

                                               lock.wait();

                                     }

                                     String value = System.currentTimeMillis() + "_" + System.nanoTime();

                                     System.out.println("set的值是:" + value);

                                     ValueObject.value = value;

                                     lock.notify();

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

 

         }

}

 

 

public class ValueObject {

         public static String value = "";

}

 

public class ThreadP extends Thread {

         private P p;

 

         public ThreadP(P p) {

                   super();

                   this.p = p;

         }

 

         @Override

         public void run() {

                   while (true) {

                            p.setValue();

                   }

         }

}

 

public class ThreadC extends Thread {

         private C r;

 

         public ThreadC(C r) {

                   super();

                   this.r = r;

         }

 

         public void run() {

                   while (true) {

                            r.getValue();

                   }

         }

}

 

public class Run {

         public static void main(String[] args) {

                   String lock = new String("");

                   P p = new P(lock);

                   C r = new C(lock);

                   ThreadP pThread = new ThreadP(p);

                   ThreadC rthread = new ThreadC(r);

                   pThread.start();

                   rthread.start();

         }

}

多生产者和多消费者:

public class Run {

         public static void main(String[] args) throws InterruptedException {

                   String lock = new String("");

                   P p = new P(lock);

                   C r = new C(lock);

                   ThreadP[] pThread = new ThreadP[2];

                   ThreadC[] rthread = new ThreadC[2];

                   for (int i = 0; i < 2; i++) {

                            pThread[i] = new ThreadP(p);

 

                            pThread[i].setName("生产者" + (i + 1));

 

                            rthread[i] = new ThreadC(r);

                            rthread[i].setName("消费者" + (i + 1));

                            pThread[i].start();

                            rthread[i].start();

                   }

                   Thread.sleep(5000);

                   Thread[] threadArray = new Thread[Thread.currentThread().getThreadGroup().activeCount()];

                   Thread.currentThread().getThreadGroup().enumerate(threadArray);

                   for (int i = 0; i < threadArray.length; i++) {

                            System.out.println(threadArray[i].getName() + "" + threadArray[i].getState());

                   }

 

         }

}

 

public class C {

         private String lock;

 

         public C(String lock) {

                   super();

                   this.lock = lock;

         }

 

         public void getValue() {

                   try {

                            synchronized (lock) {

                                     while (ValueObject.value.equals("")) {

                                               System.out.println("消费者" + Thread.currentThread().getName() + "waiting 了★★★☆☆☞☝☜☚✘█▌▎▓");

                                               lock.wait();

                                     }

 

                                     System.err.println("消费者" + Thread.currentThread().getName() + "runnable 了★★★☆☆☞☝☜☚✘█▌▎▓");

                                     ValueObject.value = "";

                                     lock.notify();

                            }

 

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

 

 

public class P {

         private String lock;

 

         public P(String lock) {

                   super();

                   this.lock = lock;

         }

 

         public void setValue() {

                   try {

                            synchronized (lock) {

                                     while (!ValueObject.value.equals("")) {

                                              

                                               System.out.println("生产者"+Thread.currentThread().getName()+"waiting 了★★★☆☆☞☝☜☚✘█▌▎▓");

                                               lock.wait();

                                     }

                                     System.out.println("生产者"+Thread.currentThread().getName()+"runnable 了★★★☆☆☞☝☜☚✘█▌▎▓");

                                     String value = System.currentTimeMillis() + "_" + System.nanoTime();

                                     System.out.println("set的值是:" + value);

                                     ValueObject.value = value;

                                     lock.notify();

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

 

         }

}

 

 

public class ThreadC extends Thread {

         private C r;

 

         public ThreadC(C r) {

                   super();

                   this.r = r;

         }

 

         public void run() {

                   while (true) {

                            r.getValue();

                   }

         }

}

 

public class ThreadP extends Thread {

         private P p;

 

         public ThreadP(P p) {

                   super();

                   this.p = p;

         }

 

         @Override

         public void run() {

                   while (true) {

                            p.setValue();

                   }

         }

}

 

public class ValueObject {

         public static String value = "";

}

 

出现假死状态,全部的生产者和消费者都在等待状态;多是连续唤醒了同类,同时唤醒异类就能解决问题;

 

 

多生产者和多消费者解决死锁的问题:操做值将notify改成notifyAll;

单个生产者和消费者交替执行,操做栈:

public class Run {

         public static void main(String[] args) {

                   MyStack myStack = new MyStack();

                   P p = new P(myStack);

                   C r = new C(myStack);

                   P_Thread pthread = new P_Thread(p);

                   C_Thread rthread = new C_Thread(r);

                   pthread.start();

                   rthread.start();

 

         }

}

 

 

public class C_Thread extends Thread {

         private C r;

 

         public C_Thread(C r) {

                   super();

                   this.r = r;

         }

 

         public void run() {

                   while (true) {

                            r.popService();

                   }

         }

 

}

public class P_Thread extends Thread {

         private P p;

 

         public P_Thread(P p) {

                   super();

                   this.p = p;

         }

 

         @Override

         public void run() {

                   while (true) {

                            p.pushService();

                   }

         }

 

}

 

public class C {

         private MyStack myStack;

 

         public C(MyStack myStack) {

                   super();

                   this.myStack = myStack;

         }

 

         public void popService() {

                   System.out.println("pop=" + myStack.pop());

         }

 

}

 

public class P {

         private MyStack myStack;

 

         public P(MyStack myStack) {

                   super();

                   this.myStack = myStack;

         }

 

         public void pushService() {

                   myStack.push();

         }

}

 

public class MyStack {

         private List list = new ArrayList();

 

         synchronized public void push() {

                   try {

                            if (list.size() == 1) {

                                     this.wait();

                            }

                            list.add("anyString=" + Math.random());

                            this.notify();

                            System.out.println("push=" + list.size());

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         synchronized public String pop() {

                   String returnValue = "";

                   try {

                            if (list.size() == 0) {

                                     System.out.println("pop中的:" + Thread.currentThread().getName() + "线程wait状态");

                                     this.wait();

                            }

                            returnValue = "" + list.get(0);

                            list.remove(0);

                            this.notify();

                            System.out.println("pop=" + list.size());

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

                   return returnValue;

         }

 

}

 

一个生产者和多个消费者

 

多生产者和一个消费者:操做栈

建立多个生产者线程

多生产者与多消费者

 

 

经过管道进行线程间通讯:字节流

public class Run {

         public static void main(String[] args) {

                   try {

                            WriteData writeData = new WriteData();

                            ReadData readData = new ReadData();

                            PipedInputStream inputStream = new PipedInputStream();

 

                            PipedOutputStream outputStream = new PipedOutputStream();

                            outputStream.connect(inputStream);

                            ThreadRead threadRead = new ThreadRead(readData, inputStream);

                            threadRead.start();

                            Thread.sleep(2000);

                            ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);

                            threadWrite.start();

                   } catch (Exception e) {

                           

                   }

         }

}

public class ThreadRead extends Thread {

         public ThreadRead(ReadData read, PipedInputStream input) {

                   super();

                   this.read = read;

                   this.input = input;

         }

 

         private ReadData read;

         private PipedInputStream input;

 

         @Override

         public void run() {

                   read.readMethod(input);

         }

}

 

public class ThreadWrite extends Thread {

         private WriteData write;

         private PipedOutputStream out;

 

         public ThreadWrite(WriteData write, PipedOutputStream out) {

                   super();

                   this.write = write;

                   this.out = out;

         }

         @Override

         public void run() {

                   write.writeMethod(out);

         }

}

 

public class ReadData {

         synchronized public void readMethod(PipedInputStream input) {

                   try {

                            System.out.println("read:");

                            byte[] byteArray = new byte[20];

                            int readLength = input.read(byteArray);

                            while (readLength != -1) {

                                     String newData = new String(byteArray, 0, readLength);

                                     System.out.println(newData);

                                     readLength = input.read(byteArray);

 

                            }

                            System.out.println();

                            input.close();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class WriteData {

 

         synchronized public void writeMethod(PipedOutputStream out) {

                   try {

                            System.out.println("write:");

                           

                            for (int i = 0; i < 20; i++) {

                                     String outData = "" + (i + 1);

                                     out.write(outData.getBytes());

                                     System.out.println(outData);

                            }

                            System.out.println();

                            out.close();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

 

 

结果:两个线程经过管道流成功进行了数据的传输。

 

经过管道进行线程间通讯:字符流

经过PipedReader,PipedWriter,char[]等实现字符流的读写。

 

 

 

等待通知交叉;

例子:

public class DBTools {

         volatile private boolean prevIsA = false;

 

         synchronized public void backupA() {

                   try {

                            while (prevIsA == true) {

                                     wait();

                            }

                            for (int i = 0; i < 1; i++) {

                                     System.out.println("★★★★★");

                            }

                            prevIsA = true;

                            notifyAll();

 

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         synchronized public void backupB() {

                   try {

                            while (prevIsA == false) {

                                     wait();

                            }

                            for (int i = 0; i < 1; i++) {

                                     System.out.println("☆☆☆☆☆");

                            }

                            prevIsA = false;

                            notifyAll();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

 

public class BackupA extends Thread {

         private DBTools dbtools;

 

         public BackupA(DBTools dbtools) {

                   super();

                   this.dbtools = dbtools;

         }

 

         @Override

         public void run() {

                   dbtools.backupA();

         }

 

}

 

public class BackupB extends Thread{

         private DBTools dbtools;

 

         public BackupB(DBTools dbtools) {

                   super();

                   this.dbtools = dbtools;

         }

 

         @Override

         public void run() {

                   dbtools.backupB();

         }

}

 

 

public class Run {

         public static void main(String[] args) {

                   DBTools dbtools = new DBTools();

                   for (int i = 0; i < 20; i++) {

                            BackupB output = new BackupB(dbtools);

                            output.start();

                            BackupA input = new BackupA(dbtools);

                            input.start();

                   }

         }

}

 

交叉打印内容

join方法的使用:

使所属的线程对象正确的执行run中的方法,使当前线程处于无限期的阻塞,等待线程销毁以后再继执行线程后面的代码。

 

具备使线程排队运行的做用

 

join()方法和interrupt同时使用时的异常java.lang.InterruptedException

代码:

public class ThreadA extends Thread {

         @Override

         public void run() {

                   for (int i = 0; i < Integer.MAX_VALUE; i++) {

                            String newString = new String();

                            Math.random();

                   }

         }

}

 

public class ThreadB extends Thread {

         @Override

         public void run() {

                   try {

                            ThreadA a = new ThreadA();

                            a.start();

                            a.join();

                            System.out.println("b线程在run end 处打印了");

                   } catch (Exception e) {

                            System.out.println("b线程在catch  处打印了");

                            e.printStackTrace();

                   }

         }

}

 

public class ThreadC extends Thread {

         private ThreadB threadB;

 

         public ThreadC(ThreadB threadB) {

                   super();

                   this.threadB = threadB;

         }

 

         @Override

         public void run() {

                   threadB.interrupt();

         }

}

 

public class Run {

         public static void main(String[] args) {

                   try {

                            ThreadB b = new ThreadB();

                            b.start();

                            Thread.sleep(500);

                            ThreadC c = new ThreadC(b);

                            c.start();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

方法join(long)的使用

设置等待的时间

public class MyThread extends Thread {

         @Override

         public void run() {

                   try {

                            System.out.println("begin Timer=" + System.currentTimeMillis());

                            Thread.sleep(5000);

 

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class Test {

         public static void main(String[] args) {

                   try {

                            MyThread threadTest = new MyThread();

                            threadTest.start();

                            threadTest.join(2000);

                            System.out.println("  end timer=" + System.currentTimeMillis());

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

结果:只等待了2秒,join(long time)

和sleep(long time)的区别:join(long)是在颞部使用wait(long)方法来实现的,具备释放锁的特色。Thread.sleep(long) 方法却不释放锁。

 

public class JoinDemo {

         public final synchronized void join(long mills) {

                   long base = System.currentTimeMillis();

                   long now = 0;

                   if (mills < 0) {

                            throw new IllegalArgumentException("timeout value is negative");

                   }

                   if (mills == 0) {

                            while (isAlive()) {

                                     wait(0);

                            }

                   } else {

                            while (isAlive()) {

                                     long delay = mills - now;

                                     if (delay <= 0) {

                                               break;

                                     }

                                     wait(delay);

                                     now = System.currentTimeMillis() - base;

                            }

                   }

 

         }

}

 

join()后面的代码提早运行:出现意外

解释意外:

 

几个线程抢锁,根据不一样的状况抢到锁的状况是不同的,就会出现不一样的结果。

 

ThreadLocal的使用:

变量值的共享能够经过public static 变量的形式,全部的线程都是用同一个static 变量;如何让每个线程都有本身的共享变量?ThreadLocal正式为了解决这样的问题。将每一个线程绑定本身的值

public class Run {

         public static ThreadLocal t1 = new ThreadLocal();

 

         public static void main(String[] args) {

                   if (t1.get() == null) {

                            System.out.println("从未放过值");

                            t1.set("个人值");

                   }

                   System.out.println(t1.get());

                   System.out.println(t1.get());

         }

 

}

 

验证线程变量的隔离性

例子1

 

 

public class Tools {

         public static ThreadLocal t1 = new ThreadLocal();

}

public class ThreadB extends Thread {

         @Override

         public void run() {

                   try {

                            for (int i = 0; i < 100; i++) {

                                     Tools.t1.set("TrheadB" + (i + 1));

                                     System.out.println("ThreadB get Value=" + Tools.t1.get());

                                     Thread.sleep(200);

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

public class ThreadA extends Thread {

         @Override

         public void run() {

                   try {

                            for (int i = 0; i < 100; i++) {

                                     Tools.t1.set("TrheadA" + (i + 1));

                                     System.out.println("ThreadA get Value=" + Tools.t1.get());

                                     Thread.sleep(200);

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class Run {

         public static void main(String[] args) {

                   try {

                            ThreadA a = new ThreadA();

                            ThreadB b = new ThreadB();

                            a.start();

                            b.start();

                            for (int i = 0; i < 100; i++) {

                                     Tools.t1.set("main" + (i + 1));

                                     System.out.println("Main get value=" + Tools.t1.get());

                                     Thread.sleep(200);

 

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

结果:虽然3个线程都向t1对象中et()数据值,可是每一个线程仍是能取出本身的数据

例子2

public class Tools {

         public static ThreadLocal<Date> t1 = new ThreadLocal<Date>();

}

public class ThreadA extends Thread {

         @Override

         public void run() {

                   try {

                            for (int i = 0; i < 20; i++) {

                                     if (Tools.t1.get() == null) {

                                               Tools.t1.set(new Date());

                                     }

                                     System.out.println("A " + Tools.t1.get().getTime());

                                     Thread.sleep(100);

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

public class ThreadB extends Thread {

         @Override

         public void run() {

                   try {

                            for (int i = 0; i < 20; i++) {

                                     if (Tools.t1.get() == null) {

                                               Tools.t1.set(new Date());

                                     }

                                     System.out.println("B " + Tools.t1.get().getTime());

                                     Thread.sleep(100);

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class Run {

         public static void main(String[] args) {

                   try {

                            ThreadA a = new ThreadA();

                            a.start();

                            Thread.sleep(1000);

                            ThreadB b = new ThreadB();

                            b.start();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

给ThreadLocal赋初值,值再也不为null,重写方法InitialValue()

例子:

public class Run {

         public static ThreadLocalExt t1 = new ThreadLocalExt();

 

         public static void main(String[] args) {

                   if (t1.get() == null) {

                            System.out.println("我从未放过值");

                            t1.set("个人值");

                   }

                   System.out.println(t1.get());

                   System.out.println(t1.get());

         }

}

public class ThreadLocalExt extends ThreadLocal {

         @Override

         protected Object initialValue() {

                  

                   return "我是默认值,再也不为null";

         }

}

结果,证实main线程中有本身的值,那么其余线程是否会有本身的初始值呢?

一样能够证实其余线程也有本身独立的值

再次验证线程变量的隔离性:

类InheritableThreadLocal使用

让子线程从父线程中取得值。

例子以下:public class Tools {

         public static InheritableThreadLocalExt t1 = new InheritableThreadLocalExt();

}

 

public class ThreadA extends Thread {

         @Override

         public void run() {

                   try {

                            for (int i = 0; i < 10; i++) {

                                     System.out.println("在ThreadA线程中取值" + Tools.t1.get());

                                     Thread.sleep(100);

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class InheritableThreadLocalExt extends InheritableThreadLocal {

         @Override

         protected Object initialValue() {

                   return new Date().getTime();

         }

        

         @Override

         protected Object childValue(Object parentValue) {

                   return parentValue+"我在子线程加的~!";

         }

}

 

 

public class Run {

         public static void main(String[] args) {

                   try {

                            for (int i = 0; i < 10; i++) {

                                     System.out.println("在Main线程中取值:" + Tools.t1.get());

                                     Thread.sleep(100);

                            }

                            ThreadA a = new ThreadA();

                            a.start();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}