diff --git a/pkg/transport/proxy/httpsse/http_proxy.go b/pkg/transport/proxy/httpsse/http_proxy.go index 8c0190b27..5b9ff3baa 100644 --- a/pkg/transport/proxy/httpsse/http_proxy.go +++ b/pkg/transport/proxy/httpsse/http_proxy.go @@ -488,7 +488,7 @@ func (p *HTTPSSEProxy) processPendingMessages(clientID string, messageCh chan<- } // Find messages for this client (all messages for now) - for _, pendingMsg := range p.pendingMessages { + for i, pendingMsg := range p.pendingMessages { // Convert to SSE string sseString := pendingMsg.Message.ToSSEString() @@ -498,7 +498,10 @@ func (p *HTTPSSEProxy) processPendingMessages(clientID string, messageCh chan<- // Message sent successfully default: // Channel is full, stop sending - logger.Errorf("Failed to send pending message to client %s (channel full)", clientID) + logger.Errorf("Client %s channel full after sending %d/%d pending messages", + clientID, i, len(p.pendingMessages)) + // Remove successfully sent messages and keep the rest + p.pendingMessages = p.pendingMessages[i:] return } } diff --git a/pkg/transport/proxy/httpsse/http_proxy_test.go b/pkg/transport/proxy/httpsse/http_proxy_test.go index cc11ee7ff..555f395fc 100644 --- a/pkg/transport/proxy/httpsse/http_proxy_test.go +++ b/pkg/transport/proxy/httpsse/http_proxy_test.go @@ -298,6 +298,46 @@ func TestProcessPendingMessages(t *testing.T) { proxy.pendingMutex.Unlock() } +// TestProcessPendingMessages_ChannelFull tests partial delivery when channel is full +// +//nolint:paralleltest // Test modifies shared proxy state +func TestProcessPendingMessages_ChannelFull(t *testing.T) { + proxy := NewHTTPSSEProxy("localhost", 8080, false, nil) + + // Add 10 pending messages + for i := 0; i < 10; i++ { + msg := ssecommon.NewSSEMessage("test", fmt.Sprintf("data-%d", i)) + proxy.pendingMutex.Lock() + proxy.pendingMessages = append(proxy.pendingMessages, ssecommon.NewPendingSSEMessage(msg)) + proxy.pendingMutex.Unlock() + } + + // Create a client channel that can only hold 3 messages + messageCh := make(chan string, 3) + + // Process pending messages + proxy.processPendingMessages("client-1", messageCh) + + // Verify only 3 messages were sent + assert.Len(t, messageCh, 3) + + // Verify 7 messages remain pending for reconnection + proxy.pendingMutex.Lock() + assert.Len(t, proxy.pendingMessages, 7) + proxy.pendingMutex.Unlock() + + // Reconnected client should receive the remaining messages + messageCh2 := make(chan string, 10) + proxy.processPendingMessages("client-1", messageCh2) + + assert.Len(t, messageCh2, 7) + + // Verify all pending messages are now cleared + proxy.pendingMutex.Lock() + assert.Empty(t, proxy.pendingMessages) + proxy.pendingMutex.Unlock() +} + // TestHandleSSEConnection tests the SSE connection handler // //nolint:paralleltest // Test uses HTTP test server