aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/syndtr/goleveldb/leveldb/session_util.go')
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/session_util.go240
1 files changed, 226 insertions, 14 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
index 40cb2cf..fc56b63 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
@@ -9,6 +9,7 @@ package leveldb
import (
"fmt"
"sync/atomic"
+ "time"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/storage"
@@ -39,19 +40,213 @@ func (s *session) newTemp() storage.FileDesc {
return storage.FileDesc{Type: storage.TypeTemp, Num: num}
}
-func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
- ref += s.fileRef[fd.Num]
- if ref > 0 {
- s.fileRef[fd.Num] = ref
- } else if ref == 0 {
- delete(s.fileRef, fd.Num)
- } else {
- panic(fmt.Sprintf("negative ref: %v", fd))
- }
- return ref
+// Session state.
+
+const (
+ // maxCachedNumber represents the maximum number of version tasks
+ // that can be cached in the ref loop.
+ maxCachedNumber = 256
+
+ // maxCachedTime represents the maximum time for ref loop to cache
+ // a version task.
+ maxCachedTime = 5 * time.Minute
+)
+
+// vDelta indicates the change information between the next version
+// and the currently specified version
+type vDelta struct {
+ vid int64
+ added []int64
+ deleted []int64
}
-// Session state.
+// vTask defines a version task for either reference or release.
+type vTask struct {
+ vid int64
+ files []tFiles
+ created time.Time
+}
+
+func (s *session) refLoop() {
+ var (
+ fileRef = make(map[int64]int) // Table file reference counter
+ ref = make(map[int64]*vTask) // Current referencing version store
+ deltas = make(map[int64]*vDelta)
+ referenced = make(map[int64]struct{})
+ released = make(map[int64]*vDelta) // Released version that waiting for processing
+ abandoned = make(map[int64]struct{}) // Abandoned version id
+ next, last int64
+ )
+ // addFileRef adds file reference counter with specified file number and
+ // reference value
+ addFileRef := func(fnum int64, ref int) int {
+ ref += fileRef[fnum]
+ if ref > 0 {
+ fileRef[fnum] = ref
+ } else if ref == 0 {
+ delete(fileRef, fnum)
+ } else {
+ panic(fmt.Sprintf("negative ref: %v", fnum))
+ }
+ return ref
+ }
+ // skipAbandoned skips useless abandoned version id.
+ skipAbandoned := func() bool {
+ if _, exist := abandoned[next]; exist {
+ delete(abandoned, next)
+ return true
+ }
+ return false
+ }
+ // applyDelta applies version change to current file reference.
+ applyDelta := func(d *vDelta) {
+ for _, t := range d.added {
+ addFileRef(t, 1)
+ }
+ for _, t := range d.deleted {
+ if addFileRef(t, -1) == 0 {
+ s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t})
+ }
+ }
+ }
+
+ timer := time.NewTimer(0)
+ <-timer.C // discard the initial tick
+ defer timer.Stop()
+
+ // processTasks processes version tasks in strict order.
+ //
+ // If we want to use delta to reduce the cost of file references and dereferences,
+ // we must strictly follow the id of the version, otherwise some files that are
+ // being referenced will be deleted.
+ //
+ // In addition, some db operations (such as iterators) may cause a version to be
+ // referenced for a long time. In order to prevent such operations from blocking
+ // the entire processing queue, we will properly convert some of the version tasks
+ // into full file references and releases.
+ processTasks := func() {
+ timer.Reset(maxCachedTime)
+ // Make sure we don't cache too many version tasks.
+ for {
+ // Skip any abandoned version number to prevent blocking processing.
+ if skipAbandoned() {
+ next += 1
+ continue
+ }
+ // Don't bother the version that has been released.
+ if _, exist := released[next]; exist {
+ break
+ }
+ // Ensure the specified version has been referenced.
+ if _, exist := ref[next]; !exist {
+ break
+ }
+ if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime {
+ break
+ }
+ // Convert version task into full file references and releases mode.
+ // Reference version(i+1) first and wait version(i) to release.
+ // FileRef(i+1) = FileRef(i) + Delta(i)
+ for _, tt := range ref[next].files {
+ for _, t := range tt {
+ addFileRef(t.fd.Num, 1)
+ }
+ }
+ // Note, if some compactions take a long time, even more than 5 minutes,
+ // we may miss the corresponding delta information here.
+ // Fortunately it will not affect the correctness of the file reference,
+ // and we can apply the delta once we receive it.
+ if d := deltas[next]; d != nil {
+ applyDelta(d)
+ }
+ referenced[next] = struct{}{}
+ delete(ref, next)
+ delete(deltas, next)
+ next += 1
+ }
+
+ // Use delta information to process all released versions.
+ for {
+ if skipAbandoned() {
+ next += 1
+ continue
+ }
+ if d, exist := released[next]; exist {
+ if d != nil {
+ applyDelta(d)
+ }
+ delete(released, next)
+ next += 1
+ continue
+ }
+ return
+ }
+ }
+
+ for {
+ processTasks()
+
+ select {
+ case t := <-s.refCh:
+ if _, exist := ref[t.vid]; exist {
+ panic("duplicate reference request")
+ }
+ ref[t.vid] = t
+ if t.vid > last {
+ last = t.vid
+ }
+
+ case d := <-s.deltaCh:
+ if _, exist := ref[d.vid]; !exist {
+ if _, exist2 := referenced[d.vid]; !exist2 {
+ panic("invalid release request")
+ }
+ // The reference opt is already expired, apply
+ // delta here.
+ applyDelta(d)
+ continue
+ }
+ deltas[d.vid] = d
+
+ case t := <-s.relCh:
+ if _, exist := referenced[t.vid]; exist {
+ for _, tt := range t.files {
+ for _, t := range tt {
+ if addFileRef(t.fd.Num, -1) == 0 {
+ s.tops.remove(t.fd)
+ }
+ }
+ }
+ delete(referenced, t.vid)
+ continue
+ }
+ if _, exist := ref[t.vid]; !exist {
+ panic("invalid release request")
+ }
+ released[t.vid] = deltas[t.vid]
+ delete(deltas, t.vid)
+ delete(ref, t.vid)
+
+ case id := <-s.abandon:
+ if id >= next {
+ abandoned[id] = struct{}{}
+ }
+
+ case <-timer.C:
+
+ case r := <-s.fileRefCh:
+ ref := make(map[int64]int)
+ for f, c := range fileRef {
+ ref[f] = c
+ }
+ r <- ref
+
+ case <-s.closeC:
+ s.closeW.Done()
+ return
+ }
+ }
+}
// Get current version. This will incr version ref, must call
// version.release (exactly once) after use.
@@ -69,13 +264,30 @@ func (s *session) tLen(level int) int {
}
// Set current version to v.
-func (s *session) setVersion(v *version) {
+func (s *session) setVersion(r *sessionRecord, v *version) {
s.vmu.Lock()
defer s.vmu.Unlock()
// Hold by session. It is important to call this first before releasing
// current version, otherwise the still used files might get released.
v.incref()
if s.stVersion != nil {
+ if r != nil {
+ var (
+ added = make([]int64, 0, len(r.addedTables))
+ deleted = make([]int64, 0, len(r.deletedTables))
+ )
+ for _, t := range r.addedTables {
+ added = append(added, t.num)
+ }
+ for _, t := range r.deletedTables {
+ deleted = append(deleted, t.num)
+ }
+ select {
+ case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}:
+ case <-v.s.closeC:
+ s.log("reference loop already exist")
+ }
+ }
// Release current version.
s.stVersion.releaseNB()
}
@@ -96,7 +308,7 @@ func (s *session) setNextFileNum(num int64) {
func (s *session) markFileNum(num int64) {
nextFileNum := num + 1
for {
- old, x := s.stNextFileNum, nextFileNum
+ old, x := atomic.LoadInt64(&s.stNextFileNum), nextFileNum
if old > x {
x = old
}
@@ -114,7 +326,7 @@ func (s *session) allocFileNum() int64 {
// Reuse given file number.
func (s *session) reuseFileNum(num int64) {
for {
- old, x := s.stNextFileNum, num
+ old, x := atomic.LoadInt64(&s.stNextFileNum), num
if old != x+1 {
x = old
}