diff options
author | ale <ale@incal.net> | 2014-12-20 13:10:17 +0000 |
---|---|---|
committer | ale <ale@incal.net> | 2014-12-20 13:10:17 +0000 |
commit | 3af8c9a229abb85ad75bef1c3526e44435befd02 (patch) | |
tree | ffe2794c6a65864e4bf3b1a0a5bc2dde05133a67 | |
parent | 7c1b1f70bdae7b28dad864570d6611321a703df0 (diff) | |
download | crawl-3af8c9a229abb85ad75bef1c3526e44435befd02.tar.gz crawl-3af8c9a229abb85ad75bef1c3526e44435befd02.zip |
move URLInfo logic into the Crawler itself
-rw-r--r-- | cmd/crawl/crawl.go | 2 | ||||
-rw-r--r-- | cmd/links/links.go | 2 | ||||
-rw-r--r-- | crawler.go | 78 |
3 files changed, 47 insertions, 35 deletions
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go index 8c02089..63a5924 100644 --- a/cmd/crawl/crawl.go +++ b/cmd/crawl/crawl.go @@ -135,7 +135,7 @@ func main() { saver := NewSaveHandler(w) - crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), saver) + crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.NewRedirectHandler(saver)) if err != nil { log.Fatal(err) } diff --git a/cmd/links/links.go b/cmd/links/links.go index e89e22d..eb97577 100644 --- a/cmd/links/links.go +++ b/cmd/links/links.go @@ -44,7 +44,7 @@ func main() { crawl.NewSeedScope(seeds), } - crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.HandlerFunc(extractLinks)) + crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.NewRedirectHandler(crawl.HandlerFunc(extractLinks))) if err != nil { log.Fatal(err) } @@ -5,6 +5,7 @@ import ( "encoding/gob" "errors" "fmt" + "io" "log" "net/http" "net/url" @@ -76,6 +77,7 @@ type URLInfo struct { Error error } +// A Fetcher retrieves contents from remote URLs. type Fetcher interface { Fetch(string) (*http.Response, error) } @@ -86,6 +88,9 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) { return f(u) } +// A Handler processes crawled contents. Any errors returned by public +// implementations of this interface are considered permanent and will +// not cause the URL to be fetched again. type Handler interface { Handle(*Crawler, string, int, *http.Response, error) error } @@ -96,7 +101,7 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Respons return f(db, u, depth, resp, err) } -// UrlDb is the database of crawled URLs (either pending or done). +// The Crawler object contains the crawler state. type Crawler struct { db *gobDB seeds []*url.URL @@ -107,19 +112,12 @@ type Crawler struct { enqueueMx sync.Mutex } -type QueuePair struct { +type queuePair struct { Key []byte URL string Depth int } -// Update this URLInfo entry in the crawl database. -func (c *Crawler) UpdateURL(info *URLInfo) error { - wo := levigo.NewWriteOptions() - defer wo.Close() - return c.db.PutObj(wo, []byte(fmt.Sprintf("url/%s", info.URL)), info) -} - // Enqueue a (possibly new) URL for processing. func (c *Crawler) Enqueue(u *url.URL, depth int) { // Normalize the URL. @@ -153,13 +151,13 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) { // URL will fail. wo := levigo.NewWriteOptions() defer wo.Close() - c.db.PutObj(wo, qkey, &QueuePair{Key: qkey, URL: urlStr, Depth: depth}) + c.db.PutObj(wo, qkey, &queuePair{Key: qkey, URL: urlStr, Depth: depth}) c.db.PutObj(wo, ukey, &info) } // Scan the queue for URLs until there are no more. -func (c *Crawler) process() <-chan QueuePair { - ch := make(chan QueuePair) +func (c *Crawler) process() <-chan queuePair { + ch := make(chan queuePair) go func() { queuePrefix := []byte("queue/") for range time.Tick(2 * time.Second) { @@ -173,7 +171,7 @@ func (c *Crawler) process() <-chan QueuePair { iter := c.db.NewPrefixIterator(ro, queuePrefix) for ; iter.Valid(); iter.Next() { - var p QueuePair + var p queuePair if err := iter.Value(&p); err != nil { continue } @@ -195,27 +193,42 @@ func (c *Crawler) process() <-chan QueuePair { } // Main worker loop. -func (c *Crawler) urlHandler(queue <-chan QueuePair) { +func (c *Crawler) urlHandler(queue <-chan queuePair) { for p := range queue { + // Retrieve the URLInfo object from the crawl db. + // Ignore errors, we can work with an empty object. + urlkey := []byte(fmt.Sprintf("url/%s", p.URL)) + var info URLInfo + ro := levigo.NewReadOptions() + c.db.GetObj(ro, urlkey, &info) + info.CrawledAt = time.Now() + info.URL = p.URL + // Fetch the URL and handle it. Make sure to Close the // response body (even if it gets replaced in the // Response object). fmt.Printf("%s\n", p.URL) httpResp, httpErr := c.fetcher.Fetch(p.URL) - respBody := httpResp.Body - err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr) + var respBody io.ReadCloser if httpErr == nil { - respBody.Close() + respBody = httpResp.Body + info.StatusCode = httpResp.StatusCode } - // Remove the URL from the queue if the handler was successful. - if err == nil { - wo := levigo.NewWriteOptions() + // Invoke the handler (even if the fetcher errored out). + info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr) + + wo := levigo.NewWriteOptions() + if httpErr == nil { + respBody.Close() + + // Remove the URL from the queue if the fetcher was successful. c.db.Delete(wo, p.Key) - wo.Close() } else { - log.Printf("error handling %s: %v", p.URL, err) + log.Printf("error retrieving %s: %v", p.URL, httpErr) } + c.db.PutObj(wo, urlkey, &info) + wo.Close() } } @@ -225,7 +238,7 @@ func MustParseURLs(urls []string) []*url.URL { for _, s := range urls { u, err := url.Parse(s) if err != nil { - log.Fatalf("error parsing seed \"%s\": %v", s, err) + log.Fatalf("error parsing URL \"%s\": %v", s, err) } parsed = append(parsed, u) } @@ -242,7 +255,7 @@ func NewCrawler(path string, seeds []*url.URL, scopes []Scope, f Fetcher, h Hand c := &Crawler{ db: db, fetcher: f, - handler: &standardPageHandler{h}, + handler: h, seeds: seeds, scopes: scopes, } @@ -270,16 +283,12 @@ func (c *Crawler) Run(concurrency int) { wg.Wait() } -// Standard page handler, follows redirects and invokes a child -// handler when status == 200. -type standardPageHandler struct { +type redirectHandler struct { h Handler } -func (wrap *standardPageHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error { - info := &URLInfo{URL: u, CrawledAt: time.Now()} +func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error { if err == nil { - info.StatusCode = resp.StatusCode if resp.StatusCode == 200 { err = wrap.h.Handle(c, u, depth, resp, err) } else if resp.StatusCode > 300 && resp.StatusCode < 400 { @@ -296,8 +305,11 @@ func (wrap *standardPageHandler) Handle(c *Crawler, u string, depth int, resp *h err = errors.New(resp.Status) } } - info.Error = err + return err +} - c.UpdateURL(info) - return nil +// NewRedirectHandler returns a Handler that follows HTTP redirects, +// and will call the wrapped handler on every request with HTTP status 200. +func NewRedirectHandler(wrap Handler) Handler { + return &redirectHandler{wrap} } |