diff options
author | ale <ale@incal.net> | 2019-09-26 12:10:23 +0100 |
---|---|---|
committer | ale <ale@incal.net> | 2019-09-26 12:10:23 +0100 |
commit | 97221e85d5ab47a1443a7aea0b0461bac5230854 (patch) | |
tree | 7a8fe4093e580f1db64822164fb406b866d2b402 /vendor/github.com | |
parent | 64eb5fb23f64f209e3d813e017097044a111151f (diff) | |
download | crawl-97221e85d5ab47a1443a7aea0b0461bac5230854.tar.gz crawl-97221e85d5ab47a1443a7aea0b0461bac5230854.zip |
Update vendored dependencies
Diffstat (limited to 'vendor/github.com')
21 files changed, 602 insertions, 102 deletions
diff --git a/vendor/github.com/golang/snappy/AUTHORS b/vendor/github.com/golang/snappy/AUTHORS index bcfa195..f10b49b 100644 --- a/vendor/github.com/golang/snappy/AUTHORS +++ b/vendor/github.com/golang/snappy/AUTHORS @@ -11,5 +11,6 @@ Damian Gryski <dgryski@gmail.com> Google Inc. Jan Mercl <0xjnml@gmail.com> +Klaus Post <klauspost@gmail.com> Rodolfo Carvalho <rhcarvalho@gmail.com> Sebastien Binet <seb.binet@gmail.com> diff --git a/vendor/github.com/golang/snappy/CONTRIBUTORS b/vendor/github.com/golang/snappy/CONTRIBUTORS index 931ae31..3bd40cf 100644 --- a/vendor/github.com/golang/snappy/CONTRIBUTORS +++ b/vendor/github.com/golang/snappy/CONTRIBUTORS @@ -29,6 +29,7 @@ Damian Gryski <dgryski@gmail.com> Jan Mercl <0xjnml@gmail.com> Kai Backman <kaib@golang.org> +Klaus Post <klauspost@gmail.com> Marc-Antoine Ruel <maruel@chromium.org> Nigel Tao <nigeltao@golang.org> Rob Pike <r@golang.org> diff --git a/vendor/github.com/golang/snappy/decode.go b/vendor/github.com/golang/snappy/decode.go index 72efb03..f1e04b1 100644 --- a/vendor/github.com/golang/snappy/decode.go +++ b/vendor/github.com/golang/snappy/decode.go @@ -52,6 +52,8 @@ const ( // Otherwise, a newly allocated slice will be returned. // // The dst and src must not overlap. It is valid to pass a nil dst. +// +// Decode handles the Snappy block format, not the Snappy stream format. func Decode(dst, src []byte) ([]byte, error) { dLen, s, err := decodedLen(src) if err != nil { @@ -83,6 +85,8 @@ func NewReader(r io.Reader) *Reader { } // Reader is an io.Reader that can read Snappy-compressed bytes. +// +// Reader handles the Snappy stream format, not the Snappy block format. type Reader struct { r io.Reader err error diff --git a/vendor/github.com/golang/snappy/decode_other.go b/vendor/github.com/golang/snappy/decode_other.go index 8c9f204..b88318e 100644 --- a/vendor/github.com/golang/snappy/decode_other.go +++ b/vendor/github.com/golang/snappy/decode_other.go @@ -85,14 +85,28 @@ func decode(dst, src []byte) int { if offset <= 0 || d < offset || length > len(dst)-d { return decodeErrCodeCorrupt } - // Copy from an earlier sub-slice of dst to a later sub-slice. Unlike - // the built-in copy function, this byte-by-byte copy always runs + // Copy from an earlier sub-slice of dst to a later sub-slice. + // If no overlap, use the built-in copy: + if offset >= length { + copy(dst[d:d+length], dst[d-offset:]) + d += length + continue + } + + // Unlike the built-in copy function, this byte-by-byte copy always runs // forwards, even if the slices overlap. Conceptually, this is: // // d += forwardCopy(dst[d:d+length], dst[d-offset:]) - for end := d + length; d != end; d++ { - dst[d] = dst[d-offset] + // + // We align the slices into a and b and show the compiler they are the same size. + // This allows the loop to run without bounds checks. + a := dst[d : d+length] + b := dst[d-offset:] + b = b[:len(a)] + for i := range a { + a[i] = b[i] } + d += length } if d != len(dst) { return decodeErrCodeCorrupt diff --git a/vendor/github.com/golang/snappy/encode.go b/vendor/github.com/golang/snappy/encode.go index 8d393e9..7f23657 100644 --- a/vendor/github.com/golang/snappy/encode.go +++ b/vendor/github.com/golang/snappy/encode.go @@ -15,6 +15,8 @@ import ( // Otherwise, a newly allocated slice will be returned. // // The dst and src must not overlap. It is valid to pass a nil dst. +// +// Encode handles the Snappy block format, not the Snappy stream format. func Encode(dst, src []byte) []byte { if n := MaxEncodedLen(len(src)); n < 0 { panic(ErrTooLarge) @@ -139,6 +141,8 @@ func NewBufferedWriter(w io.Writer) *Writer { } // Writer is an io.Writer that can write Snappy-compressed bytes. +// +// Writer handles the Snappy stream format, not the Snappy block format. type Writer struct { w io.Writer err error diff --git a/vendor/github.com/golang/snappy/go.mod b/vendor/github.com/golang/snappy/go.mod new file mode 100644 index 0000000..f6406bb --- /dev/null +++ b/vendor/github.com/golang/snappy/go.mod @@ -0,0 +1 @@ +module github.com/golang/snappy diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go index 2259200..823be93 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go @@ -238,6 +238,11 @@ func newBatch() interface{} { return &Batch{} } +// MakeBatch returns empty batch with preallocated buffer. +func MakeBatch(n int) *Batch { + return &Batch{data: make([]byte, 0, n)} +} + func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error { var index batchIndex for i, o := 0, 0; o < len(data); i++ { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db.go b/vendor/github.com/syndtr/goleveldb/leveldb/db.go index b27c38d..74e9826 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db.go @@ -38,6 +38,12 @@ type DB struct { inWritePaused int32 // The indicator whether write operation is paused by compaction aliveSnaps, aliveIters int32 + // Compaction statistic + memComp uint32 // The cumulative number of memory compaction + level0Comp uint32 // The cumulative number of level0 compaction + nonLevel0Comp uint32 // The cumulative number of non-level0 compaction + seekComp uint32 // The cumulative number of seek compaction + // Session. s *session @@ -468,7 +474,7 @@ func recoverTable(s *session, o *opt.Options) error { } // Commit. - return s.commit(rec) + return s.commit(rec, false) } func (db *DB) recoverJournal() error { @@ -538,7 +544,7 @@ func (db *DB) recoverJournal() error { rec.setJournalNum(fd.Num) rec.setSeqNum(db.seq) - if err := db.s.commit(rec); err != nil { + if err := db.s.commit(rec, false); err != nil { fr.Close() return err } @@ -617,7 +623,7 @@ func (db *DB) recoverJournal() error { // Commit. rec.setJournalNum(db.journalFd.Num) rec.setSeqNum(db.seq) - if err := db.s.commit(rec); err != nil { + if err := db.s.commit(rec, false); err != nil { // Close journal on error. if db.journal != nil { db.journal.Close() @@ -872,6 +878,10 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) { // DB. And a nil Range.Limit is treated as a key after all keys in // the DB. // +// WARNING: Any slice returned by interator (e.g. slice returned by calling +// Iterator.Key() or Iterator.Key() methods), its content should not be modified +// unless noted otherwise. +// // The iterator must be released after use, by calling Release method. // // Also read Iterator documentation of the leveldb/iterator package. @@ -953,15 +963,29 @@ func (db *DB) GetProperty(name string) (value string, err error) { value = "Compactions\n" + " Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" + "-------+------------+---------------+---------------+---------------+---------------\n" + var totalTables int + var totalSize, totalRead, totalWrite int64 + var totalDuration time.Duration for level, tables := range v.levels { duration, read, write := db.compStats.getStat(level) if len(tables) == 0 && duration == 0 { continue } + totalTables += len(tables) + totalSize += tables.size() + totalRead += read + totalWrite += write + totalDuration += duration value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n", level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(), float64(read)/1048576.0, float64(write)/1048576.0) } + value += "-------+------------+---------------+---------------+---------------+---------------\n" + value += fmt.Sprintf(" Total | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n", + totalTables, float64(totalSize)/1048576.0, totalDuration.Seconds(), + float64(totalRead)/1048576.0, float64(totalWrite)/1048576.0) + case p == "compcount": + value = fmt.Sprintf("MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", atomic.LoadUint32(&db.memComp), atomic.LoadUint32(&db.level0Comp), atomic.LoadUint32(&db.nonLevel0Comp), atomic.LoadUint32(&db.seekComp)) case p == "iostats": value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f", float64(db.s.stor.reads())/1048576.0, @@ -1013,11 +1037,16 @@ type DBStats struct { BlockCacheSize int OpenedTablesCount int - LevelSizes []int64 + LevelSizes Sizes LevelTablesCounts []int - LevelRead []int64 - LevelWrite []int64 + LevelRead Sizes + LevelWrite Sizes LevelDurations []time.Duration + + MemComp uint32 + Level0Comp uint32 + NonLevel0Comp uint32 + SeekComp uint32 } // Stats populates s with database statistics. @@ -1054,16 +1083,17 @@ func (db *DB) Stats(s *DBStats) error { for level, tables := range v.levels { duration, read, write := db.compStats.getStat(level) - if len(tables) == 0 && duration == 0 { - continue - } + s.LevelDurations = append(s.LevelDurations, duration) s.LevelRead = append(s.LevelRead, read) s.LevelWrite = append(s.LevelWrite, write) s.LevelSizes = append(s.LevelSizes, tables.size()) s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables)) } - + s.MemComp = atomic.LoadUint32(&db.memComp) + s.Level0Comp = atomic.LoadUint32(&db.level0Comp) + s.NonLevel0Comp = atomic.LoadUint32(&db.nonLevel0Comp) + s.SeekComp = atomic.LoadUint32(&db.seekComp) return nil } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go index 0c1b9a5..6b70eb2 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -8,6 +8,7 @@ package leveldb import ( "sync" + "sync/atomic" "time" "github.com/syndtr/goleveldb/leveldb/errors" @@ -260,7 +261,7 @@ func (db *DB) compactionCommit(name string, rec *sessionRecord) { db.compCommitLk.Lock() defer db.compCommitLk.Unlock() // Defer is necessary. db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error { - return db.s.commit(rec) + return db.s.commit(rec, true) }, nil) } @@ -324,10 +325,12 @@ func (db *DB) memCompaction() { db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration) + // Save compaction stats for _, r := range rec.addedTables { stats.write += r.size } db.compStats.addStat(flushLevel, stats) + atomic.AddUint32(&db.memComp, 1) // Drop frozen memdb. db.dropFrozenMem() @@ -588,6 +591,14 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { for i := range stats { db.compStats.addStat(c.sourceLevel+1, &stats[i]) } + switch c.typ { + case level0Compaction: + atomic.AddUint32(&db.level0Comp, 1) + case nonLevel0Compaction: + atomic.AddUint32(&db.nonLevel0Comp, 1) + case seekCompaction: + atomic.AddUint32(&db.seekComp, 1) + } } func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go index 03c24cd..e6e8ca5 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go @@ -78,13 +78,17 @@ func (db *DB) newIterator(auxm *memDB, auxt tFiles, seq uint64, slice *util.Rang } rawIter := db.newRawIterator(auxm, auxt, islice, ro) iter := &dbIter{ - db: db, - icmp: db.s.icmp, - iter: rawIter, - seq: seq, - strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader), - key: make([]byte, 0), - value: make([]byte, 0), + db: db, + icmp: db.s.icmp, + iter: rawIter, + seq: seq, + strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader), + disableSampling: db.s.o.GetDisableSeeksCompaction() || db.s.o.GetIteratorSamplingRate() <= 0, + key: make([]byte, 0), + value: make([]byte, 0), + } + if !iter.disableSampling { + iter.samplingGap = db.iterSamplingRate() } atomic.AddInt32(&db.aliveIters, 1) runtime.SetFinalizer(iter, (*dbIter).Release) @@ -107,13 +111,14 @@ const ( // dbIter represent an interator states over a database session. type dbIter struct { - db *DB - icmp *iComparer - iter iterator.Iterator - seq uint64 - strict bool - - smaplingGap int + db *DB + icmp *iComparer + iter iterator.Iterator + seq uint64 + strict bool + disableSampling bool + + samplingGap int dir dir key []byte value []byte @@ -122,10 +127,14 @@ type dbIter struct { } func (i *dbIter) sampleSeek() { + if i.disableSampling { + return + } + ikey := i.iter.Key() - i.smaplingGap -= len(ikey) + len(i.iter.Value()) - for i.smaplingGap < 0 { - i.smaplingGap += i.db.iterSamplingRate() + i.samplingGap -= len(ikey) + len(i.iter.Value()) + for i.samplingGap < 0 { + i.samplingGap += i.db.iterSamplingRate() i.db.sampleSeek(ikey) } } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go index 2c69d2e..c2ad70c 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go @@ -142,6 +142,10 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) // DB. And a nil Range.Limit is treated as a key after all keys in // the DB. // +// WARNING: Any slice returned by interator (e.g. slice returned by calling +// Iterator.Key() or Iterator.Value() methods), its content should not be +// modified unless noted otherwise. +// // The iterator must be released after use, by calling Release method. // Releasing the snapshot doesn't mean releasing the iterator too, the // iterator would be still valid until released. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go index b8f7e7d..21d1e51 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go @@ -69,6 +69,13 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) { // DB. And a nil Range.Limit is treated as a key after all keys in // the DB. // +// The returned iterator has locks on its own resources, so it can live beyond +// the lifetime of the transaction who creates them. +// +// WARNING: Any slice returned by interator (e.g. slice returned by calling +// Iterator.Key() or Iterator.Key() methods), its content should not be modified +// unless noted otherwise. +// // The iterator must be released after use, by calling Release method. // // Also read Iterator documentation of the leveldb/iterator package. @@ -205,7 +212,7 @@ func (tr *Transaction) Commit() error { tr.stats.startTimer() var cerr error for retry := 0; retry < 3; retry++ { - cerr = tr.db.s.commit(&tr.rec) + cerr = tr.db.s.commit(&tr.rec, false) if cerr != nil { tr.db.logf("transaction@commit error R·%d %q", retry, cerr) select { @@ -248,13 +255,14 @@ func (tr *Transaction) discard() { // Discard transaction. for _, t := range tr.tables { tr.db.logf("transaction@discard @%d", t.fd.Num) - if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil { - tr.db.s.reuseFileNum(t.fd.Num) - } + // Iterator may still use the table, so we use tOps.remove here. + tr.db.s.tops.remove(t.fd) } } // Discard discards the transaction. +// This method is noop if transaction is already closed (either committed or +// discarded) // // Other methods should not be called after transaction has been discarded. func (tr *Transaction) Discard() { @@ -278,8 +286,10 @@ func (db *DB) waitCompaction() error { // until in-flight transaction is committed or discarded. // The returned transaction handle is safe for concurrent use. // -// Transaction is expensive and can overwhelm compaction, especially if +// Transaction is very expensive and can overwhelm compaction, especially if // transaction size is small. Use with caution. +// The rule of thumb is if you need to merge at least same amount of +// `Options.WriteBuffer` worth of data then use transaction, otherwise don't. // // The transaction must be closed once done, either by committing or discarding // the transaction. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go b/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go index bab0e99..56ccbfb 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go @@ -16,7 +16,7 @@ func bloomHash(key []byte) uint32 { type bloomFilter int -// The bloom filter serializes its parameters and is backward compatible +// Name: The bloom filter serializes its parameters and is backward compatible // with respect to them. Therefor, its parameters are not added to its // name. func (bloomFilter) Name() string { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go b/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go index b661c08..824e47f 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go @@ -397,6 +397,10 @@ func (p *DB) Find(key []byte) (rkey, value []byte, err error) { // DB. And a nil Range.Limit is treated as a key after all keys in // the DB. // +// WARNING: Any slice returned by interator (e.g. slice returned by calling +// Iterator.Key() or Iterator.Key() methods), its content should not be modified +// unless noted otherwise. +// // The iterator must be released after use, by calling Release method. // // Also read Iterator documentation of the leveldb/iterator package. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go index 528b164..c02c1e9 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go @@ -278,6 +278,14 @@ type Options struct { // The default is false. DisableLargeBatchTransaction bool + // DisableSeeksCompaction allows disabling 'seeks triggered compaction'. + // The purpose of 'seeks triggered compaction' is to optimize database so + // that 'level seeks' can be minimized, however this might generate many + // small compaction which may not preferable. + // + // The default is false. + DisableSeeksCompaction bool + // ErrorIfExist defines whether an error should returned if the DB already // exist. // @@ -309,6 +317,8 @@ type Options struct { // IteratorSamplingRate defines approximate gap (in bytes) between read // sampling of an iterator. The samples will be used to determine when // compaction should be triggered. + // Use negative value to disable iterator sampling. + // The iterator sampling is disabled if DisableSeeksCompaction is true. // // The default is 1MiB. IteratorSamplingRate int @@ -526,6 +536,13 @@ func (o *Options) GetDisableLargeBatchTransaction() bool { return o.DisableLargeBatchTransaction } +func (o *Options) GetDisableSeeksCompaction() bool { + if o == nil { + return false + } + return o.DisableSeeksCompaction +} + func (o *Options) GetErrorIfExist() bool { if o == nil { return false @@ -548,8 +565,10 @@ func (o *Options) GetFilter() filter.Filter { } func (o *Options) GetIteratorSamplingRate() int { - if o == nil || o.IteratorSamplingRate <= 0 { + if o == nil || o.IteratorSamplingRate == 0 { return DefaultIteratorSamplingRate + } else if o.IteratorSamplingRate < 0 { + return 0 } return o.IteratorSamplingRate } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session.go b/vendor/github.com/syndtr/goleveldb/leveldb/session.go index 3f391f9..7310209 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session.go @@ -47,15 +47,24 @@ type session struct { o *cachedOptions icmp *iComparer tops *tOps - fileRef map[int64]int manifest *journal.Writer manifestWriter storage.Writer manifestFd storage.FileDesc - stCompPtrs []internalKey // compaction pointers; need external synchronization - stVersion *version // current version - vmu sync.Mutex + stCompPtrs []internalKey // compaction pointers; need external synchronization + stVersion *version // current version + ntVersionId int64 // next version id to assign + refCh chan *vTask + relCh chan *vTask + deltaCh chan *vDelta + abandon chan int64 + closeC chan struct{} + closeW sync.WaitGroup + vmu sync.Mutex + + // Testing fields + fileRefCh chan chan map[int64]int // channel used to pass current reference stat } // Creates new initialized session instance. @@ -68,13 +77,21 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) { return } s = &session{ - stor: newIStorage(stor), - storLock: storLock, - fileRef: make(map[int64]int), + stor: newIStorage(stor), + storLock: storLock, + refCh: make(chan *vTask), + relCh: make(chan *vTask), + deltaCh: make(chan *vDelta), + abandon: make(chan int64), + fileRefCh: make(chan chan map[int64]int), + closeC: make(chan struct{}), } s.setOptions(o) s.tops = newTableOps(s) - s.setVersion(newVersion(s)) + + s.closeW.Add(1) + go s.refLoop() + s.setVersion(nil, newVersion(s)) s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed") return } @@ -90,7 +107,11 @@ func (s *session) close() { } s.manifest = nil s.manifestWriter = nil - s.setVersion(&version{s: s, closing: true}) + s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId}) + + // Close all background goroutines + close(s.closeC) + s.closeW.Wait() } // Release session lock. @@ -180,19 +201,27 @@ func (s *session) recover() (err error) { } s.manifestFd = fd - s.setVersion(staging.finish()) + s.setVersion(rec, staging.finish(false)) s.setNextFileNum(rec.nextFileNum) s.recordCommited(rec) return nil } // Commit session; need external synchronization. -func (s *session) commit(r *sessionRecord) (err error) { +func (s *session) commit(r *sessionRecord, trivial bool) (err error) { v := s.version() defer v.release() // spawn new version based on current version - nv := v.spawn(r) + nv := v.spawn(r, trivial) + + // abandon useless version id to prevent blocking version processing loop. + defer func() { + if err != nil { + s.abandon <- nv.id + s.logf("commit@abandon useless vid D%d", nv.id) + } + }() if s.manifest == nil { // manifest journal writer not yet created, create one @@ -203,7 +232,7 @@ func (s *session) commit(r *sessionRecord) (err error) { // finally, apply new version if no error rise if err == nil { - s.setVersion(nv) + s.setVersion(r, nv) } return diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go index 089cd00..4c1d336 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go @@ -14,6 +14,13 @@ import ( "github.com/syndtr/goleveldb/leveldb/opt" ) +const ( + undefinedCompaction = iota + level0Compaction + nonLevel0Compaction + seekCompaction +) + func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int { v := s.version() defer v.release() @@ -50,6 +57,7 @@ func (s *session) pickCompaction() *compaction { var sourceLevel int var t0 tFiles + var typ int if v.cScore >= 1 { sourceLevel = v.cLevel cptr := s.getCompPtr(sourceLevel) @@ -63,18 +71,24 @@ func (s *session) pickCompaction() *compaction { if len(t0) == 0 { t0 = append(t0, tables[0]) } + if sourceLevel == 0 { + typ = level0Compaction + } else { + typ = nonLevel0Compaction + } } else { if p := atomic.LoadPointer(&v.cSeek); p != nil { ts := (*tSet)(p) sourceLevel = ts.level t0 = append(t0, ts.table) + typ = seekCompaction } else { v.release() return nil } } - return newCompaction(s, v, sourceLevel, t0) + return newCompaction(s, v, sourceLevel, t0, typ) } // Create compaction from given level and range; need external synchronization. @@ -109,13 +123,18 @@ func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit } } - return newCompaction(s, v, sourceLevel, t0) + typ := level0Compaction + if sourceLevel != 0 { + typ = nonLevel0Compaction + } + return newCompaction(s, v, sourceLevel, t0, typ) } -func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles) *compaction { +func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles, typ int) *compaction { c := &compaction{ s: s, v: v, + typ: typ, sourceLevel: sourceLevel, levels: [2]tFiles{t0, nil}, maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)), @@ -131,6 +150,7 @@ type compaction struct { s *session v *version + typ int sourceLevel int levels [2]tFiles maxGPOverlaps int64 @@ -181,10 +201,14 @@ func (c *compaction) expand() { t0, t1 := c.levels[0], c.levels[1] imin, imax := t0.getRange(c.s.icmp) - // We expand t0 here just incase ukey hop across tables. - t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0) - if len(t0) != len(c.levels[0]) { - imin, imax = t0.getRange(c.s.icmp) + + // For non-zero levels, the ukey can't hop across tables at all. + if c.sourceLevel == 0 { + // We expand t0 here just incase ukey hop across tables. + t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0) + if len(t0) != len(c.levels[0]) { + imin, imax = t0.getRange(c.s.icmp) + } } t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false) // Get entire range covered by compaction. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go index 40cb2cf..fc56b63 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go @@ -9,6 +9,7 @@ package leveldb import ( "fmt" "sync/atomic" + "time" "github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/storage" @@ -39,19 +40,213 @@ func (s *session) newTemp() storage.FileDesc { return storage.FileDesc{Type: storage.TypeTemp, Num: num} } -func (s *session) addFileRef(fd storage.FileDesc, ref int) int { - ref += s.fileRef[fd.Num] - if ref > 0 { - s.fileRef[fd.Num] = ref - } else if ref == 0 { - delete(s.fileRef, fd.Num) - } else { - panic(fmt.Sprintf("negative ref: %v", fd)) - } - return ref +// Session state. + +const ( + // maxCachedNumber represents the maximum number of version tasks + // that can be cached in the ref loop. + maxCachedNumber = 256 + + // maxCachedTime represents the maximum time for ref loop to cache + // a version task. + maxCachedTime = 5 * time.Minute +) + +// vDelta indicates the change information between the next version +// and the currently specified version +type vDelta struct { + vid int64 + added []int64 + deleted []int64 } -// Session state. +// vTask defines a version task for either reference or release. +type vTask struct { + vid int64 + files []tFiles + created time.Time +} + +func (s *session) refLoop() { + var ( + fileRef = make(map[int64]int) // Table file reference counter + ref = make(map[int64]*vTask) // Current referencing version store + deltas = make(map[int64]*vDelta) + referenced = make(map[int64]struct{}) + released = make(map[int64]*vDelta) // Released version that waiting for processing + abandoned = make(map[int64]struct{}) // Abandoned version id + next, last int64 + ) + // addFileRef adds file reference counter with specified file number and + // reference value + addFileRef := func(fnum int64, ref int) int { + ref += fileRef[fnum] + if ref > 0 { + fileRef[fnum] = ref + } else if ref == 0 { + delete(fileRef, fnum) + } else { + panic(fmt.Sprintf("negative ref: %v", fnum)) + } + return ref + } + // skipAbandoned skips useless abandoned version id. + skipAbandoned := func() bool { + if _, exist := abandoned[next]; exist { + delete(abandoned, next) + return true + } + return false + } + // applyDelta applies version change to current file reference. + applyDelta := func(d *vDelta) { + for _, t := range d.added { + addFileRef(t, 1) + } + for _, t := range d.deleted { + if addFileRef(t, -1) == 0 { + s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t}) + } + } + } + + timer := time.NewTimer(0) + <-timer.C // discard the initial tick + defer timer.Stop() + + // processTasks processes version tasks in strict order. + // + // If we want to use delta to reduce the cost of file references and dereferences, + // we must strictly follow the id of the version, otherwise some files that are + // being referenced will be deleted. + // + // In addition, some db operations (such as iterators) may cause a version to be + // referenced for a long time. In order to prevent such operations from blocking + // the entire processing queue, we will properly convert some of the version tasks + // into full file references and releases. + processTasks := func() { + timer.Reset(maxCachedTime) + // Make sure we don't cache too many version tasks. + for { + // Skip any abandoned version number to prevent blocking processing. + if skipAbandoned() { + next += 1 + continue + } + // Don't bother the version that has been released. + if _, exist := released[next]; exist { + break + } + // Ensure the specified version has been referenced. + if _, exist := ref[next]; !exist { + break + } + if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime { + break + } + // Convert version task into full file references and releases mode. + // Reference version(i+1) first and wait version(i) to release. + // FileRef(i+1) = FileRef(i) + Delta(i) + for _, tt := range ref[next].files { + for _, t := range tt { + addFileRef(t.fd.Num, 1) + } + } + // Note, if some compactions take a long time, even more than 5 minutes, + // we may miss the corresponding delta information here. + // Fortunately it will not affect the correctness of the file reference, + // and we can apply the delta once we receive it. + if d := deltas[next]; d != nil { + applyDelta(d) + } + referenced[next] = struct{}{} + delete(ref, next) + delete(deltas, next) + next += 1 + } + + // Use delta information to process all released versions. + for { + if skipAbandoned() { + next += 1 + continue + } + if d, exist := released[next]; exist { + if d != nil { + applyDelta(d) + } + delete(released, next) + next += 1 + continue + } + return + } + } + + for { + processTasks() + + select { + case t := <-s.refCh: + if _, exist := ref[t.vid]; exist { + panic("duplicate reference request") + } + ref[t.vid] = t + if t.vid > last { + last = t.vid + } + + case d := <-s.deltaCh: + if _, exist := ref[d.vid]; !exist { + if _, exist2 := referenced[d.vid]; !exist2 { + panic("invalid release request") + } + // The reference opt is already expired, apply + // delta here. + applyDelta(d) + continue + } + deltas[d.vid] = d + + case t := <-s.relCh: + if _, exist := referenced[t.vid]; exist { + for _, tt := range t.files { + for _, t := range tt { + if addFileRef(t.fd.Num, -1) == 0 { + s.tops.remove(t.fd) + } + } + } + delete(referenced, t.vid) + continue + } + if _, exist := ref[t.vid]; !exist { + panic("invalid release request") + } + released[t.vid] = deltas[t.vid] + delete(deltas, t.vid) + delete(ref, t.vid) + + case id := <-s.abandon: + if id >= next { + abandoned[id] = struct{}{} + } + + case <-timer.C: + + case r := <-s.fileRefCh: + ref := make(map[int64]int) + for f, c := range fileRef { + ref[f] = c + } + r <- ref + + case <-s.closeC: + s.closeW.Done() + return + } + } +} // Get current version. This will incr version ref, must call // version.release (exactly once) after use. @@ -69,13 +264,30 @@ func (s *session) tLen(level int) int { } // Set current version to v. -func (s *session) setVersion(v *version) { +func (s *session) setVersion(r *sessionRecord, v *version) { s.vmu.Lock() defer s.vmu.Unlock() // Hold by session. It is important to call this first before releasing // current version, otherwise the still used files might get released. v.incref() if s.stVersion != nil { + if r != nil { + var ( + added = make([]int64, 0, len(r.addedTables)) + deleted = make([]int64, 0, len(r.deletedTables)) + ) + for _, t := range r.addedTables { + added = append(added, t.num) + } + for _, t := range r.deletedTables { + deleted = append(deleted, t.num) + } + select { + case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}: + case <-v.s.closeC: + s.log("reference loop already exist") + } + } // Release current version. s.stVersion.releaseNB() } @@ -96,7 +308,7 @@ func (s *session) setNextFileNum(num int64) { func (s *session) markFileNum(num int64) { nextFileNum := num + 1 for { - old, x := s.stNextFileNum, nextFileNum + old, x := atomic.LoadInt64(&s.stNextFileNum), nextFileNum if old > x { x = old } @@ -114,7 +326,7 @@ func (s *session) allocFileNum() int64 { // Reuse given file number. func (s *session) reuseFileNum(num int64) { for { - old, x := s.stNextFileNum, num + old, x := atomic.LoadInt64(&s.stNextFileNum), num if old != x+1 { x = old } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table.go b/vendor/github.com/syndtr/goleveldb/leveldb/table.go index 1fac60d..b7759b2 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table.go @@ -7,6 +7,7 @@ package leveldb import ( + "bytes" "fmt" "sort" "sync/atomic" @@ -150,6 +151,30 @@ func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int { }) } +// Searches smallest index of tables whose its file number +// is smaller than the given number. +func (tf tFiles) searchNumLess(num int64) int { + return sort.Search(len(tf), func(i int) bool { + return tf[i].fd.Num < num + }) +} + +// Searches smallest index of tables whose its smallest +// key is after the given key. +func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int { + return sort.Search(len(tf), func(i int) bool { + return icmp.ucmp.Compare(tf[i].imin.ukey(), umin) > 0 + }) +} + +// Searches smallest index of tables whose its largest +// key is after the given key. +func (tf tFiles) searchMaxUkey(icmp *iComparer, umax []byte) int { + return sort.Search(len(tf), func(i int) bool { + return icmp.ucmp.Compare(tf[i].imax.ukey(), umax) > 0 + }) +} + // Returns true if given key range overlaps with one or more // tables key range. If unsorted is true then binary search will not be used. func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool { @@ -181,6 +206,50 @@ func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) boo // expanded. // The dst content will be overwritten. func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles { + // Short circuit if tf is empty + if len(tf) == 0 { + return nil + } + // For non-zero levels, there is no ukey hop across at all. + // And what's more, the files in these levels are strictly sorted, + // so use binary search instead of heavy traverse. + if !overlapped { + var begin, end int + // Determine the begin index of the overlapped file + if umin != nil { + index := tf.searchMinUkey(icmp, umin) + if index == 0 { + begin = 0 + } else if bytes.Compare(tf[index-1].imax.ukey(), umin) >= 0 { + // The min ukey overlaps with the index-1 file, expand it. + begin = index - 1 + } else { + begin = index + } + } + // Determine the end index of the overlapped file + if umax != nil { + index := tf.searchMaxUkey(icmp, umax) + if index == len(tf) { + end = len(tf) + } else if bytes.Compare(tf[index].imin.ukey(), umax) <= 0 { + // The max ukey overlaps with the index file, expand it. + end = index + 1 + } else { + end = index + } + } else { + end = len(tf) + } + // Ensure the overlapped file indexes are valid. + if begin >= end { + return nil + } + dst = make([]*tFile, end-begin) + copy(dst, tf[begin:end]) + return dst + } + dst = dst[:0] for i := 0; i < len(tf); { t := tf[i] @@ -193,11 +262,9 @@ func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, ove } else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 { umax = t.imax.ukey() // Restart search if it is overlapped. - if overlapped { - dst = dst[:0] - i = 0 - continue - } + dst = dst[:0] + i = 0 + continue } dst = append(dst, t) @@ -416,16 +483,18 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite // Removes table from persistent storage. It waits until // no one use the the table. -func (t *tOps) remove(f *tFile) { - t.cache.Delete(0, uint64(f.fd.Num), func() { - if err := t.s.stor.Remove(f.fd); err != nil { - t.s.logf("table@remove removing @%d %q", f.fd.Num, err) +func (t *tOps) remove(fd storage.FileDesc) { + t.cache.Delete(0, uint64(fd.Num), func() { + if err := t.s.stor.Remove(fd); err != nil { + t.s.logf("table@remove removing @%d %q", fd.Num, err) } else { - t.s.logf("table@remove removed @%d", f.fd.Num) + t.s.logf("table@remove removed @%d", fd.Num) } if t.evictRemoved && t.bcache != nil { - t.bcache.EvictNS(uint64(f.fd.Num)) + t.bcache.EvictNS(uint64(fd.Num)) } + // Try to reuse file num, useful for discarded transaction. + t.s.reuseFileNum(fd.Num) }) } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go b/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go index 16cfbaa..496feb6 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -787,6 +787,10 @@ func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChe // table. And a nil Range.Limit is treated as a key after all keys in // the table. // +// WARNING: Any slice returned by interator (e.g. slice returned by calling +// Iterator.Key() or Iterator.Key() methods), its content should not be modified +// unless noted otherwise. +// // The returned iterator is not safe for concurrent use and should be released // after use. // diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/version.go b/vendor/github.com/syndtr/goleveldb/leveldb/version.go index 73f272a..9535e35 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/version.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/version.go @@ -9,6 +9,7 @@ package leveldb import ( "fmt" "sync/atomic" + "time" "unsafe" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -22,7 +23,8 @@ type tSet struct { } type version struct { - s *session + id int64 // unique monotonous increasing version id + s *session levels []tFiles @@ -39,8 +41,11 @@ type version struct { released bool } +// newVersion creates a new version with an unique monotonous increasing id. func newVersion(s *session) *version { - return &version{s: s} + id := atomic.AddInt64(&s.ntVersionId, 1) + nv := &version{s: s, id: id - 1} + return nv } func (v *version) incref() { @@ -50,11 +55,11 @@ func (v *version) incref() { v.ref++ if v.ref == 1 { - // Incr file ref. - for _, tt := range v.levels { - for _, t := range tt { - v.s.addFileRef(t.fd, 1) - } + select { + case v.s.refCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}: + // We can use v.levels directly here since it is immutable. + case <-v.s.closeC: + v.s.log("reference loop already exist") } } } @@ -66,13 +71,11 @@ func (v *version) releaseNB() { } else if v.ref < 0 { panic("negative version ref") } - - for _, tt := range v.levels { - for _, t := range tt { - if v.s.addFileRef(t.fd, -1) == 0 { - v.s.tops.remove(t) - } - } + select { + case v.s.relCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}: + // We can use v.levels directly here since it is immutable. + case <-v.s.closeC: + v.s.log("reference loop already exist") } v.released = true @@ -141,6 +144,7 @@ func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue } ukey := ikey.ukey() + sampleSeeks := !v.s.o.GetDisableSeeksCompaction() var ( tset *tSet @@ -158,7 +162,7 @@ func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue // Since entries never hop across level, finding key/value // in smaller level make later levels irrelevant. v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool { - if level >= 0 && !tseek { + if sampleSeeks && level >= 0 && !tseek { if tset == nil { tset = &tSet{level, t} } else { @@ -273,10 +277,10 @@ func (v *version) newStaging() *versionStaging { } // Spawn a new version based on this version. -func (v *version) spawn(r *sessionRecord) *version { +func (v *version) spawn(r *sessionRecord, trivial bool) *version { staging := v.newStaging() staging.commit(r) - return staging.finish() + return staging.finish(trivial) } func (v *version) fillRecord(r *sessionRecord) { @@ -446,7 +450,7 @@ func (p *versionStaging) commit(r *sessionRecord) { } } -func (p *versionStaging) finish() *version { +func (p *versionStaging) finish(trivial bool) *version { // Build new version. nv := newVersion(p.base.s) numLevel := len(p.levels) @@ -463,6 +467,12 @@ func (p *versionStaging) finish() *version { if level < len(p.levels) { scratch := p.levels[level] + // Short circuit if there is no change at all. + if len(scratch.added) == 0 && len(scratch.deleted) == 0 { + nv.levels[level] = baseTabels + continue + } + var nt tFiles // Prealloc list if possible. if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 { @@ -480,6 +490,41 @@ func (p *versionStaging) finish() *version { nt = append(nt, t) } + // Avoid resort if only files in this level are deleted + if len(scratch.added) == 0 { + nv.levels[level] = nt + continue + } + + // For normal table compaction, one compaction will only involve two levels + // of files. And the new files generated after merging the source level and + // source+1 level related files can be inserted as a whole into source+1 level + // without any overlap with the other source+1 files. + // + // When the amount of data maintained by leveldb is large, the number of files + // per level will be very large. While qsort is very inefficient for sorting + // already ordered arrays. Therefore, for the normal table compaction, we use + // binary search here to find the insert index to insert a batch of new added + // files directly instead of using qsort. + if trivial && len(scratch.added) > 0 { + added := make(tFiles, 0, len(scratch.added)) + for _, r := range scratch.added { + added = append(added, tableFileFromRecord(r)) + } + if level == 0 { + added.sortByNum() + index := nt.searchNumLess(added[len(added)-1].fd.Num) + nt = append(nt[:index], append(added, nt[index:]...)...) + } else { + added.sortByKey(p.base.s.icmp) + _, amax := added.getRange(p.base.s.icmp) + index := nt.searchMin(p.base.s.icmp, amax) + nt = append(nt[:index], append(added, nt[index:]...)...) + } + nv.levels[level] = nt + continue + } + // New tables. for _, r := range scratch.added { nt = append(nt, tableFileFromRecord(r)) |