diff options
-rw-r--r-- | crawler.go | 28 |
1 files changed, 24 insertions, 4 deletions
@@ -50,6 +50,25 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) err return nil } +func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobIterator { + i := db.NewIterator(ro) + i.Seek(prefix) + return &gobIterator{Iterator: i, prefix: prefix} +} + +type gobIterator struct { + *levigo.Iterator + prefix []byte +} + +func (i *gobIterator) Valid() bool { + return i.Iterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix) +} + +func (i *gobIterator) Value(obj interface{}) error { + return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj) +} + type URLInfo struct { URL string StatusCode int @@ -77,7 +96,7 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Respons return f(db, u, depth, resp, err) } -// UrlDb is the database of crawls (either pending or done). +// UrlDb is the database of crawled URLs (either pending or done). type Crawler struct { db *gobDB seeds []*url.URL @@ -113,6 +132,7 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) { } } + // Protect the read-modify-update below with a mutex. c.enqueueMx.Lock() defer c.enqueueMx.Unlock() @@ -151,10 +171,10 @@ func (c *Crawler) process() <-chan QueuePair { ro := levigo.NewReadOptions() ro.SetSnapshot(s) - iter := c.db.NewIterator(ro) - for iter.Seek(queuePrefix); iter.Valid() && bytes.HasPrefix(iter.Key(), queuePrefix); iter.Next() { + iter := c.db.NewPrefixIterator(ro, queuePrefix) + for ; iter.Valid(); iter.Next() { var p QueuePair - if err := gob.NewDecoder(bytes.NewBuffer(iter.Value())).Decode(&p); err != nil { + if err := iter.Value(&p); err != nil { continue } ch <- p |