aboutsummaryrefslogtreecommitdiff
path: root/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'queue.go')
-rw-r--r--queue.go48
1 files changed, 17 insertions, 31 deletions
diff --git a/queue.go b/queue.go
index 5bad577..7621226 100644
--- a/queue.go
+++ b/queue.go
@@ -8,7 +8,7 @@ import (
"sync/atomic"
"time"
- "github.com/jmhodges/levigo"
+ "github.com/syndtr/goleveldb/leveldb"
)
type queue struct {
@@ -34,18 +34,12 @@ type queuePair struct {
// Scan the pending queue and send items on 'ch'. Returns an error
// when the queue is empty (work is done).
func (q *queue) Scan(ch chan<- queuePair) error {
- snap := q.db.NewSnapshot()
- defer q.db.ReleaseSnapshot(snap)
-
- ro := levigo.NewReadOptions()
- ro.SetSnapshot(snap)
- defer ro.Close()
-
n := 0
startKey, endKey := queueScanRange()
- iter := q.db.NewRangeIterator(ro, startKey, endKey)
+ iter := q.db.NewRangeIterator(startKey, endKey)
+ defer iter.Release()
- for ; iter.Valid(); iter.Next() {
+ for iter.Next() {
var p queuePair
if err := iter.Value(&p); err != nil {
continue
@@ -63,34 +57,31 @@ func (q *queue) Scan(ch chan<- queuePair) error {
}
// Add an item to the pending work queue.
-func (q *queue) Add(wb *levigo.WriteBatch, urlStr string, depth int, when time.Time) {
+func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) {
t := uint64(when.UnixNano())
qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
}
func (q *queue) acquire(qp queuePair) {
- wb := levigo.NewWriteBatch()
- defer wb.Close()
+ wb := new(leveldb.Batch)
q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp)
wb.Delete(qp.key)
- wo := levigo.NewWriteOptions()
- defer wo.Close()
- q.db.Write(wo, wb)
+ q.db.Write(wb, nil)
atomic.AddInt32(&q.numActive, 1)
}
// Release an item from the queue. Processing for this item is done.
-func (q *queue) Release(wb *levigo.WriteBatch, qp queuePair) {
+func (q *queue) Release(wb *leveldb.Batch, qp queuePair) {
wb.Delete(activeQueueKey(qp.key))
atomic.AddInt32(&q.numActive, -1)
}
// Retry processing this item at a later time.
-func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration) {
+func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) {
wb.Delete(activeQueueKey(qp.key))
q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay))
atomic.AddInt32(&q.numActive, -1)
@@ -100,15 +91,12 @@ func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration)
// called at startup to recover tasks that were active when the
// previous run terminated.
func (q *queue) Recover() {
- wb := levigo.NewWriteBatch()
- defer wb.Close()
-
- ro := levigo.NewReadOptions()
- defer ro.Close()
+ wb := new(leveldb.Batch)
prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep)
- iter := q.db.NewPrefixIterator(ro, prefix)
- for ; iter.Valid(); iter.Next() {
+ iter := q.db.NewPrefixIterator(prefix)
+ defer iter.Release()
+ for iter.Next() {
var p queuePair
if err := iter.Value(&p); err != nil {
continue
@@ -118,15 +106,13 @@ func (q *queue) Recover() {
wb.Delete(iter.Key())
}
- wo := levigo.NewWriteOptions()
- defer wo.Close()
- q.db.Write(wo, wb)
+ q.db.Write(wb, nil)
}
func encodeUint64(n uint64) []byte {
- var b bytes.Buffer
- binary.Write(&b, binary.BigEndian, n)
- return b.Bytes()
+ var b [8]byte
+ binary.BigEndian.PutUint64(b[:], n)
+ return b[:]
}
func activeQueueKey(key []byte) []byte {