aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go')
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go40
1 files changed, 37 insertions, 3 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
index b6563e8..28e5090 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
@@ -640,6 +640,16 @@ func (db *DB) tableNeedCompaction() bool {
return v.needCompaction()
}
+// resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted.
+func (db *DB) resumeWrite() bool {
+ v := db.s.version()
+ defer v.release()
+ if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() {
+ return true
+ }
+ return false
+}
+
func (db *DB) pauseCompaction(ch chan<- struct{}) {
select {
case ch <- struct{}{}:
@@ -653,6 +663,7 @@ type cCmd interface {
}
type cAuto struct {
+ // Note for table compaction, an empty ackC represents it's a compaction waiting command.
ackC chan<- error
}
@@ -765,8 +776,10 @@ func (db *DB) mCompaction() {
}
func (db *DB) tCompaction() {
- var x cCmd
- var ackQ []cCmd
+ var (
+ x cCmd
+ ackQ, waitQ []cCmd
+ )
defer func() {
if x := recover(); x != nil {
@@ -778,6 +791,10 @@ func (db *DB) tCompaction() {
ackQ[i].ack(ErrClosed)
ackQ[i] = nil
}
+ for i := range waitQ {
+ waitQ[i].ack(ErrClosed)
+ waitQ[i] = nil
+ }
if x != nil {
x.ack(ErrClosed)
}
@@ -795,12 +812,25 @@ func (db *DB) tCompaction() {
return
default:
}
+ // Resume write operation as soon as possible.
+ if len(waitQ) > 0 && db.resumeWrite() {
+ for i := range waitQ {
+ waitQ[i].ack(nil)
+ waitQ[i] = nil
+ }
+ waitQ = waitQ[:0]
+ }
} else {
for i := range ackQ {
ackQ[i].ack(nil)
ackQ[i] = nil
}
ackQ = ackQ[:0]
+ for i := range waitQ {
+ waitQ[i].ack(nil)
+ waitQ[i] = nil
+ }
+ waitQ = waitQ[:0]
select {
case x = <-db.tcompCmdC:
case ch := <-db.tcompPauseC:
@@ -813,7 +843,11 @@ func (db *DB) tCompaction() {
if x != nil {
switch cmd := x.(type) {
case cAuto:
- ackQ = append(ackQ, x)
+ if cmd.ackC != nil {
+ waitQ = append(waitQ, x)
+ } else {
+ ackQ = append(ackQ, x)
+ }
case cRange:
x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
default: