mirror of
https://github.com/etcd-io/etcd.git
synced 2026-02-05 06:46:49 +01:00
Merge pull request #21122 from zhijun42/fix-lease-keep-alive-unavailable
lease: Fix incorrect gRPC Unavailable on client cancel during LeaseKeepAlive forwarding
This commit is contained in:
@@ -22,7 +22,6 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver"
|
||||
"go.etcd.io/etcd/server/v3/lease"
|
||||
)
|
||||
@@ -98,11 +97,12 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err
|
||||
select {
|
||||
case err = <-errc:
|
||||
case <-stream.Context().Done():
|
||||
// the only server-side cancellation is noleader for now.
|
||||
// We end up here due to:
|
||||
// 1. Client cancellation
|
||||
// 2. Server cancellation: the client ctx is wrapped with WithRequireLeader,
|
||||
// monitorLeader() detects no leader and thus cancels this stream with ErrGRPCNoLeader.
|
||||
// 3. Server cancellation: the server is shutting down.
|
||||
err = stream.Context().Err()
|
||||
if errors.Is(err, context.Canceled) {
|
||||
err = rpctypes.ErrGRPCNoLeader
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -51,6 +51,7 @@ var toGRPCErrorMap = map[error]error{
|
||||
errors.ErrNoLeader: rpctypes.ErrGRPCNoLeader,
|
||||
errors.ErrNotLeader: rpctypes.ErrGRPCNotLeader,
|
||||
errors.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged,
|
||||
errors.ErrCanceled: rpctypes.ErrGRPCCanceled,
|
||||
errors.ErrStopped: rpctypes.ErrGRPCStopped,
|
||||
errors.ErrTimeout: rpctypes.ErrGRPCTimeout,
|
||||
errors.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
|
||||
|
||||
@@ -404,10 +404,16 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
if errorspkg.Is(cctx.Err(), context.DeadlineExceeded) {
|
||||
err := cctx.Err()
|
||||
switch {
|
||||
case errorspkg.Is(err, context.DeadlineExceeded):
|
||||
return -1, errors.ErrTimeout
|
||||
case errorspkg.Is(err, context.Canceled):
|
||||
return -1, errors.ErrCanceled
|
||||
default:
|
||||
s.Logger().Warn("Unexpected lease renew context error", zap.Error(err))
|
||||
return -1, errors.ErrCanceled
|
||||
}
|
||||
return -1, errors.ErrCanceled
|
||||
}
|
||||
|
||||
func (s *EtcdServer) checkLeaseTimeToLive(ctx context.Context, leaseID lease.LeaseID) (uint64, error) {
|
||||
|
||||
@@ -172,8 +172,8 @@ func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundT
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
req = req.WithContext(ctx)
|
||||
req.Header.Set("Content-Type", "application/protobuf")
|
||||
req.Cancel = ctx.Done() //nolint:staticcheck // TODO: remove for a supported version
|
||||
|
||||
resp, err := cc.Do(req)
|
||||
if err != nil {
|
||||
|
||||
@@ -341,7 +341,6 @@ func TestV3LeaseKeepAliveForwardingCatchError(t *testing.T) {
|
||||
require.Positive(t, resp.TTL)
|
||||
})
|
||||
|
||||
// Shows current behavior: client cancel during forwarding incorrectly returns Unavailable.
|
||||
t.Run("client cancels while forwarding", func(t *testing.T) {
|
||||
integration.SkipIfNoGoFail(t)
|
||||
leader, follower, _ := setupLeaseForwardingCluster(t)
|
||||
@@ -371,15 +370,14 @@ func TestV3LeaseKeepAliveForwardingCatchError(t *testing.T) {
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
cancel()
|
||||
|
||||
// Client sees Canceled (gRPC returns this immediately after cancel()),
|
||||
// but server actually generated Unavailable (verified by metrics below).
|
||||
// Client sees Canceled (gRPC returns this immediately after cancel())
|
||||
_, err = keepAliveClient.Recv()
|
||||
require.Equal(t, codes.Canceled, status.Code(err))
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return getLeaseKeepAliveMetric(t, follower, "Unavailable") == prevUnavailableCount+1
|
||||
return getLeaseKeepAliveMetric(t, follower, "Canceled") == prevCanceledCount+1
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
require.Equal(t, prevCanceledCount, getLeaseKeepAliveMetric(t, follower, "Canceled"))
|
||||
require.Equal(t, prevUnavailableCount, getLeaseKeepAliveMetric(t, follower, "Unavailable"))
|
||||
})
|
||||
|
||||
t.Run("forwarding times out", func(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user