path: root/queue.go
author     ale <ale@incal.net>  2015-06-29 10:07:40 +0100
committer  ale <ale@incal.net>  2015-06-29 10:07:40 +0100
commit     9fbc656c6cd2ad610986a265c6b346bc234bb881 (patch)
tree       a5aa8a44c63b239f194617dd09cfa92cf47495e0 /queue.go
parent     63bd51e06b32d48878da68df8931809d42996df1 (diff)
download   crawl-9fbc656c6cd2ad610986a265c6b346bc234bb881.tar.gz
           crawl-9fbc656c6cd2ad610986a265c6b346bc234bb881.zip
improve queue code; golint fixes
The queuing code now performs proper lease accounting, and it will not return a URL twice if the page load is slow.
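
A rough sketch of how a worker loop inside this package might drive the new API (the runWorkers helper and the fetch callback are illustrative names, not part of this commit; error handling is omitted):

func runWorkers(q *queue, fetch func(url string) error) {
	// Re-queue anything that was still leased when the previous run stopped.
	q.Recover()

	ch := make(chan queuePair, 100)
	go func() {
		defer close(ch)
		for {
			// Scan leases due items and sends them on ch; it returns an
			// error once nothing is pending and nothing is leased.
			if err := q.Scan(ch); err != nil {
				return
			}
			time.Sleep(1 * time.Second)
		}
	}()

	for p := range ch {
		wb := levigo.NewWriteBatch()
		if err := fetch(p.URL); err != nil {
			q.Retry(wb, p, 5*time.Minute) // reschedule on failure
		} else {
			q.Release(wb, p) // lease released, item done
		}
		wo := levigo.NewWriteOptions()
		q.db.Write(wo, wb)
		wo.Close()
		wb.Close()
	}
}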
Diffstat (limited to 'queue.go')
-rw-r--r--  queue.go  141
1 file changed, 141 insertions, 0 deletions
diff --git a/queue.go b/queue.go
new file mode 100644
index 0000000..5bad577
--- /dev/null
+++ b/queue.go
@@ -0,0 +1,141 @@
+package crawl
+
+import (
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "math/rand"
+ "sync/atomic"
+ "time"
+
+ "github.com/jmhodges/levigo"
+)
+
+type queue struct {
+ db *gobDB
+ numActive int32
+}
+
+var (
+ queuePrefix = []byte("queue")
+ activePrefix = []byte("queue_active")
+
+ queueKeySep = []byte{'/'}
+ queueKeySepP1 = []byte{'/' + 1}
+)
+
+type queuePair struct {
+ key []byte
+
+ URL string
+ Depth int
+}
+
+// Scan the pending queue, leasing each item and sending it on 'ch'.
+// Returns an error when there is no more work (nothing pending or leased).
+func (q *queue) Scan(ch chan<- queuePair) error {
+ snap := q.db.NewSnapshot()
+ defer q.db.ReleaseSnapshot(snap)
+
+ ro := levigo.NewReadOptions()
+ ro.SetSnapshot(snap)
+ defer ro.Close()
+
+ n := 0
+ startKey, endKey := queueScanRange()
+ iter := q.db.NewRangeIterator(ro, startKey, endKey)
+
+ for ; iter.Valid(); iter.Next() {
+ var p queuePair
+ if err := iter.Value(&p); err != nil {
+ continue
+ }
+ p.key = iter.Key()
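+ // Lease the item: acquire moves it to the active set so that a later
+ // Scan pass will not hand out the same URL while the page is loading.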
+ q.acquire(p)
+ ch <- p
+ n++
+ }
+
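+ // Only report end-of-work when nothing is pending and no leased
+ // (active) items are still outstanding.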
+ if n == 0 && atomic.LoadInt32(&q.numActive) == 0 {
+ return errors.New("EOF")
+ }
+ return nil
+}
+
+// Add an item to the pending work queue.
+func (q *queue) Add(wb *levigo.WriteBatch, urlStr string, depth int, when time.Time) {
+ t := uint64(when.UnixNano())
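+ // Key layout: queue/<big-endian timestamp>/<random>, so keys sort by
+ // scheduled time with random tie-breaking within the same instant.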
+ qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
+ q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
+}
+
+func (q *queue) acquire(qp queuePair) {
+ wb := levigo.NewWriteBatch()
+ defer wb.Close()
+
+ q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp)
+ wb.Delete(qp.key)
+
+ wo := levigo.NewWriteOptions()
+ defer wo.Close()
+ q.db.Write(wo, wb)
+
+ atomic.AddInt32(&q.numActive, 1)
+}
+
+// Release an item from the queue. Processing for this item is done.
+func (q *queue) Release(wb *levigo.WriteBatch, qp queuePair) {
+ wb.Delete(activeQueueKey(qp.key))
+ atomic.AddInt32(&q.numActive, -1)
+}
+
+// Retry processing this item at a later time.
+func (q *queue) Retry(wb *levigo.WriteBatch, qp queuePair, delay time.Duration) {
+ wb.Delete(activeQueueKey(qp.key))
+ q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay))
+ atomic.AddInt32(&q.numActive, -1)
+}
+
+// Recover moves all active tasks to the pending queue. To be
+// called at startup to recover tasks that were active when the
+// previous run terminated.
+func (q *queue) Recover() {
+ wb := levigo.NewWriteBatch()
+ defer wb.Close()
+
+ ro := levigo.NewReadOptions()
+ defer ro.Close()
+
+ prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep)
+ iter := q.db.NewPrefixIterator(ro, prefix)
+ for ; iter.Valid(); iter.Next() {
+ var p queuePair
+ if err := iter.Value(&p); err != nil {
+ continue
+ }
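+ // Strip the "queue_active/" prefix to recover the original pending key.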
+ p.key = iter.Key()[len(activePrefix)+1:]
+ q.db.PutObjBatch(wb, p.key, &p)
+ wb.Delete(iter.Key())
+ }
+
+ wo := levigo.NewWriteOptions()
+ defer wo.Close()
+ q.db.Write(wo, wb)
+}
+
+func encodeUint64(n uint64) []byte {
+ var b bytes.Buffer
+ binary.Write(&b, binary.BigEndian, n)
+ return b.Bytes()
+}
+
+func activeQueueKey(key []byte) []byte {
+ return bytes.Join([][]byte{activePrefix, key}, queueKeySep)
+}
+
+func queueScanRange() ([]byte, []byte) {
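+ // Bound the scan to items whose scheduled time has already passed.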
+ tlim := uint64(time.Now().UnixNano() + 1)
+ startKey := bytes.Join([][]byte{queuePrefix, []byte{}}, queueKeySep)
+ endKey := bytes.Join([][]byte{queuePrefix, encodeUint64(tlim)}, queueKeySep)
+ return startKey, endKey
+}
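
A side note on the key encoding above: encodeUint64 writes the timestamp big-endian so that byte-wise comparison of queue keys matches the numeric order of the scheduled times, which is what the time-bounded range scan in Scan relies on. A standalone check of that property (not part of this commit):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	a := make([]byte, 8)
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(a, 1000)
	binary.BigEndian.PutUint64(b, 2000)
	// Big-endian keeps lexicographic byte order equal to numeric order.
	fmt.Println(bytes.Compare(a, b) < 0) // prints: true
}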