博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JDK源码阅读之PipedInoutStream与PipedOutputStream
阅读量:4320 次
发布时间:2019-06-06

本文共 16473 字,大约阅读时间需要 54 分钟。

前言:

     在java中,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流。它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。如果使用同一个线程处理两个相关联的管道流时,read()方法和write()方法调用时会导致流阻塞,可能会导致线程死锁。

PipedOutputStream

1 public class PipedOutputStream extends OutputStream { 2 //持有一个PipedInputStream对象,PipedOutputStream类里后续很多操作都需要用到此对象 3     private PipedInputStream sink; 4  5 //构造函数,将本类的对象与一个特定的PipedInputStream对象关联 6 public PipedOutputStream(PipedInputStream snk)  throws IOException { 7     connect(snk); 8 } 9 10 //默认构造函数,创建对象后必须调用connect(PipedInputStream snk)方法才能正常工作11 public PipedOutputStream() {12 }13 14 //将“管道输出流” 和 “管道输入流”连接。15 public synchronized void connect(PipedInputStream snk) throws IOException {16     if (snk == null) {
//传入的对象不能为空,否则就抛出异常17 throw new NullPointerException();18 } else if (sink != null || snk.connected) {
//不能重复连接19 throw new IOException("Already connected");20 }21 sink = snk;22 //修改连接的PipedInputStream的成员变量, 使其处于已连接状态.以下//三个变量是在PipedInputStream中定义的,将在PipedInputStream中详//细介绍23 snk.in = -1;24 snk.out = 0;25 snk.connected = true;26 }27 28 //29 public void write(int b) throws IOException {30 if (sink == null) {31 // 确保已经连接32 throw new IOException("Pipe not connected");33 }34 //调用PipedInputStream里的方法35 sink.receive(b);36 }37 38 //将字节数组b写入“管道输出流”中。39 // 将数组b写入“管道输出流”之后,它会将其传输给“管道输入流”40 public void write(byte b[], int off, int len) throws IOException {41 if (sink == null) {42 throw new IOException("Pipe not connected");43 } else if (b == null) {44 throw new NullPointerException();45 } else if ((off < 0) || (off > b.length) || (len < 0) ||46 ((off + len) > b.length) || ((off + len) < 0)) {47 throw new IndexOutOfBoundsException();48 } else if (len == 0) {49 return;50 }51 /*52 以上代码 保证53 1. 已经连接54 2. 输出数组b不为空55 3. off和len不会导致数组越界56 */57 sink.receive(b, off, len);58 }59 /*60 * 从上可以看出, 两个write方法, 最后都调用了响应的PipedInputStream#receive方法, 这表明61 数据存储的地方和写数据的具体逻辑都在PipedInputStream中62 */63 64 65 66 67 /*清空“管道输出流”。68 这里会调用“管道输入流”的notifyAll();69 目的是让“管道输入流”放弃对当前资源的占有,让其它的等待线程(等待读取管道输出流的线程)读取“管道输出流”的值。70 */71 public synchronized void flush() throws IOException {72 if (sink != null) {73 synchronized (sink) {74 sink.notifyAll();75 }76 }77 }78 79 /*80 * 这个方法就是简单的调用了PipedInputStream的receivedLast()方法, 81 * 从方法名可以判断出, 这个方法就是通知PipedInputStream, 数据已经填充完毕.82 * 关闭之后,会调用receivedLast()通知“管道输入流”它已经关闭83 */84 public void close() throws IOException {85 if (sink != null) {86 sink.receivedLast();87 }88 }89 }
View Code

总结:

 从上面的分析可以看出, PipedOutputStream不会对数据进行实际的操作, 也不承担具体的职责, 只负责把数据交给PipedInputStream处理.

下面我们接着分析最关键的PipedInputStream的源码

PipedInputStream

1 public class PipedInputStream extends InputStream {  2      // “管道输出流”是否关闭的标记  3     boolean closedByWriter = false;  4     // “管道输入流”是否关闭的标记  5     volatile boolean closedByReader = false;  6     //是否已经连接的标记  7     boolean connected = false;  8     //读线程  9     Thread readSide; 10    11     /*readSide和writeSide是一种简单的标记读写线程的方式, 源码注释中也有说明这种方式并不可靠,  12                  这 种方式针对的应该是两条线程的情况, 所以我们使用的时候应该尽量按照设计意图来使用 13                 在两条线程中建立"管道"传递数据, 写线程写数据, 读线程读数据. 14      */ 15      16     //写线程 17     Thread writeSide; 18  19     //管道循环输入缓冲区的默认大小 20     private static final int DEFAULT_PIPE_SIZE = 1024; 21  22    23     protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; 24      25     // 放置传入数据的循环缓冲区 26     protected byte buffer[]; 27  28    /* 循环缓冲区中位置的索引,当从连接的管道输出流中接收到下一个数据字节时,会将其存储到该位置。 29     *  in<0 意味着缓冲区为空, in==out 意味着缓冲区已满,具体原因后面详细解释 30     */ 31     protected int in = -1; 32      33      34     //循环缓冲区中位置的索引,此管道输入流将从该位置读取下一个数据字节 35     protected int out = 0; 36  37     38      39     //创建 PipedInputStream,使其连接到管道输出流 src。写入 src 的数据字节可用作此流的输入 40     public PipedInputStream(PipedOutputStream src) throws IOException { 41         this(src, DEFAULT_PIPE_SIZE); 42     } 43  44      45    /*创建一个 PipedInputStream,使其连接到管道输出流 src, 46     * 并对管道缓冲区使用指定的管道大小。 写入 src 的数据字节可用作此流的输入。 47    */ 48     public PipedInputStream(PipedOutputStream src, int pipeSize) 49             throws IOException { 50          initPipe(pipeSize); 51          connect(src); 52     } 53  54   //创建尚未 连接的 PipedInputStream。在使用前必须将其 连接到 PipedOutputStream 55     public PipedInputStream() { 56         initPipe(DEFAULT_PIPE_SIZE); 57     } 58      59  60    /*创建一个尚未 连接的 PipedInputStream, 61                并对管道缓冲区使用指定的管道大小。在使用前必须将其 连接到 PipedOutputStream。 62     */ 63     public PipedInputStream(int pipeSize) { 64         initPipe(pipeSize); 65     } 66      67       68      69     //对byte数组buffer变量进行赋值, 也就是初始化缓冲区域 70     private void initPipe(int pipeSize) { 71          if (pipeSize <= 0) { 72             throw new IllegalArgumentException("Pipe Size <= 0"); 73          } 74          buffer = new byte[pipeSize]; 75     } 76  77  /*直接调用了PipedOutputStream的connect, 上面已经分析过了, 最终效果就是指明PipedOutputStream的连接对象,  78   * 改变connected变量的值, 使得PipedInputStream处于连接状态. 79   */ 80     public void connect(PipedOutputStream src) throws IOException { 81         src.connect(this); 82     } 83  84      85      86      87     /*通过上面PipedOutputStream的分析可以知道, 写数据的方法会调用PipedInputStream的reveive方法 88      *  89      */ 90     protected synchronized void receive(int b) throws IOException { 91         //检查当前"管道"状态, 确保能够读写数据 92         checkStateForReceive(); 93          94         //本方法由PipedOutputStream所在的线程调用, 所以线程是写线程, 记录该线程 95         writeSide = Thread.currentThread(); 96          97         // in == out表示缓存数组已经满了, 阻塞写线程 98         // 这里确保了未读的缓存数据不会丢失 99         if (in == out)100             awaitSpace();101         102         // 当检测到缓存数组有空间, 等待结束后, 会继续执行以下代码103         if (in < 0) {
//小于0表示缓存中无数据,此时设置读与写的位置,104 in = 0;//设置为0是因为要从0号索引开始往缓存中写入数据105 out = 0;//设置为0是因为要从0号索引开从缓存中读取数据106 }107 // 写操作108 // 1. 把数据写到目标位置(in)109 // 2. 后移in, 指明下一个写数据的位置110 buffer[in++] = (byte)(b & 0xFF);//&0xff是为了保证二进制数据的一致性,具体原因跟反码,int和byte的位数有关111 if (in >= buffer.length) {
// 如果in超出缓存长度, 回到0, 循环利用缓存数组112 in = 0;113 }114 }115 116 117 synchronized void receive(byte b[], int off, int len) throws IOException {118 ///检查当前"管道"状态, 确保能够读写数据119 checkStateForReceive();120 121 //因为这个放在是由PipedOutputStream的对象调用的,所以当前线程为写入线程122 writeSide = Thread.currentThread();123 124 // len是需要写进缓存数据的总长度125 // bytesToTransfer用来记录剩余个数126 int bytesToTransfer = len;127 //循环写入128 while (bytesToTransfer > 0) {129 //in==out表示缓冲已满,调用awaitSpace()阻塞此线程130 if (in == out)131 awaitSpace();132 133 //记录本次写入过程中写进缓冲中的个数134 int nextTransferAmount = 0;135 if (out < in) {136 // 因为out必然大于等于0, 所以这里 0 <= out < int137 // out < in 表示[in, buffer.length)和[0, out)两个区间可以写数据138 // 先写数据进[in, buffer.length)区间, 避免处理头尾连接的逻辑, 如果还有数据剩余, 留到下一个循环处理139 nextTransferAmount = buffer.length - in;140 } else if (in < out) {141 if (in == -1) {142 //in==-1这表示缓存数组为空143 in = out = 0;//将in和out设为0表示写入数据从0开始,读取也要从零开始144 nextTransferAmount = buffer.length - in;145 } else {146 // in < out 表示[in, out)区间可以写数据147 nextTransferAmount = out - in;148 }149 }150 /*151 * 本次可以写入到缓存中的数据个数比还需要的数据个数要多,修改nextTransferAmount,152 * 比如缓存数组中还有5个位置可以写入数据,但此时只需2个数据b[]数组就满了,所以重置153 * nextTransferAmount=2,让他再写入2个数据。154 155 */156 if (nextTransferAmount > bytesToTransfer)157 nextTransferAmount = bytesToTransfer;158 assert(nextTransferAmount > 0);159 //把数据写进缓存160 System.arraycopy(b, off, buffer, in, nextTransferAmount);161 // 计算剩余个数162 bytesToTransfer -= nextTransferAmount;163 // 移动数据起点164 off += nextTransferAmount;165 //移动in166 in += nextTransferAmount;167 // 如果in超出缓存长度, 回到0168 if (in >= buffer.length) {169 in = 0;170 }171 }172 }173 174 /*175 *在写数据前会先通过checkStateForReceive检查"管道"状态, 确保176 当前处于连接状态177 管道读写两端都没有被关闭178 读线程状态正常179 */180 private void checkStateForReceive() throws IOException {181 if (!connected) {182 throw new IOException("Pipe not connected");183 } else if (closedByWriter || closedByReader) {184 throw new IOException("Pipe closed");185 } else if (readSide != null && !readSide.isAlive()) {186 throw new IOException("Read end dead");187 }188 }189 190 191 /*192 判断目标位置(in), 如果in == out表明当前缓存数组已经满了, 193 不能再写数据了, 所以会通过awaitSpace()方法阻塞写线程;194 */195 private void awaitSpace() throws IOException {196 // 只有缓存数组已满才需要等待197 while (in == out) {198 // 检查管道状态, 防止在等待的过程中状态发生变化199 checkStateForReceive();200 //因为Java推荐仅使用读写两条线程,所以这里可以来理解为唤醒读线程201 notifyAll();202 try {203 // 释放对象锁, 等待读线程读数据, 调用后就会阻塞写线程204 // 1s后取消等待是为了再次检查管道状态205 // 注意等待结束后, 锁仍然在写线程206 wait(1000);207 } catch (InterruptedException ex) {208 throw new java.io.InterruptedIOException();209 }210 }211 }212 213 //当输入端关闭时(调用PipedOutputStream#close()), 会调用receivedLast()214 //该方法使用变量标记输入端已经关闭, 表示不会有新数据写入了.215 synchronized void receivedLast() {216 closedByWriter = true;//此方法由PipedOutputStream对象调用,代表由writer线程关闭217 notifyAll();218 }219 220 221 public synchronized int read() throws IOException {222 223 //检查状态224 if (!connected) {225 throw new IOException("Pipe not connected");226 } else if (closedByReader) {227 throw new IOException("Pipe closed");228 } else if (writeSide != null && !writeSide.isAlive()229 && !closedByWriter && (in < 0)) {230 // 为什么是in<0?因为如果in >= 0, 表示还有数据没有读, 所以不抛出异常231 // 这个判断表明了, 即使输入端已经调用了close, 也能继续读已经写入的数据232 throw new IOException("Write end dead");233 }234 //由PipedInoutStream对象所在的线程调用,所以此时当前线程为读取线程235 readSide = Thread.currentThread();236 int trials = 2;237 while (in < 0) {238 // in<0表示缓存区域为空, 只要输入端没有被关闭, 阻塞线程等待数据写入, 即等待in >= 0239 if (closedByWriter) {240 /* closed by writer, return EOF */241 return -1;242 }243 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {244 throw new IOException("Pipe broken");245 }246 /* 可以理解为等待写入线程 */247 notifyAll();248 try {249 // 阻塞线程, 等待1s, 这里会释放锁, 给机会写线程获取锁, 写数据250 wait(1000);251 } catch (InterruptedException ex) {252 throw new java.io.InterruptedIOException();253 }254 }255 // 执行到这里证明in >= 0, 即缓存数组中有数据256 // 关键的读操作257 // 1. 读取out指向的byte数据258 // 2. 后移out259 // 3. 把byte转成int, 高位补0,保证数据的一致性260 int ret = buffer[out++] & 0xFF;261 if (out >= buffer.length) {262 out = 0;263 }264 if (in == out) {265 // 读取的数据追上了输入的数据, 则当前缓存区域为空, 所以设置in = -1266 in = -1;267 }268 269 return ret;270 }271 272 273 public synchronized int read(byte b[], int off, int len) throws IOException {274 275 //执行判断,确保可以正常读写276 if (b == null) {277 throw new NullPointerException();278 } else if (off < 0 || len < 0 || len > b.length - off) {279 throw new IndexOutOfBoundsException();280 } else if (len == 0) {281 return 0;282 }283 284 /* 先读取一个数据是为了确保有数据可以,如果此时无数据可读,就会阻塞当前线程,唤醒写线程 */285 int c = read();286 if (c < 0) {
//其实如果c<0,c就只能等于-1287 return -1;288 }289 b[off] = (byte) c;//这里不&0xff是因为已经在read()方法里转换了290 int rlen = 1;291 // in >= 0确保还有数据可以读292 // len > 1确保只读取外部请求的数据长度, 因为上面已经读了1个数据, 所以是大于1, 而不是大于0293 while ((in >= 0) && (len > 1)) {294 295 // available用来记录当前可以读取的数据296 int available;297 298 if (in > out) {299 // in > out表示[out, in)区间数据可读,感觉这里有点多余,因为在receive方法中,只要in>length300 //in就会被设为0301 available = Math.min((buffer.length - out), (in - out));302 } else {303 // 首先in是不会等于out的, 因为如果相等, 在上面读第一个数据的时候就会把in赋值-1, 也就不会进入这个循环304 // 当in < out表示[out, buffer.length)和[0, in)两个区间的数据可读305 // 和receive方法类似, 为了不处理跨边界的情况, 先读[out, buffer.length)区间数据306 available = buffer.length - out;307 }308 309 // 外部已经读了一个数据, 所以只需要读(len - 1)个数据了310 if (available > (len - 1)) {311 available = len - 1;312 }313 System.arraycopy(buffer, out, b, off + rlen, available);314 out += available;315 rlen += available;316 len -= available;317 318 if (out >= buffer.length) {319 out = 0;320 }321 if (in == out) {322 /* now empty */323 in = -1;324 }325 }326 return rlen;327 }328 329 330 public synchronized int available() throws IOException {331 if(in < 0)332 return 0;333 else if(in == out)334 return buffer.length;335 else if (in > out)336 return in - out;337 else338 return in + buffer.length - out;339 }340 341 342 public void close() throws IOException {343 closedByReader = true;344 synchronized (this) {345 in = -1;346 }347 }348 }
View Code

总结:

      现在来解释一下receive和read方法交替执行中缓存数组发生的变化,如下图,这是一个大小为9的缓存数组!下标从0到8

   

-1

0

1

2

3

4

5

6

7

8

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

     在初始状态,int==-1,out==0

 

-1

0

1

2

3

4

5

6

7

8

 

out

 

 

 

 

 

 

 

 

In

 

 

 

 

 

 

 

 

 

     当我们第一次调用receive方法时,将执行此方法里的以下代码

 

1 if (in < 0) {
//小于0表示缓存中无数据,此时设置读与写的位置, 2 in = 0;//设置为0是因为要从0号索引开始往缓存中写入数据 3 out = 0;//设置为0是因为要从0号索引开从缓存中读取数据 4 } 5 // 写操作 6 // 1. 把数据写到目标位置(in) 7 // 2. 后移in, 指明下一个写数据的位置 8 buffer[in++] = (byte)(b & 0xFF);//&0xff是为了保证二进制数据的一致性,具体原因跟反码,int和byte的位数有关 9 if (in >= buffer.length) {
// 如果in超出缓存长度, 回到0, 循环利用缓存数组10 in = 0;11 }
View Code

 

    

-1

0

1

2

3

4

5

6

7

8

 

out

 

 

 

 

 

 

 

 

 

in

 

 

 

 

 

 

 

 

设置为0是因为要从0号索引开始往缓存中写入数据,然后写入数据

-1

0

1

2

3

4

5

6

7

8

 

out

 

 

 

 

 

 

 

 

 

data

data

data

data

data

data

dat

data

in

如果in超出缓存长度, 回到0,如果此时再调用receive方法,就会执行此方法的以下代码,(假设不会发生异常)

if (in == out)            awaitSpace();

这也就是为什么in==out表示数据已经写满缓存数组了,awaitSpace()会阻塞此进程,唤醒读线程,让他读取数组中的数据。

接下来观察read方法执行过程,当read所在的线程被唤醒后,因为此时数组中存在数据,那么就会执行方法内的以下代码:

int ret = buffer[out++] & 0xFF;        if (out >= buffer.length) {            out = 0;        }        if (in == out) {             // 读取的数据追上了输入的数据, 则当前缓存区域为空, 所以设置in = -1            in = -1;        }

 

-1

0

1

2

3

4

5

6

7

8

 

 

 

 

 

 

out

 

 

 

 

 

 

 

 

 

data

dat

data

in

当out==in时,此时说明数据已经被读取完,将in设为-1是为了接下来调用receive方法来继续往缓存中写入数据,如果继续调用read方法,就会执行此方法内的以下代码:

//不考虑异常,假设所有线程均正常工作

while (in < 0) {              // in<0表示缓存区域为空, 只要输入端没有被关闭, 阻塞线程等待数据写入, 即等待in >= 0            if (closedByWriter) {                /* closed by writer, return EOF */                return -1;            }            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {                throw new IOException("Pipe broken");            }            /* 可以理解为等待写入线程 */            notifyAll();            try {                // 阻塞线程, 等待1s, 这里会释放锁, 给机会写线程获取锁, 写数据                wait(1000);            } catch (InterruptedException ex) {                throw new java.io.InterruptedIOException();            }

此后唤醒写线程,执行receive方法。

转载于:https://www.cnblogs.com/lls101/p/11130928.html

你可能感兴趣的文章
Linux 安装及配置 Nginx + ftp 服务器
查看>>
this 函数执行上下文
查看>>
Three.js 3D打印数据模型文件(.STL)载入中
查看>>
HOG特征-理解篇
查看>>
晨会的重要性
查看>>
poj 3735 大数量反复操作问题(矩阵高速幂)
查看>>
自定义扩展Compare比较方法
查看>>
Swift 中的函数
查看>>
IOS开发关于测试的好的网址资源
查看>>
ArcGIS影像配准与空间配准
查看>>
考研日记第一篇
查看>>
个人简介
查看>>
Lucky Conversion(找规律)
查看>>
自定义一个处理图片的HttpHandler
查看>>
2、函数及常用模块
查看>>
Oracle按周统计数据的几种方法
查看>>
FileStream随机文件访问(访问文件中间某点的数据)
查看>>
bootstrap table单元格样式,行样式以及分页显示全部的设置
查看>>
洛谷P1106 删数问题
查看>>
数据库的自增字段代码生成器——解决不同数据库自增字段的差异机制
查看>>