Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ func runCompact(
blockLister = block.NewConcurrentLister(logger, insBkt)
case recursiveDiscovery:
blockLister = block.NewRecursiveLister(logger, insBkt)
case birthstoneDiscovery:
blockLister = block.NewBirthstoneLister(logger, insBkt)
default:
return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
}
Expand Down Expand Up @@ -381,6 +383,7 @@ func runCompact(
metadata.HashFunc(conf.hashFunc),
conf.blockFilesConcurrency,
conf.compactBlocksFetchConcurrency,
conf.enableBirthstone,
)
var planner compact.Planner

Expand Down Expand Up @@ -409,6 +412,7 @@ func runCompact(
insBkt,
conf.compactionConcurrency,
conf.skipBlockWithOutOfOrderChunks,
conf.enableBirthstone,
)
if err != nil {
return errors.Wrap(err, "create bucket compactor")
Expand Down Expand Up @@ -518,6 +522,7 @@ func runCompact(
conf.blockFilesConcurrency,
metadata.HashFunc(conf.hashFunc),
conf.acceptMalformedIndex,
conf.enableBirthstone,
); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}
Expand Down Expand Up @@ -547,6 +552,7 @@ func runCompact(
conf.blockFilesConcurrency,
metadata.HashFunc(conf.hashFunc),
conf.acceptMalformedIndex,
conf.enableBirthstone,
); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
Expand Down Expand Up @@ -792,6 +798,7 @@ type compactConfig struct {
progressCalculateInterval time.Duration
filterConf *store.FilterConfig
disableAdminOperations bool
enableBirthstone bool
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -832,7 +839,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway").
Default("false").BoolVar(&cc.disableDownsampling)

strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery), string(birthstoneDiscovery)}, ", ")
cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. The birthstone strategy discovers blocks by listing their birthstone files, which mark blocks that have been completely uploaded.").
Default(string(concurrentDiscovery)).StringVar(&cc.blockListStrategy)
cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Expand Down Expand Up @@ -910,4 +917,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&cc.label)

cmd.Flag("disable-admin-operations", "Disable UI/API admin operations like marking blocks for deletion and no compaction.").Default("false").BoolVar(&cc.disableAdminOperations)

cmd.Flag("enable-birthstone", "When set to true, upload a birthstone file when a block is created and delete it when the block is deleted. A birthstone file marks the completeness of a block in the bucket.").
Hidden().Default("false").BoolVar(&cc.enableBirthstone)
}
4 changes: 4 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ type shipperConfig struct {
allowOutOfOrderUpload bool
hashFunc string
metaFileName string
enableBirthstone bool
}

func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig {
Expand All @@ -184,6 +185,9 @@ func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig
cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&sc.hashFunc, "SHA256", "")
cmd.Flag("shipper.meta-file-name", "the file to store shipper metadata in").Default(shipper.DefaultMetaFilename).StringVar(&sc.metaFileName)
cmd.Flag("shipper.enable-birthstone",
"If true, the shipper will upload a birthstone file once a block has been completely uploaded to the bucket.").
Default("false").BoolVar(&sc.enableBirthstone)
return sc
}

Expand Down
11 changes: 7 additions & 4 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func RunDownsample(
objStoreConfig *extflag.PathOrContent,
comp component.Component,
hashFunc metadata.HashFunc,
enableBirthstone bool,
) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand Down Expand Up @@ -134,7 +135,7 @@ func RunDownsample(
metrics.downsamples.WithLabelValues(resolutionLabel)
metrics.downsampleFailures.WithLabelValues(resolutionLabel)
}
if err := downsampleBucket(ctx, logger, metrics, insBkt, metas, dataDir, downsampleConcurrency, blockFilesConcurrency, hashFunc, false); err != nil {
if err := downsampleBucket(ctx, logger, metrics, insBkt, metas, dataDir, downsampleConcurrency, blockFilesConcurrency, hashFunc, false, enableBirthstone); err != nil {
return errors.Wrap(err, "downsampling failed")
}

Expand All @@ -143,7 +144,7 @@ func RunDownsample(
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, insBkt, metas, dataDir, downsampleConcurrency, blockFilesConcurrency, hashFunc, false); err != nil {
if err := downsampleBucket(ctx, logger, metrics, insBkt, metas, dataDir, downsampleConcurrency, blockFilesConcurrency, hashFunc, false, enableBirthstone); err != nil {
return errors.Wrap(err, "downsampling failed")
}
return nil
Expand Down Expand Up @@ -185,6 +186,7 @@ func downsampleBucket(
blockFilesConcurrency int,
hashFunc metadata.HashFunc,
acceptMalformedIndex bool,
enableBirthstone bool,
) (rerr error) {
if err := os.MkdirAll(dir, 0750); err != nil {
return errors.Wrap(err, "create dir")
Expand Down Expand Up @@ -262,7 +264,7 @@ func downsampleBucket(
resolution = downsample.ResLevel2
errMsg = "downsampling to 60 min"
}
if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics, acceptMalformedIndex, blockFilesConcurrency); err != nil {
if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics, acceptMalformedIndex, blockFilesConcurrency, enableBirthstone); err != nil {
metrics.downsampleFailures.WithLabelValues(m.Thanos.ResolutionString()).Inc()
errCh <- errors.Wrap(err, errMsg)

Expand Down Expand Up @@ -353,6 +355,7 @@ func processDownsampling(
metrics *DownsampleMetrics,
acceptMalformedIndex bool,
blockFilesConcurrency int,
enableBirthstone bool,
) error {
begin := time.Now()
bdir := filepath.Join(dir, m.ULID.String())
Expand Down Expand Up @@ -418,7 +421,7 @@ func processDownsampling(

begin = time.Now()

err = block.Upload(ctx, logger, bkt, resdir, hashFunc)
err = block.Upload(ctx, logger, bkt, resdir, hashFunc, enableBirthstone)
if err != nil {
return compact.NewRetryError(errors.Wrapf(err, "upload downsampled block %s", id))
}
Expand Down
12 changes: 6 additions & 6 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestRegression4960_Deadlock(t *testing.T) {
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc, false))
}
{
id2, err = e2eutil.CreateBlock(
Expand All @@ -149,7 +149,7 @@ func TestRegression4960_Deadlock(t *testing.T) {
labels.Labels{{Name: "e1", Value: "2"}},
downsample.ResLevel0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id2.String()), metadata.NoneFunc))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id2.String()), metadata.NoneFunc, false))
}
{
id3, err = e2eutil.CreateBlock(
Expand All @@ -160,7 +160,7 @@ func TestRegression4960_Deadlock(t *testing.T) {
labels.Labels{{Name: "e1", Value: "2"}},
downsample.ResLevel0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id3.String()), metadata.NoneFunc))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id3.String()), metadata.NoneFunc, false))
}

meta, err := block.DownloadMeta(ctx, logger, bkt, id)
Expand All @@ -174,7 +174,7 @@ func TestRegression4960_Deadlock(t *testing.T) {

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
err = downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, 1, metadata.NoneFunc, false)
err = downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, 1, metadata.NoneFunc, false, false)
testutil.NotOk(t, err)

testutil.Assert(t, strings.Contains(err.Error(), "some random error has occurred"))
Expand All @@ -200,7 +200,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc, false))
}

meta, err := block.DownloadMeta(ctx, logger, bkt, id)
Expand All @@ -214,7 +214,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, 1, metadata.NoneFunc, false))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, 1, metadata.NoneFunc, false, false))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.ResolutionString())))

_, err = os.Stat(dir)
Expand Down
8 changes: 8 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ func runReceive(
level.Info(logger).Log("msg", "metric name filter feature enabled")
}
}
if conf.enableBirthstone {
multiTSDBOptions = append(multiTSDBOptions, receive.WithBirthstoneEnabled())
}

// Create a matcher converter if specified by command line to cache expensive regex matcher conversions.
// Proxy store and TSDB stores of all tenants share a single cache.
Expand Down Expand Up @@ -985,6 +988,7 @@ type receiveConfig struct {

ignoreBlockSize bool
allowOutOfOrderUpload bool
enableBirthstone bool

reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent
Expand Down Expand Up @@ -1153,6 +1157,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"about order.").
Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload)

cmd.Flag("shipper.enable-birthstone",
"If true, the shipper will upload a birthstone file for each completely uploaded block to the bucket.").
Default("false").Hidden().BoolVar(&rc.enableBirthstone)

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ func runRule(
}
}()

s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, nil, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName)
s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, nil, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName, conf.shipper.enableBirthstone)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func runSidecar(

uploadCompactedFunc := func() bool { return conf.shipper.uploadCompacted }
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName)
uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName, conf.shipper.enableBirthstone)

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type syncStrategy string
const (
concurrentDiscovery syncStrategy = "concurrent"
recursiveDiscovery syncStrategy = "recursive"
birthstoneDiscovery syncStrategy = "birthstone"
)

type storeConfig struct {
Expand Down
Loading
Loading