-
-
Couldn't load subscription status.
- Fork 6.2k
Fix shutdown waitgroup panic #35676
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix shutdown waitgroup panic #35676
Changes from 7 commits
511a231
035aa77
774498b
e7c50a8
42dfd7e
bc710be
8956513
c289b61
7d4506e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,6 @@ import ( | |
| "os" | ||
| "strings" | ||
| "sync" | ||
| "sync/atomic" | ||
| "syscall" | ||
| "time" | ||
|
|
||
|
|
@@ -30,12 +29,15 @@ type ServeFunction = func(net.Listener) error | |
|
|
||
| // Server represents our graceful server | ||
| type Server struct { | ||
| network string | ||
| address string | ||
| listener net.Listener | ||
| wg sync.WaitGroup | ||
| state state | ||
| lock *sync.RWMutex | ||
| network string | ||
| address string | ||
| listener net.Listener | ||
|
|
||
| lock sync.RWMutex | ||
| state state | ||
| connCounter int64 | ||
| connEmptyCond *sync.Cond | ||
|
|
||
| BeforeBegin func(network, address string) | ||
| OnShutdown func() | ||
| PerWriteTimeout time.Duration | ||
|
|
@@ -50,14 +52,13 @@ func NewServer(network, address, name string) *Server { | |
| log.Info("Starting new %s server: %s:%s on PID: %d", name, network, address, os.Getpid()) | ||
| } | ||
| srv := &Server{ | ||
| wg: sync.WaitGroup{}, | ||
| state: stateInit, | ||
| lock: &sync.RWMutex{}, | ||
| network: network, | ||
| address: address, | ||
| PerWriteTimeout: setting.PerWriteTimeout, | ||
| PerWritePerKbTimeout: setting.PerWritePerKbTimeout, | ||
| } | ||
| srv.connEmptyCond = sync.NewCond(&srv.lock) | ||
|
|
||
| srv.BeforeBegin = func(network, addr string) { | ||
| log.Debug("Starting server on %s:%s (PID: %d)", network, addr, syscall.Getpid()) | ||
|
|
@@ -154,7 +155,7 @@ func (srv *Server) Serve(serve ServeFunction) error { | |
| GetManager().RegisterServer() | ||
| err := serve(srv.listener) | ||
| log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid()) | ||
| srv.wg.Wait() | ||
| srv.waitForActiveConnections() | ||
| srv.setState(stateTerminate) | ||
| GetManager().ServerDone() | ||
| // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil | ||
|
|
@@ -178,63 +179,87 @@ func (srv *Server) setState(st state) { | |
| srv.state = st | ||
| } | ||
|
|
||
| func (srv *Server) waitForActiveConnections() { | ||
| srv.lock.Lock() | ||
| for srv.connCounter > 0 { | ||
| srv.connEmptyCond.Wait() | ||
| } | ||
| srv.lock.Unlock() | ||
| } | ||
|
|
||
| func (srv *Server) wrapConnection(c net.Conn) (net.Conn, error) { | ||
| srv.lock.Lock() | ||
| defer srv.lock.Unlock() | ||
|
|
||
| if srv.state != stateRunning { | ||
| _ = c.Close() | ||
| return nil, syscall.EINVAL // same as AcceptTCP | ||
| } | ||
|
|
||
| srv.connCounter++ | ||
| return &wrappedConn{Conn: c, server: srv}, nil | ||
| } | ||
|
|
||
| func (srv *Server) removeConnection(_ *wrappedConn) { | ||
| srv.lock.Lock() | ||
| defer srv.lock.Unlock() | ||
|
|
||
| srv.connCounter-- | ||
| if srv.connCounter <= 0 { | ||
| srv.connEmptyCond.Broadcast() | ||
| } | ||
| } | ||
|
|
||
| // closeAllConnections forcefully closes all active connections | ||
| func (srv *Server) closeAllConnections() { | ||
| srv.lock.Lock() | ||
| if srv.connCounter > 0 { | ||
| log.Warn("After graceful shutdown period, %d connections are still active. Forcefully close.", srv.connCounter) | ||
| srv.connCounter = 0 // OS will close all the connections after the process exits, so we just assume there is no active connection now | ||
| } | ||
| srv.lock.Unlock() | ||
| srv.connEmptyCond.Broadcast() | ||
|
Comment on lines
+220
to
+221
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I’m not familiar with the sync functions, so this is just a question. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe AI didn't read the Golang doc: https://pkg.go.dev/sync#Cond.Broadcast There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think @yp05327 means it’s inconsistent because, on line 209, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
He means Does "not safe" mean "inconsistent" in your mind? |
||
| } | ||
|
|
||
| type filer interface { | ||
| File() (*os.File, error) | ||
| } | ||
|
|
||
| type wrappedListener struct { | ||
| net.Listener | ||
| stopped bool | ||
| server *Server | ||
| server *Server | ||
| } | ||
|
|
||
| var ( | ||
| _ net.Listener = (*wrappedListener)(nil) | ||
| _ filer = (*wrappedListener)(nil) | ||
| ) | ||
|
|
||
| func newWrappedListener(l net.Listener, srv *Server) *wrappedListener { | ||
| return &wrappedListener{ | ||
| Listener: l, | ||
| server: srv, | ||
| } | ||
| } | ||
|
|
||
| func (wl *wrappedListener) Accept() (net.Conn, error) { | ||
| var c net.Conn | ||
| // Set keepalive on TCPListeners connections. | ||
| func (wl *wrappedListener) Accept() (c net.Conn, err error) { | ||
| if tcl, ok := wl.Listener.(*net.TCPListener); ok { | ||
| // Set keepalive on TCPListeners connections if possible, see http.tcpKeepAliveListener | ||
| tc, err := tcl.AcceptTCP() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| _ = tc.SetKeepAlive(true) // see http.tcpKeepAliveListener | ||
| _ = tc.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener | ||
| _ = tc.SetKeepAlive(true) | ||
| _ = tc.SetKeepAlivePeriod(3 * time.Minute) | ||
| c = tc | ||
| } else { | ||
| var err error | ||
| c, err = wl.Listener.Accept() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
|
|
||
| closed := int32(0) | ||
|
|
||
| c = &wrappedConn{ | ||
| Conn: c, | ||
| server: wl.server, | ||
| closed: &closed, | ||
| perWriteTimeout: wl.server.PerWriteTimeout, | ||
| perWritePerKbTimeout: wl.server.PerWritePerKbTimeout, | ||
| } | ||
|
|
||
| wl.server.wg.Add(1) | ||
| return c, nil | ||
| } | ||
|
|
||
| func (wl *wrappedListener) Close() error { | ||
| if wl.stopped { | ||
| return syscall.EINVAL | ||
| } | ||
|
|
||
| wl.stopped = true | ||
| return wl.Listener.Close() | ||
| return wl.server.wrapConnection(c) | ||
| } | ||
|
|
||
| func (wl *wrappedListener) File() (*os.File, error) { | ||
|
|
@@ -244,17 +269,14 @@ func (wl *wrappedListener) File() (*os.File, error) { | |
|
|
||
| type wrappedConn struct { | ||
| net.Conn | ||
| server *Server | ||
| closed *int32 | ||
| deadline time.Time | ||
| perWriteTimeout time.Duration | ||
| perWritePerKbTimeout time.Duration | ||
| server *Server | ||
| deadline time.Time | ||
| } | ||
|
|
||
| func (w *wrappedConn) Write(p []byte) (n int, err error) { | ||
| if w.perWriteTimeout > 0 { | ||
| minTimeout := time.Duration(len(p)/1024) * w.perWritePerKbTimeout | ||
| minDeadline := time.Now().Add(minTimeout).Add(w.perWriteTimeout) | ||
| if w.server.PerWriteTimeout > 0 { | ||
| minTimeout := time.Duration(len(p)/1024) * w.server.PerWritePerKbTimeout | ||
| minDeadline := time.Now().Add(minTimeout).Add(w.server.PerWriteTimeout) | ||
|
|
||
| w.deadline = w.deadline.Add(minTimeout) | ||
| if minDeadline.After(w.deadline) { | ||
|
|
@@ -266,19 +288,6 @@ func (w *wrappedConn) Write(p []byte) (n int, err error) { | |
| } | ||
|
|
||
| func (w *wrappedConn) Close() error { | ||
| if atomic.CompareAndSwapInt32(w.closed, 0, 1) { | ||
| defer func() { | ||
| if err := recover(); err != nil { | ||
| select { | ||
| case <-GetManager().IsHammer(): | ||
| // Likely deadlocked request released at hammertime | ||
| log.Warn("Panic during connection close! %v. Likely there has been a deadlocked request which has been released by forced shutdown.", err) | ||
| default: | ||
| log.Error("Panic during connection close! %v", err) | ||
| } | ||
| } | ||
| }() | ||
| w.server.wg.Done() | ||
| } | ||
| w.server.removeConnection(w) | ||
| return w.Conn.Close() | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.