-
Notifications
You must be signed in to change notification settings - Fork 25
fix: Prevent data loss when batched events exceed 32KB payload limit #82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -52,34 +52,91 @@ public function getConsumer() | |||||||||||||
* @return boolean whether the request succeeded | ||||||||||||||
*/ | ||||||||||||||
public function flushBatch($messages)
{
    // An empty batch counts as a successful flush; anything else is handed
    // to sendBatch(), whose boolean result is returned unchanged.
    return empty($messages) ? true : $this->sendBatch($messages);
}
|
||||||||||||||
/** | ||||||||||||||
* Sends a batch of messages, splitting if necessary to fit 32KB limit | ||||||||||||||
* @param array $messages | ||||||||||||||
* @return boolean success | ||||||||||||||
*/ | ||||||||||||||
private function sendBatch($messages)
{
    // Wrap the messages in the request envelope and serialize it once.
    $body = $this->payload($messages);
    $payload = json_encode($body);

    // json_encode() returns false on failure (for example malformed UTF-8 in
    // an event property). Without this guard the size check below would not
    // trip and an empty body would be sent, losing the events.
    if ($payload === false) {
        $this->handleError(
            'json_encode_failed',
            'Failed to encode batch payload: ' . json_last_error_msg()
        );
        return false;
    }

    // Payloads of 32KB or more are not sent as-is; they are handed to
    // handleOversizedBatch() together with the measured size.
    $payloadSize = strlen($payload);
    if ($payloadSize >= 32 * 1024) {
        if ($this->debug()) {
            $msg = "Message size is larger than 32KB";
            error_log("[PostHog][" . $this->type . "] " . $msg);
        }
        return $this->handleOversizedBatch($messages, $payloadSize);
    }

    // The first message is passed along so the request can build its
    // User-Agent header from that message's library fields.
    return $this->performHttpRequest($payload, $messages[0]);
}
|
||||||||||||||
/** | ||||||||||||||
* Handles batches that exceed 32KB limit | ||||||||||||||
* @param array $messages | ||||||||||||||
* @param int $payloadSize | ||||||||||||||
* @return boolean success | ||||||||||||||
*/ | ||||||||||||||
private function handleOversizedBatch($messages, $payloadSize)
{
    $total = count($messages);

    if ($total !== 1) {
        // Cut the batch at the (floored) midpoint and push each part back
        // through sendBatch(). Both parts are always attempted; the overall
        // result is true only when both report success.
        $cut = intdiv($total, 2);
        $headOk = $this->sendBatch(array_slice($messages, 0, $cut));
        $tailOk = $this->sendBatch(array_slice($messages, $cut));

        return $headOk && $tailOk;
    }

    // A lone message that is still too large cannot be split any further:
    // report it through the error handler and signal failure.
    $description = sprintf(
        'Single message payload size (%d bytes) exceeds 32KB limit. Message will be dropped.',
        $payloadSize
    );
    $this->handleError('payload_too_large', $description);

    return false;
}
|
||||||||||||||
/** | ||||||||||||||
* Performs the actual HTTP request | ||||||||||||||
* @param string $payload | ||||||||||||||
* @param array $sampleMessage | ||||||||||||||
* @return boolean success | ||||||||||||||
*/ | ||||||||||||||
private function performHttpRequest($payload, $sampleMessage)
{
    // Optionally gzip the already-serialized JSON body.
    if ($this->compress_request) {
        $payload = gzencode($payload);
    }

    $response = $this->httpClient->sendRequest(
        '/batch/',
        $payload,
        [
            // Send user agent in the form of {library_name}/{library_version} as per RFC 7231.
            "User-Agent: {$sampleMessage['library']}/{$sampleMessage['library_version']}",
        ],
        [
            'shouldVerify' => $this->options['verify_batch_events_request'] ?? true,
        ]
    );

    // Success is defined as "the client returned a non-empty response body".
    // NOTE(review): this does not inspect the HTTP status, so a non-empty
    // error body would also count as success - confirm this is intended.
    return !empty($response->getResponse());
}
Comment on lines
+138
to
140
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The method returns
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||
|
||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
<?php | ||
|
||
namespace PostHog\Test; | ||
|
||
/** | ||
* Mock error handler to capture and verify error reporting | ||
*/ | ||
class MockErrorHandler
{
    /** Recorded errors, in call order; each entry has 'code' and 'message' keys. */
    private $errors = [];

    /**
     * Records one reported error instead of acting on it.
     *
     * @param mixed $code    error identifier as passed by the caller
     * @param mixed $message accompanying description
     */
    public function handleError($code, $message)
    {
        $this->errors[] = compact('code', 'message');
    }

    /**
     * Tells whether any recorded error carries exactly this code
     * (strict comparison, so 5 and '5' are different codes).
     *
     * @param mixed $code
     * @return bool
     */
    public function hasError($code)
    {
        return in_array($code, array_column($this->errors, 'code'), true);
    }

    /**
     * Returns every recorded error in the order it was reported.
     *
     * @return array
     */
    public function getErrors()
    {
        return $this->errors;
    }
}
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,212 @@ | ||||||||
<?php | ||||||||
|
||||||||
namespace PostHog\Test; | ||||||||
|
||||||||
use PHPUnit\Framework\TestCase; | ||||||||
use PostHog\Client; | ||||||||
use PostHog\Test\MockErrorHandler; | ||||||||
|
||||||||
/** | ||||||||
* Test suite for the 32KB payload size limit fix. | ||||||||
* | ||||||||
* This addresses the critical data loss issue where events exceeding 32KB | ||||||||
* when batched together were silently dropped instead of being split and sent. | ||||||||
*/ | ||||||||
class PayloadSizeLimitFixTest extends TestCase
{
    /** Client under test, wired to the mocked HTTP client in setUp(). */
    private $client;

    /** Mocked HTTP transport; the tests inspect its `calls` list. */
    private $mockHttpClient;

    public function setUp(): void
    {
        date_default_timezone_set("UTC");

        // Create a mock HTTP client that tracks successful requests
        $this->mockHttpClient = new MockedHttpClient(
            "app.posthog.com",
            true,
            10000,
            false,
            true
        );

        $this->client = new Client(
            "test_api_key",
            [
                "consumer" => "lib_curl",
                "debug" => true,
                "batch_size" => 10, // Small batch size to control test
            ],
            $this->mockHttpClient
        );
    }

    /**
     * Clears the calls recorded so far on the mocked HTTP client.
     */
    private function resetRequestCount(): void
    {
        $this->mockHttpClient->calls = [];
    }

    /**
     * Counts the recorded calls whose path is '/batch/'.
     * An unset or null `calls` list is treated as "no requests".
     */
    private function getBatchRequestCount(): int
    {
        $batchRequests = 0;
        foreach ($this->mockHttpClient->calls ?? [] as $call) {
            if ($call['path'] === '/batch/') {
                $batchRequests++;
            }
        }

        return $batchRequests;
    }

    /**
     * Test that the fix properly handles oversized batches by splitting them
     */
    public function testOversizedBatchSplitting(): void
    {
        $this->resetRequestCount();

        // Create events with large properties to exceed 32KB when batched
        $largeProperty = str_repeat('A', 4000); // 4KB string

        // Create 8 events, each ~4KB, totaling ~32KB+ (exceeds 32KB limit)
        for ($i = 0; $i < 8; $i++) {
            $result = $this->client->capture([
                "distinctId" => "user-{$i}",
                "event" => "large_event_{$i}",
                "properties" => [
                    "large_data" => $largeProperty,
                    "event_index" => $i
                ]
            ]);
            $this->assertTrue($result, "Event {$i} should be captured successfully");
        }

        // Flush remaining events
        $flushResult = $this->client->flush();
        $this->assertTrue($flushResult, "Flush should succeed with splitting");

        // Verify that multiple HTTP requests were made due to splitting
        $requestCount = $this->getBatchRequestCount();
        $this->assertGreaterThan(1, $requestCount, "Multiple requests should be made when batch is split");
    }

    /**
     * Test that single oversized messages are properly handled and reported
     */
    public function testSingleOversizedMessage(): void
    {
        // Create a single event that exceeds 32KB
        $veryLargeProperty = str_repeat('X', 33 * 1024); // 33KB string

        // Use a dedicated client so reported errors land in the mock handler
        $errorHandler = new MockErrorHandler();
        $client = new Client(
            "test_api_key",
            [
                "consumer" => "lib_curl",
                "debug" => true,
                "error_handler" => [$errorHandler, 'handleError']
            ],
            $this->mockHttpClient
        );

        $result = $client->capture([
            "distinctId" => "oversized_user",
            "event" => "oversized_event",
            "properties" => [
                "very_large_data" => $veryLargeProperty
            ]
        ]);

        // The event should still be accepted initially
        $this->assertTrue($result, "Oversized event should be accepted initially");

        // But flush should fail and error should be logged
        $flushResult = $client->flush();
        $this->assertFalse($flushResult, "Flush should fail for oversized single message");

        // Verify error was reported
        $this->assertTrue(
            $errorHandler->hasError('payload_too_large'),
            "Error should be reported for oversized message"
        );
    }

    /**
     * Test that multiple small events that accumulate to exceed 32KB are handled properly
     */
    public function testAccumulativePayloadSizeHandling(): void
    {
        $this->resetRequestCount();

        // Each event is small (2KB) but 20 events = 40KB total
        $smallProperty = str_repeat('Z', 2000);

        $allSuccessful = true;
        for ($i = 0; $i < 20; $i++) {
            $result = $this->client->capture([
                "distinctId" => "accumulative_user_{$i}",
                "event" => "small_event",
                "properties" => [
                    "data" => $smallProperty,
                    "index" => $i
                ]
            ]);
            if (!$result) {
                $allSuccessful = false;
            }
        }

        $this->assertTrue($allSuccessful, "All small events should be accepted");

        // Final flush should succeed because batches were split appropriately
        $flushResult = $this->client->flush();
        $this->assertTrue($flushResult, "Final flush should succeed with proper batch splitting");

        // Verify multiple requests were made
        $requestCount = $this->getBatchRequestCount();
        $this->assertGreaterThan(
            1,
            $requestCount,
            "Multiple requests should be made for accumulative payload"
        );
    }

    /**
     * Test that normal-sized batches still work correctly
     */
    public function testNormalSizedBatches(): void
    {
        $this->resetRequestCount();

        // Create normal-sized events
        for ($i = 0; $i < 5; $i++) {
            $result = $this->client->capture([
                "distinctId" => "normal_user_{$i}",
                "event" => "normal_event",
                "properties" => [
                    "small_data" => "normal data",
                    "index" => $i
                ]
            ]);
            $this->assertTrue($result, "Normal event {$i} should be captured");
        }

        $flushResult = $this->client->flush();
        $this->assertTrue($flushResult, "Normal batch flush should succeed");

        // Should only need one request for normal sized batch; the count is an
        // int, so compare with type-strict assertSame.
        $requestCount = $this->getBatchRequestCount();
        $this->assertSame(1, $requestCount, "Only one request should be made for normal batch");
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The `flushBatch` method now serves as a simple wrapper around `sendBatch`. Consider whether this indirection is necessary, or whether the logic could be simplified by moving the empty check into `sendBatch` directly.
Copilot uses AI. Check for mistakes.