diff options
author | ale <ale@incal.net> | 2020-02-17 21:39:06 +0000 |
---|---|---|
committer | ale <ale@incal.net> | 2020-02-17 21:40:29 +0000 |
commit | 533f472553d6db42a1ae704285e33f53cf90f81d (patch) | |
tree | 122c472cc685e567d25794357c90ff92b7165b1c | |
parent | fec78595f9986cb908ef1ff61cfb3a5828986456 (diff) | |
download | crawl-533f472553d6db42a1ae704285e33f53cf90f81d.tar.gz crawl-533f472553d6db42a1ae704285e33f53cf90f81d.zip |
Propagate the link tag through redirects
In order to do this we have to plumb it through the queue and the
Handler interface, but it should allow fetches of the resources
associated with a page via the IncludeRelatedScope even if it's behind
a redirect.
-rw-r--r-- | cmd/crawl/crawl.go | 4 | ||||
-rw-r--r-- | crawler.go | 26 | ||||
-rw-r--r-- | crawler_test.go | 2 | ||||
-rw-r--r-- | queue.go | 7 |
4 files changed, 20 insertions, 19 deletions
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go index 54bb505..a79e0a6 100644 --- a/cmd/crawl/crawl.go +++ b/cmd/crawl/crawl.go @@ -33,7 +33,7 @@ var ( concurrency = flag.Int("c", 10, "concurrent workers") depth = flag.Int("depth", 100, "maximum link depth") validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols") - excludeRelated = flag.Bool("exclude-related", false, "include related resources (css, images, etc) only if their URL is in scope") + excludeRelated = flag.Bool("exclude-related", false, "do not include related resources (css, images, etc) if their URL is not in scope") outputFile = flag.String("output", "crawl.warc.gz", "output WARC file or pattern (patterns must include a \"%s\" literal token)") warcFileSizeMB = flag.Int("output-max-size", 100, "maximum output WARC file size (in MB) when using patterns") cpuprofile = flag.String("cpuprofile", "", "create cpu profile") @@ -127,7 +127,7 @@ func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error { return w.Close() } -func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, depth int, resp *http.Response, _ error) error { +func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, resp *http.Response, _ error) error { // Read the response body (so we can save it to the WARC // output) and replace it with a buffer. data, derr := ioutil.ReadAll(resp.Body) @@ -112,15 +112,15 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) { // unless the handler returns the special error ErrRetryRequest. type Handler interface { // Handle the response from a URL. - Handle(Publisher, string, int, *http.Response, error) error + Handle(Publisher, string, int, int, *http.Response, error) error } // HandlerFunc wraps a function into the Handler interface. -type HandlerFunc func(Publisher, string, int, *http.Response, error) error +type HandlerFunc func(Publisher, string, int, int, *http.Response, error) error // Handle the response from a URL. -func (f HandlerFunc) Handle(p Publisher, u string, depth int, resp *http.Response, err error) error { - return f(p, u, depth, resp, err) +func (f HandlerFunc) Handle(p Publisher, u string, tag, depth int, resp *http.Response, err error) error { + return f(p, u, tag, depth, resp, err) } // ErrRetryRequest is returned by a Handler when the request should be @@ -197,7 +197,7 @@ func (c *Crawler) Enqueue(link Outlink, depth int) error { // sure that subsequent calls to Enqueue with the same URL // will fail. wb := new(leveldb.Batch) - if err := c.queue.Add(wb, link.URL.String(), depth, time.Now()); err != nil { + if err := c.queue.Add(wb, link.URL.String(), link.Tag, depth, time.Now()); err != nil { return err } c.setSeen(wb, link.URL) @@ -250,7 +250,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) { // Invoke the handler (even if the fetcher errored // out). Errors in handling requests are fatal, crawl // will be aborted. - err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr) + err := c.handler.Handle(c, p.URL, p.Tag, p.Depth, httpResp, httpErr) if httpErr == nil { respBody.Close() // nolint } @@ -347,8 +347,8 @@ func (c *Crawler) Close() { // and adds them to the queue for crawling. It will call the wrapped // handler on all requests regardless. func FollowRedirects(wrap Handler) Handler { - return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error { - if herr := wrap.Handle(p, u, depth, resp, err); herr != nil { + return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error { + if herr := wrap.Handle(p, u, tag, depth, resp, err); herr != nil { return herr } @@ -362,7 +362,7 @@ func FollowRedirects(wrap Handler) Handler { if uerr != nil { log.Printf("error parsing Location header: %v", uerr) } else { - return p.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1) + return p.Enqueue(Outlink{URL: locationURL, Tag: tag}, depth+1) } } return nil @@ -373,14 +373,14 @@ func FollowRedirects(wrap Handler) Handler { // "successful" HTTP status code (anything < 400). When using this // wrapper, subsequent Handle calls will always have err set to nil. func FilterErrors(wrap Handler) Handler { - return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error { + return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error { if err != nil { return nil } if resp.StatusCode >= 400 { return nil } - return wrap.Handle(p, u, depth, resp, nil) + return wrap.Handle(p, u, tag, depth, resp, nil) }) } @@ -388,11 +388,11 @@ func FilterErrors(wrap Handler) Handler { // temporary errors (all transport-level errors are considered // temporary, as well as any HTTP status code >= 500). func HandleRetries(wrap Handler) Handler { - return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error { + return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error { if err != nil || resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 { return ErrRetryRequest } - return wrap.Handle(p, u, depth, resp, nil) + return wrap.Handle(p, u, tag, depth, resp, nil) }) } diff --git a/crawler_test.go b/crawler_test.go index 0ad469b..fa81c2f 100644 --- a/crawler_test.go +++ b/crawler_test.go @@ -33,7 +33,7 @@ func TestCrawler(t *testing.T) { ) var crawledPages int - h := HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error { + h := HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error { crawledPages++ next := fmt.Sprintf(srv.URL+"/page/%d", crawledPages) log.Printf("%s -> %s", u, next) @@ -28,6 +28,7 @@ type queuePair struct { URL string Depth int + Tag int } // Scan the pending queue and send items on 'ch'. Returns an error @@ -58,10 +59,10 @@ func (q *queue) Scan(ch chan<- queuePair) error { } // Add an item to the pending work queue. -func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) error { +func (q *queue) Add(wb *leveldb.Batch, urlStr string, tag, depth int, when time.Time) error { t := uint64(when.UnixNano()) qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep) - return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth}) + return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Tag: tag, Depth: depth}) } func (q *queue) acquire(qp queuePair) error { @@ -87,7 +88,7 @@ func (q *queue) Release(wb *leveldb.Batch, qp queuePair) { // Retry processing this item at a later time. func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) error { wb.Delete(activeQueueKey(qp.key)) - if err := q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)); err != nil { + if err := q.Add(wb, qp.URL, qp.Tag, qp.Depth, time.Now().Add(delay)); err != nil { return err } atomic.AddInt32(&q.numActive, -1) |