Skip to content

Commit 05e7821

Browse files
committed
refactor(fracmanager): using fifo queues of fractions
1 parent 358ef8e commit 05e7821

27 files changed

+1363
-1463
lines changed

cmd/seq-db/seq-db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ func startStore(
259259
MaintenanceDelay: 0,
260260
CacheGCDelay: 0,
261261
CacheCleanupDelay: 0,
262+
MinSealFracSize: uint64(cfg.Storage.TotalSize) * consts.DefaultMinSealPercent / 100,
262263
SealParams: common.SealParams{
263264
IDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
264265
LIDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,

frac/active.go

Lines changed: 21 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@ type Active struct {
3535

3636
BaseFileName string
3737

38-
useMu sync.RWMutex
39-
suicided bool
40-
released bool
41-
4238
infoMu sync.RWMutex
4339
info *common.Info
4440

@@ -269,40 +265,18 @@ func (f *Active) String() string {
269265
}
270266

271267
func (f *Active) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
272-
dp, release := f.DataProvider(ctx)
273-
defer release()
274-
if dp == nil {
275-
return EmptyFraction.Fetch(ctx, ids)
268+
if f.Info().DocsTotal == 0 { // it is empty active fraction state
269+
return nil, nil
276270
}
277-
return dp.Fetch(ids)
271+
return f.createDataProvider(ctx).Fetch(ids)
278272
}
279273

280274
func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
281-
dp, release := f.DataProvider(ctx)
282-
defer release()
283-
if dp == nil {
284-
return EmptyFraction.Search(ctx, params)
285-
}
286-
return dp.Search(params)
287-
}
288-
289-
func (f *Active) DataProvider(ctx context.Context) (*activeDataProvider, func()) {
290-
f.useMu.RLock()
291-
292-
if f.suicided || f.released || f.Info().DocsTotal == 0 { // it is empty active fraction state
293-
if f.suicided {
294-
metric.CountersTotal.WithLabelValues("fraction_suicided").Inc()
295-
}
296-
f.useMu.RUnlock()
297-
return nil, func() {}
298-
}
299-
300-
// it is ordinary active fraction state
301-
dp := f.createDataProvider(ctx)
302-
return dp, func() {
303-
dp.release()
304-
f.useMu.RUnlock()
275+
if f.Info().DocsTotal == 0 { // it is empty active fraction state
276+
metric.CountersTotal.WithLabelValues("empty_data_provider").Inc()
277+
return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, nil
305278
}
279+
return f.createDataProvider(ctx).Search(params)
306280
}
307281

308282
func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider {
@@ -338,10 +312,6 @@ func (f *Active) IsIntersecting(from, to seq.MID) bool {
338312
}
339313

340314
func (f *Active) Release() {
341-
f.useMu.Lock()
342-
f.released = true
343-
f.useMu.Unlock()
344-
345315
f.releaseMem()
346316

347317
if !f.Config.KeepMetaFile {
@@ -354,33 +324,11 @@ func (f *Active) Release() {
354324
}
355325
}
356326

357-
// Offload for [Active] fraction is no-op.
358-
//
359-
// Since search within [Active] fraction is too costly (we have to replay the whole index in memory),
360-
// we decided to support offloading only for [Sealed] fractions.
361-
func (f *Active) Offload(context.Context, storage.Uploader) (bool, error) {
362-
return false, nil
363-
}
364-
365327
func (f *Active) Suicide() {
366-
f.useMu.Lock()
367-
released := f.released
368-
f.suicided = true
369-
f.released = true
370-
f.useMu.Unlock()
371-
372-
if released { // fraction can be suicided after release
373-
if f.Config.KeepMetaFile {
374-
f.removeMetaFile() // meta was not removed while release
375-
}
376-
if f.Config.SkipSortDocs {
377-
f.removeDocsFiles() // docs was not removed while release
378-
}
379-
} else { // was not release
380-
f.releaseMem()
381-
f.removeMetaFile()
382-
f.removeDocsFiles()
383-
}
328+
f.releaseMem()
329+
f.removeMetaFile()
330+
f.removeDocsFiles()
331+
f.removeSdocsFiles()
384332
}
385333

386334
func (f *Active) releaseMem() {
@@ -393,6 +341,9 @@ func (f *Active) releaseMem() {
393341
if err := f.metaFile.Close(); err != nil {
394342
logger.Error("can't close meta file", zap.String("frac", f.BaseFileName), zap.Error(err))
395343
}
344+
if err := f.docsFile.Close(); err != nil {
345+
logger.Error("can't close docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
346+
}
396347

397348
f.RIDs = nil
398349
f.MIDs = nil
@@ -401,14 +352,18 @@ func (f *Active) releaseMem() {
401352
}
402353

403354
func (f *Active) removeDocsFiles() {
404-
if err := f.docsFile.Close(); err != nil {
405-
logger.Error("can't close docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
406-
}
407355
if err := os.Remove(f.docsFile.Name()); err != nil {
408356
logger.Error("can't delete docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
409357
}
410358
}
411359

360+
func (f *Active) removeSdocsFiles() {
361+
name := f.BaseFileName + consts.SdocsFileSuffix
362+
if err := os.Remove(name); err != nil && !os.IsNotExist(err) {
363+
logger.Error("can't delete docs file", zap.String("frac", f.BaseFileName), zap.Error(err))
364+
}
365+
}
366+
412367
func (f *Active) removeMetaFile() {
413368
if err := os.Remove(f.metaFile.Name()); err != nil {
414369
logger.Error("can't delete metas file", zap.String("frac", f.BaseFileName), zap.Error(err))

frac/active_indexer.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ type ActiveIndexer struct {
1818
ch chan *indexTask
1919
chMerge chan *mergeTask
2020
workerCount int
21-
22-
stopFn func()
2321
}
2422

2523
type indexTask struct {
@@ -34,12 +32,14 @@ type mergeTask struct {
3432
tokenLIDs *TokenLIDs
3533
}
3634

37-
func NewActiveIndexer(workerCount, chLen int) *ActiveIndexer {
38-
return &ActiveIndexer{
35+
func NewActiveIndexer(workerCount, chLen int) (*ActiveIndexer, func()) {
36+
idx := ActiveIndexer{
3937
ch: make(chan *indexTask, chLen),
4038
chMerge: make(chan *mergeTask, chLen),
4139
workerCount: workerCount,
4240
}
41+
stopIdx := idx.start()
42+
return &idx, stopIdx
4343
}
4444

4545
func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) {
@@ -53,7 +53,7 @@ func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, s
5353
m.Stop()
5454
}
5555

56-
func (ai *ActiveIndexer) Start() {
56+
func (ai *ActiveIndexer) start() func() {
5757
wg := sync.WaitGroup{}
5858
wg.Add(ai.workerCount)
5959

@@ -72,13 +72,10 @@ func (ai *ActiveIndexer) Start() {
7272
}()
7373
}
7474

75-
ai.stopFn = func() {
75+
return func() {
7676
close(ai.ch)
7777
close(ai.chMerge)
78-
7978
wg.Wait()
80-
81-
ai.stopFn = nil
8279
}
8380
}
8481

@@ -88,12 +85,6 @@ func (ai *ActiveIndexer) mergeWorker() {
8885
}
8986
}
9087

91-
func (ai *ActiveIndexer) Stop() {
92-
if ai.stopFn != nil {
93-
ai.stopFn()
94-
}
95-
}
96-
9788
var metaDataPool = sync.Pool{
9889
New: func() any {
9990
return new(indexer.MetaData)

frac/empty.go

Lines changed: 0 additions & 50 deletions
This file was deleted.

frac/fraction.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/ozontech/seq-db/frac/processor"
1414
"github.com/ozontech/seq-db/metric"
1515
"github.com/ozontech/seq-db/seq"
16-
"github.com/ozontech/seq-db/storage"
1716
)
1817

1918
type Fraction interface {
@@ -22,8 +21,6 @@ type Fraction interface {
2221
Contains(mid seq.MID) bool
2322
Fetch(context.Context, []seq.ID) ([][]byte, error)
2423
Search(context.Context, processor.SearchParams) (*seq.QPR, error)
25-
Offload(ctx context.Context, u storage.Uploader) (bool, error)
26-
Suicide()
2724
}
2825

2926
var (

0 commit comments

Comments
 (0)