aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com
diff options
context:
space:
mode:
author ale <ale@incal.net> 2019-09-26 12:10:23 +0100
committer ale <ale@incal.net> 2019-09-26 12:10:23 +0100
commit 97221e85d5ab47a1443a7aea0b0461bac5230854 (patch)
tree 7a8fe4093e580f1db64822164fb406b866d2b402 /vendor/github.com
parent 64eb5fb23f64f209e3d813e017097044a111151f (diff)
download crawl-97221e85d5ab47a1443a7aea0b0461bac5230854.tar.gz
crawl-97221e85d5ab47a1443a7aea0b0461bac5230854.zip
Update vendored dependencies
Diffstat (limited to 'vendor/github.com')
-rw-r--r--vendor/github.com/golang/snappy/AUTHORS1
-rw-r--r--vendor/github.com/golang/snappy/CONTRIBUTORS1
-rw-r--r--vendor/github.com/golang/snappy/decode.go4
-rw-r--r--vendor/github.com/golang/snappy/decode_other.go22
-rw-r--r--vendor/github.com/golang/snappy/encode.go4
-rw-r--r--vendor/github.com/golang/snappy/go.mod1
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/batch.go5
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/db.go50
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go13
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go43
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go4
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go20
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go2
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go4
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go21
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/session.go55
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go38
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/session_util.go240
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/table.go91
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go4
-rw-r--r--vendor/github.com/syndtr/goleveldb/leveldb/version.go81
21 files changed, 602 insertions, 102 deletions
diff --git a/vendor/github.com/golang/snappy/AUTHORS b/vendor/github.com/golang/snappy/AUTHORS
index bcfa195..f10b49b 100644
--- a/vendor/github.com/golang/snappy/AUTHORS
+++ b/vendor/github.com/golang/snappy/AUTHORS
@@ -11,5 +11,6 @@
Damian Gryski <dgryski@gmail.com>
Google Inc.
Jan Mercl <0xjnml@gmail.com>
+Klaus Post <klauspost@gmail.com>
Rodolfo Carvalho <rhcarvalho@gmail.com>
Sebastien Binet <seb.binet@gmail.com>
diff --git a/vendor/github.com/golang/snappy/CONTRIBUTORS b/vendor/github.com/golang/snappy/CONTRIBUTORS
index 931ae31..3bd40cf 100644
--- a/vendor/github.com/golang/snappy/CONTRIBUTORS
+++ b/vendor/github.com/golang/snappy/CONTRIBUTORS
@@ -29,6 +29,7 @@
Damian Gryski <dgryski@gmail.com>
Jan Mercl <0xjnml@gmail.com>
Kai Backman <kaib@golang.org>
+Klaus Post <klauspost@gmail.com>
Marc-Antoine Ruel <maruel@chromium.org>
Nigel Tao <nigeltao@golang.org>
Rob Pike <r@golang.org>
diff --git a/vendor/github.com/golang/snappy/decode.go b/vendor/github.com/golang/snappy/decode.go
index 72efb03..f1e04b1 100644
--- a/vendor/github.com/golang/snappy/decode.go
+++ b/vendor/github.com/golang/snappy/decode.go
@@ -52,6 +52,8 @@ const (
// Otherwise, a newly allocated slice will be returned.
//
// The dst and src must not overlap. It is valid to pass a nil dst.
+//
+// Decode handles the Snappy block format, not the Snappy stream format.
func Decode(dst, src []byte) ([]byte, error) {
dLen, s, err := decodedLen(src)
if err != nil {
@@ -83,6 +85,8 @@ func NewReader(r io.Reader) *Reader {
}
// Reader is an io.Reader that can read Snappy-compressed bytes.
+//
+// Reader handles the Snappy stream format, not the Snappy block format.
type Reader struct {
r io.Reader
err error
diff --git a/vendor/github.com/golang/snappy/decode_other.go b/vendor/github.com/golang/snappy/decode_other.go
index 8c9f204..b88318e 100644
--- a/vendor/github.com/golang/snappy/decode_other.go
+++ b/vendor/github.com/golang/snappy/decode_other.go
@@ -85,14 +85,28 @@ func decode(dst, src []byte) int {
if offset <= 0 || d < offset || length > len(dst)-d {
return decodeErrCodeCorrupt
}
- // Copy from an earlier sub-slice of dst to a later sub-slice. Unlike
- // the built-in copy function, this byte-by-byte copy always runs
+ // Copy from an earlier sub-slice of dst to a later sub-slice.
+ // If no overlap, use the built-in copy:
+ if offset >= length {
+ copy(dst[d:d+length], dst[d-offset:])
+ d += length
+ continue
+ }
+
+ // Unlike the built-in copy function, this byte-by-byte copy always runs
// forwards, even if the slices overlap. Conceptually, this is:
//
// d += forwardCopy(dst[d:d+length], dst[d-offset:])
- for end := d + length; d != end; d++ {
- dst[d] = dst[d-offset]
+ //
+ // We align the slices into a and b and show the compiler they are the same size.
+ // This allows the loop to run without bounds checks.
+ a := dst[d : d+length]
+ b := dst[d-offset:]
+ b = b[:len(a)]
+ for i := range a {
+ a[i] = b[i]
}
+ d += length
}
if d != len(dst) {
return decodeErrCodeCorrupt
diff --git a/vendor/github.com/golang/snappy/encode.go b/vendor/github.com/golang/snappy/encode.go
index 8d393e9..7f23657 100644
--- a/vendor/github.com/golang/snappy/encode.go
+++ b/vendor/github.com/golang/snappy/encode.go
@@ -15,6 +15,8 @@ import (
// Otherwise, a newly allocated slice will be returned.
//
// The dst and src must not overlap. It is valid to pass a nil dst.
+//
+// Encode handles the Snappy block format, not the Snappy stream format.
func Encode(dst, src []byte) []byte {
if n := MaxEncodedLen(len(src)); n < 0 {
panic(ErrTooLarge)
@@ -139,6 +141,8 @@ func NewBufferedWriter(w io.Writer) *Writer {
}
// Writer is an io.Writer that can write Snappy-compressed bytes.
+//
+// Writer handles the Snappy stream format, not the Snappy block format.
type Writer struct {
w io.Writer
err error
diff --git a/vendor/github.com/golang/snappy/go.mod b/vendor/github.com/golang/snappy/go.mod
new file mode 100644
index 0000000..f6406bb
--- /dev/null
+++ b/vendor/github.com/golang/snappy/go.mod
@@ -0,0 +1 @@
+module github.com/golang/snappy
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go
index 2259200..823be93 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go
@@ -238,6 +238,11 @@ func newBatch() interface{} {
return &Batch{}
}
+// MakeBatch returns an empty batch with a preallocated buffer of capacity n.
+func MakeBatch(n int) *Batch {
+ return &Batch{data: make([]byte, 0, n)}
+}
+
func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
var index batchIndex
for i, o := 0, 0; o < len(data); i++ {
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db.go b/vendor/github.com/syndtr/goleveldb/leveldb/db.go
index b27c38d..74e9826 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db.go
@@ -38,6 +38,12 @@ type DB struct {
inWritePaused int32 // The indicator whether write operation is paused by compaction
aliveSnaps, aliveIters int32
+ // Compaction statistic
+ memComp uint32 // The cumulative number of memory compaction
+ level0Comp uint32 // The cumulative number of level0 compaction
+ nonLevel0Comp uint32 // The cumulative number of non-level0 compaction
+ seekComp uint32 // The cumulative number of seek compaction
+
// Session.
s *session
@@ -468,7 +474,7 @@ func recoverTable(s *session, o *opt.Options) error {
}
// Commit.
- return s.commit(rec)
+ return s.commit(rec, false)
}
func (db *DB) recoverJournal() error {
@@ -538,7 +544,7 @@ func (db *DB) recoverJournal() error {
rec.setJournalNum(fd.Num)
rec.setSeqNum(db.seq)
- if err := db.s.commit(rec); err != nil {
+ if err := db.s.commit(rec, false); err != nil {
fr.Close()
return err
}
@@ -617,7 +623,7 @@ func (db *DB) recoverJournal() error {
// Commit.
rec.setJournalNum(db.journalFd.Num)
rec.setSeqNum(db.seq)
- if err := db.s.commit(rec); err != nil {
+ if err := db.s.commit(rec, false); err != nil {
// Close journal on error.
if db.journal != nil {
db.journal.Close()
@@ -872,6 +878,10 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
+// WARNING: The content of any slice returned by the iterator (e.g. a slice
+// returned by calling the Iterator.Key() or Iterator.Value() methods) should
+// not be modified unless noted otherwise.
+//
// The iterator must be released after use, by calling Release method.
//
// Also read Iterator documentation of the leveldb/iterator package.
@@ -953,15 +963,29 @@ func (db *DB) GetProperty(name string) (value string, err error) {
value = "Compactions\n" +
" Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" +
"-------+------------+---------------+---------------+---------------+---------------\n"
+ var totalTables int
+ var totalSize, totalRead, totalWrite int64
+ var totalDuration time.Duration
for level, tables := range v.levels {
duration, read, write := db.compStats.getStat(level)
if len(tables) == 0 && duration == 0 {
continue
}
+ totalTables += len(tables)
+ totalSize += tables.size()
+ totalRead += read
+ totalWrite += write
+ totalDuration += duration
value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
float64(read)/1048576.0, float64(write)/1048576.0)
}
+ value += "-------+------------+---------------+---------------+---------------+---------------\n"
+ value += fmt.Sprintf(" Total | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
+ totalTables, float64(totalSize)/1048576.0, totalDuration.Seconds(),
+ float64(totalRead)/1048576.0, float64(totalWrite)/1048576.0)
+ case p == "compcount":
+ value = fmt.Sprintf("MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", atomic.LoadUint32(&db.memComp), atomic.LoadUint32(&db.level0Comp), atomic.LoadUint32(&db.nonLevel0Comp), atomic.LoadUint32(&db.seekComp))
case p == "iostats":
value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
float64(db.s.stor.reads())/1048576.0,
@@ -1013,11 +1037,16 @@ type DBStats struct {
BlockCacheSize int
OpenedTablesCount int
- LevelSizes []int64
+ LevelSizes Sizes
LevelTablesCounts []int
- LevelRead []int64
- LevelWrite []int64
+ LevelRead Sizes
+ LevelWrite Sizes
LevelDurations []time.Duration
+
+ MemComp uint32
+ Level0Comp uint32
+ NonLevel0Comp uint32
+ SeekComp uint32
}
// Stats populates s with database statistics.
@@ -1054,16 +1083,17 @@ func (db *DB) Stats(s *DBStats) error {
for level, tables := range v.levels {
duration, read, write := db.compStats.getStat(level)
- if len(tables) == 0 && duration == 0 {
- continue
- }
+
s.LevelDurations = append(s.LevelDurations, duration)
s.LevelRead = append(s.LevelRead, read)
s.LevelWrite = append(s.LevelWrite, write)
s.LevelSizes = append(s.LevelSizes, tables.size())
s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
}
-
+ s.MemComp = atomic.LoadUint32(&db.memComp)
+ s.Level0Comp = atomic.LoadUint32(&db.level0Comp)
+ s.NonLevel0Comp = atomic.LoadUint32(&db.nonLevel0Comp)
+ s.SeekComp = atomic.LoadUint32(&db.seekComp)
return nil
}
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
index 0c1b9a5..6b70eb2 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
@@ -8,6 +8,7 @@ package leveldb
import (
"sync"
+ "sync/atomic"
"time"
"github.com/syndtr/goleveldb/leveldb/errors"
@@ -260,7 +261,7 @@ func (db *DB) compactionCommit(name string, rec *sessionRecord) {
db.compCommitLk.Lock()
defer db.compCommitLk.Unlock() // Defer is necessary.
db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
- return db.s.commit(rec)
+ return db.s.commit(rec, true)
}, nil)
}
@@ -324,10 +325,12 @@ func (db *DB) memCompaction() {
db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)
+ // Save compaction stats
for _, r := range rec.addedTables {
stats.write += r.size
}
db.compStats.addStat(flushLevel, stats)
+ atomic.AddUint32(&db.memComp, 1)
// Drop frozen memdb.
db.dropFrozenMem()
@@ -588,6 +591,14 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
for i := range stats {
db.compStats.addStat(c.sourceLevel+1, &stats[i])
}
+ switch c.typ {
+ case level0Compaction:
+ atomic.AddUint32(&db.level0Comp, 1)
+ case nonLevel0Compaction:
+ atomic.AddUint32(&db.nonLevel0Comp, 1)
+ case seekCompaction:
+ atomic.AddUint32(&db.seekComp, 1)
+ }
}
func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go
index 03c24cd..e6e8ca5 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go
@@ -78,13 +78,17 @@ func (db *DB) newIterator(auxm *memDB, auxt tFiles, seq uint64, slice *util.Rang
}
rawIter := db.newRawIterator(auxm, auxt, islice, ro)
iter := &dbIter{
- db: db,
- icmp: db.s.icmp,
- iter: rawIter,
- seq: seq,
- strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader),
- key: make([]byte, 0),
- value: make([]byte, 0),
+ db: db,
+ icmp: db.s.icmp,
+ iter: rawIter,
+ seq: seq,
+ strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader),
+ disableSampling: db.s.o.GetDisableSeeksCompaction() || db.s.o.GetIteratorSamplingRate() <= 0,
+ key: make([]byte, 0),
+ value: make([]byte, 0),
+ }
+ if !iter.disableSampling {
+ iter.samplingGap = db.iterSamplingRate()
}
atomic.AddInt32(&db.aliveIters, 1)
runtime.SetFinalizer(iter, (*dbIter).Release)
@@ -107,13 +111,14 @@ const (
// dbIter represent an interator states over a database session.
type dbIter struct {
- db *DB
- icmp *iComparer
- iter iterator.Iterator
- seq uint64
- strict bool
-
- smaplingGap int
+ db *DB
+ icmp *iComparer
+ iter iterator.Iterator
+ seq uint64
+ strict bool
+ disableSampling bool
+
+ samplingGap int
dir dir
key []byte
value []byte
@@ -122,10 +127,14 @@ type dbIter struct {
}
func (i *dbIter) sampleSeek() {
+ if i.disableSampling {
+ return
+ }
+
ikey := i.iter.Key()
- i.smaplingGap -= len(ikey) + len(i.iter.Value())
- for i.smaplingGap < 0 {
- i.smaplingGap += i.db.iterSamplingRate()
+ i.samplingGap -= len(ikey) + len(i.iter.Value())
+ for i.samplingGap < 0 {
+ i.samplingGap += i.db.iterSamplingRate()
i.db.sampleSeek(ikey)
}
}
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
index 2c69d2e..c2ad70c 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
@@ -142,6 +142,10 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error)
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
+// WARNING: The content of any slice returned by the iterator (e.g. a slice
+// returned by calling the Iterator.Key() or Iterator.Value() methods) should
+// not be modified unless noted otherwise.
+//
// The iterator must be released after use, by calling Release method.
// Releasing the snapshot doesn't mean releasing the iterator too, the
// iterator would be still valid until released.
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go
index b8f7e7d..21d1e51 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go
@@ -69,6 +69,13 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
+// The returned iterator has locks on its own resources, so it can live beyond
+// the lifetime of the transaction that created it.
+//
+// WARNING: The content of any slice returned by the iterator (e.g. a slice
+// returned by calling the Iterator.Key() or Iterator.Value() methods) should
+// not be modified unless noted otherwise.
+//
// The iterator must be released after use, by calling Release method.
//
// Also read Iterator documentation of the leveldb/iterator package.
@@ -205,7 +212,7 @@ func (tr *Transaction) Commit() error {
tr.stats.startTimer()
var cerr error
for retry := 0; retry < 3; retry++ {
- cerr = tr.db.s.commit(&tr.rec)
+ cerr = tr.db.s.commit(&tr.rec, false)
if cerr != nil {
tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
select {
@@ -248,13 +255,14 @@ func (tr *Transaction) discard() {
// Discard transaction.
for _, t := range tr.tables {
tr.db.logf("transaction@discard @%d", t.fd.Num)
- if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil {
- tr.db.s.reuseFileNum(t.fd.Num)
- }
+ // Iterator may still use the table, so we use tOps.remove here.
+ tr.db.s.tops.remove(t.fd)
}
}
// Discard discards the transaction.
+// This method is a no-op if the transaction is already closed (either
+// committed or discarded).
//
// Other methods should not be called after transaction has been discarded.
func (tr *Transaction) Discard() {
@@ -278,8 +286,10 @@ func (db *DB) waitCompaction() error {
// until in-flight transaction is committed or discarded.
// The returned transaction handle is safe for concurrent use.
//
-// Transaction is expensive and can overwhelm compaction, especially if
+// Transaction is very expensive and can overwhelm compaction, especially if
// transaction size is small. Use with caution.
+// As a rule of thumb, use a transaction only if you need to merge at least
+// `Options.WriteBuffer` worth of data; otherwise don't.
//
// The transaction must be closed once done, either by committing or discarding
// the transaction.
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go b/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go
index bab0e99..56ccbfb 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go
@@ -16,7 +16,7 @@ func bloomHash(key []byte) uint32 {
type bloomFilter int
-// The bloom filter serializes its parameters and is backward compatible
+// Name: The bloom filter serializes its parameters and is backward compatible
// with respect to them. Therefor, its parameters are not added to its
// name.
func (bloomFilter) Name() string {
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go b/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go
index b661c08..824e47f 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go
@@ -397,6 +397,10 @@ func (p *DB) Find(key []byte) (rkey, value []byte, err error) {
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
+// WARNING: The content of any slice returned by the iterator (e.g. a slice
+// returned by calling the Iterator.Key() or Iterator.Value() methods) should
+// not be modified unless noted otherwise.
+//
// The iterator must be released after use, by calling Release method.
//
// Also read Iterator documentation of the leveldb/iterator package.
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go
index 528b164..c02c1e9 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go
@@ -278,6 +278,14 @@ type Options struct {
// The default is false.
DisableLargeBatchTransaction bool
+ // DisableSeeksCompaction allows disabling 'seeks triggered compaction'.
+ // The purpose of 'seeks triggered compaction' is to optimize the database
+ // so that 'level seeks' can be minimized; however, this might generate many
+ // small compactions, which may not be preferable.
+ //
+ // The default is false.
+ DisableSeeksCompaction bool
+
// ErrorIfExist defines whether an error should returned if the DB already
// exist.
//
@@ -309,6 +317,8 @@ type Options struct {
// IteratorSamplingRate defines approximate gap (in bytes) between read
// sampling of an iterator. The samples will be used to determine when
// compaction should be triggered.
+ // Use negative value to disable iterator sampling.
+ // The iterator sampling is disabled if DisableSeeksCompaction is true.
//
// The default is 1MiB.
IteratorSamplingRate int
@@ -526,6 +536,13 @@ func (o *Options) GetDisableLargeBatchTransaction() bool {
return o.DisableLargeBatchTransaction
}
+func (o *Options) GetDisableSeeksCompaction() bool {
+ if o == nil {
+ return false
+ }
+ return o.DisableSeeksCompaction
+}
+
func (o *Options) GetErrorIfExist() bool {
if o == nil {
return false
@@ -548,8 +565,10 @@ func (o *Options) GetFilter() filter.Filter {
}
func (o *Options) GetIteratorSamplingRate() int {
- if o == nil || o.IteratorSamplingRate <= 0 {
+ if o == nil || o.IteratorSamplingRate == 0 {
return DefaultIteratorSamplingRate
+ } else if o.IteratorSamplingRate < 0 {
+ return 0
}
return o.IteratorSamplingRate
}
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session.go b/vendor/github.com/syndtr/goleveldb/leveldb/session.go
index 3f391f9..7310209 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/session.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session.go
@@ -47,15 +47,24 @@ type session struct {
o *cachedOptions
icmp *iComparer
tops *tOps
- fileRef map[int64]int
manifest *journal.Writer
manifestWriter storage.Writer
manifestFd storage.FileDesc
- stCompPtrs []internalKey // compaction pointers; need external synchronization
- stVersion *version // current version
- vmu sync.Mutex
+ stCompPtrs []internalKey // compaction pointers; need external synchronization
+ stVersion *version // current version
+ ntVersionId int64 // next version id to assign
+ refCh chan *vTask
+ relCh chan *vTask
+ deltaCh chan *vDelta
+ abandon chan int64
+ closeC chan struct{}
+ closeW sync.WaitGroup
+ vmu sync.Mutex
+
+ // Testing fields
+ fileRefCh chan chan map[int64]int // channel used to pass current reference stat
}
// Creates new initialized session instance.
@@ -68,13 +77,21 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
return
}
s = &session{
- stor: newIStorage(stor),
- storLock: storLock,
- fileRef: make(map[int64]int),
+ stor: newIStorage(stor),
+ storLock: storLock,
+ refCh: make(chan *vTask),
+ relCh: make(chan *vTask),
+ deltaCh: make(chan *vDelta),
+ abandon: make(chan int64),
+ fileRefCh: make(chan chan map[int64]int),
+ closeC: make(chan struct{}),
}
s.setOptions(o)
s.tops = newTableOps(s)
- s.setVersion(newVersion(s))
+
+ s.closeW.Add(1)
+ go s.refLoop()
+ s.setVersion(nil, newVersion(s))
s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
return
}
@@ -90,7 +107,11 @@ func (s *session) close() {
}
s.manifest = nil
s.manifestWriter = nil
- s.setVersion(&version{s: s, closing: true})
+ s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId})
+
+ // Close all background goroutines
+ close(s.closeC)
+ s.closeW.Wait()
}
// Release session lock.
@@ -180,19 +201,27 @@ func (s *session) recover() (err error) {
}
s.manifestFd = fd
- s.setVersion(staging.finish())
+ s.setVersion(rec, staging.finish(false))
s.setNextFileNum(rec.nextFileNum)
s.recordCommited(rec)
return nil
}
// Commit session; need external synchronization.
-func (s *session) commit(r *sessionRecord) (err error) {
+func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
v := s.version()
defer v.release()
// spawn new version based on current version
- nv := v.spawn(r)
+ nv := v.spawn(r, trivial)
+
+ // abandon useless version id to prevent blocking version processing loop.
+ defer func() {
+ if err != nil {
+ s.abandon <- nv.id
+ s.logf("commit@abandon useless vid D%d", nv.id)
+ }
+ }()
if s.manifest == nil {
// manifest journal writer not yet created, create one
@@ -203,7 +232,7 @@ func (s *session) commit(r *sessionRecord) (err error) {
// finally, apply new version if no error rise
if err == nil {
- s.setVersion(nv)
+ s.setVersion(r, nv)
}
return
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go
index 089cd00..4c1d336 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go
@@ -14,6 +14,13 @@ import (
"github.com/syndtr/goleveldb/leveldb/opt"
)
+const (
+ undefinedCompaction = iota
+ level0Compaction
+ nonLevel0Compaction
+ seekCompaction
+)
+
func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int {
v := s.version()
defer v.release()
@@ -50,6 +57,7 @@ func (s *session) pickCompaction() *compaction {
var sourceLevel int
var t0 tFiles
+ var typ int
if v.cScore >= 1 {
sourceLevel = v.cLevel
cptr := s.getCompPtr(sourceLevel)
@@ -63,18 +71,24 @@ func (s *session) pickCompaction() *compaction {
if len(t0) == 0 {
t0 = append(t0, tables[0])
}
+ if sourceLevel == 0 {
+ typ = level0Compaction
+ } else {
+ typ = nonLevel0Compaction
+ }
} else {
if p := atomic.LoadPointer(&v.cSeek); p != nil {
ts := (*tSet)(p)
sourceLevel = ts.level
t0 = append(t0, ts.table)
+ typ = seekCompaction
} else {
v.release()
return nil
}
}
- return newCompaction(s, v, sourceLevel, t0)
+ return newCompaction(s, v, sourceLevel, t0, typ)
}
// Create compaction from given level and range; need external synchronization.
@@ -109,13 +123,18 @@ func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit
}
}
- return newCompaction(s, v, sourceLevel, t0)
+ typ := level0Compaction
+ if sourceLevel != 0 {
+ typ = nonLevel0Compaction
+ }
+ return newCompaction(s, v, sourceLevel, t0, typ)
}
-func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles) *compaction {
+func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles, typ int) *compaction {
c := &compaction{
s: s,
v: v,
+ typ: typ,
sourceLevel: sourceLevel,
levels: [2]tFiles{t0, nil},
maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)),
@@ -131,6 +150,7 @@ type compaction struct {
s *session
v *version
+ typ int
sourceLevel int
levels [2]tFiles
maxGPOverlaps int64
@@ -181,10 +201,14 @@ func (c *compaction) expand() {
t0, t1 := c.levels[0], c.levels[1]
imin, imax := t0.getRange(c.s.icmp)
- // We expand t0 here just incase ukey hop across tables.
- t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
- if len(t0) != len(c.levels[0]) {
- imin, imax = t0.getRange(c.s.icmp)
+
+ // For non-zero levels, the ukey can't hop across tables at all.
+ if c.sourceLevel == 0 {
+ // We expand t0 here just incase ukey hop across tables.
+ t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
+ if len(t0) != len(c.levels[0]) {
+ imin, imax = t0.getRange(c.s.icmp)
+ }
}
t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
// Get entire range covered by compaction.
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
index 40cb2cf..fc56b63 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go
@@ -9,6 +9,7 @@ package leveldb
import (
"fmt"
"sync/atomic"
+ "time"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/storage"
@@ -39,19 +40,213 @@ func (s *session) newTemp() storage.FileDesc {
return storage.FileDesc{Type: storage.TypeTemp, Num: num}
}
-func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
- ref += s.fileRef[fd.Num]
- if ref > 0 {
- s.fileRef[fd.Num] = ref
- } else if ref == 0 {
- delete(s.fileRef, fd.Num)
- } else {
- panic(fmt.Sprintf("negative ref: %v", fd))
- }
- return ref
+// Session state.
+
+const (
+ // maxCachedNumber represents the maximum number of version tasks
+ // that can be cached in the ref loop.
+ maxCachedNumber = 256
+
+ // maxCachedTime represents the maximum time for ref loop to cache
+ // a version task.
+ maxCachedTime = 5 * time.Minute
+)
+
+// vDelta indicates the change information between the next version
+// and the currently specified version
+type vDelta struct {
+ vid int64
+ added []int64
+ deleted []int64
}
-// Session state.
+// vTask defines a version task for either reference or release.
+type vTask struct {
+ vid int64
+ files []tFiles
+ created time.Time
+}
+
+func (s *session) refLoop() {
+ var (
+ fileRef = make(map[int64]int) // Table file reference counter
+ ref = make(map[int64]*vTask) // Current referencing version store
+ deltas = make(map[int64]*vDelta)
+ referenced = make(map[int64]struct{})
+ released = make(map[int64]*vDelta) // Released version that waiting for processing
+ abandoned = make(map[int64]struct{}) // Abandoned version id
+ next, last int64
+ )
+ // addFileRef adds file reference counter with specified file number and
+ // reference value
+ addFileRef := func(fnum int64, ref int) int {
+ ref += fileRef[fnum]
+ if ref > 0 {
+ fileRef[fnum] = ref
+ } else if ref == 0 {
+ delete(fileRef, fnum)
+ } else {
+ panic(fmt.Sprintf("negative ref: %v", fnum))
+ }
+ return ref
+ }
+ // skipAbandoned skips useless abandoned version id.
+ skipAbandoned := func() bool {
+ if _, exist := abandoned[next]; exist {
+ delete(abandoned, next)
+ return true
+ }
+ return false
+ }
+ // applyDelta applies version change to current file reference.
+ applyDelta := func(d *vDelta) {
+ for _, t := range d.added {
+ addFileRef(t, 1)
+ }
+ for _, t := range d.deleted {
+ if addFileRef(t, -1) == 0 {
+ s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t})
+ }
+ }
+ }
+
+ timer := time.NewTimer(0)
+ <-timer.C // discard the initial tick
+ defer timer.Stop()
+
+ // processTasks processes version tasks in strict order.
+ //
+ // If we want to use delta to reduce the cost of file references and dereferences,
+ // we must strictly follow the id of the version, otherwise some files that are
+ // being referenced will be deleted.
+ //
+ // In addition, some db operations (such as iterators) may cause a version to be
+ // referenced for a long time. In order to prevent such operations from blocking
+ // the entire processing queue, we will properly convert some of the version tasks
+ // into full file references and releases.
+ processTasks := func() {
+ timer.Reset(maxCachedTime)
+ // Make sure we don't cache too many version tasks.
+ for {
+ // Skip any abandoned version number to prevent blocking processing.
+ if skipAbandoned() {
+ next += 1
+ continue
+ }
+ // Don't bother the version that has been released.
+ if _, exist := released[next]; exist {
+ break
+ }
+ // Ensure the specified version has been referenced.
+ if _, exist := ref[next]; !exist {
+ break
+ }
+ if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime {
+ break
+ }
+ // Convert version task into full file references and releases mode.
+ // Reference version(i+1) first and wait version(i) to release.
+ // FileRef(i+1) = FileRef(i) + Delta(i)
+ for _, tt := range ref[next].files {
+ for _, t := range tt {
+ addFileRef(t.fd.Num, 1)
+ }
+ }
+ // Note, if some compactions take a long time, even more than 5 minutes,
+ // we may miss the corresponding delta information here.
+ // Fortunately it will not affect the correctness of the file reference,
+ // and we can apply the delta once we receive it.
+ if d := deltas[next]; d != nil {
+ applyDelta(d)
+ }
+ referenced[next] = struct{}{}
+ delete(ref, next)
+ delete(deltas, next)
+ next += 1
+ }
+
+ // Use delta information to process all released versions.
+ for {
+ if skipAbandoned() {
+ next += 1
+ continue
+ }
+ if d, exist := released[next]; exist {
+ if d != nil {
+ applyDelta(d)
+ }
+ delete(released, next)
+ next += 1
+ continue
+ }
+ return
+ }
+ }
+
+ for {
+ processTasks()
+
+ select {
+ case t := <-s.refCh:
+ if _, exist := ref[t.vid]; exist {
+ panic("duplicate reference request")
+ }
+ ref[t.vid] = t
+ if t.vid > last {
+ last = t.vid
+ }
+
+ case d := <-s.deltaCh:
+ if _, exist := ref[d.vid]; !exist {
+ if _, exist2 := referenced[d.vid]; !exist2 {
+ panic("invalid release request")
+ }
+ // The reference opt is already expired, apply
+ // delta here.
+ applyDelta(d)
+ continue
+ }
+ deltas[d.vid] = d
+
+ case t := <-s.relCh:
+ if _, exist := referenced[t.vid]; exist {
+ for _, tt := range t.files {
+ for _, t := range tt {
+ if addFileRef(t.fd.Num, -1) == 0 {
+ s.tops.remove(t.fd)
+ }
+ }
+ }
+ delete(referenced, t.vid)
+ continue
+ }
+ if _, exist := ref[t.vid]; !exist {
+ panic("invalid release request")
+ }
+ released[t.vid] = deltas[t.vid]
+ delete(deltas, t.vid)
+ delete(ref, t.vid)
+
+ case id := <-s.abandon:
+ if id >= next {
+ abandoned[id] = struct{}{}
+ }
+
+ case <-timer.C:
+
+ case r := <-s.fileRefCh:
+ ref := make(map[int64]int)
+ for f, c := range fileRef {
+ ref[f] = c
+ }
+ r <- ref
+
+ case <-s.closeC:
+ s.closeW.Done()
+ return
+ }
+ }
+}
// Get current version. This will incr version ref, must call
// version.release (exactly once) after use.
@@ -69,13 +264,30 @@ func (s *session) tLen(level int) int {
}
// Set current version to v.
-func (s *session) setVersion(v *version) {
+func (s *session) setVersion(r *sessionRecord, v *version) {
s.vmu.Lock()
defer s.vmu.Unlock()
// Hold by session. It is important to call this first before releasing
// current version, otherwise the still used files might get released.
v.incref()
if s.stVersion != nil {
+ if r != nil {
+ var (
+ added = make([]int64, 0, len(r.addedTables))
+ deleted = make([]int64, 0, len(r.deletedTables))
+ )
+ for _, t := range r.addedTables {
+ added = append(added, t.num)
+ }
+ for _, t := range r.deletedTables {
+ deleted = append(deleted, t.num)
+ }
+ select {
+ case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}:
+ case <-v.s.closeC:
+ s.log("reference loop already exist")
+ }
+ }
// Release current version.
s.stVersion.releaseNB()
}
@@ -96,7 +308,7 @@ func (s *session) setNextFileNum(num int64) {
func (s *session) markFileNum(num int64) {
nextFileNum := num + 1
for {
- old, x := s.stNextFileNum, nextFileNum
+ old, x := atomic.LoadInt64(&s.stNextFileNum), nextFileNum
if old > x {
x = old
}
@@ -114,7 +326,7 @@ func (s *session) allocFileNum() int64 {
// Reuse given file number.
func (s *session) reuseFileNum(num int64) {
for {
- old, x := s.stNextFileNum, num
+ old, x := atomic.LoadInt64(&s.stNextFileNum), num
if old != x+1 {
x = old
}
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table.go b/vendor/github.com/syndtr/goleveldb/leveldb/table.go
index 1fac60d..b7759b2 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/table.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/table.go
@@ -7,6 +7,7 @@
package leveldb
import (
+ "bytes"
"fmt"
"sort"
"sync/atomic"
@@ -150,6 +151,30 @@ func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
})
}
+// Searches (binary search via sort.Search) for the smallest index of the
+// table whose file number is smaller than the given number; returns len(tf) if none.
+func (tf tFiles) searchNumLess(num int64) int {
+ return sort.Search(len(tf), func(i int) bool {
+ return tf[i].fd.Num < num
+ })
+}
+
+// Searches for the smallest index of the table whose smallest user key
+// is after the given user key (per icmp.ucmp); returns len(tf) if none.
+func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int {
+ return sort.Search(len(tf), func(i int) bool {
+ return icmp.ucmp.Compare(tf[i].imin.ukey(), umin) > 0
+ })
+}
+
+// Searches for the smallest index of the table whose largest user key
+// is after the given user key (per icmp.ucmp); returns len(tf) if none.
+func (tf tFiles) searchMaxUkey(icmp *iComparer, umax []byte) int {
+ return sort.Search(len(tf), func(i int) bool {
+ return icmp.ucmp.Compare(tf[i].imax.ukey(), umax) > 0
+ })
+}
+
// Returns true if given key range overlaps with one or more
// tables key range. If unsorted is true then binary search will not be used.
func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
@@ -181,6 +206,50 @@ func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) boo
// expanded.
// The dst content will be overwritten.
func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
+ // Short circuit if tf is empty
+ if len(tf) == 0 {
+ return nil
+ }
+ // For non-zero levels, there is no ukey hop across at all.
+ // And what's more, the files in these levels are strictly sorted,
+ // so use binary search instead of heavy traverse.
+ if !overlapped {
+ var begin, end int
+ // Determine the begin index of the overlapped file
+ if umin != nil {
+ index := tf.searchMinUkey(icmp, umin)
+ if index == 0 {
+ begin = 0
+ } else if bytes.Compare(tf[index-1].imax.ukey(), umin) >= 0 {
+ // The min ukey overlaps with the index-1 file, expand it.
+ begin = index - 1
+ } else {
+ begin = index
+ }
+ }
+ // Determine the end index of the overlapped file
+ if umax != nil {
+ index := tf.searchMaxUkey(icmp, umax)
+ if index == len(tf) {
+ end = len(tf)
+ } else if bytes.Compare(tf[index].imin.ukey(), umax) <= 0 {
+ // The max ukey overlaps with the index file, expand it.
+ end = index + 1
+ } else {
+ end = index
+ }
+ } else {
+ end = len(tf)
+ }
+ // Ensure the overlapped file indexes are valid.
+ if begin >= end {
+ return nil
+ }
+ dst = make([]*tFile, end-begin)
+ copy(dst, tf[begin:end])
+ return dst
+ }
+
dst = dst[:0]
for i := 0; i < len(tf); {
t := tf[i]
@@ -193,11 +262,9 @@ func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, ove
} else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
umax = t.imax.ukey()
// Restart search if it is overlapped.
- if overlapped {
- dst = dst[:0]
- i = 0
- continue
- }
+ dst = dst[:0]
+ i = 0
+ continue
}
dst = append(dst, t)
@@ -416,16 +483,18 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
// Removes the table from persistent storage. It waits until
// no one is using the table.
-func (t *tOps) remove(f *tFile) {
- t.cache.Delete(0, uint64(f.fd.Num), func() {
- if err := t.s.stor.Remove(f.fd); err != nil {
- t.s.logf("table@remove removing @%d %q", f.fd.Num, err)
+func (t *tOps) remove(fd storage.FileDesc) {
+ t.cache.Delete(0, uint64(fd.Num), func() {
+ if err := t.s.stor.Remove(fd); err != nil {
+ t.s.logf("table@remove removing @%d %q", fd.Num, err)
} else {
- t.s.logf("table@remove removed @%d", f.fd.Num)
+ t.s.logf("table@remove removed @%d", fd.Num)
}
if t.evictRemoved && t.bcache != nil {
- t.bcache.EvictNS(uint64(f.fd.Num))
+ t.bcache.EvictNS(uint64(fd.Num))
}
+ // Try to reuse file num, useful for discarded transaction.
+ t.s.reuseFileNum(fd.Num)
})
}
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go b/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go
index 16cfbaa..496feb6 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go
@@ -787,6 +787,10 @@ func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChe
// table. And a nil Range.Limit is treated as a key after all keys in
// the table.
//
+// WARNING: Any slice returned by iterator (e.g. slice returned by calling
+// Iterator.Key() or Iterator.Value() methods), its content should not be modified
+// unless noted otherwise.
+//
// The returned iterator is not safe for concurrent use and should be released
// after use.
//
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/version.go b/vendor/github.com/syndtr/goleveldb/leveldb/version.go
index 73f272a..9535e35 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/version.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/version.go
@@ -9,6 +9,7 @@ package leveldb
import (
"fmt"
"sync/atomic"
+ "time"
"unsafe"
"github.com/syndtr/goleveldb/leveldb/iterator"
@@ -22,7 +23,8 @@ type tSet struct {
}
type version struct {
- s *session
+ id int64 // unique monotonous increasing version id
+ s *session
levels []tFiles
@@ -39,8 +41,11 @@ type version struct {
released bool
}
+// newVersion creates a new version with a unique, monotonically increasing id.
func newVersion(s *session) *version {
- return &version{s: s}
+ id := atomic.AddInt64(&s.ntVersionId, 1)
+ nv := &version{s: s, id: id - 1}
+ return nv
}
func (v *version) incref() {
@@ -50,11 +55,11 @@ func (v *version) incref() {
v.ref++
if v.ref == 1 {
- // Incr file ref.
- for _, tt := range v.levels {
- for _, t := range tt {
- v.s.addFileRef(t.fd, 1)
- }
+ select {
+ case v.s.refCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
+ // We can use v.levels directly here since it is immutable.
+ case <-v.s.closeC:
+ v.s.log("reference loop already exist")
}
}
}
@@ -66,13 +71,11 @@ func (v *version) releaseNB() {
} else if v.ref < 0 {
panic("negative version ref")
}
-
- for _, tt := range v.levels {
- for _, t := range tt {
- if v.s.addFileRef(t.fd, -1) == 0 {
- v.s.tops.remove(t)
- }
- }
+ select {
+ case v.s.relCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
+ // We can use v.levels directly here since it is immutable.
+ case <-v.s.closeC:
+ v.s.log("reference loop already exist")
}
v.released = true
@@ -141,6 +144,7 @@ func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue
}
ukey := ikey.ukey()
+ sampleSeeks := !v.s.o.GetDisableSeeksCompaction()
var (
tset *tSet
@@ -158,7 +162,7 @@ func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue
// Since entries never hop across level, finding key/value
// in smaller level make later levels irrelevant.
v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool {
- if level >= 0 && !tseek {
+ if sampleSeeks && level >= 0 && !tseek {
if tset == nil {
tset = &tSet{level, t}
} else {
@@ -273,10 +277,10 @@ func (v *version) newStaging() *versionStaging {
}
// Spawn a new version based on this version: commits record r into a fresh staging and finishes it, passing trivial through to finish.
-func (v *version) spawn(r *sessionRecord) *version {
+func (v *version) spawn(r *sessionRecord, trivial bool) *version {
staging := v.newStaging()
staging.commit(r)
- return staging.finish()
+ return staging.finish(trivial)
}
func (v *version) fillRecord(r *sessionRecord) {
@@ -446,7 +450,7 @@ func (p *versionStaging) commit(r *sessionRecord) {
}
}
-func (p *versionStaging) finish() *version {
+func (p *versionStaging) finish(trivial bool) *version {
// Build new version.
nv := newVersion(p.base.s)
numLevel := len(p.levels)
@@ -463,6 +467,12 @@ func (p *versionStaging) finish() *version {
if level < len(p.levels) {
scratch := p.levels[level]
+ // Short circuit if there is no change at all.
+ if len(scratch.added) == 0 && len(scratch.deleted) == 0 {
+ nv.levels[level] = baseTabels
+ continue
+ }
+
var nt tFiles
// Prealloc list if possible.
if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 {
@@ -480,6 +490,41 @@ func (p *versionStaging) finish() *version {
nt = append(nt, t)
}
+ // Avoid resort if only files in this level are deleted
+ if len(scratch.added) == 0 {
+ nv.levels[level] = nt
+ continue
+ }
+
+ // For normal table compaction, one compaction will only involve two levels
+ // of files. And the new files generated after merging the source level and
+ // source+1 level related files can be inserted as a whole into source+1 level
+ // without any overlap with the other source+1 files.
+ //
+ // When the amount of data maintained by leveldb is large, the number of files
+ // per level will be very large. While qsort is very inefficient for sorting
+ // already ordered arrays. Therefore, for the normal table compaction, we use
+ // binary search here to find the insert index to insert a batch of new added
+ // files directly instead of using qsort.
+ if trivial && len(scratch.added) > 0 {
+ added := make(tFiles, 0, len(scratch.added))
+ for _, r := range scratch.added {
+ added = append(added, tableFileFromRecord(r))
+ }
+ if level == 0 {
+ added.sortByNum()
+ index := nt.searchNumLess(added[len(added)-1].fd.Num)
+ nt = append(nt[:index], append(added, nt[index:]...)...)
+ } else {
+ added.sortByKey(p.base.s.icmp)
+ _, amax := added.getRange(p.base.s.icmp)
+ index := nt.searchMin(p.base.s.icmp, amax)
+ nt = append(nt[:index], append(added, nt[index:]...)...)
+ }
+ nv.levels[level] = nt
+ continue
+ }
+
// New tables.
for _, r := range scratch.added {
nt = append(nt, tableFileFromRecord(r))