aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--crawler.go28
1 files changed, 24 insertions, 4 deletions
diff --git a/crawler.go b/crawler.go
index fad8b9e..37f5b82 100644
--- a/crawler.go
+++ b/crawler.go
@@ -50,6 +50,25 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) err
return nil
}
+// NewPrefixIterator builds an iterator from db.NewIterator(ro), positions it
+// with Seek(prefix), and returns it wrapped in a gobIterator that remembers
+// prefix.
+func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobIterator {
+	it := &gobIterator{Iterator: db.NewIterator(ro), prefix: prefix}
+	it.Iterator.Seek(prefix)
+	return it
+}
+
+// gobIterator wraps a levigo iterator together with a key prefix. Its Valid
+// method additionally requires the current key to start with prefix, and its
+// Value method gob-decodes the current value instead of returning raw bytes.
+type gobIterator struct {
+ // Embedded so the remaining iterator methods (Seek, Next, Key, ...) are promoted.
+ *levigo.Iterator
+ // prefix is the key prefix checked by Valid.
+ prefix []byte
+}
+
+// Valid reports whether the underlying iterator is valid and its current key
+// starts with the iterator's prefix.
+func (i *gobIterator) Valid() bool {
+	// Only look at the key once the underlying iterator says it is valid.
+	if !i.Iterator.Valid() {
+		return false
+	}
+	return bytes.HasPrefix(i.Key(), i.prefix)
+}
+
+// Value gob-decodes the underlying iterator's current value into obj and
+// returns the decoder's error, if any. It shadows the embedded iterator's
+// Value method, which remains reachable as i.Iterator.Value().
+func (i *gobIterator) Value(obj interface{}) error {
+	raw := i.Iterator.Value()
+	dec := gob.NewDecoder(bytes.NewBuffer(raw))
+	return dec.Decode(obj)
+}
+
type URLInfo struct {
URL string
StatusCode int
@@ -77,7 +96,7 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Respons
return f(db, u, depth, resp, err)
}
-// UrlDb is the database of crawls (either pending or done).
+// Crawler is the database of crawled URLs (either pending or done).
type Crawler struct {
db *gobDB
seeds []*url.URL
@@ -113,6 +132,7 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) {
}
}
+ // Protect the read-modify-update below with a mutex.
c.enqueueMx.Lock()
defer c.enqueueMx.Unlock()
@@ -151,10 +171,10 @@ func (c *Crawler) process() <-chan QueuePair {
ro := levigo.NewReadOptions()
ro.SetSnapshot(s)
- iter := c.db.NewIterator(ro)
- for iter.Seek(queuePrefix); iter.Valid() && bytes.HasPrefix(iter.Key(), queuePrefix); iter.Next() {
+ iter := c.db.NewPrefixIterator(ro, queuePrefix)
+ for ; iter.Valid(); iter.Next() {
var p QueuePair
- if err := gob.NewDecoder(bytes.NewBuffer(iter.Value())).Decode(&p); err != nil {
+ if err := iter.Value(&p); err != nil {
continue
}
ch <- p