 cmd/crawl/crawl.go |  4 ++--
 crawler.go         | 26 +++++++++++++-------------
 crawler_test.go    |  2 +-
 queue.go           |  7 ++++---
 4 files changed, 20 insertions(+), 19 deletions(-)
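For orientation, the core of the change is one extra int in the Handler signature: the tag a link was enqueued with now travels with every request through the handler chain. A before/after sketch, copied from the crawler.go hunks below (the comments are explanatory, not part of the commit):

// Before: depth is the only int argument to Handle.
type Handler interface {
	Handle(Publisher, string, int, *http.Response, error) error
}

// After: a tag int (e.g. TagPrimary) precedes depth, so wrappers can
// propagate the tag the link was originally enqueued with.
type Handler interface {
	Handle(Publisher, string, int, int, *http.Response, error) error
}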
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go
index 54bb505..a79e0a6 100644
--- a/cmd/crawl/crawl.go
+++ b/cmd/crawl/crawl.go
@@ -33,7 +33,7 @@ var (
 	concurrency    = flag.Int("c", 10, "concurrent workers")
 	depth          = flag.Int("depth", 100, "maximum link depth")
 	validSchemes   = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
-	excludeRelated = flag.Bool("exclude-related", false, "include related resources (css, images, etc) only if their URL is in scope")
+	excludeRelated = flag.Bool("exclude-related", false, "do not include related resources (css, images, etc) if their URL is not in scope")
 	outputFile     = flag.String("output", "crawl.warc.gz", "output WARC file or pattern (patterns must include a \"%s\" literal token)")
 	warcFileSizeMB = flag.Int("output-max-size", 100, "maximum output WARC file size (in MB) when using patterns")
 	cpuprofile     = flag.String("cpuprofile", "", "create cpu profile")
@@ -127,7 +127,7 @@ func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error {
 	return w.Close()
 }
 
-func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, depth int, resp *http.Response, _ error) error {
+func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, resp *http.Response, _ error) error {
 	// Read the response body (so we can save it to the WARC
 	// output) and replace it with a buffer.
 	data, derr := ioutil.ReadAll(resp.Body)
diff --git a/crawler.go b/crawler.go
--- a/crawler.go
+++ b/crawler.go
@@ -112,15 +112,15 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
 // unless the handler returns the special error ErrRetryRequest.
 type Handler interface {
 	// Handle the response from a URL.
-	Handle(Publisher, string, int, *http.Response, error) error
+	Handle(Publisher, string, int, int, *http.Response, error) error
 }
 
 // HandlerFunc wraps a function into the Handler interface.
-type HandlerFunc func(Publisher, string, int, *http.Response, error) error
+type HandlerFunc func(Publisher, string, int, int, *http.Response, error) error
 
 // Handle the response from a URL.
-func (f HandlerFunc) Handle(p Publisher, u string, depth int, resp *http.Response, err error) error {
-	return f(p, u, depth, resp, err)
+func (f HandlerFunc) Handle(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
+	return f(p, u, tag, depth, resp, err)
 }
 
 // ErrRetryRequest is returned by a Handler when the request should be
@@ -197,7 +197,7 @@ func (c *Crawler) Enqueue(link Outlink, depth int) error {
 	// sure that subsequent calls to Enqueue with the same URL
 	// will fail.
 	wb := new(leveldb.Batch)
-	if err := c.queue.Add(wb, link.URL.String(), depth, time.Now()); err != nil {
+	if err := c.queue.Add(wb, link.URL.String(), link.Tag, depth, time.Now()); err != nil {
 		return err
 	}
 	c.setSeen(wb, link.URL)
@@ -250,7 +250,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
 		// Invoke the handler (even if the fetcher errored
 		// out). Errors in handling requests are fatal, crawl
 		// will be aborted.
-		err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
+		err := c.handler.Handle(c, p.URL, p.Tag, p.Depth, httpResp, httpErr)
 		if httpErr == nil {
 			respBody.Close() // nolint
 		}
@@ -347,8 +347,8 @@ func (c *Crawler) Close() {
 // and adds them to the queue for crawling. It will call the wrapped
 // handler on all requests regardless.
 func FollowRedirects(wrap Handler) Handler {
-	return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
-		if herr := wrap.Handle(p, u, depth, resp, err); herr != nil {
+	return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
+		if herr := wrap.Handle(p, u, tag, depth, resp, err); herr != nil {
 			return herr
 		}
 
@@ -362,7 +362,7 @@ func FollowRedirects(wrap Handler) Handler {
 		if uerr != nil {
 			log.Printf("error parsing Location header: %v", uerr)
 		} else {
-			return p.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1)
+			return p.Enqueue(Outlink{URL: locationURL, Tag: tag}, depth+1)
 		}
 	}
 	return nil
@@ -373,14 +373,14 @@ func FollowRedirects(wrap Handler) Handler {
 // "successful" HTTP status code (anything < 400). When using this
 // wrapper, subsequent Handle calls will always have err set to nil.
 func FilterErrors(wrap Handler) Handler {
-	return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
+	return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
 		if err != nil {
 			return nil
 		}
 		if resp.StatusCode >= 400 {
 			return nil
 		}
-		return wrap.Handle(p, u, depth, resp, nil)
+		return wrap.Handle(p, u, tag, depth, resp, nil)
 	})
 }
 
@@ -388,11 +388,11 @@ func FilterErrors(wrap Handler) Handler {
 // temporary errors (all transport-level errors are considered
 // temporary, as well as any HTTP status code >= 500).
 func HandleRetries(wrap Handler) Handler {
-	return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
+	return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
 		if err != nil || resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 {
 			return ErrRetryRequest
 		}
-		return wrap.Handle(p, u, depth, resp, nil)
+		return wrap.Handle(p, u, tag, depth, resp, nil)
 	})
 }
diff --git a/crawler_test.go b/crawler_test.go
index 0ad469b..fa81c2f 100644
--- a/crawler_test.go
+++ b/crawler_test.go
@@ -33,7 +33,7 @@ func TestCrawler(t *testing.T) {
 	)
 
 	var crawledPages int
-	h := HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
+	h := HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
 		crawledPages++
 		next := fmt.Sprintf(srv.URL+"/page/%d", crawledPages)
 		log.Printf("%s -> %s", u, next)
diff --git a/queue.go b/queue.go
--- a/queue.go
+++ b/queue.go
@@ -28,6 +28,7 @@ type queuePair struct {
 	URL   string
 	Depth int
+	Tag   int
 }
 
 // Scan the pending queue and send items on 'ch'. Returns an error
@@ -58,10 +59,10 @@ func (q *queue) Scan(ch chan<- queuePair) error {
 }
 
 // Add an item to the pending work queue.
-func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) error {
+func (q *queue) Add(wb *leveldb.Batch, urlStr string, tag, depth int, when time.Time) error {
 	t := uint64(when.UnixNano())
 	qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
-	return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
+	return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Tag: tag, Depth: depth})
 }
 
 func (q *queue) acquire(qp queuePair) error {
@@ -87,7 +88,7 @@ func (q *queue) Release(wb *leveldb.Batch, qp queuePair) {
 // Retry processing this item at a later time.
 func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) error {
 	wb.Delete(activeQueueKey(qp.key))
-	if err := q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)); err != nil {
+	if err := q.Add(wb, qp.URL, qp.Tag, qp.Depth, time.Now().Add(delay)); err != nil {
 		return err
 	}
 	atomic.AddInt32(&q.numActive, -1)
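Downstream handlers must adopt the new six-argument signature. A minimal sketch of a conforming wrapper, assuming the crawl package import path used here; tagName, loggingHandler, and the noop handler are illustrative helpers, not part of the commit (TagPrimary is the only tag constant the diff itself confirms):

package main

import (
	"log"
	"net/http"

	"git.autistici.org/ale/crawl" // import path assumed for illustration
)

// tagName maps the integer tag to a human-readable label. Only
// TagPrimary appears in the diff above; anything else is reported
// generically.
func tagName(tag int) string {
	if tag == crawl.TagPrimary {
		return "primary"
	}
	return "other"
}

// loggingHandler wraps another Handler using the new signature. It
// forwards tag unchanged, just as FollowRedirects now does when it
// re-enqueues a redirect target.
func loggingHandler(wrap crawl.Handler) crawl.Handler {
	return crawl.HandlerFunc(func(p crawl.Publisher, u string, tag, depth int, resp *http.Response, err error) error {
		if err == nil {
			log.Printf("fetched %s (tag=%s, depth=%d, status=%d)", u, tagName(tag), depth, resp.StatusCode)
		}
		return wrap.Handle(p, u, tag, depth, resp, err)
	})
}

func main() {
	noop := crawl.HandlerFunc(func(crawl.Publisher, string, int, int, *http.Response, error) error {
		return nil
	})
	// Compose with the wrappers from crawler.go. After this commit,
	// FollowRedirects preserves the tag a link was enqueued with
	// across redirects instead of resetting it to TagPrimary.
	h := crawl.FollowRedirects(loggingHandler(noop))
	_ = h
}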