Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 190 additions & 15 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,42 @@ package duckdb
import (
"database/sql/driver"
"errors"
"runtime"
"sync"

"github.com/marcboeker/go-duckdb/mapping"
)

// Appender holds the DuckDB appender. It allows efficient bulk loading into a DuckDB database.
type Appender struct {
	conn     *Conn            // connection this appender operates on
	schema   string           // target schema name
	table    string           // target table name
	appender mapping.Appender // underlying DuckDB appender handle
	closed   bool             // set once the appender has been closed

	// The chunk to append to.
	chunk DataChunk
	// The column types of the table to append to.
	types []mapping.LogicalType
	// The number of appended rows.
	rowCount int
}
type (
	// appenderSource is a thin generic wrapper that tags a concrete table
	// source type so AppendTableSource can type-switch on it.
	appenderSource[T any] struct {
		Source T
	}

	rowAppenderSource           = appenderSource[RowTableSource]
	parallelRowAppenderSource   = appenderSource[ParallelRowTableSource]
	chunkAppenderSource         = appenderSource[ChunkTableSource]
	parallelChunkAppenderSource = appenderSource[ParallelChunkTableSource]

	// AppenderSource is a sealed interface over the table-source wrappers
	// accepted by Appender.AppendTableSource. Only values returned by the
	// NewAppender*Source constructors implement it: the unexported _secret
	// method prevents types outside this package from satisfying it.
	AppenderSource interface {
		_secret()
	}

	// Appender holds the DuckDB appender. It allows efficient bulk loading into a DuckDB database.
	Appender struct {
		conn     *Conn            // connection this appender operates on
		schema   string           // target schema name
		table    string           // target table name
		appender mapping.Appender // underlying DuckDB appender handle
		closed   bool             // set once the appender has been closed

		// The chunk to append to.
		chunk DataChunk
		// The column types of the table to append to.
		types []mapping.LogicalType
		// The number of appended rows.
		rowCount int
	}
)

// NewAppenderFromConn returns a new Appender for the default catalog from a DuckDB driver connection.
func NewAppenderFromConn(driverConn driver.Conn, schema, table string) (*Appender, error) {
Expand Down Expand Up @@ -145,6 +162,72 @@ func (a *Appender) AppendRow(args ...driver.Value) error {
return nil
}

// AppendTableSource drains the given table source into the appender.
// Plain row/chunk sources are consumed on the calling goroutine; parallel
// sources are consumed by up to min(info.MaxThreads, GOMAXPROCS) worker
// goroutines, with the actual appends serialized through a shared mutex.
// It returns the first error any worker encountered.
func (a *Appender) AppendTableSource(s AppenderSource) error {
	// Serializes calls to mapping.AppendDataChunk across worker goroutines.
	lock := &sync.Mutex{}

	// The projection is not used in the chunk itself, so it must stay a
	// 1-1 mapping over the appender's columns.
	columnCount := mapping.AppenderColumnCount(a.appender)
	projection := make([]int, 0, columnCount)
	for i := mapping.IdxT(0); i < columnCount; i++ {
		projection = append(projection, int(i))
	}

	// runParallel fans fn out over the worker goroutines and returns the
	// first error observed. The original stored errors into a shared
	// variable without synchronization (a data race); errMu fixes that.
	runParallel := func(maxThreads int, fn func() error) error {
		threads := min(maxThreads, runtime.GOMAXPROCS(-1))
		var (
			wg    sync.WaitGroup
			errMu sync.Mutex
			first error
		)
		for range threads {
			wg.Add(1)
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errMu.Lock()
					if first == nil {
						first = err
					}
					errMu.Unlock()
				}
			}()
		}
		wg.Wait()
		return first
	}

	var x any = s
	switch s := x.(type) {
	case rowAppenderSource:
		s.Source.Init()
		return appenderRowThread(&parallelRowTSWrapper{s.Source}, lock, a.types, a.appender, projection)
	case parallelRowAppenderSource:
		info := s.Source.Init()
		return runParallel(info.MaxThreads, func() error {
			return appenderRowThread(s.Source, lock, a.types, a.appender, projection)
		})
	case chunkAppenderSource:
		s.Source.Init()
		return appenderChunkThread(&parallelChunkTSWrapper{s.Source}, lock, a.types, a.appender)
	case parallelChunkAppenderSource:
		info := s.Source.Init()
		return runParallel(info.MaxThreads, func() error {
			return appenderChunkThread(s.Source, lock, a.types, a.appender)
		})
	}
	return nil
}

func (a *Appender) appendRowSlice(args []driver.Value) error {
// Early-out, if the number of args does not match the column count.
if len(args) != len(a.types) {
Expand Down Expand Up @@ -193,3 +276,95 @@ func destroyTypeSlice(slice []mapping.LogicalType) {
mapping.DestroyLogicalType(&t)
}
}

// appenderRowThread is one worker that drains a row-producing table source
// into duckdbAppender. It fills a thread-local data chunk row by row and
// appends each filled chunk while holding lock, so multiple workers can run
// concurrently against the same appender.
func appenderRowThread(s ParallelRowTableSource, lock *sync.Mutex, types []mapping.LogicalType, duckdbAppender mapping.Appender, projection []int) error {
	maxSize := GetDataChunkCapacity()
	lstate := s.NewLocalState()

	var chunk DataChunk
	if err := chunk.initFromTypes(types, true); err != nil {
		return err
	}
	// Release the chunk on every exit path. The original leaked it when
	// AppendDataChunk failed.
	defer chunk.close()

	for {
		row := Row{
			chunk:      &chunk,
			projection: projection,
		}

		// Fill up to the chunk capacity; next == false signals that the
		// source is exhausted.
		var next bool
		var err error
		for row.r = 0; row.r < mapping.IdxT(maxSize); row.r++ {
			next, err = s.FillRow(lstate, row)
			if err != nil {
				return err
			}
			if !next {
				break
			}
		}
		mapping.DataChunkSetSize(chunk.chunk, row.r)

		// Append under lock. Read the appender error while still holding the
		// lock, and always release it: the original returned on StateError
		// with the mutex held, deadlocking every sibling worker.
		lock.Lock()
		state := mapping.AppendDataChunk(duckdbAppender, chunk.chunk)
		var appendErr error
		if state == mapping.StateError {
			appendErr = getDuckDBError(mapping.AppenderError(duckdbAppender))
		}
		lock.Unlock()
		if appendErr != nil {
			return appendErr
		}

		if !next {
			return nil
		}
		chunk.reset(true)
	}
}

// appenderChunkThread is one worker that drains a chunk-producing table
// source into duckdbAppender. The source fills the thread-local chunk; an
// empty chunk signals exhaustion. Appends are serialized through lock.
func appenderChunkThread(s ParallelChunkTableSource, lock *sync.Mutex, types []mapping.LogicalType, duckdbAppender mapping.Appender) error {
	lstate := s.NewLocalState()

	var chunk DataChunk
	if err := chunk.initFromTypes(types, true); err != nil {
		return err
	}
	// Exactly one close on every exit path. The original closed the chunk
	// twice on normal termination (inside the loop and again after it) and
	// not at all on the error returns.
	defer chunk.close()

	for {
		if err := s.FillChunk(lstate, chunk); err != nil {
			return err
		}
		if chunk.GetSize() == 0 {
			return nil
		}

		// Append under lock; read the appender error while still holding the
		// lock, and always release it (the original returned on StateError
		// with the mutex held, deadlocking every sibling worker).
		lock.Lock()
		state := mapping.AppendDataChunk(duckdbAppender, chunk.chunk)
		var appendErr error
		if state == mapping.StateError {
			appendErr = getDuckDBError(mapping.AppenderError(duckdbAppender))
		}
		lock.Unlock()
		if appendErr != nil {
			return appendErr
		}
		chunk.reset(true)
	}
}

func (a appenderSource[T]) _secret() {}

// NewAppenderRowSource wraps a RowTableSource so it can be passed to
// Appender.AppendTableSource for single-threaded, row-wise loading.
func NewAppenderRowSource(source RowTableSource) AppenderSource {
	return appenderSource[RowTableSource]{Source: source}
}

// NewAppenderParallelRowSource wraps a ParallelRowTableSource so it can be
// passed to Appender.AppendTableSource for multi-threaded, row-wise loading.
func NewAppenderParallelRowSource(source ParallelRowTableSource) AppenderSource {
	return appenderSource[ParallelRowTableSource]{Source: source}
}

// NewAppenderChunkSource wraps a ChunkTableSource so it can be passed to
// Appender.AppendTableSource for single-threaded, chunk-wise loading.
func NewAppenderChunkSource(source ChunkTableSource) AppenderSource {
	return appenderSource[ChunkTableSource]{Source: source}
}

// NewAppenderParallelChunkSource wraps a ParallelChunkTableSource so it can be
// passed to Appender.AppendTableSource for multi-threaded, chunk-wise loading.
func NewAppenderParallelChunkSource(source ParallelChunkTableSource) AppenderSource {
	return appenderSource[ParallelChunkTableSource]{Source: source}
}
Loading
Loading