aboutsummaryrefslogtreecommitdiff
path: root/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'queue.go')
-rw-r--r--queue.go7
1 files changed, 4 insertions, 3 deletions
diff --git a/queue.go b/queue.go
index ee0e7ed..cd4143c 100644
--- a/queue.go
+++ b/queue.go
@@ -28,6 +28,7 @@ type queuePair struct {
URL string
Depth int
+ Tag int
}
// Scan the pending queue and send items on 'ch'. Returns an error
@@ -58,10 +59,10 @@ func (q *queue) Scan(ch chan<- queuePair) error {
}
// Add an item to the pending work queue.
-func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) error {
+func (q *queue) Add(wb *leveldb.Batch, urlStr string, tag, depth int, when time.Time) error {
t := uint64(when.UnixNano())
qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
- return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
+ return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Tag: tag, Depth: depth})
}
func (q *queue) acquire(qp queuePair) error {
@@ -87,7 +88,7 @@ func (q *queue) Release(wb *leveldb.Batch, qp queuePair) {
// Retry processing this item at a later time.
func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) error {
wb.Delete(activeQueueKey(qp.key))
- if err := q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)); err != nil {
+ if err := q.Add(wb, qp.URL, qp.Tag, qp.Depth, time.Now().Add(delay)); err != nil {
return err
}
atomic.AddInt32(&q.numActive, -1)