Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/transport/proxy/httpsse/http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func (p *HTTPSSEProxy) processPendingMessages(clientID string, messageCh chan<-
}

// Find messages for this client (all messages for now)
for _, pendingMsg := range p.pendingMessages {
for i, pendingMsg := range p.pendingMessages {
// Convert to SSE string
sseString := pendingMsg.Message.ToSSEString()

Expand All @@ -498,7 +498,10 @@ func (p *HTTPSSEProxy) processPendingMessages(clientID string, messageCh chan<-
// Message sent successfully
default:
// Channel is full, stop sending
logger.Errorf("Failed to send pending message to client %s (channel full)", clientID)
logger.Errorf("Client %s channel full after sending %d/%d pending messages",
clientID, i, len(p.pendingMessages))
// Remove successfully sent messages and keep the rest
p.pendingMessages = p.pendingMessages[i:]
return
}
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/transport/proxy/httpsse/http_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,46 @@ func TestProcessPendingMessages(t *testing.T) {
proxy.pendingMutex.Unlock()
}

// TestProcessPendingMessages_ChannelFull tests partial delivery when channel is full
//
//nolint:paralleltest // Test modifies shared proxy state
func TestProcessPendingMessages_ChannelFull(t *testing.T) {
proxy := NewHTTPSSEProxy("localhost", 8080, false, nil)

// Add 10 pending messages
for i := 0; i < 10; i++ {
msg := ssecommon.NewSSEMessage("test", fmt.Sprintf("data-%d", i))
proxy.pendingMutex.Lock()
proxy.pendingMessages = append(proxy.pendingMessages, ssecommon.NewPendingSSEMessage(msg))
proxy.pendingMutex.Unlock()
}

// Create a client channel that can only hold 3 messages
messageCh := make(chan string, 3)

// Process pending messages
proxy.processPendingMessages("client-1", messageCh)

// Verify only 3 messages were sent
assert.Len(t, messageCh, 3)

// Verify 7 messages remain pending for reconnection
proxy.pendingMutex.Lock()
assert.Len(t, proxy.pendingMessages, 7)
proxy.pendingMutex.Unlock()

// Reconnected client should receive the remaining messages
messageCh2 := make(chan string, 10)
proxy.processPendingMessages("client-1", messageCh2)

assert.Len(t, messageCh2, 7)

// Verify all pending messages are now cleared
proxy.pendingMutex.Lock()
assert.Empty(t, proxy.pendingMessages)
proxy.pendingMutex.Unlock()
}

// TestHandleSSEConnection tests the SSE connection handler
//
//nolint:paralleltest // Test uses HTTP test server
Expand Down
Loading