diff options
author | Russ Cox <rsc@golang.org> | 2010-02-01 17:43:15 -0800 |
---|---|---|
committer | Russ Cox <rsc@golang.org> | 2010-02-01 17:43:15 -0800 |
commit | 2a5d30fbe71033d76fb4a323c289b527d6b61074 (patch) | |
tree | 517031b18ea84991c418ace7fa9cc5bbc264ddae | |
parent | 5db5f68d960baf60be5a5c9c729948e26d500358 (diff) | |
download | go-2a5d30fbe71033d76fb4a323c289b527d6b61074.tar.gz go-2a5d30fbe71033d76fb4a323c289b527d6b61074.zip |
io: revised Pipe implementation
* renamed channels to say what gets sent
* use channel closed status instead of racy check of boolean
R=nigeltao_golang
CC=golang-dev
https://golang.org/cl/196065
-rw-r--r-- | src/pkg/io/pipe.go | 102 |
1 files changed, 41 insertions, 61 deletions
diff --git a/src/pkg/io/pipe.go b/src/pkg/io/pipe.go index 909989ae6a..fe70634468 100644 --- a/src/pkg/io/pipe.go +++ b/src/pkg/io/pipe.go @@ -2,8 +2,8 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Pipe adapter to connect code expecting an io.Read -// with code expecting an io.Write. +// Pipe adapter to connect code expecting an io.Reader +// with code expecting an io.Writer. package io @@ -12,54 +12,43 @@ import ( "sync" ) -type pipeReturn struct { - n int - err os.Error -} - // Shared pipe structure. type pipe struct { - rclosed bool // Read end closed? - rerr os.Error // Error supplied to CloseReader - wclosed bool // Write end closed? - werr os.Error // Error supplied to CloseWriter - wpend []byte // Written data waiting to be read. - wtot int // Bytes consumed so far in current write. - cr chan []byte // Write sends data here... - cw chan pipeReturn // ... and reads the n, err back from here. + rclosed bool // Read end closed? + rerr os.Error // Error supplied to CloseReader + wclosed bool // Write end closed? + werr os.Error // Error supplied to CloseWriter + wpend []byte // Written data waiting to be read. + wtot int // Bytes consumed so far in current write. + cw chan []byte // Write sends data here... + cr chan bool // ... and reads a done notification from here. } func (p *pipe) Read(data []byte) (n int, err os.Error) { - if p == nil || p.rclosed { + if p.rclosed { return 0, os.EINVAL } // Wait for next write block if necessary. if p.wpend == nil { - if !p.wclosed { - p.wpend = <-p.cr + if !closed(p.cw) { + p.wpend = <-p.cw } - if p.wclosed { + if closed(p.cw) { return 0, p.werr } p.wtot = 0 } // Read from current write block. - n = len(data) - if n > len(p.wpend) { - n = len(p.wpend) - } - for i := 0; i < n; i++ { - data[i] = p.wpend[i] - } + n = copy(data, p.wpend) p.wtot += n p.wpend = p.wpend[n:] // If write block is done, finish the write. if len(p.wpend) == 0 { p.wpend = nil - p.cw <- pipeReturn{p.wtot, nil} + p.cr <- true p.wtot = 0 } @@ -67,58 +56,52 @@ func (p *pipe) Read(data []byte) (n int, err os.Error) { } func (p *pipe) Write(data []byte) (n int, err os.Error) { - if p == nil || p.wclosed { + if p.wclosed { return 0, os.EINVAL } - if p.rclosed { + if closed(p.cr) { return 0, p.rerr } - // Send data to reader. - p.cr <- data + // Send write to reader. + p.cw <- data // Wait for reader to finish copying it. - res := <-p.cw - return res.n, res.err + <-p.cr + if closed(p.cr) { + _, _ = <-p.cw // undo send if reader is gone + return 0, p.rerr + } + return len(data), nil } func (p *pipe) CloseReader(rerr os.Error) os.Error { - if p == nil || p.rclosed { + if p.rclosed { return os.EINVAL } - - // Stop any future writes. p.rclosed = true + + // Wake up writes. if rerr == nil { rerr = os.EPIPE } p.rerr = rerr - - // Stop the current write. - if !p.wclosed { - p.cw <- pipeReturn{p.wtot, rerr} - } - + close(p.cr) return nil } func (p *pipe) CloseWriter(werr os.Error) os.Error { - if werr == nil { - werr = os.EOF - } - if p == nil || p.wclosed { + if p.wclosed { return os.EINVAL } - - // Stop any future reads. p.wclosed = true - p.werr = werr - // Stop the current read. - if !p.rclosed { - p.cr <- nil + // Wake up reads. + if werr == nil { + werr = os.EOF } - + p.werr = werr + close(p.cw) return nil } @@ -212,12 +195,9 @@ func (w *PipeWriter) finish() { w.Close() } // Reads on one end are matched with writes on the other, // copying data directly between the two; there is no internal buffering. func Pipe() (*PipeReader, *PipeWriter) { - p := new(pipe) - p.cr = make(chan []byte, 1) - p.cw = make(chan pipeReturn, 1) - r := new(PipeReader) - r.p = p - w := new(PipeWriter) - w.p = p - return r, w + p := &pipe{ + cw: make(chan []byte, 1), + cr: make(chan bool, 1), + } + return &PipeReader{p: p}, &PipeWriter{p: p} } |