author    ale <ale@incal.net>    2015-06-29 10:07:40 +0100
committer ale <ale@incal.net>    2015-06-29 10:07:40 +0100
commit    9fbc656c6cd2ad610986a265c6b346bc234bb881 (patch)
tree      a5aa8a44c63b239f194617dd09cfa92cf47495e0 /crawler.go
parent    63bd51e06b32d48878da68df8931809d42996df1 (diff)
improve queue code; golint fixes
The queuing code now performs proper lease accounting, and it will not return a URL twice if the page load is slow.
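
The queue implementation that this commit switches to lives in a separate file and is not part of this diff. For context, here is a minimal sketch of the lease-accounting scheme, assuming a pending/active keyspace split: the exported method signatures match the call sites in the diff below, but the key layout ("queue/", "queue_active/"), the acquire helper, and errEmptyQueue are illustrative guesses, not the project's actual code.

// Sketch only: queue.go is not shown in this diff. Key layout and the
// unexported names below are assumptions based on the call sites in
// crawler.go.
package crawl

import (
	"errors"
	"fmt"
	"time"

	"github.com/jmhodges/levigo"
)

var errEmptyQueue = errors.New("queue is empty")

type queuePair struct {
	Key   []byte
	URL   string
	Depth int
}

type queue struct {
	db *gobDB
}

// Add schedules a URL under a time-ordered pending key, so that Scan
// visits entries in rough FIFO order and Retry can defer an entry.
func (q *queue) Add(wb *levigo.WriteBatch, urlStr string, depth int, when time.Time) {
	key := []byte(fmt.Sprintf("queue/%d/%s", when.Unix(), urlStr))
	q.db.PutObjBatch(wb, key, &queuePair{Key: key, URL: urlStr, Depth: depth})
}

// Scan leases every pending entry: each one is moved to the active
// keyspace before it is handed to a worker, so a later Scan cannot
// return the same URL while a slow page load is still in flight.
func (q *queue) Scan(ch chan<- queuePair) error {
	n := 0
	ro := levigo.NewReadOptions()
	defer ro.Close()
	iter := q.db.NewPrefixIterator(ro, []byte("queue/"))
	defer iter.Close()
	for ; iter.Valid(); iter.Next() {
		var p queuePair
		if err := iter.Value(&p); err != nil {
			continue
		}
		q.acquire(p) // take the lease
		ch <- p
		n++
	}
	if n == 0 {
		return errEmptyQueue
	}
	return nil
}

// acquire moves an entry from the pending to the active keyspace in a
// single atomic batch (the "lease accounting" in the commit message).
func (q *queue) acquire(p queuePair) {
	wb := levigo.NewWriteBatch()
	defer wb.Close()
	q.db.PutObjBatch(wb, []byte("queue_active/"+p.URL), &p)
	wb.Delete(p.Key)
	wo := levigo.NewWriteOptions()
	defer wo.Close()
	q.db.Write(wo, wb)
}

// Release drops the lease once the URL has been fetched successfully.
func (q *queue) Release(wb *levigo.WriteBatch, p queuePair) {
	wb.Delete([]byte("queue_active/" + p.URL))
}

// Retry drops the lease and re-schedules the URL after a delay.
func (q *queue) Retry(wb *levigo.WriteBatch, p queuePair, delay time.Duration) {
	wb.Delete([]byte("queue_active/" + p.URL))
	q.Add(wb, p.URL, p.Depth, time.Now().Add(delay))
}

// Recover moves leases left behind by a previous run (e.g. after a
// crash) back into the pending queue so that no URL is lost.
func (q *queue) Recover() {
	wb := levigo.NewWriteBatch()
	defer wb.Close()
	ro := levigo.NewReadOptions()
	defer ro.Close()
	iter := q.db.NewPrefixIterator(ro, []byte("queue_active/"))
	defer iter.Close()
	for ; iter.Valid(); iter.Next() {
		var p queuePair
		if err := iter.Value(&p); err != nil {
			continue
		}
		q.Add(wb, p.URL, p.Depth, time.Now())
		wb.Delete(iter.Key())
	}
	wo := levigo.NewWriteOptions()
	defer wo.Close()
	q.db.Write(wo, wb)
}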
Diffstat (limited to 'crawler.go')
-rw-r--r-- crawler.go | 128
1 file changed, 82 insertions(+), 46 deletions(-)
diff --git a/crawler.go b/crawler.go
index 52317fb..c337d97 100644
--- a/crawler.go
+++ b/crawler.go
@@ -40,6 +40,15 @@ func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) er
return db.Put(wo, key, b.Bytes())
}
+func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) error {
+ var b bytes.Buffer
+ if err := gob.NewEncoder(&b).Encode(obj); err != nil {
+ return err
+ }
+ wb.Put(key, b.Bytes())
+ return nil
+}
+
func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
data, err := db.Get(ro, key)
if err != nil {
@@ -51,25 +60,61 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) err
return nil
}
-func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobIterator {
+func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobPrefixIterator {
i := db.NewIterator(ro)
i.Seek(prefix)
- return &gobIterator{Iterator: i, prefix: prefix}
+ return newGobPrefixIterator(i, prefix)
+}
+
+func (db *gobDB) NewRangeIterator(ro *levigo.ReadOptions, startKey, endKey []byte) *gobRangeIterator {
+ i := db.NewIterator(ro)
+ if startKey != nil {
+ i.Seek(startKey)
+ }
+ return newGobRangeIterator(i, endKey)
}
type gobIterator struct {
*levigo.Iterator
+}
+
+func (i *gobIterator) Value(obj interface{}) error {
+ return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
+}
+
+type gobPrefixIterator struct {
+ *gobIterator
prefix []byte
}
-func (i *gobIterator) Valid() bool {
- return i.Iterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix)
+func (i *gobPrefixIterator) Valid() bool {
+ return i.gobIterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix)
}
-func (i *gobIterator) Value(obj interface{}) error {
- return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
+func newGobPrefixIterator(i *levigo.Iterator, prefix []byte) *gobPrefixIterator {
+ return &gobPrefixIterator{
+ gobIterator: &gobIterator{i},
+ prefix: prefix,
+ }
}
+type gobRangeIterator struct {
+ *gobIterator
+ endKey []byte
+}
+
+func (i *gobRangeIterator) Valid() bool {
+ return i.gobIterator.Valid() && (i.endKey == nil || bytes.Compare(i.Key(), i.endKey) < 0)
+}
+
+func newGobRangeIterator(i *levigo.Iterator, endKey []byte) *gobRangeIterator {
+ return &gobRangeIterator{
+ gobIterator: &gobIterator{i},
+ endKey: endKey,
+ }
+}
+
+// URLInfo stores information about a crawled URL.
type URLInfo struct {
URL string
StatusCode int
@@ -79,11 +124,14 @@ type URLInfo struct {
// A Fetcher retrieves contents from remote URLs.
type Fetcher interface {
+ // Fetch retrieves a URL and returns the response.
Fetch(string) (*http.Response, error)
}
+// FetcherFunc wraps a simple function into the Fetcher interface.
type FetcherFunc func(string) (*http.Response, error)
+// Fetch retrieves a URL and returns the response.
func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
return f(u)
}
@@ -92,11 +140,14 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
// implementations of this interface are considered permanent and will
// not cause the URL to be fetched again.
type Handler interface {
+ // Handle the response from a URL.
Handle(*Crawler, string, int, *http.Response, error) error
}
+// HandlerFunc wraps a function into the Handler interface.
type HandlerFunc func(*Crawler, string, int, *http.Response, error) error
+// Handle the response from a URL.
func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error {
return f(db, u, depth, resp, err)
}
@@ -104,6 +155,7 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Respons
// The Crawler object contains the crawler state.
type Crawler struct {
db *gobDB
+ queue *queue
seeds []*url.URL
scopes []Scope
fetcher Fetcher
@@ -112,12 +164,6 @@ type Crawler struct {
enqueueMx sync.Mutex
}
-type queuePair struct {
- Key []byte
- URL string
- Depth int
-}
-
// Enqueue a (possibly new) URL for processing.
func (c *Crawler) Enqueue(u *url.URL, depth int) {
// Normalize the URL.
@@ -143,47 +189,24 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) {
return
}
- // Create a unique key using the URL and the current timestamp.
- qkey := []byte(fmt.Sprintf("queue/%d/%s", time.Now().Unix(), urlStr))
-
// Store the URL in the queue, and store an empty URLInfo to
// make sure that subsequent calls to Enqueue with the same
// URL will fail.
+ wb := levigo.NewWriteBatch()
+ defer wb.Close()
+ c.queue.Add(wb, urlStr, depth, time.Now())
+ c.db.PutObjBatch(wb, ukey, &info)
wo := levigo.NewWriteOptions()
defer wo.Close()
- c.db.PutObj(wo, qkey, &queuePair{Key: qkey, URL: urlStr, Depth: depth})
- c.db.PutObj(wo, ukey, &info)
+ c.db.Write(wo, wb)
}
// Scan the queue for URLs until there are no more.
func (c *Crawler) process() <-chan queuePair {
ch := make(chan queuePair)
go func() {
- queuePrefix := []byte("queue/")
for range time.Tick(2 * time.Second) {
- n := 0
-
- // Scan the queue using a snapshot, to ignore
- // new URLs that might be added after this.
- s := c.db.NewSnapshot()
- ro := levigo.NewReadOptions()
- ro.SetSnapshot(s)
-
- iter := c.db.NewPrefixIterator(ro, queuePrefix)
- for ; iter.Valid(); iter.Next() {
- var p queuePair
- if err := iter.Value(&p); err != nil {
- continue
- }
- ch <- p
- n++
- }
- iter.Close()
-
- ro.Close()
- c.db.ReleaseSnapshot(s)
-
- if n == 0 {
+ if err := c.queue.Scan(ch); err != nil {
break
}
}
@@ -218,20 +241,27 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
// Invoke the handler (even if the fetcher errored out).
info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
- wo := levigo.NewWriteOptions()
+ wb := levigo.NewWriteBatch()
if httpErr == nil {
respBody.Close()
// Remove the URL from the queue if the fetcher was successful.
- c.db.Delete(wo, p.Key)
+ c.queue.Release(wb, p)
} else {
log.Printf("error retrieving %s: %v", p.URL, httpErr)
+ c.queue.Retry(wb, p, 300*time.Second)
}
- c.db.PutObj(wo, urlkey, &info)
+
+ c.db.PutObjBatch(wb, urlkey, &info)
+
+ wo := levigo.NewWriteOptions()
+ c.db.Write(wo, wb)
wo.Close()
+ wb.Close()
}
}
+// MustParseURLs parses a list of URLs and aborts on failure.
func MustParseURLs(urls []string) []*url.URL {
// Parse the seed URLs.
var parsed []*url.URL
@@ -252,13 +282,19 @@ func NewCrawler(path string, seeds []*url.URL, scopes []Scope, f Fetcher, h Hand
if err != nil {
return nil, err
}
+
c := &Crawler{
db: db,
+ queue: &queue{db: db},
fetcher: f,
handler: h,
seeds: seeds,
scopes: scopes,
}
+
+ // Recover active tasks.
+ c.queue.Recover()
+
return c, nil
}
@@ -294,11 +330,11 @@ func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http.
} else if resp.StatusCode > 300 && resp.StatusCode < 400 {
location := resp.Header.Get("Location")
if location != "" {
- locationUrl, err := resp.Request.URL.Parse(location)
+ locationURL, err := resp.Request.URL.Parse(location)
if err != nil {
log.Printf("error parsing Location header: %v", err)
} else {
- c.Enqueue(locationUrl, depth+1)
+ c.Enqueue(locationURL, depth+1)
}
}
} else {