-
Notifications
You must be signed in to change notification settings - Fork 102
Experimental support for arrow
result format
#1860
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 81 commits
47a8379
cde72ea
5bd2579
4eec9a9
8cd09b5
1030dff
d726bbd
eeeda8b
3595ee5
ee3e034
12770a6
055b6f7
84476c8
653842a
18668c3
ad65ffc
8cbb3c3
5d1e6c8
58264e6
2cd2d50
eec15fc
240d6fa
035c982
05cc008
d6253f7
80fae6d
9d6a3bf
49e8086
bb79a18
62c12cb
39edca4
730cf73
96d76cf
03161b1
7e79256
82ca6cd
60b178f
5b3a6e6
a7c9ff9
7a17f1c
0de804e
d91d98c
97ef5a2
6e71dd9
5b4d295
b4c09db
6fb91dd
a3fbaad
7059ac0
be1d200
8960154
132e5fe
fd58878
638bf29
a526ac0
7aa3daf
4e7bce0
1912f04
f8daa75
332ca1e
9e56152
6372c5d
d0666d8
c0b46ab
128e55e
a604ef3
41460e9
70f08cb
f37e04a
3c59b6c
e115aea
70854a0
89eb446
c6a4126
29b6dc1
cbc1035
8803cf5
f88f484
d8244e7
011496d
6380fd2
da4eba9
0202478
12a2e04
ccc9ba4
a17b9a4
2d4129b
b36d432
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
// This example shows how you can get results in Apache Arrow format. | ||
// | ||
// Before starting add apache arrow IPC package: | ||
// | ||
// go get github.com/apache/arrow/go/arrow | ||
// | ||
// Currently (2025-09-11), Apache Arrow supported in the `main` YDB branch and | ||
// enabled by feature flag `EnableArrowResultSetFormat`. | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/apache/arrow/go/arrow/ipc" | ||
"github.com/ydb-platform/ydb-go-sdk/v3" | ||
"github.com/ydb-platform/ydb-go-sdk/v3/query" | ||
) | ||
|
||
func main() { | ||
ctx := context.TODO() | ||
db, err := ydb.Open(ctx, "grpc://localhost:2136/local") | ||
if err != nil { | ||
panic(err) | ||
} | ||
defer db.Close(ctx) // cleanup resources | ||
|
||
sql := `SELECT 42 as id, "my string" as myStr; | ||
SELECT 24 as id, "WOW" as myStr, "UHH" as secondStr;` | ||
|
||
result, err := db.Query().QueryArrow(ctx, sql, query.WithIdempotent()) | ||
if err != nil { | ||
panic(err) | ||
} | ||
defer result.Close(ctx) | ||
|
||
for part, err := range result.Parts(ctx) { | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
fmt.Printf("ResultSet#%d ", part.GetResultSetIndex()) | ||
|
||
// or you can use `part.Bytes()` instead of [io.Reader] interface | ||
|
||
rdr, err := ipc.NewReader(part) // part already implements io.Reader | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
for rdr.Next() { | ||
out := rdr.Record() | ||
fmt.Println(out) | ||
} | ||
} | ||
|
||
// Output: | ||
// ResultSet#0 record: | ||
// | ||
// schema: | ||
// fields: 2 | ||
// - id: type=int32 | ||
// - myStr: type=binary | ||
// rows: 1 | ||
// col[0][id]: [42] | ||
// col[1][myStr]: ["my string"] | ||
// | ||
// ResultSet#1 record: | ||
// | ||
// schema: | ||
// fields: 3 | ||
// - id: type=int32 | ||
// - myStr: type=binary | ||
// - secondStr: type=binary | ||
// rows: 1 | ||
// col[0][id]: [24] | ||
// col[1][myStr]: ["WOW"] | ||
// col[2][secondStr]: ["UHH"] | ||
} |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
package query | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"io" | ||
|
||
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" | ||
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb" | ||
|
||
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/arrow" | ||
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" | ||
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" | ||
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" | ||
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" | ||
) | ||
|
||
type ( | ||
arrowResult struct { | ||
stream Ydb_Query_V1.QueryService_ExecuteQueryClient | ||
resultSetIndex int64 | ||
close context.CancelFunc | ||
} | ||
|
||
arrowPart struct { | ||
resultSetIndex int64 | ||
reader io.Reader | ||
data []byte | ||
} | ||
) | ||
|
||
func (c *Client) QueryArrow(ctx context.Context, q string, opts ...options.Execute) (r arrow.Result, err error) { | ||
|
||
ctx, cancel := xcontext.WithDone(ctx, c.done) | ||
defer cancel() | ||
|
||
r, err = arrowQuery(ctx, c.pool(), q, opts...) | ||
if err != nil { | ||
return nil, xerrors.WithStackTrace(err) | ||
} | ||
|
||
return r, nil | ||
} | ||
|
||
func arrowQuery(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) ( | ||
r arrow.Result, err error, | ||
) { | ||
settings := options.ExecuteSettings(opts...) | ||
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) { | ||
r, err = s.executeArrow(ctx, q, options.ExecuteSettings(opts...)) | ||
if err != nil { | ||
return xerrors.WithStackTrace(err) | ||
} | ||
|
||
return nil | ||
}, settings.RetryOpts()...) | ||
if err != nil { | ||
return nil, xerrors.WithStackTrace(err) | ||
} | ||
|
||
return r, nil | ||
} | ||
|
||
func (s *Session) executeArrow( | ||
|
||
ctx context.Context, q string, settings executeSettings, | ||
) (_ arrow.Result, finalErr error) { | ||
ctx, cancel := xcontext.WithDone(ctx, s.Done()) | ||
defer func() { | ||
if finalErr != nil { | ||
cancel() | ||
applyStatusByError(s, finalErr) | ||
} | ||
}() | ||
|
||
request, callOptions, err := executeQueryRequest(s.ID(), q, settings) | ||
if err != nil { | ||
return nil, xerrors.WithStackTrace(err) | ||
} | ||
|
||
request.ResultSetFormat = Ydb.ResultSet_FORMAT_ARROW | ||
|
||
executeCtx, executeCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx)) | ||
defer func() { | ||
if finalErr != nil { | ||
executeCancel() | ||
} | ||
}() | ||
|
||
stream, err := s.client.ExecuteQuery(executeCtx, request, callOptions...) | ||
if err != nil { | ||
return nil, xerrors.WithStackTrace(err) | ||
} | ||
|
||
return &arrowResult{stream: stream, close: executeCancel}, nil | ||
} | ||
|
||
func (r *arrowResult) Parts(ctx context.Context) xiter.Seq2[arrow.Part, error] { | ||
return func(yield func(arrow.Part, error) bool) { | ||
for { | ||
part, err := r.nextPart(ctx) | ||
if err != nil { | ||
if xerrors.Is(err, io.EOF) { | ||
return | ||
} | ||
} | ||
cont := yield(part, err) | ||
if !cont || err != nil { | ||
return | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (r *arrowResult) nextPart(ctx context.Context) (arrow.Part, error) { | ||
if ctx.Err() != nil { | ||
return nil, xerrors.WithStackTrace(ctx.Err()) | ||
} | ||
|
||
part, err := r.stream.Recv() | ||
if err != nil { | ||
return nil, xerrors.WithStackTrace(err) | ||
} | ||
|
||
if part.GetResultSetIndex() <= 0 && r.resultSetIndex > 0 { | ||
return nil, xerrors.WithStackTrace(io.EOF) | ||
} | ||
r.resultSetIndex = part.GetResultSetIndex() | ||
|
||
schema := part.GetResultSet().GetArrowFormatMeta().GetSchema() | ||
|
||
// Apache Arrow ipc.Reader expects schema and data to be concatenated | ||
data := append(schema, part.GetResultSet().GetData()...) | ||
|
||
rdr := bytes.NewReader(data) | ||
|
||
return &arrowPart{reader: rdr, data: data, resultSetIndex: part.GetResultSetIndex()}, nil | ||
} | ||
|
||
func (r *arrowResult) Close(ctx context.Context) error { | ||
r.close() | ||
|
||
return nil | ||
} | ||
|
||
func (p *arrowPart) Bytes() []byte { | ||
return p.data | ||
} | ||
|
||
func (p *arrowPart) Read(buf []byte) (n int, err error) { | ||
return p.reader.Read(buf) | ||
} | ||
|
||
func (p *arrowPart) GetResultSetIndex() int64 { | ||
return p.resultSetIndex | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package arrow | ||
|
||
import ( | ||
"context" | ||
"io" | ||
|
||
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" | ||
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" | ||
) | ||
|
||
type ( | ||
Result interface { | ||
closer.Closer | ||
|
||
// Parts is the range iterator for result parts (parts implement [io.Reader]) | ||
Parts(ctx context.Context) xiter.Seq2[Part, error] | ||
} | ||
Part interface { | ||
io.Reader | ||
|
||
// Bytes returns part as bytes | ||
Bytes() []byte | ||
|
||
// GetResultSetIndex returns result set index | ||
GetResultSetIndex() int64 | ||
} | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
todo -> background