Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions storage/inmemory/mutexmap/rulebasedsegment.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ func NewRuleBasedSegmentsStorage() *RuleBasedSegmentsStorageImpl {
}

// Update atomically registers new rule-based segments, removes archived ones and updates the change number
func (r *RuleBasedSegmentsStorageImpl) Update(toAdd []dtos.RuleBasedSegmentDTO, toRemove []dtos.RuleBasedSegmentDTO, till int64) {
func (r *RuleBasedSegmentsStorageImpl) Update(toAdd []dtos.RuleBasedSegmentDTO, toRemove []dtos.RuleBasedSegmentDTO, till int64) error {
r.mutex.Lock()
defer r.mutex.Unlock()
r.update(toAdd, toRemove, till)
return nil
}

// Update atomically registers new rule-based, removes archived ones and updates the change number
Expand Down Expand Up @@ -77,14 +78,14 @@ func (r *RuleBasedSegmentsStorageImpl) All() []dtos.RuleBasedSegmentDTO {
}

// RuleBasedSegmentNames returns a slice with the names of all the current rule-baseds
func (r *RuleBasedSegmentsStorageImpl) RuleBasedSegmentNames() []string {
func (r *RuleBasedSegmentsStorageImpl) RuleBasedSegmentNames() ([]string, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
ruleBasedNames := make([]string, 0)
for key := range r.data {
ruleBasedNames = append(ruleBasedNames, key)
}
return ruleBasedNames
return ruleBasedNames, nil
}

// SegmentNames returns a slice with the names of all segments referenced in rule-based
Expand Down Expand Up @@ -159,7 +160,6 @@ func (r *RuleBasedSegmentsStorageImpl) GetRuleBasedSegmentByName(name string) (*
}

func (r *RuleBasedSegmentsStorageImpl) ReplaceAll(toAdd []dtos.RuleBasedSegmentDTO, changeNumber int64) error {
// Get all current splits under read lock
r.mutex.RLock()
toRemove := make([]dtos.RuleBasedSegmentDTO, 0)
for _, ruleBased := range r.data {
Expand Down
10 changes: 6 additions & 4 deletions storage/inmemory/mutexmap/rulebasedsegment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ func TestRuleBasedSegmentsStorage(t *testing.T) {
changeNumber, _ := storage.ChangeNumber()
assert.Equal(t, int64(-1), changeNumber)
assert.Empty(t, storage.All())
assert.Empty(t, storage.RuleBasedSegmentNames())
names, _ := storage.RuleBasedSegmentNames()
assert.Empty(t, names)

// Create test data
ruleBased1 := dtos.RuleBasedSegmentDTO{
Expand Down Expand Up @@ -84,7 +85,7 @@ func TestRuleBasedSegmentsStorage(t *testing.T) {
assert.Len(t, storage.All(), 2)

// Test RuleBasedSegmentNames
names := storage.RuleBasedSegmentNames()
names, _ = storage.RuleBasedSegmentNames()
assert.Contains(t, names, "rule1")
assert.Contains(t, names, "rule2")

Expand All @@ -107,7 +108,8 @@ func TestRuleBasedSegmentsStorage(t *testing.T) {
changeNumber, _ = storage.ChangeNumber()
assert.Equal(t, int64(124), changeNumber)
assert.Len(t, storage.All(), 1)
assert.Contains(t, storage.RuleBasedSegmentNames(), "rule2")
names, _ = storage.RuleBasedSegmentNames()
assert.Contains(t, names, "rule2")
}

func TestRuleBasedSegmentsStorageReplaceAll(t *testing.T) {
Expand Down Expand Up @@ -311,7 +313,7 @@ func TestRuleBasedSegmentsStorageConcurrent(t *testing.T) {
go func() {
defer wg.Done()
_ = storage.All()
_ = storage.RuleBasedSegmentNames()
_, _ = storage.RuleBasedSegmentNames()
_ = storage.Segments()
_ = storage.Contains([]string{"segment1"})
}()
Expand Down
4 changes: 2 additions & 2 deletions storage/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,15 @@ type LargeSegmentsStorage interface {
// RuleBasedSegmentStorageProducer interface should be implemented by all structs that offer writing rule-based segments
type RuleBasedSegmentStorageProducer interface {
SetChangeNumber(till int64) error
Update(toAdd []dtos.RuleBasedSegmentDTO, toRemove []dtos.RuleBasedSegmentDTO, till int64)
Update(toAdd []dtos.RuleBasedSegmentDTO, toRemove []dtos.RuleBasedSegmentDTO, till int64) error
ReplaceAll(toAdd []dtos.RuleBasedSegmentDTO, changeNumber int64) error
}

// RuleBasedStorageConsumer interface should be implemented by all structs that ofer reading rule-based segments
type RuleBasedSegmentStorageConsumer interface {
ChangeNumber() (int64, error)
All() []dtos.RuleBasedSegmentDTO
RuleBasedSegmentNames() []string
RuleBasedSegmentNames() ([]string, error)
Contains(ruleBasedSegmentNames []string) bool
Segments() *set.ThreadUnsafeSet
LargeSegments() *set.ThreadUnsafeSet
Expand Down
9 changes: 5 additions & 4 deletions storage/mocks/rulebasedsegment.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func (m *MockRuleBasedSegmentStorage) All() []dtos.RuleBasedSegmentDTO {
}

// RuleBasedSegmentNames mock
func (m *MockRuleBasedSegmentStorage) RuleBasedSegmentNames() []string {
func (m *MockRuleBasedSegmentStorage) RuleBasedSegmentNames() ([]string, error) {
args := m.Called()
return args.Get(0).([]string)
return args.Get(0).([]string), args.Error(1)
}

// Contains mock
Expand Down Expand Up @@ -61,8 +61,9 @@ func (m *MockRuleBasedSegmentStorage) SetChangeNumber(till int64) error {
}

// Update mock
func (m *MockRuleBasedSegmentStorage) Update(toAdd []dtos.RuleBasedSegmentDTO, toRemove []dtos.RuleBasedSegmentDTO, till int64) {
m.Called(toAdd, toRemove, till)
func (m *MockRuleBasedSegmentStorage) Update(toAdd []dtos.RuleBasedSegmentDTO, toRemove []dtos.RuleBasedSegmentDTO, till int64) error {
args := m.Called(toAdd, toRemove, till)
return args.Error(0)
}

func (m *MockRuleBasedSegmentStorage) ReplaceAll(toAdd []dtos.RuleBasedSegmentDTO, changeNumber int64) error {
Expand Down
28 changes: 28 additions & 0 deletions storage/redis/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,31 @@ func cleanPrefixedKeys(keys []string, toRemove string) []string {

return toReturn
}

func updateRuleBasedSegments(pipeline redis.Pipeline, toAdd []dtos.RuleBasedSegmentDTO, toRemove []dtos.RuleBasedSegmentDTO) (map[string]error, []string) {
failedToAdd := make(map[string]error)
addedInPipe := make([]string, 0, len(toAdd))

for _, ruleBased := range toAdd {
keyToStore := strings.Replace(KeyRuleBasedSegment, "{rbsegment}", ruleBased.Name, 1)
raw, err := json.Marshal(ruleBased)
if err != nil {
failedToAdd[ruleBased.Name] = fmt.Errorf("failed to serialize rule-based segment: %w", err)
continue
}

// Attach each Feature Flag update into Redis Pipeline
pipeline.Set(keyToStore, raw, 0)
addedInPipe = append(addedInPipe, ruleBased.Name)
}

if len(toRemove) > 0 {
toRemoveKeys := make([]string, 0, len(toRemove))
for idx := range toRemove {
toRemoveKeys = append(toRemoveKeys, strings.Replace(KeyRuleBasedSegment, "{rbsegment}", toRemove[idx].Name, 1))
}
pipeline.Del(toRemoveKeys...)
}

return failedToAdd, addedInPipe
}
Loading
Loading