aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuss Cox <rsc@golang.org>2010-02-01 17:43:15 -0800
committerRuss Cox <rsc@golang.org>2010-02-01 17:43:15 -0800
commit2a5d30fbe71033d76fb4a323c289b527d6b61074 (patch)
tree517031b18ea84991c418ace7fa9cc5bbc264ddae
parent5db5f68d960baf60be5a5c9c729948e26d500358 (diff)
downloadgo-2a5d30fbe71033d76fb4a323c289b527d6b61074.tar.gz
go-2a5d30fbe71033d76fb4a323c289b527d6b61074.zip
io: revised Pipe implementation
* renamed channels to say what gets sent * use channel closed status instead of racy check of boolean R=nigeltao_golang CC=golang-dev https://golang.org/cl/196065
-rw-r--r--src/pkg/io/pipe.go102
1 files changed, 41 insertions, 61 deletions
diff --git a/src/pkg/io/pipe.go b/src/pkg/io/pipe.go
index 909989ae6a..fe70634468 100644
--- a/src/pkg/io/pipe.go
+++ b/src/pkg/io/pipe.go
@@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// Pipe adapter to connect code expecting an io.Read
-// with code expecting an io.Write.
+// Pipe adapter to connect code expecting an io.Reader
+// with code expecting an io.Writer.
package io
@@ -12,54 +12,43 @@ import (
"sync"
)
-type pipeReturn struct {
- n int
- err os.Error
-}
-
// Shared pipe structure.
type pipe struct {
- rclosed bool // Read end closed?
- rerr os.Error // Error supplied to CloseReader
- wclosed bool // Write end closed?
- werr os.Error // Error supplied to CloseWriter
- wpend []byte // Written data waiting to be read.
- wtot int // Bytes consumed so far in current write.
- cr chan []byte // Write sends data here...
- cw chan pipeReturn // ... and reads the n, err back from here.
+ rclosed bool // Read end closed?
+ rerr os.Error // Error supplied to CloseReader
+ wclosed bool // Write end closed?
+ werr os.Error // Error supplied to CloseWriter
+ wpend []byte // Written data waiting to be read.
+ wtot int // Bytes consumed so far in current write.
+ cw chan []byte // Write sends data here...
+ cr chan bool // ... and reads a done notification from here.
}
func (p *pipe) Read(data []byte) (n int, err os.Error) {
- if p == nil || p.rclosed {
+ if p.rclosed {
return 0, os.EINVAL
}
// Wait for next write block if necessary.
if p.wpend == nil {
- if !p.wclosed {
- p.wpend = <-p.cr
+ if !closed(p.cw) {
+ p.wpend = <-p.cw
}
- if p.wclosed {
+ if closed(p.cw) {
return 0, p.werr
}
p.wtot = 0
}
// Read from current write block.
- n = len(data)
- if n > len(p.wpend) {
- n = len(p.wpend)
- }
- for i := 0; i < n; i++ {
- data[i] = p.wpend[i]
- }
+ n = copy(data, p.wpend)
p.wtot += n
p.wpend = p.wpend[n:]
// If write block is done, finish the write.
if len(p.wpend) == 0 {
p.wpend = nil
- p.cw <- pipeReturn{p.wtot, nil}
+ p.cr <- true
p.wtot = 0
}
@@ -67,58 +56,52 @@ func (p *pipe) Read(data []byte) (n int, err os.Error) {
}
func (p *pipe) Write(data []byte) (n int, err os.Error) {
- if p == nil || p.wclosed {
+ if p.wclosed {
return 0, os.EINVAL
}
- if p.rclosed {
+ if closed(p.cr) {
return 0, p.rerr
}
- // Send data to reader.
- p.cr <- data
+ // Send write to reader.
+ p.cw <- data
// Wait for reader to finish copying it.
- res := <-p.cw
- return res.n, res.err
+ <-p.cr
+ if closed(p.cr) {
+ _, _ = <-p.cw // undo send if reader is gone
+ return 0, p.rerr
+ }
+ return len(data), nil
}
func (p *pipe) CloseReader(rerr os.Error) os.Error {
- if p == nil || p.rclosed {
+ if p.rclosed {
return os.EINVAL
}
-
- // Stop any future writes.
p.rclosed = true
+
+ // Wake up writes.
if rerr == nil {
rerr = os.EPIPE
}
p.rerr = rerr
-
- // Stop the current write.
- if !p.wclosed {
- p.cw <- pipeReturn{p.wtot, rerr}
- }
-
+ close(p.cr)
return nil
}
func (p *pipe) CloseWriter(werr os.Error) os.Error {
- if werr == nil {
- werr = os.EOF
- }
- if p == nil || p.wclosed {
+ if p.wclosed {
return os.EINVAL
}
-
- // Stop any future reads.
p.wclosed = true
- p.werr = werr
- // Stop the current read.
- if !p.rclosed {
- p.cr <- nil
+ // Wake up reads.
+ if werr == nil {
+ werr = os.EOF
}
-
+ p.werr = werr
+ close(p.cw)
return nil
}
@@ -212,12 +195,9 @@ func (w *PipeWriter) finish() { w.Close() }
// Reads on one end are matched with writes on the other,
// copying data directly between the two; there is no internal buffering.
func Pipe() (*PipeReader, *PipeWriter) {
- p := new(pipe)
- p.cr = make(chan []byte, 1)
- p.cw = make(chan pipeReturn, 1)
- r := new(PipeReader)
- r.p = p
- w := new(PipeWriter)
- w.p = p
- return r, w
+ p := &pipe{
+ cw: make(chan []byte, 1),
+ cr: make(chan bool, 1),
+ }
+ return &PipeReader{p: p}, &PipeWriter{p: p}
}