author     ale <ale@incal.net>  2014-12-20 13:10:17 +0000
committer  ale <ale@incal.net>  2014-12-20 13:10:17 +0000
commit     3af8c9a229abb85ad75bef1c3526e44435befd02 (patch)
tree       ffe2794c6a65864e4bf3b1a0a5bc2dde05133a67
parent     7c1b1f70bdae7b28dad864570d6611321a703df0 (diff)
move URLInfo logic into the Crawler itself
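
The URLInfo bookkeeping (status lookup and update) moves out of the
implicit standardPageHandler wrapper and into the crawler's worker
loop; redirect handling becomes an explicit, exported wrapper, so
callers must now wrap their handler themselves, as the two commands
below do. A minimal sketch of the caller-side migration, using the
names from cmd/links:

    // before: the Crawler wrapped every handler internally
    crawl.NewCrawler("crawldb", seeds, scope,
            crawl.FetcherFunc(http.Get),
            crawl.HandlerFunc(extractLinks))

    // after: opt in to redirect handling explicitly
    crawl.NewCrawler("crawldb", seeds, scope,
            crawl.FetcherFunc(http.Get),
            crawl.NewRedirectHandler(crawl.HandlerFunc(extractLinks)))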
-rw-r--r--  cmd/crawl/crawl.go   2
-rw-r--r--  cmd/links/links.go   2
-rw-r--r--  crawler.go          78
3 files changed, 47 insertions(+), 35 deletions(-)
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go
index 8c02089..63a5924 100644
--- a/cmd/crawl/crawl.go
+++ b/cmd/crawl/crawl.go
@@ -135,7 +135,7 @@ func main() {
saver := NewSaveHandler(w)
- crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), saver)
+ crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.NewRedirectHandler(saver))
if err != nil {
log.Fatal(err)
}
diff --git a/cmd/links/links.go b/cmd/links/links.go
index e89e22d..eb97577 100644
--- a/cmd/links/links.go
+++ b/cmd/links/links.go
@@ -44,7 +44,7 @@ func main() {
crawl.NewSeedScope(seeds),
}
- crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.HandlerFunc(extractLinks))
+ crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.NewRedirectHandler(crawl.HandlerFunc(extractLinks)))
if err != nil {
log.Fatal(err)
}
diff --git a/crawler.go b/crawler.go
index 37f5b82..52317fb 100644
--- a/crawler.go
+++ b/crawler.go
@@ -5,6 +5,7 @@ import (
"encoding/gob"
"errors"
"fmt"
+ "io"
"log"
"net/http"
"net/url"
@@ -76,6 +77,7 @@ type URLInfo struct {
Error error
}
+// A Fetcher retrieves contents from remote URLs.
type Fetcher interface {
Fetch(string) (*http.Response, error)
}
@@ -86,6 +88,9 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
return f(u)
}
+// A Handler processes crawled contents. Errors returned by the
+// Handle function are considered permanent and will not cause the
+// URL to be fetched again.
type Handler interface {
Handle(*Crawler, string, int, *http.Response, error) error
}
@@ -96,7 +101,7 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Respons
return f(db, u, depth, resp, err)
}
-// UrlDb is the database of crawled URLs (either pending or done).
+// Crawler contains the crawler state.
type Crawler struct {
db *gobDB
seeds []*url.URL
@@ -107,19 +112,12 @@ type Crawler struct {
enqueueMx sync.Mutex
}
-type QueuePair struct {
+type queuePair struct {
Key []byte
URL string
Depth int
}
-// Update this URLInfo entry in the crawl database.
-func (c *Crawler) UpdateURL(info *URLInfo) error {
- wo := levigo.NewWriteOptions()
- defer wo.Close()
- return c.db.PutObj(wo, []byte(fmt.Sprintf("url/%s", info.URL)), info)
-}
-
// Enqueue a (possibly new) URL for processing.
func (c *Crawler) Enqueue(u *url.URL, depth int) {
// Normalize the URL.
@@ -153,13 +151,13 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) {
// URL will fail.
wo := levigo.NewWriteOptions()
defer wo.Close()
- c.db.PutObj(wo, qkey, &QueuePair{Key: qkey, URL: urlStr, Depth: depth})
+ c.db.PutObj(wo, qkey, &queuePair{Key: qkey, URL: urlStr, Depth: depth})
c.db.PutObj(wo, ukey, &info)
}
// Scan the queue for URLs until there are no more.
-func (c *Crawler) process() <-chan QueuePair {
- ch := make(chan QueuePair)
+func (c *Crawler) process() <-chan queuePair {
+ ch := make(chan queuePair)
go func() {
queuePrefix := []byte("queue/")
for range time.Tick(2 * time.Second) {
@@ -173,7 +171,7 @@ func (c *Crawler) process() <-chan QueuePair {
iter := c.db.NewPrefixIterator(ro, queuePrefix)
for ; iter.Valid(); iter.Next() {
- var p QueuePair
+ var p queuePair
if err := iter.Value(&p); err != nil {
continue
}
@@ -195,27 +193,42 @@ func (c *Crawler) process() <-chan QueuePair {
}
// Main worker loop.
-func (c *Crawler) urlHandler(queue <-chan QueuePair) {
+func (c *Crawler) urlHandler(queue <-chan queuePair) {
for p := range queue {
+ // Retrieve the URLInfo object from the crawl db.
+ // Ignore errors; we can work with an empty object.
+ urlkey := []byte(fmt.Sprintf("url/%s", p.URL))
+ var info URLInfo
+ ro := levigo.NewReadOptions()
+ c.db.GetObj(ro, urlkey, &info)
+ ro.Close()
+ info.CrawledAt = time.Now()
+ info.URL = p.URL
+
// Fetch the URL and handle it. Make sure to Close the
// response body (even if it gets replaced in the
// Response object).
fmt.Printf("%s\n", p.URL)
httpResp, httpErr := c.fetcher.Fetch(p.URL)
- respBody := httpResp.Body
- err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
+ var respBody io.ReadCloser
if httpErr == nil {
- respBody.Close()
+ respBody = httpResp.Body
+ info.StatusCode = httpResp.StatusCode
}
- // Remove the URL from the queue if the handler was successful.
- if err == nil {
- wo := levigo.NewWriteOptions()
+ // Invoke the handler (even if the fetcher errored out).
+ info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
+
+ wo := levigo.NewWriteOptions()
+ if httpErr == nil {
+ respBody.Close()
+
+ // Remove the URL from the queue if the fetcher was successful.
c.db.Delete(wo, p.Key)
- wo.Close()
} else {
- log.Printf("error handling %s: %v", p.URL, err)
+ log.Printf("error retrieving %s: %v", p.URL, httpErr)
}
+ c.db.PutObj(wo, urlkey, &info)
+ wo.Close()
}
}
@@ -225,7 +238,7 @@ func MustParseURLs(urls []string) []*url.URL {
for _, s := range urls {
u, err := url.Parse(s)
if err != nil {
- log.Fatalf("error parsing seed \"%s\": %v", s, err)
+ log.Fatalf("error parsing URL \"%s\": %v", s, err)
}
parsed = append(parsed, u)
}
@@ -242,7 +255,7 @@ func NewCrawler(path string, seeds []*url.URL, scopes []Scope, f Fetcher, h Hand
c := &Crawler{
db: db,
fetcher: f,
- handler: &standardPageHandler{h},
+ handler: h,
seeds: seeds,
scopes: scopes,
}
@@ -270,16 +283,12 @@ func (c *Crawler) Run(concurrency int) {
wg.Wait()
}
-// Standard page handler, follows redirects and invokes a child
-// handler when status == 200.
-type standardPageHandler struct {
+type redirectHandler struct {
h Handler
}
-func (wrap *standardPageHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
- info := &URLInfo{URL: u, CrawledAt: time.Now()}
+func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
if err == nil {
- info.StatusCode = resp.StatusCode
if resp.StatusCode == 200 {
err = wrap.h.Handle(c, u, depth, resp, err)
} else if resp.StatusCode > 300 && resp.StatusCode < 400 {
@@ -296,8 +305,11 @@ func (wrap *standardPageHandler) Handle(c *Crawler, u string, depth int, resp *h
err = errors.New(resp.Status)
}
}
- info.Error = err
+ return err
+}
- c.UpdateURL(info)
- return nil
+// NewRedirectHandler returns a Handler that follows HTTP redirects
+// and calls the wrapped handler on every response with HTTP status 200.
+func NewRedirectHandler(wrap Handler) Handler {
+ return &redirectHandler{wrap}
}
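
After this change, a complete crawl setup looks roughly like the
sketch below. The import path is an assumption (it does not appear in
this diff), and the handler deliberately returns any error as
permanent, per the new Handler contract:

    package main

    import (
            "log"
            "net/http"

            "git.autistici.org/ale/crawl" // assumed import path
    )

    func main() {
            seeds := crawl.MustParseURLs([]string{"http://example.com/"})
            scope := []crawl.Scope{crawl.NewSeedScope(seeds)}

            // Errors returned here are recorded in the URL's crawl db
            // entry and do not cause the URL to be fetched again.
            h := crawl.HandlerFunc(func(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
                    if err != nil {
                            return err
                    }
                    log.Printf("fetched %s (status %d)", u, resp.StatusCode)
                    return nil
            })

            crawler, err := crawl.NewCrawler("crawldb", seeds, scope,
                    crawl.FetcherFunc(http.Get), crawl.NewRedirectHandler(h))
            if err != nil {
                    log.Fatal(err)
            }
            crawler.Run(3)
    }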