aboutsummaryrefslogtreecommitdiff
path: root/warc/warc.go
diff options
context:
space:
mode:
Diffstat (limited to 'warc/warc.go')
-rw-r--r--warc/warc.go71
1 files changed, 45 insertions, 26 deletions
diff --git a/warc/warc.go b/warc/warc.go
index 1184d9c..a552085 100644
--- a/warc/warc.go
+++ b/warc/warc.go
@@ -3,9 +3,10 @@
package warc
import (
- "bufio"
+ "errors"
"fmt"
"io"
+ "strings"
"time"
"compress/gzip"
@@ -57,7 +58,7 @@ func (h Header) Encode(w io.Writer) error {
return err
}
}
- _, err := fmt.Fprintf(w, "\r\n")
+ _, err := io.WriteString(w, "\r\n")
return err
}
@@ -74,20 +75,22 @@ func NewHeader() Header {
// Writer can write records to a file in WARC format. It is safe
// for concurrent access, since writes are serialized internally.
type Writer struct {
- writer io.WriteCloser
- bufwriter *bufio.Writer
- gzwriter *gzip.Writer
- lockCh chan bool
+ writer rawWriter
+ lockCh chan struct{}
}
type recordWriter struct {
- io.Writer
- lockCh chan bool
+ io.WriteCloser
+ lockCh chan struct{}
}
func (rw *recordWriter) Close() error {
// Add the end-of-record marker.
- _, err := fmt.Fprintf(rw, "\r\n\r\n")
+ _, err := io.WriteString(rw, "\r\n\r\n")
+ if err != nil {
+ return err
+ }
+ err = rw.WriteCloser.Close()
<-rw.lockCh
return err
}
@@ -98,41 +101,57 @@ func (rw *recordWriter) Close() error {
// is satisfied. If this function returns an error, the state of the
// Writer is invalid and it should no longer be used.
func (w *Writer) NewRecord(hdr Header) (io.WriteCloser, error) {
- w.lockCh <- true
- if w.gzwriter != nil {
- w.gzwriter.Close() // nolint
+ w.lockCh <- struct{}{}
+
+ if err := w.writer.NewRecord(); err != nil {
+ return nil, err
}
- var err error
- w.gzwriter, err = gzip.NewWriterLevel(w.bufwriter, gzip.BestCompression)
+
+ gzwriter, err := gzip.NewWriterLevel(w.writer, gzip.BestCompression)
if err != nil {
return nil, err
}
- w.gzwriter.Header.Name = hdr.Get("WARC-Record-ID")
- if err = hdr.Encode(w.gzwriter); err != nil {
+ gzwriter.Header.Name = hdr.Get("WARC-Record-ID")
+ if err = hdr.Encode(gzwriter); err != nil {
return nil, err
}
- return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh}, nil
+ return &recordWriter{
+ WriteCloser: gzwriter,
+ lockCh: w.lockCh,
+ }, nil
}
// Close the WARC writer and flush all buffers. This will also call
// Close on the wrapped io.WriteCloser object.
func (w *Writer) Close() error {
- if err := w.gzwriter.Close(); err != nil {
- return err
- }
- if err := w.bufwriter.Flush(); err != nil {
- return err
- }
+ w.lockCh <- struct{}{} // do not release
+ defer close(w.lockCh) // pending NewRecord calls will panic?
return w.writer.Close()
}
// NewWriter initializes a new Writer and returns it.
func NewWriter(w io.WriteCloser) *Writer {
return &Writer{
- writer: w,
- bufwriter: bufio.NewWriter(w),
+ writer: newSimpleWriter(w),
// Buffering is important here since we're using this
// channel as a semaphore.
- lockCh: make(chan bool, 1),
+ lockCh: make(chan struct{}, 1),
+ }
+}
+
+// NewMultiWriter initializes a new Writer that writes its output to
+// multiple files of limited size approximately equal to maxSize,
+// rotating them when necessary. The input path should contain a
+// literal '%s' token, which will be replaced with a (lexically
+// sortable) unique token.
+func NewMultiWriter(pattern string, maxSize uint64) (*Writer, error) {
+ if !strings.Contains(pattern, "%s") {
+ return nil, errors.New("input path is not a pattern")
}
+ return &Writer{
+ writer: newMultiWriter(pattern, maxSize),
+ // Buffering is important here since we're using this
+ // channel as a semaphore.
+ lockCh: make(chan struct{}, 1),
+ }, nil
}