about | summary | refs | log | tree | commit | diff
path: root/queue.go
diff options
context:
space:
mode:
author: ale <ale@incal.net> 2018-08-31 08:29:14 +0100
committer: ale <ale@incal.net> 2018-08-31 08:29:14 +0100
commit: ee1a3d8e5278a4a4e8435f9129852b95a9c22afb (patch)
tree: fd7a42cfff4aed5bd2379feb35f7172287430ba2 /queue.go
parent: b3d419486a87c9193c2fd6c16168f600876e0f73 (diff)
download: crawl-ee1a3d8e5278a4a4e8435f9129852b95a9c22afb.tar.gz
download: crawl-ee1a3d8e5278a4a4e8435f9129852b95a9c22afb.zip
Improve error checking
Detect write errors (both on the database and to the WARC output) and abort with an error message. Also fix a bunch of harmless lint warnings.
Diffstat (limited to 'queue.go')
-rw-r--r-- queue.go 37
1 file changed, 24 insertions, 13 deletions
diff --git a/queue.go b/queue.go
index da4a7b8..ee0e7ed 100644
--- a/queue.go
+++ b/queue.go
@@ -20,8 +20,7 @@ var (
queuePrefix = []byte("queue")
activePrefix = []byte("queue_active")
- queueKeySep = []byte{'/'}
- queueKeySepP1 = []byte{'/' + 1}
+ queueKeySep = []byte{'/'}
)
type queuePair struct {
@@ -45,7 +44,9 @@ func (q *queue) Scan(ch chan<- queuePair) error {
continue
}
p.key = iter.Key()
- q.acquire(p)
+ if err := q.acquire(p); err != nil {
+ return err
+ }
ch <- p
n++
}
@@ -57,19 +58,24 @@ func (q *queue) Scan(ch chan<- queuePair) error {
}
// Add an item to the pending work queue.
-func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) {
+func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) error {
t := uint64(when.UnixNano())
qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
- q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
+ return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth})
}
-func (q *queue) acquire(qp queuePair) {
+func (q *queue) acquire(qp queuePair) error {
wb := new(leveldb.Batch)
- q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp)
+ if err := q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp); err != nil {
+ return err
+ }
wb.Delete(qp.key)
- q.db.Write(wb, nil)
+ if err := q.db.Write(wb, nil); err != nil {
+ return err
+ }
atomic.AddInt32(&q.numActive, 1)
+ return nil
}
// Release an item from the queue. Processing for this item is done.
@@ -79,16 +85,19 @@ func (q *queue) Release(wb *leveldb.Batch, qp queuePair) {
}
// Retry processing this item at a later time.
-func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) {
+func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) error {
wb.Delete(activeQueueKey(qp.key))
- q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay))
+ if err := q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)); err != nil {
+ return err
+ }
atomic.AddInt32(&q.numActive, -1)
+ return nil
}
// Recover moves all active tasks to the pending queue. To be
// called at startup to recover tasks that were active when the
// previous run terminated.
-func (q *queue) Recover() {
+func (q *queue) Recover() error {
wb := new(leveldb.Batch)
prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep)
@@ -100,11 +109,13 @@ func (q *queue) Recover() {
continue
}
p.key = iter.Key()[len(activePrefix)+1:]
- q.db.PutObjBatch(wb, p.key, &p)
+ if err := q.db.PutObjBatch(wb, p.key, &p); err != nil {
+ return err
+ }
wb.Delete(iter.Key())
}
- q.db.Write(wb, nil)
+ return q.db.Write(wb, nil)
}
func encodeUint64(n uint64) []byte {