Replies: 4 comments
-
Off topic: The
-
Hey @eusonlito 👋
Fixed, thanks! Can I see your code, please? I don't have this many threads here.
-
My code is (similar to):

```php
$builder = ConsumerBuilder::create(
    brokers: $config['brokers'],
    groupId: $config['consumer_group_id'],
);

$builder->subscribe($topic);
$builder->withHandler(fn ($message) => $handler($message));
$builder->withSasl(new Sasl(
    securityProtocol: $config['security_protocol'],
    mechanisms: $config['sasl_mechanism'],
    username: $config['sasl_username'],
    password: $config['sasl_password'],
));

$builder->build()->consume();
```

And the config:

```php
<?php

return [
    /*
     | Your Kafka broker URLs.
     */
    'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),

    /*
     | Kafka consumers belonging to the same consumer group share a group id.
     | The consumers in a group then divide the topic partitions as fairly among
     | themselves as possible, so that each partition is consumed by exactly one
     | consumer from the group.
     | This config defines the consumer group id you want to use for your project.
     */
    'consumer_group_id' => env('KAFKA_CONSUMER_GROUP_ID', 'group'),

    /*
     | After the consumer receives its assignment from the coordinator, it must
     | determine the initial position for each assigned partition. When the group
     | is first created, before any messages have been consumed, the position is
     | set according to a configurable offset reset policy (auto.offset.reset).
     | Typically, consumption starts either at the earliest or the latest offset.
     | You can choose between "latest", "earliest" or "none".
     */
    'offset_reset' => env('KAFKA_OFFSET_RESET', 'earliest'),

    /*
     | If enable.auto.commit is set (which is the default), the consumer
     | automatically commits offsets periodically at the interval set by
     | auto.commit.interval.ms.
     */
    'auto_commit' => env('KAFKA_AUTO_COMMIT', true),

    'sleep_on_error' => env('KAFKA_ERROR_SLEEP', 5),

    'partition' => env('KAFKA_PARTITION', -1),

    /*
     | Kafka supports 4 compression codecs: none, gzip, lz4 and snappy.
     */
    'compression' => env('KAFKA_COMPRESSION_TYPE', 'snappy'),

    /*
     | Choose whether debug is enabled.
     */
    'debug' => env('KAFKA_DEBUG', false),

    /*
     | Repository for batching messages together.
     | Implement BatchRepositoryInterface to save batches in different storage.
     */
    'batch_repository' => env('KAFKA_BATCH_REPOSITORY', \Junges\Kafka\BatchRepositories\InMemoryBatchRepository::class),

    'security_protocol' => env('KAFKA_SECURITY_PROTOCOL'),
    'sasl_mechanism' => env('KAFKA_SASL_MECHANISM'),
    'sasl_username' => env('KAFKA_SASL_USERNAME'),
    'sasl_password' => env('KAFKA_SASL_PASSWORD'),
];
```
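The `consumer_group_id` comment above describes partitions being divided fairly among the consumers in a group. As a minimal plain-PHP sketch of that division (mimicking Kafka's range-style assignment; `assignPartitions` is a hypothetical helper, not part of laravel-kafka):

```php
<?php
// Hypothetical sketch: how a consumer group divides topic partitions among
// its members. Partitions are split as evenly as possible; when they don't
// divide evenly, the first consumers each take one extra partition.

function assignPartitions(array $consumers, int $partitionCount): array
{
    sort($consumers);
    $assignments = array_fill_keys($consumers, []);
    $n = count($consumers);
    $perConsumer = intdiv($partitionCount, $n);
    $extra = $partitionCount % $n;

    $partition = 0;
    foreach (array_values($consumers) as $i => $consumer) {
        $take = $perConsumer + ($i < $extra ? 1 : 0);
        for ($j = 0; $j < $take; $j++) {
            $assignments[$consumer][] = $partition++;
        }
    }

    return $assignments;
}

// Three consumers sharing a topic with 8 partitions:
print_r(assignPartitions(['c1', 'c2', 'c3'], 8));
// => c1: [0, 1, 2], c2: [3, 4, 5], c3: [6, 7]
```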
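The `offset_reset` comment describes the auto.offset.reset policy: it only matters when the group has no committed offset for a partition. A plain-PHP sketch of that decision (`resolveStartOffset` is a hypothetical helper for illustration, not the library's logic):

```php
<?php
// Hypothetical sketch of the auto.offset.reset decision: if the group has a
// committed offset, resume there; otherwise fall back to the configured policy.

function resolveStartOffset(?int $committed, string $policy, int $earliest, int $latest): int
{
    if ($committed !== null) {
        return $committed; // resume from the last committed position
    }

    return match ($policy) {
        'earliest' => $earliest, // replay the partition from the beginning
        'latest'   => $latest,   // only consume messages produced from now on
        'none'     => throw new RuntimeException('No committed offset for group'),
        default    => throw new InvalidArgumentException("Unknown policy: $policy"),
    };
}

// A brand-new group with 'earliest' starts at the first retained offset:
echo resolveStartOffset(null, 'earliest', 100, 250); // 100
```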
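And the `auto_commit` comment says offsets are committed periodically at auto.commit.interval.ms rather than after every message. A small sketch of that timing behaviour (the `AutoCommitTracker` class is invented for illustration; the real commit is done inside librdkafka):

```php
<?php
// Hypothetical sketch of interval-based auto commit: offsets accumulate and
// are only "committed" once the configured interval has elapsed.

final class AutoCommitTracker
{
    private ?int $pendingOffset = null;
    private float $lastCommitAt;

    public function __construct(private float $intervalMs)
    {
        $this->lastCommitAt = microtime(true);
    }

    /** Record a processed offset; returns the offset committed now, or null. */
    public function track(int $offset): ?int
    {
        $this->pendingOffset = $offset;
        $elapsedMs = (microtime(true) - $this->lastCommitAt) * 1000;
        if ($elapsedMs >= $this->intervalMs) {
            $this->lastCommitAt = microtime(true);
            $committed = $this->pendingOffset;
            $this->pendingOffset = null;
            return $committed; // a real consumer would call commit() here
        }
        return null; // interval not reached yet; offset stays pending
    }
}

$tracker = new AutoCommitTracker(intervalMs: 0.0); // interval 0 => commit on every call
echo $tracker->track(7); // 7
```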
-
Thanks! I'll take a look tonight.
-
My project has a consumer defined as:
And this generates more than 100 threads:
I would like to understand the reason for this: whether these threads are necessary, whether the count is configurable, or whether it is advisable to leave it as is.
Thanks a lot for this great package.