about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--  cmd/crawl/crawl.go    4
-rw-r--r--  crawler.go           26
-rw-r--r--  crawler_test.go       2
-rw-r--r--  queue.go              7
4 files changed, 20 insertions, 19 deletions
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go
index 54bb505..a79e0a6 100644
--- a/cmd/crawl/crawl.go
+++ b/cmd/crawl/crawl.go
@@ -33,7 +33,7 @@ var (
concurrency = flag.Int("c", 10, "concurrent workers")
depth = flag.Int("depth", 100, "maximum link depth")
validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
- excludeRelated = flag.Bool("exclude-related", false, "include related resources (css, images, etc) only if their URL is in scope")
+ excludeRelated = flag.Bool("exclude-related", false, "do not include related resources (css, images, etc) if their URL is not in scope")
outputFile = flag.String("output", "crawl.warc.gz", "output WARC file or pattern (patterns must include a \"%s\" literal token)")
warcFileSizeMB = flag.Int("output-max-size", 100, "maximum output WARC file size (in MB) when using patterns")
cpuprofile = flag.String("cpuprofile", "", "create cpu profile")
@@ -127,7 +127,7 @@ func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error {
return w.Close()
}
-func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, depth int, resp *http.Response, _ error) error {
+func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, resp *http.Response, _ error) error {
// Read the response body (so we can save it to the WARC
// output) and replace it with a buffer.
data, derr := ioutil.ReadAll(resp.Body)
diff --git a/crawler.go b/crawler.go
index b48646e..be728aa 100644
--- a/crawler.go
+++ b/crawler.go
@@ -112,15 +112,15 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
// unless the handler returns the special error ErrRetryRequest.
type Handler interface {
// Handle the response from a URL.
- Handle(Publisher, string, int, *http.Response, error) error
+ Handle(Publisher, string, int, int, *http.Response, error) error
}
// HandlerFunc wraps a function into the Handler interface.
-type HandlerFunc func(Publisher, string, int, *http.Response, error) error
+type HandlerFunc func(Publisher, string, int, int, *http.Response, error) error
// Handle the response from a URL.
-func (f HandlerFunc) Handle(p Publisher, u string, depth int, resp *http.Response, err error) error {
- return f(p, u, depth, resp, err)
+func (f HandlerFunc) Handle(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
+ return f(p, u, tag, depth, resp, err)
}
// ErrRetryRequest is returned by a Handler when the request should be
@@ -197,7 +197,7 @@ func (c *Crawler) Enqueue(link Outlink, depth int) error {
// sure that subsequent calls to Enqueue with the same URL
// will fail.
wb := new(leveldb.Batch)
- if err := c.queue.Add(wb, link.URL.String(), depth, time.Now()); err != nil {
+ if err := c.queue.Add(wb, link.URL.String(), link.Tag, depth, time.Now()); err != nil {
return err
}
c.setSeen(wb, link.URL)
@@ -250,7 +250,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
// Invoke the handler (even if the fetcher errored
// out). Errors in handling requests are fatal, crawl
// will be aborted.
- err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
+ err := c.handler.Handle(c, p.URL, p.Tag, p.Depth, httpResp, httpErr)
if httpErr == nil {
respBody.Close() // nolint
}
@@ -347,8 +347,8 @@ func (c *Crawler) Close() {
// and adds them to the queue for crawling. It will call the wrapped
// handler on all requests regardless.
func FollowRedirects(wrap Handler) Handler {
- return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
- if herr := wrap.Handle(p, u, depth, resp, err); herr != nil {
+ return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
+ if herr := wrap.Handle(p, u, tag, depth, resp, err); herr != nil {
return herr
}
@@ -362,7 +362,7 @@ func FollowRedirects(wrap Handler) Handler {
if uerr != nil {
log.Printf("error parsing Location header: %v", uerr)
} else {
- return p.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1)
+ return p.Enqueue(Outlink{URL: locationURL, Tag: tag}, depth+1)
}
}
return nil
@@ -373,14 +373,14 @@ func FollowRedirects(wrap Handler) Handler {
// "successful" HTTP status code (anything < 400). When using this
// wrapper, subsequent Handle calls will always have err set to nil.
func FilterErrors(wrap Handler) Handler {
- return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
+ return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
if err != nil {
return nil
}
if resp.StatusCode >= 400 {
return nil
}
- return wrap.Handle(p, u, depth, resp, nil)
+ return wrap.Handle(p, u, tag, depth, resp, nil)
})
}
@@ -388,11 +388,11 @@ func FilterErrors(wrap Handler) Handler {
// temporary errors (all transport-level errors are considered
// temporary, as well as any HTTP status code >= 500).
func HandleRetries(wrap Handler) Handler {
- return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
+ return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
if err != nil || resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 {
return ErrRetryRequest
}
- return wrap.Handle(p, u, depth, resp, nil)
+ return wrap.Handle(p, u, tag, depth, resp, nil)
})
}
diff --git a/crawler_test.go b/crawler_test.go
index 0ad469b..fa81c2f 100644
--- a/crawler_test.go
+++ b/crawler_test.go
@@ -33,7 +33,7 @@ func TestCrawler(t *testing.T) {
)
var crawledPages int
- h := HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
+ h := HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
crawledPages++
next := fmt.Sprintf(srv.URL+"/page/%d", crawledPages)
log.Printf("%s -> %s", u, next)
diff --git a/queue.go b/queue.go
index ee0e7ed..cd4143c 100644
--- a/queue.go
+++ b/queue.go
@@ -28,6 +28,7 @@ type queuePair struct {
URL string
Depth int
+ Tag int
}
// Scan the pending queue and send items on 'ch'. Returns an error
@@ -58,10 +59,10 @@ func (q *queue) Scan(ch chan<- queuePair) error {
}
// Add an item to the pending work queue.
-func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) error {
+func (q *queue) Add(wb *leveldb.Batch, urlStr string, tag, depth int, when time.Time) error {
t := uint64(when.UnixNano())
qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
- return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
+ return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Tag: tag, Depth: depth})
}
func (q *queue) acquire(qp queuePair) error {
@@ -87,7 +88,7 @@ func (q *queue) Release(wb *leveldb.Batch, qp queuePair) {
// Retry processing this item at a later time.
func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) error {
wb.Delete(activeQueueKey(qp.key))
- if err := q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)); err != nil {
+ if err := q.Add(wb, qp.URL, qp.Tag, qp.Depth, time.Now().Add(delay)); err != nil {
return err
}
atomic.AddInt32(&q.numActive, -1)