Skip to content

Conversation

juanmardefago
Copy link

No description provided.

chriswessels and others added 24 commits July 22, 2025 23:01
* feat: implement risingwave sql dialect

* fix: migrate all pg types to risingwave types

* feat: dialect and driver selection working

* fix: risingwave create table

* feat: add id generation and migrate json methods

* chore: logs everywhere

* chore: logs everywhere

* chore: logs everywhere

* feat: working risingwave ingestion

* chore: remove debug logs

* chore: remove risingwave connection overrides
- Rebase RisingWave implementation onto upstream develop branch
- Adapt to new database/dialect/inserter architecture pattern
- Implement UseVersionField() and UseDeletedField() dialect methods
- Add pgInserter/pgFlusher interfaces for database operations
- Refactor inserters to use init pattern with separate construction/initialization
- Fix 'unexpected transaction status idle' error by avoiding explicit transactions
- RisingWave does not support read-write transactions per documentation
- Use autocommit mode for all operations as intended by RisingWave design
- Add comprehensive logging and documentation about transaction limitations
- Update README to clearly explain RisingWave's streaming-first architecture
…l conflict handling

- Remove 'ON CONFLICT (name) DO UPDATE SET' from INSERT statements
- Add 'ON CONFLICT OVERWRITE' to _cursor_ table definition
- RisingWave defines conflict behavior at table creation, not per-INSERT
- This resolves 'sql parser error: expected end of statement, found: ON'
- Remove legacy Insert(table, values, txWrapper) methods
- Remove legacy Flush(tx) methods
- Keep only clean modern interfaces: Insert(table, values) and flush(database)
- RisingWave now implements only the interfaces it needs without PostgreSQL baggage
- Makes codebase cleaner and forces use of modern patterns
- Replace database.tx.Exec() with database.execSql() in flush method
- Prevents nil pointer dereference since tx is nil in RisingWave autocommit mode
- Ensures consistent SQL execution through the autocommit wrapper
- Replace transaction-based block undo with direct SQL execution
- Use execSql helper instead of db.Begin() and tx.Exec()
- Eliminates 'unexpected transaction status idle' error in block reorg handling
- Maintains same functionality with RisingWave's streaming architecture
- Keep RETURNING clause support (verified in RisingWave release #7094)
- Document ON CONFLICT limitation: only supported in CREATE TABLE, not INSERT
- Add comment explaining UPSERT requires tables created with ON CONFLICT OVERWRITE
- JSONB functions (jsonb_populate_record, to_jsonb) are supported by RisingWave
…handling

- Remove unused EXCLUDED syntax building for UPSERT operations
- RisingWave handles conflicts via ON CONFLICT OVERWRITE at table creation
- Simple INSERT statements automatically overwrite on primary key conflicts
- Ensures complete reorg support equivalent to PostgreSQL
This implementation introduces a comprehensive semantic type annotation system
that enables dialect-specific SQL type mapping, with special support for
RisingWave's rw_int256 type for large blockchain integers.

Key Features:
- Semantic type annotations in protobuf schema (semantic_type, format_hint)
- RisingWave rw_int256 support for uint256/int256 semantic types
- Dialect-agnostic type mapping with graceful fallbacks
- Comprehensive value conversion with format validation
- Support for blockchain types (address, hash, signature, pubkey)
- Precision decimal types (decimal18, decimal6, decimal8, money)
- Time and JSON type optimizations

Implementation Details:
- Extended protobuf schema with semantic_type and format_hint fields
- Created semantic type registry with dialect-specific mappings
- Updated RisingWave dialect to support rw_int256 type casting
- Added comprehensive test coverage for all semantic types
- Implemented value conversion with validation and error handling

Benefits:
- Leverages RisingWave's native 256-bit integer arithmetic
- Optimized storage for blockchain addresses and hashes
- Proper decimal precision for DeFi token amounts
- Maintains compatibility across PostgreSQL, RisingWave, and ClickHouse
- Extensible design for future semantic types

Usage:
```protobuf
string value = 1 [(sf.substreams.sink.sql.schema.v1.field) = {
  semantic_type: "uint256",
  format_hint: "decimal"
}]; // → rw_int256 in RisingWave, NUMERIC in PostgreSQL

string address = 2 [(sf.substreams.sink.sql.schema.v1.field) = {
  semantic_type: "address"
}]; // → VARCHAR(42) with validation
```
…e mappings

- Remove decimal6, decimal8, decimal18, and money semantic types from all dialects
- Fix RisingWave types to use correct CHARACTER VARYING without length specs
- Clean up unused convertToDecimal functions
- Update documentation to reflect supported types only
- Ensure consistent semantic type support across PostgreSQL, ClickHouse, and RisingWave
- Remove stale decimal18/decimal6 examples from ClickHouse SQL output
- Update best practices to recommend uint256 instead of decimal types
- Ensure documentation is fully consistent with implementation
- Add ClickHouse testcontainer support with HTTP health checks
- Add RisingWave testcontainer support with fast port-based wait strategy
- Implement container setup functions for both databases
- Add integration test frameworks following PostgreSQL patterns
- Include helper functions for single PK update/delete operations
- RisingWave integration fully functional with 1s startup time
- ClickHouse integration ready, needs SQL generation improvements
@chriswessels chriswessels force-pushed the develop branch 2 times, most recently from 74781ea to fff66b6 Compare July 29, 2025 17:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants