Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions network/circuitbreaker/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type Config struct {
// SleepWindow how long to deny requests before allowing attempts
// again to determine if the chain should be closed again.
SleepWindow time.Duration

// CustomSuffix exists for adding custom suffix in circuit breaker name in order not to catch panic via existing name.
CustomSuffix string
}

const (
Expand All @@ -63,6 +66,10 @@ const (
)

func New(name string, config Config) *CircuitBreaker {
if config.CustomSuffix != "" {
name += fmt.Sprintf("-%s", config.CustomSuffix)
}

breaker := manager.GetCircuit(name)
if breaker != nil {
return &CircuitBreaker{
Expand Down
15 changes: 14 additions & 1 deletion proxy/bulk/ingestor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/mappingprovider"
"github.com/ozontech/seq-db/network/circuitbreaker"
"github.com/ozontech/seq-db/packer"
"github.com/ozontech/seq-db/seq"
"github.com/ozontech/seq-db/storage"
Expand Down Expand Up @@ -83,6 +84,9 @@ func TestProcessDocuments(t *testing.T) {
DocsZSTDCompressLevel: -1,
MetasZSTDCompressLevel: -1,
MaxDocumentSize: int(units.KiB),
BulkCircuit: circuitbreaker.Config{
CustomSuffix: "TestProcessDocuments",
},
}

now := time.Now().UTC()
Expand Down Expand Up @@ -486,6 +490,9 @@ func BenchmarkProcessDocuments(b *testing.B) {
MappingProvider: mappingProvider,
CaseSensitive: false,
MaxTokenSize: int(units.KiB),
BulkCircuit: circuitbreaker.Config{
CustomSuffix: "BenchProcessDocuments",
},
}

ingestor := NewIngestor(cfg, &FakeClient{})
Expand Down Expand Up @@ -563,7 +570,13 @@ func TestProcessDocumentType(t *testing.T) {
client := &FakeClient{}
mp, err := mappingprovider.New("", mappingprovider.WithMapping(map[string]seq.MappingTypes{}))
r.NoError(err)
ingestor := NewIngestor(IngestorConfig{MaxInflightBulks: 1, MappingProvider: mp}, client)
ingestor := NewIngestor(IngestorConfig{
MaxInflightBulks: 1,
MappingProvider: mp,
BulkCircuit: circuitbreaker.Config{
CustomSuffix: "TestProcessDocumentType",
},
}, client)
defer ingestor.Stop()

stop := false
Expand Down
37 changes: 33 additions & 4 deletions tests/integration_tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func getAutoTimeGenerator(start time.Time, step time.Duration) func() time.Time
}

func (s *IntegrationTestSuite) TestSearchOne() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
origDocs := []string{
`{"service":"a", "xxxx":"yyyy"}`,
`{"k8s_pod":"sq-toloka-loader-1788964-dryrun-58hmw", "yyyy":"xxxx"}`,
Expand Down Expand Up @@ -118,6 +119,7 @@ func (s *IntegrationTestSuite) TestSearchOne() {

func (s *IntegrationTestSuite) TestPipeFields() {
config := *s.Config
config.Name = fmt.Sprintf("%s-%s", config.Name, s.T().Name())
config.Mapping = map[string]seq.MappingTypes{
"event": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"message": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
Expand Down Expand Up @@ -199,6 +201,7 @@ func (s *IntegrationTestSuite) TestPipeFields() {
}

func (s *IntegrationTestSuite) TestSearchOneHTTP() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
origDocs := []string{
`{"service":"a", "xxxx":"yyyy"}`,
`{"service":"b", "k8s_pod":"sq-toloka-loader-1788964-dryrun-58hmw", "yyyy":"xxxx"}`,
Expand Down Expand Up @@ -240,6 +243,7 @@ func (s *IntegrationTestSuite) TestSearchOneHTTP() {
}

func (s *IntegrationTestSuite) TestSearchNothing() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
origDocs := []string{
`{"service":"a", "xxxx":"yyyy"}`,
`{"k8s_pod":"sq-toloka-loader-1788964-dryrun-58hmw", "yyyy":"xxxx"}`,
Expand All @@ -256,6 +260,7 @@ func (s *IntegrationTestSuite) TestSearchNothing() {
}

func (s *IntegrationTestSuite) TestSearchBackwards() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
now := time.Now()
before := now.Add(-5 * time.Hour)
origDocs := []string{
Expand Down Expand Up @@ -287,6 +292,7 @@ func (s *IntegrationTestSuite) TestSearchBackwards() {
}

func (s *IntegrationTestSuite) TestSearchSequence() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
docTemplate := `{"service":"a","time":"%s"}`
bulks := 16
bulkSize := 1024
Expand Down Expand Up @@ -336,6 +342,7 @@ func (s *IntegrationTestSuite) TestSearchSequence() {
}

func (s *IntegrationTestSuite) TestSearchMany() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
const NetN = 256 * 1024
n := int(math.Floor(NetN * 1.2))

Expand Down Expand Up @@ -377,6 +384,7 @@ func getBulkIterationsNum(e *setup.TestingEnv) int {
}

func (s *IntegrationTestSuite) envWithDummyDocs(n int) (*setup.TestingEnv, []string) {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
env := setup.NewTestingEnv(s.Config)

str := largeString(20)
Expand Down Expand Up @@ -439,6 +447,7 @@ func (s *IntegrationTestSuite) TestFetch() {
}

func (s *IntegrationTestSuite) TestFetchNotFound() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
env := setup.NewTestingEnv(s.Config)
defer env.StopAll()

Expand All @@ -461,6 +470,7 @@ func (s *IntegrationTestSuite) TestFetchNotFound() {
}

func (s *IntegrationTestSuite) TestMulti() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
// ingest
getNextTs := getAutoTsGenerator(time.Now(), -time.Second)
origDocs := []string{
Expand Down Expand Up @@ -498,6 +508,7 @@ func collectIDs(qpr *seq.QPR) []string {
}

func (s *IntegrationTestSuite) TestSearchNot() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
env := setup.NewTestingEnv(s.Config)
defer env.StopAll()

Expand Down Expand Up @@ -573,6 +584,7 @@ func (s *IntegrationTestSuite) TestSearchNot() {
}

func (s *IntegrationTestSuite) TestSearchPattern() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
env := setup.NewTestingEnv(s.Config)
defer env.StopAll()

Expand Down Expand Up @@ -607,6 +619,7 @@ func (s *IntegrationTestSuite) TestSearchPattern() {
}

func (s *IntegrationTestSuite) TestSearchSimple() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
env := setup.NewTestingEnv(s.Config)
defer env.StopAll()

Expand Down Expand Up @@ -645,6 +658,7 @@ func (s *IntegrationTestSuite) TestSearchSimple() {
}

func (s *IntegrationTestSuite) TestManySearchRequests() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
env := setup.NewTestingEnv(s.Config)
defer env.StopAll()

Expand All @@ -665,6 +679,7 @@ func (s *IntegrationTestSuite) TestManySearchRequests() {
}

func (s *IntegrationTestSuite) TestAgg() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
t := s.T()

env := setup.NewTestingEnv(s.Config)
Expand Down Expand Up @@ -750,6 +765,7 @@ func (s *IntegrationTestSuite) TestAgg() {
}

func (s *IntegrationTestSuite) TestTimeseries() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
t := s.T()

env := setup.NewTestingEnv(s.Config)
Expand Down Expand Up @@ -1208,6 +1224,7 @@ func (s *IntegrationTestSuite) TestAggStat() {
}

func (s *IntegrationTestSuite) TestAggNoTotal() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
env := setup.NewTestingEnv(s.Config)
defer env.StopAll()

Expand Down Expand Up @@ -1315,6 +1332,7 @@ func (s *IntegrationTestSuite) TestAggNoTotal() {
}

func (s *IntegrationTestSuite) TestSeal() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
env := setup.NewTestingEnv(s.Config)

bulksNum := getBulkIterationsNum(env)
Expand Down Expand Up @@ -1384,6 +1402,7 @@ func (s *IntegrationTestSuite) TestSeal() {
}

func (s *IntegrationTestSuite) TestSearchRange() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
doc := `{"service": "test-service", "level": "%d"}`

env := setup.NewTestingEnv(s.Config)
Expand Down Expand Up @@ -1424,6 +1443,7 @@ func (s *IntegrationTestSuite) TestSearchRange() {
}

func (s *IntegrationTestSuite) TestQueryErr() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
origDocs := []string{
`{"service":"a", "xxxx":"yyyy"}`,
`{"service":"a", "yyyy":"xxxx"}`,
Expand All @@ -1443,6 +1463,7 @@ func (s *IntegrationTestSuite) TestQueryErr() {
func (s *IntegrationTestSuite) TestConnectionRefused() {
s.T().Skip() // temporary skip this test until we fix it in CORELOG-299

s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
env := setup.NewTestingEnv(s.Config)
env.StopStore()
defer env.StopAll()
Expand All @@ -1469,6 +1490,7 @@ func (s *IntegrationTestSuite) TestConnectionRefused() {
}

func (s *IntegrationTestSuite) TestSearchProxyTimeout() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
if s.Config.Name != configBasic {
s.T().Skip("no need to run in", s.Config.Name, "env")
}
Expand Down Expand Up @@ -1507,6 +1529,7 @@ func (s *IntegrationTestSuite) TestSearchProxyTimeout() {
}

func (s *IntegrationTestSuite) TestSearchStoreTimeout() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
if s.Config.Name != configBasic {
s.T().Skip("no need to run in", s.Config.Name, "env")
}
Expand Down Expand Up @@ -1539,6 +1562,7 @@ func (s *IntegrationTestSuite) TestSearchStoreTimeout() {
}

func (s *IntegrationTestSuite) TestBulkBadTimestamp() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
type Doc struct {
Service string `json:"service"`
Level string `json:"level"`
Expand Down Expand Up @@ -1586,7 +1610,7 @@ const configBasic = "Basic"

func TestBasicIntegration(t *testing.T) {
cfg := setup.TestingEnvConfig{
Name: configBasic,
Name: fmt.Sprintf("%s-%s", configBasic, t.Name()),
IngestorCount: 1,
HotShards: 1,
HotFactor: 1,
Expand All @@ -1598,7 +1622,7 @@ func TestBasicIntegration(t *testing.T) {

func TestColdStoreIntegration(t *testing.T) {
cfg := setup.TestingEnvConfig{
Name: "WithColdStore",
Name: fmt.Sprintf("%s-%s", "WithColdStore", t.Name()),
IngestorCount: 1,
ColdShards: 1,
ColdFactor: 1,
Expand All @@ -1613,7 +1637,7 @@ func TestColdStoreIntegration(t *testing.T) {

func TestColdHotStoreIntegration(t *testing.T) {
cfg := setup.TestingEnvConfig{
Name: "WithColdAndHotStoreEnabled",
Name: fmt.Sprintf("%s-%s", "WithColdAndHotStoreEnabled", t.Name()),
IngestorCount: 2,
ColdShards: 1,
ColdFactor: 1,
Expand All @@ -1628,7 +1652,7 @@ func TestColdHotStoreIntegration(t *testing.T) {

func TestBigWithReplicasIntegration(t *testing.T) {
cfg := setup.TestingEnvConfig{
Name: "BigWithReplicas",
Name: fmt.Sprintf("%s-%s", "BigWithReplicas", t.Name()),
IngestorCount: 2,
ColdShards: 4,
ColdFactor: 1,
Expand Down Expand Up @@ -1684,6 +1708,7 @@ func copySlice[V any](src []V) []V {
}

func (s *IntegrationTestSuite) TestPathSearch() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
env := setup.NewTestingEnv(s.Config)
defer env.StopAll()

Expand Down Expand Up @@ -1742,6 +1767,7 @@ func (s *IntegrationTestSuite) TestPathSearch() {
}

func (s *IntegrationTestSuite) TestSearchFieldsWithMultipleTypes() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
t := s.T()

env := setup.NewTestingEnv(s.Config)
Expand Down Expand Up @@ -1794,6 +1820,7 @@ func (s *IntegrationTestSuite) TestSearchFieldsWithMultipleTypes() {
}

func (s *IntegrationTestSuite) TestAggregateFieldsWithMultipleTypes() {
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, s.T().Name())
t := s.T()

env := setup.NewTestingEnv(s.Config)
Expand Down Expand Up @@ -1843,6 +1870,7 @@ func (s *IntegrationTestSuite) TestAggregateFieldsWithMultipleTypes() {
// time field is replaced with time.Now()
func (s *IntegrationTestSuite) TestTimeField() {
config := *s.Config
config.Name = fmt.Sprintf("%s-%s", config.Name, s.T().Name())
config.Mapping = map[string]seq.MappingTypes{
"event": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"message": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
Expand Down Expand Up @@ -1883,6 +1911,7 @@ func (s *IntegrationTestSuite) TestAsyncSearch() {
r := require.New(t)

cfg := *s.Config
cfg.Name = fmt.Sprintf("%s-%s", cfg.Name, s.T().Name())
cfg.Mapping = map[string]seq.MappingTypes{
"ip": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
"method": seq.NewSingleType(seq.TokenizerTypeKeyword, "", 0),
Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/replicas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func NewReplicaEnv(t *testing.T, config setup.TestingEnvConfig) ReplicasEnv {
}
config.DataDir = dir
config.IngestorCount = 1
config.Name = fmt.Sprintf("%s-%s", config.Name, t.Name())

env := ReplicasEnv{setup.NewTestingEnv(&config)}

Expand Down
3 changes: 2 additions & 1 deletion tests/integration_tests/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,9 +571,10 @@ func (s *SingleTestSuite) TestSealedMultiFetch() {
}

func TestSingleSuite(t *testing.T) {
for _, cfg := range suites.SingleEnvs() {
for i, cfg := range suites.SingleEnvs() {
t.Run(cfg.Name, func(t *testing.T) {
t.Parallel()
cfg.Name = fmt.Sprintf("%s%d", cfg.Name, i)
suite.Run(t, NewSingleTestSuite(cfg))
})
}
Expand Down
1 change: 1 addition & 0 deletions tests/setup/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func MakeIngestors(cfg *TestingEnvConfig, hot, cold [][]string) []*Ingestor {
BulkCircuit: circuitbreaker.Config{
RequestVolumeThreshold: 101, // disable circuit breaker
Timeout: time.Hour,
CustomSuffix: cfg.Name,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please verify that this will fix the issue?

I see that we have several suites (for example TestBasicIntegration with name Basic) with multiple tests inside those suites. Each test calls setup.NewTestingEnv with this CustomSuffix so we still can face panics.

Or am I missing something?

Copy link
Author

@romanchechyotkin romanchechyotkin Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, you are right, i missed this point

Copy link
Author

@romanchechyotkin romanchechyotkin Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dkharms already updated test configs, check please

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dkharms friendly reminder

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@romanchechyotkin sorry, I was chilling on a vacation. Feel free to ping other maintainers.

Honestly, I think your approach is too verbose. I would be fine with something like this (where new CB is created):

breaker := circuitbreaker.New(
	fmt.Sprintf("%s-shard-%d-%s", breakerPrefix, i, uuid.NewString()),
	config,
)

Of course, this is a workaround and the real problem is that CB is a global instance (and compiled only once for integration tests).

},
MaxInflightBulks: 0,
AllowedTimeDrift: 24 * time.Hour,
Expand Down
2 changes: 2 additions & 0 deletions tests/suites/single.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package suites

import (
"fmt"
"math"
"slices"
"strings"
Expand Down Expand Up @@ -147,6 +148,7 @@ func NewSingle(cfg *setup.TestingEnvConfig) *Single {

func (s *Single) BeforeTest(suiteName, testName string) {
s.Base.BeforeTest(suiteName, testName)
s.Config.Name = fmt.Sprintf("%s-%s", s.Config.Name, testName)
s.Env = setup.NewTestingEnv(s.Config)
}

Expand Down