aboutsummaryrefslogtreecommitdiff
path: root/warc
diff options
context:
space:
mode:
authorale <ale@incal.net>2019-01-02 09:53:42 +0000
committerale <ale@incal.net>2019-01-02 09:53:42 +0000
commitc5ec7eb826bfd08aa6e8dd880efa15930f78ba19 (patch)
tree7c7d5fcfc55922cf78a97001b7ca4b879b747d28 /warc
parent3518feaf05fcb7f745975851c6684a63532ff19a (diff)
downloadcrawl-c5ec7eb826bfd08aa6e8dd880efa15930f78ba19.tar.gz
crawl-c5ec7eb826bfd08aa6e8dd880efa15930f78ba19.zip
Add multi-file output
The output stage can now write to size-limited, rotating WARC files using a user-specified pattern, so that output files are always unique.
Diffstat (limited to 'warc')
-rw-r--r--warc/multi.go121
-rw-r--r--warc/warc.go71
-rw-r--r--warc/warc_test.go138
3 files changed, 304 insertions, 26 deletions
diff --git a/warc/multi.go b/warc/multi.go
new file mode 100644
index 0000000..a18ceb8
--- /dev/null
+++ b/warc/multi.go
@@ -0,0 +1,121 @@
+package warc
+
+import (
+ "bufio"
+ "encoding/binary"
+ "encoding/hex"
+ "fmt"
+ "io"
+ "os"
+ "sync/atomic"
+ "time"
+)
+
+func newID() string {
+ var b [8]byte
+ binary.BigEndian.PutUint64(b[:], uint64(time.Now().UnixNano()))
+ return hex.EncodeToString(b[:])
+}
+
+type meteredWriter struct {
+ io.WriteCloser
+ bytes uint64
+}
+
+func (m *meteredWriter) Write(b []byte) (int, error) {
+ n, err := m.WriteCloser.Write(b)
+ if n > 0 {
+ atomic.AddUint64(&m.bytes, uint64(n))
+ }
+ return n, err
+}
+
+func (m *meteredWriter) Bytes() uint64 {
+ return atomic.LoadUint64(&m.bytes)
+}
+
+type bufferedWriter struct {
+ *bufio.Writer
+ io.Closer
+}
+
+func newBufferedWriter(w io.WriteCloser) *bufferedWriter {
+ return &bufferedWriter{
+ Writer: bufio.NewWriter(w),
+ Closer: w,
+ }
+}
+
+func (w *bufferedWriter) Close() error {
+ if err := w.Writer.Flush(); err != nil {
+ return err
+ }
+ return w.Closer.Close()
+}
+
+func openFile(path string) (*meteredWriter, error) {
+ f, err := os.Create(path)
+ if err != nil {
+ return nil, err
+ }
+ return &meteredWriter{WriteCloser: newBufferedWriter(f)}, nil
+}
+
+// Unsafe for concurrent access.
+type multiWriter struct {
+ pattern string
+ maxSize uint64
+
+ cur *meteredWriter
+}
+
+func newMultiWriter(pattern string, maxSize uint64) rawWriter {
+ if maxSize == 0 {
+ maxSize = 100 * 1024 * 1024
+ }
+ return &multiWriter{
+ pattern: pattern,
+ maxSize: maxSize,
+ }
+}
+
+func (w *multiWriter) newFilename() string {
+ return fmt.Sprintf(w.pattern, newID())
+}
+
+func (w *multiWriter) NewRecord() (err error) {
+ if w.cur == nil || w.cur.Bytes() > w.maxSize {
+ if w.cur != nil {
+ if err = w.cur.Close(); err != nil {
+ return
+ }
+ }
+ w.cur, err = openFile(w.newFilename())
+ }
+ return
+}
+
+func (w *multiWriter) Write(b []byte) (int, error) {
+ return w.cur.Write(b)
+}
+
+func (w *multiWriter) Close() error {
+ return w.cur.Close()
+}
+
+type simpleWriter struct {
+ *bufferedWriter
+}
+
+func newSimpleWriter(w io.WriteCloser) rawWriter {
+ return &simpleWriter{newBufferedWriter(w)}
+}
+
+func (w *simpleWriter) NewRecord() error {
+ return nil
+}
+
+type rawWriter interface {
+ io.WriteCloser
+ NewRecord() error
+}
diff --git a/warc/warc.go b/warc/warc.go
index 1184d9c..a552085 100644
--- a/warc/warc.go
+++ b/warc/warc.go
@@ -3,9 +3,10 @@
package warc
import (
- "bufio"
+ "errors"
"fmt"
"io"
+ "strings"
"time"
"compress/gzip"
@@ -57,7 +58,7 @@ func (h Header) Encode(w io.Writer) error {
return err
}
}
- _, err := fmt.Fprintf(w, "\r\n")
+ _, err := io.WriteString(w, "\r\n")
return err
}
@@ -74,20 +75,22 @@ func NewHeader() Header {
// Writer can write records to a file in WARC format. It is safe
// for concurrent access, since writes are serialized internally.
type Writer struct {
- writer io.WriteCloser
- bufwriter *bufio.Writer
- gzwriter *gzip.Writer
- lockCh chan bool
+ writer rawWriter
+ lockCh chan struct{}
}
type recordWriter struct {
- io.Writer
- lockCh chan bool
+ io.WriteCloser
+ lockCh chan struct{}
}
func (rw *recordWriter) Close() error {
// Add the end-of-record marker.
- _, err := fmt.Fprintf(rw, "\r\n\r\n")
+ _, err := io.WriteString(rw, "\r\n\r\n")
+ if err != nil {
+ return err
+ }
+ err = rw.WriteCloser.Close()
<-rw.lockCh
return err
}
@@ -98,41 +101,57 @@ func (rw *recordWriter) Close() error {
// is satisfied. If this function returns an error, the state of the
// Writer is invalid and it should no longer be used.
func (w *Writer) NewRecord(hdr Header) (io.WriteCloser, error) {
- w.lockCh <- true
- if w.gzwriter != nil {
- w.gzwriter.Close() // nolint
+ w.lockCh <- struct{}{}
+
+ if err := w.writer.NewRecord(); err != nil {
+ return nil, err
}
- var err error
- w.gzwriter, err = gzip.NewWriterLevel(w.bufwriter, gzip.BestCompression)
+
+ gzwriter, err := gzip.NewWriterLevel(w.writer, gzip.BestCompression)
if err != nil {
return nil, err
}
- w.gzwriter.Header.Name = hdr.Get("WARC-Record-ID")
- if err = hdr.Encode(w.gzwriter); err != nil {
+ gzwriter.Header.Name = hdr.Get("WARC-Record-ID")
+ if err = hdr.Encode(gzwriter); err != nil {
return nil, err
}
- return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh}, nil
+ return &recordWriter{
+ WriteCloser: gzwriter,
+ lockCh: w.lockCh,
+ }, nil
}
// Close the WARC writer and flush all buffers. This will also call
// Close on the wrapped io.WriteCloser object.
func (w *Writer) Close() error {
- if err := w.gzwriter.Close(); err != nil {
- return err
- }
- if err := w.bufwriter.Flush(); err != nil {
- return err
- }
+ w.lockCh <- struct{}{} // do not release
+ defer close(w.lockCh) // pending NewRecord calls will panic?
return w.writer.Close()
}
// NewWriter initializes a new Writer and returns it.
func NewWriter(w io.WriteCloser) *Writer {
return &Writer{
- writer: w,
- bufwriter: bufio.NewWriter(w),
+ writer: newSimpleWriter(w),
// Buffering is important here since we're using this
// channel as a semaphore.
- lockCh: make(chan bool, 1),
+ lockCh: make(chan struct{}, 1),
+ }
+}
+
+// NewMultiWriter initializes a new Writer that writes its output to
+// multiple files of limited size approximately equal to maxSize,
+// rotating them when necessary. The input path should contain a
+// literal '%s' token, which will be replaced with a (lexically
+// sortable) unique token.
+func NewMultiWriter(pattern string, maxSize uint64) (*Writer, error) {
+ if !strings.Contains(pattern, "%s") {
+ return nil, errors.New("input path is not a pattern")
}
+ return &Writer{
+ writer: newMultiWriter(pattern, maxSize),
+ // Buffering is important here since we're using this
+ // channel as a semaphore.
+ lockCh: make(chan struct{}, 1),
+ }, nil
}
diff --git a/warc/warc_test.go b/warc/warc_test.go
new file mode 100644
index 0000000..5e2de2f
--- /dev/null
+++ b/warc/warc_test.go
@@ -0,0 +1,138 @@
+package warc
+
+import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sync"
+ "testing"
+)
+
+var testData = []byte("this is some very interesting test data of non-zero size")
+
+func writeRecords(w *Writer, n int) error {
+ for i := 0; i < n; i++ {
+ hdr := NewHeader()
+ rec, err := w.NewRecord(hdr)
+ if err != nil {
+ return fmt.Errorf("NewRecord: %v", err)
+ }
+ _, err = rec.Write(testData)
+ if err != nil {
+ return fmt.Errorf("record Write: %v", err)
+ }
+ if err := rec.Close(); err != nil {
+ return fmt.Errorf("record Close: %v", err)
+ }
+ }
+ return nil
+}
+
+func writeManyRecords(t testing.TB, w *Writer, n int) {
+ if err := writeRecords(w, n); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func writeManyRecordsConcurrently(t testing.TB, w *Writer, n, nproc int) {
+ startCh := make(chan struct{})
+ errCh := make(chan error, nproc+1)
+ var wg sync.WaitGroup
+
+ for i := 0; i < nproc; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-startCh
+ if err := writeRecords(w, n); err != nil {
+ errCh <- err
+ }
+ }()
+ }
+ go func() {
+ wg.Wait()
+ errCh <- nil
+ }()
+ close(startCh)
+ if err := <-errCh; err != nil {
+ t.Fatalf("a worker got an error: %v", err)
+ }
+}
+
+func TestWARC_WriteSingleFile(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dir)
+
+ f, err := os.Create(filepath.Join(dir, "out.warc.gz"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ w := NewWriter(f)
+
+ writeManyRecords(t, w, 1000)
+ if err := w.Close(); err != nil {
+ t.Fatalf("Close: %v", err)
+ }
+}
+
+func TestWARC_WriteMulti(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dir)
+
+ var targetSize int64 = 10240
+ w, err := NewMultiWriter(filepath.Join(dir, "out.%s.warc.gz"), uint64(targetSize))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ writeManyRecords(t, w, 1000)
+ if err := w.Close(); err != nil {
+ t.Fatalf("Close: %v", err)
+ }
+
+ files, _ := ioutil.ReadDir(dir)
+ if len(files) < 2 {
+ t.Fatalf("MultiWriter didn't create enough files (%d)", len(files))
+ }
+ for _, f := range files[:len(files)-1] {
+ if f.Size() < targetSize {
+ t.Errorf("output file %s is too small (%d bytes)", f.Name(), f.Size())
+ }
+ }
+}
+
+func TestWARC_WriteMulti_Concurrent(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dir)
+
+ var targetSize int64 = 100000
+ w, err := NewMultiWriter(filepath.Join(dir, "out.%s.warc.gz"), uint64(targetSize))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ writeManyRecordsConcurrently(t, w, 1000, 10)
+ if err := w.Close(); err != nil {
+ t.Fatalf("Close: %v", err)
+ }
+
+ files, _ := ioutil.ReadDir(dir)
+ if len(files) < 2 {
+ t.Fatalf("MultiWriter didn't create enough files (%d)", len(files))
+ }
+ for _, f := range files[:len(files)-1] {
+ if f.Size() < targetSize {
+ t.Errorf("output file %s is too small (%d bytes)", f.Name(), f.Size())
+ }
+ }
+}