// Copyright (c) 2012, Suryandaru Triandana // All rights reserved. // // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. package table import ( "encoding/binary" "errors" "fmt" "io" "github.com/golang/snappy" "github.com/syndtr/goleveldb/leveldb/comparer" "github.com/syndtr/goleveldb/leveldb/filter" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/util" ) func sharedPrefixLen(a, b []byte) int { i, n := 0, len(a) if n > len(b) { n = len(b) } for i < n && a[i] == b[i] { i++ } return i } type blockWriter struct { restartInterval int buf util.Buffer nEntries int prevKey []byte restarts []uint32 scratch []byte } func (w *blockWriter) append(key, value []byte) { nShared := 0 if w.nEntries%w.restartInterval == 0 { w.restarts = append(w.restarts, uint32(w.buf.Len())) } else { nShared = sharedPrefixLen(w.prevKey, key) } n := binary.PutUvarint(w.scratch[0:], uint64(nShared)) n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared)) n += binary.PutUvarint(w.scratch[n:], uint64(len(value))) w.buf.Write(w.scratch[:n]) w.buf.Write(key[nShared:]) w.buf.Write(value) w.prevKey = append(w.prevKey[:0], key...) w.nEntries++ } func (w *blockWriter) finish() { // Write restarts entry. if w.nEntries == 0 { // Must have at least one restart entry. w.restarts = append(w.restarts, 0) } w.restarts = append(w.restarts, uint32(len(w.restarts))) for _, x := range w.restarts { buf4 := w.buf.Alloc(4) binary.LittleEndian.PutUint32(buf4, x) } } func (w *blockWriter) reset() { w.buf.Reset() w.nEntries = 0 w.restarts = w.restarts[:0] } func (w *blockWriter) bytesLen() int { restartsLen := len(w.restarts) if restartsLen == 0 { restartsLen = 1 } return w.buf.Len() + 4*restartsLen + 4 } type filterWriter struct { generator filter.FilterGenerator buf util.Buffer nKeys int offsets []uint32 } func (w *filterWriter) add(key []byte) { if w.generator == nil { return } w.generator.Add(key) w.nKeys++ } func (w *filterWriter) flush(offset uint64) { if w.generator == nil { return } for x := int(offset / filterBase); x > len(w.offsets); { w.generate() } } func (w *filterWriter) finish() { if w.generator == nil { return } // Generate last keys. if w.nKeys > 0 { w.generate() } w.offsets = append(w.offsets, uint32(w.buf.Len())) for _, x := range w.offsets { buf4 := w.buf.Alloc(4) binary.LittleEndian.PutUint32(buf4, x) } w.buf.WriteByte(filterBaseLg) } func (w *filterWriter) generate() { // Record offset. w.offsets = append(w.offsets, uint32(w.buf.Len())) // Generate filters. if w.nKeys > 0 { w.generator.Generate(&w.buf) w.nKeys = 0 } } // Writer is a table writer. type Writer struct { writer io.Writer err error // Options cmp comparer.Comparer filter filter.Filter compression opt.Compression blockSize int dataBlock blockWriter indexBlock blockWriter filterBlock filterWriter pendingBH blockHandle offset uint64 nEntries int // Scratch allocated enough for 5 uvarint. Block writer should not use // first 20-bytes since it will be used to encode block handle, which // then passed to the block writer itself. scratch [50]byte comparerScratch []byte compressionScratch []byte } func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) { // Compress the buffer if necessary. var b []byte if compression == opt.SnappyCompression { // Allocate scratch enough for compression and block trailer. if n := snappy.MaxEncodedLen(buf.Len()) + blockTrailerLen; len(w.compressionScratch) < n { w.compressionScratch = make([]byte, n) } compressed := snappy.Encode(w.compressionScratch, buf.Bytes()) n := len(compressed) b = compressed[:n+blockTrailerLen] b[n] = blockTypeSnappyCompression } else { tmp := buf.Alloc(blockTrailerLen) tmp[0] = blockTypeNoCompression b = buf.Bytes() } // Calculate the checksum. n := len(b) - 4 checksum := util.NewCRC(b[:n]).Value() binary.LittleEndian.PutUint32(b[n:], checksum) // Write the buffer to the file. _, err = w.writer.Write(b) if err != nil { return } bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)} w.offset += uint64(len(b)) return } func (w *Writer) flushPendingBH(key []byte) { if w.pendingBH.length == 0 { return } var separator []byte if len(key) == 0 { separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey) } else { separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key) } if separator == nil { separator = w.dataBlock.prevKey } else { w.comparerScratch = separator } n := encodeBlockHandle(w.scratch[:20], w.pendingBH) // Append the block handle to the index block. w.indexBlock.append(separator, w.scratch[:n]) // Reset prev key of the data block. w.dataBlock.prevKey = w.dataBlock.prevKey[:0] // Clear pending block handle. w.pendingBH = blockHandle{} } func (w *Writer) finishBlock() error { w.dataBlock.finish() bh, err := w.writeBlock(&w.dataBlock.buf, w.compression) if err != nil { return err } w.pendingBH = bh // Reset the data block. w.dataBlock.reset() // Flush the filter block. w.filterBlock.flush(w.offset) return nil } // Append appends key/value pair to the table. The keys passed must // be in increasing order. // // It is safe to modify the contents of the arguments after Append returns. func (w *Writer) Append(key, value []byte) error { if w.err != nil { return w.err } if w.nEntries > 0 && w.cmp.Compare(w.dataBlock.prevKey, key) >= 0 { w.err = fmt.Errorf("leveldb/table: Writer: keys are not in increasing order: %q, %q", w.dataBlock.prevKey, key) return w.err } w.flushPendingBH(key) // Append key/value pair to the data block. w.dataBlock.append(key, value) // Add key to the filter block. w.filterBlock.add(key) // Finish the data block if block size target reached. if w.dataBlock.bytesLen() >= w.blockSize { if err := w.finishBlock(); err != nil { w.err = err return w.err } } w.nEntries++ return nil } // BlocksLen returns number of blocks written so far. func (w *Writer) BlocksLen() int { n := w.indexBlock.nEntries if w.pendingBH.length > 0 { // Includes the pending block. n++ } return n } // EntriesLen returns number of entries added so far. func (w *Writer) EntriesLen() int { return w.nEntries } // BytesLen returns number of bytes written so far. func (w *Writer) BytesLen() int { return int(w.offset) } // Close will finalize the table. Calling Append is not possible // after Close, but calling BlocksLen, EntriesLen and BytesLen // is still possible. func (w *Writer) Close() error { if w.err != nil { return w.err } // Write the last data block. Or empty data block if there // aren't any data blocks at all. if w.dataBlock.nEntries > 0 || w.nEntries == 0 { if err := w.finishBlock(); err != nil { w.err = err return w.err } } w.flushPendingBH(nil) // Write the filter block. var filterBH blockHandle w.filterBlock.finish() if buf := &w.filterBlock.buf; buf.Len() > 0 { filterBH, w.err = w.writeBlock(buf, opt.NoCompression) if w.err != nil { return w.err } } // Write the metaindex block. if filterBH.length > 0 { key := []byte("filter." + w.filter.Name()) n := encodeBlockHandle(w.scratch[:20], filterBH) w.dataBlock.append(key, w.scratch[:n]) } w.dataBlock.finish() metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression) if err != nil { w.err = err return w.err } // Write the index block. w.indexBlock.finish() indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression) if err != nil { w.err = err return w.err } // Write the table footer. footer := w.scratch[:footerLen] for i := range footer { footer[i] = 0 } n := encodeBlockHandle(footer, metaindexBH) encodeBlockHandle(footer[n:], indexBH) copy(footer[footerLen-len(magic):], magic) if _, err := w.writer.Write(footer); err != nil { w.err = err return w.err } w.offset += footerLen w.err = errors.New("leveldb/table: writer is closed") return nil } // NewWriter creates a new initialized table writer for the file. // // Table writer is not safe for concurrent use. func NewWriter(f io.Writer, o *opt.Options) *Writer { w := &Writer{ writer: f, cmp: o.GetComparer(), filter: o.GetFilter(), compression: o.GetCompression(), blockSize: o.GetBlockSize(), comparerScratch: make([]byte, 0), } // data block w.dataBlock.restartInterval = o.GetBlockRestartInterval() // The first 20-bytes are used for encoding block handle. w.dataBlock.scratch = w.scratch[20:] // index block w.indexBlock.restartInterval = 1 w.indexBlock.scratch = w.scratch[20:] // filter block if w.filter != nil { w.filterBlock.generator = w.filter.NewGenerator() w.filterBlock.flush(0) } return w }