diff options
author | ale <ale@incal.net> | 2014-12-20 10:39:53 +0000 |
---|---|---|
committer | ale <ale@incal.net> | 2014-12-20 10:39:53 +0000 |
commit | d4c561c23d016cf6a7507840153e835994915cb8 (patch) | |
tree | 53d25b35a90e5c4e475915ee8746ec24312812d9 | |
parent | b09f05f8137e5bbc27a0a306de0529c59d3f2c28 (diff) | |
download | crawl-d4c561c23d016cf6a7507840153e835994915cb8.tar.gz crawl-d4c561c23d016cf6a7507840153e835994915cb8.zip |
move the WARC code into its own package
Now generates well-formed, indexable WARC files.
-rw-r--r-- | cmd/crawl/crawl.go | 15 | ||||
-rw-r--r-- | cmd/links/links.go | 2 | ||||
-rw-r--r-- | crawler.go | 7 | ||||
-rw-r--r-- | warc/warc.go (renamed from warc.go) | 66 |
4 files changed, 59 insertions, 31 deletions
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go index 1e5f952..0979d43 100644 --- a/cmd/crawl/crawl.go +++ b/cmd/crawl/crawl.go @@ -17,6 +17,7 @@ import ( "strings" "git.autistici.org/ale/crawl" + "git.autistici.org/ale/crawl/warc" "github.com/PuerkitoBio/goquery" ) @@ -94,7 +95,7 @@ func hdr2str(h http.Header) []byte { } type warcSaveHandler struct { - warc *crawl.WarcWriter + warc *warc.Writer warcInfoID string } @@ -108,7 +109,7 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *ht // Dump the request. var b bytes.Buffer resp.Request.Write(&b) - hdr := crawl.NewWarcHeader() + hdr := warc.NewHeader() hdr.Set("WARC-Type", "request") hdr.Set("WARC-Target-URI", resp.Request.URL.String()) hdr.Set("WARC-Warcinfo-ID", h.warcInfoID) @@ -122,7 +123,7 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *ht respPayload := bytes.Join([][]byte{ []byte(statusLine), hdr2str(resp.Header), data}, []byte{'\r', '\n'}) - hdr = crawl.NewWarcHeader() + hdr = warc.NewHeader() hdr.Set("WARC-Type", "response") hdr.Set("WARC-Target-URI", resp.Request.URL.String()) hdr.Set("WARC-Warcinfo-ID", h.warcInfoID) @@ -134,14 +135,14 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *ht return extractLinks(c, u, depth, resp, err) } -func NewSaveHandler(w *crawl.WarcWriter) crawl.Handler { +func NewSaveHandler(w *warc.Writer) crawl.Handler { info := strings.Join([]string{ "Software: crawl/1.0\r\n", "Format: WARC File Format 1.0\r\n", "Conformsto: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf\r\n", }, "") - hdr := crawl.NewWarcHeader() + hdr := warc.NewHeader() hdr.Set("WARC-Type", "warcinfo") hdr.Set("WARC-Warcinfo-ID", hdr.Get("WARC-Record-ID")) hdr.Set("Content-Length", strconv.Itoa(len(info))) @@ -165,7 +166,7 @@ func main() { seeds := crawl.MustParseURLs(flag.Args()) scope := crawl.NewSeedScope(seeds, *depth, strings.Split(*validSchemes, ",")) - w := crawl.NewWarcWriter(outf) + w := warc.NewWriter(outf) defer w.Close() saver := NewSaveHandler(w) @@ -174,5 +175,5 @@ func main() { if err != nil { log.Fatal(err) } - crawler.Run() + crawler.Run(*concurrency) } diff --git a/cmd/links/links.go b/cmd/links/links.go index 3ba63be..9ae2394 100644 --- a/cmd/links/links.go +++ b/cmd/links/links.go @@ -71,5 +71,5 @@ func main() { if err != nil { log.Fatal(err) } - crawler.Run() + crawler.Run(*concurrency) } @@ -272,8 +272,9 @@ func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler return c, nil } -// Run the crawl, does not exit until it is done. -func (c *Crawler) Run() { +// Run the crawl with the specified number of workers. This function +// does not exit until all work is done (no URLs left in the queue). +func (c *Crawler) Run(concurrency int) { // Load initial seeds into the queue. for _, u := range c.seeds { c.Enqueue(u, 0) @@ -282,7 +283,7 @@ func (c *Crawler) Run() { // Start some runners and wait until they're done. var wg sync.WaitGroup ch := c.process() - for i := 0; i < 3; i++ { + for i := 0; i < concurrency; i++ { wg.Add(1) go func() { c.urlHandler(ch) @@ -1,4 +1,6 @@ -package crawl +// Package to write WARC files. + +package warc import ( "fmt" @@ -21,12 +23,12 @@ var ( } ) -// A Warc header. Header field names are case-sensitive. -type WarcHeader map[string]string +// A WARC header. Header field names are case-sensitive. +type Header map[string]string // Set a header to the specified value. Multiple values are not // supported. -func (h WarcHeader) Set(key, value string) { +func (h Header) Set(key, value string) { h[key] = value // Keep Content-Type in sync with WARC-Type. @@ -40,12 +42,12 @@ func (h WarcHeader) Set(key, value string) { } // Get the value of a header. If not found, returns an empty string. -func (h WarcHeader) Get(key string) string { +func (h Header) Get(key string) string { return h[key] } // Encode the header to a Writer. -func (h WarcHeader) Encode(w io.Writer) { +func (h Header) Encode(w io.Writer) { fmt.Fprintf(w, "%s\r\n", warcVersion) for hdr, value := range h { fmt.Fprintf(w, "%s: %s\r\n", hdr, value) @@ -53,44 +55,68 @@ func (h WarcHeader) Encode(w io.Writer) { fmt.Fprintf(w, "\r\n") } -// NewWarcHeader returns a WarcHeader with its own unique ID and the +// NewHeader returns a Header with its own unique ID and the // current timestamp. -func NewWarcHeader() WarcHeader { - h := make(WarcHeader) +func NewHeader() Header { + h := make(Header) h.Set("WARC-Record-ID", fmt.Sprintf("<%s>", uuid.NewUUID().URN())) h.Set("WARC-Date", time.Now().Format(warcTimeFmt)) h.Set("Content-Type", "application/octet-stream") return h } -// WarcWriter can write records to a file in WARC format. -type WarcWriter struct { - writer io.WriteCloser +// Writer can write records to a file in WARC format. It is safe +// for concurrent access, since writes are serialized internally. +type Writer struct { + writer io.WriteCloser + gzwriter *gzip.Writer + lockCh chan bool } type recordWriter struct { io.Writer + lockCh chan bool } func (rw *recordWriter) Close() error { // Add the end-of-record marker. fmt.Fprintf(rw, "\r\n\r\n") + + <-rw.lockCh + return nil } // NewRecord starts a new WARC record with the provided header. The // caller must call Close on the returned writer before creating the -// next record. -func (w *WarcWriter) NewRecord(hdr WarcHeader) io.WriteCloser { - hdr.Encode(w.writer) - return &recordWriter{w.writer} +// next record. Note that this function may block until that condition +// is satisfied. +func (w *Writer) NewRecord(hdr Header) io.WriteCloser { + w.lockCh <- true + if w.gzwriter != nil { + w.gzwriter.Close() + } + w.gzwriter, _ = gzip.NewWriterLevel(w.writer, gzip.BestCompression) + w.gzwriter.Header.Name = hdr.Get("WARC-Record-ID") + hdr.Encode(w.gzwriter) + return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh} } -// Close the WARC writer and flush all buffers. -func (w *WarcWriter) Close() error { +// Close the WARC writer and flush all buffers. This will also call +// Close on the wrapped io.WriteCloser object. +func (w *Writer) Close() error { + if err := w.gzwriter.Close(); err != nil { + return err + } return w.writer.Close() } -func NewWarcWriter(w io.WriteCloser) *WarcWriter { - return &WarcWriter{gzip.NewWriter(w)} +// NewWriter initializes a new Writer and returns it. +func NewWriter(w io.WriteCloser) *Writer { + return &Writer{ + writer: w, + // Buffering is important here since we're using this + // channel as a semaphore. + lockCh: make(chan bool, 1), + } } |