From 71b98d29f0accb45513bcd8c203758a286876e32 Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Tue, 3 Dec 2024 16:18:48 -0800 Subject: [PATCH 1/3] add debug logs --- pkg/receive/handler.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 62308285fd0..b2233c32c3d 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -863,6 +863,17 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) ( } } +func printMap(data map[endpointReplica]map[string]trackedSeries) { + for key, innerMap := range data { + fmt.Printf("Endpoint: %s, Replica: %d\n", key.endpoint.Address, key.replica) + for strKey, series := range innerMap { + fmt.Printf(" Key: %s\n", strKey) + fmt.Printf(" SeriesIDs: %v\n", series.seriesIDs) + fmt.Printf(" TimeSeries length: %d\n", len(series.timeSeries)) + } + } +} + // distributeTimeseriesToReplicas distributes the given timeseries from the tenant to different endpoints in a manner // that achieves the replication factor indicated by replicas. // The first return value are the series that should be written to the local node. The second return value are the @@ -928,6 +939,10 @@ func (h *Handler) distributeTimeseriesToReplicas( if h.receiverMode == IngestorOnly && len(remoteWrites) > 0 { panic("ingestor only mode should not have any remote writes") } + fmt.Println("localWrites:") + printMap(localWrites) + fmt.Println("remoteWrites:") + printMap(remoteWrites) return localWrites, remoteWrites, nil } From e7e96ef7cb8a109c98b212d5f0bf1cc80cff242b Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Tue, 3 Dec 2024 16:50:16 -0800 Subject: [PATCH 2/3] add more debug logs --- pkg/receive/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index b2233c32c3d..40ee7886aba 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -865,7 +865,7 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) ( func printMap(data map[endpointReplica]map[string]trackedSeries) { for key, innerMap := range data { - fmt.Printf("Endpoint: %s, Replica: %d\n", key.endpoint.Address, key.replica) + fmt.Printf("Endpoint: %s, CapNProtoAddress: %s, AZ: %s, Replica: %d\n", key.endpoint.Address, key.endpoint.CapNProtoAddress, key.endpoint.AZ, key.replica) for strKey, series := range innerMap { fmt.Printf(" Key: %s\n", strKey) fmt.Printf(" SeriesIDs: %v\n", series.seriesIDs) From cf49e958ba3159313bc2c5afd3d2df7746c232a9 Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Tue, 3 Dec 2024 17:44:00 -0800 Subject: [PATCH 3/3] add optimized code path for ingestor only mode --- pkg/receive/handler.go | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 40ee7886aba..2a0f82da61f 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -789,7 +789,13 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) ( } requestLogger := log.With(h.logger, logTags...) - localWrites, remoteWrites, err := h.distributeTimeseriesToReplicas(params.tenant, params.replicas, params.writeRequest.Timeseries) + var localWrites, remoteWrites map[endpointReplica]map[string]trackedSeries + var err error + if h.receiverMode == IngestorOnly { + localWrites, remoteWrites, err = h.distributeTimeseriesToReplicasIngestorOnly(params.tenant, params.replicas, params.writeRequest.Timeseries) + } else { + localWrites, remoteWrites, err = h.distributeTimeseriesToReplicas(params.tenant, params.replicas, params.writeRequest.Timeseries) + } if err != nil { level.Error(requestLogger).Log("msg", "failed to distribute timeseries to replicas", "err", err) return stats, err @@ -874,6 +880,22 @@ func printMap(data map[endpointReplica]map[string]trackedSeries) { } } +func (h *Handler) distributeTimeseriesToReplicasIngestorOnly(tenantHTTP string, replicas []uint64, timeseries []prompb.TimeSeries) (map[endpointReplica]map[string]trackedSeries, map[endpointReplica]map[string]trackedSeries, error) { + remoteWrites := make(map[endpointReplica]map[string]trackedSeries) + localWrites := make(map[endpointReplica]map[string]trackedSeries, len(replicas)) + for _, rn := range replicas { + endpointReplica := endpointReplica{endpoint: Endpoint{Address: h.options.Endpoint, CapNProtoAddress: h.options.Endpoint}, replica: rn} + seriesids := make([]int, len(timeseries), len(timeseries)) + for i := range timeseries { + seriesids[i] = i + } + localWrites[endpointReplica] = map[string]trackedSeries{tenantHTTP: {seriesIDs: seriesids, timeSeries: timeseries}} + } + fmt.Println("localWrites (IngestorOnly):") + printMap(localWrites) + return localWrites, remoteWrites, nil +} + // distributeTimeseriesToReplicas distributes the given timeseries from the tenant to different endpoints in a manner // that achieves the replication factor indicated by replicas. // The first return value are the series that should be written to the local node. The second return value are the @@ -939,9 +961,9 @@ func (h *Handler) distributeTimeseriesToReplicas( if h.receiverMode == IngestorOnly && len(remoteWrites) > 0 { panic("ingestor only mode should not have any remote writes") } - fmt.Println("localWrites:") + fmt.Println("localWrites (both):") printMap(localWrites) - fmt.Println("remoteWrites:") + fmt.Println("remoteWrites (both):") printMap(remoteWrites) return localWrites, remoteWrites, nil }