diff options
author | ale <ale@incal.net> | 2019-01-02 09:53:42 +0000 |
---|---|---|
committer | ale <ale@incal.net> | 2019-01-02 09:53:42 +0000 |
commit | c5ec7eb826bfd08aa6e8dd880efa15930f78ba19 (patch) | |
tree | 7c7d5fcfc55922cf78a97001b7ca4b879b747d28 /warc/warc.go | |
parent | 3518feaf05fcb7f745975851c6684a63532ff19a (diff) | |
download | crawl-c5ec7eb826bfd08aa6e8dd880efa15930f78ba19.tar.gz crawl-c5ec7eb826bfd08aa6e8dd880efa15930f78ba19.zip |
Add multi-file output
The output stage can now write to size-limited, rotating WARC files
using a user-specified pattern, so that output files are always
unique.
Diffstat (limited to 'warc/warc.go')
-rw-r--r-- | warc/warc.go | 71 |
1 files changed, 45 insertions, 26 deletions
diff --git a/warc/warc.go b/warc/warc.go index 1184d9c..a552085 100644 --- a/warc/warc.go +++ b/warc/warc.go @@ -3,9 +3,10 @@ package warc import ( - "bufio" + "errors" "fmt" "io" + "strings" "time" "compress/gzip" @@ -57,7 +58,7 @@ func (h Header) Encode(w io.Writer) error { return err } } - _, err := fmt.Fprintf(w, "\r\n") + _, err := io.WriteString(w, "\r\n") return err } @@ -74,20 +75,22 @@ func NewHeader() Header { // Writer can write records to a file in WARC format. It is safe // for concurrent access, since writes are serialized internally. type Writer struct { - writer io.WriteCloser - bufwriter *bufio.Writer - gzwriter *gzip.Writer - lockCh chan bool + writer rawWriter + lockCh chan struct{} } type recordWriter struct { - io.Writer - lockCh chan bool + io.WriteCloser + lockCh chan struct{} } func (rw *recordWriter) Close() error { // Add the end-of-record marker. - _, err := fmt.Fprintf(rw, "\r\n\r\n") + _, err := io.WriteString(rw, "\r\n\r\n") + if err != nil { + return err + } + err = rw.WriteCloser.Close() <-rw.lockCh return err } @@ -98,41 +101,57 @@ func (rw *recordWriter) Close() error { // is satisfied. If this function returns an error, the state of the // Writer is invalid and it should no longer be used. func (w *Writer) NewRecord(hdr Header) (io.WriteCloser, error) { - w.lockCh <- true - if w.gzwriter != nil { - w.gzwriter.Close() // nolint + w.lockCh <- struct{}{} + + if err := w.writer.NewRecord(); err != nil { + return nil, err } - var err error - w.gzwriter, err = gzip.NewWriterLevel(w.bufwriter, gzip.BestCompression) + + gzwriter, err := gzip.NewWriterLevel(w.writer, gzip.BestCompression) if err != nil { return nil, err } - w.gzwriter.Header.Name = hdr.Get("WARC-Record-ID") - if err = hdr.Encode(w.gzwriter); err != nil { + gzwriter.Header.Name = hdr.Get("WARC-Record-ID") + if err = hdr.Encode(gzwriter); err != nil { return nil, err } - return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh}, nil + return &recordWriter{ + WriteCloser: gzwriter, + lockCh: w.lockCh, + }, nil } // Close the WARC writer and flush all buffers. This will also call // Close on the wrapped io.WriteCloser object. func (w *Writer) Close() error { - if err := w.gzwriter.Close(); err != nil { - return err - } - if err := w.bufwriter.Flush(); err != nil { - return err - } + w.lockCh <- struct{}{} // do not release + defer close(w.lockCh) // pending NewRecord calls will panic? return w.writer.Close() } // NewWriter initializes a new Writer and returns it. func NewWriter(w io.WriteCloser) *Writer { return &Writer{ - writer: w, - bufwriter: bufio.NewWriter(w), + writer: newSimpleWriter(w), // Buffering is important here since we're using this // channel as a semaphore. - lockCh: make(chan bool, 1), + lockCh: make(chan struct{}, 1), + } +} + +// NewMultiWriter initializes a new Writer that writes its output to +// multiple files of limited size approximately equal to maxSize, +// rotating them when necessary. The input path should contain a +// literal '%s' token, which will be replaced with a (lexically +// sortable) unique token. +func NewMultiWriter(pattern string, maxSize uint64) (*Writer, error) { + if !strings.Contains(pattern, "%s") { + return nil, errors.New("input path is not a pattern") } + return &Writer{ + writer: newMultiWriter(pattern, maxSize), + // Buffering is important here since we're using this + // channel as a semaphore. + lockCh: make(chan struct{}, 1), + }, nil } |