diff --git a/.chloggen/feat_43097.yaml b/.chloggen/feat_43097.yaml new file mode 100644 index 0000000000000..66ef4e2fcf65e --- /dev/null +++ b/.chloggen/feat_43097.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add `max_partition_fetch_size` configuration option to kafkareceiver" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [43097] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/kafka/franz_client.go b/internal/kafka/franz_client.go index cb840cf7d6f1a..c2118a42a036d 100644 --- a/internal/kafka/franz_client.go +++ b/internal/kafka/franz_client.go @@ -132,6 +132,9 @@ func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConf if consumerCfg.DefaultFetchSize > 0 { opts = append(opts, kgo.FetchMaxBytes(consumerCfg.DefaultFetchSize)) } + if consumerCfg.MaxPartitionFetchSize > 0 { + opts = append(opts, kgo.FetchMaxPartitionBytes(consumerCfg.MaxPartitionFetchSize)) + } // Configure max fetch wait if consumerCfg.MaxFetchWait > 0 { diff --git a/pkg/kafka/configkafka/config.go b/pkg/kafka/configkafka/config.go index 0c469e4d733fd..caa4a8a7592ab 100644 --- a/pkg/kafka/configkafka/config.go +++ b/pkg/kafka/configkafka/config.go @@ -118,6 +118,11 @@ type ConsumerConfig struct { // The maximum amount of time to wait for MinFetchSize bytes to be // available before the broker returns a response (default 250ms) MaxFetchWait time.Duration `mapstructure:"max_fetch_wait"` + + // MaxPartitionFetchSize defines the maximum number of bytes to fetch + // per partition (default "1048576") + MaxPartitionFetchSize int32 `mapstructure:"max_partition_fetch_size"` + // RebalanceStrategy specifies the strategy to use for partition assignment. // Possible values are "range", "roundrobin", and "sticky". // Defaults to "range". @@ -137,10 +142,11 @@ func NewDefaultConsumerConfig() ConsumerConfig { Enable: true, Interval: time.Second, }, - MinFetchSize: 1, - MaxFetchSize: 0, - MaxFetchWait: 250 * time.Millisecond, - DefaultFetchSize: 1048576, + MinFetchSize: 1, + MaxFetchSize: 0, + MaxFetchWait: 250 * time.Millisecond, + DefaultFetchSize: 1048576, + MaxPartitionFetchSize: 1048576, } } diff --git a/pkg/kafka/configkafka/config_test.go b/pkg/kafka/configkafka/config_test.go index 429c85fd8b817..a10ca95b43dc3 100644 --- a/pkg/kafka/configkafka/config_test.go +++ b/pkg/kafka/configkafka/config_test.go @@ -160,10 +160,11 @@ func TestConsumerConfig(t *testing.T) { Enable: false, Interval: 10 * time.Minute, }, - MinFetchSize: 10, - DefaultFetchSize: 1024, - MaxFetchSize: 4096, - MaxFetchWait: 1 * time.Second, + MinFetchSize: 10, + DefaultFetchSize: 1024, + MaxFetchSize: 4096, + MaxFetchWait: 1 * time.Second, + MaxPartitionFetchSize: 4096, }, }, diff --git a/pkg/kafka/configkafka/testdata/consumer_config.yaml b/pkg/kafka/configkafka/testdata/consumer_config.yaml index 61cc8f11fc4bd..dc2a9d85127ab 100644 --- a/pkg/kafka/configkafka/testdata/consumer_config.yaml +++ b/pkg/kafka/configkafka/testdata/consumer_config.yaml @@ -11,6 +11,7 @@ kafka/full: default_fetch_size: 1024 max_fetch_size: 4096 max_fetch_wait: 1s + max_partition_fetch_size: 4096 # Invalid configurations kafka/invalid_initial_offset: diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 9d96c81f3fd00..fda1db2db6565 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -76,6 +76,7 @@ The following settings can be optionally configured: - `default_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request, defaults to 1MB. - `max_fetch_size` (default = `0`): The maximum number of message bytes to fetch in a request, defaults to unlimited. - `max_fetch_wait` (default = `250ms`): The maximum amount of time the broker should wait for `min_fetch_size` bytes to be available before returning anyway. +- `max_partition_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request per partition, defaults to 1MB. If a single record batch is larger than this value, the broker will still return it to ensure the consumer can make progress. This setting only applies while using [`franz-go`](https://github.com/twmb/franz-go). - `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options. - `auth` - `plain_text` (Deprecated in v0.123.0: use sasl with mechanism set to PLAIN instead.)