diff options
Diffstat (limited to 'queue.go')
-rw-r--r-- | queue.go | 48 |
1 files changed, 17 insertions, 31 deletions
@@ -8,7 +8,7 @@ import ( "sync/atomic" "time" - "github.com/jmhodges/levigo" + "github.com/syndtr/goleveldb/leveldb" ) type queue struct { @@ -34,18 +34,12 @@ type queuePair struct { // Scan the pending queue and send items on 'ch'. Returns an error // when the queue is empty (work is done). func (q *queue) Scan(ch chan<- queuePair) error { - snap := q.db.NewSnapshot() - defer q.db.ReleaseSnapshot(snap) - - ro := levigo.NewReadOptions() - ro.SetSnapshot(snap) - defer ro.Close() - n := 0 startKey, endKey := queueScanRange() - iter := q.db.NewRangeIterator(ro, startKey, endKey) + iter := q.db.NewRangeIterator(startKey, endKey) + defer iter.Release() - for ; iter.Valid(); iter.Next() { + for iter.Next() { var p queuePair if err := iter.Value(&p); err != nil { continue @@ -63,34 +57,31 @@ func (q *queue) Scan(ch chan<- queuePair) error { } // Add an item to the pending work queue. -func (q *queue) Add(wb *levigo.WriteBatch, urlStr string, depth int, when time.Time) { +func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) { t := uint64(when.UnixNano()) qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep) q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth}) } func (q *queue) acquire(qp queuePair) { - wb := levigo.NewWriteBatch() - defer wb.Close() + wb := new(leveldb.Batch) q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp) wb.Delete(qp.key) - wo := levigo.NewWriteOptions() - defer wo.Close() - q.db.Write(wo, wb) + q.db.Write(wb, nil) atomic.AddInt32(&q.numActive, 1) } // Release an item from the queue. Processing for this item is done. -func (q *queue) Release(wb *levigo.WriteBatch, qp queuePair) { +func (q *queue) Release(wb *leveldb.Batch, qp queuePair) { wb.Delete(activeQueueKey(qp.key)) atomic.AddInt32(&q.numActive, -1) } // Retry processing this item at a later time. -func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration) { +func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) { wb.Delete(activeQueueKey(qp.key)) q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)) atomic.AddInt32(&q.numActive, -1) @@ -100,15 +91,12 @@ func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration) // called at startup to recover tasks that were active when the // previous run terminated. func (q *queue) Recover() { - wb := levigo.NewWriteBatch() - defer wb.Close() - - ro := levigo.NewReadOptions() - defer ro.Close() + wb := new(leveldb.Batch) prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep) - iter := q.db.NewPrefixIterator(ro, prefix) - for ; iter.Valid(); iter.Next() { + iter := q.db.NewPrefixIterator(prefix) + defer iter.Release() + for iter.Next() { var p queuePair if err := iter.Value(&p); err != nil { continue @@ -118,15 +106,13 @@ func (q *queue) Recover() { wb.Delete(iter.Key()) } - wo := levigo.NewWriteOptions() - defer wo.Close() - q.db.Write(wo, wb) + q.db.Write(wb, nil) } func encodeUint64(n uint64) []byte { - var b bytes.Buffer - binary.Write(&b, binary.BigEndian, n) - return b.Bytes() + var b [8]byte + binary.BigEndian.PutUint64(b[:], n) + return b[:] } func activeQueueKey(key []byte) []byte { |