Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 252 additions & 17 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,43 @@ import "C"
import (
"database/sql/driver"
"errors"
"runtime"
"sync"
"unsafe"
)

// Appender holds the DuckDB appender. It allows efficient bulk loading into a DuckDB database.
// Rows are first collected in data chunks and handed to the native appender when flushing.
type Appender struct {
// NOTE(review): con is presumably the driver connection the appender was created on — confirm.
con *Conn
schema string
table string
// The native DuckDB appender handle.
duckdbAppender C.duckdb_appender
closed bool

// The appender storage before flushing any data.
chunks []DataChunk
// The column types of the table to append to.
types []C.duckdb_logical_type
// A pointer to the allocated memory of the column types.
ptr unsafe.Pointer
// The number of appended rows.
rowCount int
}
type (
	// appenderSource wraps a table source of type T. The wrapped source is unexported,
	// so values must be created through the New*AppenderSource constructors.
	appenderSource[T any] struct {
		s T
	}

	// The concrete appender sources, one per supported table source flavor.
	RowAppenderSource           = appenderSource[RowTableSource]
	ParallelRowAppenderSource   = appenderSource[ParallelRowTableSource]
	ChunkAppenderSource         = appenderSource[ChunkTableSource]
	ParallelChunkAppenderSource = appenderSource[ParallelChunkTableSource]

	// AppenderSource is the type constraint listing every source that
	// AppendTableSource accepts.
	AppenderSource interface {
		RowAppenderSource | ParallelRowAppenderSource | ChunkAppenderSource | ParallelChunkAppenderSource
	}

	// Appender holds the DuckDB appender. It allows efficient bulk loading into a DuckDB database.
	Appender struct {
		con            *Conn
		schema         string
		table          string
		duckdbAppender C.duckdb_appender
		closed         bool

		// The appender storage before flushing any data.
		chunks []DataChunk
		// The column types of the table to append to.
		types []C.duckdb_logical_type
		// A pointer to the allocated memory of the column types.
		ptr unsafe.Pointer
		// The number of appended rows.
		rowCount int
	}
)

// NewParallelChunkAppenderSource wraps a ParallelChunkTableSource so that it can be
// passed to AppendTableSource. Without this constructor the source cannot be built
// outside of this package, because the wrapped field is unexported.
func NewParallelChunkAppenderSource(s ParallelChunkTableSource) ParallelChunkAppenderSource {
	return ParallelChunkAppenderSource{
		s: s,
	}
}

// NewAppenderFromConn returns a new Appender from a DuckDB driver connection.
func NewAppenderFromConn(driverConn driver.Conn, schema, table string) (*Appender, error) {
Expand Down Expand Up @@ -230,3 +247,221 @@ func destroyTypeSlice(ptr unsafe.Pointer, slice []C.duckdb_logical_type) {
}
C.duckdb_free(ptr)
}

// NewRowAppenderSource wraps a RowTableSource in a RowAppenderSource.
func NewRowAppenderSource(s RowTableSource) RowAppenderSource {
	var src RowAppenderSource
	src.s = s
	return src
}

// NewParallelRowAppenderSource wraps a ParallelRowTableSource in a ParallelRowAppenderSource.
func NewParallelRowAppenderSource(s ParallelRowTableSource) ParallelRowAppenderSource {
	var src ParallelRowAppenderSource
	src.s = s
	return src
}

// NewChunkAppenderSource wraps a ChunkTableSource in a ChunkAppenderSource.
func NewChunkAppenderSource(s ChunkTableSource) ChunkAppenderSource {
	var src ChunkAppenderSource
	src.s = s
	return src
}

// TODO: check if we need the schema, or if it can be null (default schema)

// AppendTableSource appends a table source to an already existing table.
//
// It creates a temporary DuckDB appender for schema.table on driverConn (an empty
// schema is passed to DuckDB as a NULL schema name), drains the source s into it,
// and finally flushes and destroys the appender. Parallel sources are drained by
// several goroutines; the appender itself is only accessed under a mutex.
//
// It returns an error if driverConn is not an open *Conn, if the appender cannot be
// created, if a column has an unsupported type, or if filling, appending, flushing,
// or closing fails.
func AppendTableSource[AS AppenderSource](driverConn driver.Conn, s AS, schema, table string) (ret error) {
	con, ok := driverConn.(*Conn)
	if !ok {
		return getError(errInvalidCon, nil)
	}
	if con.closed {
		return getError(errClosedCon, nil)
	}

	var cSchema *C.char
	if schema != "" {
		cSchema = C.CString(schema)
		defer C.duckdb_free(unsafe.Pointer(cSchema))
	}

	cTable := C.CString(table)
	defer C.duckdb_free(unsafe.Pointer(cTable))

	var duckdbAppender C.duckdb_appender
	state := C.duckdb_appender_create(con.duckdbCon, cSchema, cTable, &duckdbAppender)
	if state == C.DuckDBError {
		// We destroy the error message when destroying the appender.
		err := duckdbError(C.duckdb_appender_error(duckdbAppender))
		C.duckdb_appender_destroy(&duckdbAppender)
		return getError(errAppenderCreation, err)
	}

	columnCount := int(C.duckdb_appender_column_count(duckdbAppender))

	// The projection is not used in chunks, so we must keep it a 1-1 mapping.
	projection := make([]int, columnCount)
	ptr, types := mallocTypeSlice(columnCount)
	for i := range columnCount {
		projection[i] = i
		types[i] = C.duckdb_appender_column_type(duckdbAppender, C.idx_t(i))

		// Ensure that we only create an appender for supported column types.
		t := Type(C.duckdb_get_type_id(types[i]))
		if name, found := unsupportedTypeToStringMap[t]; found {
			err := addIndexToError(unsupportedTypeError(name), i+1)
			// Only the first i+1 entries have been assigned so far; do not hand
			// the unassigned tail of the slice to the destructor.
			destroyTypeSlice(ptr, types[:i+1])
			// The flush/destroy defer below is not registered yet, so the
			// appender is destroyed exactly once on this path.
			C.duckdb_appender_destroy(&duckdbAppender)
			return getError(errAppenderCreation, err)
		}
	}

	// Registered before the type-slice cleanup so that it runs after it (LIFO order).
	defer func() {
		var errFlush, errClose error

		state := C.duckdb_appender_flush(duckdbAppender)
		if state == C.DuckDBError {
			errFlush = duckdbError(C.duckdb_appender_error(duckdbAppender))
		}
		// Destroy all appender data and the appender.
		state = C.duckdb_appender_destroy(&duckdbAppender)
		if state == C.DuckDBError {
			errClose = errAppenderClose
		}

		err := errors.Join(ret, errFlush, errClose)
		if err != nil {
			ret = getError(invalidatedAppenderError(err), nil)
		}
	}()
	defer destroyTypeSlice(ptr, types)

	// Serializes every access to the shared appender.
	lock := &sync.Mutex{}

	var x any = s
	switch s := x.(type) {
	case RowAppenderSource:
		s.s.Init()
		return appendRowThread(&parallelRowTSWrapper{s.s}, lock, ptr, types, duckdbAppender, projection)
	case ParallelRowAppenderSource:
		info := s.s.Init()
		return runAppendWorkers(info.MaxThreads, func() error {
			return appendRowThread(s.s, lock, ptr, types, duckdbAppender, projection)
		})
	case ChunkAppenderSource:
		s.s.Init()
		return appendChunkThread(&parallelChunkTSWrapper{s.s}, lock, ptr, types, duckdbAppender)
	case ParallelChunkAppenderSource:
		info := s.s.Init()
		return runAppendWorkers(info.MaxThreads, func() error {
			return appendChunkThread(s.s, lock, ptr, types, duckdbAppender)
		})
	}

	return nil
}

// runAppendWorkers runs work concurrently on min(maxThreads, runtime.NumCPU())
// goroutines, waits for all of them, and returns the first error that was recorded.
// A non-positive maxThreads is treated as "no explicit limit" instead of starting
// zero workers, which would silently append nothing.
func runAppendWorkers(maxThreads int, work func() error) error {
	threads := runtime.NumCPU()
	if maxThreads > 0 {
		threads = min(maxThreads, threads)
	}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for range threads {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := work(); err != nil {
				// Guard the shared error; several workers may fail at once.
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return firstErr
}

// appendRowThread drains a row-based table source into the appender.
//
// It creates a local state via s.NewLocalState and then repeatedly fills a fresh
// data chunk row by row via s.FillRow, up to GetDataChunkCapacity() rows per chunk,
// and appends each non-empty chunk to duckdbAppender. It stops once FillRow reports
// that there are no more rows. lock serializes access to the shared appender; ptr
// and types describe the column types used to initialize each chunk.
//
// It returns the first error from chunk initialization, FillRow, or the append.
func appendRowThread(s ParallelRowTableSource, lock *sync.Mutex, ptr unsafe.Pointer, types []C.duckdb_logical_type, duckdbAppender C.duckdb_appender, projection []int) error {
	maxSize := C.idx_t(GetDataChunkCapacity())
	lstate := s.NewLocalState()

	// appendChunk fills and appends one chunk and reports whether more rows may follow.
	// It is a closure so that the chunk can be released with defer on every path.
	appendChunk := func() (bool, error) {
		var chunk DataChunk
		if err := chunk.initFromTypes(ptr, types, true); err != nil {
			return false, err
		}
		// Release the chunk on all paths, including fill and append failures.
		defer chunk.close()

		row := Row{
			chunk:      &chunk,
			projection: projection,
		}
		var next bool
		for row.r = 0; row.r < maxSize; row.r++ {
			var err error
			next, err = s.FillRow(lstate, row)
			if err != nil {
				return false, err
			}
			if !next {
				break
			}
		}

		// Nothing was filled: skip the append, like appendChunkThread does for empty chunks.
		if row.r == 0 {
			return next, nil
		}
		C.duckdb_data_chunk_set_size(chunk.data, row.r)

		// Hold the lock while appending and while reading the appender's error,
		// so that another worker cannot touch the appender in between.
		lock.Lock()
		defer lock.Unlock()
		if C.duckdb_append_data_chunk(duckdbAppender, chunk.data) == C.DuckDBError {
			return false, duckdbError(C.duckdb_appender_error(duckdbAppender))
		}
		return next, nil
	}

	for {
		next, err := appendChunk()
		if err != nil {
			return err
		}
		if !next {
			return nil
		}
	}
}

// appendChunkThread drains a chunk-based table source into the appender.
//
// It creates a local state via s.NewLocalState and then repeatedly lets
// s.FillChunk fill a fresh data chunk, appending each chunk to duckdbAppender
// until the source produces an empty chunk. lock serializes access to the shared
// appender; ptr and types describe the column types used to initialize each chunk.
//
// It returns the first error from chunk initialization, FillChunk, or the append.
func appendChunkThread(s ParallelChunkTableSource, lock *sync.Mutex, ptr unsafe.Pointer, types []C.duckdb_logical_type, duckdbAppender C.duckdb_appender) error {
	lstate := s.NewLocalState()

	// appendChunk fills and appends one chunk and reports whether a chunk was appended.
	// It is a closure so that the chunk can be released with defer on every path.
	appendChunk := func() (bool, error) {
		var chunk DataChunk
		if err := chunk.initFromTypes(ptr, types, true); err != nil {
			return false, err
		}
		// Release the chunk on all paths, including fill and append failures.
		defer chunk.close()

		if err := s.FillChunk(lstate, chunk); err != nil {
			return false, err
		}
		// An empty chunk signals that the source is exhausted.
		if chunk.GetSize() == 0 {
			return false, nil
		}

		// Hold the lock while appending and while reading the appender's error,
		// so that another worker cannot touch the appender in between.
		lock.Lock()
		defer lock.Unlock()
		if C.duckdb_append_data_chunk(duckdbAppender, chunk.data) == C.DuckDBError {
			return false, duckdbError(C.duckdb_appender_error(duckdbAppender))
		}
		return true, nil
	}

	for {
		more, err := appendChunk()
		if err != nil {
			return err
		}
		if !more {
			return nil
		}
	}
}
Loading
Loading