From c5ec7eb826bfd08aa6e8dd880efa15930f78ba19 Mon Sep 17 00:00:00 2001 From: ale Date: Wed, 2 Jan 2019 09:53:42 +0000 Subject: Add multi-file output The output stage can now write to size-limited, rotating WARC files using a user-specified pattern, so that output files are always unique. --- README.md | 9 ++++ cmd/crawl/crawl.go | 21 ++++++-- warc/multi.go | 121 ++++++++++++++++++++++++++++++++++++++++++++++ warc/warc.go | 71 +++++++++++++++++---------- warc/warc_test.go | 138 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 330 insertions(+), 30 deletions(-) create mode 100644 warc/multi.go create mode 100644 warc/warc_test.go diff --git a/README.md b/README.md index 38f7bc3..b4d28e5 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,15 @@ avoid calendars, admin panels of common CMS applications, and other well-known pitfalls. This list is sourced from the [ArchiveBot](https://github.com/ArchiveTeam/ArchiveBot) project. +If you're running a larger crawl, the tool can be told to rotate the +output WARC files when they reach a certain size (100MB by default, +controlled by the *--output-max-size* flag). 
To do so, make sure the +*--output* option contains somewhere the literal token `%s`, which +will be replaced by a unique identifier every time a new file is +created, e.g.: + + $ crawl --output=out-%s.warc.gz http://example.com/ + ## Limitations Like most crawlers, this one has a number of limitations: diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go index d5e012a..2ebba98 100644 --- a/cmd/crawl/crawl.go +++ b/cmd/crawl/crawl.go @@ -34,7 +34,8 @@ var ( depth = flag.Int("depth", 100, "maximum link depth") validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols") excludeRelated = flag.Bool("exclude-related", false, "include related resources (css, images, etc) only if their URL is in scope") - outputFile = flag.String("output", "crawl.warc.gz", "output WARC file") + outputFile = flag.String("output", "crawl.warc.gz", "output WARC file or pattern (patterns must include a \"%s\" literal token)") + warcFileSizeMB = flag.Int("output-max-size", 100, "maximum output WARC file size (in MB) when using patterns") cpuprofile = flag.String("cpuprofile", "", "create cpu profile") excludes []*regexp.Regexp @@ -63,7 +64,7 @@ type excludesFileFlag struct{} func (f *excludesFileFlag) String() string { return "" } func (f *excludesFileFlag) Set(s string) error { - ff, err := os.Open(s) + ff, err := os.Open(s) // #nosec if err != nil { return err } @@ -246,6 +247,19 @@ func (b *byteCounter) Read(buf []byte) (int, error) { return n, err } +func warcWriterFromFlags() (w *warc.Writer, err error) { + if strings.Contains(*outputFile, "%s") { + w, err = warc.NewMultiWriter(*outputFile, uint64(*warcFileSizeMB)*1024*1024) + } else { + var f *os.File + f, err = os.Create(*outputFile) + if err == nil { + w = warc.NewWriter(f) + } + } + return +} + func main() { flag.Parse() @@ -271,11 +285,10 @@ func main() { scope = crawl.AND(crawl.OR(scope, crawl.NewIncludeRelatedScope()), crawl.NewRegexpIgnoreScope(excludes)) } - outf, err := os.Create(*outputFile) 
+ w, err := warcWriterFromFlags() if err != nil { log.Fatal(err) } - w := warc.NewWriter(outf) defer w.Close() // nolint saver, err := newWarcSaveHandler(w) diff --git a/warc/multi.go b/warc/multi.go new file mode 100644 index 0000000..a18ceb8 --- /dev/null +++ b/warc/multi.go @@ -0,0 +1,121 @@ +package warc + +import ( + "bufio" + "encoding/binary" + "encoding/hex" + "fmt" + "io" + "os" + "sync/atomic" + "time" +) + +func newID() string { + var b [8]byte + binary.BigEndian.PutUint64(b[:], uint64(time.Now().UnixNano())) + return hex.EncodeToString(b[:]) +} + +type meteredWriter struct { + io.WriteCloser + bytes uint64 +} + +func (m *meteredWriter) Write(b []byte) (int, error) { + n, err := m.WriteCloser.Write(b) + if n > 0 { + atomic.AddUint64(&m.bytes, uint64(n)) + } + return n, err +} + +func (m *meteredWriter) Bytes() uint64 { + return atomic.LoadUint64(&m.bytes) +} + +type bufferedWriter struct { + *bufio.Writer + io.Closer +} + +func newBufferedWriter(w io.WriteCloser) *bufferedWriter { + return &bufferedWriter{ + Writer: bufio.NewWriter(w), + Closer: w, + } +} + +func (w *bufferedWriter) Close() error { + if err := w.Writer.Flush(); err != nil { + return err + } + return w.Closer.Close() +} + +func openFile(path string) (*meteredWriter, error) { + f, err := os.Create(path) + if err != nil { + return nil, err + } + return &meteredWriter{WriteCloser: newBufferedWriter(f)}, nil +} + +// Unsafe for concurrent access. 
+type multiWriter struct { + pattern string + maxSize uint64 + + cur *meteredWriter +} + +func newMultiWriter(pattern string, maxSize uint64) rawWriter { + if maxSize == 0 { + maxSize = 100 * 1024 * 1024 + } + return &multiWriter{ + pattern: pattern, + maxSize: maxSize, + } +} + +func (w *multiWriter) newFilename() string { + return fmt.Sprintf(w.pattern, newID()) +} + +func (w *multiWriter) NewRecord() (err error) { + if w.cur == nil || w.cur.Bytes() > w.maxSize { + if w.cur != nil { + if err = w.cur.Close(); err != nil { + return + } + } + w.cur, err = openFile(w.newFilename()) + } + return +} + +func (w *multiWriter) Write(b []byte) (int, error) { + return w.cur.Write(b) +} + +func (w *multiWriter) Close() error { + return w.cur.Close() +} + +type simpleWriter struct { + *bufferedWriter +} + +func newSimpleWriter(w io.WriteCloser) rawWriter { + return &simpleWriter{newBufferedWriter(w)} +} + +func (w *simpleWriter) NewRecord() error { + return nil +} + +type rawWriter interface { + io.WriteCloser + NewRecord() error +} diff --git a/warc/warc.go b/warc/warc.go index 1184d9c..a552085 100644 --- a/warc/warc.go +++ b/warc/warc.go @@ -3,9 +3,10 @@ package warc import ( - "bufio" + "errors" "fmt" "io" + "strings" "time" "compress/gzip" @@ -57,7 +58,7 @@ func (h Header) Encode(w io.Writer) error { return err } } - _, err := fmt.Fprintf(w, "\r\n") + _, err := io.WriteString(w, "\r\n") return err } @@ -74,20 +75,22 @@ func NewHeader() Header { // Writer can write records to a file in WARC format. It is safe // for concurrent access, since writes are serialized internally. type Writer struct { - writer io.WriteCloser - bufwriter *bufio.Writer - gzwriter *gzip.Writer - lockCh chan bool + writer rawWriter + lockCh chan struct{} } type recordWriter struct { - io.Writer - lockCh chan bool + io.WriteCloser + lockCh chan struct{} } func (rw *recordWriter) Close() error { // Add the end-of-record marker. 
- _, err := fmt.Fprintf(rw, "\r\n\r\n") + _, err := io.WriteString(rw, "\r\n\r\n") + if err != nil { + return err + } + err = rw.WriteCloser.Close() <-rw.lockCh return err } @@ -98,41 +101,57 @@ func (rw *recordWriter) Close() error { // is satisfied. If this function returns an error, the state of the // Writer is invalid and it should no longer be used. func (w *Writer) NewRecord(hdr Header) (io.WriteCloser, error) { - w.lockCh <- true - if w.gzwriter != nil { - w.gzwriter.Close() // nolint + w.lockCh <- struct{}{} + + if err := w.writer.NewRecord(); err != nil { + return nil, err } - var err error - w.gzwriter, err = gzip.NewWriterLevel(w.bufwriter, gzip.BestCompression) + + gzwriter, err := gzip.NewWriterLevel(w.writer, gzip.BestCompression) if err != nil { return nil, err } - w.gzwriter.Header.Name = hdr.Get("WARC-Record-ID") - if err = hdr.Encode(w.gzwriter); err != nil { + gzwriter.Header.Name = hdr.Get("WARC-Record-ID") + if err = hdr.Encode(gzwriter); err != nil { return nil, err } - return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh}, nil + return &recordWriter{ + WriteCloser: gzwriter, + lockCh: w.lockCh, + }, nil } // Close the WARC writer and flush all buffers. This will also call // Close on the wrapped io.WriteCloser object. func (w *Writer) Close() error { - if err := w.gzwriter.Close(); err != nil { - return err - } - if err := w.bufwriter.Flush(); err != nil { - return err - } + w.lockCh <- struct{}{} // do not release + defer close(w.lockCh) // pending NewRecord calls will panic? return w.writer.Close() } // NewWriter initializes a new Writer and returns it. func NewWriter(w io.WriteCloser) *Writer { return &Writer{ - writer: w, - bufwriter: bufio.NewWriter(w), + writer: newSimpleWriter(w), // Buffering is important here since we're using this // channel as a semaphore. 
- lockCh: make(chan bool, 1), + lockCh: make(chan struct{}, 1), + } +} + +// NewMultiWriter initializes a new Writer that writes its output to +// multiple files of limited size approximately equal to maxSize, +// rotating them when necessary. The input path should contain a +// literal '%s' token, which will be replaced with a (lexically +// sortable) unique token. +func NewMultiWriter(pattern string, maxSize uint64) (*Writer, error) { + if !strings.Contains(pattern, "%s") { + return nil, errors.New("input path is not a pattern") } + return &Writer{ + writer: newMultiWriter(pattern, maxSize), + // Buffering is important here since we're using this + // channel as a semaphore. + lockCh: make(chan struct{}, 1), + }, nil } diff --git a/warc/warc_test.go b/warc/warc_test.go new file mode 100644 index 0000000..5e2de2f --- /dev/null +++ b/warc/warc_test.go @@ -0,0 +1,138 @@ +package warc + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sync" + "testing" +) + +var testData = []byte("this is some very interesting test data of non-zero size") + +func writeRecords(w *Writer, n int) error { + for i := 0; i < n; i++ { + hdr := NewHeader() + rec, err := w.NewRecord(hdr) + if err != nil { + return fmt.Errorf("NewRecord: %v", err) + } + _, err = rec.Write(testData) + if err != nil { + return fmt.Errorf("record Write: %v", err) + } + if err := rec.Close(); err != nil { + return fmt.Errorf("record Close: %v", err) + } + } + return nil +} + +func writeManyRecords(t testing.TB, w *Writer, n int) { + if err := writeRecords(w, n); err != nil { + t.Fatal(err) + } +} + +func writeManyRecordsConcurrently(t testing.TB, w *Writer, n, nproc int) { + startCh := make(chan struct{}) + errCh := make(chan error, nproc+1) + var wg sync.WaitGroup + + for i := 0; i < nproc; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-startCh + if err := writeRecords(w, n); err != nil { + errCh <- err + } + }() + } + go func() { + wg.Wait() + errCh <- nil + }() + close(startCh) + if err := 
<-errCh; err != nil { + t.Fatalf("a worker got an error: %v", err) + } +} + +func TestWARC_WriteSingleFile(t *testing.T) { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + f, err := os.Create(filepath.Join(dir, "out.warc.gz")) + if err != nil { + t.Fatal(err) + } + w := NewWriter(f) + + writeManyRecords(t, w, 1000) + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } +} + +func TestWARC_WriteMulti(t *testing.T) { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + var targetSize int64 = 10240 + w, err := NewMultiWriter(filepath.Join(dir, "out.%s.warc.gz"), uint64(targetSize)) + if err != nil { + t.Fatal(err) + } + + writeManyRecords(t, w, 1000) + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + files, _ := ioutil.ReadDir(dir) + if len(files) < 2 { + t.Fatalf("MultiWriter didn't create enough files (%d)", len(files)) + } + for _, f := range files[:len(files)-1] { + if f.Size() < targetSize { + t.Errorf("output file %s is too small (%d bytes)", f.Name(), f.Size()) + } + } +} + +func TestWARC_WriteMulti_Concurrent(t *testing.T) { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + var targetSize int64 = 100000 + w, err := NewMultiWriter(filepath.Join(dir, "out.%s.warc.gz"), uint64(targetSize)) + if err != nil { + t.Fatal(err) + } + + writeManyRecordsConcurrently(t, w, 1000, 10) + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + files, _ := ioutil.ReadDir(dir) + if len(files) < 2 { + t.Fatalf("MultiWriter didn't create enough files (%d)", len(files)) + } + for _, f := range files[:len(files)-1] { + if f.Size() < targetSize { + t.Errorf("output file %s is too small (%d bytes)", f.Name(), f.Size()) + } + } +} -- cgit v1.2.3-54-g00ecf