diff options
Diffstat (limited to 'queue.go')
-rw-r--r-- | queue.go | 7 |
1 files changed, 4 insertions, 3 deletions
@@ -28,6 +28,7 @@ type queuePair struct { URL string Depth int + Tag int } // Scan the pending queue and send items on 'ch'. Returns an error @@ -58,10 +59,10 @@ func (q *queue) Scan(ch chan<- queuePair) error { } // Add an item to the pending work queue. -func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) error { +func (q *queue) Add(wb *leveldb.Batch, urlStr string, tag, depth int, when time.Time) error { t := uint64(when.UnixNano()) qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep) - return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth}) + return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Tag: tag, Depth: depth}) } func (q *queue) acquire(qp queuePair) error { @@ -87,7 +88,7 @@ func (q *queue) Release(wb *leveldb.Batch, qp queuePair) { // Retry processing this item at a later time. func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) error { wb.Delete(activeQueueKey(qp.key)) - if err := q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)); err != nil { + if err := q.Add(wb, qp.URL, qp.Tag, qp.Depth, time.Now().Add(delay)); err != nil { return err } atomic.AddInt32(&q.numActive, -1) |