Tweak one way stream ping (#19743)

Do not log errors on oneway streams when sending ping fails. Instead cancel the stream.

This also makes sure pings are sent when blocked on sending responses.

I will do a separate PR that includes this and adds pings to two-way streams as well as tests for pings.
This commit is contained in:
Klaus Post 2024-05-15 08:39:21 -07:00 committed by GitHub
parent 0e59e50b39
commit 6d3e0c7db6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -331,6 +331,7 @@ func (m *muxClient) handleOneWayStream(respHandler chan<- Response, respServer <
if !ok {
return
}
sendResp:
select {
case respHandler <- resp:
m.respMu.Lock()
@ -341,18 +342,49 @@ func (m *muxClient) handleOneWayStream(respHandler chan<- Response, respServer <
case <-m.ctx.Done():
// Client canceled. Don't block.
// Next loop will catch it.
case <-pingTimer:
if !m.doPing(respHandler) {
return
}
goto sendResp
}
case <-pingTimer:
if time.Since(time.Unix(atomic.LoadInt64(&m.LastPong), 0)) > clientPingInterval*2 {
m.addErrorNonBlockingClose(respHandler, ErrDisconnected)
if !m.doPing(respHandler) {
return
}
// Send new ping.
gridLogIf(m.ctx, m.send(message{Op: OpPing, MuxID: m.MuxID}))
}
}
}
// doPing checks last ping time and sends another ping.
func (m *muxClient) doPing(respHandler chan<- Response) (ok bool) {
m.respMu.Lock()
if m.closed {
m.respMu.Unlock()
// Already closed. This is not an error state;
// we may just be delivering the last responses.
return true
}
// Only check ping when not closed.
if got := time.Since(time.Unix(atomic.LoadInt64(&m.LastPong), 0)); got > clientPingInterval*2 {
m.respMu.Unlock()
if debugPrint {
fmt.Printf("Mux %d: last pong %v ago, disconnecting\n", m.MuxID, got)
}
m.addErrorNonBlockingClose(respHandler, ErrDisconnected)
return false
}
// Send new ping
err := m.sendLocked(message{Op: OpPing, MuxID: m.MuxID})
m.respMu.Unlock()
if err != nil {
m.addErrorNonBlockingClose(respHandler, err)
}
return err == nil
}
// responseCh is the channel to that goes to the requester.
// internalResp is the channel that comes from the server.
func (m *muxClient) handleTwowayResponses(responseCh chan<- Response, internalResp <-chan Response) {