Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ jobs:

strategy:
matrix:
php: [8.4, 8.3, 8.2]
laravel: [12.*, 11.*, 10.*]
php: [8.4, 8.3]
laravel: [12.*, 11.*]
dependency-version: [prefer-stable]
include:
- laravel: 10.*
testbench: 8.*
- laravel: 11.*
testbench: 9.*
- laravel: 12.*
Expand Down
12 changes: 6 additions & 6 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
"description": "A kafka driver for laravel",
"type": "library",
"require": {
"php": "^8.2|^8.3|^8.4",
"php": "^8.3|^8.4",
"ext-rdkafka": "^6.0",
"monolog/monolog": "^3",
"mateusjunges/avro-serde-php": "^3.0",
"illuminate/support": "^10.0|^11.0|^12.0",
"illuminate/contracts": "^10.0|^11.0|^12.0"
"illuminate/support": "^11.0|^12.0",
"illuminate/contracts": "^11.0|^12.0"
},
"require-dev": {
"phpunit/phpunit": "^10.5|^11.5.3",
"orchestra/testbench": "^7.16|^8.0|^9.0|^10.0",
"orchestra/testbench": "^9.0|^10.0",
"predis/predis": "^1",
"rector/rector": "^0.19.8",
"rector/rector": "^2.1",
"laravel/pint": "dev-main"
},
"minimum-stability": "dev",
Expand All @@ -37,7 +37,7 @@
}
],
"scripts": {
"test": "vendor/bin/phpunit tests",
"test": "vendor/bin/phpunit",
"format": "vendor/bin/pint"
},
"extra": {
Expand Down
61 changes: 0 additions & 61 deletions docs/consuming-messages/queueable-handlers.md

This file was deleted.

18 changes: 13 additions & 5 deletions docs/producing-messages/producing-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,24 @@ Kafka::publish('broker')->onTopic('topic-name')
This method returns a `ProducerBuilder` instance, which contains a few methods to configure your kafka producer.
The following lines describes these methods.

If you are going to produce a lot of messages to different topics, please use the `asyncPublish` method on the `Junges\Kafka\Facades\Kafka` class:
The default `publish()` method now uses asynchronous publishing for better performance. Messages are queued and flushed when the application terminates:

```php
use Junges\Kafka\Facades\Kafka;

Kafka::asyncPublish('broker')->onTopic('topic-name')
Kafka::publish('broker')->onTopic('topic-name')
```

The main difference is that the Async Producer is a singleton and will only flush the producer when the application is shutting down, instead of after each send.
This reduces the overhead when you want to send a lot of messages in your request handlers.
The async producer is a singleton and will only flush messages when the application is shutting down, instead of after each send.
This reduces overhead when you want to send a lot of messages in your request handlers.

If you need immediate message flushing (synchronous publishing), use the `publishSync()` method:

```php
use Junges\Kafka\Facades\Kafka;

Kafka::publishSync('broker')->onTopic('topic-name')
```

```+parse
<x-sponsors.request-sponsor/>
Expand All @@ -37,6 +45,6 @@ available on the `Kafka` facade (added in v2.2.0). This method will return a fre
use Junges\Kafka\Facades\Kafka;

Kafka::fresh()
->asyncPublish('broker')
->publish('broker')
->onTopic('topic-name')
```
15 changes: 13 additions & 2 deletions docs/producing-messages/publishing-to-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,19 @@ $producer = Kafka::publish('broker')
$producer->send();
```

If you want to send multiple messages, consider using the async producer instead. The default `send` method is recommended for low-throughput systems only, as it
flushes the producer after every message that is sent.
The `publish()` method uses asynchronous publishing for better performance, batching messages and flushing them when the application terminates.
If you need immediate message flushing, use `publishSync()` instead:

```php
use Junges\Kafka\Facades\Kafka;

// For immediate flush (synchronous)
$producer = Kafka::publishSync('broker')
->onTopic('topic')
->withKafkaKey('kafka-key');

$producer->send();
```

```+parse
<x-sponsors.request-sponsor/>
Expand Down
30 changes: 30 additions & 0 deletions docs/upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,36 @@ title: Upgrade guide
weight: 6
---

## Upgrade to v3.0 from v2.9

### Breaking Changes

- `publish()` is now asynchronous by default. Messages are queued and flushed when the application terminates for better performance
- Removed `asyncPublish()` and `publishAsync()` methods - use `publish()` for async behavior (default) or `publishSync()` for immediate flushing
- Minimum PHP version raised to 8.3
- Minimum Laravel version raised to 11.0
- **NEW**: Added `publishSync()` method for synchronous message publishing with immediate flush

### Migration Guide

**Before (v2.9):**
```php
// Async publishing
Kafka::asyncPublish()->onTopic('topic')->withBody(['data' => 'value'])->send();

// Sync publishing
Kafka::publish()->onTopic('topic')->withBody(['data' => 'value'])->send();
```

**After (v3.0):**
```php
// Async publishing (default behavior)
Kafka::publish()->onTopic('topic')->withBody(['data' => 'value'])->send();

// Sync publishing (immediate flush)
Kafka::publishSync()->onTopic('topic')->withBody(['data' => 'value'])->send();
```

## Upgrade to v2.9 from v2.8

- **BREAKING CHANGE**: Deprecated producer batch messages feature has been removed (`MessageBatch`, `sendBatch`, `produceBatch`). Use `Kafka::asyncPublish()` instead for better performance
Expand Down
10 changes: 6 additions & 4 deletions rector.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@
use Rector\CodeQuality\Rector\Class_\InlineConstructorDefaultToPropertyRector;
use Rector\Config\RectorConfig;
use Rector\Set\ValueObject\LevelSetList;
use Rector\TypeDeclaration\Rector\ClassMethod\ReturnNeverTypeRector;

return static function (RectorConfig $rectorConfig): void {
$rectorConfig->paths([
__DIR__.'/config',
__DIR__.'/dev',
__DIR__.'/src',
__DIR__.'/tests',
]);

// register a single rule
$rectorConfig->rule(InlineConstructorDefaultToPropertyRector::class);

// define sets of rules
$rectorConfig->sets([
LevelSetList::UP_TO_PHP_81,
LevelSetList::UP_TO_PHP_83,
]);

$rectorConfig->skip([
ReturnNeverTypeRector::class,
]);
};
2 changes: 1 addition & 1 deletion src/Commit/RetryableCommitter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class RetryableCommitter implements Committer
{
private const RETRYABLE_ERRORS = [
private const array RETRYABLE_ERRORS = [
RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
Expand Down
28 changes: 0 additions & 28 deletions src/Concerns/HandleConsumedMessage.php

This file was deleted.

24 changes: 0 additions & 24 deletions src/Concerns/PrepareMiddlewares.php

This file was deleted.

8 changes: 4 additions & 4 deletions src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

class Config
{
final public const SASL_PLAINTEXT = 'SASL_PLAINTEXT';
final public const string SASL_PLAINTEXT = 'SASL_PLAINTEXT';

final public const SASL_SSL = 'SASL_SSL';
final public const string SASL_SSL = 'SASL_SSL';

final public const PRODUCER_ONLY_CONFIG_OPTIONS = [
final public const array PRODUCER_ONLY_CONFIG_OPTIONS = [
'transactional.id',
'transaction.timeout.ms',
'enable.idempotence',
Expand All @@ -36,7 +36,7 @@ class Config
'sticky.partitioning.linger.ms',
];

final public const CONSUMER_ONLY_CONFIG_OPTIONS = [
final public const array CONSUMER_ONLY_CONFIG_OPTIONS = [
'partition.assignment.strategy',
'session.timeout.ms',
'heartbeat.interval.ms',
Expand Down
Loading
Loading