about | summary | refs | log | tree | commit | diff
path: root/crawler.go
diff options
context:
space:
mode:
author ale <ale@incal.net> 2017-12-18 22:34:48 +0000
committer ale <ale@incal.net> 2017-12-18 22:34:48 +0000
commit 394de2d98a9cfde6244620f0b188625b60f68f96 (patch)
tree 388199d2f08e28a38473b9de42979dabafd797e7 /crawler.go
parent 4fc0b1d2b5f9143a3067ef31f3558774a3c6d68f (diff)
download crawl-394de2d98a9cfde6244620f0b188625b60f68f96.tar.gz
crawl-394de2d98a9cfde6244620f0b188625b60f68f96.zip
Switch to github.com/syndtr/goleveldb
The native Go implementation of LevelDB.
Diffstat (limited to 'crawler.go')
-rw-r--r-- crawler.go 104
1 file changed, 28 insertions, 76 deletions
diff --git a/crawler.go b/crawler.go
index f2a8968..f1edc2d 100644
--- a/crawler.go
+++ b/crawler.go
@@ -13,34 +13,29 @@ import (
"time"
"github.com/PuerkitoBio/purell"
- "github.com/jmhodges/levigo"
+ "github.com/syndtr/goleveldb/leveldb"
+ lerr "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/iterator"
+ lutil "github.com/syndtr/goleveldb/leveldb/util"
)
type gobDB struct {
- *levigo.DB
+ *leveldb.DB
}
func newGobDB(path string) (*gobDB, error) {
- opts := levigo.NewOptions()
- opts.SetCreateIfMissing(true)
- opts.SetCache(levigo.NewLRUCache(2 << 20))
- opts.SetFilterPolicy(levigo.NewBloomFilter(10))
- db, err := levigo.Open(path, opts)
+ db, err := leveldb.OpenFile(path, nil)
+ if lerr.IsCorrupted(err) {
+ log.Printf("corrupted database, recovering...")
+ db, err = leveldb.RecoverFile(path, nil)
+ }
if err != nil {
return nil, err
}
return &gobDB{db}, nil
}
-func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) error {
- var b bytes.Buffer
- if err := gob.NewEncoder(&b).Encode(obj); err != nil {
- return err
- }
- return db.Put(wo, key, b.Bytes())
-}
-
-func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) error {
+func (db *gobDB) PutObjBatch(wb *leveldb.Batch, key []byte, obj interface{}) error {
var b bytes.Buffer
if err := gob.NewEncoder(&b).Encode(obj); err != nil {
return err
@@ -49,8 +44,8 @@ func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{})
return nil
}
-func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
- data, err := db.Get(ro, key)
+func (db *gobDB) GetObj(key []byte, obj interface{}) error {
+ data, err := db.Get(key, nil)
if err != nil {
return err
}
@@ -60,58 +55,24 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) err
return nil
}
-func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobPrefixIterator {
- i := db.NewIterator(ro)
- i.Seek(prefix)
- return newGobPrefixIterator(i, prefix)
+func (db *gobDB) NewPrefixIterator(prefix []byte) *gobIterator {
+ return newGobIterator(db.NewIterator(lutil.BytesPrefix(prefix), nil))
}
-func (db *gobDB) NewRangeIterator(ro *levigo.ReadOptions, startKey, endKey []byte) *gobRangeIterator {
- i := db.NewIterator(ro)
- if startKey != nil {
- i.Seek(startKey)
- }
- return newGobRangeIterator(i, endKey)
+func (db *gobDB) NewRangeIterator(startKey, endKey []byte) *gobIterator {
+ return newGobIterator(db.NewIterator(&lutil.Range{Start: startKey, Limit: endKey}, nil))
}
type gobIterator struct {
- *levigo.Iterator
-}
-
-func (i *gobIterator) Value(obj interface{}) error {
- return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
+ iterator.Iterator
}
-type gobPrefixIterator struct {
- *gobIterator
- prefix []byte
+func newGobIterator(i iterator.Iterator) *gobIterator {
+ return &gobIterator{i}
}
-func (i *gobPrefixIterator) Valid() bool {
- return i.gobIterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix)
-}
-
-func newGobPrefixIterator(i *levigo.Iterator, prefix []byte) *gobPrefixIterator {
- return &gobPrefixIterator{
- gobIterator: &gobIterator{i},
- prefix: prefix,
- }
-}
-
-type gobRangeIterator struct {
- *gobIterator
- endKey []byte
-}
-
-func (i *gobRangeIterator) Valid() bool {
- return i.gobIterator.Valid() && (i.endKey == nil || bytes.Compare(i.Key(), i.endKey) < 0)
-}
-
-func newGobRangeIterator(i *levigo.Iterator, endKey []byte) *gobRangeIterator {
- return &gobRangeIterator{
- gobIterator: &gobIterator{i},
- endKey: endKey,
- }
+func (i *gobIterator) Value(obj interface{}) error {
+ return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj)
}
// URLInfo stores information about a crawled URL.
@@ -182,23 +143,18 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) {
// Check if we've already seen it.
var info URLInfo
- ro := levigo.NewReadOptions()
- defer ro.Close()
ukey := []byte(fmt.Sprintf("url/%s", urlStr))
- if err := c.db.GetObj(ro, ukey, &info); err == nil {
+ if err := c.db.GetObj(ukey, &info); err == nil {
return
}
// Store the URL in the queue, and store an empty URLInfo to
// make sure that subsequent calls to Enqueue with the same
// URL will fail.
- wb := levigo.NewWriteBatch()
- defer wb.Close()
+ wb := new(leveldb.Batch)
c.queue.Add(wb, urlStr, depth, time.Now())
c.db.PutObjBatch(wb, ukey, &info)
- wo := levigo.NewWriteOptions()
- defer wo.Close()
- c.db.Write(wo, wb)
+ c.db.Write(wb, nil)
}
// Scan the queue for URLs until there are no more.
@@ -222,8 +178,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
// Ignore errors, we can work with an empty object.
urlkey := []byte(fmt.Sprintf("url/%s", p.URL))
var info URLInfo
- ro := levigo.NewReadOptions()
- c.db.GetObj(ro, urlkey, &info)
+ c.db.GetObj(urlkey, &info)
info.CrawledAt = time.Now()
info.URL = p.URL
@@ -241,7 +196,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
// Invoke the handler (even if the fetcher errored out).
info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
- wb := levigo.NewWriteBatch()
+ wb := new(leveldb.Batch)
if httpErr == nil {
respBody.Close()
@@ -254,10 +209,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
c.db.PutObjBatch(wb, urlkey, &info)
- wo := levigo.NewWriteOptions()
- c.db.Write(wo, wb)
- wo.Close()
- wb.Close()
+ c.db.Write(wb, nil)
}
}