diff options
author | ale <ale@incal.net> | 2019-01-02 09:53:42 +0000 |
---|---|---|
committer | ale <ale@incal.net> | 2019-01-02 09:53:42 +0000 |
commit | c5ec7eb826bfd08aa6e8dd880efa15930f78ba19 (patch) | |
tree | 7c7d5fcfc55922cf78a97001b7ca4b879b747d28 /warc | |
parent | 3518feaf05fcb7f745975851c6684a63532ff19a (diff) | |
download | crawl-c5ec7eb826bfd08aa6e8dd880efa15930f78ba19.tar.gz crawl-c5ec7eb826bfd08aa6e8dd880efa15930f78ba19.zip |
Add multi-file output
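The output stage can now write to size-limited, rotating WARC files.
File names are generated from a user-specified pattern containing a
literal '%s' token, which is replaced with a freshly generated
(lexically sortable) ID for each new file, so that output file names
are always unique.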
Diffstat (limited to 'warc')
-rw-r--r-- | warc/multi.go | 121 | ||||
-rw-r--r-- | warc/warc.go | 71 | ||||
-rw-r--r-- | warc/warc_test.go | 138 |
3 files changed, 304 insertions, 26 deletions
diff --git a/warc/multi.go b/warc/multi.go new file mode 100644 index 0000000..a18ceb8 --- /dev/null +++ b/warc/multi.go @@ -0,0 +1,121 @@ +package warc + +import ( + "bufio" + "encoding/binary" + "encoding/hex" + "fmt" + "io" + "os" + "sync/atomic" + "time" +) + +func newID() string { + var b [8]byte + binary.BigEndian.PutUint64(b[:], uint64(time.Now().UnixNano())) + return hex.EncodeToString(b[:]) +} + +type meteredWriter struct { + io.WriteCloser + bytes uint64 +} + +func (m *meteredWriter) Write(b []byte) (int, error) { + n, err := m.WriteCloser.Write(b) + if n > 0 { + atomic.AddUint64(&m.bytes, uint64(n)) + } + return n, err +} + +func (m *meteredWriter) Bytes() uint64 { + return atomic.LoadUint64(&m.bytes) +} + +type bufferedWriter struct { + *bufio.Writer + io.Closer +} + +func newBufferedWriter(w io.WriteCloser) *bufferedWriter { + return &bufferedWriter{ + Writer: bufio.NewWriter(w), + Closer: w, + } +} + +func (w *bufferedWriter) Close() error { + if err := w.Writer.Flush(); err != nil { + return err + } + return w.Closer.Close() +} + +func openFile(path string) (*meteredWriter, error) { + f, err := os.Create(path) + if err != nil { + return nil, err + } + return &meteredWriter{WriteCloser: newBufferedWriter(f)}, nil +} + +// Unsafe for concurrent access. 
+type multiWriter struct { + pattern string + maxSize uint64 + + cur *meteredWriter +} + +func newMultiWriter(pattern string, maxSize uint64) rawWriter { + if maxSize == 0 { + maxSize = 100 * 1024 * 1024 + } + return &multiWriter{ + pattern: pattern, + maxSize: maxSize, + } +} + +func (w *multiWriter) newFilename() string { + return fmt.Sprintf(w.pattern, newID()) +} + +func (w *multiWriter) NewRecord() (err error) { + if w.cur == nil || w.cur.Bytes() > w.maxSize { + if w.cur != nil { + if err = w.cur.Close(); err != nil { + return + } + } + w.cur, err = openFile(w.newFilename()) + } + return +} + +func (w *multiWriter) Write(b []byte) (int, error) { + return w.cur.Write(b) +} + +func (w *multiWriter) Close() error { + return w.cur.Close() +} + +type simpleWriter struct { + *bufferedWriter +} + +func newSimpleWriter(w io.WriteCloser) rawWriter { + return &simpleWriter{newBufferedWriter(w)} +} + +func (w *simpleWriter) NewRecord() error { + return nil +} + +type rawWriter interface { + io.WriteCloser + NewRecord() error +} diff --git a/warc/warc.go b/warc/warc.go index 1184d9c..a552085 100644 --- a/warc/warc.go +++ b/warc/warc.go @@ -3,9 +3,10 @@ package warc import ( - "bufio" + "errors" "fmt" "io" + "strings" "time" "compress/gzip" @@ -57,7 +58,7 @@ func (h Header) Encode(w io.Writer) error { return err } } - _, err := fmt.Fprintf(w, "\r\n") + _, err := io.WriteString(w, "\r\n") return err } @@ -74,20 +75,22 @@ func NewHeader() Header { // Writer can write records to a file in WARC format. It is safe // for concurrent access, since writes are serialized internally. type Writer struct { - writer io.WriteCloser - bufwriter *bufio.Writer - gzwriter *gzip.Writer - lockCh chan bool + writer rawWriter + lockCh chan struct{} } type recordWriter struct { - io.Writer - lockCh chan bool + io.WriteCloser + lockCh chan struct{} } func (rw *recordWriter) Close() error { // Add the end-of-record marker. 
- _, err := fmt.Fprintf(rw, "\r\n\r\n") + _, err := io.WriteString(rw, "\r\n\r\n") + if err != nil { + return err + } + err = rw.WriteCloser.Close() <-rw.lockCh return err } @@ -98,41 +101,57 @@ func (rw *recordWriter) Close() error { // is satisfied. If this function returns an error, the state of the // Writer is invalid and it should no longer be used. func (w *Writer) NewRecord(hdr Header) (io.WriteCloser, error) { - w.lockCh <- true - if w.gzwriter != nil { - w.gzwriter.Close() // nolint + w.lockCh <- struct{}{} + + if err := w.writer.NewRecord(); err != nil { + return nil, err } - var err error - w.gzwriter, err = gzip.NewWriterLevel(w.bufwriter, gzip.BestCompression) + + gzwriter, err := gzip.NewWriterLevel(w.writer, gzip.BestCompression) if err != nil { return nil, err } - w.gzwriter.Header.Name = hdr.Get("WARC-Record-ID") - if err = hdr.Encode(w.gzwriter); err != nil { + gzwriter.Header.Name = hdr.Get("WARC-Record-ID") + if err = hdr.Encode(gzwriter); err != nil { return nil, err } - return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh}, nil + return &recordWriter{ + WriteCloser: gzwriter, + lockCh: w.lockCh, + }, nil } // Close the WARC writer and flush all buffers. This will also call // Close on the wrapped io.WriteCloser object. func (w *Writer) Close() error { - if err := w.gzwriter.Close(); err != nil { - return err - } - if err := w.bufwriter.Flush(); err != nil { - return err - } + w.lockCh <- struct{}{} // do not release + defer close(w.lockCh) // pending NewRecord calls will panic? return w.writer.Close() } // NewWriter initializes a new Writer and returns it. func NewWriter(w io.WriteCloser) *Writer { return &Writer{ - writer: w, - bufwriter: bufio.NewWriter(w), + writer: newSimpleWriter(w), // Buffering is important here since we're using this // channel as a semaphore. 
- lockCh: make(chan bool, 1), + lockCh: make(chan struct{}, 1), + } +} + +// NewMultiWriter initializes a new Writer that writes its output to +// multiple files of limited size approximately equal to maxSize, +// rotating them when necessary. The input path should contain a +// literal '%s' token, which will be replaced with a (lexically +// sortable) unique token. +func NewMultiWriter(pattern string, maxSize uint64) (*Writer, error) { + if !strings.Contains(pattern, "%s") { + return nil, errors.New("input path is not a pattern") } + return &Writer{ + writer: newMultiWriter(pattern, maxSize), + // Buffering is important here since we're using this + // channel as a semaphore. + lockCh: make(chan struct{}, 1), + }, nil } diff --git a/warc/warc_test.go b/warc/warc_test.go new file mode 100644 index 0000000..5e2de2f --- /dev/null +++ b/warc/warc_test.go @@ -0,0 +1,138 @@ +package warc + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sync" + "testing" +) + +var testData = []byte("this is some very interesting test data of non-zero size") + +func writeRecords(w *Writer, n int) error { + for i := 0; i < n; i++ { + hdr := NewHeader() + rec, err := w.NewRecord(hdr) + if err != nil { + return fmt.Errorf("NewRecord: %v", err) + } + _, err = rec.Write(testData) + if err != nil { + return fmt.Errorf("record Write: %v", err) + } + if err := rec.Close(); err != nil { + return fmt.Errorf("record Close: %v", err) + } + } + return nil +} + +func writeManyRecords(t testing.TB, w *Writer, n int) { + if err := writeRecords(w, n); err != nil { + t.Fatal(err) + } +} + +func writeManyRecordsConcurrently(t testing.TB, w *Writer, n, nproc int) { + startCh := make(chan struct{}) + errCh := make(chan error, nproc+1) + var wg sync.WaitGroup + + for i := 0; i < nproc; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-startCh + if err := writeRecords(w, n); err != nil { + errCh <- err + } + }() + } + go func() { + wg.Wait() + errCh <- nil + }() + close(startCh) + if err := 
<-errCh; err != nil { + t.Fatalf("a worker got an error: %v", err) + } +} + +func TestWARC_WriteSingleFile(t *testing.T) { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + f, err := os.Create(filepath.Join(dir, "out.warc.gz")) + if err != nil { + t.Fatal(err) + } + w := NewWriter(f) + + writeManyRecords(t, w, 1000) + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } +} + +func TestWARC_WriteMulti(t *testing.T) { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + var targetSize int64 = 10240 + w, err := NewMultiWriter(filepath.Join(dir, "out.%s.warc.gz"), uint64(targetSize)) + if err != nil { + t.Fatal(err) + } + + writeManyRecords(t, w, 1000) + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + files, _ := ioutil.ReadDir(dir) + if len(files) < 2 { + t.Fatalf("MultiWriter didn't create enough files (%d)", len(files)) + } + for _, f := range files[:len(files)-1] { + if f.Size() < targetSize { + t.Errorf("output file %s is too small (%d bytes)", f.Name(), f.Size()) + } + } +} + +func TestWARC_WriteMulti_Concurrent(t *testing.T) { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + var targetSize int64 = 100000 + w, err := NewMultiWriter(filepath.Join(dir, "out.%s.warc.gz"), uint64(targetSize)) + if err != nil { + t.Fatal(err) + } + + writeManyRecordsConcurrently(t, w, 1000, 10) + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + files, _ := ioutil.ReadDir(dir) + if len(files) < 2 { + t.Fatalf("MultiWriter didn't create enough files (%d)", len(files)) + } + for _, f := range files[:len(files)-1] { + if f.Size() < targetSize { + t.Errorf("output file %s is too small (%d bytes)", f.Name(), f.Size()) + } + } +} |