diff options
Diffstat (limited to 'warc/warc.go')
-rw-r--r-- | warc/warc.go | 71 |
1 files changed, 45 insertions, 26 deletions
diff --git a/warc/warc.go b/warc/warc.go index 1184d9c..a552085 100644 --- a/warc/warc.go +++ b/warc/warc.go @@ -3,9 +3,10 @@ package warc import ( - "bufio" + "errors" "fmt" "io" + "strings" "time" "compress/gzip" @@ -57,7 +58,7 @@ func (h Header) Encode(w io.Writer) error { return err } } - _, err := fmt.Fprintf(w, "\r\n") + _, err := io.WriteString(w, "\r\n") return err } @@ -74,20 +75,22 @@ func NewHeader() Header { // Writer can write records to a file in WARC format. It is safe // for concurrent access, since writes are serialized internally. type Writer struct { - writer io.WriteCloser - bufwriter *bufio.Writer - gzwriter *gzip.Writer - lockCh chan bool + writer rawWriter + lockCh chan struct{} } type recordWriter struct { - io.Writer - lockCh chan bool + io.WriteCloser + lockCh chan struct{} } func (rw *recordWriter) Close() error { // Add the end-of-record marker. - _, err := fmt.Fprintf(rw, "\r\n\r\n") + _, err := io.WriteString(rw, "\r\n\r\n") + if err != nil { + return err + } + err = rw.WriteCloser.Close() <-rw.lockCh return err } @@ -98,41 +101,57 @@ func (rw *recordWriter) Close() error { // is satisfied. If this function returns an error, the state of the // Writer is invalid and it should no longer be used. func (w *Writer) NewRecord(hdr Header) (io.WriteCloser, error) { - w.lockCh <- true - if w.gzwriter != nil { - w.gzwriter.Close() // nolint + w.lockCh <- struct{}{} + + if err := w.writer.NewRecord(); err != nil { + return nil, err } - var err error - w.gzwriter, err = gzip.NewWriterLevel(w.bufwriter, gzip.BestCompression) + + gzwriter, err := gzip.NewWriterLevel(w.writer, gzip.BestCompression) if err != nil { return nil, err } - w.gzwriter.Header.Name = hdr.Get("WARC-Record-ID") - if err = hdr.Encode(w.gzwriter); err != nil { + gzwriter.Header.Name = hdr.Get("WARC-Record-ID") + if err = hdr.Encode(gzwriter); err != nil { return nil, err } - return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh}, nil + return &recordWriter{ + WriteCloser: gzwriter, + lockCh: w.lockCh, + }, nil } // Close the WARC writer and flush all buffers. This will also call // Close on the wrapped io.WriteCloser object. func (w *Writer) Close() error { - if err := w.gzwriter.Close(); err != nil { - return err - } - if err := w.bufwriter.Flush(); err != nil { - return err - } + w.lockCh <- struct{}{} // do not release + defer close(w.lockCh) // pending NewRecord calls will panic? return w.writer.Close() } // NewWriter initializes a new Writer and returns it. func NewWriter(w io.WriteCloser) *Writer { return &Writer{ - writer: w, - bufwriter: bufio.NewWriter(w), + writer: newSimpleWriter(w), // Buffering is important here since we're using this // channel as a semaphore. - lockCh: make(chan bool, 1), + lockCh: make(chan struct{}, 1), + } +} + +// NewMultiWriter initializes a new Writer that writes its output to +// multiple files of limited size approximately equal to maxSize, +// rotating them when necessary. The input path should contain a +// literal '%s' token, which will be replaced with a (lexically +// sortable) unique token. +func NewMultiWriter(pattern string, maxSize uint64) (*Writer, error) { + if !strings.Contains(pattern, "%s") { + return nil, errors.New("input path is not a pattern") } + return &Writer{ + writer: newMultiWriter(pattern, maxSize), + // Buffering is important here since we're using this + // channel as a semaphore. + lockCh: make(chan struct{}, 1), + }, nil } |