diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index e21f3d869b..2d05f783fb 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -285,6 +285,7 @@ workflows: matrix: parameters: engine: + - doris - duckdb - postgres - mysql diff --git a/.circleci/wait-for-db.sh b/.circleci/wait-for-db.sh index a313320279..d039400531 100755 --- a/.circleci/wait-for-db.sh +++ b/.circleci/wait-for-db.sh @@ -34,6 +34,34 @@ clickhouse_ready() { probe_port 8123 } +doris_ready() { + probe_port 9030 + + echo "Checking for 3 alive Doris backends..." + sleep 15 + + while true; do + echo "Checking Doris backends..." + ALIVE_BACKENDS=$(docker exec -i doris-fe-01 mysql -h127.0.0.1 -P9030 -uroot -e "show backends \G" | grep -c "^ *Alive: true$") + + # fallback value if failed to get number + if ! [[ "$ALIVE_BACKENDS" =~ ^[0-9]+$ ]]; then + echo "WARN: Unable to parse number of alive backends, got: '$ALIVE_BACKENDS'" + ALIVE_BACKENDS=0 + fi + + echo "Found $ALIVE_BACKENDS alive backends" + + if [ "$ALIVE_BACKENDS" -ge 3 ]; then + echo "Doris has 3 or more alive backends" + break + fi + + echo "Waiting for more backends to become alive..." + sleep 5 + done +} + postgres_ready() { probe_port 5432 } diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 68d856c589..794a599869 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -6,7 +6,7 @@ build: python: "3.10" jobs: pre_build: - - pip install -e ".[athena,azuresql,bigframes,bigquery,clickhouse,databricks,dbt,dlt,gcppostgres,github,llm,mssql,mysql,mwaa,postgres,redshift,slack,snowflake,trino,web,risingwave]" + - pip install -e ".[athena,azuresql,bigframes,bigquery,clickhouse,databricks,dbt,dlt,doris,gcppostgres,github,llm,mssql,mysql,mwaa,postgres,redshift,slack,snowflake,trino,web,risingwave]" - make api-docs mkdocs: diff --git a/Makefile b/Makefile index 96305c4bfb..437fc6d382 100644 --- a/Makefile +++ b/Makefile @@ -196,6 +196,9 @@ trino-test: engine-trino-up risingwave-test: engine-risingwave-up pytest -n auto -m "risingwave" --reruns 3 --junitxml=test-results/junit-risingwave.xml +doris-test: engine-doris-up + pytest -n auto -m "doris" --reruns 3 --junitxml=test-results/junit-doris.xml + ################# # Cloud Engines # ################# diff --git a/docs/concepts/models/overview.md b/docs/concepts/models/overview.md index d6356462b4..9ad73ddc1e 100644 --- a/docs/concepts/models/overview.md +++ b/docs/concepts/models/overview.md @@ -181,6 +181,7 @@ This table lists each engine's support for `TABLE` and `VIEW` object comments: | BigQuery | Y | Y | | ClickHouse | Y | Y | | Databricks | Y | Y | +| Doris | Y | Y | | DuckDB <=0.9 | N | N | | DuckDB >=0.10 | Y | Y | | MySQL | Y | Y | diff --git a/docs/guides/configuration.md b/docs/guides/configuration.md index d2e294a589..6d5d24fdf8 100644 --- a/docs/guides/configuration.md +++ b/docs/guides/configuration.md @@ -908,6 +908,7 @@ These pages describe the connection configuration options for each execution eng * [Athena](../integrations/engines/athena.md) * [BigQuery](../integrations/engines/bigquery.md) * [Databricks](../integrations/engines/databricks.md) +* [Doris](../integrations/engines/doris.md) * [DuckDB](../integrations/engines/duckdb.md) * [Fabric](../integrations/engines/fabric.md) * [MotherDuck](../integrations/engines/motherduck.md) @@ -950,6 +951,7 @@ Unsupported state engines, even for development: * [ClickHouse](../integrations/engines/clickhouse.md) * [Spark](../integrations/engines/spark.md) * [Trino](../integrations/engines/trino.md) +* 
[Doris](../integrations/engines/doris.md) This example gateway configuration uses Snowflake for the data warehouse connection and Postgres for the state backend connection: diff --git a/docs/guides/connections.md b/docs/guides/connections.md index e0dca0f7a4..ec243a3675 100644 --- a/docs/guides/connections.md +++ b/docs/guides/connections.md @@ -81,6 +81,7 @@ default_gateway: local_db * [BigQuery](../integrations/engines/bigquery.md) * [Databricks](../integrations/engines/databricks.md) +* [Doris](../integrations/engines/doris.md) * [DuckDB](../integrations/engines/duckdb.md) * [MotherDuck](../integrations/engines/motherduck.md) * [MySQL](../integrations/engines/mysql.md) diff --git a/docs/integrations/engines/doris.md b/docs/integrations/engines/doris.md new file mode 100644 index 0000000000..77fa066fd7 --- /dev/null +++ b/docs/integrations/engines/doris.md @@ -0,0 +1,324 @@ +# Apache Doris + +## Overview + +[Apache Doris](https://doris.apache.org/) is a modern analytical database product based on an MPP architecture. It provides real-time analytical capabilities, supporting both high-concurrency point queries and high-throughput complex analysis. + +SQLMesh supports Doris through its MySQL-compatible protocol, while providing Doris-specific optimizations for table models, indexing, partitioning, and other features. The adapter is designed to leverage Doris's strengths for analytical workloads, with sensible defaults and support for advanced configuration. + +## Connection Configuration Example + +```yaml +doris: + connection: + type: doris + host: fe.doris.cluster # Frontend (FE) node address + port: 9030 # Query port (default: 9030) + user: doris_user + password: your_password + database: your_database + # Optional MySQL-compatible settings + charset: utf8mb4 + connect_timeout: 60 + state_connection: + # Use duckdb as state connection + type: duckdb +``` + +## Table Models + +Doris supports three table models: DUPLICATE, UNIQUE, and AGGREGATE. SQLMesh supports **DUPLICATE** and **UNIQUE** models through the `physical_properties` configuration. + +### DUPLICATE Model (Default) + +**Example Configuration:** +```sql +MODEL ( + name user_events, + kind FULL, + physical_properties ( + duplicate_key ('user_id', 'event_time'), + distributed_by ( + kind = 'HASH', + expressions = 'user_id', + buckets = 10 + ) + ) +); +``` + +### UNIQUE Model + +**Example Configuration:** +```sql +MODEL ( + name user_events, + kind FULL, + physical_properties ( + unique_key 'user_id', + distributed_by ( + kind = 'HASH', + expressions = 'user_id', + buckets = 16 + ) + ) +); +``` + +## Table Properties + +The Doris adapter supports a comprehensive set of table properties that can be configured in the `physical_properties` section of your model. 
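+
+For orientation, the UNIQUE model example above translates into Doris DDL roughly like the following. This is an illustrative sketch only: the column types are placeholders, SQLMesh generates the actual physical table name, and the adapter may add further defaults (for example, dynamic partition properties when the model is partitioned):
+
+```sql
+CREATE TABLE `db`.`user_events` (
+  `user_id` BIGINT,
+  `event_time` DATETIME
+)
+UNIQUE KEY(`user_id`)
+DISTRIBUTED BY HASH(`user_id`) BUCKETS 16;
+```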
+ +### Core Table Properties + +| Property | Type | Description | Example | +| --------------------- | --------------------- | ------------------------------------------- | ---------------------------------------------------------- | +| `unique_key` | `Tuple[str]` or `str` | Defines unique key columns for UNIQUE model | `('user_id')` or `'user_id'` | +| `duplicate_key` | `Tuple[str]` or `str` | Defines key columns for DUPLICATE model | `('user_id', 'event_time')` | +| `distributed_by` | `Dict` | Distribution configuration | See Distribution section | +| `partitions` | `Tuple[str]` or `str` | Custom partition expression | `'FROM ("2000-11-14") TO ("2099-11-14") INTERVAL 1 MONTH'` | + +### Distribution Configuration + +The `distributed_by` property supports multiple formats: + +**Dictionary Format:** +```sql +MODEL ( + name my_table, + kind FULL, + physical_properties ( + distributed_by ( + kind = 'HASH', + expressions = 'user_id', + buckets = 10 + ) + ) +); +``` + +```sql +MODEL ( + name my_table, + kind FULL, + physical_properties ( + distributed_by ( + kind = 'RANDOM' + ) + ) +); +``` + +**Supported Distribution Types:** +- `HASH`: Hash-based distribution (most common) +- `RANDOM`: Random distribution + +**Bucket Configuration:** +- Integer value: Fixed number of buckets (e.g., `10`) +- `'AUTO'`: Automatic bucket calculation + +### Partitioning + +Doris table supports range partitioning and list partitioning to improve query performance. + +**Custom Partition Expression:** +```sql +MODEL ( + name my_partitioned_model, + kind INCREMENTAL_BY_TIME_RANGE(time_column (event_date, '%Y-%m-%d')), + partitioned_by RANGE(event_date), + physical_properties ( + partitions = 'FROM ("2000-11-14") TO ("2099-11-14") INTERVAL 2 YEAR', + ), +); +``` + +```sql +MODEL ( + name my_custom_partitioned_model, + kind FULL, + partitioned_by RANGE(event_date), + physical_properties ( + partitioned_by_expr = ( + 'PARTITION `p2023` VALUES [("2023-01-01"), ("2024-01-01"))', + 'PARTITION `p2024` VALUES [("2024-01-01"), ("2025-01-01"))', + 'PARTITION `p2025` VALUES [("2025-01-01"), ("2026-01-01"))', + 'PARTITION `other` VALUES LESS THAN MAXVALUE' + ), + ) +); +``` + +### Generic Properties + +Any additional properties in `physical_properties` are passed through as Doris table properties: + +```sql +MODEL ( + name advanced_table, + kind FULL, + physical_properties ( + unique_key = 'id', + distributed_by ( + kind = 'HASH', + expressions = 'id', + buckets = 8 + ), + replication_allocation = 'tag.location.default: 3', + in_memory = 'false', + storage_format = 'V2', + disable_auto_compaction = 'false', + ) +); +``` + +## Materialized Views + +SQLMesh supports creating materialized views in Doris with comprehensive configuration options. 
+ +### Basic Materialized View + +```sql +MODEL ( + name user_summary_mv, + kind VIEW ( + materialized true + ) +); + +SELECT + user_id, + COUNT(*) as event_count, + MAX(event_time) as last_event +FROM user_events +GROUP BY user_id; +``` + +### Advanced Materialized View Configuration + +```sql +MODEL ( + name sqlmesh_test.view_materialized1, + kind VIEW ( + materialized true + ), + partitioned_by ds, + physical_properties ( + build = 'IMMEDIATE', + refresh = 'AUTO', + refresh_trigger = 'ON SCHEDULE EVERY 12 hour', + unique_key = id, + distributed_by = (kind='HASH', expressions=id, buckets=10), + replication_allocation = 'tag.location.default: 3', + in_memory = 'false', + storage_format = 'V2', + disable_auto_compaction = 'false' + ), + description "customer zip", + columns ( + id int, + ds datetime, + zip int, + ), + column_descriptions ( + id = "order id", + zip = "zip code", + ) +); +``` + +### Materialized View Properties + +| Property | Description | Values | +| --------------------- | ------------------------------------------------------------------------------- | ---------------------------------------------------------- | +| `build` | Build strategy | `'IMMEDIATE'`, `'DEFERRED'` | +| `refresh` | Refresh strategy | `'COMPLETE'`, `'AUTO'` | +| `refresh_trigger` | Schedule for automatic refresh | `'MANUAL'`, `'ON SCHEDULE INTERVAL 1 HOUR'`, `'ON COMMIT'` | +| `unique_key` | Unique key columns | `'user_id'` or `['user_id', 'date']` | +| `duplicate_key` | Duplicate key columns | `'user_id'` or `['user_id', 'date']` | +| `materialized_type` | Materialized type | `SYNC`, `ASYNC` | +| `source_table` | Source table of synchronous materialized view | `schema_name`.`table_name` | + +## Indexing + +SQLMesh supports creating indexes in Doris to accelerate queries. You can define indexes in your model's DDL. + +**Example:** +```sql +MODEL ( + name my_indexed_table, + kind FULL +); + +SELECT + user_id, + username, + city +FROM + users; + +@IF( + @runtime_stage = 'creating', + CREATE INDEX idx_username ON my_indexed_table (username) USING INVERTED COMMENT 'Inverted index on username' +); +``` + +## Comments + +SQLMesh supports adding comments to tables and columns with automatic truncation to Doris limits. + +- **Table Comments**: Use the `description` property in the `MODEL` definition +- **Column Comments**: Use the `column_descriptions` property in the `MODEL` definition + +```sql +MODEL ( + name my_commented_table, + kind TABLE, + description 'This is a comprehensive table comment that describes the purpose and usage of this table in detail.', + column_descriptions ( + id = "Unique identifier for each record", + user_id = "Foreign key reference to users table", + event_type = "Type of event that occurred" + ) +); +``` + +**Limits:** +- Table comments: 2048 characters (automatically truncated) +- Column comments: 255 characters (automatically truncated) + +## Views + +SQLMesh supports both regular and materialized views in Doris. 
+ +### Regular Views + +```sql +MODEL ( + name user_summary_view, + kind VIEW +); + +SELECT + user_id, + COUNT(*) as event_count, + MAX(event_time) as last_event +FROM user_events +GROUP BY user_id; +``` + +## Dependencies + +To use Doris with SQLMesh, install the required MySQL driver: + +```bash +pip install "sqlmesh[doris]" +# or +pip install pymysql +``` + +## Resources + +- [Doris Documentation](https://doris.apache.org/docs/) +- [Doris Data Models Guide](https://doris.apache.org/docs/table-design/data-model/) +- [Doris SQL Reference](https://doris.apache.org/docs/sql-manual/) \ No newline at end of file diff --git a/docs/integrations/overview.md b/docs/integrations/overview.md index 94b9289d21..e665fc3843 100644 --- a/docs/integrations/overview.md +++ b/docs/integrations/overview.md @@ -16,6 +16,7 @@ SQLMesh supports the following execution engines for running SQLMesh projects (e * [BigQuery](./engines/bigquery.md) (bigquery) * [ClickHouse](./engines/clickhouse.md) (clickhouse) * [Databricks](./engines/databricks.md) (databricks) +* [Doris](./engines/doris.md) (doris) * [DuckDB](./engines/duckdb.md) (duckdb) * [Fabric](./engines/fabric.md) (fabric) * [MotherDuck](./engines/motherduck.md) (motherduck) diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 676f9d7389..661309a33d 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -213,6 +213,7 @@ These pages describe the connection configuration options for each execution eng * [BigQuery](../integrations/engines/bigquery.md) * [ClickHouse](../integrations/engines/clickhouse.md) * [Databricks](../integrations/engines/databricks.md) +* [Doris](../integrations/engines/doris.md) * [DuckDB](../integrations/engines/duckdb.md) * [MotherDuck](../integrations/engines/motherduck.md) * [MySQL](../integrations/engines/mysql.md) diff --git a/mkdocs.yml b/mkdocs.yml index 47ddca54e9..faf4759a49 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -82,6 +82,7 @@ nav: - integrations/engines/bigquery.md - integrations/engines/clickhouse.md - integrations/engines/databricks.md + - integrations/engines/doris.md - integrations/engines/duckdb.md - integrations/engines/fabric.md - integrations/engines/motherduck.md diff --git a/pyproject.toml b/pyproject.toml index 2e39219710..0df3078f91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -106,6 +106,7 @@ dev = [ ] dbt = ["dbt-core<2"] dlt = ["dlt"] +doris = ["pymysql"] duckdb = [] fabric = ["pyodbc>=5.0.0"] gcppostgres = ["cloud-sql-python-connector[pg8000]>=1.8.0"] @@ -268,6 +269,7 @@ markers = [ "pyspark: test for PySpark that need to run separately from the other spark tests", "trino: test for Trino (all connectors)", "risingwave: test for Risingwave", + "doris: test for Doris", # Other "set_default_connection", diff --git a/sqlmesh/cli/project_init.py b/sqlmesh/cli/project_init.py index 0790562de7..252fe7f77d 100644 --- a/sqlmesh/cli/project_init.py +++ b/sqlmesh/cli/project_init.py @@ -175,7 +175,7 @@ class ExampleObjects: python_macros: t.Dict[str, str] -def _gen_example_objects(schema_name: str) -> ExampleObjects: +def _gen_example_objects(schema_name: str, dialect: str) -> ExampleObjects: sql_models: t.Dict[str, str] = {} python_models: t.Dict[str, str] = {} seeds: t.Dict[str, str] = {} @@ -208,6 +208,7 @@ def _gen_example_objects(schema_name: str) -> ExampleObjects: name {incremental_model_name}, kind INCREMENTAL_BY_TIME_RANGE ( time_column event_date + {",partition_by_time_column false" if dialect == "doris" else ""} ), start '2020-01-01', 
cron '@daily', @@ -360,7 +361,10 @@ def init_example_project( ) return config_path - example_objects = _gen_example_objects(schema_name=schema_name) + example_objects = _gen_example_objects( + schema_name=schema_name, + dialect=dialect if dialect else DIALECT_TO_TYPE.get(engine_type, "duckdb"), + ) if template != ProjectTemplate.EMPTY: _create_object_files(models_path, example_objects.sql_models, "sql") diff --git a/sqlmesh/core/config/__init__.py b/sqlmesh/core/config/__init__.py index 0dc99c0fd1..dfe6243974 100644 --- a/sqlmesh/core/config/__init__.py +++ b/sqlmesh/core/config/__init__.py @@ -12,6 +12,7 @@ BigQueryConnectionConfig as BigQueryConnectionConfig, ConnectionConfig as ConnectionConfig, DatabricksConnectionConfig as DatabricksConnectionConfig, + DorisConnectionConfig as DorisConnectionConfig, DuckDBConnectionConfig as DuckDBConnectionConfig, FabricConnectionConfig as FabricConnectionConfig, GCPPostgresConnectionConfig as GCPPostgresConnectionConfig, diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 1678f5d147..cd2afbdc4e 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -56,6 +56,8 @@ "trino", # Nullable types are problematic "clickhouse", + # Do not support table name starts with "_" + "doris", } MOTHERDUCK_TOKEN_REGEX = re.compile(r"(\?|\&)(motherduck_token=)(\S*)") @@ -2278,6 +2280,79 @@ def init(cursor: t.Any) -> None: return init +class DorisConnectionConfig(ConnectionConfig): + """Configuration for the Apache Doris connection. + + Apache Doris uses MySQL network protocol and is compatible with MySQL ecosystem tools, + JDBC/ODBC drivers, and various visualization tools. + + Args: + host: The hostname of the Doris FE (Frontend) node. + user: The Doris username. + password: The Doris password. + port: The port number of the Doris FE node. Default is 9030. + database: The optional database name. + charset: The optional character set. + collation: The optional collation. + ssl_disabled: Whether to disable SSL connection. + concurrent_tasks: The maximum number of tasks that can use this connection concurrently. + register_comments: Whether or not to register model comments with the SQL engine. + local_infile: Whether or not to allow local file access. + pre_ping: Whether or not to pre-ping the connection before starting a new transaction to ensure it is still alive. 
+ """ + + host: str + user: str + password: str + port: t.Optional[int] = 9030 + database: t.Optional[str] = None + charset: t.Optional[str] = None + collation: t.Optional[str] = None + ssl_disabled: t.Optional[bool] = None + concurrent_tasks: int = 4 + register_comments: bool = True + local_infile: bool = False + pre_ping: bool = True + + type_: t.Literal["doris"] = Field(alias="type", default="doris") + DIALECT: t.ClassVar[t.Literal["doris"]] = "doris" + DISPLAY_NAME: t.ClassVar[t.Literal["Apache Doris"]] = "Apache Doris" + DISPLAY_ORDER: t.ClassVar[t.Literal[18]] = 18 + + _engine_import_validator = _get_engine_import_validator("pymysql", "doris") + + @property + def _connection_kwargs_keys(self) -> t.Set[str]: + connection_keys = { + "host", + "user", + "password", + } + if self.port is not None: + connection_keys.add("port") + if self.database is not None: + connection_keys.add("database") + if self.charset is not None: + connection_keys.add("charset") + if self.collation is not None: + connection_keys.add("collation") + if self.ssl_disabled is not None: + connection_keys.add("ssl_disabled") + if self.local_infile is not None: + connection_keys.add("local_infile") + return connection_keys + + @property + def _engine_adapter(self) -> t.Type[EngineAdapter]: + return engine_adapter.DorisEngineAdapter + + @property + def _connection_factory(self) -> t.Callable: + from pymysql import connect + + return connect + + CONNECTION_CONFIG_TO_TYPE = { # Map all subclasses of ConnectionConfig to the value of their `type_` field. tpe.all_field_infos()["type_"].default: tpe diff --git a/sqlmesh/core/engine_adapter/__init__.py b/sqlmesh/core/engine_adapter/__init__.py index ab29885c7b..ec615fd443 100644 --- a/sqlmesh/core/engine_adapter/__init__.py +++ b/sqlmesh/core/engine_adapter/__init__.py @@ -9,6 +9,7 @@ from sqlmesh.core.engine_adapter.bigquery import BigQueryEngineAdapter from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter from sqlmesh.core.engine_adapter.databricks import DatabricksEngineAdapter +from sqlmesh.core.engine_adapter.doris import DorisEngineAdapter from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter from sqlmesh.core.engine_adapter.mysql import MySQLEngineAdapter @@ -37,6 +38,7 @@ "athena": AthenaEngineAdapter, "risingwave": RisingwaveEngineAdapter, "fabric": FabricEngineAdapter, + "doris": DorisEngineAdapter, } DIALECT_ALIASES = { diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index c48ce2154d..1cb367ab5c 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -2652,6 +2652,13 @@ def _to_sql(self, expression: exp.Expression, quote: bool = True, **kwargs: t.An return expression.sql(**sql_gen_kwargs, copy=False) # type: ignore + def _get_temp_table_name(self, table: TableName) -> str: + """ + Get the name of the temp table. 
+ """ + table_obj = exp.to_table(table) + return f"__temp_{table_obj.name}_{random_id(short=True)}" + def _get_data_objects( self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None ) -> t.List[DataObject]: @@ -2668,7 +2675,8 @@ def _get_temp_table( """ table = t.cast(exp.Table, exp.to_table(table).copy()) table.set( - "this", exp.to_identifier(f"__temp_{table.name}_{random_id(short=True)}", quoted=quoted) + "this", + exp.to_identifier(self._get_temp_table_name(table), quoted=quoted), ) if table_only: diff --git a/sqlmesh/core/engine_adapter/doris.py b/sqlmesh/core/engine_adapter/doris.py new file mode 100644 index 0000000000..41fcf6711d --- /dev/null +++ b/sqlmesh/core/engine_adapter/doris.py @@ -0,0 +1,1118 @@ +from __future__ import annotations + +import logging +import typing as t +import re + +from sqlglot import exp, parse_one + +from sqlmesh.core.dialect import to_schema + +from sqlmesh.core.engine_adapter.base import ( + InsertOverwriteStrategy, +) +from sqlmesh.core.engine_adapter.mixins import ( + LogicalMergeMixin, + NonTransactionalTruncateMixin, + PandasNativeFetchDFSupportMixin, +) +from sqlmesh.core.engine_adapter.shared import ( + CommentCreationTable, + CommentCreationView, + DataObject, + DataObjectType, + set_catalog, +) +from sqlmesh.utils import random_id +from sqlmesh.utils.errors import ( + SQLMeshError, +) + +if t.TYPE_CHECKING: + from sqlmesh.core._typing import SchemaName, TableName + from sqlmesh.core.engine_adapter._typing import QueryOrDF + from sqlmesh.core.node import IntervalUnit + +logger = logging.getLogger(__name__) + + +@set_catalog() +class DorisEngineAdapter( + LogicalMergeMixin, PandasNativeFetchDFSupportMixin, NonTransactionalTruncateMixin +): + DIALECT = "doris" + DEFAULT_BATCH_SIZE = 200 + SUPPORTS_TRANSACTIONS = False + COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS + COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS + MAX_TABLE_COMMENT_LENGTH = 2048 + MAX_COLUMN_COMMENT_LENGTH = 255 + SUPPORTS_INDEXES = True + SUPPORTS_REPLACE_TABLE = False + MAX_IDENTIFIER_LENGTH = 64 + SUPPORTS_MATERIALIZED_VIEWS = True + SUPPORTS_MATERIALIZED_VIEW_SCHEMA = True + SUPPORTS_CREATE_DROP_CATALOG = False + INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT + + def create_schema( + self, + schema_name: SchemaName, + ignore_if_exists: bool = True, + warn_on_error: bool = True, + properties: t.Optional[t.List[exp.Expression]] = None, + ) -> None: + """Create a schema.""" + properties = properties or [] + return super()._create_schema( + schema_name=schema_name, + ignore_if_exists=ignore_if_exists, + warn_on_error=warn_on_error, + properties=properties, + kind="DATABASE", + ) + + def drop_schema( + self, + schema_name: SchemaName, + ignore_if_not_exists: bool = True, + cascade: bool = False, + **drop_args: t.Dict[str, exp.Expression], + ) -> None: + """Drop schema in Doris. Note: Doris doesn't support CASCADE clause.""" + # In Doris, a schema is a database + return self._drop_object( + name=schema_name, + exists=ignore_if_not_exists, + kind="DATABASE", + cascade=False, + **drop_args, + ) + + def _get_data_objects( + self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None + ) -> t.List[DataObject]: + """ + Returns all the data objects that exist in the given schema. + Uses information_schema tables which are compatible with MySQL protocol. 
+ """ + query = ( + exp.select( + exp.column("table_schema").as_("schema_name"), + exp.column("table_name").as_("name"), + exp.case() + .when( + exp.column("table_type").eq("BASE TABLE"), + exp.Literal.string("table"), + ) + .when( + exp.column("table_type").eq("VIEW"), + exp.Literal.string("view"), + ) + .else_("table_type") + .as_("type"), + ) + .from_(exp.table_("tables", db="information_schema")) + .where(exp.column("table_schema").eq(to_schema(schema_name).db)) + ) + if object_names: + # Doris may treat information_schema table_name comparisons as case-sensitive depending on settings. + # Use LOWER(table_name) to match case-insensitively. + lowered_names = [name.lower() for name in object_names] + query = query.where(exp.func("LOWER", exp.column("table_name")).isin(*lowered_names)) + + result = [] + for schema_name, table_name, table_type in self.fetchall(query): + try: + schema = str(schema_name) if schema_name is not None else str(schema_name) + name = str(table_name) if table_name is not None else "unknown" + obj_type = str(table_type) if table_type is not None else "table" + + # Normalize type + if obj_type.upper() == "BASE TABLE": + obj_type = "table" + elif obj_type.upper() == "VIEW": + obj_type = "view" + + data_object = DataObject( + schema=schema, + name=name, + type=DataObjectType.from_str(obj_type), + ) + result.append(data_object) + except (ValueError, AttributeError) as e: + logger.error( + f"Error processing row: {e}, row: {(schema_name, table_name, table_type)}" + ) + continue + + return result + + def create_view( + self, + view_name: TableName, + query_or_df: QueryOrDF, + target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, + replace: bool = True, + materialized: bool = False, + materialized_properties: t.Optional[t.Dict[str, t.Any]] = None, + table_description: t.Optional[str] = None, + column_descriptions: t.Optional[t.Dict[str, str]] = None, + view_properties: t.Optional[t.Dict[str, exp.Expression]] = None, + source_columns: t.Optional[t.List[str]] = None, + **create_kwargs: t.Any, + ) -> None: + if replace: + self.drop_view( + view_name, + ignore_if_not_exists=True, + materialized=materialized, + view_properties=view_properties, + ) + if not materialized: + return super().create_view( + view_name, + query_or_df, + target_columns_to_types, + replace=False, + materialized=False, + materialized_properties=materialized_properties, + table_description=table_description, + column_descriptions=column_descriptions, + view_properties=view_properties, + source_columns=source_columns, + **create_kwargs, + ) + self._create_materialized_view( + view_name, + query_or_df, + target_columns_to_types=target_columns_to_types, + materialized_properties=materialized_properties, + table_description=table_description, + column_descriptions=column_descriptions, + view_properties=view_properties, + source_columns=source_columns, + **create_kwargs, + ) + + def _create_materialized_view( + self, + view_name: TableName, + query_or_df: QueryOrDF, + target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, + materialized_properties: t.Optional[t.Dict[str, t.Any]] = None, + table_description: t.Optional[str] = None, + column_descriptions: t.Optional[t.Dict[str, str]] = None, + view_properties: t.Optional[t.Dict[str, exp.Expression]] = None, + source_columns: t.Optional[t.List[str]] = None, + **create_kwargs: t.Any, + ) -> None: + # Convert query_or_df to proper format using base infrastructure + query_or_df = self._native_df_to_pandas_df(query_or_df) + import 
pandas as pd + + if isinstance(query_or_df, pd.DataFrame): + values: t.List[t.Tuple[t.Any, ...]] = list( + query_or_df.itertuples(index=False, name=None) + ) + target_columns_to_types, source_columns = self._columns_to_types( + query_or_df, target_columns_to_types, source_columns + ) + if not target_columns_to_types: + raise SQLMeshError("columns_to_types must be provided for dataframes") + query_or_df = self._values_to_sql( + values, + target_columns_to_types, + batch_start=0, + batch_end=len(values), + ) + + source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( + query_or_df, target_columns_to_types, batch_size=0, target_table=view_name + ) + if len(source_queries) != 1: + raise SQLMeshError("Only one source query is supported for creating materialized views") + + # Build the CREATE MATERIALIZED VIEW statement using SQLGlot AST + with source_queries[0] as query: + target_table = exp.to_table(view_name) + + # Build schema for column list and comments if provided + schema: t.Union[exp.Table, exp.Schema] = target_table + if target_columns_to_types: + schema = self._build_schema_exp( + target_table, + target_columns_to_types, + column_descriptions, + is_view=True, + ) + + # Partitioning may be specified in different places for materialized views + partitioned_by = None + if materialized_properties and materialized_properties.get("partitioned_by"): + partitioned_by = materialized_properties.get("partitioned_by") + elif create_kwargs.get("partitioned_by"): + partitioned_by = create_kwargs.get("partitioned_by") + + # Use the unified _build_table_properties_exp to handle all properties + properties_exp = self._build_table_properties_exp( + catalog_name=target_table.catalog, + table_properties=view_properties, + target_columns_to_types=target_columns_to_types, + table_description=table_description, + partitioned_by=partitioned_by, + partition_interval_unit=( + materialized_properties.get("partition_interval_unit") + if materialized_properties + else None + ), + table_kind="MATERIALIZED_VIEW", + ) + + self.execute( + exp.Create( + this=schema, + kind="VIEW", + replace=False, + expression=query, + properties=properties_exp, + ) + ) + + def drop_view( + self, + view_name: TableName, + ignore_if_not_exists: bool = True, + materialized: bool = False, + **kwargs: t.Any, + ) -> None: + """ + Drop view in Doris. 
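+
+        Synchronous (SYNC) materialized views are dropped with
+        DROP MATERIALIZED VIEW ... ON <source_table> when the view properties
+        provide materialized_type and source_table; all other views fall back
+        to the default implementation.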
+ """ + if materialized and kwargs.get("view_properties"): + view_properties = kwargs.pop("view_properties") + if view_properties.get("materialized_type") == "SYNC" and view_properties.get( + "source_table" + ): + # Format the source table name properly for Doris + source_table = view_properties.get("source_table") + if isinstance(source_table, exp.Table): + source_table_sql = source_table.sql(dialect=self.dialect, identify=True) + else: + source_table_sql = str(source_table) + drop_sql = f"DROP MATERIALIZED VIEW {'IF EXISTS ' if ignore_if_not_exists else ''}{view_name} ON {source_table_sql}" + self.execute(drop_sql) + return + super().drop_view(view_name, ignore_if_not_exists, materialized, **kwargs) + + def create_table_like( + self, + target_table_name: TableName, + source_table_name: TableName, + exists: bool = True, + **kwargs: t.Any, + ) -> None: + self.execute( + exp.Create( + this=exp.to_table(target_table_name), + kind="TABLE", + exists=exists, + properties=exp.Properties( + expressions=[ + exp.LikeProperty( + this=exp.to_table(source_table_name), + ), + ], + ), + ) + ) + + def _create_table_comment( + self, table_name: TableName, table_comment: str, table_kind: str = "TABLE" + ) -> None: + table_sql = exp.to_table(table_name).sql(dialect=self.dialect, identify=True) + + self.execute( + f'ALTER TABLE {table_sql} MODIFY COMMENT "{self._truncate_table_comment(table_comment)}"' + ) + + def _build_create_comment_column_exp( + self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" + ) -> exp.Comment | str: + table_sql = table.sql(dialect=self.dialect, identify=True) + return f'ALTER TABLE {table_sql} MODIFY COLUMN {column_name} COMMENT "{self._truncate_column_comment(column_comment)}"' + + def delete_from( + self, table_name: TableName, where: t.Optional[t.Union[str, exp.Expression]] = None + ) -> None: + """ + Delete from a table. + + Args: + table_name: The table to delete from. + where: The where clause to filter rows to delete. + """ + if not where or where == exp.true(): + table_expr = exp.to_table(table_name) if isinstance(table_name, str) else table_name + self.execute(f"TRUNCATE TABLE {table_expr.sql(dialect=self.dialect, identify=True)}") + return + + # Parse where clause if it's a string + if isinstance(where, str): + where = parse_one(where, dialect=self.dialect) + + # Check if WHERE contains subqueries with IN/NOT IN operators + subquery_expr = self._find_subquery_in_condition(where) + if subquery_expr: + self._execute_delete_with_subquery(table_name, subquery_expr) + else: + # Use base implementation for simple conditions + super().delete_from(table_name, where) + + def _find_subquery_in_condition( + self, where: exp.Expression + ) -> t.Optional[t.Tuple[exp.Expression, exp.Expression, bool]]: + """ + Find subquery in IN/NOT IN condition. 
+ + Returns: + Tuple of (column_expr, subquery, is_not_in) or None if no subquery found + """ + # Check for NOT IN expressions first (NOT wrapping an IN expression) + for not_expr in where.find_all(exp.Not): + if isinstance(not_expr.this, exp.In) and self._is_subquery_expression(not_expr.this): + return not_expr.this.args["this"], not_expr.this.args["query"], True + + # Check for IN expressions with subqueries (only if not already found as NOT IN) + for expr in where.find_all(exp.In): + if self._is_subquery_expression(expr): + return expr.args["this"], expr.args["query"], False + + return None + + def _is_subquery_expression(self, expr: exp.Expression) -> bool: + """Check if expression contains a subquery.""" + return ( + "query" in expr.args + and expr.args["query"] + and isinstance(expr.args["query"], exp.Subquery) + ) + + def _execute_delete_with_subquery( + self, table_name: TableName, subquery_info: t.Tuple[exp.Expression, exp.Expression, bool] + ) -> None: + """ + Execute DELETE FROM with subquery using Doris USING syntax. + + Args: + table_name: Target table name + subquery_info: Tuple of (column_expr, subquery, is_not_in) + """ + column_expr, subquery, is_not_in = subquery_info + + # Build join condition + join_condition = self._build_join_condition(column_expr, is_not_in) + + # Build and execute DELETE statement + target_sql = f"{exp.to_table(table_name).sql(dialect=self.dialect, identify=True)} AS `_t1`" + subquery_sql = f"{subquery.sql(dialect=self.dialect, identify=True)} AS `_t2`" + where_sql = join_condition.sql(dialect=self.dialect, identify=True) + + delete_sql = f"DELETE FROM {target_sql} USING {subquery_sql} WHERE {where_sql}" + self.execute(delete_sql) + + def _build_join_condition(self, column_expr: exp.Expression, is_not_in: bool) -> exp.Expression: + """Build join condition for DELETE USING statement.""" + if isinstance(column_expr, exp.Tuple): + # Multiple columns: (id, name) IN (...) + join_conditions = [] + for col in column_expr.expressions: + condition = self._build_column_condition(col, is_not_in) + join_conditions.append(condition) + return exp.and_(*join_conditions) + # Single column: id IN (...) + return self._build_column_condition(column_expr, is_not_in) + + def _build_column_condition(self, column: exp.Expression, is_not_in: bool) -> exp.Expression: + """Build condition for a single column.""" + t1_col = exp.column(column.name, table="_t1") + t2_col = exp.column(column.name, table="_t2") + return t1_col.neq(t2_col) if is_not_in else t1_col.eq(t2_col) + + def _create_table_from_columns( + self, + table_name: TableName, + target_columns_to_types: t.Dict[str, exp.DataType], + primary_key: t.Optional[t.Tuple[str, ...]] = None, + exists: bool = True, + table_description: t.Optional[str] = None, + column_descriptions: t.Optional[t.Dict[str, str]] = None, + **kwargs: t.Any, + ) -> None: + """ + Create a table using a DDL statement. + + Args: + table_name: The name of the table to create. Can be fully qualified or just table name. + target_columns_to_types: Mapping between the column name and its data type. + primary_key: Determines the table primary key. + exists: Indicates whether to include the IF NOT EXISTS check. + table_description: Optional table description from MODEL DDL. + column_descriptions: Optional column descriptions from model query. + kwargs: Optional create table properties. 
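+
+        Note:
+            Doris does not support primary key constraints, so a provided
+            primary_key is converted into a unique_key table property before
+            the table is created.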
+ """ + table_properties = kwargs.get("table_properties", {}) + + # Convert primary_key to unique_key for Doris (Doris doesn't support primary keys) + if primary_key and "unique_key" not in table_properties: + # Represent as a Tuple of columns to match downstream handling + table_properties["unique_key"] = exp.Tuple( + expressions=[exp.to_column(col) for col in primary_key] + ) + + # Update kwargs with the modified table_properties + kwargs["table_properties"] = table_properties + + # Call the parent implementation with primary_key=None since we've converted it to unique_key + super()._create_table_from_columns( + table_name=table_name, + target_columns_to_types=target_columns_to_types, + primary_key=None, # Set to None since we've converted it to unique_key + exists=exists, + table_description=table_description, + column_descriptions=column_descriptions, + **kwargs, + ) + + def _parse_partition_expressions( + self, partitioned_by: t.List[exp.Expression] + ) -> t.Tuple[t.List[exp.Expression], t.Optional[str]]: + """Parse partition expressions and extract partition kind and normalized columns. + + Returns: + Tuple of (normalized_partitioned_by, partition_kind) + """ + parsed_partitioned_by: t.List[exp.Expression] = [] + partition_kind: t.Optional[str] = None + + for expr in partitioned_by: + try: + # Handle Anonymous function calls like RANGE(col) or LIST(col) + if isinstance(expr, exp.Anonymous) and expr.this: + func_name = str(expr.this).upper() + if func_name in ("RANGE", "LIST"): + partition_kind = func_name + # Extract column expressions from function arguments + for arg in expr.expressions: + if isinstance(arg, exp.Column): + parsed_partitioned_by.append(arg) + else: + # Convert other expressions to columns if possible + parsed_partitioned_by.append(exp.to_column(str(arg))) + continue + + # Handle literal strings like "RANGE(col)" or "LIST(col)" + if isinstance(expr, exp.Literal) and getattr(expr, "is_string", False): + text = str(expr.this) + match = re.match(r"^\s*(RANGE|LIST)\s*\((.*?)\)\s*$", text, flags=re.IGNORECASE) + if match: + partition_kind = match.group(1).upper() + inner = match.group(2) + inner_cols = [c.strip().strip("`") for c in inner.split(",") if c.strip()] + for col in inner_cols: + parsed_partitioned_by.append(exp.to_column(col)) + continue + except Exception: + # If anything goes wrong, keep the original expr + pass + parsed_partitioned_by.append(expr) + + return parsed_partitioned_by, partition_kind + + def _build_partitioned_by_exp( + self, + partitioned_by: t.List[exp.Expression], + *, + partition_interval_unit: t.Optional[IntervalUnit] = None, + target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, + catalog_name: t.Optional[str] = None, + **kwargs: t.Any, + ) -> t.Optional[ + t.Union[ + exp.PartitionedByProperty, + exp.PartitionByRangeProperty, + exp.PartitionByListProperty, + exp.Property, + ] + ]: + """Build Doris partitioning expression. + + Supports both RANGE and LIST partition syntaxes using sqlglot's doris dialect nodes. + The partition kind is chosen by: + - inferred from partitioned_by expressions like 'RANGE(col)' or 'LIST(col)' + - otherwise inferred from the provided 'partitions' strings: if any contains 'VALUES IN' -> LIST; else RANGE. 
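+
+        For example, a partitions string such as
+        'PARTITION `p1` VALUES IN ("1")' produces a LIST partition property,
+        while 'FROM ("2024-01-01") TO ("2025-01-01") INTERVAL 1 MONTH'
+        produces a RANGE partition property.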
+ """ + partitions = kwargs.get("partitions") + create_expressions = None + + def to_raw_sql(expr: t.Union[exp.Literal, exp.Var, str, t.Any]) -> exp.Var: + # If it's a Literal, extract the string and wrap as Var (no quotes) + if isinstance(expr, exp.Literal): + return exp.Var(this=expr.this, quoted=False) + # If it's already a Var, return as is + if isinstance(expr, exp.Var): + return expr + # If it's a string, wrap as Var (no quotes) + if isinstance(expr, str): + return exp.Var(this=expr, quoted=False) + # Fallback: return as is + return expr + + # Parse partition kind and columns from partitioned_by expressions + partitioned_by, partition_kind = self._parse_partition_expressions(partitioned_by) + + if partitions: + if isinstance(partitions, exp.Tuple): + create_expressions = [ + exp.Var(this=e.this, quoted=False) + if isinstance(e, exp.Literal) + else to_raw_sql(e) + for e in partitions.expressions + ] + elif isinstance(partitions, exp.Literal): + create_expressions = [exp.Var(this=partitions.this, quoted=False)] + else: + create_expressions = [to_raw_sql(partitions)] + + # Infer partition kind from partitions text if not explicitly provided + inferred_list = False + if partition_kind is None and create_expressions: + try: + texts = [getattr(e, "this", "").upper() for e in create_expressions] + inferred_list = any("VALUES IN" in t for t in texts) + except Exception: + inferred_list = False + if partition_kind: + kind_upper = str(partition_kind).upper() + is_list = kind_upper == "LIST" + else: + is_list = inferred_list + + try: + if is_list: + return exp.PartitionByListProperty( + partition_expressions=partitioned_by, + create_expressions=create_expressions, + ) + return exp.PartitionByRangeProperty( + partition_expressions=partitioned_by, + create_expressions=create_expressions, + ) + except TypeError: + if is_list: + return exp.PartitionByListProperty( + partition_expressions=partitioned_by, + create_expressions=create_expressions, + ) + return exp.PartitionByRangeProperty( + partition_expressions=partitioned_by, + create_expressions=create_expressions, + ) + + def _build_table_properties_exp( + self, + catalog_name: t.Optional[str] = None, + table_format: t.Optional[str] = None, + storage_format: t.Optional[str] = None, + partitioned_by: t.Optional[t.List[exp.Expression]] = None, + partition_interval_unit: t.Optional[IntervalUnit] = None, + clustered_by: t.Optional[t.List[exp.Expression]] = None, + table_properties: t.Optional[t.Dict[str, t.Any]] = None, + target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, + table_description: t.Optional[str] = None, + table_kind: t.Optional[str] = None, + **kwargs: t.Any, + ) -> t.Optional[exp.Properties]: + """Creates a SQLGlot table properties expression for ddl.""" + properties: t.List[exp.Expression] = [] + is_materialized_view = table_kind == "MATERIALIZED_VIEW" + + # Add MATERIALIZED property first for materialized views + if is_materialized_view: + properties.append(exp.MaterializedProperty()) + + table_properties_copy = dict(table_properties) if table_properties else {} + + # Helper functions for materialized view properties + def _to_upper_str(val: t.Any) -> str: + if isinstance(val, exp.Literal): + return str(val.this).upper() + if isinstance(val, exp.Identifier): + return str(val.this).upper() + if isinstance(val, str): + return val.upper() + if isinstance(val, exp.Expression): + return val.sql(dialect=self.dialect).upper() + return str(val).upper() + + def _to_var_upper(val: t.Any) -> exp.Var: + return 
exp.Var(this=_to_upper_str(val)) + + def _parse_refresh_tuple( + tpl: exp.Tuple, + ) -> t.Tuple[ + exp.Var, + t.Optional[str], + t.Optional[exp.Literal], + t.Optional[exp.Var], + t.Optional[exp.Literal], + ]: + method_var: exp.Var = exp.Var(this="AUTO") + kind_str: t.Optional[str] = None + every_lit: t.Optional[exp.Literal] = None + unit_var: t.Optional[exp.Var] = None + starts_lit: t.Optional[exp.Literal] = None + for e in tpl.expressions: + if ( + isinstance(e, exp.EQ) + and isinstance(e.this, exp.Column) + and isinstance(e.this.this, exp.Identifier) + ): + key = str(e.this.this.this).strip('"').lower() + val = e.expression + if key == "method": + method_var = _to_var_upper(val) + elif key == "kind": + kind_str = _to_upper_str(val) + elif key == "every": + if isinstance(val, exp.Literal): + # Keep numeric literal as-is so generator renders it + every_lit = val + elif isinstance(val, (int, float)): + every_lit = exp.Literal.number(int(val)) + elif key == "unit": + unit_var = _to_var_upper(val) + elif key == "starts": + if isinstance(val, exp.Literal): + starts_lit = exp.Literal.string(str(val.this)) + else: + starts_lit = exp.Literal.string(str(val)) + return method_var, kind_str, every_lit, unit_var, starts_lit + + def _parse_trigger_string( + trigger_text: str, + ) -> t.Tuple[ + t.Optional[str], + t.Optional[exp.Literal], + t.Optional[exp.Var], + t.Optional[exp.Literal], + ]: + text = trigger_text.strip() + if text.upper().startswith("ON "): + text = text[3:].strip() + kind = None + every_lit: t.Optional[exp.Literal] = None + unit_var = None + starts_lit = None + import re as _re + + m_kind = _re.match(r"^(MANUAL|COMMIT|SCHEDULE)\b", text, flags=_re.IGNORECASE) + if m_kind: + kind = m_kind.group(1).upper() + m_every = _re.search(r"\bEVERY\s+(\d+)\s+(\w+)", text, flags=_re.IGNORECASE) + if m_every: + every_lit = exp.Literal.number(int(m_every.group(1))) + unit_var = exp.Var(this=m_every.group(2).upper()) + m_starts = _re.search(r"\bSTARTS\s+'([^']+)'", text, flags=_re.IGNORECASE) + if m_starts: + starts_lit = exp.Literal.string(m_starts.group(1)) + return kind, every_lit, unit_var, starts_lit + + # Handle materialized view specific properties first + if is_materialized_view: + # BUILD property + build_val = table_properties_copy.pop("build", None) + if build_val is not None: + properties.append(exp.BuildProperty(this=_to_var_upper(build_val))) + + # REFRESH + optional trigger combined into RefreshTriggerProperty + refresh_val = table_properties_copy.pop("refresh", None) + refresh_trigger_val = table_properties_copy.pop("refresh_trigger", None) + if refresh_val is not None or refresh_trigger_val is not None: + method_var: exp.Var = _to_var_upper(refresh_val or "AUTO") + kind_str: t.Optional[str] = None + every_lit: t.Optional[exp.Literal] = None + unit_var: t.Optional[exp.Var] = None + starts_lit: t.Optional[exp.Literal] = None + + if isinstance(refresh_val, exp.Tuple): + method_var, kind_str, every_lit, unit_var, starts_lit = _parse_refresh_tuple( + refresh_val + ) + else: + if refresh_trigger_val is not None: + trigger_text = ( + str(refresh_trigger_val.this) + if isinstance(refresh_trigger_val, exp.Literal) + else str(refresh_trigger_val) + ) + k, e, u, s = _parse_trigger_string(trigger_text) + kind_str, every_lit, unit_var, starts_lit = k, e, u, s + + properties.append( + exp.RefreshTriggerProperty( + method=method_var, + kind=kind_str, + every=every_lit, + unit=unit_var, + starts=starts_lit, + ) + ) + + # Handle unique_key - only handle Tuple expressions or single Column expressions 
+ unique_key = table_properties_copy.pop("unique_key", None) + if unique_key is not None: + # For materialized views, KEY is rendered inline as UniqueKeyProperty, not skipped + if not is_materialized_view: + if isinstance(unique_key, exp.Tuple): + # Extract column names from Tuple expressions + column_names = [] + for expr in unique_key.expressions: + if ( + isinstance(expr, exp.Column) + and hasattr(expr, "this") + and hasattr(expr.this, "this") + ): + column_names.append(str(expr.this.this)) + elif hasattr(expr, "this"): + column_names.append(str(expr.this)) + else: + column_names.append(str(expr)) + properties.append( + exp.UniqueKeyProperty(expressions=[exp.to_column(k) for k in column_names]) + ) + elif isinstance(unique_key, exp.Column): + # Handle as single column + if hasattr(unique_key, "this") and hasattr(unique_key.this, "this"): + column_name = str(unique_key.this.this) + else: + column_name = str(unique_key.this) + properties.append( + exp.UniqueKeyProperty(expressions=[exp.to_column(column_name)]) + ) + elif isinstance(unique_key, exp.Literal): + properties.append( + exp.UniqueKeyProperty(expressions=[exp.to_column(unique_key.this)]) + ) + elif isinstance(unique_key, str): + properties.append( + exp.UniqueKeyProperty(expressions=[exp.to_column(unique_key)]) + ) + else: + # For materialized views, also add UniqueKeyProperty + if isinstance(unique_key, exp.Tuple): + # Extract column names from Tuple expressions + column_names = [] + for expr in unique_key.expressions: + if ( + isinstance(expr, exp.Column) + and hasattr(expr, "this") + and hasattr(expr.this, "this") + ): + column_names.append(str(expr.this.this)) + elif hasattr(expr, "this"): + column_names.append(str(expr.this)) + else: + column_names.append(str(expr)) + properties.append( + exp.UniqueKeyProperty(expressions=[exp.to_column(k) for k in column_names]) + ) + elif isinstance(unique_key, exp.Column): + # Handle as single column + if hasattr(unique_key, "this") and hasattr(unique_key.this, "this"): + column_name = str(unique_key.this.this) + else: + column_name = str(unique_key.this) + properties.append( + exp.UniqueKeyProperty(expressions=[exp.to_column(column_name)]) + ) + elif isinstance(unique_key, exp.Literal): + properties.append( + exp.UniqueKeyProperty(expressions=[exp.to_column(unique_key.this)]) + ) + elif isinstance(unique_key, str): + properties.append( + exp.UniqueKeyProperty(expressions=[exp.to_column(unique_key)]) + ) + + # Handle duplicate_key - only handle Tuple expressions or single Column expressions + # Both tables and materialized views support duplicate keys in Doris + duplicate_key = table_properties_copy.pop("duplicate_key", None) + if duplicate_key is not None: + if isinstance(duplicate_key, exp.Tuple): + # Extract column names from Tuple expressions + column_names = [] + for expr in duplicate_key.expressions: + if ( + isinstance(expr, exp.Column) + and hasattr(expr, "this") + and hasattr(expr.this, "this") + ): + column_names.append(str(expr.this.this)) + elif hasattr(expr, "this"): + column_names.append(str(expr.this)) + else: + column_names.append(str(expr)) + properties.append( + exp.DuplicateKeyProperty(expressions=[exp.to_column(k) for k in column_names]) + ) + elif isinstance(duplicate_key, exp.Column): + # Handle as single column + if hasattr(duplicate_key, "this") and hasattr(duplicate_key.this, "this"): + column_name = str(duplicate_key.this.this) + else: + column_name = str(duplicate_key.this) + properties.append( + exp.DuplicateKeyProperty(expressions=[exp.to_column(column_name)]) 
+ ) + elif isinstance(duplicate_key, exp.Literal): + properties.append( + exp.DuplicateKeyProperty(expressions=[exp.to_column(duplicate_key.this)]) + ) + elif isinstance(duplicate_key, str): + properties.append( + exp.DuplicateKeyProperty(expressions=[exp.to_column(duplicate_key)]) + ) + + if table_description: + properties.append( + exp.SchemaCommentProperty( + this=exp.Literal.string(self._truncate_table_comment(table_description)) + ) + ) + + # Handle partitioning + add_partition = True + if partitioned_by: + # Parse and normalize partition expressions + partitioned_by, _ = self._parse_partition_expressions(partitioned_by) + # For tables, check if partitioned_by columns are in unique_key; for materialized views, allow regardless + if unique_key is not None and not is_materialized_view: + # Extract key column names from unique_key (only Tuple or Column expressions) + key_cols_set = set() + if isinstance(unique_key, exp.Tuple): + for expr in unique_key.expressions: + if ( + isinstance(expr, exp.Column) + and hasattr(expr, "this") + and hasattr(expr.this, "this") + ): + key_cols_set.add(str(expr.this.this)) + elif hasattr(expr, "this"): + key_cols_set.add(str(expr.this)) + else: + key_cols_set.add(str(expr)) + elif isinstance(unique_key, exp.Column): + if hasattr(unique_key, "this") and hasattr(unique_key.this, "this"): + key_cols_set.add(str(unique_key.this.this)) + else: + key_cols_set.add(str(unique_key.this)) + + partition_cols = set() + for expr in partitioned_by: + if hasattr(expr, "name"): + partition_cols.add(str(expr.name)) + elif hasattr(expr, "this"): + partition_cols.add(str(expr.this)) + else: + partition_cols.add(str(expr)) + not_in_key = partition_cols - key_cols_set + if not_in_key: + logger.warning( + f"[Doris] UNIQUE KEY model: partitioned_by columns {not_in_key} not in key_cols {key_cols_set}, skip PARTITION BY." 
+ ) + add_partition = False + if add_partition: + partitions = table_properties_copy.pop("partitions", None) + + # If partitioned_by is provided but partitions is not, add dynamic partition properties + # Skip dynamic partitions for materialized views as they use different partitioning + if partitioned_by and not partitions and not is_materialized_view: + # Define the required dynamic partition properties + dynamic_partition_props = { + "dynamic_partition.enable": "true", + "dynamic_partition.time_unit": "DAY", + "dynamic_partition.start": "-490", + "dynamic_partition.end": "10", + "dynamic_partition.prefix": "p", + "dynamic_partition.buckets": "32", + "dynamic_partition.create_history_partition": "true", + } + + # Use partition_interval_unit if provided to set the time_unit + if partition_interval_unit: + if hasattr(partition_interval_unit, "value"): + time_unit = partition_interval_unit.value.upper() + else: + time_unit = str(partition_interval_unit).upper() + dynamic_partition_props["dynamic_partition.time_unit"] = time_unit + + # Add missing dynamic partition properties to table_properties_copy + for key, value in dynamic_partition_props.items(): + if key not in table_properties_copy: + table_properties_copy[key] = value + + # Build partition expression - different for materialized views vs tables + if is_materialized_view: + # For materialized views, use PartitionedByProperty + if partitioned_by: + properties.append( + exp.PartitionedByProperty( + this=exp.Schema(expressions=partitioned_by), + ) + ) + else: + # For tables, use the existing logic with RANGE/LIST partitioning + partition_expr = self._build_partitioned_by_exp( + partitioned_by, + partition_interval_unit=partition_interval_unit, + target_columns_to_types=target_columns_to_types, + catalog_name=catalog_name, + partitions=partitions, + ) + if partition_expr: + properties.append(partition_expr) + + # Handle distributed_by property - parse Tuple with EQ expressions or Paren with single EQ + distributed_by = table_properties_copy.pop("distributed_by", None) + if distributed_by is not None: + distributed_info = {} + + if isinstance(distributed_by, exp.Tuple): + # Parse the Tuple with EQ expressions to extract distributed_by info + for expr in distributed_by.expressions: + if isinstance(expr, exp.EQ) and hasattr(expr.this, "this"): + # Remove quotes from the key if present + key = str(expr.this.this).strip('"') + if isinstance(expr.expression, exp.Literal): + distributed_info[key] = expr.expression.this + elif isinstance(expr.expression, exp.Array): + # Handle expressions array + distributed_info[key] = [ + str(e.this) + for e in expr.expression.expressions + if hasattr(e, "this") + ] + elif isinstance(expr.expression, exp.Tuple): + # Handle expressions tuple (array of strings) + distributed_info[key] = [ + str(e.this) + for e in expr.expression.expressions + if hasattr(e, "this") + ] + else: + distributed_info[key] = str(expr.expression) + elif isinstance(distributed_by, exp.Paren) and isinstance(distributed_by.this, exp.EQ): + # Handle single key-value pair in parentheses (e.g., (kind='RANDOM')) + expr = distributed_by.this + if hasattr(expr.this, "this"): + # Remove quotes from the key if present + key = str(expr.this.this).strip('"') + if isinstance(expr.expression, exp.Literal): + distributed_info[key] = expr.expression.this + else: + distributed_info[key] = str(expr.expression) + elif isinstance(distributed_by, dict): + # Handle as dictionary (legacy format) + distributed_info = distributed_by + + # Create 
DistributedByProperty from parsed info + if distributed_info: + kind = distributed_info.get("kind") + expressions = distributed_info.get("expressions") + buckets = distributed_info.get("buckets") + + if kind: + # Handle buckets - convert string to int if it's a numeric string + buckets_expr: t.Optional[exp.Expression] = None + if isinstance(buckets, int): + buckets_expr = exp.Literal.number(buckets) + elif isinstance(buckets, str): + if buckets == "AUTO": + buckets_expr = exp.Var(this="AUTO") + elif buckets.isdigit(): + buckets_expr = exp.Literal.number(int(buckets)) + + # Handle expressions - convert single string to list if needed + expressions_list = None + if expressions: + if isinstance(expressions, str): + expressions_list = [exp.to_column(expressions)] + elif isinstance(expressions, list): + expressions_list = [exp.to_column(e) for e in expressions] + else: + expressions_list = [exp.to_column(str(expressions))] + + prop = exp.DistributedByProperty( + kind=exp.Var(this=kind), + expressions=expressions_list, + buckets=buckets_expr, + order=None, + ) + properties.append(prop) + else: + unique_key_property = next( + (prop for prop in properties if isinstance(prop, exp.UniqueKeyProperty)), None + ) + if unique_key_property: + # Use the first column from unique_key as the distribution key + if unique_key_property.expressions: + first_col = unique_key_property.expressions[0] + column_name = ( + str(first_col.this) if hasattr(first_col, "this") else str(first_col) + ) + logger.info( + f"[Doris] Adding default distributed_by using unique_key column: {column_name}" + ) + properties.append( + exp.DistributedByProperty( + expressions=[exp.to_column(column_name)], + kind="HASH", + buckets=exp.Literal.number(10), + ) + ) + + # Only add generic properties if there are any left + if table_properties_copy: + properties.extend(self._properties_to_expressions(table_properties_copy)) + + if properties: + return exp.Properties(expressions=properties) + return None + + def _get_temp_table_name(self, table: TableName) -> str: + table_obj = exp.to_table(table) + return f"temp_{table_obj.name}_{random_id(short=True)}" + + def _properties_to_expressions(self, properties: t.Dict[str, t.Any]) -> t.List[exp.Expression]: + """Convert a dictionary of properties to a list of exp.Property expressions.""" + expressions: t.List[exp.Expression] = [] + for key, value in properties.items(): + if key in { + "build", + "refresh", + "refresh_trigger", + "distributed_by", + "partitioned_by", + "partitions", + "unique_key", + "duplicate_key", + "properties", + "materialized_type", + "source_table", + }: + continue + if not isinstance(value, exp.Expression): + value = exp.Literal.string(str(value)) + expressions.append(exp.Property(this=exp.Literal.string(str(key)), value=value)) + return expressions diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 961062fe45..f8474c8493 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1753,6 +1753,28 @@ def demote(self, view_name: str, **kwargs: t.Any) -> None: self.adapter.drop_view(view_name, cascade=False) +def _add_unique_key_to_physical_properties_for_doris( + model: Model, physical_properties: t.Dict[str, t.Any] +) -> t.Dict[str, t.Any]: + """ + For Doris dialect with INCREMENTAL_BY_UNIQUE_KEY models, ensure unique_key is added + to physical properties if not already present. 
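+
+    For example, a Doris model with unique_key (id, ds) gets
+    physical_properties["unique_key"] set to (id, ds), which the Doris adapter
+    then renders as a UNIQUE KEY clause in the physical table DDL.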
+ """ + if ( + model.dialect == "doris" + and model.kind.is_incremental_by_unique_key + and model.unique_key + and "unique_key" not in physical_properties + ): + physical_properties = dict(physical_properties) + physical_properties["unique_key"] = ( + model.unique_key[0] + if len(model.unique_key) == 1 + else exp.Tuple(expressions=model.unique_key) + ) + return physical_properties + + class MaterializableStrategy(PromotableStrategy, abc.ABC): def create( self, @@ -1764,6 +1786,9 @@ def create( ) -> None: ctas_query = model.ctas_query(**render_kwargs) physical_properties = kwargs.get("physical_properties", model.physical_properties) + physical_properties = _add_unique_key_to_physical_properties_for_doris( + model, physical_properties + ) logger.info("Creating table '%s'", table_name) if model.annotated: @@ -1861,6 +1886,10 @@ def _replace_query_for_model( except Exception: columns_to_types, source_columns = None, None + physical_properties = kwargs.get("physical_properties", model.physical_properties) + physical_properties = _add_unique_key_to_physical_properties_for_doris( + model, physical_properties + ) self.adapter.replace_query( name, query_or_df, @@ -1869,7 +1898,7 @@ def _replace_query_for_model( partitioned_by=model.partitioned_by, partition_interval_unit=model.partition_interval_unit, clustered_by=model.clustered_by, - table_properties=kwargs.get("physical_properties", model.physical_properties), + table_properties=physical_properties, table_description=model.description, column_descriptions=model.column_descriptions, target_columns_to_types=columns_to_types, @@ -1992,6 +2021,10 @@ def insert( table_name, render_kwargs=render_kwargs, ) + physical_properties = kwargs.get("physical_properties", model.physical_properties) + physical_properties = _add_unique_key_to_physical_properties_for_doris( + model, physical_properties + ) self.adapter.merge( table_name, query_or_df, @@ -2003,7 +2036,7 @@ def insert( end=kwargs.get("end"), execution_time=kwargs.get("execution_time"), ), - physical_properties=kwargs.get("physical_properties", model.physical_properties), + physical_properties=physical_properties, source_columns=source_columns, ) @@ -2560,13 +2593,17 @@ def create( if is_table_deployable and is_snapshot_deployable: # We could deploy this to prod; create a proper managed table logger.info("Creating managed table: %s", table_name) + physical_properties = kwargs.get("physical_properties", model.physical_properties) + physical_properties = _add_unique_key_to_physical_properties_for_doris( + model, physical_properties + ) self.adapter.create_managed_table( table_name=table_name, query=model.render_query_or_raise(**render_kwargs), target_columns_to_types=model.columns_to_types, partitioned_by=model.partitioned_by, clustered_by=model.clustered_by, - table_properties=kwargs.get("physical_properties", model.physical_properties), + table_properties=physical_properties, table_description=model.description, column_descriptions=model.column_descriptions, table_format=model.table_format, diff --git a/tests/core/engine_adapter/integration/__init__.py b/tests/core/engine_adapter/integration/__init__.py index c5377e309a..44d7626e4a 100644 --- a/tests/core/engine_adapter/integration/__init__.py +++ b/tests/core/engine_adapter/integration/__init__.py @@ -75,6 +75,7 @@ def pytest_marks(self) -> t.List[MarkDecorator]: IntegrationTestEngine("spark", native_dataframe_type="pyspark"), IntegrationTestEngine("clickhouse", catalog_types=["standalone", "cluster"]), IntegrationTestEngine("risingwave"), + 
IntegrationTestEngine("doris"), # Cloud engines that need paid accounts / special credentials IntegrationTestEngine("clickhouse_cloud", cloud=True), IntegrationTestEngine("redshift", cloud=True), @@ -446,7 +447,7 @@ def get_table_comment( AND pgc.relkind = '{"v" if table_kind == "VIEW" else "r"}' ; """ - elif self.dialect in ["mysql", "snowflake"]: + elif self.dialect in ["mysql", "snowflake", "doris"]: # Snowflake treats all identifiers as uppercase unless they are lowercase and quoted. # They are lowercase and quoted in sushi but not in the inline tests. if self.dialect == "snowflake" and snowflake_capitalize_ids: @@ -456,6 +457,7 @@ def get_table_comment( comment_field_name = { "mysql": "table_comment", "snowflake": "comment", + "doris": "table_comment", } query = f""" @@ -561,7 +563,7 @@ def get_column_comments( AND pgc.relkind = '{"v" if table_kind == "VIEW" else "r"}' ; """ - elif self.dialect in ["mysql", "snowflake", "trino"]: + elif self.dialect in ["mysql", "snowflake", "trino", "doris"]: # Snowflake treats all identifiers as uppercase unless they are lowercase and quoted. # They are lowercase and quoted in sushi but not in the inline tests. if self.dialect == "snowflake" and snowflake_capitalize_ids: @@ -572,6 +574,7 @@ def get_column_comments( "mysql": "column_comment", "snowflake": "comment", "trino": "comment", + "doris": "column_comment", } query = f""" diff --git a/tests/core/engine_adapter/integration/config.yaml b/tests/core/engine_adapter/integration/config.yaml index 8e87b2c3c8..eba739c894 100644 --- a/tests/core/engine_adapter/integration/config.yaml +++ b/tests/core/engine_adapter/integration/config.yaml @@ -118,6 +118,16 @@ gateways: host: {{ env_var('DOCKER_HOSTNAME', 'localhost') }} port: 4566 check_import: false + inttest_doris: + connection: + type: doris + host: {{ env_var('DOCKER_HOSTNAME', 'localhost') }} + port: 9030 + user: root + password: "" + check_import: false + state_connection: + type: duckdb # Cloud databases diff --git a/tests/core/engine_adapter/integration/docker/compose.doris.yaml b/tests/core/engine_adapter/integration/docker/compose.doris.yaml new file mode 100644 index 0000000000..90c1f11280 --- /dev/null +++ b/tests/core/engine_adapter/integration/docker/compose.doris.yaml @@ -0,0 +1,80 @@ +services: + fe-01: + image: apache/doris:fe-2.1.10 + container_name: doris-fe-01 + hostname: fe-01 + environment: + - FE_SERVERS=fe1:172.20.80.2:9030 + - FE_ID=1 + ports: + - "8030:8030" + - "9030:9030" + volumes: + - ./fe-meta:/opt/apache-doris/fe/doris-meta + - ./fe-log:/opt/apache-doris/fe/log + networks: + doris_net: + ipv4_address: 172.20.80.2 + + be-01: + image: apache/doris:be-2.1.10 + container_name: doris-be-01 + hostname: be-01 + depends_on: + - fe-01 + environment: + - FE_SERVERS=fe1:172.20.80.2:9030 + - BE_ADDR=172.20.80.3:9050 + ports: + - "8040:8040" + - "9050:9050" + volumes: + - ./be01-storage:/opt/apache-doris/be/storage + - ./be01-log:/opt/apache-doris/be/log + networks: + doris_net: + ipv4_address: 172.20.80.3 + + be-02: + image: apache/doris:be-2.1.10 + container_name: doris-be-02 + hostname: be-02 + depends_on: + - fe-01 + environment: + - FE_SERVERS=fe1:172.20.80.2:9030 + - BE_ADDR=172.20.80.4:9050 + ports: + - "8041:8040" + - "9051:9050" + volumes: + - ./be02-storage:/opt/apache-doris/be/storage + - ./be02-log:/opt/apache-doris/be/log + networks: + doris_net: + ipv4_address: 172.20.80.4 + + be-03: + image: apache/doris:be-2.1.10 + container_name: doris-be-03 + hostname: be-03 + depends_on: + - fe-01 + environment: + - 
FE_SERVERS=fe1:172.20.80.2:9030 + - BE_ADDR=172.20.80.5:9050 + ports: + - "8042:8040" + - "9052:9050" + volumes: + - ./be03-storage:/opt/apache-doris/be/storage + - ./be03-log:/opt/apache-doris/be/log + networks: + doris_net: + ipv4_address: 172.20.80.5 + +networks: + doris_net: + ipam: + config: + - subnet: 172.20.80.0/24 \ No newline at end of file diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index 42ff8b881f..41e2d16246 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -15,6 +15,7 @@ import pandas as pd # noqa: TID253 import pytest import pytz +from tenacity import Retrying, stop_after_delay, wait_fixed, retry_if_exception_type import time_machine from sqlglot import exp, parse_one from sqlglot.optimizer.normalize_identifiers import normalize_identifiers @@ -419,8 +420,22 @@ def test_materialized_view(ctx_query_and_df: TestContext): source_table = ctx.table("source_table") ctx.engine_adapter.ctas(source_table, ctx.input_data(input_data), ctx.columns_to_types) view = ctx.table("test_view") - view_query = exp.select(*ctx.columns_to_types).from_(source_table) - ctx.engine_adapter.create_view(view, view_query, materialized=True) + # For Doris synchronized materialized views, we need to use only the table name without schema and add the source table name to the table properties + if ctx.engine_adapter.dialect == "doris": + view = exp.to_table(view.name) + # For Doris, select only the first column name from columns_to_types. + # Otherwise, Doris will raise an error if the materialized view has the same schema as the base table. + first_column = next(iter(ctx.columns_to_types)) + view_query = exp.select(first_column).from_(source_table) + ctx.engine_adapter.create_view( + view, + view_query, + materialized=True, + view_properties={"materialized_type": "SYNC", "source_table": source_table}, + ) + else: + view_query = exp.select(*ctx.columns_to_types).from_(source_table) + ctx.engine_adapter.create_view(view, view_query, materialized=True) results = ctx.get_metadata_results() # Redshift considers the underlying dataset supporting materialized views as a table therefore we get 2 # tables in the result @@ -429,13 +444,30 @@ def test_materialized_view(ctx_query_and_df: TestContext): else: assert len(results.tables) == 1 assert len(results.views) == 0 - assert len(results.materialized_views) == 1 - assert results.materialized_views[0] == view.name - ctx.compare_with_current(view, input_data) + # Doris information_schema.tables does not show materialized views and cannot query the synchronized materialized view + if not ctx.engine_adapter.dialect == "doris": + assert len(results.materialized_views) == 1 + assert results.materialized_views[0] == view.name + ctx.compare_with_current(view, input_data) # Make sure that dropping a materialized view also works - ctx.engine_adapter.drop_view(view, materialized=True) - results = ctx.get_metadata_results() - assert len(results.materialized_views) == 0 + if ctx.engine_adapter.dialect == "doris": + # Wait for the materialized view to be created by retrying drop until it succeeds + for attempt in Retrying( + stop=stop_after_delay(5), + wait=wait_fixed(1), + retry=retry_if_exception_type(Exception), + reraise=True, + ): + with attempt: + ctx.engine_adapter.drop_view( + view, + materialized=True, + view_properties={"materialized_type": "SYNC", "source_table": source_table}, + ) + else: + 
ctx.engine_adapter.drop_view(view, materialized=True) + results = ctx.get_metadata_results() + assert len(results.materialized_views) == 0 def test_drop_schema(ctx: TestContext): @@ -773,6 +805,25 @@ def test_insert_overwrite_by_time_partition(ctx_query_and_df: TestContext): ds_type = "datetime" if ctx.dialect == "tsql": ds_type = "varchar(max)" + if ctx.dialect == "doris": + ds_type = "date" + + # Get current year and create dates for testing. Doris cannot have more than 500 history partitions. + current_year = datetime.now().year + current_date = datetime(current_year, 1, 1) + if ctx.dialect == "doris": + # For Doris with DATE type, use pandas date objects + date_1 = current_date.date() + date_2 = (current_date + timedelta(days=1)).date() + date_3 = (current_date + timedelta(days=2)).date() + date_4 = (current_date + timedelta(days=3)).date() + date_5 = (current_date + timedelta(days=4)).date() + else: + date_1 = current_date.strftime("%Y-%m-%d") + date_2 = (current_date + timedelta(days=1)).strftime("%Y-%m-%d") + date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d") + date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d") + date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d") ctx.columns_to_types = {"id": "int", "ds": ds_type} table = ctx.table("test_table") @@ -789,16 +840,18 @@ def test_insert_overwrite_by_time_partition(ctx_query_and_df: TestContext): ) input_data = pd.DataFrame( [ - {"id": 1, ctx.time_column: "2022-01-01"}, - {"id": 2, ctx.time_column: "2022-01-02"}, - {"id": 3, ctx.time_column: "2022-01-03"}, + {"id": 1, ctx.time_column: date_1}, + {"id": 2, ctx.time_column: date_2}, + {"id": 3, ctx.time_column: date_3}, ] ) + if ctx.dialect == "doris": + ctx.engine_adapter.execute("SET enable_insert_strict = false;") ctx.engine_adapter.insert_overwrite_by_time_partition( table, ctx.input_data(input_data), - start="2022-01-02", - end="2022-01-03", + start=date_2, + end=date_3, time_formatter=ctx.time_formatter, time_column=ctx.time_column, target_columns_to_types=ctx.columns_to_types, @@ -818,16 +871,16 @@ def test_insert_overwrite_by_time_partition(ctx_query_and_df: TestContext): if ctx.test_type == "df": overwrite_data = pd.DataFrame( [ - {"id": 10, ctx.time_column: "2022-01-03"}, - {"id": 4, ctx.time_column: "2022-01-04"}, - {"id": 5, ctx.time_column: "2022-01-05"}, + {"id": 10, ctx.time_column: date_3}, + {"id": 4, ctx.time_column: date_4}, + {"id": 5, ctx.time_column: date_5}, ] ) ctx.engine_adapter.insert_overwrite_by_time_partition( table, ctx.input_data(overwrite_data), - start="2022-01-03", - end="2022-01-05", + start=date_3, + end=date_5, time_formatter=ctx.time_formatter, time_column=ctx.time_column, target_columns_to_types=ctx.columns_to_types, @@ -845,10 +898,10 @@ def test_insert_overwrite_by_time_partition(ctx_query_and_df: TestContext): table, pd.DataFrame( [ - {"id": 2, ctx.time_column: "2022-01-02"}, - {"id": 10, ctx.time_column: "2022-01-03"}, - {"id": 4, ctx.time_column: "2022-01-04"}, - {"id": 5, ctx.time_column: "2022-01-05"}, + {"id": 2, ctx.time_column: date_2}, + {"id": 10, ctx.time_column: date_3}, + {"id": 4, ctx.time_column: date_4}, + {"id": 5, ctx.time_column: date_5}, ] ), ) @@ -861,6 +914,25 @@ def test_insert_overwrite_by_time_partition_source_columns(ctx_query_and_df: Tes ds_type = "datetime" if ctx.dialect == "tsql": ds_type = "varchar(max)" + if ctx.dialect == "doris": + ds_type = "date" + + # Get current year and create dates for testing. Doris cannot have more than 500 history partitions. 
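+    # The adapter's dynamic partition defaults only span from dynamic_partition.start = -490
+    # through dynamic_partition.end = 10, so anchoring the test dates to the current year
+    # keeps every row inside the partition window that gets created.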
+ current_year = datetime.now().year + current_date = datetime(current_year, 1, 1) + if ctx.dialect == "doris": + # For Doris with DATE type, use pandas date objects + date_1 = current_date.date() + date_2 = (current_date + timedelta(days=1)).date() + date_3 = (current_date + timedelta(days=2)).date() + date_4 = (current_date + timedelta(days=3)).date() + date_5 = (current_date + timedelta(days=4)).date() + else: + date_1 = current_date.strftime("%Y-%m-%d") + date_2 = (current_date + timedelta(days=1)).strftime("%Y-%m-%d") + date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d") + date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d") + date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d") ctx.columns_to_types = {"id": "int", "ds": ds_type} columns_to_types = { @@ -882,16 +954,18 @@ def test_insert_overwrite_by_time_partition_source_columns(ctx_query_and_df: Tes ) input_data = pd.DataFrame( [ - {"id": 1, ctx.time_column: "2022-01-01", "ignored_source": "ignored_value"}, - {"id": 2, ctx.time_column: "2022-01-02", "ignored_source": "ignored_value"}, - {"id": 3, ctx.time_column: "2022-01-03", "ignored_source": "ignored_value"}, + {"id": 1, ctx.time_column: date_1, "ignored_source": "ignored_value"}, + {"id": 2, ctx.time_column: date_2, "ignored_source": "ignored_value"}, + {"id": 3, ctx.time_column: date_3, "ignored_source": "ignored_value"}, ] ) + if ctx.dialect == "doris": + ctx.engine_adapter.execute("SET enable_insert_strict = false;") ctx.engine_adapter.insert_overwrite_by_time_partition( table, ctx.input_data(input_data), - start="2022-01-02", - end="2022-01-03", + start=date_2, + end=date_3, time_formatter=ctx.time_formatter, time_column=ctx.time_column, target_columns_to_types=columns_to_types, @@ -917,16 +991,16 @@ def test_insert_overwrite_by_time_partition_source_columns(ctx_query_and_df: Tes if ctx.test_type == "df": overwrite_data = pd.DataFrame( [ - {"id": 10, ctx.time_column: "2022-01-03", "ignored_source": "ignored_value"}, - {"id": 4, ctx.time_column: "2022-01-04", "ignored_source": "ignored_value"}, - {"id": 5, ctx.time_column: "2022-01-05", "ignored_source": "ignored_value"}, + {"id": 10, ctx.time_column: date_3, "ignored_source": "ignored_value"}, + {"id": 4, ctx.time_column: date_4, "ignored_source": "ignored_value"}, + {"id": 5, ctx.time_column: date_5, "ignored_source": "ignored_value"}, ] ) ctx.engine_adapter.insert_overwrite_by_time_partition( table, ctx.input_data(overwrite_data), - start="2022-01-03", - end="2022-01-05", + start=date_3, + end=date_5, time_formatter=ctx.time_formatter, time_column=ctx.time_column, target_columns_to_types=columns_to_types, @@ -945,10 +1019,10 @@ def test_insert_overwrite_by_time_partition_source_columns(ctx_query_and_df: Tes table, pd.DataFrame( [ - {"id": 2, "ignored_column": None, ctx.time_column: "2022-01-02"}, - {"id": 10, "ignored_column": None, ctx.time_column: "2022-01-03"}, - {"id": 4, "ignored_column": None, ctx.time_column: "2022-01-04"}, - {"id": 5, "ignored_column": None, ctx.time_column: "2022-01-05"}, + {"id": 2, "ignored_column": None, ctx.time_column: date_2}, + {"id": 10, "ignored_column": None, ctx.time_column: date_3}, + {"id": 4, "ignored_column": None, ctx.time_column: date_4}, + {"id": 5, "ignored_column": None, ctx.time_column: date_5}, ] ), ) @@ -964,8 +1038,12 @@ def test_merge(ctx_query_and_df: TestContext): # Athena only supports MERGE on Iceberg tables # And it cant fall back to a logical merge on Hive tables because it cant delete records table_format = "iceberg" if 
ctx.dialect == "athena" else None + # Doris needs to have a UNIQUE KEY to use MERGE + table_properties = {"unique_key": "id"} if ctx.dialect == "doris" else None - ctx.engine_adapter.create_table(table, ctx.columns_to_types, table_format=table_format) + ctx.engine_adapter.create_table( + table, ctx.columns_to_types, table_format=table_format, table_properties=table_properties + ) input_data = pd.DataFrame( [ {"id": 1, "ds": "2022-01-01"}, @@ -1030,11 +1108,15 @@ def test_merge_source_columns(ctx_query_and_df: TestContext): # Athena only supports MERGE on Iceberg tables # And it cant fall back to a logical merge on Hive tables because it cant delete records table_format = "iceberg" if ctx.dialect == "athena" else None + # Doris needs to have a UNIQUE KEY to use MERGE + table_properties = {"unique_key": "id"} if ctx.dialect == "doris" else None columns_to_types = ctx.columns_to_types.copy() columns_to_types["ignored_column"] = exp.DataType.build("int") - ctx.engine_adapter.create_table(table, columns_to_types, table_format=table_format) + ctx.engine_adapter.create_table( + table, columns_to_types, table_format=table_format, table_properties=table_properties + ) input_data = pd.DataFrame( [ {"id": 1, "ds": "2022-01-01", "ignored_source": "ignored_value"}, @@ -2569,6 +2651,7 @@ def test_dialects(ctx: TestContext): "mysql": pd.Timestamp("2020-01-01 00:00:00"), "spark": pd.Timestamp("2020-01-01 00:00:00"), "databricks": pd.Timestamp("2020-01-01 00:00:00"), + "doris": pd.Timestamp("2020-01-01 00:00:00"), }, ), ( @@ -2666,6 +2749,7 @@ def _mutate_config(current_gateway_name: str, config: Config): unique_key item_id, batch_size 1 ), + dialect {ctx.dialect}, {table_format} start '2020-01-01', end '2020-01-07', @@ -3769,6 +3853,10 @@ def test_materialized_view_evaluation(ctx: TestContext, mocker: MockerFixture): pytest.skip(f"Skipping engine {dialect} as it does not support materialized views") elif dialect in ("snowflake", "databricks"): pytest.skip(f"Skipping {dialect} as they're not enabled on standard accounts") + elif dialect == "doris": + pytest.skip( + f"Skipping doris as synchronous materialized views do not support specifying schema" + ) model_name = ctx.table("test_tbl") mview_name = ctx.table("test_mview") diff --git a/tests/core/engine_adapter/integration/test_integration_doris.py b/tests/core/engine_adapter/integration/test_integration_doris.py new file mode 100644 index 0000000000..a7faebdbbc --- /dev/null +++ b/tests/core/engine_adapter/integration/test_integration_doris.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import typing as t + +import pytest +from pytest import FixtureRequest + +from sqlmesh.core.engine_adapter import DorisEngineAdapter +from tests.core.engine_adapter.integration import TestContext +from tests.core.engine_adapter.integration import ( + generate_pytest_params, + ENGINES_BY_NAME, + IntegrationTestEngine, +) + + +@pytest.fixture(params=list(generate_pytest_params(ENGINES_BY_NAME["doris"]))) +def ctx( + request: FixtureRequest, + create_test_context: t.Callable[[IntegrationTestEngine, str, str], t.Iterable[TestContext]], +) -> t.Iterable[TestContext]: + yield from create_test_context(*request.param) + + +@pytest.fixture +def engine_adapter(ctx: TestContext) -> DorisEngineAdapter: + assert isinstance(ctx.engine_adapter, DorisEngineAdapter) + return ctx.engine_adapter + + +def test_engine_adapter(ctx: TestContext): + assert isinstance(ctx.engine_adapter, DorisEngineAdapter) + assert ctx.engine_adapter.fetchone("select 1") == (1,) diff --git 
a/tests/core/engine_adapter/test_doris.py b/tests/core/engine_adapter/test_doris.py new file mode 100644 index 0000000000..d9e3ebb799 --- /dev/null +++ b/tests/core/engine_adapter/test_doris.py @@ -0,0 +1,664 @@ +import typing as t + +import pytest +from sqlglot import expressions as exp +from sqlglot import parse_one + +from tests.core.engine_adapter import to_sql_calls + +from sqlmesh.core.engine_adapter.doris import DorisEngineAdapter +from sqlmesh.utils.errors import UnsupportedCatalogOperationError + +from pytest_mock.plugin import MockerFixture + +pytestmark = [pytest.mark.doris, pytest.mark.engine] + + +def test_create_view(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter]): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.create_view("test_view", parse_one("SELECT a FROM tbl")) + adapter.create_view("test_view", parse_one("SELECT a FROM tbl"), replace=False) + # check view properties + adapter.create_view( + "test_view", + parse_one("SELECT a FROM tbl"), + replace=False, + view_properties={"a": exp.convert(1)}, + ) + + assert to_sql_calls(adapter) == [ + "DROP VIEW IF EXISTS `test_view`", + "CREATE VIEW `test_view` AS SELECT `a` FROM `tbl`", + "CREATE VIEW `test_view` AS SELECT `a` FROM `tbl`", + "CREATE VIEW `test_view` AS SELECT `a` FROM `tbl`", + ] + + +def test_create_view_with_comment(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter]): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.create_view( + "test_view", + parse_one("SELECT a FROM tbl"), + replace=False, + target_columns_to_types={"a": exp.DataType.build("INT")}, + table_description="test_description", + column_descriptions={"a": "test_column_description"}, + ) + + assert to_sql_calls(adapter) == [ + "CREATE VIEW `test_view` (`a` COMMENT 'test_column_description') COMMENT 'test_description' AS SELECT `a` FROM `tbl`", + ] + + +def test_create_materialized_view(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter]): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.create_view( + "test_view", + parse_one("SELECT a FROM tbl"), + materialized=True, + target_columns_to_types={"a": exp.DataType.build("INT")}, + ) + adapter.create_view( + "test_view", + parse_one("SELECT a FROM tbl"), + replace=False, + materialized=True, + target_columns_to_types={"a": exp.DataType.build("INT")}, + ) + + assert to_sql_calls(adapter) == [ + "DROP MATERIALIZED VIEW IF EXISTS `test_view`", + "CREATE MATERIALIZED VIEW `test_view` (`a`) AS SELECT `a` FROM `tbl`", + "CREATE MATERIALIZED VIEW `test_view` (`a`) AS SELECT `a` FROM `tbl`", + ] + + adapter.cursor.reset_mock() + adapter.create_view( + "test_view", + parse_one("SELECT a, b FROM tbl"), + replace=False, + materialized=True, + target_columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")}, + column_descriptions={"a": "test_column_description", "b": "test_column_description"}, + ) + adapter.create_view( + "test_view", parse_one("SELECT a, b FROM tbl"), replace=False, materialized=True + ) + + assert to_sql_calls(adapter) == [ + "CREATE MATERIALIZED VIEW `test_view` (`a` COMMENT 'test_column_description', `b` COMMENT 'test_column_description') AS SELECT `a`, `b` FROM `tbl`", + "CREATE MATERIALIZED VIEW `test_view` AS SELECT `a`, `b` FROM `tbl`", + ] + + +def test_create_table(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter]): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.create_table( + "test_table", + target_columns_to_types={"a": 
exp.DataType.build("INT")}, + column_descriptions={"a": "test_column_description"}, + table_properties={"unique_key": exp.Column(this=exp.Identifier(this="a", quoted=True))}, + ) + adapter.create_table( + "test_table", + target_columns_to_types={"a": exp.DataType.build("INT")}, + column_descriptions={"a": "test_column_description"}, + table_properties={"duplicate_key": exp.Column(this=exp.Identifier(this="a", quoted=True))}, + ) + adapter.create_table( + "test_table", + target_columns_to_types={"a": exp.DataType.build("INT")}, + table_description="test_description", + column_descriptions={"a": "test_column_description"}, + ) + assert to_sql_calls(adapter) == [ + "CREATE TABLE IF NOT EXISTS `test_table` (`a` INT COMMENT 'test_column_description') UNIQUE KEY (`a`) DISTRIBUTED BY HASH (`a`) BUCKETS 10", + "CREATE TABLE IF NOT EXISTS `test_table` (`a` INT COMMENT 'test_column_description') DUPLICATE KEY (`a`)", + "CREATE TABLE IF NOT EXISTS `test_table` (`a` INT COMMENT 'test_column_description') COMMENT 'test_description'", + ] + + adapter.cursor.reset_mock() + + +def test_create_table_like(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter]): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + + adapter.create_table_like("target_table", "source_table") + + assert to_sql_calls(adapter) == [ + "CREATE TABLE IF NOT EXISTS `target_table` LIKE `source_table`", + ] + + +def test_create_schema(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter]): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.create_schema("test_schema") + adapter.create_schema("test_schema", ignore_if_exists=False) + + assert to_sql_calls(adapter) == [ + "CREATE DATABASE IF NOT EXISTS `test_schema`", + "CREATE DATABASE `test_schema`", + ] + + with pytest.raises(UnsupportedCatalogOperationError): + adapter.create_schema("test_catalog.test_schema") + + +def test_drop_schema(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter]): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.drop_schema("test_schema") + adapter.drop_schema("test_schema", ignore_if_not_exists=False) + + assert to_sql_calls(adapter) == [ + "DROP DATABASE IF EXISTS `test_schema`", + "DROP DATABASE `test_schema`", + ] + + +def test_rename_table(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter]): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + + adapter.rename_table("old_table", "new_table") + adapter.cursor.execute.assert_called_once_with("ALTER TABLE `old_table` RENAME `new_table`") + + +def test_replace_by_key( + make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], mocker: MockerFixture +): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + temp_table = parse_one("temp_table") + mocker.patch.object(adapter, "_get_temp_table", return_value=temp_table) + mocker.patch.object(adapter, "ctas") + mocker.patch.object(adapter, "delete_from") + mocker.patch.object(adapter, "drop_table") + + columns_to_types = {"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")} + source_query = parse_one("SELECT a, b FROM src") + target_table = "target_table" + + adapter._replace_by_key(target_table, source_query, columns_to_types, [exp.column("a")], True) + adapter._replace_by_key(target_table, source_query, columns_to_types, [exp.column("a")], False) + adapter._replace_by_key( + target_table, source_query, columns_to_types, [exp.column("a"), exp.column("b")], True + ) + adapter._replace_by_key( + target_table, source_query, columns_to_types, 
[exp.column("a"), exp.column("b")], False + ) + + assert to_sql_calls(adapter, identify=True) == [ + "INSERT INTO `target_table` (`a`, `b`) SELECT `a`, `b` FROM (SELECT `a` AS `a`, `b` AS `b`, ROW_NUMBER() OVER (PARTITION BY `a` ORDER BY `a`) AS _row_number FROM `temp_table`) AS _t WHERE _row_number = 1", + "INSERT INTO `target_table` (`a`, `b`) SELECT `a`, `b` FROM `temp_table`", + "INSERT INTO `target_table` (`a`, `b`) SELECT `a`, `b` FROM (SELECT `a` AS `a`, `b` AS `b`, ROW_NUMBER() OVER (PARTITION BY `a`, `b` ORDER BY `a`, `b`) AS _row_number FROM `temp_table`) AS _t WHERE _row_number = 1", + "INSERT INTO `target_table` (`a`, `b`) SELECT `a`, `b` FROM `temp_table`", + ] + + +def test_create_index(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter]): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + + adapter.create_index("test_table", "test_index", ("cola",)) + adapter.cursor.execute.assert_called_once_with( + "CREATE INDEX IF NOT EXISTS `test_index` ON `test_table`(`cola`)" + ) + + +def test_create_table_with_distributed_by( + make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], +): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + distributed_by = { + "expressions": ["a", "b"], + "kind": "HASH", + "buckets": 8, + } + adapter.create_table( + "test_table", + target_columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")}, + table_properties={"distributed_by": distributed_by}, + ) + + assert to_sql_calls(adapter) == [ + "CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` INT) DISTRIBUTED BY HASH (`a`, `b`) BUCKETS 8", + ] + + adapter.cursor.execute.reset_mock() + + distributed_by = { + "expressions": None, + "kind": "RANDOM", + "buckets": None, + } + adapter.create_table( + "test_table", + target_columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")}, + table_properties={"distributed_by": distributed_by}, + ) + + assert to_sql_calls(adapter) == [ + "CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` INT) DISTRIBUTED BY RANDOM", + ] + + adapter.cursor.execute.reset_mock() + + distributed_by = { + "expressions": ["a"], + "kind": "HASH", + "buckets": "AUTO", + } + adapter.create_table( + "test_table", + target_columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")}, + table_properties={"distributed_by": distributed_by}, + ) + + assert to_sql_calls(adapter) == [ + "CREATE TABLE IF NOT EXISTS `test_table` (`a` INT, `b` INT) DISTRIBUTED BY HASH (`a`) BUCKETS AUTO", + ] + + +def test_create_table_with_properties( + make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], +): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.create_table( + "test_table", + target_columns_to_types={"a": exp.DataType.build("INT")}, + table_properties={ + "refresh_interval": "86400", + }, + ) + + assert to_sql_calls(adapter) == [ + "CREATE TABLE IF NOT EXISTS `test_table` (`a` INT) PROPERTIES ('refresh_interval'='86400')", + ] + + +def test_create_table_with_partitioned_by( + make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], +): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.create_table( + "test_table", + target_columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("DATE")}, + partitioned_by=[exp.Literal.string("RANGE(b)")], + table_properties={ + "partitions": exp.Literal.string( + "FROM ('2000-11-14') TO ('2021-11-14') INTERVAL 2 YEAR" + ) + }, + ) + + assert to_sql_calls(adapter) == [ + "CREATE TABLE 
IF NOT EXISTS `test_table` (`a` INT, `b` DATE) PARTITION BY RANGE (`b`) (FROM ('2000-11-14') TO ('2021-11-14') INTERVAL 2 YEAR)", + ] + + +def test_create_table_with_range_partitioned_by_with_partitions( + make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], +): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.create_table( + "test_table", + target_columns_to_types={ + "id": exp.DataType.build("INT"), + "waiter_id": exp.DataType.build("INT"), + "customer_id": exp.DataType.build("INT"), + "ds": exp.DataType.build("DATETIME"), + }, + partitioned_by=[exp.Literal.string("RANGE(ds)")], + table_properties={ + "partitions": exp.Tuple( + expressions=[ + exp.Literal.string('PARTITION `p2023` VALUES [("2023-01-01"), ("2024-01-01"))'), + exp.Literal.string('PARTITION `p2024` VALUES [("2024-01-01"), ("2025-01-01"))'), + exp.Literal.string('PARTITION `p2025` VALUES [("2025-01-01"), ("2026-01-01"))'), + exp.Literal.string("PARTITION `other` VALUES LESS THAN MAXVALUE"), + ] + ), + "distributed_by": exp.Tuple( + expressions=[ + exp.EQ( + this=exp.Column(this=exp.Identifier(this="kind", quoted=True)), + expression=exp.Literal.string("HASH"), + ), + exp.EQ( + this=exp.Column(this=exp.Identifier(this="expressions", quoted=True)), + expression=exp.Column(this=exp.Identifier(this="id", quoted=True)), + ), + exp.EQ( + this=exp.Column(this=exp.Identifier(this="buckets", quoted=True)), + expression=exp.Literal.number(10), + ), + ] + ), + "replication_allocation": exp.Literal.string("tag.location.default: 3"), + "in_memory": exp.Literal.string("false"), + "storage_format": exp.Literal.string("V2"), + "disable_auto_compaction": exp.Literal.string("false"), + }, + ) + + expected_sql = ( + "CREATE TABLE IF NOT EXISTS `test_table` " + "(`id` INT, `waiter_id` INT, `customer_id` INT, `ds` DATETIME) " + "PARTITION BY RANGE (`ds`) " + '(PARTITION `p2023` VALUES [("2023-01-01"), ("2024-01-01")), ' + 'PARTITION `p2024` VALUES [("2024-01-01"), ("2025-01-01")), ' + 'PARTITION `p2025` VALUES [("2025-01-01"), ("2026-01-01")), ' + "PARTITION `other` VALUES LESS THAN MAXVALUE) " + "DISTRIBUTED BY HASH (`id`) BUCKETS 10 " + "PROPERTIES (" + "'replication_allocation'='tag.location.default: 3', " + "'in_memory'='false', " + "'storage_format'='V2', " + "'disable_auto_compaction'='false')" + ) + + assert to_sql_calls(adapter) == [expected_sql] + + +def test_create_table_with_list_partitioned_by( + make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], +): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.create_table( + "test_table", + target_columns_to_types={ + "id": exp.DataType.build("INT"), + "status": exp.DataType.build("VARCHAR(10)"), + }, + partitioned_by=[exp.Literal.string("LIST(status)")], + table_properties={ + "partitions": exp.Tuple( + expressions=[ + exp.Literal.string('PARTITION `active` VALUES IN ("active", "pending")'), + exp.Literal.string('PARTITION `inactive` VALUES IN ("inactive", "disabled")'), + ] + ), + }, + ) + + expected_sql = ( + "CREATE TABLE IF NOT EXISTS `test_table` " + "(`id` INT, `status` VARCHAR(10)) " + "PARTITION BY LIST (`status`) " + '(PARTITION `active` VALUES IN ("active", "pending"), ' + 'PARTITION `inactive` VALUES IN ("inactive", "disabled"))' + ) + + assert to_sql_calls(adapter) == [expected_sql] + + +def test_create_table_with_range_partitioned_by_anonymous_function( + make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], +): + """Test that RANGE(ds) function call syntax generates correct SQL without duplicate 
RANGE.""" + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.create_table( + "test_table", + target_columns_to_types={ + "id": exp.DataType.build("INT"), + "ds": exp.DataType.build("DATETIME"), + }, + # This simulates how partitioned_by RANGE(ds) gets parsed from model definition + partitioned_by=[exp.Anonymous(this="RANGE", expressions=[exp.to_column("ds")])], + table_properties={ + "partitions": exp.Literal.string( + 'FROM ("2000-11-14") TO ("2099-11-14") INTERVAL 1 MONTH' + ) + }, + ) + + expected_sql = ( + "CREATE TABLE IF NOT EXISTS `test_table` " + "(`id` INT, `ds` DATETIME) " + "PARTITION BY RANGE (`ds`) " + '(FROM ("2000-11-14") TO ("2099-11-14") INTERVAL 1 MONTH)' + ) + + assert to_sql_calls(adapter) == [expected_sql] + + +def test_create_materialized_view_with_duplicate_key( + make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], +): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + adapter.create_view( + "test_mv", + parse_one("SELECT id, status, COUNT(*) as cnt FROM orders GROUP BY id, status"), + materialized=True, + target_columns_to_types={ + "id": exp.DataType.build("INT"), + "status": exp.DataType.build("VARCHAR(10)"), + "cnt": exp.DataType.build("BIGINT"), + }, + view_properties={ + "duplicate_key": exp.Tuple( + expressions=[ + exp.to_column("id"), + exp.to_column("status"), + ] + ), + }, + ) + + expected_sqls = [ + "DROP MATERIALIZED VIEW IF EXISTS `test_mv`", + ( + "CREATE MATERIALIZED VIEW `test_mv` " + "(`id`, `status`, `cnt`) " + "DUPLICATE KEY (`id`, `status`) " + "AS SELECT `id`, `status`, COUNT(*) AS `cnt` FROM `orders` GROUP BY `id`, `status`" + ), + ] + + assert to_sql_calls(adapter) == expected_sqls + + +def test_create_full_materialized_view( + make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], +): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + view_properties: t.Dict[str, exp.Expression] = { + "build": exp.Literal.string("IMMEDIATE"), + "refresh": exp.Tuple( + expressions=[ + exp.EQ( + this=exp.Column(this=exp.Identifier(this="method", quoted=True)), + expression=exp.Literal.string("AUTO"), + ), + exp.EQ( + this=exp.Column(this=exp.Identifier(this="kind", quoted=True)), + expression=exp.Literal.string("SCHEDULE"), + ), + exp.EQ( + this=exp.Column(this=exp.Identifier(this="every", quoted=True)), + expression=exp.Literal.number(1), + ), + exp.EQ( + this=exp.Column(this=exp.Identifier(this="unit", quoted=True)), + expression=exp.Literal.string("DAY"), + ), + exp.EQ( + this=exp.Column(this=exp.Identifier(this="starts", quoted=True)), + expression=exp.Literal.string("2024-12-01 20:30:00"), + ), + ], + ), + "distributed_by": exp.Tuple( + expressions=[ + exp.EQ( + this=exp.Column(this=exp.Identifier(this="kind", quoted=True)), + expression=exp.Literal.string("HASH"), + ), + exp.EQ( + this=exp.Column(this=exp.Identifier(this="expressions", quoted=True)), + expression=exp.Column(this=exp.Identifier(this="orderkey", quoted=True)), + ), + exp.EQ( + this=exp.Column(this=exp.Identifier(this="buckets", quoted=True)), + expression=exp.Literal.number(2), + ), + ] + ), + "unique_key": exp.to_column("orderkey"), + "replication_num": exp.Literal.string("1"), + } + materialized_properties = { + "partitioned_by": [ + parse_one("DATE_TRUNC(o_orderdate, 'MONTH')", dialect="doris"), + ], + "clustered_by": [], + "partition_interval_unit": None, + } + columns_to_types = { + "orderdate": exp.DataType.build("DATE"), + "orderkey": exp.DataType.build("INT"), + "partkey": exp.DataType.build("INT"), + } + 
column_descriptions = { + "orderdate": "order date", + "orderkey": "order key", + "partkey": "part key", + } + query = parse_one( + """ + SELECT + o_orderdate, + l_orderkey, + l_partkey + FROM + orders + LEFT JOIN lineitem ON l_orderkey = o_orderkey + LEFT JOIN partsupp ON ps_partkey = l_partkey + and l_suppkey = ps_suppkey + """ + ) + adapter.create_view( + "complete_mv", + query, + replace=False, + materialized=True, + target_columns_to_types=columns_to_types, + column_descriptions=column_descriptions, + table_description="test_description", + view_properties=view_properties, + materialized_properties=materialized_properties, + ) + expected_sqls = [ + "CREATE MATERIALIZED VIEW `complete_mv` (`orderdate` COMMENT 'order date', `orderkey` COMMENT 'order key', `partkey` COMMENT 'part key') " + "BUILD IMMEDIATE REFRESH AUTO ON SCHEDULE EVERY 1 DAY STARTS '2024-12-01 20:30:00' KEY (`orderkey`) COMMENT 'test_description' " + "PARTITION BY (DATE_TRUNC(`o_orderdate`, 'MONTH')) " + "DISTRIBUTED BY HASH (`orderkey`) BUCKETS 2 PROPERTIES ('replication_num'='1') " + "AS SELECT `o_orderdate`, `l_orderkey`, `l_partkey` FROM `orders` LEFT JOIN `lineitem` ON `l_orderkey` = `o_orderkey` LEFT JOIN `partsupp` ON `ps_partkey` = `l_partkey` AND `l_suppkey` = `ps_suppkey`", + ] + sql_calls = to_sql_calls(adapter) + + # Remove extra spaces for comparison + def norm(s): + return " ".join(s.split()) + + for expected_sql in expected_sqls: + assert any(norm(expected_sql) == norm(sql) for sql in sql_calls), ( + f"Expected SQL not found.\nExpected: {expected_sql}\nGot: {sql_calls}" + ) + + +def test_create_table_with_single_string_distributed_by( + make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], +): + """Test creating table with distributed_by where expressions is a single string.""" + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + distributed_by = { + "expressions": "recordid", # Single string instead of array + "kind": "HASH", + "buckets": 10, + } + adapter.create_table( + "test_table", + target_columns_to_types={ + "recordid": exp.DataType.build("INT"), + "name": exp.DataType.build("VARCHAR"), + }, + table_properties={"distributed_by": distributed_by}, + ) + + assert to_sql_calls(adapter) == [ + "CREATE TABLE IF NOT EXISTS `test_table` (`recordid` INT, `name` VARCHAR) DISTRIBUTED BY HASH (`recordid`) BUCKETS 10", + ] + + +def test_delete_from_with_subquery(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter]): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + + # Test DELETE FROM with IN subquery + adapter.delete_from( + "test_schema_xtll416o.test_table", + "id IN (SELECT id FROM test_schema_xtll416o.temp_test_table_ik29031e)", + ) + + # Test DELETE FROM with NOT IN subquery + adapter.delete_from( + "test_schema_xtll416o.test_table", + "id NOT IN (SELECT id FROM test_schema_xtll416o.temp_test_table_ik29031e)", + ) + + # Test simple DELETE FROM (should use base implementation) + adapter.delete_from("test_schema_xtll416o.test_table", "id = 1") + + assert to_sql_calls(adapter) == [ + "DELETE FROM `test_schema_xtll416o`.`test_table` AS `_t1` USING (SELECT `id` FROM `test_schema_xtll416o`.`temp_test_table_ik29031e`) AS `_t2` WHERE `_t1`.`id` = `_t2`.`id`", + "DELETE FROM `test_schema_xtll416o`.`test_table` AS `_t1` USING (SELECT `id` FROM `test_schema_xtll416o`.`temp_test_table_ik29031e`) AS `_t2` WHERE `_t1`.`id` <> `_t2`.`id`", + "DELETE FROM `test_schema_xtll416o`.`test_table` WHERE `id` = 1", + ] + + +def test_delete_from_with_complex_subquery( + 
make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], +): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + + # Test DELETE FROM with complex subquery (multiple columns) + adapter.delete_from( + "test_schema_xtll416o.test_table", + "(id, name) IN (SELECT id, name FROM test_schema_xtll416o.temp_test_table_ik29031e WHERE active = 1)", + ) + + # Test DELETE FROM with subquery that has WHERE clause + adapter.delete_from( + "test_schema_xtll416o.test_table", + "id IN (SELECT id FROM test_schema_xtll416o.temp_test_table_ik29031e WHERE created_date > '2024-01-01')", + ) + + assert to_sql_calls(adapter) == [ + "DELETE FROM `test_schema_xtll416o`.`test_table` AS `_t1` USING (SELECT `id`, `name` FROM `test_schema_xtll416o`.`temp_test_table_ik29031e` WHERE `active` = 1) AS `_t2` WHERE `_t1`.`id` = `_t2`.`id` AND `_t1`.`name` = `_t2`.`name`", + "DELETE FROM `test_schema_xtll416o`.`test_table` AS `_t1` USING (SELECT `id` FROM `test_schema_xtll416o`.`temp_test_table_ik29031e` WHERE `created_date` > '2024-01-01') AS `_t2` WHERE `_t1`.`id` = `_t2`.`id`", + ] + + +def test_delete_from_fallback_to_base( + make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter], +): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + + # Test DELETE FROM with simple conditions (should use base implementation) + adapter.delete_from("test_table", "id = 1") + adapter.delete_from("test_table", "name = 'test' AND status = 'active'") + adapter.delete_from("test_table", "id IN (1, 2, 3)") # Simple IN with values, not subquery + + assert to_sql_calls(adapter) == [ + "DELETE FROM `test_table` WHERE `id` = 1", + "DELETE FROM `test_table` WHERE `name` = 'test' AND `status` = 'active'", + "DELETE FROM `test_table` WHERE `id` IN (1, 2, 3)", + ] + + +def test_delete_from_full_table(make_mocked_engine_adapter: t.Callable[..., DorisEngineAdapter]): + adapter = make_mocked_engine_adapter(DorisEngineAdapter) + + # Test DELETE FROM with WHERE TRUE (should use TRUNCATE) + adapter.delete_from("test_table", exp.true()) + + assert to_sql_calls(adapter) == [ + "TRUNCATE TABLE `test_table`", + ] diff --git a/tests/core/test_connection_config.py b/tests/core/test_connection_config.py index 907d1b70cc..0d2a7d9ca6 100644 --- a/tests/core/test_connection_config.py +++ b/tests/core/test_connection_config.py @@ -11,6 +11,7 @@ ClickhouseConnectionConfig, ConnectionConfig, DatabricksConnectionConfig, + DorisConnectionConfig, DuckDBAttachOptions, FabricConnectionConfig, DuckDBConnectionConfig, @@ -1795,3 +1796,59 @@ def test_schema_differ_overrides(make_config) -> None: adapter = config.create_engine_adapter() assert adapter._schema_differ_overrides == override assert adapter.schema_differ.parameterized_type_defaults == {} + + +def test_doris(make_config): + """Test DorisConnectionConfig basic functionality""" + # Basic configuration + config = make_config( + type="doris", + host="localhost", + user="root", + password="password", + port=9030, + database="demo", + check_import=False, + ) + assert isinstance(config, DorisConnectionConfig) + assert config.type_ == "doris" + assert config.host == "localhost" + assert config.user == "root" + assert config.password == "password" + assert config.port == 9030 + assert config.database == "demo" + assert config.DIALECT == "doris" + assert config.DISPLAY_NAME == "Apache Doris" + assert config.DISPLAY_ORDER == 18 + assert config.is_recommended_for_state_sync is False + + # Test with minimal configuration (using default port) + minimal_config = make_config( + type="doris", + 
host="fe.doris.cluster", + user="doris_user", + password="doris_pass", + check_import=False, + ) + assert isinstance(minimal_config, DorisConnectionConfig) + assert minimal_config.port == 9030 # Default Doris FE port + assert minimal_config.host == "fe.doris.cluster" + assert minimal_config.user == "doris_user" + + # Test with additional MySQL-compatible options + advanced_config = make_config( + type="doris", + host="doris-fe", + user="admin", + password="admin123", + port=9030, + database="analytics", + charset="utf8mb4", + ssl_disabled=True, + concurrent_tasks=10, + check_import=False, + ) + assert isinstance(advanced_config, DorisConnectionConfig) + assert advanced_config.charset == "utf8mb4" + assert advanced_config.ssl_disabled is True + assert advanced_config.concurrent_tasks == 10