diff --git a/frac/active.go b/frac/active.go index 6e69ff0c..322c405a 100644 --- a/frac/active.go +++ b/frac/active.go @@ -2,8 +2,6 @@ package frac import ( "context" - "errors" - "fmt" "io" "math" "os" @@ -11,17 +9,18 @@ import ( "sync" "time" - "go.uber.org/atomic" - "go.uber.org/zap" - + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/conf" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/disk" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric" + "github.com/ozontech/seq-db/metric/stopwatch" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/util" + "go.uber.org/zap" ) type Active struct { @@ -30,8 +29,8 @@ type Active struct { BaseFileName string useMu sync.RWMutex - sealed Fraction // derivative fraction suicided bool + released bool infoMu sync.RWMutex info *Info @@ -54,11 +53,8 @@ type Active struct { metaFile *os.File metaReader disk.DocBlocksReader - appendQueueSize atomic.Uint32 - appender ActiveAppender - - sealingMu sync.RWMutex - isSealed bool + writer *ActiveWriter + indexer *ActiveIndexer } const ( @@ -73,14 +69,14 @@ var systemSeqID = seq.ID{ func NewActive( baseFileName string, - indexWorkers *IndexWorkers, + activeIndexer *ActiveIndexer, readLimiter *disk.ReadLimiter, docsCache *cache.Cache[[]byte], sortCache *cache.Cache[[]byte], config *Config, ) *Active { - docsFile, docsStats := openFile(baseFileName+consts.DocsFileSuffix, conf.SkipFsync) - metaFile, metaStats := openFile(baseFileName+consts.MetaFileSuffix, conf.SkipFsync) + docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, conf.SkipFsync) + metaFile, metaStats := mustOpenFile(baseFileName+consts.MetaFileSuffix, conf.SkipFsync) f := &Active{ TokenList: NewActiveTokenList(conf.IndexWorkers), @@ -98,7 +94,8 @@ func NewActive( metaFile: metaFile, metaReader: disk.NewDocBlocksReader(readLimiter, metaFile), - appender: StartAppender(docsFile, metaFile, conf.IndexWorkers, conf.SkipFsync, indexWorkers), + indexer: activeIndexer, + writer: NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), conf.SkipFsync), BaseFileName: baseFileName, info: NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())), @@ -114,7 +111,7 @@ func NewActive( return f } -func openFile(name string, skipFsync bool) (*os.File, os.FileInfo) { +func mustOpenFile(name string, skipFsync bool) (*os.File, os.FileInfo) { file, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0o776) if err != nil { logger.Fatal("can't create docs file", zap.String("file", name), zap.Error(err)) @@ -132,18 +129,20 @@ func openFile(name string, skipFsync bool) (*os.File, os.FileInfo) { return file, stat } -func (f *Active) ReplayBlocks(ctx context.Context) error { +func (f *Active) Replay(ctx context.Context) error { logger.Info("start replaying...") targetSize := f.info.MetaOnDisk t := time.Now() - f.info.DocsOnDisk = 0 - f.info.MetaOnDisk = 0 docsPos := uint64(0) metaPos := uint64(0) step := targetSize / 10 next := step + + sw := stopwatch.New() + wg := sync.WaitGroup{} + out: for { select { @@ -173,23 +172,17 @@ out: } docBlockLen := disk.DocBlock(meta).GetExt1() + disk.DocBlock(meta).SetExt2(docsPos) // todo: remove this on next release + docsPos += docBlockLen metaPos += metaSize - if err := f.Replay(docBlockLen, meta); err != nil { - return err - } + wg.Add(1) + f.indexer.Index(f, meta, &wg, sw) } } - f.WaitWriteIdle() - - if _, err := 
f.docsFile.Seek(int64(docsPos), io.SeekStart); err != nil { - return fmt.Errorf("can't seek docs file: file=%s, err=%w", f.docsFile.Name(), err) - } - if _, err := f.metaFile.Seek(int64(metaPos), io.SeekStart); err != nil { - return fmt.Errorf("can't seek meta file: file=%s, err=%w", f.metaFile.Name(), err) - } + wg.Wait() tookSeconds := util.DurationToUnit(time.Since(t), "s") throughputRaw := util.SizeToUnit(f.info.DocsRaw, "mb") / tookSeconds @@ -205,143 +198,37 @@ out: return nil } -// Append causes data to be written on disk and sends IndexTask to index workers -// Checks the state of a faction and may return an error if the faction has already started sealing. -func (f *Active) Append(docs, metas []byte) error { - if !f.incAppendQueueSize() { - return errors.New("fraction is not writable") - } - - f.appender.In(f, docs, metas, &f.appendQueueSize) - - return nil -} - -func (f *Active) Replay(docsLen uint64, metas []byte) error { - if !f.incAppendQueueSize() { - // shouldn't actually be possible - return errors.New("replaying of fraction being sealed") +var bulkStagesSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "seq_db_store", + Subsystem: "bulk", + Name: "stages_seconds", + Buckets: metric.SecondsBuckets, +}, []string{"stage"}) + +// Append causes data to be written on disk and sends metas to index workers +func (f *Active) Append(docs, metas []byte, wg *sync.WaitGroup) (err error) { + sw := stopwatch.New() + m := sw.Start("append") + if err = f.writer.Write(docs, metas, sw); err != nil { + m.Stop() + return err } - - f.appender.InReplay(f, docsLen, metas, &f.appendQueueSize) - + f.updateDiskStats(uint64(len(docs)), uint64(len(metas))) + f.indexer.Index(f, metas, wg, sw) + m.Stop() + sw.Export(bulkStagesSeconds) return nil } -func (f *Active) incAppendQueueSize() bool { - f.sealingMu.RLock() - defer f.sealingMu.RUnlock() - - if f.isSealed { - return false - } - - f.appendQueueSize.Inc() - return true -} - -func (f *Active) WaitWriteIdle() { - for f.appendQueueSize.Load() > 0 { - time.Sleep(time.Millisecond * 10) - } -} - -func (f *Active) setSealed() error { - f.sealingMu.Lock() - defer f.sealingMu.Unlock() - - if f.isSealed { - return errors.New("fraction is already sealed") - } - f.isSealed = true - return nil -} - -func (f *Active) Seal(params SealParams) (*PreloadedData, error) { - if err := f.setSealed(); err != nil { - return nil, err - } - - logger.Info("waiting fraction to stop write...") - start := time.Now() - f.WaitWriteIdle() - logger.Info("write is stopped", zap.Float64("time_wait_s", util.DurationToUnit(time.Since(start), "s"))) - - return seal(f, params) -} - func (f *Active) GetAllDocuments() []uint32 { return f.TokenList.GetAllTokenLIDs().GetLIDs(f.MIDs, f.RIDs) } -func (f *Active) Release(sealed Fraction) error { - f.useMu.Lock() - f.sealed = sealed - f.useMu.Unlock() - - f.TokenList.Stop() - - f.RIDs = nil - f.MIDs = nil - f.TokenList = nil - f.DocsPositions = nil - - // Once frac is released, it is safe to remove the docs file, - // since search queries will use the sealed implementation. 
- - if !f.Config.SkipSortDocs { - if err := f.docsFile.Close(); err != nil { - return fmt.Errorf("closing docs file: %w", err) - } - rmFileName := f.BaseFileName + consts.DocsFileSuffix - if err := os.Remove(rmFileName); err != nil { - logger.Error("error removing docs file", - zap.String("file", rmFileName), - zap.Error(err)) - } - } - - f.docsCache.Release() - f.sortCache.Release() - - if !f.Config.KeepMetaFile { - rmFileName := f.BaseFileName + consts.MetaFileSuffix - if err := os.Remove(rmFileName); err != nil { - logger.Error("can't delete metas file", zap.String("file", rmFileName), zap.Error(err)) - } - } - - return nil -} - -func (f *Active) UpdateDiskStats(docsLen, metaLen uint64) uint64 { +func (f *Active) updateDiskStats(docsLen, metaLen uint64) { f.infoMu.Lock() - pos := f.info.DocsOnDisk f.info.DocsOnDisk += docsLen f.info.MetaOnDisk += metaLen f.infoMu.Unlock() - return pos -} - -func (f *Active) close(closeDocs bool, hint string) { - f.appender.Stop() - if closeDocs { - if err := f.docsFile.Close(); err != nil { - logger.Error("can't close docs file", - zap.String("frac", f.BaseFileName), - zap.String("type", "active"), - zap.String("hint", hint), - zap.Error(err)) - } - } - - if err := f.metaFile.Close(); err != nil { - logger.Error("can't close meta file", - zap.String("frac", f.BaseFileName), - zap.String("type", "active"), - zap.String("hint", hint), - zap.Error(err)) - } } func (f *Active) AppendIDs(ids []seq.ID) []uint32 { @@ -362,10 +249,6 @@ func (f *Active) AppendIDs(ids []seq.ID) []uint32 { return lidsList } -func (f *Active) AppendID(id seq.ID) uint32 { - return f.AppendIDs([]seq.ID{id})[0] -} - func (f *Active) UpdateStats(minMID, maxMID seq.MID, docCount uint32, sizeCount uint64) { f.infoMu.Lock() defer f.infoMu.Unlock() @@ -380,78 +263,27 @@ func (f *Active) UpdateStats(minMID, maxMID seq.MID, docCount uint32, sizeCount f.info.DocsRaw += sizeCount } -func (f *Active) Suicide() { // it seams we never call this method (of Active fraction) - f.useMu.Lock() - f.suicided = true - f.useMu.Unlock() - - if !f.isSealed { - f.close(true, "suicide") - - err := os.Remove(f.BaseFileName + consts.MetaFileSuffix) - if err != nil { - logger.Error("error removing file", - zap.String("file", f.BaseFileName+consts.MetaFileSuffix), - zap.Error(err), - ) - } - } - - rmPath := f.BaseFileName + consts.DocsFileSuffix - if err := os.Remove(rmPath); err != nil { - logger.Error("error removing file", - zap.String("file", rmPath), - zap.Error(err), - ) - } - - rmPath = f.BaseFileName + consts.SdocsFileSuffix - if err := os.Remove(rmPath); err != nil && !os.IsNotExist(err) { - logger.Error("error removing file", - zap.String("file", rmPath), - zap.Error(err), - ) - } - - if f.isSealed { - rmPath = f.BaseFileName + consts.IndexFileSuffix - if err := os.Remove(rmPath); err != nil { - logger.Error("error removing file", - zap.String("file", rmPath), - zap.Error(err), - ) - } - } -} - func (f *Active) String() string { - return fracToString(f, "active") + return FracToString(f, "active") } func (f *Active) DataProvider(ctx context.Context) (DataProvider, func()) { f.useMu.RLock() - if f.sealed == nil && !f.suicided && f.Info().DocsTotal > 0 { // it is ordinary active fraction state - dp := f.createDataProvider(ctx) - return dp, func() { - dp.release() - f.useMu.RUnlock() + if f.suicided || f.released || f.Info().DocsTotal == 0 { // it is empty active fraction state + if f.suicided { + metric.CountersTotal.WithLabelValues("fraction_suicided").Inc() } + f.useMu.RUnlock() + return 
EmptyDataProvider{}, func() {}
 	}
-	defer f.useMu.RUnlock()
-
-	if f.sealed != nil { // move on to the daughter sealed faction
-		dp, releaseSealed := f.sealed.DataProvider(ctx)
-		metric.CountersTotal.WithLabelValues("use_sealed_from_active").Inc()
-		return dp, releaseSealed
-	}
-
-	if f.suicided {
-		metric.CountersTotal.WithLabelValues("fraction_suicided").Inc()
+	// it is ordinary active fraction state
+	dp := f.createDataProvider(ctx)
+	return dp, func() {
+		dp.release()
+		f.useMu.RUnlock()
 	}
-
-	return EmptyDataProvider{}, func() {}
 }
 
 func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider {
@@ -485,3 +317,73 @@ func (f *Active) Contains(id seq.MID) bool {
 func (f *Active) IsIntersecting(from, to seq.MID) bool {
 	return f.Info().IsIntersecting(from, to)
 }
+
+func (f *Active) Release() {
+	f.useMu.Lock()
+	f.released = true
+	f.useMu.Unlock()
+
+	f.releaseMem()
+
+	if !f.Config.KeepMetaFile {
+		f.removeMetaFile()
+	}
+
+	if !f.Config.SkipSortDocs {
+		// we use sorted docs in the sealed fraction, so we can remove the original docs of the active fraction
+		f.removeDocsFiles()
+	}
+}
+
+func (f *Active) Suicide() {
+	f.useMu.Lock()
+	released := f.released
+	f.suicided = true
+	f.released = true
+	f.useMu.Unlock()
+
+	if released { // fraction can be suicided after release
+		if f.Config.KeepMetaFile {
+			f.removeMetaFile() // meta was not removed during release
+		}
+		if f.Config.SkipSortDocs {
+			f.removeDocsFiles() // docs were not removed during release
+		}
+	} else { // was not released
+		f.releaseMem()
+		f.removeMetaFile()
+		f.removeDocsFiles()
+	}
+}
+
+func (f *Active) releaseMem() {
+	f.writer.Stop()
+	f.TokenList.Stop()
+
+	f.docsCache.Release()
+	f.sortCache.Release()
+
+	if err := f.metaFile.Close(); err != nil {
+		logger.Error("can't close meta file", zap.String("frac", f.BaseFileName), zap.Error(err))
+	}
+
+	f.RIDs = nil
+	f.MIDs = nil
+	f.TokenList = nil
+	f.DocsPositions = nil
+}
+
+func (f *Active) removeDocsFiles() {
+	if err := f.docsFile.Close(); err != nil {
+		logger.Error("can't close docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
+	}
+	if err := os.Remove(f.docsFile.Name()); err != nil {
+		logger.Error("can't delete docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
+	}
+}
+
+func (f *Active) removeMetaFile() {
+	if err := os.Remove(f.metaFile.Name()); err != nil {
+		logger.Error("can't delete metas file", zap.String("frac", f.BaseFileName), zap.Error(err))
+	}
+}
diff --git a/frac/active2/active2.go b/frac/active2/active2.go
new file mode 100644
index 00000000..91c79bd1
--- /dev/null
+++ b/frac/active2/active2.go
@@ -0,0 +1,350 @@
+package active2
+
+import (
+	"context"
+	"io"
+	"os"
+	"path/filepath"
+	"sync"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/ozontech/seq-db/cache"
+	"github.com/ozontech/seq-db/conf"
+	"github.com/ozontech/seq-db/consts"
+	"github.com/ozontech/seq-db/disk"
+	"github.com/ozontech/seq-db/frac"
+	"github.com/ozontech/seq-db/logger"
+	"github.com/ozontech/seq-db/metric"
+	"github.com/ozontech/seq-db/metric/stopwatch"
+	"github.com/ozontech/seq-db/seq"
+	"github.com/ozontech/seq-db/util"
+	"go.uber.org/zap"
+)
+
+type Active2 struct {
+	Config *frac.Config
+
+	BaseFileName string
+
+	useMu    sync.RWMutex
+	suicided bool
+	released bool
+
+	indexMu sync.RWMutex
+	info    *frac.Info
+	indexes Indexes
+	indexer Indexer
+
+	docsFile   *os.File
+	docsReader disk.DocsReader
+	sortReader disk.DocsReader
+	docsCache  *cache.Cache[[]byte]
sortCache *cache.Cache[[]byte] + + metaFile *os.File + metaReader disk.DocBlocksReader + + writer *frac.ActiveWriter +} + +func New( + baseFileName string, + readLimiter *disk.ReadLimiter, + docsCache *cache.Cache[[]byte], + sortCache *cache.Cache[[]byte], + config *frac.Config, +) *Active2 { + docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, conf.SkipFsync) + metaFile, metaStats := mustOpenFile(baseFileName+consts.MetaFileSuffix, conf.SkipFsync) + + f := &Active2{ + docsFile: docsFile, + docsCache: docsCache, + sortCache: sortCache, + docsReader: disk.NewDocsReader(readLimiter, docsFile, docsCache), + sortReader: disk.NewDocsReader(readLimiter, docsFile, sortCache), + + metaFile: metaFile, + metaReader: disk.NewDocBlocksReader(readLimiter, metaFile), + + writer: frac.NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), conf.SkipFsync), + + BaseFileName: baseFileName, + info: frac.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())), + Config: config, + } + + logger.Info("active fraction created", zap.String("fraction", baseFileName)) + + return f +} + +func mustOpenFile(name string, skipFsync bool) (*os.File, os.FileInfo) { + file, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0o776) + if err != nil { + logger.Fatal("can't create docs file", zap.String("file", name), zap.Error(err)) + } + + if !skipFsync { + parentDirPath := filepath.Dir(name) + util.MustSyncPath(parentDirPath) + } + + stat, err := file.Stat() + if err != nil { + logger.Fatal("can't stat docs file", zap.String("file", name), zap.Error(err)) + } + return file, stat +} + +func (f *Active2) Replay(ctx context.Context) error { + logger.Info("start replaying...") + + targetSize := f.info.MetaOnDisk + t := time.Now() + + metaPos := uint64(0) + step := targetSize / 10 + next := step + + sw := stopwatch.New() + wg := sync.WaitGroup{} + +out: + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + meta, metaSize, err := f.metaReader.ReadDocBlock(int64(metaPos)) + if err == io.EOF { + if metaSize != 0 { + logger.Warn("last meta block is partially written, skipping it") + } + break out + } + if err != nil && err != io.EOF { + return err + } + + if metaPos > next { + next += step + progress := float64(metaPos) / float64(targetSize) * 100 + logger.Info("replaying batch, meta", + zap.Uint64("from", metaPos), + zap.Uint64("to", metaPos+metaSize), + zap.Uint64("target", targetSize), + util.ZapFloat64WithPrec("progress_percentage", progress, 2), + ) + } + + metaPos += metaSize + + wg.Add(1) + f.indexer.Index(meta, sw, func(index *memIndex, err error) { + if err != nil { + logger.Fatal("bulk indexing error", zap.Error(err)) + } + f.addIndex(index) + wg.Done() + }) + } + } + + wg.Wait() + + tookSeconds := util.DurationToUnit(time.Since(t), "s") + throughputRaw := util.SizeToUnit(f.info.DocsRaw, "mb") / tookSeconds + throughputMeta := util.SizeToUnit(f.info.MetaOnDisk, "mb") / tookSeconds + logger.Info("active fraction replayed", + zap.String("name", f.info.Name()), + zap.Uint32("docs_total", f.info.DocsTotal), + util.ZapUint64AsSizeStr("docs_size", f.info.DocsOnDisk), + util.ZapFloat64WithPrec("took_s", tookSeconds, 1), + util.ZapFloat64WithPrec("throughput_raw_mb_sec", throughputRaw, 1), + util.ZapFloat64WithPrec("throughput_meta_mb_sec", throughputMeta, 1), + ) + return nil +} + +var bulkStagesSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "seq_db_store", + Subsystem: "bulk", + Name: "stages_seconds", + Buckets: 
metric.SecondsBuckets,
+}, []string{"stage"})
+
+// Append causes data to be written on disk and sends metas to index workers
+func (f *Active2) Append(docs, meta []byte, wg *sync.WaitGroup) (err error) {
+	sw := stopwatch.New()
+	m := sw.Start("append")
+	if err = f.writer.Write(docs, meta, sw); err != nil {
+		m.Stop()
+		return err
+	}
+	f.updateDiskStats(uint64(len(docs)), uint64(len(meta)))
+
+	f.indexer.Index(meta, sw, func(index *memIndex, err error) {
+		if err != nil {
+			logger.Fatal("bulk indexing error", zap.Error(err))
+		}
+		f.addIndex(index)
+		wg.Done()
+	})
+
+	m.Stop()
+	sw.Export(bulkStagesSeconds)
+	return nil
+}
+
+func (f *Active2) updateDiskStats(docsLen, metaLen uint64) {
+	f.indexMu.Lock()
+	f.info.DocsOnDisk += docsLen
+	f.info.MetaOnDisk += metaLen
+	f.indexMu.Unlock()
+}
+
+func (f *Active2) addIndex(index *memIndex) {
+	maxMID := index.ids[0].MID
+	minMID := index.ids[len(index.ids)-1].MID
+
+	f.indexMu.Lock()
+	defer f.indexMu.Unlock()
+
+	f.indexes.Add(index)
+
+	if f.info.From > minMID {
+		f.info.From = minMID
+	}
+	if f.info.To < maxMID {
+		f.info.To = maxMID
+	}
+	f.info.DocsRaw += index.docsSize
+	f.info.DocsTotal += index.docsCount
+}
+
+func (f *Active2) String() string {
+	return frac.FracToString(f, "active2")
+}
+
+func (f *Active2) DataProvider(ctx context.Context) (frac.DataProvider, func()) {
+	f.useMu.RLock()
+
+	if f.suicided {
+		f.useMu.RUnlock()
+		metric.CountersTotal.WithLabelValues("fraction_suicided").Inc()
+		return frac.EmptyDataProvider{}, func() {}
+	}
+
+	if f.released {
+		f.useMu.RUnlock()
+		metric.CountersTotal.WithLabelValues("fraction_released").Inc()
+		return frac.EmptyDataProvider{}, func() {}
+	}
+
+	dp := f.createDataProvider(ctx)
+
+	if dp.info.DocsTotal == 0 { // it is empty active fraction state
+		f.useMu.RUnlock()
+		metric.CountersTotal.WithLabelValues("fraction_empty").Inc()
+		return frac.EmptyDataProvider{}, func() {}
+	}
+
+	// it is ordinary active fraction state
+	return dp, f.useMu.RUnlock
+}
+
+func (f *Active2) createDataProvider(ctx context.Context) *dataProvider {
+	f.indexMu.RLock()
+	info := *f.info // copy
+	indexes := f.indexes.Indexes()
+	f.indexMu.RUnlock()
+
+	return &dataProvider{
+		ctx:        ctx,
+		config:     f.Config,
+		info:       &info,
+		indexes:    indexes,
+		docsReader: &f.docsReader,
+	}
+}
+
+func (f *Active2) Info() *frac.Info {
+	f.indexMu.RLock()
+	defer f.indexMu.RUnlock()
+
+	cp := *f.info // copy
+	return &cp
+}
+
+func (f *Active2) Contains(id seq.MID) bool {
+	return f.Info().IsIntersecting(id, id)
+}
+
+func (f *Active2) IsIntersecting(from, to seq.MID) bool {
+	return f.Info().IsIntersecting(from, to)
+}
+
+func (f *Active2) Release() {
+	f.useMu.Lock()
+	f.released = true
+	f.useMu.Unlock()
+
+	f.releaseMem()
+
+	if !f.Config.KeepMetaFile {
+		f.removeMetaFile()
+	}
+
+	if !f.Config.SkipSortDocs {
+		// we use sorted docs in the sealed fraction, so we can remove the original docs of the active fraction
+		f.removeDocsFiles()
+	}
+}
+
+func (f *Active2) Suicide() {
+	f.useMu.Lock()
+	released := f.released
+	f.suicided = true
+	f.released = true
+	f.useMu.Unlock()
+
+	if released { // fraction can be suicided after release
+		if f.Config.KeepMetaFile {
+			f.removeMetaFile() // meta was not removed during release
+		}
+		if f.Config.SkipSortDocs {
+			f.removeDocsFiles() // docs were not removed during release
+		}
+	} else { // was not released
+		f.releaseMem()
+		f.removeMetaFile()
+		f.removeDocsFiles()
+	}
+}
+
+func (f *Active2) releaseMem() {
+	f.writer.Stop()
+	f.indexes.StopMergeLoop()
+
+	if err := f.metaFile.Close(); err != nil {
+ logger.Error("can't close meta file", zap.String("frac", f.BaseFileName), zap.Error(err)) + } +} + +func (f *Active2) removeDocsFiles() { + if err := f.docsFile.Close(); err != nil { + logger.Error("can't close docs file", zap.String("frac", f.BaseFileName), zap.Error(err)) + } + if err := os.Remove(f.docsFile.Name()); err != nil { + logger.Error("can't delete docs file", zap.String("frac", f.BaseFileName), zap.Error(err)) + } +} + +func (f *Active2) removeMetaFile() { + if err := os.Remove(f.metaFile.Name()); err != nil { + logger.Error("can't delete metas file", zap.String("frac", f.BaseFileName), zap.Error(err)) + } +} diff --git a/frac/active2/data_provider.go b/frac/active2/data_provider.go new file mode 100644 index 00000000..40b006d1 --- /dev/null +++ b/frac/active2/data_provider.go @@ -0,0 +1,196 @@ +package active2 + +import ( + "context" + "fmt" + "sort" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/ozontech/seq-db/disk" + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/lids" + "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/metric" + "github.com/ozontech/seq-db/metric/stopwatch" + "github.com/ozontech/seq-db/node" + "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/pattern" + "github.com/ozontech/seq-db/seq" +) + +type dataProvider struct { + ctx context.Context + + config *frac.Config + + info *frac.Info + indexes []*memIndex + docsReader *disk.DocsReader +} + +var fetcherStagesSec = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "seq_db_store", + Subsystem: "fetcher", + Name: "active_stages_seconds", + Buckets: metric.SecondsBuckets, +}, []string{"stage"}) + +func (dp *dataProvider) Fetch(ids []seq.ID) ([][]byte, error) { + sw := stopwatch.New() + defer sw.Export(fetcherStagesSec) + + t := sw.Start("total") + res := make([][]byte, len(ids)) + for _, index := range dp.indexes { + fetchIndex := fetchIndex{index: index, docsReader: dp.docsReader} + if err := processor.IndexFetch(ids, sw, &fetchIndex, res); err != nil { + return nil, err + } + } + t.Stop() + + return res, nil +} + +func (dp *dataProvider) Search(params processor.SearchParams) (*seq.QPR, error) { + sw := stopwatch.New() + defer sw.Export(getActiveSearchMetric(params)) + + t := sw.Start("total") + qprs := make([]*seq.QPR, 0, len(dp.indexes)) + aggLimits := processor.AggLimits(dp.config.Search.AggLimits) + for _, index := range dp.indexes { + si := searchIndex{ctx: dp.ctx, index: index} + qpr, err := processor.IndexSearch(dp.ctx, params, &si, aggLimits, sw) + if err != nil { + return nil, err + } + qprs = append(qprs, qpr) + } + res := processor.MergeQPRs(qprs, params) + res.IDs.ApplyHint(dp.info.Name()) + t.Stop() + + return res, nil +} + +type fetchIndex struct { + index *memIndex + docsReader *disk.DocsReader +} + +func (si *fetchIndex) GetBlocksOffsets(blockIndex uint32) uint64 { + return si.index.blocksOffsets[blockIndex] +} + +func (si *fetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos { + docsPos := make([]seq.DocPos, len(ids)) + for i, id := range ids { + var ok bool + if docsPos[i], ok = si.index.positions[id]; !ok { + docsPos[i] = seq.DocPosNotFound + } + } + return docsPos +} + +func (si *fetchIndex) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) { + return si.docsReader.ReadDocs(blockOffset, docOffsets) +} + +type searchIndex struct { + ctx context.Context + index *memIndex +} + +func (si *searchIndex) GetValByTID(tid uint32) 
[]byte {
+	return si.index.tokens[tid]
+}
+
+func (si *searchIndex) GetTIDsByTokenExpr(t parser.Token) ([]uint32, error) {
+	field := parser.GetField(t)
+	tp := si.index.getTokenProvider(field)
+	tids, err := pattern.Search(si.ctx, t, tp)
+	if err != nil {
+		return nil, fmt.Errorf("search error: %w, field: %s, query: %s", err, field, parser.GetHint(t))
+	}
+	return tids, nil
+}
+
+func (si *searchIndex) GetLIDsFromTIDs(tids []uint32, _ lids.Counter, minLID, maxLID uint32, order seq.DocsOrder) []node.Node {
+	nodes := make([]node.Node, 0, len(tids))
+	for _, tid := range tids {
+		nodes = append(nodes, si.getTidLidsNode(tid, minLID, maxLID, order))
+	}
+	return nodes
+}
+
+func (si *searchIndex) getTidLidsNode(tid, minLID, maxLID uint32, order seq.DocsOrder) node.Node {
+	if tid == si.index.allTID {
+		return node.NewRange(minLID, maxLID, order.IsReverse())
+	}
+	tidLIDs := si.index.tokenLIDs[tid]
+	return node.NewStatic(narrowDownLIDs(tidLIDs, minLID, maxLID), order.IsReverse())
+}
+
+func narrowDownLIDs(tidLIDs []uint32, minLID, maxLID uint32) []uint32 {
+	n := len(tidLIDs)
+	left := sort.Search(n, func(i int) bool { return tidLIDs[i] >= minLID })
+	right := sort.Search(n, func(i int) bool { return tidLIDs[i] > maxLID }) // exclusive upper bound
+	if left >= right {
+		return nil
+	}
+	return tidLIDs[left:right]
+}
+
+func (si *searchIndex) LessOrEqual(lid seq.LID, id seq.ID) bool {
+	checkedMID := si.GetMID(lid)
+	if checkedMID == id.MID {
+		return si.GetRID(lid) <= id.RID
+	}
+	return checkedMID < id.MID
+}
+
+func (si *searchIndex) GetMID(lid seq.LID) seq.MID {
+	return si.index.ids[lid-1].MID
+}
+
+func (si *searchIndex) GetRID(lid seq.LID) seq.RID {
+	return si.index.ids[lid-1].RID
+}
+
+func (si *searchIndex) Len() int {
+	return len(si.index.ids) + 1 // LIDs are 1-based, so the exclusive upper bound is len+1
+}
+
+var (
+	searchAggSec = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "seq_db_store",
+		Subsystem: "search",
+		Name:      "tracer_active_agg_search_sec",
+		Buckets:   metric.SecondsBuckets,
+	}, []string{"stage"})
+	searchHstSec = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "seq_db_store",
+		Subsystem: "search",
+		Name:      "tracer_active_hist_search_sec",
+		Buckets:   metric.SecondsBuckets,
+	}, []string{"stage"})
+	searchSimpleSec = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "seq_db_store",
+		Subsystem: "search",
+		Name:      "tracer_active_reg_search_sec",
+		Buckets:   metric.SecondsBuckets,
+	}, []string{"stage"})
+)
+
+func getActiveSearchMetric(params processor.SearchParams) *prometheus.HistogramVec {
+	if params.HasAgg() {
+		return searchAggSec
+	}
+	if params.HasHist() {
+		return searchHstSec
+	}
+	return searchSimpleSec
+}
diff --git a/frac/active2/indexer.go b/frac/active2/indexer.go
new file mode 100644
index 00000000..a0275273
--- /dev/null
+++ b/frac/active2/indexer.go
@@ -0,0 +1,112 @@
+package active2
+
+import (
+	"bytes"
+	"encoding/binary"
+	"unsafe"
+
+	"github.com/ozontech/seq-db/disk"
+	"github.com/ozontech/seq-db/frac"
+	"github.com/ozontech/seq-db/metric/stopwatch"
+	"github.com/ozontech/seq-db/seq"
+)
+
+type Indexer struct {
+	sem chan struct{}
+}
+
+func (s *Indexer) Index(meta disk.DocBlock, sw *stopwatch.Stopwatch, fn func(index *memIndex, err error)) {
+	s.sem <- struct{}{}
+	go func() {
+		fn(CreateIndex(meta, sw))
+		<-s.sem
+	}()
+}
+
+func CreateIndex(meta disk.DocBlock, sw *stopwatch.Stopwatch) (*memIndex, error) {
+	const uint32Len = uint64(unsafe.Sizeof(uint32(0)))
+
+	docsPos := disk.DocBlock(meta).GetExt2()
+	payload, err := disk.DocBlock(meta).DecompressTo(nil)
+	if err != nil {
+		return nil, err
+	}
+
+	index := &memIndex{
+		blocksOffsets: []uint64{docsPos},
+		positions:     make(map[seq.ID]seq.DocPos),
+		fieldsTokens:  make(map[string]tokensRange), // filled by sortTokens below
+	}
+
+	fieldsSize := 0
+	tokensSize := 0
+	totalTokens := 0
+
+	tokenLIDsTotal := 0
+	nextDocOffset := uint64(0)
+
+	all := []uint32{}
+	ids := []seq.ID{}
+
+	fieldsIndex := map[string]map[string][]uint32{
+		seq.TokenAll: {"": {}}, // empty _all_ entry
+	}
+
+	for len(payload) > 0 {
+		n := binary.LittleEndian.Uint32(payload) + 4
+		docMeta := payload[4:n]
+		payload = payload[n:]
+
+		var meta frac.MetaData
+		if err := meta.UnmarshalBinary(docMeta); err != nil {
+			return nil, err
+		}
+
+		lid := uint32(len(ids))
+		all = append(all, lid)
+		ids = append(ids, meta.ID)
+		tokenLIDsTotal += len(meta.Tokens) // precount LIDs to preallocate lidsData below
+
+		// build token index map
+		for _, mt := range meta.Tokens {
+			if bytes.Equal(mt.Key, seq.AllTokenName) { // skip if _ALL_ comes from seq-proxy
+				tokenLIDsTotal--
+				continue
+			}
+			tokensIndex, ok := fieldsIndex[string(mt.Key)]
+			if !ok {
+				fieldsSize += len(mt.Key)
+				tokensIndex = make(map[string][]uint32)
+			}
+			tokensLIDs, ok := tokensIndex[string(mt.Value)]
+			if !ok {
+				totalTokens++
+				tokensSize += len(mt.Value)
+			}
+			tokensIndex[string(mt.Value)] = append(tokensLIDs, lid)
+			fieldsIndex[string(mt.Key)] = tokensIndex
+		}
+
+		if meta.Size == 0 { // this is a nested document
+			continue
+		}
+
+		index.positions[meta.ID] = seq.PackDocPos(0, nextDocOffset)
+		nextDocOffset += uint64(meta.Size) + uint32Len
+		index.docsCount++
+	}
+
+	index.docsSize = nextDocOffset - uint64(index.docsCount)*uint32Len // raw size of the docs without their length prefixes
+
+	index.fields = make([][]byte, 0, len(fieldsIndex))
+	index.tokens = make([][]byte, 0, totalTokens)
+	index.tokenLIDs = make([][]uint32, 0, totalTokens)
+
+	lidsData := make([]uint32, 0, tokenLIDsTotal)
+	tokensData := make([]byte, 0, tokensSize+fieldsSize)
+
+	inverser := index.sortIDs(all, ids)
+	index.sortTokens(fieldsIndex, tokensData, lidsData, inverser)
+
+	index.allTID = uint32(index.fieldsTokens[seq.TokenAll].start)
+
+	return index, nil
+}
diff --git a/frac/active2/indexes.go b/frac/active2/indexes.go
new file mode 100644
index 00000000..a4a50c02
--- /dev/null
+++ b/frac/active2/indexes.go
@@ -0,0 +1,233 @@
+package active2
+
+import (
+	"slices"
+	"sync"
+	"sync/atomic"
+)
+
+type indexItem struct {
+	id    uint64
+	tier  int
+	index *memIndex
+}
+
+type Indexes struct {
+	sem     chan struct{}
+	merge   chan struct{}
+	counter atomic.Uint64
+	tiers   sizeTiers
+
+	mu         sync.RWMutex
+	unoccupied map[uint64]indexItem // map of indexes ready for merge
+	processing map[uint64]indexItem // map of indexes taken for merging
+	indexesBuf []*memIndex
+
+	mergingMu sync.Mutex
+	stopped   bool
+
+	iMu     sync.RWMutex
+	indexes []*memIndex // list of indexes for read - a copy of `unoccupied` + `processing` with its own lock
+
+	wg sync.WaitGroup
+}
+
+const (
+	minToMerge           = 4   // minimum number of indexes required for merging
+	mergeForcedThreshold = 100 // the number of indexes at which it is necessary to merge some of them
+
+	tierDeltaPercent = 10   // difference between size tiers
+	tierFirstMax     = 100  // max size in first tier
+	tierMax          = 1000 // maximum number of size tiers allowed
+
+	bucketPercent = 50 // difference between size buckets
+)
+
+func newIndexes(sem chan struct{}) *Indexes {
+	indexes := Indexes{
+		sem:        sem,
+		merge:      make(chan struct{}, 1),
+		unoccupied: make(map[uint64]indexItem),
+		processing: make(map[uint64]indexItem),
+		tiers:      newSizeTiers(tierFirstMax, tierMax, tierDeltaPercent),
+	}
+	go indexes.mergeLoop()
+	return &indexes
+}
+
+func (indexes *Indexes) StopMergeLoop() {
+	indexes.mergingMu.Lock()
+	defer indexes.mergingMu.Unlock()
+
+	indexes.stopped = true
+
+	indexes.wg.Wait() // waiting for the completion of all current merge processes
+	close(indexes.merge)
+}
+
+func (indexes *Indexes) MergeAll() *memIndex {
+	indexes.mergingMu.Lock()
+	defer indexes.mergingMu.Unlock()
+
+	indexes.wg.Wait() // waiting for the completion of all current merge processes
+
+	merging := indexes.allUnoccupied()
+	merged := MergeIndexes(unwrapIndexes(merging))
+	indexes.replace(merging, indexes.wrapIndex(merged))
+
+	return merged
+}
+
+func (indexes *Indexes) wrapIndex(index *memIndex) indexItem {
+	return indexItem{
+		id:    indexes.counter.Add(1),
+		tier:  indexes.tiers.Calc(index.docsCount),
+		index: index,
+	}
+}
+
+func unwrapIndexes(items []indexItem) []*memIndex {
+	res := make([]*memIndex, 0, len(items))
+	for _, item := range items {
+		res = append(res, item.index)
+	}
+	return res
+}
+
+func (indexes *Indexes) Add(index *memIndex) {
+	item := indexes.wrapIndex(index)
+
+	indexes.mu.Lock()
+	indexes.unoccupied[item.id] = item
+	{
+		indexes.iMu.Lock()
+		indexes.indexes = append(indexes.indexes, index)
+		indexes.iMu.Unlock()
+	}
+	indexes.mu.Unlock()
+
+	indexes.launchMerge()
+}
+
+func (indexes *Indexes) Indexes() []*memIndex {
+	indexes.iMu.RLock()
+	defer indexes.iMu.RUnlock()
+
+	return indexes.indexes
+}
+
+func (indexes *Indexes) allUnoccupied() []indexItem {
+	indexes.mu.RLock()
+	defer indexes.mu.RUnlock()
+
+	items := make([]indexItem, 0, len(indexes.unoccupied))
+	for _, item := range indexes.unoccupied {
+		items = append(items, item)
+	}
+	return items
+}
+
+func (indexes *Indexes) markAsProcessing(items []indexItem) {
+	indexes.mu.Lock()
+	defer indexes.mu.Unlock()
+
+	for _, item := range items {
+		delete(indexes.unoccupied, item.id)
+		indexes.processing[item.id] = item
+	}
+}
+
+func (indexes *Indexes) prepareToMerge() [][]indexItem {
+	indexes.mergingMu.Lock()
+	defer indexes.mergingMu.Unlock()
+
+	if indexes.stopped {
+		return nil
+	}
+
+	suitable := SelectForMerge(indexes.allUnoccupied(), minToMerge)
+	for n, items := range suitable {
+		if !indexes.acquireWorker() { // there are no free workers
+			suitable = suitable[:n] // so we cut off the tail that we can't process right now
+			break
+		}
+		indexes.markAsProcessing(items)
+	}
+
+	// it is important to wg.Add() inside the lock, otherwise indexes.wg.Wait() and close(indexes.merge)
+	// could be called before wg.Add() and indexes.launchMerge() (see indexes.StopMergeLoop())
+	indexes.wg.Add(len(suitable))
+
+	return suitable
+}
+
+func (indexes *Indexes) mergeLoop() {
+	for range indexes.merge {
+		for {
+			prepared := indexes.prepareToMerge()
+			if len(prepared) == 0 {
+				break
+			}
+			for _, merging := range prepared {
+				go func(merging []indexItem) {
+					merged := MergeIndexes(unwrapIndexes(merging))
+					indexes.replace(merging, indexes.wrapIndex(merged))
+
+					indexes.releaseWorker()
+					indexes.launchMerge()
+					indexes.wg.Done()
+				}(merging)
+			}
+		}
+	}
+}
+
+func (indexes *Indexes) acquireWorker() bool {
+	select {
+	case indexes.sem <- struct{}{}:
+		return true
+	default:
+		return false
+	}
+}
+
+func (indexes *Indexes) releaseWorker() {
+	<-indexes.sem
+}
+
+func (indexes *Indexes) launchMerge() {
+	select {
+	case indexes.merge <- struct{}{}:
+	default:
+	}
+}
+
+func (indexes *Indexes) replace(processed []indexItem, merged indexItem) {
+	indexes.mu.Lock()
+	defer indexes.mu.Unlock()
+
+	// replace
+	for _, index := range processed {
+		delete(indexes.processing, index.id)
+	}
+	indexes.unoccupied[merged.id] = merged
+
+	// rebuild indexes.indexes
+ indexes.indexesBuf = indexes.indexesBuf[:0] + indexes.indexesBuf = slices.Grow(indexes.indexesBuf, len(indexes.unoccupied)+len(indexes.processing)) + for _, index := range indexes.unoccupied { + indexes.indexesBuf = append(indexes.indexesBuf, index.index) + } + for _, index := range indexes.processing { + indexes.indexesBuf = append(indexes.indexesBuf, index.index) + } + + // swap indexes.indexes + indexes.iMu.Lock() + indexes.indexesBuf, indexes.indexes = indexes.indexes, indexes.indexesBuf + indexes.iMu.Unlock() + + // avoid old indexes leak + clear(indexes.indexesBuf) +} diff --git a/frac/active2/mem_index.go b/frac/active2/mem_index.go new file mode 100644 index 00000000..1aac5773 --- /dev/null +++ b/frac/active2/mem_index.go @@ -0,0 +1,112 @@ +package active2 + +import ( + "bytes" + "sort" + + "github.com/ozontech/seq-db/seq" +) + +type tokensRange struct { + start int + count int +} + +type memIndex struct { + ids []seq.ID // IDs ordered DESC + tokens [][]byte // tokens ordered ASC by field:token + tokenLIDs [][]uint32 // LIDs list for each token from `tokens` + fieldsTokens map[string]tokensRange // tokens locator for each field + fields [][]byte // fields ordered ASC + blocksOffsets []uint64 // blocks offsets ordered by offset + positions map[seq.ID]seq.DocPos // just map seq.ID => seq.DocPos + allTID uint32 + + docsSize uint64 + docsCount uint32 +} + +func (index *memIndex) getTokenProvider(field string) *tokenProvider { + r := index.fieldsTokens[field] + return &tokenProvider{ + firstTID: uint32(r.start), + lastTID: uint32(r.start + r.count - 1), + tokens: index.tokens, + } +} + +func (index *memIndex) IsIntersecting(from, to seq.MID) bool { + maxMID := index.ids[0].MID + minMID := index.ids[len(index.ids)-1].MID + if to < minMID || maxMID < from { + return false + } + return true +} + +func (index *memIndex) sortIDs(all []uint32, ids []seq.ID) []uint32 { + // sort all + sort.Slice(all, func(i, j int) bool { return seq.Less(ids[int(all[i])], ids[int(all[j])]) }) + + // sort ids and make inverser + index.ids = make([]seq.ID, len(ids)) + inverser := make([]uint32, len(ids)) + for new, old := range all { + index.ids[new] = ids[old] + inverser[old] = uint32(new) + } + return inverser +} + +func (index *memIndex) sortTokens(fieldsIndex map[string]map[string][]uint32, tokensData []byte, lidsData, inverser []uint32) { + // copy fields and sort + for field := range fieldsIndex { + tokensData = append(tokensData, field...) // copy field + index.fields = append(index.fields, tokensData[len(tokensData)-len(field):]) + } + sort.Slice(index.fields, func(i, j int) bool { return bytes.Compare(index.fields[i], index.fields[j]) < 0 }) + + // copy tokens and sort + for _, field := range index.fields { + start := len(index.tokens) + for token, tokenLIDs := range fieldsIndex[string(field)] { + sortLIDs(tokenLIDs, inverser) + lidsData = append(lidsData, tokenLIDs...) // copy lids + tokensData = append(tokensData, token...) 
// copy token
+			index.tokens = append(index.tokens, tokensData[len(tokensData)-len(token):])
+			index.tokenLIDs = append(index.tokenLIDs, lidsData[len(lidsData)-len(tokenLIDs):])
+		}
+		tokens := index.tokens[start:]
+		sort.Slice(tokens, func(i, j int) bool { return bytes.Compare(tokens[i], tokens[j]) < 0 })
+		index.fieldsTokens[string(field)] = tokensRange{start: start, count: len(tokens)}
+	}
+}
+
+func sortLIDs(lids, inverser []uint32) {
+	for i, old := range lids {
+		lids[i] = inverser[old] + 1 // inverse: LIDs start from 1, not 0
+	}
+	sort.Slice(lids, func(i, j int) bool { return lids[i] < lids[j] }) // sort
+}
+
+type tokenProvider struct {
+	firstTID uint32
+	lastTID  uint32
+	tokens   [][]byte
+}
+
+func (p *tokenProvider) GetToken(tid uint32) []byte {
+	return p.tokens[tid]
+}
+
+func (p *tokenProvider) FirstTID() uint32 {
+	return p.firstTID
+}
+
+func (p *tokenProvider) LastTID() uint32 {
+	return p.lastTID
+}
+
+func (p *tokenProvider) Ordered() bool {
+	return true
+}
diff --git a/frac/active2/merge.go b/frac/active2/merge.go
new file mode 100644
index 00000000..18f92221
--- /dev/null
+++ b/frac/active2/merge.go
@@ -0,0 +1,208 @@
+package active2
+
+import (
+	"bytes"
+
+	"github.com/ozontech/seq-db/frac"
+	"github.com/ozontech/seq-db/logger"
+	"github.com/ozontech/seq-db/seq"
+	"go.uber.org/zap"
+)
+
+func MergeIndexes(indexes []*memIndex) *memIndex {
+	docsCount := 0
+	blocksCount := 0
+	fieldsCount := 0
+	docsSize := uint64(0)
+	iterators := make([]mergeIterator, 0, len(indexes))
+	for _, index := range indexes {
+		docsSize += index.docsSize
+		docsCount += len(index.ids)
+		fieldsCount += len(index.fields)
+		blocksCount += len(index.blocksOffsets)
+		iterators = append(iterators, newIndexIterator(index))
+	}
+
+	dst := &memIndex{
+		ids:           make([]seq.ID, 0, docsCount),
+		positions:     make(map[seq.ID]seq.DocPos, docsCount),
+		fieldsTokens:  make(map[string]tokensRange, fieldsCount),
+		blocksOffsets: make([]uint64, 0, blocksCount),
+		docsSize:      docsSize,
+	}
+
+	doubles := mergeIDs(dst, iterators)
+	mergeTokens(dst, iterators)
+	mergePositions(dst, iterators)
+
+	dst.docsCount = uint32(len(dst.ids))
+	dst.allTID = uint32(dst.fieldsTokens[seq.TokenAll].start)
+
+	if len(doubles) > 0 {
+		logger.Warn("there are duplicate IDs during compaction", zap.Int("doubles", len(doubles)))
+	}
+
+	return dst
+}
+
+func mergeIDs(dst *memIndex, orig []mergeIterator) []seq.ID {
+	doubles := []seq.ID{}
+	iterators := append([]mergeIterator{}, orig...) // make copy
+
+	for len(iterators) > 0 {
+		// try select first
+		selected := []int{0}
+		maxID := iterators[0].CurrentID()
+
+		for i := 1; i < len(iterators); i++ {
+			if cur := iterators[i].CurrentID(); cur == maxID {
+				selected = append(selected, i)
+			} else if seq.Less(maxID, cur) {
+				maxID = cur
+				selected = []int{i}
+			}
+		}
+
+		lid := uint32(len(dst.ids))
+		// iterate selected in reverse: removeItem swap-deletes, which would
+		// invalidate the larger pending indexes if we went left to right
+		for j := len(selected) - 1; j >= 0; j-- {
+			i := selected[j]
+			iterators[i].AddNewLID(lid)
+			if !iterators[i].ShiftID() {
+				iterators = removeItem(iterators, i)
+			}
+		}
+		dst.ids = append(dst.ids, maxID)
+
+		if len(selected) > 1 {
+			doubles = append(doubles, maxID)
+		}
+	}
+	return doubles
+}
+
+func mergeTokens(dst *memIndex, orig []mergeIterator) {
+	// todo copy tokens to compact mem usage
+	// todo allocate for all lids at once to optimize allocations
+	var prevField []byte
+	iterators := append([]mergeIterator{}, orig...) // make copy
+	for len(iterators) > 0 {
+		// try select first
+		selected := []int{0}
+		minToken := iterators[0].CurrentToken()
+
+		for i := 1; i < len(iterators); i++ {
+			cur := iterators[i].CurrentToken()
+			if cmp := compareMetaToken(cur, minToken); cmp < 0 {
+				minToken = cur
+				selected = []int{i}
+			} else if cmp == 0 {
+				selected = append(selected, i)
+			}
+		}
+
+		lids := make([][]uint32, 0, len(selected))
+		for j := len(selected) - 1; j >= 0; j-- { // reverse, see mergeIDs
+			i := selected[j]
+			lids = append(lids, iterators[i].CurrentTokenLIDs())
+			if !iterators[i].ShiftToken() {
+				iterators = removeItem(iterators, i)
+			}
+		}
+
+		if !bytes.Equal(prevField, minToken.Key) { // new field
+			if tr, ok := dst.fieldsTokens[string(prevField)]; ok {
+				tr.count = len(dst.tokens) - tr.start
+				dst.fieldsTokens[string(prevField)] = tr
+			}
+			dst.fields = append(dst.fields, minToken.Key)
+			dst.fieldsTokens[string(minToken.Key)] = tokensRange{start: len(dst.tokens)}
+			prevField = minToken.Key
+		}
+
+		dst.tokens = append(dst.tokens, minToken.Value)
+		dst.tokenLIDs = append(dst.tokenLIDs, mergeLIDs(lids))
+	}
+	if tr, ok := dst.fieldsTokens[string(prevField)]; ok {
+		tr.count = len(dst.tokens) - tr.start
+		dst.fieldsTokens[string(prevField)] = tr
+	}
+}
+
+func mergePositions(dst *memIndex, orig []mergeIterator) {
+	iterators := append([]mergeIterator{}, orig...) // make copy
+	for len(iterators) > 0 {
+		// try select first
+		selected := []int{0}
+		minOffset := iterators[0].CurrentBlocksOffset()
+
+		for i := 1; i < len(iterators); i++ {
+			if cur := iterators[i].CurrentBlocksOffset(); cur == minOffset {
+				selected = append(selected, i)
+			} else if cur < minOffset {
+				minOffset = cur
+				selected = []int{i}
+			}
+		}
+
+		newBlockIndex := len(dst.blocksOffsets)
+		dst.blocksOffsets = append(dst.blocksOffsets, minOffset)
+
+		for j := len(selected) - 1; j >= 0; j-- { // reverse, see mergeIDs
+			i := selected[j]
+			iterators[i].AddNewBlockIndex(newBlockIndex)
+			if !iterators[i].ShiftBlocksOffset() {
+				iterators = removeItem(iterators, i)
+			}
+		}
+	}
+
+	for _, iterator := range orig {
+		iterator.RepackDocPositions(dst.positions)
+	}
+}
+
+func compareMetaToken(mt1, mt2 frac.MetaToken) int {
+	res := bytes.Compare(mt1.Key, mt2.Key)
+	if res == 0 {
+		return bytes.Compare(mt1.Value, mt2.Value)
+	}
+	return res
+}
+
+func mergeLIDs(lids [][]uint32) []uint32 {
+	size := 0
+	for i := range lids {
+		size += len(lids[i])
+	}
+	res := make([]uint32, 0, size)
+
+	for len(lids) > 0 {
+		// try select first
+		selected := []int{0}
+		minLID := lids[0][0]
+
+		for i := 1; i < len(lids); i++ {
+			cur := lids[i][0]
+			if cur == minLID { // there can be doubles
+				selected = append(selected, i)
+			} else if cur < minLID {
+				selected = []int{i}
+				minLID = cur
+			}
+		}
+
+		res = append(res, minLID)
+
+		for j := len(selected) - 1; j >= 0; j-- { // reverse, see mergeIDs
+			i := selected[j]
+			if lids[i] = lids[i][1:]; len(lids[i]) == 0 {
+				lids = removeItem(lids, i)
+			}
+		}
+	}
+
+	return res
+}
+
+func removeItem[V any](items []V, i int) []V {
+	last := len(items) - 1
+	items[i] = items[last]
+	items = items[:last]
+	return items
+}
diff --git a/frac/active2/merge_iterator.go b/frac/active2/merge_iterator.go
new file mode 100644
index 00000000..f8f1f62a
--- /dev/null
+++ b/frac/active2/merge_iterator.go
@@ -0,0 +1,97 @@
+package active2
+
+import (
+	"github.com/ozontech/seq-db/frac"
+	"github.com/ozontech/seq-db/seq"
+)
+
+// For compaction
+type mergeIterator struct {
+	index          *memIndex
+	posIDs         int
+	posField       int
+	posToken       int
+	posBlocks      int
+	lastFieldToken int
+	newLIDs        []uint32
+	newBlocks      []int
+}
+
+func newIndexIterator(index *memIndex) mergeIterator {
+	return mergeIterator{
+		index:          index,
+		newLIDs:        make([]uint32, len(index.ids)),
+		newBlocks:      make([]int, len(index.blocksOffsets)),
+		lastFieldToken: index.fieldsTokens[string(index.fields[0])].count - 1,
+	}
+}
+
+func (iq *mergeIterator) ShiftID() bool {
+	iq.posIDs++
+	if iq.posIDs == len(iq.index.ids) {
+		return false
+	}
+	return true
+}
+
+func (iq *mergeIterator) CurrentID() seq.ID {
+	return iq.index.ids[iq.posIDs]
+}
+
+func (iq *mergeIterator) ShiftToken() bool {
+	iq.posToken++
+	if iq.posToken == len(iq.index.tokens) {
+		return false
+	}
+	if iq.posToken > iq.lastFieldToken { // need shift field
+		iq.posField++
+		field := iq.index.fields[iq.posField]
+		r := iq.index.fieldsTokens[string(field)]
+		iq.lastFieldToken = r.start + r.count - 1 // index of the last token of the new field
+	}
+	return true
+}
+
+func (iq *mergeIterator) CurrentToken() frac.MetaToken {
+	return frac.MetaToken{
+		Key:   iq.index.fields[iq.posField],
+		Value: iq.index.tokens[iq.posToken],
+	}
+}
+
+func (iq *mergeIterator) CurrentTokenLIDs() []uint32 {
+	src := iq.index.tokenLIDs[iq.posToken]
+	dst := make([]uint32, 0, len(src))
+	for _, oldLid := range src {
+		dst = append(dst, iq.newLIDs[oldLid-1]+1)
+	}
+	return dst
+}
+
+func (iq *mergeIterator) ShiftBlocksOffset() bool {
+	iq.posBlocks++
+	if iq.posBlocks == len(iq.index.blocksOffsets) {
+		return false
+	}
+	return true
+}
+
+func (iq *mergeIterator) CurrentBlocksOffset() uint64 {
+	return iq.index.blocksOffsets[iq.posBlocks]
+}
+
+// newLIDs and newBlocks are pre-sized and filled by position (not appended to):
+// their backing arrays are shared with the iterator copies made in the merge*
+// functions, and RepackDocPositions later reads them through `orig`.
+func (iq *mergeIterator) AddNewLID(lid uint32) {
+	iq.newLIDs[iq.posIDs] = lid
+}
+
+func (iq *mergeIterator) AddNewBlockIndex(blockIndex int) {
+	iq.newBlocks[iq.posBlocks] = blockIndex
+}
+
+func (iq *mergeIterator) RepackDocPositions(dst map[seq.ID]seq.DocPos) {
+	for id, docPos := range iq.index.positions {
+		oldBlockIndex, docOffset := docPos.Unpack()
+		newBlockIndex := uint32(iq.newBlocks[oldBlockIndex])
+		dst[id] = seq.PackDocPos(newBlockIndex, docOffset)
+	}
+}
diff --git a/frac/active2/merge_strategy.go b/frac/active2/merge_strategy.go
new file mode 100644
index 00000000..db9a1c81
--- /dev/null
+++ b/frac/active2/merge_strategy.go
@@ -0,0 +1,103 @@
+package active2
+
+import (
+	"math"
+)
+
+// SelectForMerge selects merge candidates based on their size.
+// It groups items into sets within which the sizes of the items do not differ
+// by more than a specified limit in percent (e.g. 50%).
+func SelectForMerge(items []indexItem, minToMerge int) [][]indexItem {
+	if len(items) < minToMerge {
+		return nil
+	}
+
+	tiersDist := buildTiersDistribution(items)
+	findAtAnyCost := len(items) >= mergeForcedThreshold
+	winSize := int(math.Round(float64(bucketPercent) / tierDeltaPercent))
+
+	var res [][]indexItem
+	for {
+		countInRange, firstTier, lastTier := mostPopulatedTiersRange(tiersDist, minToMerge, winSize, findAtAnyCost)
+		if countInRange == 0 {
+			break
+		}
+		buf := make([]indexItem, 0, countInRange)
+		res = append(res, extractIndexesInRange(items, buf, firstTier, lastTier, tiersDist))
+	}
+	return res
+}
+
+func buildTiersDistribution(items []indexItem) []int {
+	lastTier := 0
+	tiersDist := make([]int, tierMax)
+	for _, index := range items {
+		tiersDist[index.tier]++
+		if index.tier > lastTier {
+			lastTier = index.tier
+		}
+	}
+	return tiersDist[:lastTier+1] // +1 to keep the last occupied tier
+}
+
+func extractIndexesInRange(items []indexItem, buf []indexItem, firstTier, lastTier int, tiersDist []int) []indexItem {
+	for _, index := range items {
+		// tiersDist[index.tier] > 0 skips items already extracted on a previous
+		// pass, whose tier counters were drained to zero
+		if firstTier <= index.tier && index.tier <= lastTier && tiersDist[index.tier] > 0 {
+			buf = append(buf, index)
+			tiersDist[index.tier]--
+		}
+	}
+	return buf
+}
+
+func mostPopulatedTiersRange(tiersDist []int, minToMerge, winSize int, findAtAnyCost bool) (int, int, int) {
+	var lastWinTier, maxWinSum int
+	for {
+		lastWinTier, maxWinSum = findMaxSumWindow(tiersDist, winSize)
+		if maxWinSum >= minToMerge { // got it!
+			break
+		}
+		if findAtAnyCost { // expand window size and find again
+			// todo: add logging!
+			winSize *= 2
+			continue
+		}
+		return 0, 0, 0
+	}
+
+	firstTier := max(0, lastWinTier-winSize+1) // the window covers winSize tiers ending at lastWinTier
+	lastTier := lastWinTier
+
+	return maxWinSum, firstTier, lastTier
+}
+
+// sliding window sum
+type winSum struct {
+	buf []int
+	sum int
+	pos int
+}
+
+func (w *winSum) Add(v int) {
+	w.sum += v - w.buf[w.pos]
+	w.buf[w.pos] = v
+	w.pos++
+	if w.pos == len(w.buf) {
+		w.pos = 0
+	}
+}
+
+func findMaxSumWindow(tiersDist []int, winSize int) (int, int) {
+	maxWinSum := 0
+	lastWinTier := 0
+	win := winSum{buf: make([]int, winSize)}
+
+	for tier, size := range tiersDist {
+		win.Add(size)
+		if win.sum >= maxWinSum {
+			lastWinTier = tier
+			maxWinSum = win.sum
+		}
+	}
+	return lastWinTier, maxWinSum
+}
diff --git a/frac/active2/tiers.go b/frac/active2/tiers.go
new file mode 100644
index 00000000..d866fa6d
--- /dev/null
+++ b/frac/active2/tiers.go
@@ -0,0 +1,74 @@
+package active2
+
+import (
+	"math"
+)
+
+// sizeTiers splits the entire space of integers into successive ranges [A(n) ; B(n)] where:
+//   - A(n+1) = B(n) + 1
+//   - (B(n) - A(n)) / A(n) ~ deltaPercent
+//
+// Example: for newSizeTiers(100, 200, 10) we will have:
+//
+//	Tier 0:  [    0;  100 ] delta: 0.00%
+//	Tier 1:  [  101;  106 ] delta: 6.00%
+//	Tier 2:  [  107;  117 ] delta: 10.00%
+//	Tier 3:  [  118;  129 ] delta: 10.00%
+//	Tier 4:  [  130;  142 ] delta: 10.00%
+//	Tier 5:  [  143;  156 ] delta: 10.00%
+//	Tier 6:  [  157;  171 ] delta: 10.00%
+//	Tier 7:  [  172;  189 ] delta: 10.00%
+//	Tier 8:  [  190;  207 ] delta: 9.00%
+//	Tier 9:  [  208;  228 ] delta: 10.00%
+//	Tier 10: [  229;  251 ] delta: 10.00%
+//	Tier 11: [  252;  276 ] delta: 10.00%
+//	Tier 12: [  277;  304 ] delta: 10.00%
+//	Tier 13: [  305;  334 ] delta: 10.00%
+//	Tier 14: [  335;  368 ] delta: 10.00%
+//	Tier 15: [  369;  405 ] delta: 10.00%
+//	Tier 16: [  406;  445 ] delta: 10.00%
+//	Tier 17: [  446;  490 ] delta: 10.00%
+//	Tier 18: [  491;  539 ] delta: 10.00%
+//	Tier 19: [  540;  593 ] delta: 10.00%
+//	Tier 20: [  594;  652 ] delta: 10.00%
+//	Tier 21: [  653;  717 ] delta: 10.00%
+//	Tier 22:
[ 718; 789 ] delta: 10.00% +// Tier 23: [ 790; 868 ] delta: 10.00% +// Tier 24: [ 869; 955 ] delta: 10.00% +// Tier 25: [ 956; 1051 ] delta: 10.00% +// Tier 26: [ 1052; 1156 ] delta: 10.00% +// +// etc. +// +// So, sizeTiers returns us the tier (the number of range) for any integer value. +type sizeTiers struct { + firstMax uint32 // first range always will be [0, firstMax] + maxTier int // maximum number of size tiers allowed, so for last range [An, Bn] where n == maxTier, Bn = +inf + deltaPercent int // for range [A, B], (B - A) / A ~ deltaPercent + + deltaK float64 + offset float64 +} + +func newSizeTiers(firstMax uint32, maxTier int, deltaPercent int) sizeTiers { + deltaK := 1 / math.Log(1+float64(deltaPercent)/100) + return sizeTiers{ + maxTier: maxTier, + firstMax: firstMax, + deltaPercent: deltaPercent, + + deltaK: deltaK, + offset: math.Floor(deltaK*(math.Log(float64(firstMax)))) - 1, + } +} + +func (t sizeTiers) Calc(size uint32) int { + if size <= t.firstMax { + return 0 + } + tier := int(math.Floor(t.deltaK*(math.Log(float64(size)))) - t.offset) + if tier > t.maxTier { + return t.maxTier + } + return tier +} diff --git a/frac/active_appender.go b/frac/active_appender.go deleted file mode 100644 index e9b94b47..00000000 --- a/frac/active_appender.go +++ /dev/null @@ -1,136 +0,0 @@ -package frac - -import ( - "os" - "sync" - - "go.uber.org/atomic" - - "github.com/ozontech/seq-db/disk" - "github.com/ozontech/seq-db/metric" - "github.com/ozontech/seq-db/metric/stopwatch" -) - -type ActiveAppender struct { - inCh chan<- writeTask - docsToMetasCh chan<- writeTask - iw *IndexWorkers -} - -type writeTaskBase struct { - data disk.DocBlock -} - -func (wt *writeTaskBase) fetchBlock() disk.DocBlock { - data := wt.data - wt.data = nil // release buffer immediately - return data -} - -type docsWriteTask struct { - writeTaskBase - outCh chan<- writeTask - payload writeTask -} - -func (wt *docsWriteTask) done() { - wt.outCh <- wt.payload -} - -type metaWriteTask struct { - writeTaskBase - indexTask *IndexTask - wg sync.WaitGroup -} - -func (wt *metaWriteTask) done() { - setPos(wt.indexTask) - wt.wg.Done() -} - -func setPos(t *IndexTask) { - t.Pos = t.Frac.UpdateDiskStats(t.GetDocsLen(), t.GetMetaLen()) -} - -func StartAppender(docsFile, metasFile *os.File, size int, skipFsync bool, iw *IndexWorkers) ActiveAppender { - inCh := make(chan writeTask, size) // closed externally by ActiveAppender.Stop - docsToMetasCh := make(chan writeTask, size) // closed by us - wgDocsWorker := &sync.WaitGroup{} // to close chan - wgDocsWorker.Add(1) - if skipFsync { - startWriteWorkerWithoutFsync(docsFile, inCh, wgDocsWorker, "docs") - startWriteWorkerWithoutFsync(metasFile, docsToMetasCh, nil, "metas") - } else { - // we first need to commit docs file, before we start writing to meta - // otherwise fraction may become corrupted and will fail during replay - startWriteWorker(docsFile, inCh, wgDocsWorker, "docs") - startWriteWorker(metasFile, docsToMetasCh, nil, "metas") - } - - go func() { - wgDocsWorker.Wait() - close(docsToMetasCh) - }() - - return ActiveAppender{ - inCh: inCh, - docsToMetasCh: docsToMetasCh, - iw: iw, - } -} - -func (a *ActiveAppender) In(frac *Active, docs, metas []byte, appendQueue *atomic.Uint32) { - task := &IndexTask{ - DocsLen: uint64(len(docs)), - Metas: metas, - Frac: frac, - AppendQueue: appendQueue, - } - - sw := stopwatch.New() - m := sw.Start("send_write_chan") - metasTask := metaWriteTask{ - writeTaskBase: writeTaskBase{ - data: metas, - }, - indexTask: task, - } - docsTask := 
docsWriteTask{ - writeTaskBase: writeTaskBase{ - data: docs, - }, - outCh: a.docsToMetasCh, - payload: &metasTask, - } - metasTask.wg.Add(1) - a.inCh <- &docsTask - m.Stop() - - m = sw.Start("wait_write_worker") - metasTask.wg.Wait() - m.Stop() - - m = sw.Start("send_index_chan") - a.iw.In(task) - m.Stop() - - sw.Export(metric.BulkStagesSeconds) -} - -func (a *ActiveAppender) InReplay(frac *Active, docsLen uint64, metas []byte, appendQueue *atomic.Uint32) { - task := &IndexTask{ - DocsLen: docsLen, - Metas: metas, - Frac: frac, - AppendQueue: appendQueue, - } - - setPos(task) - a.iw.In(task) -} - -func (a *ActiveAppender) Stop() { - // we only need to close the input chan - // all goroutines and intermediate and output channels will be finished and closed sequentially - close(a.inCh) -} diff --git a/frac/active_index_workers.go b/frac/active_indexer.go similarity index 70% rename from frac/active_index_workers.go rename to frac/active_indexer.go index a078a7cf..e5ff5b37 100644 --- a/frac/active_index_workers.go +++ b/frac/active_indexer.go @@ -5,109 +5,104 @@ import ( "sync" "github.com/ozontech/seq-db/bytespool" - "go.uber.org/atomic" - "go.uber.org/zap" - "github.com/ozontech/seq-db/disk" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric" "github.com/ozontech/seq-db/metric/stopwatch" + "go.uber.org/zap" ) -type IndexWorkers struct { - ch chan *IndexTask - chMerge chan *MergeTask +type ActiveIndexer struct { + ch chan *indexTask + chMerge chan *mergeTask workerCount int stopFn func() } -type IndexTask struct { - DocsLen uint64 - Frac *Active - Metas disk.DocBlock - Pos uint64 - - AppendQueue *atomic.Uint32 +type indexTask struct { + Frac *Active + Metas disk.DocBlock + Pos uint64 + Wg *sync.WaitGroup } -func (t *IndexTask) GetDocsLen() uint64 { - return t.DocsLen -} - -func (t *IndexTask) GetMetaLen() uint64 { - return uint64(len(t.Metas)) -} - -type MergeTask struct { +type mergeTask struct { frac *Active tokenLIDs *TokenLIDs } -func NewIndexWorkers(workerCount, chLen int) *IndexWorkers { - return &IndexWorkers{ - ch: make(chan *IndexTask, chLen), - chMerge: make(chan *MergeTask, chLen), +func NewActiveIndexer(workerCount, chLen int) *ActiveIndexer { + return &ActiveIndexer{ + ch: make(chan *indexTask, chLen), + chMerge: make(chan *mergeTask, chLen), workerCount: workerCount, } } -func (w *IndexWorkers) Start() { +func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) { + m := sw.Start("send_index_chan") + ai.ch <- &indexTask{ + Pos: disk.DocBlock(metas).GetExt2(), + Metas: metas, + Frac: frac, + Wg: wg, + } + m.Stop() +} + +func (ai *ActiveIndexer) Start() { wg := sync.WaitGroup{} - wg.Add(w.workerCount) + wg.Add(ai.workerCount) - for i := 0; i < w.workerCount; i++ { + for i := 0; i < ai.workerCount; i++ { go func(index int) { defer wg.Done() - w.appendWorker(index) + ai.appendWorker(index) }(i) } - wg.Add(w.workerCount) - for i := 0; i < w.workerCount; i++ { + wg.Add(ai.workerCount) + for i := 0; i < ai.workerCount; i++ { go func() { defer wg.Done() - w.mergeWorker() + ai.mergeWorker() }() } - w.stopFn = func() { - close(w.ch) - close(w.chMerge) + ai.stopFn = func() { + close(ai.ch) + close(ai.chMerge) wg.Wait() - w.stopFn = nil + ai.stopFn = nil } } -func (w *IndexWorkers) mergeWorker() { - for task := range w.chMerge { +func (ai *ActiveIndexer) mergeWorker() { + for task := range ai.chMerge { task.tokenLIDs.GetLIDs(task.frac.MIDs, task.frac.RIDs) // GetLIDs cause sort and merge LIDs from queue } } -func (w 
*IndexWorkers) Stop() { - if w.stopFn != nil { - w.stopFn() +func (ai *ActiveIndexer) Stop() { + if ai.stopFn != nil { + ai.stopFn() } } -func (w *IndexWorkers) In(t *IndexTask) { - w.ch <- t -} - var metaDataPool = sync.Pool{ New: func() any { return new(MetaData) }, } -func (w *IndexWorkers) appendWorker(index int) { +func (ai *ActiveIndexer) appendWorker(index int) { // collector of bulk meta data collector := newMetaDataCollector() - for task := range w.ch { + for task := range ai.ch { var err error sw := stopwatch.New() @@ -169,26 +164,26 @@ func (w *IndexWorkers) appendWorker(index int) { m = sw.Start("put_lids_queue") tokensToMerge := addLIDsToTokens(tokenLIDsPlaces, groups) - w.sendTokensToMergeWorkers(active, tokensToMerge) + ai.sendTokensToMergeWorkers(active, tokensToMerge) m.Stop() active.UpdateStats(collector.MinMID, collector.MaxMID, collector.DocsCounter, collector.SizeCounter) - task.AppendQueue.Dec() + task.Wg.Done() total.Stop() - sw.Export(metric.BulkStagesSeconds) + sw.Export(bulkStagesSeconds) } } -func (w *IndexWorkers) sendTokensToMergeWorkers(frac *Active, tokens []*TokenLIDs) { +func (ai *ActiveIndexer) sendTokensToMergeWorkers(frac *Active, tokens []*TokenLIDs) { for _, tl := range tokens { - task := MergeTask{ + task := mergeTask{ frac: frac, tokenLIDs: tl, } select { - case w.chMerge <- &task: + case ai.chMerge <- &task: default: // skip background merge if workers are busy } } diff --git a/frac/active_sealer.go b/frac/active_sealer.go index 8a152c23..2477588a 100644 --- a/frac/active_sealer.go +++ b/frac/active_sealer.go @@ -34,7 +34,7 @@ type SealParams struct { DocBlockSize int // DocBlockSize is decompressed payload size of document block. } -func seal(f *Active, params SealParams) (*PreloadedData, error) { +func Seal(f *Active, params SealParams) (*PreloadedData, error) { logger.Info("sealing fraction", zap.String("fraction", f.BaseFileName)) start := time.Now() @@ -58,8 +58,6 @@ func seal(f *Active, params SealParams) (*PreloadedData, error) { return nil, err } - f.close(false, "seal") - if indexFile, err = syncRename(indexFile, f.BaseFileName+consts.IndexFileSuffix); err != nil { return nil, err } diff --git a/frac/active_write_workers.go b/frac/active_write_workers.go deleted file mode 100644 index 687837fd..00000000 --- a/frac/active_write_workers.go +++ /dev/null @@ -1,113 +0,0 @@ -package frac - -import ( - "os" - "sync" - - "go.uber.org/zap" - - "github.com/ozontech/seq-db/disk" - "github.com/ozontech/seq-db/logger" - "github.com/ozontech/seq-db/metric" - "github.com/ozontech/seq-db/metric/stopwatch" - "github.com/ozontech/seq-db/util" -) - -type writeTask interface { - fetchBlock() disk.DocBlock - done() -} - -type writeWorker struct { - file *os.File - batchan *util.Batchan[writeTask] -} - -func newWriteWorker(file *os.File) *writeWorker { - return &writeWorker{ - file: file, - batchan: util.NewBatchan[writeTask](), - } -} - -func (w *writeWorker) runWrite(inCh <-chan writeTask, name string) { - sw := stopwatch.New() - - for t := range inCh { - m := sw.Start(name + " >> write_duration") - if _, err := w.file.Write(t.fetchBlock()); err != nil { - logger.Fatal("can't write fraction file", zap.String("file", w.file.Name()), zap.Error(err)) - } - m.Stop() - m = sw.Start(name + " >> write_send_duration") - w.batchan.Send(t) - m.Stop() - - sw.Export(metric.BulkStagesSeconds) - } - w.batchan.Close() -} - -func (w *writeWorker) runFsync(name string) { - sw := stopwatch.New() - - var payload []writeTask - for { - payload = w.batchan.Fetch(payload) - if 
len(payload) == 0 { - break - } - - metric.BulkDiskSyncTasksCount.Observe(float64(len(payload))) - - m := sw.Start(name + " >> fsync") - if err := w.file.Sync(); err != nil { - logger.Fatal("error syncing file", - zap.String("file", w.file.Name()), - zap.Error(err), - ) - } - m.Stop() - - m = sw.Start(name + " >> fsync_done") - for _, v := range payload { - v.done() - } - m.Stop() - - sw.Export(metric.BulkStagesSeconds) - } -} - -func startWriteWorker(file *os.File, inCh <-chan writeTask, wg *sync.WaitGroup, name string) { - worker := newWriteWorker(file) - go worker.runWrite(inCh, name) - go func() { - worker.runFsync(name) - if wg != nil { - wg.Done() - } - }() -} - -func startWriteWorkerWithoutFsync(file *os.File, inCh <-chan writeTask, wg *sync.WaitGroup, name string) { - go func() { - sw := stopwatch.New() - for t := range inCh { - m := sw.Start(name + " >> write_duration") - if _, err := file.Write(t.fetchBlock()); err != nil { - logger.Fatal("can't write fraction file", zap.String("file", file.Name()), zap.Error(err)) - } - m.Stop() - - m = sw.Start(name + " >> write_send_duration") - t.done() - m.Stop() - - sw.Export(metric.BulkStagesSeconds) - } - if wg != nil { - wg.Done() - } - }() -} diff --git a/frac/active_writer.go b/frac/active_writer.go new file mode 100644 index 00000000..a8cfe5c3 --- /dev/null +++ b/frac/active_writer.go @@ -0,0 +1,51 @@ +package frac + +import ( + "os" + "sync" + + "github.com/ozontech/seq-db/disk" + "github.com/ozontech/seq-db/metric/stopwatch" +) + +type ActiveWriter struct { + mu sync.Mutex // todo: remove this mutex on next release + docs *FileWriter + meta *FileWriter +} + +func NewActiveWriter(docsFile, metaFile *os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter { + return &ActiveWriter{ + docs: NewFileWriter(docsFile, docsOffset, skipFsync), + meta: NewFileWriter(metaFile, metaOffset, skipFsync), + } +} + +func (a *ActiveWriter) Write(docs, meta []byte, sw *stopwatch.Stopwatch) error { + w := sw.Start("wait_lock") + a.mu.Lock() + defer a.mu.Unlock() + w.Stop() + + m := sw.Start("write_docs") + offset, err := a.docs.Write(docs, sw) + m.Stop() + + if err != nil { + return err + } + + disk.DocBlock(meta).SetExt1(uint64(len(docs))) + disk.DocBlock(meta).SetExt2(uint64(offset)) + + m = sw.Start("write_meta") + _, err = a.meta.Write(meta, sw) + m.Stop() + + return err +} + +func (a *ActiveWriter) Stop() { + a.docs.Stop() + a.meta.Stop() +} diff --git a/frac/file_writer.go b/frac/file_writer.go new file mode 100644 index 00000000..9ac5ab9c --- /dev/null +++ b/frac/file_writer.go @@ -0,0 +1,108 @@ +package frac + +import ( + "io" + "sync" + "sync/atomic" + + "github.com/ozontech/seq-db/metric/stopwatch" +) + +type writeSyncer interface { + io.WriterAt + Sync() error +} + +// FileWriter optimizes sequential writing and fsync calls for concurrent writers. +// +// Each writer reserves its file offset atomically, so data is laid out strictly sequentially. The writer +// then requests an fsync. If an fsync triggered by earlier requests is still running, the request waits; +// any requests that arrive during this wait are batched together with it. Once the previous fsync finishes, +// a single new fsync is performed, and every batched request receives its result (success or failure). +// +// As a result, one fsync system call serves several writers that wrote at approximately the same time. 
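+//
+// A minimal usage sketch (illustrative only, not part of this change; it assumes
+// an *os.File opened by the caller and a stopwatch from metric/stopwatch):
+//
+//	fw := NewFileWriter(file, 0, false) // start at offset 0, fsync enabled
+//	sw := stopwatch.New()
+//	offset, err := fw.Write(block, sw) // each call reserves its offset atomically
+//	if err != nil {
+//		// either the write itself or the shared fsync failed
+//	}
+//	_ = offset
+//	fw.Stop() // stop the fsync loop once all writers are finished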
+type FileWriter struct { + ws writeSyncer + offset atomic.Int64 + skipSync bool + + mu sync.Mutex + queue []chan error + notify chan struct{} + + wg sync.WaitGroup +} + +func NewFileWriter(ws writeSyncer, offset int64, skipSync bool) *FileWriter { + fs := &FileWriter{ + ws: ws, + skipSync: skipSync, + notify: make(chan struct{}, 1), + } + + fs.offset.Store(offset) + + fs.wg.Add(1) + go func() { + fs.syncLoop() + fs.wg.Done() + }() + + return fs +} + +func (fs *FileWriter) syncLoop() { + for range fs.notify { + fs.mu.Lock() + queue := fs.queue + fs.queue = make([]chan error, 0, len(queue)) + fs.mu.Unlock() + + err := fs.ws.Sync() + + for _, syncRes := range queue { + syncRes <- err + } + } +} + +func (fs *FileWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) { + m := sw.Start("write_duration") + + dataLen := int64(len(data)) + offset := fs.offset.Add(dataLen) - dataLen + _, err := fs.ws.WriteAt(data, offset) + m.Stop() + + if err != nil { + return 0, err + } + + if fs.skipSync { + return offset, nil + } + + m = sw.Start("fsync") + + syncRes := make(chan error) + + fs.mu.Lock() + fs.queue = append(fs.queue, syncRes) + size := len(fs.queue) + fs.mu.Unlock() + + if size == 1 { + fs.notify <- struct{}{} + } + + err = <-syncRes + + m.Stop() + + return offset, err +} + +func (fs *FileWriter) Stop() { + close(fs.notify) + fs.wg.Wait() +} diff --git a/frac/file_writer_test.go b/frac/file_writer_test.go new file mode 100644 index 00000000..23977993 --- /dev/null +++ b/frac/file_writer_test.go @@ -0,0 +1,125 @@ +package frac + +import ( + "errors" + "math/rand/v2" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/ozontech/seq-db/metric/stopwatch" +) + +type testWriterSyncer struct { + mu sync.RWMutex + in [][]byte + out map[string]struct{} + pause time.Duration + err bool +} + +func (ws *testWriterSyncer) WriteAt(p []byte, _ int64) (n int, err error) { + ws.mu.Lock() + defer ws.mu.Unlock() + + ws.in = append(ws.in, p) + + return len(p), nil +} + +func (ws *testWriterSyncer) Sync() error { + time.Sleep(ws.pause) + + ws.mu.Lock() + defer ws.mu.Unlock() + + if ws.err { + ws.in = nil + return errors.New("test") + } + + for _, val := range ws.in { + ws.out[string(val)] = struct{}{} + } + ws.in = nil + + return nil +} + +func (ws *testWriterSyncer) Check(val []byte) bool { + ws.mu.RLock() + defer ws.mu.RUnlock() + _, ok := ws.out[string(val)] + return ok +} + +func TestFileWriter(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} + fw := NewFileWriter(ws, 0, false) + + wg := sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := []byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.NoError(t, err) + assert.True(t, ws.Check(k)) + } + wg.Done() + }() + } + + wg.Wait() + fw.Stop() +} + +func TestFileWriterNoSync(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} + fw := NewFileWriter(ws, 0, true) + + wg := sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := []byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.NoError(t, err) + assert.False(t, ws.Check(k)) + } + wg.Done() + }() + } + + wg.Wait() + fw.Stop() +} + +func TestFileWriterError(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond, err: true} + fw := NewFileWriter(ws, 0, false) + + wg := 
sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := []byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.Error(t, err) + assert.False(t, ws.Check(k)) + } + wg.Done() + }() + } + + wg.Wait() + fw.Stop() +} diff --git a/frac/fraction.go b/frac/fraction.go index 7fcc3bba..ba9a5456 100644 --- a/frac/fraction.go +++ b/frac/fraction.go @@ -23,7 +23,7 @@ type Fraction interface { Suicide() } -func fracToString(f Fraction, fracType string) string { +func FracToString(f Fraction, fracType string) string { info := f.Info() s := fmt.Sprintf( "%s fraction name=%s, creation time=%s, from=%s, to=%s, %s", diff --git a/frac/sealed.go b/frac/sealed.go index 90bd865b..93f718be 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -321,7 +321,7 @@ func (f *Sealed) close(hint string) { } func (f *Sealed) String() string { - return fracToString(f, "sealed") + return FracToString(f, "sealed") } func (f *Sealed) DataProvider(ctx context.Context) (DataProvider, func()) { diff --git a/fracmanager/fetcher_test.go b/fracmanager/fetcher_test.go index 8d8f16bf..73ec946c 100644 --- a/fracmanager/fetcher_test.go +++ b/fracmanager/fetcher_test.go @@ -25,9 +25,8 @@ func testFetcher(t *testing.T, fetcher *Fetcher, hasHint bool) { assert.NoError(t, err) dp := frac.NewDocProvider() addDummyDoc(t, fm, dp, seq.SimpleID(1)) - - fm.GetActiveFrac().WaitWriteIdle() - info := fm.GetActiveFrac().Info() + fm.WaitIdle() + info := fm.Active().Info() id := seq.IDSource{ ID: seq.SimpleID(1), @@ -48,9 +47,9 @@ func testFetcher(t *testing.T, fetcher *Fetcher, hasHint bool) { fm.WaitIdle() dp.TryReset() addDummyDoc(t, fm, dp, seq.SimpleID(2)) - fm.GetActiveFrac().WaitWriteIdle() + fm.WaitIdle() - info = fm.GetActiveFrac().Info() + info = fm.Active().Info() newID := seq.IDSource{ ID: seq.SimpleID(2), diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index b9be5766..7ec950d0 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -59,7 +59,15 @@ type fracRef struct { type activeRef struct { ref *fracRef // ref contains a back reference to the fraction in the slice - frac *frac.Active + frac *proxyFrac +} + +func (fm *FracManager) newActiveRef(active *frac.Active) activeRef { + frac := &proxyFrac{active: active, fp: fm.fracProvider} + return activeRef{ + frac: frac, + ref: &fracRef{instance: frac}, + } } func NewFracManager(config *Config) *FracManager { @@ -106,7 +114,7 @@ func (fm *FracManager) maintenance(sealWG, suicideWG *sync.WaitGroup) { logger.Debug("maintenance started") n := time.Now() - if fm.GetActiveFrac().Info().DocsOnDisk > fm.config.FracSize { + if fm.Active().Info().DocsOnDisk > fm.config.FracSize { active := fm.rotate() sealWG.Add(1) @@ -279,11 +287,19 @@ func (fm *FracManager) Start() { func (fm *FracManager) Load(ctx context.Context) error { var err error - var notSealed []activeRef l := NewLoader(fm.config, fm.fracProvider, fm.fracCache) - if fm.fracs, notSealed, err = l.load(ctx); err != nil { + actives, sealed, err := l.load() + if err != nil { + return err + } + + for _, s := range sealed { + fm.fracs = append(fm.fracs, &fracRef{instance: s}) + } + + if err := fm.replayAll(ctx, actives); err != nil { return err } @@ -297,36 +313,57 @@ func (fm *FracManager) Load(ctx context.Context) error { } } - if len(notSealed) == 0 { - fm.rotate() - } else { - if len(notSealed) > 1 { - logger.Info("sealing active fractions") - for _, active := range notSealed[:len(notSealed)-1] { - fm.seal(active) - } + if 
fm.active.ref == nil { // no active + _ = fm.rotate() // make new empty active + } + + return nil +} + +func (fm *FracManager) replayAll(ctx context.Context, actives []*frac.Active) error { + wg := sync.WaitGroup{} + defer wg.Wait() + + for i, a := range actives { + if err := a.Replay(ctx); err != nil { + return err + } + if a.Info().DocsTotal == 0 { + a.Suicide() // remove empty + continue + } + r := fm.newActiveRef(a) + fm.fracs = append(fm.fracs, r.ref) + + if i == len(actives)-1 { // last and not empty + fm.active = r + } else { + wg.Wait() // wait for the previous sealing to complete + + wg.Add(1) + go func() { + fm.seal(r) + wg.Done() + }() } - fm.active = notSealed[len(notSealed)-1] } return nil } func (fm *FracManager) Append(ctx context.Context, docs, metas disk.DocBlock) error { + var err error for { select { case <-ctx.Done(): return ctx.Err() default: - if err := fm.GetActiveFrac().Append(docs, metas); err != nil { // can get fail if fraction already sealed - logger.Info("append fail", zap.Error(err)) - continue + if err = fm.Writer().Append(docs, metas); err == nil { + return nil } + logger.Info("append fail", zap.Error(err)) // can fail if the fraction was already sealed } - break } - - return nil } var ( @@ -349,23 +386,17 @@ func (fm *FracManager) seal(activeRef activeRef) { sealsDoneSeconds.Observe(time.Since(now).Seconds()) }() - preloaded, err := activeRef.frac.Seal(fm.config.SealParams) + sealed, err := activeRef.frac.Seal(fm.config.SealParams) if err != nil { - logger.Panic("sealing error", zap.Error(err)) + logger.Fatal("sealing error", zap.Error(err)) } - sealed := fm.fracProvider.NewSealedPreloaded(activeRef.frac.BaseFileName, preloaded) - - stats := sealed.Info() - fm.fracCache.AddFraction(stats.Name(), stats) + info := sealed.Info() + fm.fracCache.AddFraction(info.Name(), info) fm.fracMu.Lock() activeRef.ref.instance = sealed fm.fracMu.Unlock() - - if err := activeRef.frac.Release(sealed); err != nil { - logger.Fatal("failed to release active fraction", zap.Error(err)) - } } func (fm *FracManager) rotate() activeRef { @@ -373,7 +404,7 @@ func (fm *FracManager) rotate() activeRef { baseFilePath := filepath.Join(fm.config.DataDir, filePath) logger.Info("creating new fraction", zap.String("filepath", baseFilePath)) - next := fm.fracProvider.newActiveRef(fm.fracProvider.NewActive(baseFilePath)) + next := fm.newActiveRef(fm.fracProvider.NewActive(baseFilePath)) fm.fracMu.Lock() prev := fm.active @@ -384,9 +415,8 @@ func (fm *FracManager) rotate() activeRef { return prev } -func (fm *FracManager) shouldSealOnExit(active *frac.Active) bool { - minSize := float64(fm.config.FracSize) * consts.SealOnExitFracSizePercent / 100 - return active.Info().FullSize() > uint64(minSize) +func (fm *FracManager) minFracSizeToSeal() uint64 { + return fm.config.FracSize * consts.SealOnExitFracSizePercent / 100 } func (fm *FracManager) Stop() { @@ -397,18 +427,35 @@ func (fm *FracManager) Stop() { fm.mntcWG.Wait() fm.cacheWG.Wait() - n := fm.active.frac.Info().Name() - s := uint64(util.SizeToUnit(fm.active.frac.Info().FullSize(), "mb")) + needSealing := false + status := "frac too small to be sealed" + + info := fm.active.frac.Info() + if info.FullSize() > fm.minFracSizeToSeal() { + needSealing = true + status = "need to seal active fraction before exit" + } + + logger.Info( + "sealing on exit", + zap.String("status", status), + zap.String("frac", info.Name()), + zap.Uint64("fill_size_mb", uint64(util.SizeToUnit(info.FullSize(), "mb"))), + ) - if fm.shouldSealOnExit(fm.active.frac) { - logger.Info("start 
sealing fraction on exit", zap.String("frac", n), zap.Uint64("fill_size_mb", s)) + if needSealing { fm.seal(fm.active) - } else { - logger.Info("frac too small to be sealed on exit", zap.String("frac", n), zap.Uint64("fill_size_mb", s)) } } -func (fm *FracManager) GetActiveFrac() *frac.Active { +func (fm *FracManager) Writer() *proxyFrac { + fm.fracMu.RLock() + defer fm.fracMu.RUnlock() + + return fm.active.frac +} + +func (fm *FracManager) Active() frac.Fraction { fm.fracMu.RLock() defer fm.fracMu.RUnlock() @@ -416,7 +463,7 @@ func (fm *FracManager) GetActiveFrac() *frac.Active { } func (fm *FracManager) WaitIdle() { - fm.GetActiveFrac().WaitWriteIdle() + fm.Writer().WaitWriteIdle() } func (fm *FracManager) setMature() { diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index d092d88b..b8718577 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -34,18 +34,15 @@ func addDummyDoc(t *testing.T, fm *FracManager, dp *frac.DocProvider, seqID seq. func MakeSomeFractions(t *testing.T, fm *FracManager) { dp := frac.NewDocProvider() addDummyDoc(t, fm, dp, seq.SimpleID(1)) - fm.GetActiveFrac().WaitWriteIdle() fm.seal(fm.rotate()) dp.TryReset() addDummyDoc(t, fm, dp, seq.SimpleID(2)) - fm.GetActiveFrac().WaitWriteIdle() fm.seal(fm.rotate()) dp.TryReset() addDummyDoc(t, fm, dp, seq.SimpleID(3)) - fm.GetActiveFrac().WaitWriteIdle() } func TestCleanUp(t *testing.T) { @@ -72,10 +69,11 @@ func TestCleanUp(t *testing.T) { second := fm.fracs[1].instance.(*frac.Sealed) second.PartialSuicideMode = frac.HalfRemove second.Suicide() + info := fm.active.frac.Info() + shouldSealOnExit := info.FullSize() > fm.minFracSizeToSeal() - shouldSealOnExit := fm.shouldSealOnExit(fm.active.frac) fm.Stop() - if shouldSealOnExit && fm.active.frac.Info().DocsTotal > 0 { + if shouldSealOnExit && info.DocsTotal > 0 { t.Error("active fraction should be empty after rotation and sealing") } @@ -119,7 +117,6 @@ func TestMatureMode(t *testing.T) { addDummyDoc(t, fm, dp, seq.SimpleID(id)) id++ } - fm.GetActiveFrac().WaitWriteIdle() fm.seal(fm.rotate()) dp.TryReset() } diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index f9a1e339..6e067e0c 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -16,18 +16,18 @@ var storeBytesRead = promauto.NewCounter(prometheus.CounterOpts{ type fractionProvider struct { config *frac.Config cacheProvider *CacheMaintainer - indexWorkers *frac.IndexWorkers + activeIndexer *frac.ActiveIndexer readLimiter *disk.ReadLimiter } func newFractionProvider(c *frac.Config, cp *CacheMaintainer, readerWorkers, indexWorkers int) *fractionProvider { - iw := frac.NewIndexWorkers(indexWorkers, indexWorkers) - iw.Start() // first start indexWorkers to allow active frac replaying + ai := frac.NewActiveIndexer(indexWorkers, indexWorkers) + ai.Start() // first start indexWorkers to allow active frac replaying return &fractionProvider{ config: c, cacheProvider: cp, - indexWorkers: iw, + activeIndexer: ai, readLimiter: disk.NewReadLimiter(readerWorkers, storeBytesRead), } } @@ -35,7 +35,7 @@ func newFractionProvider(c *frac.Config, cp *CacheMaintainer, readerWorkers, ind func (fp *fractionProvider) NewActive(name string) *frac.Active { return frac.NewActive( name, - fp.indexWorkers, + fp.activeIndexer, fp.readLimiter, fp.cacheProvider.CreateDocBlockCache(), fp.cacheProvider.CreateSortDocsCache(), @@ -66,12 +66,5 @@ func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *frac. 
} func (fp *fractionProvider) Stop() { - fp.indexWorkers.Stop() -} - -func (fp *fractionProvider) newActiveRef(active *frac.Active) activeRef { - return activeRef{ - frac: active, - ref: &fracRef{instance: active}, - } + fp.activeIndexer.Stop() } diff --git a/fracmanager/loader.go b/fracmanager/loader.go index 7b7cef6a..5e365e13 100644 --- a/fracmanager/loader.go +++ b/fracmanager/loader.go @@ -1,7 +1,6 @@ package fracmanager import ( - "context" "fmt" "os" "path/filepath" @@ -44,7 +43,7 @@ func NewLoader(config *Config, fracProvider *fractionProvider, fracCache *sealed } } -func (l *loader) load(ctx context.Context) ([]*fracRef, []activeRef, error) { +func (l *loader) load() ([]*frac.Active, []*frac.Sealed, error) { fracIDs, infos := l.makeInfos(l.getFileList()) sort.Strings(fracIDs) @@ -58,7 +57,7 @@ func (l *loader) load(ctx context.Context) ([]*fracRef, []activeRef, error) { infosList := l.filterInfos(fracIDs, infos) cnt := len(infosList) - fracs := make([]*fracRef, 0, cnt) + fracs := make([]*frac.Sealed, 0, cnt) actives := make([]*frac.Active, 0) diskFracCache := NewFracCacheFromDisk(filepath.Join(l.config.DataDir, consts.FracCacheFileSuffix)) @@ -73,13 +72,13 @@ func (l *loader) load(ctx context.Context) ([]*fracRef, []activeRef, error) { removeFile(info.base + consts.DocsFileSuffix) } sealed := l.loadSealedFrac(diskFracCache, info) - fracs = append(fracs, &fracRef{instance: sealed}) + fracs = append(fracs, sealed) } else { if info.hasMeta { actives = append(actives, l.fracProvider.NewActive(info.base)) } else { sealed := l.loadSealedFrac(diskFracCache, info) - fracs = append(fracs, &fracRef{instance: sealed}) + fracs = append(fracs, sealed) } } @@ -97,22 +96,7 @@ func (l *loader) load(ctx context.Context) ([]*fracRef, []activeRef, error) { logger.Info("fractions list created", zap.Int("cached", l.cachedFracs), zap.Int("uncached", l.uncachedFracs)) - logger.Info("replaying active fractions", zap.Int("count", len(actives))) - notSealed := make([]activeRef, 0) - for _, a := range actives { - if err := a.ReplayBlocks(ctx); err != nil { - return nil, nil, fmt.Errorf("while replaying blocks: %w", err) - } - if a.Info().DocsTotal == 0 { // skip empty - removeFractionFiles(a.BaseFileName) - continue - } - activeRef := l.fracProvider.newActiveRef(a) - fracs = append(fracs, activeRef.ref) - notSealed = append(notSealed, activeRef) - } - - return fracs, notSealed, nil + return actives, fracs, nil } func (l *loader) loadSealedFrac(diskFracCache *sealedFracCache, info *fracInfo) *frac.Sealed { diff --git a/fracmanager/proxy_frac.go b/fracmanager/proxy_frac.go new file mode 100644 index 00000000..5d73097e --- /dev/null +++ b/fracmanager/proxy_frac.go @@ -0,0 +1,179 @@ +package fracmanager + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/metric" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" + "go.uber.org/zap" +) + +/** + * Possible states (only 4): + * -------------------------------------------------------- + * | | f.active | f.sealed | f.readonly | + * -------------------------------------------------------- + * | Active & Writable | value | nil | false | + * -------------------------------------------------------- + * | Sealing | value | nil | true | + * -------------------------------------------------------- + * | Sealed | nil | value | --- | + * -------------------------------------------------------- + * | Suicided | nil | nil | --- | + * 
-------------------------------------------------------- + * All other states are impossible. + */ + +type proxyFrac struct { + fp *fractionProvider + + useMu sync.RWMutex + active *frac.Active + sealed *frac.Sealed + readonly bool + + indexWg sync.WaitGroup + sealWg sync.WaitGroup +} + +func (f *proxyFrac) cur() frac.Fraction { + f.useMu.RLock() + defer f.useMu.RUnlock() + + if f.sealed == nil { + return f.active + } + return f.sealed +} + +func (f *proxyFrac) IsIntersecting(from seq.MID, to seq.MID) bool { + return f.cur().IsIntersecting(from, to) +} + +func (f *proxyFrac) Contains(mid seq.MID) bool { + return f.cur().Contains(mid) +} + +func (f *proxyFrac) Info() *frac.Info { + return f.cur().Info() +} + +func (f *proxyFrac) DataProvider(ctx context.Context) (frac.DataProvider, func()) { + f.useMu.RLock() + defer f.useMu.RUnlock() + + if f.active != nil { + return f.active.DataProvider(ctx) + } + + if f.sealed != nil { + metric.CountersTotal.WithLabelValues("use_sealed_from_active").Inc() + return f.sealed.DataProvider(ctx) + } + + return frac.EmptyDataProvider{}, func() {} +} + +func (f *proxyFrac) Append(docs, meta []byte) error { + f.useMu.RLock() + if !f.isActiveState() { + f.useMu.RUnlock() + return errors.New("fraction is not writable") + } + active := f.active + f.indexWg.Add(1) // It's important to call wg.Add() while holding the lock, otherwise WaitWriteIdle() could be called before it + f.useMu.RUnlock() + + if err := active.Append(docs, meta, &f.indexWg); err != nil { + f.indexWg.Done() // the write failed before indexing was scheduled, so nothing downstream will call Done() + return err + } + return nil +} + +func (f *proxyFrac) WaitWriteIdle() { + start := time.Now() + logger.Info("waiting for fraction to stop writing...", zap.String("name", f.active.BaseFileName)) + f.indexWg.Wait() + waitTime := util.DurationToUnit(time.Since(start), "s") + logger.Info("write is stopped", zap.String("name", f.active.BaseFileName), zap.Float64("time_wait_s", waitTime)) +} + +func (f *proxyFrac) Seal(params frac.SealParams) (*frac.Sealed, error) { + f.useMu.Lock() + if !f.isActiveState() { + f.useMu.Unlock() + return nil, errors.New("fraction is not active") + } + f.readonly = true + active := f.active + f.sealWg.Add(1) // It's important to call wg.Add() while holding the lock, otherwise wg.Wait() could be called before it + f.useMu.Unlock() + + f.WaitWriteIdle() + + preloaded, err := frac.Seal(active, params) + if err != nil { + return nil, err + } + + sealed := f.fp.NewSealedPreloaded(active.BaseFileName, preloaded) + + f.useMu.Lock() + f.sealed = sealed + f.active = nil + f.useMu.Unlock() + + f.sealWg.Done() + + active.Release() + + return sealed, nil +} + +// trySetSuicided sets the suicided state if possible (i.e. unless sealing is in progress right now) +func (f *proxyFrac) trySetSuicided() (*frac.Active, *frac.Sealed, bool) { + f.useMu.Lock() + defer f.useMu.Unlock() + + sealed := f.sealed + active := f.active + + sealing := f.isSealingState() + + if !sealing { + f.sealed = nil + f.active = nil + } + + return active, sealed, sealing +} + +func (f *proxyFrac) Suicide() { + active, sealed, sealing := f.trySetSuicided() + if sealing { + f.sealWg.Wait() + // `sealing` can be true here only once, + // so the next attempt after Wait() will succeed + active, sealed, _ = f.trySetSuicided() + } + + if active != nil { + active.Suicide() + } + + if sealed != nil { + sealed.Suicide() + } +} + +func (f *proxyFrac) isActiveState() bool { + return f.active != nil && f.sealed == nil && !f.readonly +} + +func (f *proxyFrac) isSealingState() bool { + return f.active != nil && f.sealed == nil && f.readonly +} diff --git a/fracmanager/sealed_frac_cache_test.go 
b/fracmanager/sealed_frac_cache_test.go index 150aa271..4f165324 100644 --- a/fracmanager/sealed_frac_cache_test.go +++ b/fracmanager/sealed_frac_cache_test.go @@ -258,9 +258,9 @@ func TestUnusedFractionsCleanup(t *testing.T) { } func rotateAndSeal(fm *FracManager) frac.Fraction { - active3 := fm.rotate() - fm.seal(active3) - return active3.ref.instance + active := fm.rotate() + fm.seal(active) + return active.ref.instance } func TestFracInfoSavedToCache(t *testing.T) { @@ -289,7 +289,6 @@ func TestFracInfoSavedToCache(t *testing.T) { for totalSize < maxSize { addDummyDoc(t, fm, dp, seq.SimpleID(cnt)) cnt++ - fm.GetActiveFrac().WaitWriteIdle() fracInstance := rotateAndSeal(fm) totalSize += fracInstance.Info().FullSize() info := fracInstance.Info() @@ -373,7 +372,6 @@ func TestExtraFractionsRemoved(t *testing.T) { for i := 1; i < times+1; i++ { addDummyDoc(t, fm, dp, seq.SimpleID(i)) - fm.GetActiveFrac().WaitWriteIdle() fracInstance := rotateAndSeal(fm) info := fracInstance.Info() q.Add(item{ @@ -431,7 +429,6 @@ func TestMissingCacheFilesDeleted(t *testing.T) { for i := 1; i < times+1; i++ { addDummyDoc(t, fm, dp, seq.SimpleID(i)) - fm.GetActiveFrac().WaitWriteIdle() rotateAndSeal(fm) dp.TryReset() } diff --git a/frac/sealer_test.go b/fracmanager/sealer_test.go similarity index 58% rename from frac/sealer_test.go rename to fracmanager/sealer_test.go index 407f5e3c..516f86cb 100644 --- a/frac/sealer_test.go +++ b/fracmanager/sealer_test.go @@ -1,4 +1,4 @@ -package frac +package fracmanager import ( "bufio" @@ -6,39 +6,39 @@ import ( "os" "path/filepath" "strconv" + "sync" "testing" + "time" insaneJSON "github.com/ozontech/insane-json" "github.com/stretchr/testify/assert" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/disk" + "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/tests/common" ) -func fillActiveFraction(active *Active) error { +func fillActiveFraction(active *frac.Active, wg *sync.WaitGroup) error { const muliplier = 10 docRoot := insaneJSON.Spawn() defer insaneJSON.Release(docRoot) - dp := NewDocProvider() + dp := frac.NewDocProvider() file, err := os.Open(filepath.Join(common.TestDataDir, "k8s.logs")) if err != nil { return err } defer file.Close() - for i := 0; i < muliplier; i++ { dp.TryReset() - _, err := file.Seek(0, io.SeekStart) if err != nil { return err } - scanner := bufio.NewScanner(file) for scanner.Scan() { doc := scanner.Bytes() @@ -46,10 +46,12 @@ func fillActiveFraction(active *Active) error { if err != nil { return err } - dp.Append(doc, docRoot, seq.SimpleID(0), nil) + tokens := seq.Tokens("_all_:", "service:100500", "k8s_pod:"+strconv.Itoa(i)) + dp.Append(doc, docRoot, seq.SimpleID(0), tokens) } docs, metas := dp.Provide() - if err := active.Append(docs, metas); err != nil { + wg.Add(1) + if err := active.Append(docs, metas, wg); err != nil { return err } } @@ -57,23 +59,36 @@ func fillActiveFraction(active *Active) error { return nil } +func getCacheMaintainer() (*CacheMaintainer, func()) { + done := make(chan struct{}) + cm := NewCacheMaintainer(32*consts.MB, 24*consts.MB, nil) + wg := cm.RunCleanLoop(done, time.Second, time.Second) + return cm, func() { + close(done) + wg.Wait() + } +} + func BenchmarkSealing(b *testing.B) { b.ResetTimer() b.StopTimer() b.ReportAllocs() + cm, stopFn := getCacheMaintainer() + defer stopFn() + dataDir := filepath.Join(b.TempDir(), "BenchmarkSealing") common.RecreateDir(dataDir) readLimiter := disk.NewReadLimiter(1, nil) - indexWorkers := NewIndexWorkers(10, 10) + 
activeIndexer := frac.NewActiveIndexer(10, 10) - indexWorkers.Start() - defer indexWorkers.Stop() + activeIndexer.Start() + defer activeIndexer.Stop() const minZstdLevel = -5 - defaultSealParams := SealParams{ + defaultSealParams := frac.SealParams{ IDsZstdLevel: minZstdLevel, LIDsZstdLevel: minZstdLevel, TokenListZstdLevel: minZstdLevel, @@ -83,16 +98,23 @@ func BenchmarkSealing(b *testing.B) { DocBlockSize: consts.MB * 4, } for i := 0; i < b.N; i++ { - - active := NewActive(filepath.Join(dataDir, "test_"+strconv.Itoa(i)), indexWorkers, readLimiter, nil, nil, &Config{}) - err := fillActiveFraction(active) + wg := sync.WaitGroup{} + active := frac.NewActive( + filepath.Join(dataDir, "test_"+strconv.Itoa(i)), + activeIndexer, + readLimiter, + cm.CreateDocBlockCache(), + cm.CreateSortDocsCache(), + &frac.Config{}, + ) + err := fillActiveFraction(active, &wg) assert.NoError(b, err) - active.WaitWriteIdle() + wg.Wait() active.GetAllDocuments() // emulate search-pre-sorted LIDs b.StartTimer() - _, err = active.Seal(defaultSealParams) + _, err = frac.Seal(active, defaultSealParams) assert.NoError(b, err) b.StopTimer() diff --git a/metric/store.go b/metric/store.go index 05b3bc69..92b4e86c 100644 --- a/metric/store.go +++ b/metric/store.go @@ -161,13 +161,6 @@ var ( Help: "", Buckets: prometheus.ExponentialBuckets(1, 4, 16), }) - BulkStagesSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "seq_db_store", - Subsystem: "bulk", - Name: "stages_seconds", - Help: "", - Buckets: SecondsBuckets, - }, []string{"stage"}) SearchInFlightQueriesTotal = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "seq_db_store", diff --git a/storeapi/grpc_v1.go b/storeapi/grpc_v1.go index 67e4c90f..94697363 100644 --- a/storeapi/grpc_v1.go +++ b/storeapi/grpc_v1.go @@ -121,7 +121,7 @@ func NewGrpcV1(config APIConfig, fracManager *fracmanager.FracManager, mappingPr func (g *GrpcV1) bulkStats() { for { - stats := g.fracManager.GetActiveFrac().Info() + stats := g.fracManager.Active().Info() if stats.Name() == "" { time.Sleep(time.Second * 5) continue @@ -131,7 +131,7 @@ func (g *GrpcV1) bulkStats() { fracName := stats.Name() time.Sleep(time.Second * 5) if g.bulkData.batches.Load() > 0 { - stats = g.fracManager.GetActiveFrac().Info() + stats = g.fracManager.Active().Info() if fracName != stats.Name() { continue } diff --git a/util/batchan.go b/util/batchan.go deleted file mode 100644 index f5a33308..00000000 --- a/util/batchan.go +++ /dev/null @@ -1,46 +0,0 @@ -package util - -import "sync" - -// Batchan works as a batching channel. 
It allows to Send single items, and Fetch all accumulated so far -// Batchan supports multiple writers and multiple readers at the same time -type Batchan[V any] struct { - out []V - mu sync.Mutex - notify chan struct{} -} - -func NewBatchan[V any]() *Batchan[V] { - return &Batchan[V]{ - notify: make(chan struct{}, 1), - } -} - -// Send item into Batchan -func (b *Batchan[V]) Send(v V) { - b.mu.Lock() - defer b.mu.Unlock() - b.out = append(b.out, v) - if len(b.out) == 1 { - b.notify <- struct{}{} - } -} - -// Close Batchan as to inform fetching end that no more items are coming -func (b *Batchan[V]) Close() { - close(b.notify) -} - -// Fetch all items that were sent to the moment and weren't fetched before -// buf will replace inner buffer and shouldn't be reused once given up to Batchan -// Always returns non-empty slice unless closed -// Blocks if no item is available, but Batchan wasn't closed -// Empty returned slice indicates closed Batchan -func (b *Batchan[V]) Fetch(buf []V) []V { - _, _ = <-b.notify - b.mu.Lock() - defer b.mu.Unlock() - res := b.out - b.out = buf[:0] - return res -} diff --git a/util/batchan_test.go b/util/batchan_test.go deleted file mode 100644 index 1a72aa32..00000000 --- a/util/batchan_test.go +++ /dev/null @@ -1,104 +0,0 @@ -package util - -import ( - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func join[V any](b *Batchan[V]) []V { - var res []V - var buf []V - for { - buf = b.Fetch(buf) - time.Sleep(1 * time.Millisecond) - if len(buf) == 0 { - break - } - res = append(res, buf...) - } - return res -} - -func TestBatchan(t *testing.T) { - b := NewBatchan[byte]() - go func() { - b.Send('a') - b.Send('b') - b.Send('c') - b.Send('d') - time.Sleep(2 * time.Millisecond) - b.Send('e') - b.Send('f') - time.Sleep(20 * time.Millisecond) - b.Send('g') - time.Sleep(20 * time.Millisecond) - b.Send('h') - for i := byte(0); i < 10; i++ { - time.Sleep(250 * time.Microsecond) - b.Send('0' + i) - } - b.Send('x') - b.Send('y') - b.Send('z') - b.Close() - }() - assert.Equal(t, string(join(b)), "abcdefgh0123456789xyz") -} - -func stress(t *testing.T, writers, values, readers int) { - b := NewBatchan[int]() - res := make(chan []int, readers) - var readersWg sync.WaitGroup - readersWg.Add(readers) - for i := 0; i < readers; i++ { - go func() { - res <- join(b) - readersWg.Done() - }() - } - var writersWg sync.WaitGroup - writersWg.Add(writers) - for i := 0; i < writers; i++ { - ii := i - go func() { - for j := 0; j < values; j++ { - b.Send(j + ii*values) - } - writersWg.Done() - }() - } - cnt := make([]int, writers*values) - writersWg.Wait() - b.Close() - readersWg.Wait() - close(res) - for i := range res { - for _, j := range i { - cnt[j]++ - } - } - for _, i := range cnt { - if i != 1 { - t.FailNow() - } - } -} - -func TestStressSimple(t *testing.T) { - stress(t, 1, 1, 1) -} - -func TestStressSmall(t *testing.T) { - stress(t, 2, 1000, 2) -} - -func TestStressBig(t *testing.T) { - stress(t, 100, 10000, 100) -} - -func TestStressLong(t *testing.T) { - stress(t, 1, 1000000, 1) -}
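Note on the deleted util.Batchan: FileWriter now batches fsync waiters directly with a mutex-guarded queue plus a one-slot notify channel, so the generic batching channel has no remaining users. A self-contained sketch of that group-commit idiom follows (standard library only; groupCommitter, Commit, and flush are illustrative names, not identifiers from this codebase):

package main

import (
	"fmt"
	"sync"
)

// groupCommitter amortizes an expensive flush (e.g. (*os.File).Sync) across
// concurrent callers: while one flush runs, later requests queue up, and a
// single subsequent flush satisfies all of them.
type groupCommitter struct {
	mu     sync.Mutex
	queue  []chan error
	notify chan struct{} // capacity 1: at most one pending wakeup
	flush  func() error
}

func newGroupCommitter(flush func() error) *groupCommitter {
	g := &groupCommitter{notify: make(chan struct{}, 1), flush: flush}
	go g.loop()
	return g
}

func (g *groupCommitter) loop() {
	for range g.notify {
		g.mu.Lock()
		batch := g.queue // take everything accumulated so far
		g.queue = nil
		g.mu.Unlock()

		err := g.flush() // one flush serves the whole batch
		for _, res := range batch {
			res <- err
		}
	}
}

// Commit blocks until a flush that started after this call completes and
// returns that flush's result.
func (g *groupCommitter) Commit() error {
	res := make(chan error)
	g.mu.Lock()
	g.queue = append(g.queue, res)
	first := len(g.queue) == 1
	g.mu.Unlock()
	if first {
		g.notify <- struct{}{} // only the first waiter of a batch wakes the loop
	}
	return <-res
}

// stop terminates the loop; callers must have finished committing by then.
func (g *groupCommitter) stop() { close(g.notify) }

func main() {
	flushes := 0 // written only by loop; Commit's channel receive orders the read below
	g := newGroupCommitter(func() error { flushes++; return nil })

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = g.Commit()
		}()
	}
	wg.Wait()
	g.stop()
	fmt.Println("flush calls:", flushes) // usually well below 16
}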