Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
# Changelog

## master / unreleased
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
* [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
* [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355
* [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392
* [ENHANCEMENT] Distributor: Optimize memory allocations by pooling PreallocWriteRequestV2 and preserving the capacity of the Symbols slice during resets. #7404
* [BUGFIX] Alertmanager: Fix disappearing user config and state when ring is temporarily unreachable. #7372
* [BUGFIX] Querier: Fix nil panic when `ingester_query_max_attempts` is greater than 1. #7369
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
Expand Down
8 changes: 8 additions & 0 deletions pkg/cortexpb/timeseriesv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ func PreallocWriteRequestV2FromPool() *PreallocWriteRequestV2 {
return writeRequestPoolV2.Get().(*PreallocWriteRequestV2)
}

// Reset implements proto.Message. Unlike the embedded WriteRequestV2.Reset,
// it keeps the backing array of the Symbols slice so that a pooled request
// can be unmarshalled again without re-allocating it.
func (p *PreallocWriteRequestV2) Reset() {
	// Hold on to the Symbols backing array before the embedded message
	// wipes every field.
	syms := p.Symbols
	p.WriteRequestV2.Reset()
	// Drop the cached marshalled bytes and restore Symbols with zero
	// length but its original capacity.
	p.data = nil
	p.Symbols = syms[:0]
}

Comment on lines +133 to +140
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Override a Reset() to preserve the capacity of the Symbols.

// PreallocTimeseriesV2SliceFromPool retrieves a slice of PreallocTimeseriesV2 from a sync.Pool.
// ReuseSliceV2 should be called once done.
func PreallocTimeseriesV2SliceFromPool() []PreallocTimeseriesV2 {
Expand Down
65 changes: 65 additions & 0 deletions pkg/cortexpb/timeseriesv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,71 @@ func TestReuseWriteRequestV2(t *testing.T) {
})
}

// TestPreallocWriteRequestV2Reset verifies the overridden Reset: it must keep
// the Symbols backing array (so pooled requests avoid re-allocating it) while
// clearing every other WriteRequestV2 field and the cached data pointer.
func TestPreallocWriteRequestV2Reset(t *testing.T) {
	t.Run("preserves Symbols capacity", func(t *testing.T) {
		const symbolsCap = 100
		req := &PreallocWriteRequestV2{
			WriteRequestV2: WriteRequestV2{
				Symbols: make([]string, 0, symbolsCap),
			},
		}
		req.Symbols = append(req.Symbols, "a", "b", "c")

		// Address of the first element of the full-capacity view of the
		// backing array; compared after Reset to prove the array itself is
		// reused rather than reallocated.
		ptrBefore := &req.Symbols[:cap(req.Symbols)][0]

		req.Reset()

		assert.Equal(t, 0, len(req.Symbols), "Symbols length should be 0 after Reset")
		assert.Equal(t, symbolsCap, cap(req.Symbols), "Symbols capacity should be preserved after Reset")
		assert.Same(t, ptrBefore, &req.Symbols[:cap(req.Symbols)][0], "Symbols backing array should be reused after Reset")
	})

	t.Run("clears non-Symbols WriteRequestV2 fields", func(t *testing.T) {
		b := []byte{1, 2, 3}
		req := &PreallocWriteRequestV2{
			WriteRequestV2: WriteRequestV2{
				Source:                  RULE,
				SkipLabelNameValidation: true,
				Timeseries:              []PreallocTimeseriesV2{{TimeSeriesV2: &TimeSeriesV2{}}},
			},
			data: &b,
		}

		req.Reset()

		// Everything except Symbols must be back to the zero value.
		assert.Equal(t, SourceEnum(0), req.Source)
		assert.False(t, req.SkipLabelNameValidation)
		assert.Nil(t, req.Timeseries)
		assert.Nil(t, req.data)
	})

	t.Run("Unmarshal after Reset reuses Symbols backing array", func(t *testing.T) {
		const symbolsCount = 50
		symbols := make([]string, symbolsCount)
		for i := range symbols {
			symbols[i] = fmt.Sprintf("symbol_%04d", i)
		}
		data, err := (&WriteRequestV2{Symbols: symbols}).Marshal()
		require.NoError(t, err)

		// Pre-size Symbols with more capacity than the payload needs so
		// Unmarshal can append without growing the slice.
		req := &PreallocWriteRequestV2{
			WriteRequestV2: WriteRequestV2{
				Symbols: make([]string, 0, symbolsCount*2),
			},
		}

		// Simulate Reset in util.ParseProtoReader()
		req.Reset()
		ptrAfterReset := &req.Symbols[:cap(req.Symbols)][0]
		capAfterReset := cap(req.Symbols)

		require.NoError(t, req.WriteRequestV2.Unmarshal(data))
		assert.Equal(t, symbolsCount, len(req.Symbols))
		assert.Equal(t, capAfterReset, cap(req.Symbols), "capacity should not change: Unmarshal reused the existing backing array")
		assert.Same(t, ptrAfterReset, &req.Symbols[:cap(req.Symbols)][0], "backing array pointer should be identical: no new allocation occurred")
	})
}

func BenchmarkMarshallWriteRequestV2(b *testing.B) {
ts := PreallocTimeseriesV2SliceFromPool()

Expand Down
11 changes: 4 additions & 7 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,11 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool,
return
}

var req cortexpb.PreallocWriteRequestV2
req := cortexpb.PreallocWriteRequestV2FromPool()
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a req from the pool to reuse the capacity of the Symbols.

// v1 request is put back into the pool by the Distributor.
defer func() {
cortexpb.ReuseWriteRequestV2(&req)
req.Free()
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete the req.Free() since this path is a http.

}()
defer cortexpb.ReuseWriteRequestV2(req)

err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, req, util.RawSnappy)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -113,7 +110,7 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool,
req.Source = cortexpb.API
}

v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
v1Req, err := convertV2RequestToV1(req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
Loading