Skip to content

Commit 3dc890e

Browse files
committed
Add parallel chunks, and reduce a lot of code duplication
1 parent 4f98163 commit 3dc890e

File tree

2 files changed

+177
-102
lines changed

2 files changed

+177
-102
lines changed

appender.go

Lines changed: 117 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ type (
1818
s T
1919
}
2020

21-
RowAppenderSource = appenderSource[RowTableSource]
22-
ParallelRowAppenderSource = appenderSource[ParallelRowTableSource]
23-
ChunkAppenderSource = appenderSource[ChunkTableSource]
21+
RowAppenderSource = appenderSource[RowTableSource]
22+
ParallelRowAppenderSource = appenderSource[ParallelRowTableSource]
23+
ChunkAppenderSource = appenderSource[ChunkTableSource]
24+
ParallelChunkAppenderSource = appenderSource[ParallelChunkTableSource]
2425

2526
AppenderSource interface {
26-
RowAppenderSource | ParallelRowAppenderSource | ChunkAppenderSource
27+
RowAppenderSource | ParallelRowAppenderSource | ChunkAppenderSource | ParallelChunkAppenderSource
2728
}
2829

2930
// Appender holds the DuckDB appender. It allows efficient bulk loading into a DuckDB database.
@@ -266,7 +267,9 @@ func NewChunkAppenderSource(s ChunkTableSource) ChunkAppenderSource {
266267
}
267268

268269
// TODO: check if we need the schema, or if it can be null (default schema)
269-
func AppendTableSource[AST AppenderSource](driverConn driver.Conn, s AST, schema, table string) error {
270+
271+
// AppendTableSource appends a table source to an already existing table.
272+
func AppendTableSource[AS AppenderSource](driverConn driver.Conn, s AS, schema, table string) (ret error) {
270273
con, ok := driverConn.(*Conn)
271274
if !ok {
272275
return getError(errInvalidCon, nil)
@@ -294,7 +297,28 @@ func AppendTableSource[AST AppenderSource](driverConn driver.Conn, s AST, schema
294297
return getError(errAppenderCreation, err)
295298
}
296299

300+
defer func() {
301+
var errFlush, errClose error
302+
303+
state := C.duckdb_appender_flush(duckdbAppender)
304+
if state == C.DuckDBError {
305+
errFlush = duckdbError(C.duckdb_appender_error(duckdbAppender))
306+
}
307+
// Destroy all appender data and the appender.
308+
state = C.duckdb_appender_destroy(&duckdbAppender)
309+
if state == C.DuckDBError {
310+
errClose = errAppenderClose
311+
}
312+
313+
err := errors.Join(ret, errFlush, errClose)
314+
if err != nil {
315+
ret = getError(invalidatedAppenderError(err), nil)
316+
}
317+
}()
318+
297319
columnCount := int(C.duckdb_appender_column_count(duckdbAppender))
320+
321+
// projection is not used in chunks, so we must keep it a 1-1 mapping
298322
var projection = make([]int, columnCount)
299323
var ptr, types = mallocTypeSlice(columnCount)
300324
for i := 0; i < columnCount; i++ {
@@ -312,84 +336,55 @@ func AppendTableSource[AST AppenderSource](driverConn driver.Conn, s AST, schema
312336
}
313337
}
314338

315-
maxSize := C.idx_t(GetDataChunkCapacity())
339+
defer destroyTypeSlice(ptr, types)
340+
341+
lock := &sync.Mutex{}
316342
var x any = s
317343
switch s := x.(type) {
318344
case RowAppenderSource:
319-
for true {
320-
var chunk DataChunk
321-
chunk.initFromTypes(ptr, types, true)
345+
s.s.Init()
346+
err := appendRowThread(&parallelRowTSWrapper{s.s}, lock, ptr, types, duckdbAppender, projection)
347+
if err != nil {
348+
return err
349+
}
350+
case ParallelRowAppenderSource:
351+
wg := sync.WaitGroup{}
322352

323-
row := Row{
324-
chunk: &chunk,
325-
projection: projection,
326-
}
327-
var next bool
328-
var err error
329-
for row.r = 0; row.r < maxSize; row.r++ {
330-
next, err = s.s.FillRow(row)
353+
info := s.s.Init()
354+
threads := min(info.MaxThreads, runtime.NumCPU())
355+
var oerr error
356+
for range threads {
357+
wg.Add(1)
358+
go func() {
359+
err := appendRowThread(s.s, lock, ptr, types, duckdbAppender, projection)
331360
if err != nil {
332-
return err
361+
oerr = err
333362
}
334-
if !next {
335-
break
336-
}
337-
}
338-
339-
C.duckdb_data_chunk_set_size(chunk.data, row.r)
340-
state = C.duckdb_append_data_chunk(duckdbAppender, chunk.data)
341-
if state == C.DuckDBError {
342-
return duckdbError(C.duckdb_appender_error(duckdbAppender))
343-
}
344-
chunk.close()
345-
if !next {
346-
break
347-
}
363+
wg.Done()
364+
}()
348365
}
349-
case ParallelRowAppenderSource:
366+
wg.Wait()
367+
if oerr != nil {
368+
return oerr
369+
}
370+
case ChunkAppenderSource:
371+
s.s.Init()
372+
err := appendChunkThread(&parallelChunkTSWrapper{s.s}, lock, ptr, types, duckdbAppender)
373+
if err != nil {
374+
return err
375+
}
376+
case ParallelChunkAppenderSource:
350377
wg := sync.WaitGroup{}
351-
378+
352379
info := s.s.Init()
353380
threads := min(info.MaxThreads, runtime.NumCPU())
354-
lock := &sync.Mutex{}
355381
var oerr error
356382
for range threads {
357383
wg.Add(1)
358384
go func() {
359-
lstate := s.s.NewLocalState()
360-
for true {
361-
var chunk DataChunk
362-
chunk.initFromTypes(ptr, types, true)
363-
364-
row := Row{
365-
chunk: &chunk,
366-
projection: projection,
367-
}
368-
var next bool
369-
var err error
370-
for row.r = 0; row.r < maxSize; row.r++ {
371-
next, err = s.s.FillRow(lstate, row)
372-
if err != nil {
373-
oerr = err
374-
return
375-
}
376-
if !next {
377-
break
378-
}
379-
}
380-
381-
C.duckdb_data_chunk_set_size(chunk.data, row.r)
382-
lock.Lock()
383-
state = C.duckdb_append_data_chunk(duckdbAppender, chunk.data)
384-
lock.Unlock()
385-
if state == C.DuckDBError {
386-
oerr = duckdbError(C.duckdb_appender_error(duckdbAppender))
387-
return
388-
}
389-
chunk.close()
390-
if !next {
391-
break
392-
}
385+
err := appendChunkThread(s.s, lock, ptr, types, duckdbAppender)
386+
if err != nil {
387+
oerr = err
393388
}
394389
wg.Done()
395390
}()
@@ -398,48 +393,70 @@ func AppendTableSource[AST AppenderSource](driverConn driver.Conn, s AST, schema
398393
if oerr != nil {
399394
return oerr
400395
}
401-
case ChunkAppenderSource:
402-
for true {
403-
var chunk DataChunk
404-
chunk.initFromTypes(ptr, types, true)
396+
}
405397

406-
err := s.s.FillChunk(chunk)
398+
return nil
399+
}
400+
401+
func appendRowThread(s ParallelRowTableSource, lock *sync.Mutex, ptr unsafe.Pointer, types []C.duckdb_logical_type, duckdbAppender C.duckdb_appender, projection []int) error {
402+
maxSize := C.idx_t(GetDataChunkCapacity())
403+
lstate := s.NewLocalState()
404+
for {
405+
var chunk DataChunk
406+
chunk.initFromTypes(ptr, types, true)
407+
408+
row := Row{
409+
chunk: &chunk,
410+
projection: projection,
411+
}
412+
var next bool
413+
var err error
414+
for row.r = 0; row.r < maxSize; row.r++ {
415+
next, err = s.FillRow(lstate, row)
407416
if err != nil {
408417
return err
409418
}
410-
if chunk.GetSize() == 0 {
411-
chunk.close()
419+
if !next {
412420
break
413421
}
414-
state = C.duckdb_append_data_chunk(duckdbAppender, chunk.data)
415-
if state == C.DuckDBError {
416-
return duckdbError(C.duckdb_appender_error(duckdbAppender))
417-
}
418-
chunk.close()
419422
}
420-
}
421423

422-
// TODO: This should all be in a defer
423-
424-
// We flush before closing to get a meaningful error message.
425-
var errFlush error
426-
state = C.duckdb_appender_flush(duckdbAppender)
427-
if state == C.DuckDBError {
428-
errFlush = duckdbError(C.duckdb_appender_error(duckdbAppender))
424+
C.duckdb_data_chunk_set_size(chunk.data, row.r)
425+
lock.Lock()
426+
state := C.duckdb_append_data_chunk(duckdbAppender, chunk.data)
427+
lock.Unlock()
428+
if state == C.DuckDBError {
429+
return duckdbError(C.duckdb_appender_error(duckdbAppender))
430+
}
431+
chunk.close()
432+
if !next {
433+
break
434+
}
429435
}
436+
return nil
437+
}
430438

431-
// Destroy all appender data and the appender.
432-
destroyTypeSlice(ptr, types)
433-
var errClose error
434-
state = C.duckdb_appender_destroy(&duckdbAppender)
435-
if state == C.DuckDBError {
436-
errClose = errAppenderClose
437-
}
439+
func appendChunkThread(s ParallelChunkTableSource, lock *sync.Mutex, ptr unsafe.Pointer, types []C.duckdb_logical_type, duckdbAppender C.duckdb_appender) error {
440+
lstate := s.NewLocalState()
441+
for {
442+
var chunk DataChunk
443+
chunk.initFromTypes(ptr, types, true)
438444

439-
err := errors.Join(errFlush, errClose)
440-
if err != nil {
441-
return getError(invalidatedAppenderError(err), nil)
445+
err := s.FillChunk(lstate, chunk)
446+
if err != nil {
447+
return err
448+
}
449+
if chunk.GetSize() == 0 {
450+
chunk.close()
451+
break
452+
}
453+
lock.Lock()
454+
state := C.duckdb_append_data_chunk(duckdbAppender, chunk.data)
455+
lock.Unlock()
456+
if state == C.DuckDBError {
457+
return duckdbError(C.duckdb_appender_error(duckdbAppender))
458+
}
459+
chunk.close()
442460
}
443461
return nil
444-
445462
}

tableUDF.go

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ type (
135135
FillChunk(any, DataChunk) error
136136
}
137137

138+
parallelChunkTSWrapper struct {
139+
s ChunkTableSource
140+
}
141+
142+
parallelRowTSWrapper struct {
143+
s RowTableSource
144+
}
145+
138146
// TableFunctionConfig contains any information passed to DuckDB when registering the table function.
139147
TableFunctionConfig struct {
140148
// The Arguments of the table function.
@@ -258,7 +266,7 @@ func udfBindTyped[T tableSource](info C.duckdb_bind_info) {
258266

259267
cardinality := instance.Cardinality()
260268
if cardinality != nil {
261-
C.duckdb_bind_set_cardinality(info, C.idx_t(cardinality.Cardinality), C.bool(cardinality.Exact))
269+
C.duckdb_bind_set_cardinality(info, C.idx_t(cardinality.Cardinality), C.bool(cardinality.Exact))
262270
}
263271

264272
pinnedInstanceData := pinnedValue[tableFunctionData]{
@@ -314,7 +322,7 @@ func table_udf_row_callback(info C.duckdb_function_info, output C.duckdb_data_ch
314322
chunk: &chunk,
315323
projection: instance.projection,
316324
}
317-
maxSize := C.idx_t(GetDataChunkCapacity())
325+
maxSize := C.idx_t(GetDataChunkCapacity())
318326

319327
switch fun := instance.fun.(type) {
320328
case RowTableSource:
@@ -482,3 +490,53 @@ func RegisterTableUDF[TFT TableFunction](c *sql.Conn, name string, f TFT) error
482490
})
483491
return err
484492
}
493+
494+
// ParallelChunk wrapper
495+
496+
func (s parallelChunkTSWrapper) ColumnInfos() []ColumnInfo {
497+
return s.s.ColumnInfos()
498+
}
499+
500+
func (s parallelChunkTSWrapper) Cardinality() *CardinalityInfo {
501+
return s.s.Cardinality()
502+
}
503+
504+
func (s parallelChunkTSWrapper) Init() ParallelTableSourceInfo {
505+
s.s.Init()
506+
return ParallelTableSourceInfo{
507+
MaxThreads: 1,
508+
}
509+
}
510+
511+
func (s parallelChunkTSWrapper) NewLocalState() any {
512+
return struct{}{}
513+
}
514+
515+
func (s parallelChunkTSWrapper) FillChunk(ls any, chunk DataChunk) error {
516+
return s.s.FillChunk(chunk)
517+
}
518+
519+
// ParallelRow wrapper
520+
521+
func (s parallelRowTSWrapper) ColumnInfos() []ColumnInfo {
522+
return s.s.ColumnInfos()
523+
}
524+
525+
func (s parallelRowTSWrapper) Cardinality() *CardinalityInfo {
526+
return s.s.Cardinality()
527+
}
528+
529+
func (s parallelRowTSWrapper) Init() ParallelTableSourceInfo {
530+
s.s.Init()
531+
return ParallelTableSourceInfo{
532+
MaxThreads: 1,
533+
}
534+
}
535+
536+
func (s parallelRowTSWrapper) NewLocalState() any {
537+
return struct{}{}
538+
}
539+
540+
func (s parallelRowTSWrapper) FillRow(ls any, chunk Row) (bool, error) {
541+
return s.s.FillRow(chunk)
542+
}

0 commit comments

Comments
 (0)