Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ func InMemoryOnlyConfig() Config {
indexConfig := index.InMemoryOnlyConfig()
return defaultConfig(indexConfig)
}

func InMemoryFromBackup(path string) Config {
indexConfig := index.InMemoryOnlyConfig()
indexConfig.SnapshotDirectoryFunc = func() index.Directory {
return index.NewFileSystemDirectory(path)
}
return defaultConfig(indexConfig)
}

func DefaultConfigWithDirectory(df func() index.Directory) Config {
indexConfig := index.DefaultConfigWithDirectory(df)
return defaultConfig(indexConfig)
Expand Down
2 changes: 2 additions & 0 deletions index/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Config struct {
DirectoryFunc func() Directory
NormCalc func(string, int) float32

SnapshotDirectoryFunc func() Directory

MergeBufferSize int

// Optimizations
Expand Down
4 changes: 2 additions & 2 deletions index/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *Writer) executeMergeTask(merges chan *segmentMerge, task *mergeplan.Mer
return fmt.Errorf("merging failed: %v", err)
}

seg, err = s.loadSegment(newSegmentID, s.segPlugin)
seg, err = s.loadSegment(newSegmentID, s.segPlugin, s.directory)
if err != nil {
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
return err
Expand Down Expand Up @@ -313,7 +313,7 @@ func (s *Writer) mergeSegmentBases(merges chan *segmentMerge, snapshot *Snapshot
return nil, 0, err
}

seg, err := s.loadSegment(newSegmentID, s.segPlugin)
seg, err := s.loadSegment(newSegmentID, s.segPlugin, s.directory)
if err != nil {
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return nil, 0, err
Expand Down
2 changes: 1 addition & 1 deletion index/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (s *Writer) prepareIntroducePersist(persists chan *persistIntroduction, new
}()
var err error
for _, segmentID := range newSegmentIds {
newSegments[segmentID], err = s.loadSegment(segmentID, s.segPlugin)
newSegments[segmentID], err = s.loadSegment(segmentID, s.segPlugin, s.directory)
if err != nil {
return fmt.Errorf("error opening new segment %d, %v", segmentID, err)
}
Expand Down
30 changes: 18 additions & 12 deletions index/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,16 @@ func OpenWriter(config Config) (*Writer, error) {
return nil, fmt.Errorf("error getting exclusive access to diretory: %w", err)
}

lastPersistedEpoch, nextSnapshotEpoch, err2 := rv.loadSnapshots()
if err2 != nil {
var lastPersistedEpoch, nextSnapshotEpoch uint64

if config.SnapshotDirectoryFunc != nil {
lastPersistedEpoch, nextSnapshotEpoch, err = rv.loadSnapshots(config.SnapshotDirectoryFunc())
} else {
lastPersistedEpoch, nextSnapshotEpoch, err = rv.loadSnapshots(rv.directory)
}
if err != nil {
_ = rv.Close()
return nil, err2
return nil, err
}

// initialize nextSegmentID to a safe value
Expand Down Expand Up @@ -133,9 +139,9 @@ func OpenWriter(config Config) (*Writer, error) {
return rv, nil
}

func (s *Writer) loadSnapshots() (lastPersistedEpoch, nextSnapshotEpoch uint64, err error) {
func (s *Writer) loadSnapshots(directory Directory) (lastPersistedEpoch, nextSnapshotEpoch uint64, err error) {
nextSnapshotEpoch = 1
snapshotEpochs, err := s.directory.List(ItemKindSnapshot)
snapshotEpochs, err := directory.List(ItemKindSnapshot)
if err != nil {
return 0, 0, err
}
Expand All @@ -149,7 +155,7 @@ func (s *Writer) loadSnapshots() (lastPersistedEpoch, nextSnapshotEpoch uint64,
snapshotEpoch := snapshotEpochs[i]
snapshotsFound = true
var indexSnapshot *Snapshot
indexSnapshot, err = s.loadSnapshot(snapshotEpoch)
indexSnapshot, err = s.loadSnapshot(snapshotEpoch, directory)
if err != nil {
log.Printf("error loading snapshot epoch: %d: %v", snapshotEpoch, err)
// but keep going and hope there is another newer snapshot that works
Expand Down Expand Up @@ -434,7 +440,7 @@ func OpenReader(config Config) (*Snapshot, error) {
// start with most recent
var indexSnapshot *Snapshot
for _, snapshotEpoch := range snapshotEpochs {
indexSnapshot, err = parent.loadSnapshot(snapshotEpoch)
indexSnapshot, err = parent.loadSnapshot(snapshotEpoch, parent.directory)
if err != nil {
log.Printf("error loading snapshot epoch: %d: %v", snapshotEpoch, err)
// but keep going and hope there is another newer snapshot that works
Expand All @@ -449,15 +455,15 @@ func OpenReader(config Config) (*Snapshot, error) {
return indexSnapshot, nil
}

func (s *Writer) loadSnapshot(epoch uint64) (*Snapshot, error) {
func (s *Writer) loadSnapshot(epoch uint64, directory Directory) (*Snapshot, error) {
snapshot := &Snapshot{
parent: s,
epoch: epoch,
refs: 1,
creator: "loadSnapshot",
}

data, closer, err := s.directory.Load(ItemKindSnapshot, epoch)
data, closer, err := directory.Load(ItemKindSnapshot, epoch)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -510,7 +516,7 @@ func (s *Writer) loadSnapshot(epoch uint64) (*Snapshot, error) {
if err != nil {
return nil, fmt.Errorf("error loading required segment plugin: %v", err)
}
segSnapshot.segment, err = s.loadSegment(segSnapshot.id, segPlugin)
segSnapshot.segment, err = s.loadSegment(segSnapshot.id, segPlugin, directory)
if err != nil {
return nil, fmt.Errorf("error opening segment %d: %w", segSnapshot.id, err)
}
Expand All @@ -522,8 +528,8 @@ func (s *Writer) loadSnapshot(epoch uint64) (*Snapshot, error) {
return snapshot, nil
}

func (s *Writer) loadSegment(id uint64, plugin *SegmentPlugin) (*segmentWrapper, error) {
data, closer, err := s.directory.Load(ItemKindSegment, id)
func (s *Writer) loadSegment(id uint64, plugin *SegmentPlugin, directory Directory) (*segmentWrapper, error) {
data, closer, err := directory.Load(ItemKindSegment, id)
if err != nil {
return nil, fmt.Errorf("error loading segment fromt directory: %v", err)
}
Expand Down
78 changes: 78 additions & 0 deletions index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1818,3 +1818,81 @@ func TestBug87(t *testing.T) {
t.Fatal(err)
}
}

func TestLoadingFromBackup(t *testing.T) {
indexWriter, err := OpenWriter(InMemoryOnlyConfig())
if err != nil {
t.Fatal(err)
}

numDocs := 10

batch := NewBatch()
for i := 0; i < numDocs; i++ {
doc := NewDocument("a" + strconv.Itoa(i)).
AddField(NewKeywordField("location", "/a").Aggregatable().StoreValue())
batch.Update(doc.ID(), doc)
}
err = indexWriter.Batch(batch)
if err != nil {
t.Fatal(err)
}

// create a temp directory to store the index
tmpDir, err := ioutil.TempDir("", "bluge.tests.TestLoadingFromBackup")
if err != nil {
t.Fatal(err)
}
defer func() {
_ = os.RemoveAll(tmpDir)
}()

indexReader, err := indexWriter.Reader()
if err != nil {
t.Fatalf("error getting reader: %v", err)
}
defer func() { _ = indexReader.Close() }()

cancelCh := make(chan struct{})
err = indexReader.Backup(tmpDir, cancelCh)
if err != nil {
t.Fatal(err)
}

_ = indexWriter.Close()

writer, err := OpenWriter(InMemoryFromBackup(tmpDir))
if err != nil {
t.Fatal(err)
}
defer func() { _ = writer.Close() }()

reader, err := writer.Reader()
if err != nil {
t.Fatalf("error getting reader: %v", err)
}
defer func() { _ = indexReader.Close() }()

req := NewAllMatches(NewPrefixQuery("/a").SetField("location"))
documentMatchIterator, err := reader.Search(context.Background(), req)
if err != nil {
t.Fatalf("error search: %v", err)
}

count := 0

match, err := documentMatchIterator.Next()
for err == nil && match != nil {
count++
err = match.VisitStoredFields(func(field string, value []byte) bool {
return true
})
match, err = documentMatchIterator.Next()
}
if err != nil {
t.Fatalf("error iterating: %v", err)
}
if count != numDocs {
t.Fatalf("wrong number of docs: expected %d, got %d", numDocs, count)
}
}