From 9fbc656c6cd2ad610986a265c6b346bc234bb881 Mon Sep 17 00:00:00 2001 From: ale Date: Mon, 29 Jun 2015 10:07:40 +0100 Subject: improve queue code; golint fixes The queuing code now performs proper lease accounting, and it will not return a URL twice if the page load is slow. --- analysis/links.go | 2 + crawler.go | 128 +++++++++++++++++++++++++++++++------------------ queue.go | 141 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ scope.go | 10 +++- 4 files changed, 233 insertions(+), 48 deletions(-) create mode 100644 queue.go diff --git a/analysis/links.go b/analysis/links.go index 22bcb80..a51a467 100644 --- a/analysis/links.go +++ b/analysis/links.go @@ -27,6 +27,8 @@ var ( } ) +// GetLinks returns all the links found in a document. Currently only +// parses HTML pages and CSS stylesheets. func GetLinks(resp *http.Response) ([]*url.URL, error) { var outlinks []string diff --git a/crawler.go b/crawler.go index 52317fb..c337d97 100644 --- a/crawler.go +++ b/crawler.go @@ -40,6 +40,15 @@ func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) er return db.Put(wo, key, b.Bytes()) } +func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) error { + var b bytes.Buffer + if err := gob.NewEncoder(&b).Encode(obj); err != nil { + return err + } + wb.Put(key, b.Bytes()) + return nil +} + func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error { data, err := db.Get(ro, key) if err != nil { @@ -51,25 +60,61 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) err return nil } -func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobIterator { +func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobPrefixIterator { i := db.NewIterator(ro) i.Seek(prefix) - return &gobIterator{Iterator: i, prefix: prefix} + return newGobPrefixIterator(i, prefix) +} + +func (db *gobDB) NewRangeIterator(ro *levigo.ReadOptions, startKey, endKey []byte) *gobRangeIterator { + i := db.NewIterator(ro) + if startKey != nil { + i.Seek(startKey) + } + return newGobRangeIterator(i, endKey) } type gobIterator struct { *levigo.Iterator +} + +func (i *gobIterator) Value(obj interface{}) error { + return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj) +} + +type gobPrefixIterator struct { + *gobIterator prefix []byte } -func (i *gobIterator) Valid() bool { - return i.Iterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix) +func (i *gobPrefixIterator) Valid() bool { + return i.gobIterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix) } -func (i *gobIterator) Value(obj interface{}) error { - return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj) +func newGobPrefixIterator(i *levigo.Iterator, prefix []byte) *gobPrefixIterator { + return &gobPrefixIterator{ + gobIterator: &gobIterator{i}, + prefix: prefix, + } } +type gobRangeIterator struct { + *gobIterator + endKey []byte +} + +func (i *gobRangeIterator) Valid() bool { + return i.gobIterator.Valid() && (i.endKey == nil || bytes.Compare(i.Key(), i.endKey) < 0) +} + +func newGobRangeIterator(i *levigo.Iterator, endKey []byte) *gobRangeIterator { + return &gobRangeIterator{ + gobIterator: &gobIterator{i}, + endKey: endKey, + } +} + +// URLInfo stores information about a crawled URL. type URLInfo struct { URL string StatusCode int @@ -79,11 +124,14 @@ type URLInfo struct { // A Fetcher retrieves contents from remote URLs. type Fetcher interface { + // Fetch retrieves a URL and returns the response. 
Fetch(string) (*http.Response, error) } +// FetcherFunc wraps a simple function into the Fetcher interface. type FetcherFunc func(string) (*http.Response, error) +// Fetch retrieves a URL and returns the response. func (f FetcherFunc) Fetch(u string) (*http.Response, error) { return f(u) } @@ -92,11 +140,14 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) { // implementations of this interface are considered permanent and will // not cause the URL to be fetched again. type Handler interface { + // Handle the response from a URL. Handle(*Crawler, string, int, *http.Response, error) error } +// HandlerFunc wraps a function into the Handler interface. type HandlerFunc func(*Crawler, string, int, *http.Response, error) error +// Handle the response from a URL. func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error { return f(db, u, depth, resp, err) } @@ -104,6 +155,7 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Respons // The Crawler object contains the crawler state. type Crawler struct { db *gobDB + queue *queue seeds []*url.URL scopes []Scope fetcher Fetcher @@ -112,12 +164,6 @@ type Crawler struct { enqueueMx sync.Mutex } -type queuePair struct { - Key []byte - URL string - Depth int -} - // Enqueue a (possibly new) URL for processing. func (c *Crawler) Enqueue(u *url.URL, depth int) { // Normalize the URL. @@ -143,47 +189,24 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) { return } - // Create a unique key using the URL and the current timestamp. - qkey := []byte(fmt.Sprintf("queue/%d/%s", time.Now().Unix(), urlStr)) - // Store the URL in the queue, and store an empty URLInfo to // make sure that subsequent calls to Enqueue with the same // URL will fail. + wb := levigo.NewWriteBatch() + defer wb.Close() + c.queue.Add(wb, urlStr, depth, time.Now()) + c.db.PutObjBatch(wb, ukey, &info) wo := levigo.NewWriteOptions() defer wo.Close() - c.db.PutObj(wo, qkey, &queuePair{Key: qkey, URL: urlStr, Depth: depth}) - c.db.PutObj(wo, ukey, &info) + c.db.Write(wo, wb) } // Scan the queue for URLs until there are no more. func (c *Crawler) process() <-chan queuePair { ch := make(chan queuePair) go func() { - queuePrefix := []byte("queue/") for range time.Tick(2 * time.Second) { - n := 0 - - // Scan the queue using a snapshot, to ignore - // new URLs that might be added after this. - s := c.db.NewSnapshot() - ro := levigo.NewReadOptions() - ro.SetSnapshot(s) - - iter := c.db.NewPrefixIterator(ro, queuePrefix) - for ; iter.Valid(); iter.Next() { - var p queuePair - if err := iter.Value(&p); err != nil { - continue - } - ch <- p - n++ - } - iter.Close() - - ro.Close() - c.db.ReleaseSnapshot(s) - - if n == 0 { + if err := c.queue.Scan(ch); err != nil { break } } @@ -218,20 +241,27 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) { // Invoke the handler (even if the fetcher errored out). info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr) - wo := levigo.NewWriteOptions() + wb := levigo.NewWriteBatch() if httpErr == nil { respBody.Close() // Remove the URL from the queue if the fetcher was successful. - c.db.Delete(wo, p.Key) + c.queue.Release(wb, p) } else { log.Printf("error retrieving %s: %v", p.URL, httpErr) + c.queue.Retry(wb, p, 300*time.Second) } - c.db.PutObj(wo, urlkey, &info) + + c.db.PutObjBatch(wb, urlkey, &info) + + wo := levigo.NewWriteOptions() + c.db.Write(wo, wb) wo.Close() + wb.Close() } } +// MustParseURLs parses a list of URLs and aborts on failure. 
func MustParseURLs(urls []string) []*url.URL { // Parse the seed URLs. var parsed []*url.URL @@ -252,13 +282,19 @@ func NewCrawler(path string, seeds []*url.URL, scopes []Scope, f Fetcher, h Hand if err != nil { return nil, err } + c := &Crawler{ db: db, + queue: &queue{db: db}, fetcher: f, handler: h, seeds: seeds, scopes: scopes, } + + // Recover active tasks. + c.queue.Recover() + return c, nil } @@ -294,11 +330,11 @@ func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http. } else if resp.StatusCode > 300 && resp.StatusCode < 400 { location := resp.Header.Get("Location") if location != "" { - locationUrl, err := resp.Request.URL.Parse(location) + locationURL, err := resp.Request.URL.Parse(location) if err != nil { log.Printf("error parsing Location header: %v", err) } else { - c.Enqueue(locationUrl, depth+1) + c.Enqueue(locationURL, depth+1) } } } else { diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..5bad577 --- /dev/null +++ b/queue.go @@ -0,0 +1,141 @@ +package crawl + +import ( + "bytes" + "encoding/binary" + "errors" + "math/rand" + "sync/atomic" + "time" + + "github.com/jmhodges/levigo" +) + +type queue struct { + db *gobDB + numActive int32 +} + +var ( + queuePrefix = []byte("queue") + activePrefix = []byte("queue_active") + + queueKeySep = []byte{'/'} + queueKeySepP1 = []byte{'/' + 1} +) + +type queuePair struct { + key []byte + + URL string + Depth int +} + +// Scan the pending queue and send items on 'ch'. Returns an error +// when the queue is empty (work is done). +func (q *queue) Scan(ch chan<- queuePair) error { + snap := q.db.NewSnapshot() + defer q.db.ReleaseSnapshot(snap) + + ro := levigo.NewReadOptions() + ro.SetSnapshot(snap) + defer ro.Close() + + n := 0 + startKey, endKey := queueScanRange() + iter := q.db.NewRangeIterator(ro, startKey, endKey) + + for ; iter.Valid(); iter.Next() { + var p queuePair + if err := iter.Value(&p); err != nil { + continue + } + p.key = iter.Key() + q.acquire(p) + ch <- p + n++ + } + + if n == 0 && q.numActive == 0 { + return errors.New("EOF") + } + return nil +} + +// Add an item to the pending work queue. +func (q *queue) Add(wb *levigo.WriteBatch, urlStr string, depth int, when time.Time) { + t := uint64(when.UnixNano()) + qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep) + q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth}) +} + +func (q *queue) acquire(qp queuePair) { + wb := levigo.NewWriteBatch() + defer wb.Close() + + q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp) + wb.Delete(qp.key) + + wo := levigo.NewWriteOptions() + defer wo.Close() + q.db.Write(wo, wb) + + atomic.AddInt32(&q.numActive, 1) +} + +// Release an item from the queue. Processing for this item is done. +func (q *queue) Release(wb *levigo.WriteBatch, qp queuePair) { + wb.Delete(activeQueueKey(qp.key)) + atomic.AddInt32(&q.numActive, -1) +} + +// Retry processing this item at a later time. +func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration) { + wb.Delete(activeQueueKey(qp.key)) + q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)) + atomic.AddInt32(&q.numActive, -1) +} + +// Recover moves all active tasks to the pending queue. To be +// called at startup to recover tasks that were active when the +// previous run terminated. 
+func (q *queue) Recover() { + wb := levigo.NewWriteBatch() + defer wb.Close() + + ro := levigo.NewReadOptions() + defer ro.Close() + + prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep) + iter := q.db.NewPrefixIterator(ro, prefix) + for ; iter.Valid(); iter.Next() { + var p queuePair + if err := iter.Value(&p); err != nil { + continue + } + p.key = iter.Key()[len(activePrefix)+1:] + q.db.PutObjBatch(wb, p.key, &p) + wb.Delete(iter.Key()) + } + + wo := levigo.NewWriteOptions() + defer wo.Close() + q.db.Write(wo, wb) +} + +func encodeUint64(n uint64) []byte { + var b bytes.Buffer + binary.Write(&b, binary.BigEndian, n) + return b.Bytes() +} + +func activeQueueKey(key []byte) []byte { + return bytes.Join([][]byte{activePrefix, key}, queueKeySep) +} + +func queueScanRange() ([]byte, []byte) { + tlim := uint64(time.Now().UnixNano() + 1) + startKey := bytes.Join([][]byte{queuePrefix, []byte{}}, queueKeySep) + endKey := bytes.Join([][]byte{queuePrefix, encodeUint64(tlim)}, queueKeySep) + return startKey, endKey +} diff --git a/scope.go b/scope.go index ccba5f5..6a63018 100644 --- a/scope.go +++ b/scope.go @@ -7,7 +7,9 @@ import ( "strings" ) +// Scope defines the crawling scope. type Scope interface { + // Check a URL to see if it's in scope for crawling. Check(*url.URL, int) bool } @@ -48,14 +50,16 @@ func NewSchemeScope(schemes []string) Scope { // eventual "www." prefix. type URLPrefixMap map[string]struct{} -func normalizeUrlPrefix(uri *url.URL) string { +func normalizeURLPrefix(uri *url.URL) string { return strings.TrimPrefix(uri.Host, "www.") + strings.TrimSuffix(uri.Path, "/") } +// Add an URL to the prefix map. func (m URLPrefixMap) Add(uri *url.URL) { - m[normalizeUrlPrefix(uri)] = struct{}{} + m[normalizeURLPrefix(uri)] = struct{}{} } +// Contains returns true if the given URL matches the prefix map. func (m URLPrefixMap) Contains(uri *url.URL) bool { s := strings.TrimPrefix(uri.Host, "www.") if _, ok := m[s]; ok { @@ -111,6 +115,8 @@ func (s *regexpIgnoreScope) Check(uri *url.URL, depth int) bool { return true } +// NewRegexpIgnoreScope returns a Scope that filters out URLs +// according to a list of regular expressions. func NewRegexpIgnoreScope(ignores []string) Scope { if ignores == nil { ignores = defaultIgnorePatterns -- cgit v1.2.3-54-g00ecf
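
The queue keys built by Add sort chronologically because the nanosecond timestamp is big-endian encoded before being joined onto the "queue" prefix, and queueScanRange ends just past the current time, so items re-scheduled by Retry stay invisible until their delay expires. The following standalone sketch shows that encoding and range check using only the standard library (no levigo; pendingKey and inRange are illustrative names, not part of the patch):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math/rand"
	"time"
)

var (
	queuePrefix = []byte("queue")
	sep         = []byte{'/'}
)

// encodeUint64 mirrors the helper in queue.go: big-endian, so that
// lexicographic byte order matches numeric order.
func encodeUint64(n uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, n)
	return b
}

// pendingKey builds a key shaped like queue/<nanotime>/<random>, as queue.Add does.
func pendingKey(when time.Time) []byte {
	return bytes.Join([][]byte{
		queuePrefix,
		encodeUint64(uint64(when.UnixNano())),
		encodeUint64(uint64(rand.Int63())),
	}, sep)
}

func main() {
	now := time.Now()
	due := pendingKey(now.Add(-time.Minute))       // scheduled in the past
	notYet := pendingKey(now.Add(5 * time.Minute)) // scheduled by a Retry for later

	// The scan range used by queue.Scan: everything from "queue/" up to,
	// but excluding, "queue/<now+1ns>".
	startKey := bytes.Join([][]byte{queuePrefix, []byte{}}, sep)
	endKey := bytes.Join([][]byte{queuePrefix, encodeUint64(uint64(now.UnixNano() + 1))}, sep)

	inRange := func(k []byte) bool {
		return bytes.Compare(k, startKey) >= 0 && bytes.Compare(k, endKey) < 0
	}
	fmt.Println(inRange(due))    // true: the scan hands this item out
	fmt.Println(inRange(notYet)) // false: skipped until its retry time arrives
}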
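
The lease accounting itself can be pictured as two key spaces: Scan moves each pending item under "queue_active" before sending it to a worker, so the next two-second tick does not re-emit a URL whose page is still loading; Release drops the lease on success, Retry re-schedules it on error, and Recover (called from NewCrawler) turns stale leases back into pending work after a crash. A toy model of that lifecycle, with in-memory maps standing in for the LevelDB key ranges (illustrative only, not part of the patch):

package main

import "fmt"

// toyQueue models the lease accounting in queue.go with plain maps.
type toyQueue struct {
	pending map[string]int // URL -> depth, waiting to be scanned
	active  map[string]int // URL -> depth, leased to a worker
}

// scan hands out every pending item and immediately moves it to the active
// set, so a second scan while the fetch is still running returns nothing.
func (q *toyQueue) scan() []string {
	var out []string
	for u, d := range q.pending {
		delete(q.pending, u)
		q.active[u] = d
		out = append(out, u)
	}
	return out
}

// release is called on success: the lease simply disappears.
func (q *toyQueue) release(u string) { delete(q.active, u) }

// retry is called on error: the lease goes back to pending (in the real code,
// with a future timestamp so it is not rescanned immediately).
func (q *toyQueue) retry(u string) {
	q.pending[u] = q.active[u]
	delete(q.active, u)
}

// recoverLeases moves leases left over from a previous run back to pending.
func (q *toyQueue) recoverLeases() {
	for u, d := range q.active {
		q.pending[u] = d
		delete(q.active, u)
	}
}

func main() {
	q := &toyQueue{
		pending: map[string]int{"http://example.com/": 0},
		active:  map[string]int{},
	}
	fmt.Println(q.scan()) // [http://example.com/]
	fmt.Println(q.scan()) // []: the URL is leased while its page loads, so it is not returned twice

	// Pretend the process restarted while the fetch was in flight:
	// recovery puts the leased URL back into the pending set.
	q.recoverLeases()
	fmt.Println(q.scan()) // [http://example.com/] again
}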