Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added nodeID and address to operation and transport errors

## v3.99.4
* Fixed bug with wrong context on session closing
* Fixed goroutine leak on closing `database/sql` driver
Expand Down
4 changes: 2 additions & 2 deletions internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func invoke(
defer onTransportError(ctx, err)

if !useWrapping {
return opID, issues, err
return opID, issues, withConnInfo(err, nodeID, address)
}

if sentMark.canRetry() {
Expand Down Expand Up @@ -545,7 +545,7 @@ func (c *conn) NewStream(
}()

if !useWrapping {
return nil, err
return nil, withConnInfo(err, c.NodeID(), c.Address())
}

if sentMark.canRetry() {
Expand Down
4 changes: 4 additions & 0 deletions internal/conn/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ func IsBadConn(err error, goodConnCodes ...grpcCodes.Code) bool {

return true
}

func withConnInfo(err error, nodeID uint32, address string) error {
return xerrors.Transport(err, xerrors.WithNodeID(nodeID), xerrors.WithAddress(address))
}
18 changes: 18 additions & 0 deletions internal/conn/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,21 @@ func TestIsBadConn(t *testing.T) {
})
}
}

func TestWithConnInfo(t *testing.T) {
err := withConnInfo(grpcStatus.Error(grpcCodes.Unavailable, "test"), 100500, "example.com:2135")
require.ErrorIs(t, err, grpcStatus.Error(grpcCodes.Unavailable, "test"))
var nodeID interface {
NodeID() uint32
}
require.ErrorAs(t, err, &nodeID)
require.Equal(t, uint32(100500), nodeID.NodeID())
var address interface {
Address() string
}
require.ErrorAs(t, err, &address)
require.Equal(t, "example.com:2135", address.Address())
s, has := grpcStatus.FromError(err)
require.True(t, has)
require.Equal(t, grpcCodes.Unavailable, s.Code())
}
6 changes: 3 additions & 3 deletions internal/conn/grpc_client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *grpcClientStream) CloseSend() (err error) {
}

if !s.wrapping {
return err
return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address())
}

return xerrors.WithStackTrace(xerrors.Transport(
Expand Down Expand Up @@ -97,7 +97,7 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
}()

if !s.wrapping {
return err
return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address())
}

if s.sentMark.canRetry() {
Expand Down Expand Up @@ -156,7 +156,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { //nolint:funlen
}()

if !s.wrapping {
return err
return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address())
}

if s.sentMark.canRetry() {
Expand Down
8 changes: 8 additions & 0 deletions internal/xerrors/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ func (e *operationError) Code() int32 {
return int32(e.code)
}

func (e *operationError) NodeID() uint32 {
return e.nodeID
}

func (e *operationError) Address() string {
return e.address
}

func (e *operationError) Name() string {
return "operation/" + e.code.String()
}
Expand Down
33 changes: 30 additions & 3 deletions internal/xerrors/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,30 @@ import (
"github.com/stretchr/testify/require"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
)

func TestIsOperationError(t *testing.T) {
for _, tt := range []struct {
name string
err error
codes []Ydb.StatusIds_StatusCode
match bool
}{
// check only operation error with any ydb status code
{
name: xtest.CurrentFileLine(),
err: &operationError{code: Ydb.StatusIds_BAD_REQUEST},
match: true,
},
{
name: xtest.CurrentFileLine(),
err: fmt.Errorf("wrapped: %w", &operationError{code: Ydb.StatusIds_BAD_REQUEST}),
match: true,
},
{
name: xtest.CurrentFileLine(),
err: Join(
fmt.Errorf("test"),
&operationError{code: Ydb.StatusIds_BAD_REQUEST},
Expand All @@ -34,16 +40,19 @@ func TestIsOperationError(t *testing.T) {
},
// match ydb status code
{
name: xtest.CurrentFileLine(),
err: &operationError{code: Ydb.StatusIds_BAD_REQUEST},
codes: []Ydb.StatusIds_StatusCode{Ydb.StatusIds_BAD_REQUEST},
match: true,
},
{
name: xtest.CurrentFileLine(),
err: fmt.Errorf("wrapped: %w", &operationError{code: Ydb.StatusIds_BAD_REQUEST}),
codes: []Ydb.StatusIds_StatusCode{Ydb.StatusIds_BAD_REQUEST},
match: true,
},
{
name: xtest.CurrentFileLine(),
err: Join(
fmt.Errorf("test"),
&operationError{code: Ydb.StatusIds_BAD_REQUEST},
Expand All @@ -54,16 +63,19 @@ func TestIsOperationError(t *testing.T) {
},
// no match ydb status code
{
name: xtest.CurrentFileLine(),
err: &operationError{code: Ydb.StatusIds_BAD_REQUEST},
codes: []Ydb.StatusIds_StatusCode{Ydb.StatusIds_ABORTED},
match: false,
},
{
name: xtest.CurrentFileLine(),
err: fmt.Errorf("wrapped: %w", &operationError{code: Ydb.StatusIds_BAD_REQUEST}),
codes: []Ydb.StatusIds_StatusCode{Ydb.StatusIds_ABORTED},
match: false,
},
{
name: xtest.CurrentFileLine(),
err: Join(
fmt.Errorf("test"),
&operationError{code: Ydb.StatusIds_BAD_REQUEST},
Expand All @@ -73,18 +85,20 @@ func TestIsOperationError(t *testing.T) {
match: false,
},
} {
t.Run("", func(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.match, IsOperationError(tt.err, tt.codes...))
})
}
}

func TestIsOperationErrorTransactionLocksInvalidated(t *testing.T) {
for _, tt := range [...]struct {
name string
err error
isTLI bool
}{
{
name: xtest.CurrentFileLine(),
err: Operation(
WithStatusCode(Ydb.StatusIds_ABORTED),
WithIssues([]*Ydb_Issue.IssueMessage{{
Expand All @@ -94,6 +108,7 @@ func TestIsOperationErrorTransactionLocksInvalidated(t *testing.T) {
isTLI: true,
},
{
name: xtest.CurrentFileLine(),
err: Operation(
WithStatusCode(Ydb.StatusIds_OVERLOADED),
WithIssues([]*Ydb_Issue.IssueMessage{{
Expand All @@ -103,12 +118,14 @@ func TestIsOperationErrorTransactionLocksInvalidated(t *testing.T) {
isTLI: false,
},
{
name: xtest.CurrentFileLine(),
err: Operation(
WithStatusCode(Ydb.StatusIds_ABORTED),
),
isTLI: false,
},
{
name: xtest.CurrentFileLine(),
err: Operation(
WithStatusCode(Ydb.StatusIds_ABORTED),
WithIssues([]*Ydb_Issue.IssueMessage{{
Expand All @@ -120,30 +137,40 @@ func TestIsOperationErrorTransactionLocksInvalidated(t *testing.T) {
isTLI: true,
},
} {
t.Run("", func(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.isTLI, IsOperationErrorTransactionLocksInvalidated(tt.err))
})
}
}

func Test_operationError_Error(t *testing.T) {
for _, tt := range []struct {
name string
err error
text string
}{
{
name: xtest.CurrentFileLine(),
err: Operation(WithStatusCode(Ydb.StatusIds_BAD_REQUEST), WithAddress("localhost")),
text: "operation/BAD_REQUEST (code = 400010, address = localhost)",
},
{
name: xtest.CurrentFileLine(),
err: Operation(WithStatusCode(Ydb.StatusIds_BAD_REQUEST), WithNodeID(100500)),
text: "operation/BAD_REQUEST (code = 400010, nodeID = 100500)",
},
{
name: xtest.CurrentFileLine(),
err: Operation(WithStatusCode(Ydb.StatusIds_BAD_REQUEST)),
text: "operation/BAD_REQUEST (code = 400010)",
},
{
name: xtest.CurrentFileLine(),
err: Operation(WithStatusCode(Ydb.StatusIds_BAD_SESSION)),
text: "operation/BAD_SESSION (code = 400100)",
},
{
name: xtest.CurrentFileLine(),
err: Operation(WithStatusCode(Ydb.StatusIds_PRECONDITION_FAILED), WithIssues([]*Ydb_Issue.IssueMessage{
{
Message: "issue one",
Expand Down Expand Up @@ -177,7 +204,7 @@ func Test_operationError_Error(t *testing.T) {
text: "operation/PRECONDITION_FAILED (code = 400120, issues = [{15:3 => #1 'issue one'},{#2 'issue two' [{test.yql:16:4 => #3 'issue three'},{#4 'issue four'}]}])", //nolint:lll
},
} {
t.Run("", func(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.text, tt.err.Error())
})
}
Expand Down
12 changes: 10 additions & 2 deletions internal/xerrors/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ func (e *transportError) GRPCStatus() *grpcStatus.Status {

func (e *transportError) isYdbError() {}

func (e *transportError) NodeID() uint32 {
return e.nodeID
}

func (e *transportError) Address() string {
return e.address
}

func (e *transportError) Code() int32 {
return int32(e.status.Code())
}
Expand Down Expand Up @@ -134,8 +142,8 @@ func IsTransportError(err error, codes ...grpcCodes.Code) bool {
var status *grpcStatus.Status
if t := (*transportError)(nil); errors.As(err, &t) {
status = t.status
} else if t, has := grpcStatus.FromError(err); has {
status = t
} else if s, has := grpcStatus.FromError(err); has {
status = s
}
if status != nil {
if len(codes) == 0 {
Expand Down
12 changes: 10 additions & 2 deletions internal/xerrors/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,27 @@ func TestGrpcError(t *testing.T) {

func TestTransportErrorString(t *testing.T) {
for _, tt := range []struct {
name string
err error
text string
}{
{
name: xtest.CurrentFileLine(),
err: Transport(grpcStatus.Error(grpcCodes.FailedPrecondition, "")),
text: "transport/FailedPrecondition (code = 9, source error = \"rpc error: code = FailedPrecondition desc = \")",
},
{
name: xtest.CurrentFileLine(),
err: Transport(grpcStatus.Error(grpcCodes.Unavailable, ""), WithAddress("localhost:2135")),
text: "transport/Unavailable (code = 14, source error = \"rpc error: code = Unavailable desc = \", address: \"localhost:2135\")", //nolint:lll
},
{
name: xtest.CurrentFileLine(),
err: Transport(grpcStatus.Error(grpcCodes.Unavailable, ""), WithNodeID(100500)),
text: "transport/Unavailable (code = 14, source error = \"rpc error: code = Unavailable desc = \", nodeID = 100500)", //nolint:lll
},
} {
t.Run("", func(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.text, tt.err.Error())
})
}
Expand Down Expand Up @@ -189,7 +197,7 @@ func TestTransportErrorName(t *testing.T) {
name: "transport/Aborted",
},
} {
t.Run("", func(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
if tt.err == nil {
require.Nil(t, TransportError(tt.err)) //nolint:testifylint
} else {
Expand Down
Loading