From ee1a3d8e5278a4a4e8435f9129852b95a9c22afb Mon Sep 17 00:00:00 2001 From: ale Date: Fri, 31 Aug 2018 08:29:14 +0100 Subject: Improve error checking Detect write errors (both on the database and to the WARC output) and abort with an error message. Also fix a bunch of harmless lint warnings. --- queue.go | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) (limited to 'queue.go') diff --git a/queue.go b/queue.go index da4a7b8..ee0e7ed 100644 --- a/queue.go +++ b/queue.go @@ -20,8 +20,7 @@ var ( queuePrefix = []byte("queue") activePrefix = []byte("queue_active") - queueKeySep = []byte{'/'} - queueKeySepP1 = []byte{'/' + 1} + queueKeySep = []byte{'/'} ) type queuePair struct { @@ -45,7 +44,9 @@ func (q *queue) Scan(ch chan<- queuePair) error { continue } p.key = iter.Key() - q.acquire(p) + if err := q.acquire(p); err != nil { + return err + } ch <- p n++ } @@ -57,19 +58,24 @@ func (q *queue) Scan(ch chan<- queuePair) error { } // Add an item to the pending work queue. -func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) { +func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) error { t := uint64(when.UnixNano()) qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep) - q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth}) + return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth}) } -func (q *queue) acquire(qp queuePair) { +func (q *queue) acquire(qp queuePair) error { wb := new(leveldb.Batch) - q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp) + if err := q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp); err != nil { + return err + } wb.Delete(qp.key) - q.db.Write(wb, nil) + if err := q.db.Write(wb, nil); err != nil { + return err + } atomic.AddInt32(&q.numActive, 1) + return nil } // Release an item from the queue. Processing for this item is done. @@ -79,16 +85,19 @@ func (q *queue) Release(wb *leveldb.Batch, qp queuePair) { } // Retry processing this item at a later time. -func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) { +func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) error { wb.Delete(activeQueueKey(qp.key)) - q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)) + if err := q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)); err != nil { + return err + } atomic.AddInt32(&q.numActive, -1) + return nil } // Recover moves all active tasks to the pending queue. To be // called at startup to recover tasks that were active when the // previous run terminated. -func (q *queue) Recover() { +func (q *queue) Recover() error { wb := new(leveldb.Batch) prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep) @@ -100,11 +109,13 @@ func (q *queue) Recover() { continue } p.key = iter.Key()[len(activePrefix)+1:] - q.db.PutObjBatch(wb, p.key, &p) + if err := q.db.PutObjBatch(wb, p.key, &p); err != nil { + return err + } wb.Delete(iter.Key()) } - q.db.Write(wb, nil) + return q.db.Write(wb, nil) } func encodeUint64(n uint64) []byte { -- cgit v1.2.3-54-g00ecf