From 394de2d98a9cfde6244620f0b188625b60f68f96 Mon Sep 17 00:00:00 2001 From: ale Date: Mon, 18 Dec 2017 22:34:48 +0000 Subject: Switch to github.com/syndtr/goleveldb The native Go implementation of LevelDB. --- crawler.go | 104 +++++++++++++++++-------------------------------------------- queue.go | 48 ++++++++++------------------ 2 files changed, 45 insertions(+), 107 deletions(-) diff --git a/crawler.go b/crawler.go index f2a8968..f1edc2d 100644 --- a/crawler.go +++ b/crawler.go @@ -13,34 +13,29 @@ import ( "time" "github.com/PuerkitoBio/purell" - "github.com/jmhodges/levigo" + "github.com/syndtr/goleveldb/leveldb" + lerr "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/iterator" + lutil "github.com/syndtr/goleveldb/leveldb/util" ) type gobDB struct { - *levigo.DB + *leveldb.DB } func newGobDB(path string) (*gobDB, error) { - opts := levigo.NewOptions() - opts.SetCreateIfMissing(true) - opts.SetCache(levigo.NewLRUCache(2 << 20)) - opts.SetFilterPolicy(levigo.NewBloomFilter(10)) - db, err := levigo.Open(path, opts) + db, err := leveldb.OpenFile(path, nil) + if lerr.IsCorrupted(err) { + log.Printf("corrupted database, recovering...") + db, err = leveldb.RecoverFile(path, nil) + } if err != nil { return nil, err } return &gobDB{db}, nil } -func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) error { - var b bytes.Buffer - if err := gob.NewEncoder(&b).Encode(obj); err != nil { - return err - } - return db.Put(wo, key, b.Bytes()) -} - -func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) error { +func (db *gobDB) PutObjBatch(wb *leveldb.Batch, key []byte, obj interface{}) error { var b bytes.Buffer if err := gob.NewEncoder(&b).Encode(obj); err != nil { return err @@ -49,8 +44,8 @@ func (db *gobDB) PutObjBatch(wb *levigo.WriteBatch, key []byte, obj interface{}) return nil } -func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error { - data, err := db.Get(ro, key) +func (db *gobDB) GetObj(key []byte, obj interface{}) error { + data, err := db.Get(key, nil) if err != nil { return err } @@ -60,58 +55,24 @@ func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) err return nil } -func (db *gobDB) NewPrefixIterator(ro *levigo.ReadOptions, prefix []byte) *gobPrefixIterator { - i := db.NewIterator(ro) - i.Seek(prefix) - return newGobPrefixIterator(i, prefix) +func (db *gobDB) NewPrefixIterator(prefix []byte) *gobIterator { + return newGobIterator(db.NewIterator(lutil.BytesPrefix(prefix), nil)) } -func (db *gobDB) NewRangeIterator(ro *levigo.ReadOptions, startKey, endKey []byte) *gobRangeIterator { - i := db.NewIterator(ro) - if startKey != nil { - i.Seek(startKey) - } - return newGobRangeIterator(i, endKey) +func (db *gobDB) NewRangeIterator(startKey, endKey []byte) *gobIterator { + return newGobIterator(db.NewIterator(&lutil.Range{Start: startKey, Limit: endKey}, nil)) } type gobIterator struct { - *levigo.Iterator -} - -func (i *gobIterator) Value(obj interface{}) error { - return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj) + iterator.Iterator } -type gobPrefixIterator struct { - *gobIterator - prefix []byte +func newGobIterator(i iterator.Iterator) *gobIterator { + return &gobIterator{i} } -func (i *gobPrefixIterator) Valid() bool { - return i.gobIterator.Valid() && bytes.HasPrefix(i.Key(), i.prefix) -} - -func newGobPrefixIterator(i *levigo.Iterator, prefix []byte) *gobPrefixIterator { - return &gobPrefixIterator{ - gobIterator: &gobIterator{i}, - prefix: prefix, - } -} - -type gobRangeIterator struct { - *gobIterator - endKey []byte -} - -func (i *gobRangeIterator) Valid() bool { - return i.gobIterator.Valid() && (i.endKey == nil || bytes.Compare(i.Key(), i.endKey) < 0) -} - -func newGobRangeIterator(i *levigo.Iterator, endKey []byte) *gobRangeIterator { - return &gobRangeIterator{ - gobIterator: &gobIterator{i}, - endKey: endKey, - } +func (i *gobIterator) Value(obj interface{}) error { + return gob.NewDecoder(bytes.NewBuffer(i.Iterator.Value())).Decode(obj) } // URLInfo stores information about a crawled URL. @@ -182,23 +143,18 @@ func (c *Crawler) Enqueue(u *url.URL, depth int) { // Check if we've already seen it. var info URLInfo - ro := levigo.NewReadOptions() - defer ro.Close() ukey := []byte(fmt.Sprintf("url/%s", urlStr)) - if err := c.db.GetObj(ro, ukey, &info); err == nil { + if err := c.db.GetObj(ukey, &info); err == nil { return } // Store the URL in the queue, and store an empty URLInfo to // make sure that subsequent calls to Enqueue with the same // URL will fail. - wb := levigo.NewWriteBatch() - defer wb.Close() + wb := new(leveldb.Batch) c.queue.Add(wb, urlStr, depth, time.Now()) c.db.PutObjBatch(wb, ukey, &info) - wo := levigo.NewWriteOptions() - defer wo.Close() - c.db.Write(wo, wb) + c.db.Write(wb, nil) } // Scan the queue for URLs until there are no more. @@ -222,8 +178,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) { // Ignore errors, we can work with an empty object. urlkey := []byte(fmt.Sprintf("url/%s", p.URL)) var info URLInfo - ro := levigo.NewReadOptions() - c.db.GetObj(ro, urlkey, &info) + c.db.GetObj(urlkey, &info) info.CrawledAt = time.Now() info.URL = p.URL @@ -241,7 +196,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) { // Invoke the handler (even if the fetcher errored out). info.Error = c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr) - wb := levigo.NewWriteBatch() + wb := new(leveldb.Batch) if httpErr == nil { respBody.Close() @@ -254,10 +209,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) { c.db.PutObjBatch(wb, urlkey, &info) - wo := levigo.NewWriteOptions() - c.db.Write(wo, wb) - wo.Close() - wb.Close() + c.db.Write(wb, nil) } } diff --git a/queue.go b/queue.go index 5bad577..7621226 100644 --- a/queue.go +++ b/queue.go @@ -8,7 +8,7 @@ import ( "sync/atomic" "time" - "github.com/jmhodges/levigo" + "github.com/syndtr/goleveldb/leveldb" ) type queue struct { @@ -34,18 +34,12 @@ type queuePair struct { // Scan the pending queue and send items on 'ch'. Returns an error // when the queue is empty (work is done). func (q *queue) Scan(ch chan<- queuePair) error { - snap := q.db.NewSnapshot() - defer q.db.ReleaseSnapshot(snap) - - ro := levigo.NewReadOptions() - ro.SetSnapshot(snap) - defer ro.Close() - n := 0 startKey, endKey := queueScanRange() - iter := q.db.NewRangeIterator(ro, startKey, endKey) + iter := q.db.NewRangeIterator(startKey, endKey) + defer iter.Release() - for ; iter.Valid(); iter.Next() { + for iter.Next() { var p queuePair if err := iter.Value(&p); err != nil { continue @@ -63,34 +57,31 @@ func (q *queue) Scan(ch chan<- queuePair) error { } // Add an item to the pending work queue. -func (q *queue) Add(wb *levigo.WriteBatch, urlStr string, depth int, when time.Time) { +func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) { t := uint64(when.UnixNano()) qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep) q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth}) } func (q *queue) acquire(qp queuePair) { - wb := levigo.NewWriteBatch() - defer wb.Close() + wb := new(leveldb.Batch) q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp) wb.Delete(qp.key) - wo := levigo.NewWriteOptions() - defer wo.Close() - q.db.Write(wo, wb) + q.db.Write(wb, nil) atomic.AddInt32(&q.numActive, 1) } // Release an item from the queue. Processing for this item is done. -func (q *queue) Release(wb *levigo.WriteBatch, qp queuePair) { +func (q *queue) Release(wb *leveldb.Batch, qp queuePair) { wb.Delete(activeQueueKey(qp.key)) atomic.AddInt32(&q.numActive, -1) } // Retry processing this item at a later time. -func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration) { +func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) { wb.Delete(activeQueueKey(qp.key)) q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)) atomic.AddInt32(&q.numActive, -1) @@ -100,15 +91,12 @@ func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration) // called at startup to recover tasks that were active when the // previous run terminated. func (q *queue) Recover() { - wb := levigo.NewWriteBatch() - defer wb.Close() - - ro := levigo.NewReadOptions() - defer ro.Close() + wb := new(leveldb.Batch) prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep) - iter := q.db.NewPrefixIterator(ro, prefix) - for ; iter.Valid(); iter.Next() { + iter := q.db.NewPrefixIterator(prefix) + defer iter.Release() + for iter.Next() { var p queuePair if err := iter.Value(&p); err != nil { continue @@ -118,15 +106,13 @@ func (q *queue) Recover() { wb.Delete(iter.Key()) } - wo := levigo.NewWriteOptions() - defer wo.Close() - q.db.Write(wo, wb) + q.db.Write(wb, nil) } func encodeUint64(n uint64) []byte { - var b bytes.Buffer - binary.Write(&b, binary.BigEndian, n) - return b.Bytes() + var b [8]byte + binary.BigEndian.PutUint64(b[:], n) + return b[:] } func activeQueueKey(key []byte) []byte { -- cgit v1.2.3-54-g00ecf