// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> // All rights reserved. // // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. package leveldb import ( "container/list" "fmt" "io" "os" "runtime" "strings" "sync" "sync/atomic" "time" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/memdb" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/table" "github.com/syndtr/goleveldb/leveldb/util" ) // DB is a LevelDB database. type DB struct { // Need 64-bit alignment. seq uint64 // Stats. Need 64-bit alignment. cWriteDelay int64 // The cumulative duration of write delays cWriteDelayN int32 // The cumulative number of write delays inWritePaused int32 // The indicator whether write operation is paused by compaction aliveSnaps, aliveIters int32 // Session. s *session // MemDB. memMu sync.RWMutex memPool chan *memdb.DB mem, frozenMem *memDB journal *journal.Writer journalWriter storage.Writer journalFd storage.FileDesc frozenJournalFd storage.FileDesc frozenSeq uint64 // Snapshot. snapsMu sync.Mutex snapsList *list.List // Write. batchPool sync.Pool writeMergeC chan writeMerge writeMergedC chan bool writeLockC chan struct{} writeAckC chan error writeDelay time.Duration writeDelayN int tr *Transaction // Compaction. compCommitLk sync.Mutex tcompCmdC chan cCmd tcompPauseC chan chan<- struct{} mcompCmdC chan cCmd compErrC chan error compPerErrC chan error compErrSetC chan error compWriteLocking bool compStats cStats memdbMaxLevel int // For testing. // Close. closeW sync.WaitGroup closeC chan struct{} closed uint32 closer io.Closer } func openDB(s *session) (*DB, error) { s.log("db@open opening") start := time.Now() db := &DB{ s: s, // Initial sequence seq: s.stSeqNum, // MemDB memPool: make(chan *memdb.DB, 1), // Snapshot snapsList: list.New(), // Write batchPool: sync.Pool{New: newBatch}, writeMergeC: make(chan writeMerge), writeMergedC: make(chan bool), writeLockC: make(chan struct{}, 1), writeAckC: make(chan error), // Compaction tcompCmdC: make(chan cCmd), tcompPauseC: make(chan chan<- struct{}), mcompCmdC: make(chan cCmd), compErrC: make(chan error), compPerErrC: make(chan error), compErrSetC: make(chan error), // Close closeC: make(chan struct{}), } // Read-only mode. readOnly := s.o.GetReadOnly() if readOnly { // Recover journals (read-only mode). if err := db.recoverJournalRO(); err != nil { return nil, err } } else { // Recover journals. if err := db.recoverJournal(); err != nil { return nil, err } // Remove any obsolete files. if err := db.checkAndCleanFiles(); err != nil { // Close journal. if db.journal != nil { db.journal.Close() db.journalWriter.Close() } return nil, err } } // Doesn't need to be included in the wait group. go db.compactionError() go db.mpoolDrain() if readOnly { db.SetReadOnly() } else { db.closeW.Add(2) go db.tCompaction() go db.mCompaction() // go db.jWriter() } s.logf("db@open done T·%v", time.Since(start)) runtime.SetFinalizer(db, (*DB).Close) return db, nil } // Open opens or creates a DB for the given storage. // The DB will be created if not exist, unless ErrorIfMissing is true. // Also, if ErrorIfExist is true and the DB exist Open will returns // os.ErrExist error. // // Open will return an error with type of ErrCorrupted if corruption // detected in the DB. Use errors.IsCorrupted to test whether an error is // due to corruption. Corrupted DB can be recovered with Recover function. // // The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { s, err := newSession(stor, o) if err != nil { return } defer func() { if err != nil { s.close() s.release() } }() err = s.recover() if err != nil { if !os.IsNotExist(err) || s.o.GetErrorIfMissing() || s.o.GetReadOnly() { return } err = s.create() if err != nil { return } } else if s.o.GetErrorIfExist() { err = os.ErrExist return } return openDB(s) } // OpenFile opens or creates a DB for the given path. // The DB will be created if not exist, unless ErrorIfMissing is true. // Also, if ErrorIfExist is true and the DB exist OpenFile will returns // os.ErrExist error. // // OpenFile uses standard file-system backed storage implementation as // described in the leveldb/storage package. // // OpenFile will return an error with type of ErrCorrupted if corruption // detected in the DB. Use errors.IsCorrupted to test whether an error is // due to corruption. Corrupted DB can be recovered with Recover function. // // The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func OpenFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path, o.GetReadOnly()) if err != nil { return } db, err = Open(stor, o) if err != nil { stor.Close() } else { db.closer = stor } return } // Recover recovers and opens a DB with missing or corrupted manifest files // for the given storage. It will ignore any manifest files, valid or not. // The DB must already exist or it will returns an error. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // // The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { s, err := newSession(stor, o) if err != nil { return } defer func() { if err != nil { s.close() s.release() } }() err = recoverTable(s, o) if err != nil { return } return openDB(s) } // RecoverFile recovers and opens a DB with missing or corrupted manifest files // for the given path. It will ignore any manifest files, valid or not. // The DB must already exist or it will returns an error. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // // RecoverFile uses standard file-system backed storage implementation as described // in the leveldb/storage package. // // The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func RecoverFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path, false) if err != nil { return } db, err = Recover(stor, o) if err != nil { stor.Close() } else { db.closer = stor } return } func recoverTable(s *session, o *opt.Options) error { o = dupOptions(o) // Mask StrictReader, lets StrictRecovery doing its job. o.Strict &= ^opt.StrictReader // Get all tables and sort it by file number. fds, err := s.stor.List(storage.TypeTable) if err != nil { return err } sortFds(fds) var ( maxSeq uint64 recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int // We will drop corrupted table. strict = o.GetStrict(opt.StrictRecovery) noSync = o.GetNoSync() rec = &sessionRecord{} bpool = util.NewBufferPool(o.GetBlockSize() + 5) ) buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) { tmpFd = s.newTemp() writer, err := s.stor.Create(tmpFd) if err != nil { return } defer func() { writer.Close() if err != nil { s.stor.Remove(tmpFd) tmpFd = storage.FileDesc{} } }() // Copy entries. tw := table.NewWriter(writer, o) for iter.Next() { key := iter.Key() if validInternalKey(key) { err = tw.Append(key, iter.Value()) if err != nil { return } } } err = iter.Error() if err != nil && !errors.IsCorrupted(err) { return } err = tw.Close() if err != nil { return } if !noSync { err = writer.Sync() if err != nil { return } } size = int64(tw.BytesLen()) return } recoverTable := func(fd storage.FileDesc) error { s.logf("table@recovery recovering @%d", fd.Num) reader, err := s.stor.Open(fd) if err != nil { return err } var closed bool defer func() { if !closed { reader.Close() } }() // Get file size. size, err := reader.Seek(0, 2) if err != nil { return err } var ( tSeq uint64 tgoodKey, tcorruptedKey, tcorruptedBlock int imin, imax []byte ) tr, err := table.NewReader(reader, size, fd, nil, bpool, o) if err != nil { return err } iter := tr.NewIterator(nil, nil) if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok { itererr.SetErrorCallback(func(err error) { if errors.IsCorrupted(err) { s.logf("table@recovery block corruption @%d %q", fd.Num, err) tcorruptedBlock++ } }) } // Scan the table. for iter.Next() { key := iter.Key() _, seq, _, kerr := parseInternalKey(key) if kerr != nil { tcorruptedKey++ continue } tgoodKey++ if seq > tSeq { tSeq = seq } if imin == nil { imin = append([]byte{}, key...) } imax = append(imax[:0], key...) } if err := iter.Error(); err != nil && !errors.IsCorrupted(err) { iter.Release() return err } iter.Release() goodKey += tgoodKey corruptedKey += tcorruptedKey corruptedBlock += tcorruptedBlock if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) { droppedTable++ s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq) return nil } if tgoodKey > 0 { if tcorruptedKey > 0 || tcorruptedBlock > 0 { // Rebuild the table. s.logf("table@recovery rebuilding @%d", fd.Num) iter := tr.NewIterator(nil, nil) tmpFd, newSize, err := buildTable(iter) iter.Release() if err != nil { return err } closed = true reader.Close() if err := s.stor.Rename(tmpFd, fd); err != nil { return err } size = newSize } if tSeq > maxSeq { maxSeq = tSeq } recoveredKey += tgoodKey // Add table to level 0. rec.addTable(0, fd.Num, size, imin, imax) s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq) } else { droppedTable++ s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size) } return nil } // Recover all tables. if len(fds) > 0 { s.logf("table@recovery F·%d", len(fds)) // Mark file number as used. s.markFileNum(fds[len(fds)-1].Num) for _, fd := range fds { if err := recoverTable(fd); err != nil { return err } } s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq) } // Set sequence number. rec.setSeqNum(maxSeq) // Create new manifest. if err := s.create(); err != nil { return err } // Commit. return s.commit(rec) } func (db *DB) recoverJournal() error { // Get all journals and sort it by file number. rawFds, err := db.s.stor.List(storage.TypeJournal) if err != nil { return err } sortFds(rawFds) // Journals that will be recovered. var fds []storage.FileDesc for _, fd := range rawFds { if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum { fds = append(fds, fd) } } var ( ofd storage.FileDesc // Obsolete file. rec = &sessionRecord{} ) // Recover journals. if len(fds) > 0 { db.logf("journal@recovery F·%d", len(fds)) // Mark file number as used. db.s.markFileNum(fds[len(fds)-1].Num) var ( // Options. strict = db.s.o.GetStrict(opt.StrictJournal) checksum = db.s.o.GetStrict(opt.StrictJournalChecksum) writeBuffer = db.s.o.GetWriteBuffer() jr *journal.Reader mdb = memdb.New(db.s.icmp, writeBuffer) buf = &util.Buffer{} batchSeq uint64 batchLen int ) for _, fd := range fds { db.logf("journal@recovery recovering @%d", fd.Num) fr, err := db.s.stor.Open(fd) if err != nil { return err } // Create or reset journal reader instance. if jr == nil { jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum) } else { jr.Reset(fr, dropper{db.s, fd}, strict, checksum) } // Flush memdb and remove obsolete journal file. if !ofd.Zero() { if mdb.Len() > 0 { if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { fr.Close() return err } } rec.setJournalNum(fd.Num) rec.setSeqNum(db.seq) if err := db.s.commit(rec); err != nil { fr.Close() return err } rec.resetAddedTables() db.s.stor.Remove(ofd) ofd = storage.FileDesc{} } // Replay journal to memdb. mdb.Reset() for { r, err := jr.Next() if err != nil { if err == io.EOF { break } fr.Close() return errors.SetFd(err, fd) } buf.Reset() if _, err := buf.ReadFrom(r); err != nil { if err == io.ErrUnexpectedEOF { // This is error returned due to corruption, with strict == false. continue } fr.Close() return errors.SetFd(err, fd) } batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) if err != nil { if !strict && errors.IsCorrupted(err) { db.s.logf("journal error: %v (skipped)", err) // We won't apply sequence number as it might be corrupted. continue } fr.Close() return errors.SetFd(err, fd) } // Save sequence number. db.seq = batchSeq + uint64(batchLen) // Flush it if large enough. if mdb.Size() >= writeBuffer { if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { fr.Close() return err } mdb.Reset() } } fr.Close() ofd = fd } // Flush the last memdb. if mdb.Len() > 0 { if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { return err } } } // Create a new journal. if _, err := db.newMem(0); err != nil { return err } // Commit. rec.setJournalNum(db.journalFd.Num) rec.setSeqNum(db.seq) if err := db.s.commit(rec); err != nil { // Close journal on error. if db.journal != nil { db.journal.Close() db.journalWriter.Close() } return err } // Remove the last obsolete journal file. if !ofd.Zero() { db.s.stor.Remove(ofd) } return nil } func (db *DB) recoverJournalRO() error { // Get all journals and sort it by file number. rawFds, err := db.s.stor.List(storage.TypeJournal) if err != nil { return err } sortFds(rawFds) // Journals that will be recovered. var fds []storage.FileDesc for _, fd := range rawFds { if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum { fds = append(fds, fd) } } var ( // Options. strict = db.s.o.GetStrict(opt.StrictJournal) checksum = db.s.o.GetStrict(opt.StrictJournalChecksum) writeBuffer = db.s.o.GetWriteBuffer() mdb = memdb.New(db.s.icmp, writeBuffer) ) // Recover journals. if len(fds) > 0 { db.logf("journal@recovery RO·Mode F·%d", len(fds)) var ( jr *journal.Reader buf = &util.Buffer{} batchSeq uint64 batchLen int ) for _, fd := range fds { db.logf("journal@recovery recovering @%d", fd.Num) fr, err := db.s.stor.Open(fd) if err != nil { return err } // Create or reset journal reader instance. if jr == nil { jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum) } else { jr.Reset(fr, dropper{db.s, fd}, strict, checksum) } // Replay journal to memdb. for { r, err := jr.Next() if err != nil { if err == io.EOF { break } fr.Close() return errors.SetFd(err, fd) } buf.Reset() if _, err := buf.ReadFrom(r); err != nil { if err == io.ErrUnexpectedEOF { // This is error returned due to corruption, with strict == false. continue } fr.Close() return errors.SetFd(err, fd) } batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) if err != nil { if !strict && errors.IsCorrupted(err) { db.s.logf("journal error: %v (skipped)", err) // We won't apply sequence number as it might be corrupted. continue } fr.Close() return errors.SetFd(err, fd) } // Save sequence number. db.seq = batchSeq + uint64(batchLen) } fr.Close() } } // Set memDB. db.mem = &memDB{db: db, DB: mdb, ref: 1} return nil } func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) { mk, mv, err := mdb.Find(ikey) if err == nil { ukey, _, kt, kerr := parseInternalKey(mk) if kerr != nil { // Shouldn't have had happen. panic(kerr) } if icmp.uCompare(ukey, ikey.ukey()) == 0 { if kt == keyTypeDel { return true, nil, ErrNotFound } return true, mv, nil } } else if err != ErrNotFound { return true, nil, err } return } func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) { ikey := makeInternalKey(nil, key, seq, keyTypeSeek) if auxm != nil { if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok { return append([]byte{}, mv...), me } } em, fm := db.getMems() for _, m := range [...]*memDB{em, fm} { if m == nil { continue } defer m.decref() if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok { return append([]byte{}, mv...), me } } v := db.s.version() value, cSched, err := v.get(auxt, ikey, ro, false) v.release() if cSched { // Trigger table compaction. db.compTrigger(db.tcompCmdC) } return } func nilIfNotFound(err error) error { if err == ErrNotFound { return nil } return err } func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) { ikey := makeInternalKey(nil, key, seq, keyTypeSeek) if auxm != nil { if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok { return me == nil, nilIfNotFound(me) } } em, fm := db.getMems() for _, m := range [...]*memDB{em, fm} { if m == nil { continue } defer m.decref() if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok { return me == nil, nilIfNotFound(me) } } v := db.s.version() _, cSched, err := v.get(auxt, ikey, ro, true) v.release() if cSched { // Trigger table compaction. db.compTrigger(db.tcompCmdC) } if err == nil { ret = true } else if err == ErrNotFound { err = nil } return } // Get gets the value for the given key. It returns ErrNotFound if the // DB does not contains the key. // // The returned slice is its own copy, it is safe to modify the contents // of the returned slice. // It is safe to modify the contents of the argument after Get returns. func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { err = db.ok() if err != nil { return } se := db.acquireSnapshot() defer db.releaseSnapshot(se) return db.get(nil, nil, key, se.seq, ro) } // Has returns true if the DB does contains the given key. // // It is safe to modify the contents of the argument after Has returns. func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) { err = db.ok() if err != nil { return } se := db.acquireSnapshot() defer db.releaseSnapshot(se) return db.has(nil, nil, key, se.seq, ro) } // NewIterator returns an iterator for the latest snapshot of the // underlying DB. // The returned iterator is not safe for concurrent use, but it is safe to use // multiple iterators concurrently, with each in a dedicated goroutine. // It is also safe to use an iterator concurrently with modifying its // underlying DB. The resultant key/value pairs are guaranteed to be // consistent. // // Slice allows slicing the iterator to only contains keys in the given // range. A nil Range.Start is treated as a key before all keys in the // DB. And a nil Range.Limit is treated as a key after all keys in // the DB. // // WARNING: Any slice returned by interator (e.g. slice returned by calling // Iterator.Key() or Iterator.Key() methods), its content should not be modified // unless noted otherwise. // // The iterator must be released after use, by calling Release method. // // Also read Iterator documentation of the leveldb/iterator package. func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { if err := db.ok(); err != nil { return iterator.NewEmptyIterator(err) } se := db.acquireSnapshot() defer db.releaseSnapshot(se) // Iterator holds 'version' lock, 'version' is immutable so snapshot // can be released after iterator created. return db.newIterator(nil, nil, se.seq, slice, ro) } // GetSnapshot returns a latest snapshot of the underlying DB. A snapshot // is a frozen snapshot of a DB state at a particular point in time. The // content of snapshot are guaranteed to be consistent. // // The snapshot must be released after use, by calling Release method. func (db *DB) GetSnapshot() (*Snapshot, error) { if err := db.ok(); err != nil { return nil, err } return db.newSnapshot(), nil } // GetProperty returns value of the given property name. // // Property names: // leveldb.num-files-at-level{n} // Returns the number of files at level 'n'. // leveldb.stats // Returns statistics of the underlying DB. // leveldb.iostats // Returns statistics of effective disk read and write. // leveldb.writedelay // Returns cumulative write delay caused by compaction. // leveldb.sstables // Returns sstables list for each level. // leveldb.blockpool // Returns block pool stats. // leveldb.cachedblock // Returns size of cached block. // leveldb.openedtables // Returns number of opened tables. // leveldb.alivesnaps // Returns number of alive snapshots. // leveldb.aliveiters // Returns number of alive iterators. func (db *DB) GetProperty(name string) (value string, err error) { err = db.ok() if err != nil { return } const prefix = "leveldb." if !strings.HasPrefix(name, prefix) { return "", ErrNotFound } p := name[len(prefix):] v := db.s.version() defer v.release() numFilesPrefix := "num-files-at-level" switch { case strings.HasPrefix(p, numFilesPrefix): var level uint var rest string n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest) if n != 1 { err = ErrNotFound } else { value = fmt.Sprint(v.tLen(int(level))) } case p == "stats": value = "Compactions\n" + " Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" + "-------+------------+---------------+---------------+---------------+---------------\n" for level, tables := range v.levels { duration, read, write := db.compStats.getStat(level) if len(tables) == 0 && duration == 0 { continue } value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n", level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(), float64(read)/1048576.0, float64(write)/1048576.0) } case p == "iostats": value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f", float64(db.s.stor.reads())/1048576.0, float64(db.s.stor.writes())/1048576.0) case p == "writedelay": writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay)) paused := atomic.LoadInt32(&db.inWritePaused) == 1 value = fmt.Sprintf("DelayN:%d Delay:%s Paused:%t", writeDelayN, writeDelay, paused) case p == "sstables": for level, tables := range v.levels { value += fmt.Sprintf("--- level %d ---\n", level) for _, t := range tables { value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax) } } case p == "blockpool": value = fmt.Sprintf("%v", db.s.tops.bpool) case p == "cachedblock": if db.s.tops.bcache != nil { value = fmt.Sprintf("%d", db.s.tops.bcache.Size()) } else { value = "<nil>" } case p == "openedtables": value = fmt.Sprintf("%d", db.s.tops.cache.Size()) case p == "alivesnaps": value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps)) case p == "aliveiters": value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters)) default: err = ErrNotFound } return } // DBStats is database statistics. type DBStats struct { WriteDelayCount int32 WriteDelayDuration time.Duration WritePaused bool AliveSnapshots int32 AliveIterators int32 IOWrite uint64 IORead uint64 BlockCacheSize int OpenedTablesCount int LevelSizes []int64 LevelTablesCounts []int LevelRead []int64 LevelWrite []int64 LevelDurations []time.Duration } // Stats populates s with database statistics. func (db *DB) Stats(s *DBStats) error { err := db.ok() if err != nil { return err } s.IORead = db.s.stor.reads() s.IOWrite = db.s.stor.writes() s.WriteDelayCount = atomic.LoadInt32(&db.cWriteDelayN) s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay)) s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1 s.OpenedTablesCount = db.s.tops.cache.Size() if db.s.tops.bcache != nil { s.BlockCacheSize = db.s.tops.bcache.Size() } else { s.BlockCacheSize = 0 } s.AliveIterators = atomic.LoadInt32(&db.aliveIters) s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps) s.LevelDurations = s.LevelDurations[:0] s.LevelRead = s.LevelRead[:0] s.LevelWrite = s.LevelWrite[:0] s.LevelSizes = s.LevelSizes[:0] s.LevelTablesCounts = s.LevelTablesCounts[:0] v := db.s.version() defer v.release() for level, tables := range v.levels { duration, read, write := db.compStats.getStat(level) if len(tables) == 0 && duration == 0 { continue } s.LevelDurations = append(s.LevelDurations, duration) s.LevelRead = append(s.LevelRead, read) s.LevelWrite = append(s.LevelWrite, write) s.LevelSizes = append(s.LevelSizes, tables.size()) s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables)) } return nil } // SizeOf calculates approximate sizes of the given key ranges. // The length of the returned sizes are equal with the length of the given // ranges. The returned sizes measure storage space usage, so if the user // data compresses by a factor of ten, the returned sizes will be one-tenth // the size of the corresponding user data size. // The results may not include the sizes of recently written data. func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) { if err := db.ok(); err != nil { return nil, err } v := db.s.version() defer v.release() sizes := make(Sizes, 0, len(ranges)) for _, r := range ranges { imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek) imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek) start, err := v.offsetOf(imin) if err != nil { return nil, err } limit, err := v.offsetOf(imax) if err != nil { return nil, err } var size int64 if limit >= start { size = limit - start } sizes = append(sizes, size) } return sizes, nil } // Close closes the DB. This will also releases any outstanding snapshot, // abort any in-flight compaction and discard open transaction. // // It is not safe to close a DB until all outstanding iterators are released. // It is valid to call Close multiple times. Other methods should not be // called after the DB has been closed. func (db *DB) Close() error { if !db.setClosed() { return ErrClosed } start := time.Now() db.log("db@close closing") // Clear the finalizer. runtime.SetFinalizer(db, nil) // Get compaction error. var err error select { case err = <-db.compErrC: if err == ErrReadOnly { err = nil } default: } // Signal all goroutines. close(db.closeC) // Discard open transaction. if db.tr != nil { db.tr.Discard() } // Acquire writer lock. db.writeLockC <- struct{}{} // Wait for all gorotines to exit. db.closeW.Wait() // Closes journal. if db.journal != nil { db.journal.Close() db.journalWriter.Close() db.journal = nil db.journalWriter = nil } if db.writeDelayN > 0 { db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay) } // Close session. db.s.close() db.logf("db@close done T·%v", time.Since(start)) db.s.release() if db.closer != nil { if err1 := db.closer.Close(); err == nil { err = err1 } db.closer = nil } // Clear memdbs. db.clearMems() return err }