Skip to content

Commit 94158fc

Browse files
committed
fix: sse push client shutdown but still send cause short write error
1 parent 8dce0da commit 94158fc

File tree

2 files changed

+72
-7
lines changed

2 files changed

+72
-7
lines changed

internal/sse/model.go

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package sse
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"io"
68
"net/http"
79
"sync"
810
"time"
@@ -135,8 +137,10 @@ type Event struct {
135137

136138
// Client SSE 客户端
137139
type Client struct {
138-
ID string
139-
Writer http.ResponseWriter
140+
ID string
141+
Writer http.ResponseWriter
142+
disconnected bool
143+
mu sync.RWMutex
140144
}
141145

142146
// Close 关闭客户端连接
@@ -147,13 +151,23 @@ func (c *Client) Close() {
147151

148152
// Send 发送数据给客户端
149153
func (c *Client) Send(data []byte) error {
154+
// 检查客户端是否已断开
155+
if c.IsDisconnected() {
156+
return errors.New("客户端已断开连接")
157+
}
158+
150159
// 使用标准SSE格式发送数据
151160
// 格式: "data: {json数据}\n\n"
152161
sseData := fmt.Sprintf("data: %s\n\n", string(data))
153162

154163
// 直接写入HTTP响应流
155164
_, err := c.Writer.Write([]byte(sseData))
156165
if err != nil {
166+
// 检查是否是连接断开相关的错误
167+
if isConnectionError(err) {
168+
c.SetDisconnected(true)
169+
return fmt.Errorf("客户端连接已断开: %v", err)
170+
}
157171
return fmt.Errorf("写入SSE数据失败: %v", err)
158172
}
159173

@@ -167,6 +181,50 @@ func (c *Client) Send(data []byte) error {
167181

168182
// SetDisconnected 设置客户端断开状态
169183
func (c *Client) SetDisconnected(disconnected bool) {
170-
// 这里可以实现断开状态的设置逻辑
171-
// 目前是一个空实现
184+
c.mu.Lock()
185+
defer c.mu.Unlock()
186+
c.disconnected = disconnected
187+
}
188+
189+
// IsDisconnected 检查客户端是否已断开
190+
func (c *Client) IsDisconnected() bool {
191+
c.mu.RLock()
192+
defer c.mu.RUnlock()
193+
return c.disconnected
194+
}
195+
196+
// isConnectionError 检查错误是否为连接断开相关错误
197+
func isConnectionError(err error) bool {
198+
// 检查常见的连接断开错误
199+
if errors.Is(err, io.ErrShortWrite) {
200+
return true
201+
}
202+
203+
// 检查错误信息中是否包含连接断开的关键字
204+
errorStr := err.Error()
205+
connectionErrors := []string{
206+
"broken pipe",
207+
"connection reset",
208+
"use of closed network connection",
209+
"short write",
210+
"connection aborted",
211+
}
212+
213+
for _, errPattern := range connectionErrors {
214+
if contains(errorStr, errPattern) {
215+
return true
216+
}
217+
}
218+
219+
return false
220+
}
221+
222+
// contains 检查字符串是否包含子字符串(简单实现)
223+
func contains(s, substr string) bool {
224+
for i := 0; i <= len(s)-len(substr); i++ {
225+
if s[i:i+len(substr)] == substr {
226+
return true
227+
}
228+
}
229+
return false
172230
}

internal/sse/service.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -492,10 +492,17 @@ func (s *Service) sendTunnelUpdateByInstanceId(instanceID string, data SSEResp)
492492

493493
for clientID, client := range subscribers {
494494
if err := client.Send(jsonData); err != nil {
495-
log.Errorf("发送隧道更新给客户端 %s 失败: %v", clientID, err)
496-
client.SetDisconnected(true)
495+
// 检查是否是连接断开错误,使用更合适的日志级别
496+
if client.IsDisconnected() {
497+
log.Warnf("[SSE]客户端 %s 连接已断开,移除订阅", clientID)
498+
// 从订阅列表中移除已断开的客户端
499+
delete(subscribers, clientID)
500+
} else {
501+
log.Errorf("发送隧道更新给客户端 %s 失败: %v", clientID, err)
502+
}
503+
} else {
504+
log.Debugf("[SSE]隧道 %s 的订阅者 %s 推送成功", instanceID, clientID)
497505
}
498-
log.Infof("[SSE]隧道 %s 的订阅者 %s 推送成功", instanceID, clientID)
499506
}
500507
}
501508

0 commit comments

Comments
 (0)