java / mysql · 16 4 月, 2021 0

线程间的通信四种方式

主要是总结线程间的通信方式有哪些.

通信方式

  1. 通过synchronized进行通信

public class MyObject {
   synchronized public void methodA() {
       //do something....
  }
   synchronized public void methodB() {
       //do some other thing
  }
}
​
public class ThreadA extends Thread {
   private MyObject object;
   //省略构造方法
   @Override
   public void run() {
       super.run();
       object.methodA();
  }
}
​
public class ThreadB extends Thread {
   private MyObject object;
//省略构造方法
   @Override
   public void run() {
       super.run();
       object.methodB();
  }
}
​
public class Run {
   public static void main(String[] args) {
       MyObject object = new MyObject();
       //线程A与线程B 持有的是同一个对象:object
       ThreadA a = new ThreadA(object);
       ThreadB b = new ThreadB(object);
       a.start();
       b.start();
  }
}

 

在上例中, 因为ThreadAThreadB持有有共同的变量object, 因此通过synchronized的方式可以实现两个线程的串行执行. 以为最终被锁住的应该是object对象。 同时根据happend-before原则, 释放锁在获取所之前进行的, 因此线程对同一个对象的修改可以同步给另一个线程。

这种方式是通过共享内存的方式实现两个线程的通信。

  1. 通过while循环的方式

@Test
public void testWhile() throws InterruptedException {
​
 ObjectValue objectValue = new ObjectValue();
​
 Thread thread = new Thread(new Runnable() {
   @Override
   public void run() {
    while(true) {
      for (int i = 0, len = 10; i < len; i++) {
         objectValue.add();
​
        System.out.println("添加了" + (i + 1) + "个元素!");
        // 该处会有一个sleep的操作,为了让工作内存有时机能够把数据同步到主内存之中
//           try {
//             Thread.sleep(1000);
//           } catch (InterruptedException e) {
//             e.printStackTrace();
//           }
      }
    }
  }
});
​
 Thread threadB = new Thread(new Runnable() {
   @Override
   public void run() {
     while (true) {
       System.out.println("ThreadB 获取线程的size: " + objectValue.size());
       // 该条件会被击穿, 因为数据延迟写入主内存之中
       if (objectValue.size() == 5) {
         System.out.println("==5条件成立, 线程B退出");
         break;
      }
    }
  }
});
​
 thread.start();
 threadB.start();
​
 thread.join();
 threadB.join();
}
​
private class ObjectValue {
   private List<String> list = new ArrayList<>();
​
   public void add() {
     list.add("elements");
  }
​
   public int size() {
     return list.size();
  }
}

 

以上程序存在以下问题:

  • 线程B是一个空循环, 没有做其他的事情, 比较耗费CPU的资源

  • ObjectValue中的list中的值会使得工作内存到主内存的数据同步延迟

  • (可见性): 线程B中的条件==5的条件可能会被击穿, 因为线程B始终都是读取的本地内存的缓存, 不能保证数据同时是最新的数据, 因此可能会导致死循环

  • 该处ObjectValue中的list使用volatile也不能解决问题, 因为volatile只是保证了线程每次都能够读到主内存中最新的数据, 但是主内存的数据可能在线程B处理过程中, 被修改了很多次.

  1. wait()和notify()的方式

import java.util.ArrayList;
 import java.util.List;
​
 public class MyList {
     private static List<String> list = new ArrayList<String>();
     public static void add() {
         list.add("anyString");
    }
    public static int size() {
        return list.size();
    }
}
​
public class ThreadA extends Thread {
​
    private Object lock;
    public ThreadA(Object lock) {
        super();
        this.lock = lock;
    }
    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() != 5) {
                    System.out.println("wait begin "
                            + System.currentTimeMillis());
                    lock.wait();
                    System.out.println("wait end "
                            + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
​
​
public class ThreadB extends Thread {
    private Object lock;
​
    public ThreadB(Object lock) {
        super();
        this.lock = lock;
    }
​
    @Override
   public void run() {
       try {
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                  MyList.add();
                    if (MyList.size() == 5) {
                        lock.notify();
                        System.out.println("已经发出了通知");
                    }
                    System.out.println("添加了" + (i + 1) + "个元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
​
public class Run {
    public static void main(String[] args) {
        try {
            Object lock = new Object();
            ThreadA a = new ThreadA(lock);
            a.start();
            Thread.sleep(50);
            ThreadB b = new ThreadB(lock);
            b.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

以上方式是通过两个线程共享一个lock的对象, 以此作为每个线程的出发点, 线程B在list的size到了5的时候, 就通知在lock阻塞的线程。

该方式的缺陷:

  • wait的线程必须执行在notify之前, 否则可能会导致线程无知被执行的情况。

  1. 管道通信 使用java.io.PipedInputStream 和 java.io.PipedOutputStream进行通信, 这两个徐璈成对的使用

public class ThreadPipeExchangeDataTest {
​
 /**
  * 测试通过管道进行线程间的通信
  */
 @Test
 public void testPiped() throws IOException, InterruptedException {
   PipedOutputStream pipedOutputStream = new PipedOutputStream();
   PipedInputStream pipedInputStream = new PipedInputStream();
   pipedInputStream.connect(pipedOutputStream);
​
   ThreadRead threadRead = new ThreadRead(pipedInputStream);
   threadRead.start();
​
   Thread.sleep(200);
​
   ThreadWrite threadWrite = new ThreadWrite(pipedOutputStream);
   threadWrite.start();
}
​
 /**
  * 该线程用于读取数据
  */
 private class ThreadRead extends Thread {
​
   private PipedInputStream pipedInputStream;
​
   public ThreadRead(PipedInputStream inputStream) {
     this.pipedInputStream = inputStream;
  }
​
   @Override
   public void run() {
     readData();
  }
​
   private void readData() {
     System.out.println("开始读取数据");
     byte[] bytes = new byte[20];
     try {
       int readLength = this.pipedInputStream.read(bytes);
       while (readLength != -1) {
         String str = new String(bytes, 0, readLength);
         System.out.println("获取到数据:" + str);
         readLength = this.pipedInputStream.read(bytes);
      }
​
       System.out.println();
       this.pipedInputStream.close();
    } catch (IOException e) {
       e.printStackTrace();
    }
  }
}
​
 private class ThreadWrite extends Thread {
​
   private PipedOutputStream pipedOutputStream;
​
   public ThreadWrite(PipedOutputStream pipedOutputStream) {
     this.pipedOutputStream = pipedOutputStream;
  }
​
   @Override
   public void run() {
     write();
  }
​
   private void write() {
     System.out.print("write: ");
     try {
       for (int i = 0, len = 10; i < len; i++) {
         String outData = " " + i;
​
         this.pipedOutputStream.write(outData.getBytes());
         System.out.print(outData);
      }
​
       System.out.println();
       this.pipedOutputStream.close();
    } catch (IOException e) {
       e.printStackTrace();
    }
  }
}
}