8
8
"time"
9
9
10
10
"github.com/jmoiron/sqlx"
11
+ _ "github.com/ClickHouse/clickhouse-go/v2"
11
12
_ "github.com/lib/pq"
12
13
"github.com/streamingfast/bstream"
13
14
"github.com/streamingfast/logging"
@@ -20,6 +21,7 @@ import (
20
21
"github.com/stretchr/testify/assert"
21
22
"github.com/stretchr/testify/require"
22
23
"github.com/testcontainers/testcontainers-go"
24
+ "github.com/testcontainers/testcontainers-go/modules/clickhouse"
23
25
"github.com/testcontainers/testcontainers-go/modules/postgres"
24
26
"github.com/testcontainers/testcontainers-go/wait"
25
27
"go.uber.org/zap"
@@ -628,6 +630,195 @@ func TestSinker_Integration_ParentChildOrdering(t *testing.T) {
628
630
)
629
631
}
630
632
633
+ func TestSinker_Integration_ClickHouse_SinglePrimaryKey (t * testing.T ) {
634
+ testTables := db2 .TestTables ("" , map [string ]* db2.TableInfo {
635
+ "xfer" : mustNewTableInfo ("" , "xfer" , []string {"id" }, map [string ]* db2.ColumnInfo {
636
+ "id" : db2 .NewColumnInfo ("id" , "String" , "" ),
637
+ "from" : db2 .NewColumnInfo ("from" , "String" , "" ),
638
+ "to" : db2 .NewColumnInfo ("to" , "String" , "" ),
639
+ }),
640
+ })
641
+
642
+ dbConnectionString , clickhouseContainer := setupClickhouseContainer (t , testTables , nil )
643
+
644
+ type XferSinglePKRow struct {
645
+ ID string `db:"id"`
646
+ From string `db:"from"`
647
+ To string `db:"to"`
648
+ }
649
+
650
+ tests := []struct {
651
+ name string
652
+ events []event
653
+ expectedQueryResponses []* XferSinglePKRow
654
+ expectedFinalCursor string
655
+ }{
656
+ {
657
+ "insert final" ,
658
+ []event {
659
+ newEvent (10 , 10 ,
660
+ insertRowSinglePK ("xfer" , "1234" , "from" , "sender1" , "to" , "receiver1" ),
661
+ ),
662
+ },
663
+ []* XferSinglePKRow {
664
+ {ID : "1234" , From : "sender1" , To : "receiver1" },
665
+ },
666
+ "Block #10 (10) - LIB #10 (10)" ,
667
+ },
668
+ }
669
+
670
+ for _ , test := range tests {
671
+ t .Run (test .name , func (t * testing.T ) {
672
+ runClickHouseSinkerTest (
673
+ t ,
674
+ testTables ,
675
+ dbConnectionString ,
676
+ clickhouseContainer ,
677
+ test .events ,
678
+ test .expectedQueryResponses ,
679
+ test .expectedFinalCursor ,
680
+ )
681
+ })
682
+ }
683
+ }
684
+
685
+ func TestSinker_Integration_RisingWave_SinglePrimaryKey (t * testing.T ) {
686
+ testTables := db2 .TestTables ("public" , map [string ]* db2.TableInfo {
687
+ "xfer" : mustNewTableInfo ("public" , "xfer" , []string {"id" }, map [string ]* db2.ColumnInfo {
688
+ "id" : db2 .NewColumnInfo ("id" , "text" , "" ),
689
+ "from" : db2 .NewColumnInfo ("from" , "text" , "" ),
690
+ "to" : db2 .NewColumnInfo ("to" , "text" , "" ),
691
+ }),
692
+ })
693
+
694
+ dbConnectionString , risingwaveContainer := setupRisingwaveContainer (t , testTables , nil )
695
+
696
+ type XferSinglePKRow struct {
697
+ ID string `db:"id"`
698
+ From string `db:"from"`
699
+ To string `db:"to"`
700
+ }
701
+
702
+ tests := []struct {
703
+ name string
704
+ events []event
705
+ expectedQueryResponses []* XferSinglePKRow
706
+ expectedFinalCursor string
707
+ }{
708
+ {
709
+ "insert final" ,
710
+ []event {
711
+ newEvent (10 , 10 ,
712
+ insertRowSinglePK ("xfer" , "1234" , "from" , "sender1" , "to" , "receiver1" ),
713
+ ),
714
+ },
715
+ []* XferSinglePKRow {
716
+ {ID : "1234" , From : "sender1" , To : "receiver1" },
717
+ },
718
+ "Block #10 (10) - LIB #10 (10)" ,
719
+ },
720
+ }
721
+
722
+ for _ , test := range tests {
723
+ t .Run (test .name , func (t * testing.T ) {
724
+ runRisingWaveSinkerTest (
725
+ t ,
726
+ testTables ,
727
+ dbConnectionString ,
728
+ risingwaveContainer ,
729
+ test .events ,
730
+ test .expectedQueryResponses ,
731
+ test .expectedFinalCursor ,
732
+ )
733
+ })
734
+ }
735
+ }
736
+
737
+ func runRisingWaveSinkerTest [R any ](
738
+ t * testing.T ,
739
+ tables map [string ]* db2.TableInfo ,
740
+ dbDSN string ,
741
+ risingwaveContainer testcontainers.Container ,
742
+ events []event ,
743
+ expectedQueryResponses []R ,
744
+ expectedLogLine string ,
745
+ ) {
746
+ t .Helper ()
747
+ ctx := context .Background ()
748
+
749
+ l := db2 .NewTestLoader (t , dbDSN , nil , tables , logger , tracer )
750
+
751
+ s , err := sink .New (sink .SubstreamsModeDevelopment , false , testPackage , testPackage .Modules .Modules [0 ], []byte ("unused" ), testClientConfig , logger , nil )
752
+ require .NoError (t , err )
753
+ sinker , _ := New (s , l , logger , nil )
754
+ t .Cleanup (func () { sinker .loader .Close () })
755
+
756
+ require .NoError (t , l .InsertCursor (ctx , sinker .OutputModuleHash (), sink .NewBlankCursor ()))
757
+
758
+ for _ , evt := range events {
759
+ err := sinker .HandleBlockScopedData (
760
+ ctx ,
761
+ blockScopedData ("db_out" , evt .tableChanges , evt .blockNum , evt .libNum ),
762
+ flushEveryBlock , sink .MustNewCursor (simpleCursor (evt .blockNum , evt .libNum )),
763
+ )
764
+ require .NoError (t , err )
765
+ }
766
+
767
+ dbx := sqlx .NewDb (l .DB , "postgres" )
768
+
769
+ var actualQueryResponses []R
770
+ readQuery := fmt .Sprintf (`SELECT * FROM "%s"."xfer" ORDER BY id` , l .GetDSN ().Schema ())
771
+
772
+ err = dbx .SelectContext (ctx , & actualQueryResponses , readQuery )
773
+ require .NoError (t , err )
774
+
775
+ assert .Equal (t , expectedQueryResponses , actualQueryResponses )
776
+ }
777
+
778
+ func runClickHouseSinkerTest [R any ](
779
+ t * testing.T ,
780
+ tables map [string ]* db2.TableInfo ,
781
+ dbDSN string ,
782
+ clickhouseContainer * clickhouse.ClickHouseContainer ,
783
+ events []event ,
784
+ expectedQueryResponses []R ,
785
+ expectedLogLine string ,
786
+ ) {
787
+ t .Helper ()
788
+ ctx := context .Background ()
789
+
790
+ // Note: ClickHouse containers don't have Restore method like PostgreSQL
791
+ // So we just use the container as-is
792
+
793
+ l := db2 .NewTestLoader (t , dbDSN , nil , tables , logger , tracer )
794
+
795
+ s , err := sink .New (sink .SubstreamsModeDevelopment , false , testPackage , testPackage .Modules .Modules [0 ], []byte ("unused" ), testClientConfig , logger , nil )
796
+ require .NoError (t , err )
797
+ sinker , _ := New (s , l , logger , nil )
798
+ t .Cleanup (func () { sinker .loader .Close () })
799
+
800
+ require .NoError (t , l .InsertCursor (ctx , sinker .OutputModuleHash (), sink .NewBlankCursor ()))
801
+
802
+ for _ , evt := range events {
803
+ err := sinker .HandleBlockScopedData (
804
+ ctx ,
805
+ blockScopedData ("db_out" , evt .tableChanges , evt .blockNum , evt .libNum ),
806
+ flushEveryBlock , sink .MustNewCursor (simpleCursor (evt .blockNum , evt .libNum )),
807
+ )
808
+ require .NoError (t , err )
809
+ }
810
+
811
+ dbx := sqlx .NewDb (l .DB , "clickhouse" )
812
+
813
+ var actualQueryResponses []R
814
+ readQuery := "SELECT * FROM xfer ORDER BY id"
815
+
816
+ err = dbx .SelectContext (ctx , & actualQueryResponses , readQuery )
817
+ require .NoError (t , err )
818
+
819
+ assert .Equal (t , expectedQueryResponses , actualQueryResponses )
820
+ }
821
+
631
822
func runSinkerTest [R any ](
632
823
t * testing.T ,
633
824
tables map [string ]* db2.TableInfo ,
@@ -779,6 +970,108 @@ func setupPostgresContainer(t *testing.T, testTables map[string]*db2.TableInfo,
779
970
return dbConnectionString + "&schemaName=testschema" , postgresContainer
780
971
}
781
972
973
+ // setupClickhouseContainer spins up a ClickHouse Docker container and initialize the database with the corresponding
974
+ // testTables. If the testTablesSQL is `nil`, it will generate the SQL from the testTables directly, otherwise
975
+ // it will use the provided SQL to set up the tables.
976
+ func setupClickhouseContainer (t * testing.T , testTables map [string ]* db2.TableInfo , testTablesSQL * string ) (dbConnectionString string , container * clickhouse.ClickHouseContainer ) {
977
+ t .Helper ()
978
+ ctx := context .Background ()
979
+
980
+ dbName := "testdb"
981
+ dbUser := "default"
982
+ dbPassword := ""
983
+
984
+ clickhouseContainer , err := clickhouse .Run (ctx ,
985
+ "clickhouse/clickhouse-server:23.9" ,
986
+ clickhouse .WithDatabase (dbName ),
987
+ clickhouse .WithUsername (dbUser ),
988
+ clickhouse .WithPassword (dbPassword ),
989
+ testcontainers .WithWaitStrategy (
990
+ wait .ForHTTP ("/ping" ).
991
+ WithPort ("8123/tcp" ).
992
+ WithStartupTimeout (60 * time .Second )),
993
+ )
994
+ testcontainers .CleanupContainer (t , clickhouseContainer )
995
+ require .NoError (t , err )
996
+
997
+ dbConnectionString , err = clickhouseContainer .ConnectionString (ctx )
998
+ require .NoError (t , err )
999
+
1000
+ l := db2 .NewTestLoader (
1001
+ t ,
1002
+ dbConnectionString ,
1003
+ nil ,
1004
+ testTables ,
1005
+ logger ,
1006
+ tracer ,
1007
+ )
1008
+
1009
+ if testTablesSQL == nil {
1010
+ testTablesSQL = ptr (db2 .GenerateCreateTableSQL (testTables ))
1011
+ }
1012
+
1013
+ err = l .Setup (context .Background (), "" , * testTablesSQL , false )
1014
+ require .NoError (t , err )
1015
+
1016
+ require .NoError (t , l .Close ())
1017
+
1018
+ return dbConnectionString , clickhouseContainer
1019
+ }
1020
+
1021
+ // setupRisingwaveContainer spins up a RisingWave Docker container and initialize the database with the corresponding
1022
+ // testTables. If the testTablesSQL is `nil`, it will generate the SQL from the testTables directly, otherwise
1023
+ // it will use the provided SQL to set up the tables.
1024
+ func setupRisingwaveContainer (t * testing.T , testTables map [string ]* db2.TableInfo , testTablesSQL * string ) (dbConnectionString string , container testcontainers.Container ) {
1025
+ t .Helper ()
1026
+ ctx := context .Background ()
1027
+
1028
+ dbName := "dev"
1029
+ dbUser := "root"
1030
+ dbPassword := ""
1031
+
1032
+ risingwaveContainer , err := testcontainers .GenericContainer (ctx , testcontainers.GenericContainerRequest {
1033
+ ContainerRequest : testcontainers.ContainerRequest {
1034
+ Image : "risingwavelabs/risingwave:latest" ,
1035
+ ExposedPorts : []string {"4566/tcp" , "5691/tcp" },
1036
+ Cmd : []string {"playground" },
1037
+ WaitingFor : wait .ForListeningPort ("4566/tcp" ).
1038
+ WithStartupTimeout (90 * time .Second ),
1039
+ },
1040
+ Started : true ,
1041
+ })
1042
+ testcontainers .CleanupContainer (t , risingwaveContainer )
1043
+ require .NoError (t , err )
1044
+
1045
+ host , err := risingwaveContainer .Host (ctx )
1046
+ require .NoError (t , err )
1047
+
1048
+ port , err := risingwaveContainer .MappedPort (ctx , "4566" )
1049
+ require .NoError (t , err )
1050
+
1051
+ dbConnectionString = fmt .Sprintf ("risingwave://%s:%s@%s:%s/%s?sslmode=disable" ,
1052
+ dbUser , dbPassword , host , port .Port (), dbName )
1053
+
1054
+ l := db2 .NewTestLoader (
1055
+ t ,
1056
+ dbConnectionString ,
1057
+ nil ,
1058
+ testTables ,
1059
+ logger ,
1060
+ tracer ,
1061
+ )
1062
+
1063
+ if testTablesSQL == nil {
1064
+ testTablesSQL = ptr (db2 .GenerateCreateTableSQL (testTables ))
1065
+ }
1066
+
1067
+ err = l .Setup (context .Background (), "public" , * testTablesSQL , false )
1068
+ require .NoError (t , err )
1069
+
1070
+ require .NoError (t , l .Close ())
1071
+
1072
+ return dbConnectionString , risingwaveContainer
1073
+ }
1074
+
782
1075
var T = true
783
1076
var flushEveryBlock = & T
784
1077
@@ -904,6 +1197,27 @@ func deleteRowMultiplePK(table string, pk map[string]string) *pbdatabase.TableCh
904
1197
}
905
1198
}
906
1199
1200
+ func updateRowSinglePK (table string , pk string , fieldsAndValues ... string ) * pbdatabase.TableChange {
1201
+ return & pbdatabase.TableChange {
1202
+ Table : table ,
1203
+ PrimaryKey : & pbdatabase.TableChange_Pk {
1204
+ Pk : pk ,
1205
+ },
1206
+ Operation : pbdatabase .TableChange_OPERATION_UPDATE ,
1207
+ Fields : getFields (fieldsAndValues ... ),
1208
+ }
1209
+ }
1210
+
1211
+ func deleteRowSinglePK (table string , pk string ) * pbdatabase.TableChange {
1212
+ return & pbdatabase.TableChange {
1213
+ Table : table ,
1214
+ PrimaryKey : & pbdatabase.TableChange_Pk {
1215
+ Pk : pk ,
1216
+ },
1217
+ Operation : pbdatabase .TableChange_OPERATION_DELETE ,
1218
+ }
1219
+ }
1220
+
907
1221
func blockScopedData (module string , changes []* pbdatabase.TableChange , blockNum uint64 , finalBlockNum uint64 ) * pbsubstreamsrpc.BlockScopedData {
908
1222
mapOutput , err := anypb .New (& pbdatabase.DatabaseChanges {
909
1223
TableChanges : changes ,
0 commit comments