aboutsummaryrefslogtreecommitdiff
path: root/warc/multi.go
diff options
context:
space:
mode:
author: ale <ale@incal.net> 2019-01-02 09:53:42 +0000
committer: ale <ale@incal.net> 2019-01-02 09:53:42 +0000
commit: c5ec7eb826bfd08aa6e8dd880efa15930f78ba19 (patch)
tree: 7c7d5fcfc55922cf78a97001b7ca4b879b747d28 /warc/multi.go
parent: 3518feaf05fcb7f745975851c6684a63532ff19a (diff)
download: crawl-c5ec7eb826bfd08aa6e8dd880efa15930f78ba19.tar.gz
crawl-c5ec7eb826bfd08aa6e8dd880efa15930f78ba19.zip
Add multi-file output
The output stage can now write to size-limited, rotating WARC files using a user-specified pattern, so that output files are always unique.
Diffstat (limited to 'warc/multi.go')
-rw-r--r--warc/multi.go121
1 files changed, 121 insertions, 0 deletions
diff --git a/warc/multi.go b/warc/multi.go
new file mode 100644
index 0000000..a18ceb8
--- /dev/null
+++ b/warc/multi.go
@@ -0,0 +1,121 @@
+package warc
+
+import (
+ "bufio"
+ "encoding/binary"
+ "encoding/hex"
+ "fmt"
+ "io"
+ "os"
+ "sync/atomic"
+ "time"
+)
+
+// newID returns a 16-character lowercase hexadecimal identifier: the
+// big-endian encoding of the current Unix time in nanoseconds.
+func newID() string {
+	nanos := uint64(time.Now().UnixNano())
+	raw := make([]byte, 8)
+	binary.BigEndian.PutUint64(raw, nanos)
+	return hex.EncodeToString(raw)
+}
+
+// meteredWriter wraps an io.WriteCloser and keeps a running total of the
+// bytes the wrapped writer reported as written. The counter is updated and
+// read with atomic operations. Close is provided by the embedded writer.
+type meteredWriter struct {
+	io.WriteCloser
+	bytes uint64
+}
+
+// Write forwards b to the wrapped writer and adds the number of bytes it
+// accepted to the counter, including when the write also returned an error.
+func (mw *meteredWriter) Write(b []byte) (int, error) {
+	written, err := mw.WriteCloser.Write(b)
+	if written <= 0 {
+		return written, err
+	}
+	atomic.AddUint64(&mw.bytes, uint64(written))
+	return written, err
+}
+
+// Bytes reports the total number of bytes written so far.
+func (mw *meteredWriter) Bytes() uint64 {
+	total := atomic.LoadUint64(&mw.bytes)
+	return total
+}
+
+// bufferedWriter puts a bufio.Writer in front of an io.WriteCloser so that
+// many small writes are batched before reaching the underlying writer.
+type bufferedWriter struct {
+	*bufio.Writer
+	io.Closer
+}
+
+// newBufferedWriter returns a bufferedWriter that buffers writes to w and
+// closes w when it is itself closed.
+func newBufferedWriter(w io.WriteCloser) *bufferedWriter {
+	return &bufferedWriter{
+		Writer: bufio.NewWriter(w),
+		Closer: w,
+	}
+}
+
+// Close flushes any buffered data and then closes the underlying writer.
+//
+// The underlying writer is closed even when the flush fails, so the
+// resource behind it is never leaked. If both steps fail, the flush error
+// is returned because it is the one that signals lost data.
+func (w *bufferedWriter) Close() error {
+	flushErr := w.Writer.Flush()
+	closeErr := w.Closer.Close()
+	if flushErr != nil {
+		return flushErr
+	}
+	return closeErr
+}
+
+// openFile creates the file at path (truncating it if it already exists)
+// and returns a metered, buffered writer on top of it.
+func openFile(path string) (*meteredWriter, error) {
+	file, createErr := os.Create(path)
+	if createErr != nil {
+		return nil, createErr
+	}
+	out := &meteredWriter{WriteCloser: newBufferedWriter(file)}
+	return out, nil
+}
+
+// multiWriter is a rawWriter that spreads its output over a sequence of
+// size-limited files. File names are produced by formatting pattern with a
+// freshly generated ID, and a new file is started at the first record
+// boundary after the current file has grown past maxSize bytes.
+//
+// Unsafe for concurrent access.
+type multiWriter struct {
+	pattern string
+	maxSize uint64
+
+	// cur is the file currently being written; nil when no file is open.
+	cur *meteredWriter
+}
+
+// newMultiWriter returns a rawWriter that rotates between output files
+// named after pattern, a fmt format string that receives the file ID as a
+// single string argument. A maxSize of 0 selects the default of 100 MiB.
+// No file is opened until the first call to NewRecord.
+func newMultiWriter(pattern string, maxSize uint64) rawWriter {
+	if maxSize == 0 {
+		maxSize = 100 * 1024 * 1024
+	}
+	return &multiWriter{
+		pattern: pattern,
+		maxSize: maxSize,
+	}
+}
+
+// newFilename returns the name to use for the next output file.
+func (w *multiWriter) newFilename() string {
+	return fmt.Sprintf(w.pattern, newID())
+}
+
+// NewRecord marks a record boundary. It opens the first output file if
+// none is open, and rotates to a new file when the current one has grown
+// past maxSize. Files are only switched here, so a record is never split
+// across two files.
+//
+// If closing the old file or opening the new one fails, the error is
+// returned and no file is left open; the next call to NewRecord will try
+// to open a fresh file.
+func (w *multiWriter) NewRecord() error {
+	if w.cur != nil {
+		if w.cur.Bytes() <= w.maxSize {
+			return nil
+		}
+		// Drop the reference before closing: a writer whose Close failed
+		// cannot be reused, and keeping it would make every later call
+		// retry the same failing Close instead of rotating.
+		old := w.cur
+		w.cur = nil
+		if err := old.Close(); err != nil {
+			return err
+		}
+	}
+	next, err := openFile(w.newFilename())
+	if err != nil {
+		return err
+	}
+	w.cur = next
+	return nil
+}
+
+// Write appends b to the current output file. It returns an error, rather
+// than panicking, when no file is open (NewRecord has not been called yet,
+// or its last call failed).
+func (w *multiWriter) Write(b []byte) (int, error) {
+	if w.cur == nil {
+		return 0, fmt.Errorf("warc: no output file is open, call NewRecord first")
+	}
+	return w.cur.Write(b)
+}
+
+// Close closes the current output file, if any. Calling Close when no file
+// is open, including a second time, is a no-op that returns nil.
+func (w *multiWriter) Close() error {
+	if w.cur == nil {
+		return nil
+	}
+	old := w.cur
+	w.cur = nil
+	return old.Close()
+}
+
+// simpleWriter is a rawWriter that sends every record to a single buffered
+// stream. Write and Close come from the embedded bufferedWriter.
+type simpleWriter struct {
+	*bufferedWriter
+}
+
+// newSimpleWriter returns a rawWriter that buffers its output in front of w.
+func newSimpleWriter(w io.WriteCloser) rawWriter {
+	buffered := newBufferedWriter(w)
+	return &simpleWriter{bufferedWriter: buffered}
+}
+
+// NewRecord does nothing and always returns nil: a simpleWriter has only
+// one output stream, so there is nothing to switch at record boundaries.
+func (*simpleWriter) NewRecord() error {
+	return nil
+}
+
+// rawWriter is a closable byte sink that is also told about record
+// boundaries. It is implemented by multiWriter and simpleWriter.
+type rawWriter interface {
+	// NewRecord signals that a new record is about to be written, giving
+	// the implementation a chance to prepare its output (multiWriter may
+	// switch to a new file here; simpleWriter does nothing).
+	NewRecord() error
+
+	io.WriteCloser
+}