From 9fbc656c6cd2ad610986a265c6b346bc234bb881 Mon Sep 17 00:00:00 2001 From: ale Date: Mon, 29 Jun 2015 10:07:40 +0100 Subject: improve queue code; golint fixes The queuing code now performs proper lease accounting, and it will not return a URL twice if the page load is slow. --- crawler.go | 128 +++++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 82 insertions(+), 46 deletions(-) (limited to 'crawler.go') diff --git a/crawler.go b/crawler.go index 52317fb..c337d97 100644 --- a/crawler.go +++ b/crawler.go @@ -40,6 +40,15 @@ func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) er return db.Put(wo, key, b.Bytes()) } +func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) error { + var b bytes.Buffer + if err := gob.NewEncoder(&b).Encode(obj); err != nil { + return err + } + wb.Put(key, b.Bytes()) + return nil +} + func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error { data, err := db.Get(ro, key) if err != nil { @@ -51,25 +60,61 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) err return nil } -func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobIterator { +func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobPrefixIterator { i := db.NewIterator(ro) i.Seek(prefix) - return &gobIterator{Iterator: i, prefix: prefix} + return newGobPrefixIterator(i, prefix) +} + +func (db *gobDB) NewRangeIterator(ro *levigo.ReadOptions, startKey, endKey []byte) *gobRangeIterator { + i := db.NewIterator(ro) + if startKey != nil { + i.Seek(startKey) + } + return newGobRangeIterator(i, endKey) } type gobIterator struct { *levigo.Iterator +} + +func (i *gobIterator) Value(obj interface{}) error { + return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj) +} + +type gobPrefixIterator struct { + *gobIterator prefix []byte } -func (i *gobIterator) Valid() bool { - return i.Iterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix) +func (i *gobPrefixIterator) Valid() bool { + return i.gobIterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix) } -func (i *gobIterator) Value(obj interface{}) error { - return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj) +func newGobPrefixIterator(i *levigo.Iterator, prefix []byte) *gobPrefixIterator { + return &gobPrefixIterator{ + gobIterator: &gobIterator{i}, + prefix: prefix, + } } +type gobRangeIterator struct { + *gobIterator + endKey []byte +} + +func (i *gobRangeIterator) Valid() bool { + return i.gobIterator.Valid() && (i.endKey == nil || bytes.Compare(i.Key(), i.endKey) < 0) +} + +func newGobRangeIterator(i *levigo.Iterator, endKey []byte) *gobRangeIterator { + return &gobRangeIterator{ + gobIterator: &gobIterator{i}, + endKey: endKey, + } +} + +// URLInfo stores information about a crawled URL. type URLInfo struct { URL string StatusCode int @@ -79,11 +124,14 @@ type URLInfo struct { // A Fetcher retrieves contents from remote URLs. type Fetcher interface { + // Fetch retrieves a URL and returns the response. Fetch(string) (*http.Response, error) } +// FetcherFunc wraps a simple function into the Fetcher interface. type FetcherFunc func(string) (*http.Response, error) +// Fetch retrieves a URL and returns the response. 
func (f FetcherFunc) Fetch(u string) (*http.Response, error) { return f(u) } @@ -92,11 +140,14 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) { // implementations of this interface are considered permanent and will // not cause the URL to be fetched again. type Handler interface { + // Handle the response from a URL. Handle(*Crawler, string, int, *http.Response, error) error } +// HandlerFunc wraps a function into the Handler interface. type HandlerFunc func(*Crawler, string, int, *http.Response, error) error +// Handle the response from a URL. func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error { return f(db, u, depth, resp, err) } @@ -104,6 +155,7 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Respons // The Crawler object contains the crawler state. type Crawler struct { db *gobDB + queue *queue seeds []*url.URL scopes []Scope fetcher Fetcher @@ -112,12 +164,6 @@ type Crawler struct { enqueueMx sync.Mutex } -type queuePair struct { - Key []byte - URL string - Depth int -} - // Enqueue a (possibly new) URL for processing. func (c *Crawler) Enqueue(u *url.URL, depth int) { // Normalize the URL. @@ -143,47 +189,24 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) { return } - // Create a unique key using the URL and the current timestamp. - qkey := []byte(fmt.Sprintf("queue/%d/%s", time.Now().Unix(), urlStr)) - // Store the URL in the queue, and store an empty URLInfo to // make sure that subsequent calls to Enqueue with the same // URL will fail. + wb := levigo.NewWriteBatch() + defer wb.Close() + c.queue.Add(wb, urlStr, depth, time.Now()) + c.db.PutObjBatch(wb, ukey, &info) wo := levigo.NewWriteOptions() defer wo.Close() - c.db.PutObj(wo, qkey, &queuePair{Key: qkey, URL: urlStr, Depth: depth}) - c.db.PutObj(wo, ukey, &info) + c.db.Write(wo, wb) } // Scan the queue for URLs until there are no more. func (c *Crawler) process() <-chan queuePair { ch := make(chan queuePair) go func() { - queuePrefix := []byte("queue/") for range time.Tick(2 * time.Second) { - n := 0 - - // Scan the queue using a snapshot, to ignore - // new URLs that might be added after this. - s := c.db.NewSnapshot() - ro := levigo.NewReadOptions() - ro.SetSnapshot(s) - - iter := c.db.NewPrefixIterator(ro, queuePrefix) - for ; iter.Valid(); iter.Next() { - var p queuePair - if err := iter.Value(&p); err != nil { - continue - } - ch <- p - n++ - } - iter.Close() - - ro.Close() - c.db.ReleaseSnapshot(s) - - if n == 0 { + if err := c.queue.Scan(ch); err != nil { break } } @@ -218,20 +241,27 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) { // Invoke the handler (even if the fetcher errored out). info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr) - wo := levigo.NewWriteOptions() + wb := levigo.NewWriteBatch() if httpErr == nil { respBody.Close() // Remove the URL from the queue if the fetcher was successful. - c.db.Delete(wo, p.Key) + c.queue.Release(wb, p) } else { log.Printf("error retrieving %s: %v", p.URL, httpErr) + c.queue.Retry(wb, p, 300*time.Second) } - c.db.PutObj(wo, urlkey, &info) + + c.db.PutObjBatch(wb, urlkey, &info) + + wo := levigo.NewWriteOptions() + c.db.Write(wo, wb) wo.Close() + wb.Close() } } +// MustParseURLs parses a list of URLs and aborts on failure. func MustParseURLs(urls []string) []*url.URL { // Parse the seed URLs. 
var parsed []*url.URL @@ -252,13 +282,19 @@ func NewCrawler(path string, seeds []*url.URL, scopes []Scope, f Fetcher, h Hand if err != nil { return nil, err } + c := &Crawler{ db: db, + queue: &queue{db: db}, fetcher: f, handler: h, seeds: seeds, scopes: scopes, } + + // Recover active tasks. + c.queue.Recover() + return c, nil } @@ -294,11 +330,11 @@ func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http. } else if resp.StatusCode > 300 && resp.StatusCode < 400 { location := resp.Header.Get("Location") if location != "" { - locationUrl, err := resp.Request.URL.Parse(location) + locationURL, err := resp.Request.URL.Parse(location) if err != nil { log.Printf("error parsing Location header: %v", err) } else { - c.Enqueue(locationUrl, depth+1) + c.Enqueue(locationURL, depth+1) } } } else { -- cgit v1.2.3-54-g00ecf
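Note (not part of the patch): crawler.go now delegates all queue bookkeeping to a separate queue type (c.queue.Add/Scan/Release/Retry/Recover), but since this view is limited to crawler.go the queue implementation itself is not shown. What follows is only a minimal sketch of what a lease-based queue built on the gobDB helpers added above (PutObjBatch, NewRangeIterator) could look like; the package name, the levigo import path, the "queue/" and "active/" key prefixes, and the numActive counter are guesses for illustration, not the project's actual code.

// queue_sketch.go - NOT part of this patch. A rough sketch of the queue type
// crawler.go now depends on, inferred only from the calls visible above
// (Add, Scan, Release, Retry, Recover) and the new gobDB helpers. The real
// implementation lives in another file of the repository; package name,
// levigo import path, key prefixes and the numActive counter are all guesses.
package crawl

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/jmhodges/levigo"
)

// queuePair is the unit of work handed to urlHandler (moved out of crawler.go).
type queuePair struct {
	Key   []byte
	URL   string
	Depth int
}

type queue struct {
	db        *gobDB
	numActive int32 // lease accounting: tasks handed out but not yet released
}

// Add schedules a task under a time-ordered "queue/<unix>/<url>" key.
func (q *queue) Add(wb *levigo.WriteBatch, urlStr string, depth int, when time.Time) {
	key := []byte(fmt.Sprintf("queue/%d/%s", when.Unix(), urlStr))
	q.db.PutObjBatch(wb, key, &queuePair{Key: key, URL: urlStr, Depth: depth})
}

// Scan sends each runnable task to ch exactly once, moving it to an
// "active/" lease key first so that a slow page load cannot cause the same
// URL to be returned again by the next scan. It reports an error only when
// nothing is pending and no leases are outstanding.
func (q *queue) Scan(ch chan<- queuePair) error {
	ro := levigo.NewReadOptions()
	defer ro.Close()
	// Only visit tasks whose scheduled time has already passed.
	iter := q.db.NewRangeIterator(ro, []byte("queue/"),
		[]byte(fmt.Sprintf("queue/%d", time.Now().Unix())))
	defer iter.Close()

	n := 0
	for ; iter.Valid(); iter.Next() {
		var p queuePair
		if err := iter.Value(&p); err != nil {
			continue
		}
		q.acquire(&p) // pending -> active
		ch <- p
		n++
	}
	if n == 0 && atomic.LoadInt32(&q.numActive) == 0 {
		return errors.New("queue is empty")
	}
	return nil
}

// acquire moves a task to its lease key and bumps the active counter.
func (q *queue) acquire(p *queuePair) {
	wb := levigo.NewWriteBatch()
	defer wb.Close()
	wb.Delete(p.Key)
	p.Key = []byte("active/" + p.URL)
	q.db.PutObjBatch(wb, p.Key, p)
	wo := levigo.NewWriteOptions()
	defer wo.Close()
	q.db.Write(wo, wb)
	atomic.AddInt32(&q.numActive, 1)
}

// Release drops the lease of a successfully processed task.
func (q *queue) Release(wb *levigo.WriteBatch, p queuePair) {
	wb.Delete(p.Key)
	atomic.AddInt32(&q.numActive, -1)
}

// Retry drops the lease and re-schedules the task after the given delay.
func (q *queue) Retry(wb *levigo.WriteBatch, p queuePair, delay time.Duration) {
	wb.Delete(p.Key)
	q.Add(wb, p.URL, p.Depth, time.Now().Add(delay))
	atomic.AddInt32(&q.numActive, -1)
}

The point of the "active/" lease key is that a task disappears from the "queue/" range the moment Scan hands it out, so the next two-second scan tick cannot return the same URL while a slow fetch is still in flight; on failure, Retry re-schedules it 300 seconds ahead, and the new NewRangeIterator lets Scan stop at keys timestamped "now" so deferred tasks stay invisible until they are due. Recover, called from NewCrawler, would then simply walk the "active/" prefix and Add everything back with a fresh timestamp, so leases held by a crashed run are not lost. The switch to WriteBatch in Enqueue and urlHandler serves the same consistency goal: the queue entry, the lease removal and the URLInfo record are committed in a single atomic write.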