diff --git a/CHANGELOG.md b/CHANGELOG.md index 046495b8..6adc9199 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,10 +6,12 @@ All notable changes to this project will be documented in this file. ### Added -- Support for fault-tolerant execution ([#779]). +- Support for fault-tolerant execution ([#779], [#793]). +- Support for the client spooling protocol ([#793]). - Helm: Allow Pod `priorityClassName` to be configured ([#798]). [#779]: https://github.com/stackabletech/trino-operator/pull/779 +[#793]: https://github.com/stackabletech/trino-operator/pull/793 [#798]: https://github.com/stackabletech/trino-operator/pull/798 ## [25.7.0] - 2025-07-23 diff --git a/deploy/helm/trino-operator/crds/crds.yaml b/deploy/helm/trino-operator/crds/crds.yaml index 19ee4127..71e503e7 100644 --- a/deploy/helm/trino-operator/crds/crds.yaml +++ b/deploy/helm/trino-operator/crds/crds.yaml @@ -105,6 +105,154 @@ spec: description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. type: object type: object + clientProtocol: + description: Client spooling protocol configuration. + nullable: true + oneOf: + - required: + - spooling + properties: + spooling: + properties: + filesystem: + oneOf: + - required: + - s3 + properties: + s3: + properties: + connection: + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: S3 connection definition as a resource. Learn more on the [S3 concept documentation](https://docs.stackable.tech/home/nightly/concepts/s3). + properties: + accessStyle: + default: VirtualHosted + description: Which access style to use. Defaults to virtual hosted-style as most of the data products out there. Have a look at the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html). + enum: + - Path + - VirtualHosted + type: string + credentials: + description: If the S3 uses authentication you have to specify you S3 credentials. In the most cases a [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) providing `accessKey` and `secretKey` is sufficient. + nullable: true + properties: + scope: + description: '[Scope](https://docs.stackable.tech/home/nightly/secret-operator/scope) of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass).' + nullable: true + properties: + listenerVolumes: + default: [] + description: The listener volume scope allows Node and Service scopes to be inferred from the applicable listeners. This must correspond to Volume names in the Pod that mount Listeners. + items: + type: string + type: array + node: + default: false + description: The node scope is resolved to the name of the Kubernetes Node object that the Pod is running on. This will typically be the DNS name of the node. + type: boolean + pod: + default: false + description: The pod scope is resolved to the name of the Kubernetes Pod. This allows the secret to differentiate between StatefulSet replicas. + type: boolean + services: + default: [] + description: The service scope allows Pod objects to specify custom scopes. This should typically correspond to Service objects that the Pod participates in. 
+ items: + type: string + type: array + type: object + secretClass: + description: '[SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) containing the LDAP bind credentials.' + type: string + required: + - secretClass + type: object + host: + description: 'Host of the S3 server without any protocol or port. For example: `west1.my-cloud.com`.' + type: string + port: + description: Port the S3 server listens on. If not specified the product will determine the port to use. + format: uint16 + minimum: 0.0 + nullable: true + type: integer + region: + default: + name: us-east-1 + description: |- + Bucket region used for signing headers (sigv4). + + This defaults to `us-east-1` which is compatible with other implementations such as Minio. + + WARNING: Some products use the Hadoop S3 implementation which falls back to us-east-2. + properties: + name: + default: us-east-1 + type: string + type: object + tls: + description: Use a TLS connection. If not specified no TLS will be used. + nullable: true + properties: + verification: + description: The verification method used to verify the certificates of the server and/or the client. + oneOf: + - required: + - none + - required: + - server + properties: + none: + description: Use TLS but don't verify certificates. + type: object + server: + description: Use TLS and a CA certificate to verify the server. + properties: + caCert: + description: CA cert to verify the server. + oneOf: + - required: + - webPki + - required: + - secretClass + properties: + secretClass: + description: Name of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) which will provide the CA certificate. Note that a SecretClass does not need to have a key but can also work with just a CA certificate, so if you got provided with a CA cert but don't have access to the key you can still use this method. + type: string + webPki: + description: Use TLS and the CA certificates trusted by the common web browsers to verify the server. This can be useful when you e.g. use public AWS S3 or other public available services. + type: object + type: object + required: + - caCert + type: object + type: object + required: + - verification + type: object + required: + - host + type: object + reference: + type: string + type: object + required: + - connection + type: object + type: object + location: + type: string + required: + - filesystem + - location + type: object + type: object faultTolerantExecution: description: Fault tolerant execution configuration. When enabled, Trino can automatically retry queries or tasks in case of failures. nullable: true @@ -132,12 +280,6 @@ spec: - required: - local properties: - configOverrides: - additionalProperties: - type: string - default: {} - description: The `configOverrides` allow overriding arbitrary exchange manager properties. - type: object encryptionEnabled: description: Whether to enable encryption of spooling data. nullable: true @@ -312,14 +454,6 @@ spec: reference: type: string type: object - externalId: - description: External ID for the IAM role trust policy. - nullable: true - type: string - iamRole: - description: IAM role to assume for S3 access. - nullable: true - type: string maxErrorRetries: description: Maximum number of times the S3 client should retry a request. 
format: uint32
@@ -394,12 +528,6 @@ spec: - required: - local properties: - configOverrides: - additionalProperties: - type: string - default: {} - description: The `configOverrides` allow overriding arbitrary exchange manager properties. - type: object encryptionEnabled: description: Whether to enable encryption of spooling data. nullable: true
@@ -574,14 +702,6 @@ spec: reference: type: string type: object - externalId: - description: External ID for the IAM role trust policy. - nullable: true - type: string - iamRole: - description: IAM role to assume for S3 access. - nullable: true - type: string maxErrorRetries: description: Maximum number of times the S3 client should retry a request. format: uint32
diff --git a/docs/modules/trino/pages/usage-guide/client-spooling-protocol.adoc b/docs/modules/trino/pages/usage-guide/client-spooling-protocol.adoc new file mode 100644 index 00000000..1dda133c --- /dev/null +++ b/docs/modules/trino/pages/usage-guide/client-spooling-protocol.adoc @@ -0,0 +1,43 @@
+= Client Spooling Protocol
:description: Enable and configure the Client Spooling Protocol in Trino for efficient handling of large result sets.
:keywords: client spooling protocol, Trino, large result sets, memory management
:trino-docs-spooling-url: https://trino.io/docs/476/client/client-protocol.html
+The Client Spooling Protocol in Trino is designed to efficiently handle large result sets. When enabled, this protocol allows the Trino server to spool results to external storage systems, reducing memory consumption and improving performance for queries that return large datasets.
+For more details, refer to the link:{trino-docs-spooling-url}[Trino documentation on Client Spooling Protocol {external-link-icon}^].
+[IMPORTANT]
====
The client spooling protocol was introduced in Trino 466, but it only works reliably starting with Trino 476.
====
+== Configuration
+The client spooling protocol is disabled by default.
To enable it, set the `spec.clusterConfig.clientProtocol` configuration property as shown below.
+[source,yaml]
----
spec:
  clusterConfig:
    clientProtocol:
      spooling:
        location: "s3://spooling-bucket/trino/" # <1>
        filesystem:
          s3: # <2>
            connection:
              reference: "minio"
----
<1> Specifies the location where spooled data will be stored. This example uses an S3 bucket.
<2> Configures the filesystem type for spooling. Currently, only S3 is supported via the custom resource definition.
+The operator automatically fills in additional settings required by Trino, such as the `protocol.spooling.shared-secret-key`.
To add or replace properties in the generated `spooling-manager.properties` file, use the `configOverrides` property as described in xref:usage-guide/configuration.adoc[] (see also the sketch at the end of this page).
+[IMPORTANT]
====
Even if enabled, Trino may decide not to use the client spooling protocol for certain queries. Clients cannot force Trino to use it.
====
+Clients need access to the same storage location that is configured for spooling.
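To make the override mechanism concrete, the following sketch overrides a single property in the generated `spooling-manager.properties` at the coordinator role level. It is only an illustration: `fs.segment.ttl` is a property name taken from the upstream Trino spooling file system documentation, not something defined by this operator, so verify it against the documentation of the Trino version you run.

[source,yaml]
----
spec:
  coordinators:
    configOverrides:
      spooling-manager.properties:
        fs.segment.ttl: "30m" # assumed upstream Trino property, shown for illustration only
----

Role group level overrides take precedence over role level overrides, as exercised by the `test_client_protocol_config_overrides` unit test added in this change.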
diff --git a/docs/modules/trino/pages/usage-guide/configuration.adoc b/docs/modules/trino/pages/usage-guide/configuration.adoc index 44b58742..d6fe2eac 100644 --- a/docs/modules/trino/pages/usage-guide/configuration.adoc +++ b/docs/modules/trino/pages/usage-guide/configuration.adoc @@ -16,6 +16,7 @@ For a role or role group, at the same level of `config`, you can specify `config * `password-authenticator.properties` * `security.properties` * `exchange-manager.properties` +* `spooling-manager.properties` For a list of possible configuration properties consult the https://trino.io/docs/current/admin/properties.html[Trino Properties Reference]. diff --git a/docs/modules/trino/partials/nav.adoc b/docs/modules/trino/partials/nav.adoc index a1ff7c0d..02dbac77 100644 --- a/docs/modules/trino/partials/nav.adoc +++ b/docs/modules/trino/partials/nav.adoc @@ -7,6 +7,7 @@ ** xref:trino:usage-guide/listenerclass.adoc[] ** xref:trino:usage-guide/configuration.adoc[] ** xref:trino:usage-guide/fault-tolerant-execution.adoc[] +** xref:trino:usage-guide/client-spooling-protocol.adoc[] ** xref:trino:usage-guide/s3.adoc[] ** xref:trino:usage-guide/security.adoc[] ** xref:trino:usage-guide/monitoring.adoc[] diff --git a/rust/operator-binary/src/authentication/mod.rs b/rust/operator-binary/src/authentication/mod.rs index 04bb317a..fd91436f 100644 --- a/rust/operator-binary/src/authentication/mod.rs +++ b/rust/operator-binary/src/authentication/mod.rs @@ -43,26 +43,26 @@ const HTTP_SERVER_AUTHENTICATION_TYPE: &str = "http-server.authentication.type"; #[derive(Snafu, Debug)] pub enum Error { #[snafu(display( - "The Trino Operator does not support the AuthenticationClass provider [{authentication_class_provider}] from AuthenticationClass [{authentication_class}]." + "the Trino Operator does not support the AuthenticationClass provider [{authentication_class_provider}] from AuthenticationClass [{authentication_class}]." ))] AuthenticationClassProviderNotSupported { authentication_class_provider: String, authentication_class: ObjectRef, }, - #[snafu(display("Failed to format trino authentication java properties"))] + #[snafu(display("failed to format trino authentication java properties"))] FailedToWriteJavaProperties { source: product_config::writer::PropertiesWriterError, }, - #[snafu(display("Failed to configure trino password authentication"))] + #[snafu(display("failed to configure trino password authentication"))] InvalidPasswordAuthenticationConfig { source: password::Error }, - #[snafu(display("Failed to configure trino OAuth2 authentication"))] + #[snafu(display("failed to configure trino OAuth2 authentication"))] InvalidOauth2AuthenticationConfig { source: oidc::Error }, #[snafu(display( - "OIDC authentication details not specified. The AuthenticationClass {auth_class_name:?} uses an OIDC provider, you need to specify OIDC authentication details (such as client credentials) as well" + "oidc authentication details not specified. 
The AuthenticationClass {auth_class_name:?} uses an OIDC provider, you need to specify OIDC authentication details (such as client credentials) as well" ))] OidcAuthenticationDetailsNotSpecified { auth_class_name: String }, diff --git a/rust/operator-binary/src/authentication/oidc/mod.rs b/rust/operator-binary/src/authentication/oidc/mod.rs index 181a9358..7d3615f5 100644 --- a/rust/operator-binary/src/authentication/oidc/mod.rs +++ b/rust/operator-binary/src/authentication/oidc/mod.rs @@ -28,12 +28,12 @@ const WEB_UI_AUTHENTICATION_TYPE: &str = "web-ui.authentication.type"; #[derive(Snafu, Debug)] pub enum Error { #[snafu(display( - "No OAuth2 AuthenticationClass provided. This is an internal operator failure and should not be happening." + "no OAuth2 AuthenticationClass provided. This is an internal operator failure and should not be happening." ))] NoOauth2AuthenticationClassProvided, #[snafu(display( - "Trino cannot configure OAuth2 with multiple Identity providers. \ + "trino cannot configure OAuth2 with multiple Identity providers. \ Received the following AuthenticationClasses {authentication_class_names:?}. \ Please only provide one OAuth2 AuthenticationClass!" ))] @@ -41,15 +41,15 @@ pub enum Error { authentication_class_names: Vec, }, - #[snafu(display("Failed to create OAuth2 issuer endpoint url."))] + #[snafu(display("failed to create OAuth2 issuer endpoint url."))] FailedToCreateIssuerEndpointUrl { source: stackable_operator::crd::authentication::oidc::v1alpha1::Error, }, - #[snafu(display("Trino does not support unverified TLS connections to OIDC"))] + #[snafu(display("trino does not support unverified TLS connections to OIDC"))] UnverifiedOidcTlsConnectionNotSupported, - #[snafu(display("Failed to create OIDC Volumes and VolumeMounts"))] + #[snafu(display("failed to create OIDC Volumes and VolumeMounts"))] FailedToCreateOidcVolumeAndVolumeMounts { source: TlsClientDetailsError }, } diff --git a/rust/operator-binary/src/authentication/password/ldap.rs b/rust/operator-binary/src/authentication/password/ldap.rs index 84a31a1a..671b4a10 100644 --- a/rust/operator-binary/src/authentication/password/ldap.rs +++ b/rust/operator-binary/src/authentication/password/ldap.rs @@ -22,15 +22,15 @@ const LDAP_PASSWORD_ENV: &str = "LDAP_PASSWORD"; #[derive(Snafu, Debug)] pub enum Error { - #[snafu(display("Trino does not support unverified TLS connections to LDAP"))] + #[snafu(display("trino does not support unverified TLS connections to LDAP"))] UnverifiedLdapTlsConnectionNotSupported, - #[snafu(display("Failed to construct LDAP endpoint URL"))] + #[snafu(display("failed to construct LDAP endpoint URL"))] LdapEndpoint { source: stackable_operator::crd::authentication::ldap::v1alpha1::Error, }, - #[snafu(display("Failed to construct LDAP Volumes and VolumeMounts"))] + #[snafu(display("failed to construct LDAP Volumes and VolumeMounts"))] LdapVolumeAndVolumeMounts { source: stackable_operator::crd::authentication::ldap::v1alpha1::Error, }, diff --git a/rust/operator-binary/src/catalog/mod.rs b/rust/operator-binary/src/catalog/mod.rs index 0087c0c4..63c670a6 100644 --- a/rust/operator-binary/src/catalog/mod.rs +++ b/rust/operator-binary/src/catalog/mod.rs @@ -57,7 +57,7 @@ pub enum FromTrinoCatalogError { data_key: String, }, - #[snafu(display("Failed to create the Secret Volume for the S3 credentials"))] + #[snafu(display("failed to create the Secret Volume for the S3 credentials"))] CreateS3CredentialsSecretOperatorVolume { source: 
stackable_operator::builder::pod::volume::SecretOperatorVolumeSourceBuilderError, }, diff --git a/rust/operator-binary/src/command.rs b/rust/operator-binary/src/command.rs index 4b6791d5..29c5769a 100644 --- a/rust/operator-binary/src/command.rs +++ b/rust/operator-binary/src/command.rs @@ -9,13 +9,14 @@ use stackable_operator::{ use crate::{ authentication::TrinoAuthenticationConfig, catalog::config::CatalogConfig, + config::{client_protocol, fault_tolerant_execution}, controller::{STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR}, crd::{ - CONFIG_DIR_NAME, Container, LOG_PROPERTIES, RW_CONFIG_DIR_NAME, STACKABLE_CLIENT_TLS_DIR, + CONFIG_DIR_NAME, Container, EXCHANGE_MANAGER_PROPERTIES, LOG_PROPERTIES, + RW_CONFIG_DIR_NAME, SPOOLING_MANAGER_PROPERTIES, STACKABLE_CLIENT_TLS_DIR, STACKABLE_INTERNAL_TLS_DIR, STACKABLE_MOUNT_INTERNAL_TLS_DIR, STACKABLE_MOUNT_SERVER_TLS_DIR, STACKABLE_SERVER_TLS_DIR, STACKABLE_TLS_STORE_PASSWORD, - SYSTEM_TRUST_STORE, SYSTEM_TRUST_STORE_PASSWORD, TrinoRole, - fault_tolerant_execution::ResolvedFaultTolerantExecutionConfig, v1alpha1, + SYSTEM_TRUST_STORE, SYSTEM_TRUST_STORE_PASSWORD, TrinoRole, v1alpha1, }, }; @@ -23,7 +24,8 @@ pub fn container_prepare_args( trino: &v1alpha1::TrinoCluster, catalogs: &[CatalogConfig], merged_config: &v1alpha1::TrinoConfig, - resolved_fte_config: &Option, + resolved_fte_config: &Option, + resolved_spooling_config: &Option, ) -> Vec { let mut args = vec![]; @@ -85,13 +87,17 @@ pub fn container_prepare_args( args.extend_from_slice(&resolved_fte.init_container_extra_start_commands); } + // Add the commands that are needed for the client spooling protocol (e.g., TLS certificates for S3) + if let Some(resolved_spooling) = resolved_spooling_config { + args.extend_from_slice(&resolved_spooling.init_container_extra_start_commands); + } + args } pub fn container_trino_args( authentication_config: &TrinoAuthenticationConfig, catalogs: &[CatalogConfig], - resolved_fte_config: &Option, ) -> Vec { let mut args = vec![ // copy config files to a writeable empty folder @@ -119,12 +125,17 @@ pub fn container_trino_args( } }); - // Add fault tolerant execution environment variables from files - if let Some(resolved_fte) = resolved_fte_config { - for (env_name, file) in &resolved_fte.load_env_from_files { - args.push(format!("export {env_name}=\"$(cat {file})\"")); - } - } + // Resolve credentials for fault tolerant execution exchange manager if needed + args.push(format!( + "test -f {rw_exchange_manager_config_file} && config-utils template {rw_exchange_manager_config_file}", + rw_exchange_manager_config_file = format!("{RW_CONFIG_DIR_NAME}/{EXCHANGE_MANAGER_PROPERTIES}") + )); + + // Resolve credentials for spooling manager if needed + args.push(format!( + "test -f {rw_spooling_config_file} && config-utils template {rw_spooling_config_file}", + rw_spooling_config_file = format!("{RW_CONFIG_DIR_NAME}/{SPOOLING_MANAGER_PROPERTIES}") + )); args.push("set -x".to_string()); diff --git a/rust/operator-binary/src/config/client_protocol.rs b/rust/operator-binary/src/config/client_protocol.rs new file mode 100644 index 00000000..96354c21 --- /dev/null +++ b/rust/operator-binary/src/config/client_protocol.rs @@ -0,0 +1,167 @@ +// Consolidate Trino S3 properties in a single reusable struct. 
+ +use std::collections::BTreeMap; + +use snafu::{self, ResultExt, Snafu}; +use stackable_operator::{ + client::Client, + k8s_openapi::api::core::v1::{Volume, VolumeMount}, +}; + +use crate::{ + config, + crd::{ + ENV_SPOOLING_SECRET, + client_protocol::{ClientProtocolConfig, SpoolingFileSystemConfig}, + }, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to resolve S3 connection"))] + ResolveS3Connection { source: config::s3::Error }, + + #[snafu(display("trino does not support disabling the TLS verification of S3 servers"))] + S3TlsNoVerificationNotSupported, + + #[snafu(display("failed to convert data size for [{field}] to bytes"))] + QuantityConversion { + source: stackable_operator::memory::Error, + field: &'static str, + }, +} + +pub struct ResolvedClientProtocolConfig { + /// Properties to add to config.properties + pub config_properties: BTreeMap, + + // Properties for spooling-manager.properties + pub spooling_manager_properties: BTreeMap, + + /// Volumes required for the configuration (e.g., for S3 credentials) + pub volumes: Vec, + + /// Volume mounts required for the configuration + pub volume_mounts: Vec, + + /// Additional commands that need to be executed before starting Trino + /// Used to add TLS certificates to the client's trust store. + pub init_container_extra_start_commands: Vec, +} + +impl ResolvedClientProtocolConfig { + /// Resolve S3 connection properties from Kubernetes resources + /// and prepare spooling filesystem configuration. + pub async fn from_config( + config: &ClientProtocolConfig, + client: Option<&Client>, + namespace: &str, + ) -> Result { + let mut resolved_config = Self { + config_properties: BTreeMap::new(), + spooling_manager_properties: BTreeMap::new(), + volumes: Vec::new(), + volume_mounts: Vec::new(), + init_container_extra_start_commands: Vec::new(), + }; + + match config { + ClientProtocolConfig::Spooling(spooling_config) => { + // Resolve external resources if Kubernetes client is available + // This should always be the case, except for when this function is called during unit tests + if let Some(client) = client { + match &spooling_config.filesystem { + SpoolingFileSystemConfig::S3(s3_config) => { + let resolved_s3_config = config::s3::ResolvedS3Config::from_config( + &s3_config.connection, + client, + namespace, + ) + .await + .context(ResolveS3ConnectionSnafu)?; + + // Enable S3 filesystem after successful resolution + resolved_config + .spooling_manager_properties + .insert("fs.s3.enabled".to_string(), "true".to_string()); + + // Copy the S3 configuration over + resolved_config + .spooling_manager_properties + .extend(resolved_s3_config.properties); + resolved_config.volumes.extend(resolved_s3_config.volumes); + resolved_config + .volume_mounts + .extend(resolved_s3_config.volume_mounts); + resolved_config + .init_container_extra_start_commands + .extend(resolved_s3_config.init_container_extra_start_commands); + } + } + } + + resolved_config.spooling_manager_properties.extend([ + ("fs.location".to_string(), spooling_config.location.clone()), + ( + "spooling-manager.name".to_string(), + "filesystem".to_string(), + ), + ]); + + // Enable spooling protocol + resolved_config.config_properties.extend([ + ("protocol.spooling.enabled".to_string(), "true".to_string()), + ( + "protocol.spooling.shared-secret-key".to_string(), + format!("${{ENV:{ENV_SPOOLING_SECRET}}}"), + ), + ]); + } + } + + Ok(resolved_config) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_spooling_config() { 
+ let config_yaml = indoc::indoc! {r#" + spooling: + location: s3://my-bucket/spooling + filesystem: + s3: + connection: + reference: test-s3-connection + "#}; + + let deserializer = serde_yaml::Deserializer::from_str(config_yaml); + let config = serde_yaml::with::singleton_map_recursive::deserialize(deserializer) + .expect("invalid test input"); + + let resolved_spooling_config = ResolvedClientProtocolConfig::from_config( + &config, None, // No client, so no external resolution + "default", + ) + .await + .unwrap(); + + let expected_props = BTreeMap::from([ + ( + "fs.location".to_string(), + "s3://my-bucket/spooling".to_string(), + ), + ( + "spooling-manager.name".to_string(), + "filesystem".to_string(), + ), + ]); + assert_eq!( + expected_props, + resolved_spooling_config.spooling_manager_properties + ); + } +} diff --git a/rust/operator-binary/src/config/fault_tolerant_execution.rs b/rust/operator-binary/src/config/fault_tolerant_execution.rs new file mode 100644 index 00000000..852f15c3 --- /dev/null +++ b/rust/operator-binary/src/config/fault_tolerant_execution.rs @@ -0,0 +1,503 @@ +use std::collections::BTreeMap; + +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + builder::pod::volume::{VolumeBuilder, VolumeMountBuilder}, + client::Client, + crd::s3, + k8s_openapi::{ + api::core::v1::{Volume, VolumeMount}, + apimachinery::pkg::api::resource::Quantity, + }, +}; + +use crate::{ + config, + crd::{ + CONFIG_DIR_NAME, + fault_tolerant_execution::{ + ExchangeManagerBackend, FaultTolerantExecutionConfig, HdfsExchangeConfig, + }, + }, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to resolve S3 connection"))] + S3Connection { + source: s3::v1alpha1::ConnectionError, + }, + + #[snafu(display("failed to resolve S3 connection"))] + ResolveS3Connection { source: config::s3::Error }, + + #[snafu(display("trino does not support disabling the TLS verification of S3 servers"))] + S3TlsNoVerificationNotSupported, + + #[snafu(display("failed to convert data size for [{field}] to bytes"))] + QuantityConversion { + source: stackable_operator::memory::Error, + field: &'static str, + }, +} + +/// Fault tolerant execution configuration with external resources resolved +pub struct ResolvedFaultTolerantExecutionConfig { + /// Properties to add to config.properties + pub config_properties: BTreeMap, + + /// Properties to add to exchange-manager.properties (if needed) + pub exchange_manager_properties: BTreeMap, + + /// Volumes required for the configuration (e.g., for S3 credentials) + pub volumes: Vec, + + /// Volume mounts required for the configuration + pub volume_mounts: Vec, + + /// Additional commands that need to be executed before starting Trino + pub init_container_extra_start_commands: Vec, +} + +impl ResolvedFaultTolerantExecutionConfig { + /// Helper function to insert optional values into properties map + fn insert_if_present( + properties: &mut BTreeMap, + key: &str, + value: Option, + ) { + if let Some(v) = value { + properties.insert(key.to_string(), v.to_string()); + } + } + + /// Helper function to insert optional Quantity values after converting to Trino bytes string + fn insert_quantity_if_present( + properties: &mut BTreeMap, + key: &'static str, + quantity: Option<&Quantity>, + ) -> Result<(), Error> { + if let Some(q) = quantity { + use snafu::ResultExt; + let v = crate::crd::quantity_to_trino_bytes(q) + .context(QuantityConversionSnafu { field: key })?; + properties.insert(key.to_string(), v); + } + Ok(()) + } + + /// Create a resolved fault 
tolerant execution configuration from the cluster config + pub async fn from_config( + config: &FaultTolerantExecutionConfig, + client: Option<&Client>, + namespace: &str, + ) -> Result { + let mut config_properties = BTreeMap::new(); + + // Handle different retry policies and their configurations + let (retry_policy_str, exchange_manager_opt) = match config { + FaultTolerantExecutionConfig::Query(query_config) => { + // Set query-specific properties + Self::insert_if_present( + &mut config_properties, + "query-retry-attempts", + query_config.retry_attempts, + ); + Self::insert_if_present( + &mut config_properties, + "retry-initial-delay", + query_config + .retry_initial_delay + .as_ref() + .map(|d| format!("{}s", d.as_secs())), + ); + Self::insert_if_present( + &mut config_properties, + "retry-max-delay", + query_config + .retry_max_delay + .as_ref() + .map(|d| format!("{}s", d.as_secs())), + ); + Self::insert_if_present( + &mut config_properties, + "retry-delay-scale-factor", + query_config.retry_delay_scale_factor.as_ref(), + ); + Self::insert_quantity_if_present( + &mut config_properties, + "exchange.deduplication-buffer-size", + query_config.exchange_deduplication_buffer_size.as_ref(), + )?; + + ("QUERY", query_config.exchange_manager.as_ref()) + } + FaultTolerantExecutionConfig::Task(task_config) => { + // Set task-specific properties + Self::insert_if_present( + &mut config_properties, + "task-retry-attempts-per-task", + task_config.retry_attempts_per_task, + ); + Self::insert_if_present( + &mut config_properties, + "retry-initial-delay", + task_config + .retry_initial_delay + .as_ref() + .map(|d| format!("{}s", d.as_secs())), + ); + Self::insert_if_present( + &mut config_properties, + "retry-max-delay", + task_config + .retry_max_delay + .as_ref() + .map(|d| format!("{}s", d.as_secs())), + ); + Self::insert_if_present( + &mut config_properties, + "retry-delay-scale-factor", + task_config.retry_delay_scale_factor.as_ref(), + ); + Self::insert_quantity_if_present( + &mut config_properties, + "exchange.deduplication-buffer-size", + task_config.exchange_deduplication_buffer_size.as_ref(), + )?; + + ("TASK", Some(&task_config.exchange_manager)) + } + }; + + config_properties.insert("retry-policy".to_string(), retry_policy_str.to_string()); + + let mut exchange_manager_properties = BTreeMap::new(); + if let Some(exchange_config) = exchange_manager_opt { + Self::insert_if_present( + &mut config_properties, + "fault-tolerant-execution.exchange-encryption-enabled", + exchange_config.encryption_enabled, + ); + Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.sink-buffer-pool-min-size", + exchange_config.sink_buffer_pool_min_size, + ); + Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.sink-buffers-per-partition", + exchange_config.sink_buffers_per_partition, + ); + Self::insert_quantity_if_present( + &mut exchange_manager_properties, + "exchange.sink-max-file-size", + exchange_config.sink_max_file_size.as_ref(), + )?; + Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.source-concurrent-readers", + exchange_config.source_concurrent_readers, + ); + + // Add backend-specific configuration + match &exchange_config.backend { + ExchangeManagerBackend::S3(s3_exchange_config) => { + exchange_manager_properties.insert( + "exchange-manager.name".to_string(), + "filesystem".to_string(), + ); + exchange_manager_properties.insert( + "exchange.base-directories".to_string(), + s3_exchange_config.base_directories.join(","), + ); + + 
Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.s3.max-error-retries", + s3_exchange_config.max_error_retries, + ); + Self::insert_quantity_if_present( + &mut exchange_manager_properties, + "exchange.s3.upload.part-size", + s3_exchange_config.upload_part_size.as_ref(), + )?; + } + ExchangeManagerBackend::Hdfs(hdfs_config) => { + exchange_manager_properties + .insert("exchange-manager.name".to_string(), "hdfs".to_string()); + exchange_manager_properties.insert( + "exchange.base-directories".to_string(), + hdfs_config.base_directories.join(","), + ); + + Self::insert_quantity_if_present( + &mut exchange_manager_properties, + "exchange.hdfs.block-size", + hdfs_config.block_size.as_ref(), + )?; + Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.hdfs.skip-directory-scheme-validation", + hdfs_config.skip_directory_scheme_validation, + ); + + let hdfs_config_dir = format!("{CONFIG_DIR_NAME}/exchange-hdfs-config"); + exchange_manager_properties.insert( + "hdfs.config.resources".to_string(), + format!("{hdfs_config_dir}/core-site.xml,{hdfs_config_dir}/hdfs-site.xml"), + ); + } + ExchangeManagerBackend::Local(local_config) => { + exchange_manager_properties.insert( + "exchange-manager.name".to_string(), + "filesystem".to_string(), + ); + exchange_manager_properties.insert( + "exchange.base-directories".to_string(), + local_config.base_directories.join(","), + ); + } + } + } + + let mut resolved_config = Self { + config_properties, + exchange_manager_properties, + volumes: Vec::new(), + volume_mounts: Vec::new(), + init_container_extra_start_commands: Vec::new(), + }; + + // Resolve external resources if Kubernetes client is available + // This should always be the case, except for when this function is called during unit tests + if let (Some(client), Some(exchange_config)) = (client, exchange_manager_opt) { + match &exchange_config.backend { + ExchangeManagerBackend::S3(s3_config) => { + let resolved_s3_config = config::s3::ResolvedS3Config::from_config( + &s3_config.connection, + client, + namespace, + ) + .await + .context(ResolveS3ConnectionSnafu)?; + + // Copy the S3 configuration over and add "exchange." 
prefix + resolved_config.exchange_manager_properties.extend( + resolved_s3_config + .properties + .into_iter() + .map(|(k, v)| (format!("exchange.{k}"), v)), + ); + resolved_config.volumes.extend(resolved_s3_config.volumes); + resolved_config + .volume_mounts + .extend(resolved_s3_config.volume_mounts); + resolved_config + .init_container_extra_start_commands + .extend(resolved_s3_config.init_container_extra_start_commands); + } + ExchangeManagerBackend::Hdfs(hdfs_config) => { + resolved_config.resolve_hdfs_backend(hdfs_config); + } + ExchangeManagerBackend::Local(_) => { + // Local backend requires no external resource resolution + } + } + } + + Ok(resolved_config) + } + + fn resolve_hdfs_backend(&mut self, hdfs_config: &HdfsExchangeConfig) { + let hdfs_config_dir = format!("{CONFIG_DIR_NAME}/exchange-hdfs-config"); + let volume_name = "exchange-hdfs-config".to_string(); + + self.volumes.push( + VolumeBuilder::new(&volume_name) + .with_config_map(&hdfs_config.hdfs.config_map) + .build(), + ); + self.volume_mounts + .push(VolumeMountBuilder::new(&volume_name, &hdfs_config_dir).build()); + } +} + +#[cfg(test)] +mod tests { + + use indoc::indoc; + + use super::*; + + fn parse_config(config_yaml: &str) -> FaultTolerantExecutionConfig { + let deserializer = serde_yaml::Deserializer::from_str(config_yaml); + serde_yaml::with::singleton_map_recursive::deserialize(deserializer) + .expect("invalid test input") + } + + #[tokio::test] + async fn test_query_retry_policy_without_exchange_manager() { + let config = parse_config(indoc! {r#" + query: + retryAttempts: 5 + retryInitialDelay: 15s + retryMaxDelay: 90s + retryDelayScaleFactor: 3 + exchangeDeduplicationBufferSize: 64Mi + "#}); + + let fte_config = + ResolvedFaultTolerantExecutionConfig::from_config(&config, None, "default") + .await + .unwrap(); + + assert_eq!( + fte_config.config_properties.get("retry-policy"), + Some(&"QUERY".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("query-retry-attempts"), + Some(&"5".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("retry-initial-delay"), + Some(&"15s".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("retry-max-delay"), + Some(&"90s".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("retry-delay-scale-factor"), + Some(&"3".to_string()) + ); + assert_eq!( + fte_config + .config_properties + .get("exchange.deduplication-buffer-size"), + Some(&"67108864B".to_string()) + ); + } + + #[tokio::test] + async fn test_query_retry_policy_with_exchange_manager() { + let config = parse_config(indoc! 
{r#" + query: + retryAttempts: 3 + retryInitialDelay: 10s + retryMaxDelay: 1m + retryDelayScaleFactor: 2 + exchangeDeduplicationBufferSize: 100Mi + exchangeManager: + encryptionEnabled: true + sinkBufferPoolMinSize: 10 + sinkBuffersPerPartition: 2 + sinkMaxFileSize: 1Gi + sourceConcurrentReaders: 4 + local: + baseDirectories: ["/tmp/exchange"] + "#}); + + let fte_config = + ResolvedFaultTolerantExecutionConfig::from_config(&config, None, "default") + .await + .unwrap(); + + assert_eq!( + fte_config.config_properties.get("retry-policy"), + Some(&"QUERY".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("query-retry-attempts"), + Some(&"3".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("retry-initial-delay"), + Some(&"10s".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("retry-max-delay"), + Some(&"60s".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("retry-delay-scale-factor"), + Some(&"2".to_string()) + ); + + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange-manager.name"), + Some(&"filesystem".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.base-directories"), + Some(&"/tmp/exchange".to_string()) + ); + assert_eq!( + fte_config + .config_properties + .get("exchange.deduplication-buffer-size"), + Some(&"104857600B".to_string()) + ); + assert_eq!( + fte_config + .config_properties + .get("fault-tolerant-execution.exchange-encryption-enabled"), + Some(&"true".to_string()) + ); + } + + #[tokio::test] + async fn test_task_retry_policy_with_s3_exchange_manager() { + let config = parse_config(indoc! {r#" + task: + exchangeManager: + s3: + baseDirectories: ["s3://my-bucket/exchange"] + connection: + reference: test-s3-connection + maxErrorRetries: 5 + uploadPartSize: 10Mi + "#}); + + let fte_config = + ResolvedFaultTolerantExecutionConfig::from_config(&config, None, "default") + .await + .unwrap(); + + assert_eq!( + fte_config.config_properties.get("retry-policy"), + Some(&"TASK".to_string()) + ); + + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange-manager.name"), + Some(&"filesystem".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.base-directories"), + Some(&"s3://my-bucket/exchange".to_string()) + ); + + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.s3.max-error-retries"), + Some(&"5".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.s3.upload.part-size"), + Some(&"10485760B".to_string()) + ); + } +} diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs index 271c6d99..152d5080 100644 --- a/rust/operator-binary/src/config/mod.rs +++ b/rust/operator-binary/src/config/mod.rs @@ -1 +1,4 @@ +pub mod client_protocol; +pub mod fault_tolerant_execution; pub mod jvm; +pub mod s3; diff --git a/rust/operator-binary/src/config/s3.rs b/rust/operator-binary/src/config/s3.rs new file mode 100644 index 00000000..dedcbcd6 --- /dev/null +++ b/rust/operator-binary/src/config/s3.rs @@ -0,0 +1,124 @@ +use std::collections::BTreeMap; + +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + client::Client, + commons::tls_verification::{CaCert, TlsServerVerification, TlsVerification}, + crd::s3, + k8s_openapi::api::core::v1::{Volume, VolumeMount}, +}; + +use crate::{command, crd::STACKABLE_CLIENT_TLS_DIR}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to resolve S3 
connection"))] + S3Connection { + source: s3::v1alpha1::ConnectionError, + }, + + #[snafu(display("trino does not support disabling the TLS verification of S3 servers"))] + S3TlsNoVerificationNotSupported, + + #[snafu(display("failed to convert data size for [{field}] to bytes"))] + QuantityConversion { + source: stackable_operator::memory::Error, + field: &'static str, + }, +} + +pub struct ResolvedS3Config { + /// Properties to add to config.properties + pub properties: BTreeMap, + + /// Volumes required for the configuration (e.g., for S3 credentials) + pub volumes: Vec, + + /// Volume mounts required for the configuration + pub volume_mounts: Vec, + + /// Additional commands that need to be executed before starting Trino + /// Used to add TLS certificates to the client's trust store. + pub init_container_extra_start_commands: Vec, +} + +impl ResolvedS3Config { + /// Resolve S3 connection properties from Kubernetes resources + /// and prepare spooling filesystem configuration. + pub async fn from_config( + connection: &stackable_operator::crd::s3::v1alpha1::InlineConnectionOrReference, + client: &Client, + namespace: &str, + ) -> Result { + let mut resolved_config = Self { + properties: BTreeMap::new(), + volumes: Vec::new(), + volume_mounts: Vec::new(), + init_container_extra_start_commands: Vec::new(), + }; + + let s3_connection = connection + .clone() + .resolve(client, namespace) + .await + .context(S3ConnectionSnafu)?; + + let (volumes, mounts) = s3_connection + .volumes_and_mounts() + .context(S3ConnectionSnafu)?; + resolved_config.volumes.extend(volumes); + resolved_config.volume_mounts.extend(mounts); + + resolved_config + .properties + .insert("s3.region".to_string(), s3_connection.region.name.clone()); + resolved_config.properties.insert( + "s3.endpoint".to_string(), + s3_connection + .endpoint() + .context(S3ConnectionSnafu)? 
+ .to_string(), + ); + resolved_config.properties.insert( + "s3.path-style-access".to_string(), + (s3_connection.access_style == s3::v1alpha1::S3AccessStyle::Path).to_string(), + ); + + if let Some((access_key_path, secret_key_path)) = s3_connection.credentials_mount_paths() { + resolved_config.properties.extend([ + ( + "s3.aws-access-key".to_string(), + format!("${{file:UTF-8:{access_key_path}}}"), + ), + ( + "s3.aws-secret-key".to_string(), + format!("${{file:UTF-8:{secret_key_path}}}"), + ), + ]); + } + + if let Some(tls) = s3_connection.tls.tls.as_ref() { + match &tls.verification { + TlsVerification::None {} => return S3TlsNoVerificationNotSupportedSnafu.fail(), + TlsVerification::Server(TlsServerVerification { + ca_cert: CaCert::WebPki {}, + }) => {} + TlsVerification::Server(TlsServerVerification { + ca_cert: CaCert::SecretClass(_), + }) => { + if let Some(ca_cert) = s3_connection.tls.tls_ca_cert_mount_path() { + resolved_config.init_container_extra_start_commands.extend( + command::add_cert_to_truststore( + &ca_cert, + STACKABLE_CLIENT_TLS_DIR, + "resolved-s3-ca-cert", + ), + ); + } + } + } + } + + Ok(resolved_config) + } +} diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index 58e4775e..d07a817b 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -79,18 +79,18 @@ use crate::{ authorization::opa::TrinoOpaConfig, catalog::{FromTrinoCatalogError, config::CatalogConfig}, command, config, + config::{client_protocol, fault_tolerant_execution}, crd::{ ACCESS_CONTROL_PROPERTIES, APP_NAME, CONFIG_DIR_NAME, CONFIG_PROPERTIES, Container, - DISCOVERY_URI, ENV_INTERNAL_SECRET, EXCHANGE_MANAGER_PROPERTIES, HTTP_PORT, HTTP_PORT_NAME, - HTTPS_PORT, HTTPS_PORT_NAME, JVM_CONFIG, JVM_SECURITY_PROPERTIES, LOG_PROPERTIES, - MAX_TRINO_LOG_FILES_SIZE, METRICS_PORT, METRICS_PORT_NAME, NODE_PROPERTIES, - RW_CONFIG_DIR_NAME, STACKABLE_CLIENT_TLS_DIR, STACKABLE_INTERNAL_TLS_DIR, - STACKABLE_MOUNT_INTERNAL_TLS_DIR, STACKABLE_MOUNT_SERVER_TLS_DIR, STACKABLE_SERVER_TLS_DIR, - TrinoRole, + DISCOVERY_URI, ENV_INTERNAL_SECRET, ENV_SPOOLING_SECRET, EXCHANGE_MANAGER_PROPERTIES, + HTTP_PORT, HTTP_PORT_NAME, HTTPS_PORT, HTTPS_PORT_NAME, JVM_CONFIG, + JVM_SECURITY_PROPERTIES, LOG_PROPERTIES, MAX_TRINO_LOG_FILES_SIZE, METRICS_PORT, + METRICS_PORT_NAME, NODE_PROPERTIES, RW_CONFIG_DIR_NAME, SPOOLING_MANAGER_PROPERTIES, + STACKABLE_CLIENT_TLS_DIR, STACKABLE_INTERNAL_TLS_DIR, STACKABLE_MOUNT_INTERNAL_TLS_DIR, + STACKABLE_MOUNT_SERVER_TLS_DIR, STACKABLE_SERVER_TLS_DIR, TrinoRole, authentication::resolve_authentication_classes, catalog, discovery::{TrinoDiscovery, TrinoDiscoveryProtocol, TrinoPodRef}, - fault_tolerant_execution::ResolvedFaultTolerantExecutionConfig, rolegroup_headless_service_name, v1alpha1, }, listener::{ @@ -133,6 +133,12 @@ pub enum Error { #[snafu(display("object defines no namespace"))] ObjectHasNoNamespace, + #[snafu(display("trino cluster {name:?} has no namespace"))] + MissingTrinoNamespace { + source: crate::crd::Error, + name: String, + }, + #[snafu(display("object defines no {role:?} role"))] MissingTrinoRole { source: crate::crd::Error, @@ -272,17 +278,17 @@ pub enum Error { source: stackable_operator::commons::rbac::Error, }, - #[snafu(display("Failed to retrieve AuthenticationClass"))] + #[snafu(display("failed to retrieve AuthenticationClass"))] AuthenticationClassRetrieval { source: crate::crd::authentication::Error, }, - #[snafu(display("Unsupported Trino authentication"))] + 
#[snafu(display("unsupported Trino authentication"))] UnsupportedAuthenticationConfig { source: crate::authentication::Error, }, - #[snafu(display("Invalid Trino authentication"))] + #[snafu(display("invalid Trino authentication"))] InvalidAuthenticationConfig { source: crate::authentication::Error, }, @@ -305,7 +311,7 @@ pub enum Error { #[snafu(display("failed to configure fault tolerant execution"))] FaultTolerantExecution { - source: crate::crd::fault_tolerant_execution::Error, + source: fault_tolerant_execution::Error, }, #[snafu(display("failed to get required Labels"))] @@ -374,6 +380,14 @@ pub enum Error { ResolveProductImage { source: product_image_selection::Error, }, + + #[snafu(display("failed to resolve client protocol configuration"))] + ClientProtocolConfiguration { source: client_protocol::Error }, + + #[snafu(display( + "client spooling protocol is not supported for Trino version {product_version}" + ))] + ClientSpoolingProtocolTrinoVersion { product_version: String }, } type Result = std::result::Result; @@ -397,6 +411,10 @@ pub async fn reconcile_trino( .context(InvalidTrinoClusterSnafu)?; let client = &ctx.client; + let namespace = trino.namespace_r().context(MissingTrinoNamespaceSnafu { + name: trino.name_any(), + })?; + let resolved_product_image = trino .spec .image @@ -443,10 +461,10 @@ pub async fn reconcile_trino( // Resolve fault tolerant execution configuration with S3 connections if needed let resolved_fte_config = match trino.spec.cluster_config.fault_tolerant_execution.as_ref() { Some(fte_config) => Some( - ResolvedFaultTolerantExecutionConfig::from_config( + fault_tolerant_execution::ResolvedFaultTolerantExecutionConfig::from_config( fte_config, Some(client), - &trino.namespace_r().context(ReadRoleSnafu)?, + &namespace, ) .await .context(FaultTolerantExecutionSnafu)?, @@ -454,6 +472,27 @@ pub async fn reconcile_trino( None => None, }; + // Resolve client spooling protocol configuration with S3 connections if needed + let resolved_client_protocol_config = match trino.spec.cluster_config.client_protocol.as_ref() { + Some(spooling_config) => Some( + client_protocol::ResolvedClientProtocolConfig::from_config( + spooling_config, + Some(client), + &namespace, + ) + .await + .context(ClientProtocolConfigurationSnafu)?, + ), + None => None, + }; + if resolved_client_protocol_config.is_some() + && resolved_product_image.product_version.starts_with("45") + { + return Err(Error::ClientSpoolingProtocolTrinoVersion { + product_version: resolved_product_image.product_version, + }); + } + let validated_config = validated_product_config( trino, // The Trino version is a single number like 396. @@ -500,7 +539,25 @@ pub async fn reconcile_trino( None => None, }; - create_shared_internal_secret(trino, client).await?; + create_random_secret( + &shared_internal_secret_name(trino), + ENV_INTERNAL_SECRET, + 512, + trino, + client, + ) + .await?; + + // This secret is created even if spooling is not configured. + // Trino currently requires the secret to be exactly 256 bits long. 
+ create_random_secret( + &shared_spooling_secret_name(trino), + ENV_SPOOLING_SECRET, + 32, + trino, + client, + ) + .await?; let mut sts_cond_builder = StatefulSetConditionBuilder::default(); @@ -557,6 +614,7 @@ pub async fn reconcile_trino( &trino_opa_config, &client.kubernetes_cluster_info, &resolved_fte_config, + &resolved_client_protocol_config, )?; let rg_catalog_configmap = build_rolegroup_catalog_config_map( trino, @@ -575,6 +633,7 @@ pub async fn reconcile_trino( &catalogs, &rbac_sa.name_any(), &resolved_fte_config, + &resolved_client_protocol_config, )?; cluster_resources @@ -683,7 +742,8 @@ fn build_rolegroup_config_map( trino_authentication_config: &TrinoAuthenticationConfig, trino_opa_config: &Option, cluster_info: &KubernetesClusterInfo, - resolved_fte_config: &Option, + resolved_fte_config: &Option, + resolved_spooling_config: &Option, ) -> Result { let mut cm_conf_data = BTreeMap::new(); @@ -755,6 +815,16 @@ fn build_rolegroup_config_map( ); } + // Add spooling properties from resolved configuration + if let Some(resolved_spooling) = resolved_spooling_config { + dynamic_resolved_config.extend( + resolved_spooling + .config_properties + .iter() + .map(|(k, v)| (k.clone(), Some(v.clone()))), + ); + } + // Add static properties and overrides dynamic_resolved_config.extend(transformed_config); @@ -813,28 +883,54 @@ fn build_rolegroup_config_map( } } PropertyNameKind::File(file_name) if file_name == JVM_CONFIG => {} + PropertyNameKind::File(file_name) if file_name == SPOOLING_MANAGER_PROPERTIES => { + // Add automatic properties for the spooling protocol + if let Some(spooling_config) = resolved_spooling_config { + dynamic_resolved_config = spooling_config + .spooling_manager_properties + .iter() + .map(|(k, v)| (k.clone(), Some(v.clone()))) + .collect(); + } + + // Override automatic properties with user provided configuration for the spooling protocol + dynamic_resolved_config.extend(transformed_config); + + if !dynamic_resolved_config.is_empty() { + cm_conf_data.insert( + file_name.to_string(), + to_java_properties_string(dynamic_resolved_config.iter()) + .with_context(|_| FailedToWriteJavaPropertiesSnafu)?, + ); + } + } + PropertyNameKind::File(file_name) if file_name == EXCHANGE_MANAGER_PROPERTIES => { + // Add exchange manager properties from resolved fault tolerant execution configuration + if let Some(resolved_fte) = resolved_fte_config { + dynamic_resolved_config = resolved_fte + .exchange_manager_properties + .iter() + .map(|(k, v)| (k.clone(), Some(v.clone()))) + .collect(); + } + + // Override automatic properties with user provided configuration for the spooling protocol + dynamic_resolved_config.extend(transformed_config); + + if !dynamic_resolved_config.is_empty() { + cm_conf_data.insert( + file_name.to_string(), + to_java_properties_string(dynamic_resolved_config.iter()) + .with_context(|_| FailedToWriteJavaPropertiesSnafu)?, + ); + } + } _ => {} } } cm_conf_data.insert(JVM_CONFIG.to_string(), jvm_config.to_string()); - // Add exchange manager properties from resolved fault tolerant execution configuration - if let Some(resolved_fte) = resolved_fte_config { - if !resolved_fte.exchange_manager_properties.is_empty() { - let exchange_props_with_options: BTreeMap> = resolved_fte - .exchange_manager_properties - .iter() - .map(|(k, v)| (k.clone(), Some(v.clone()))) - .collect(); - cm_conf_data.insert( - EXCHANGE_MANAGER_PROPERTIES.to_string(), - to_java_properties_string(exchange_props_with_options.iter()) - .with_context(|_| FailedToWriteJavaPropertiesSnafu)?, - ); - 
} - } - let jvm_sec_props: BTreeMap> = config .get(&PropertyNameKind::File(JVM_SECURITY_PROPERTIES.to_string())) .cloned() @@ -943,7 +1039,8 @@ fn build_rolegroup_statefulset( trino_authentication_config: &TrinoAuthenticationConfig, catalogs: &[CatalogConfig], sa_name: &str, - resolved_fte_config: &Option, + resolved_fte_config: &Option, + resolved_spooling_config: &Option, ) -> Result { let role = trino .role(trino_role) @@ -971,8 +1068,19 @@ fn build_rolegroup_statefulset( // additional authentication env vars let mut env = trino_authentication_config.env_vars(trino_role, &Container::Trino); - let secret_name = build_shared_internal_secret_name(trino); - env.push(env_var_from_secret(&secret_name, None, ENV_INTERNAL_SECRET)); + let internal_secret_name = shared_internal_secret_name(trino); + env.push(env_var_from_secret( + &internal_secret_name, + None, + ENV_INTERNAL_SECRET, + )); + + let spooling_secret_name = shared_spooling_secret_name(trino); + env.push(env_var_from_secret( + &spooling_secret_name, + None, + ENV_SPOOLING_SECRET, + )); trino_authentication_config .add_authentication_pod_and_volume_config( @@ -1035,6 +1143,7 @@ fn build_rolegroup_statefulset( catalogs, &requested_secret_lifetime, resolved_fte_config, + resolved_spooling_config, )?; let mut prepare_args = vec![]; @@ -1054,6 +1163,7 @@ fn build_rolegroup_statefulset( catalogs, merged_config, resolved_fte_config, + resolved_spooling_config, )); prepare_args @@ -1118,12 +1228,7 @@ fn build_rolegroup_statefulset( "-c".to_string(), ]) .args(vec![ - command::container_trino_args( - trino_authentication_config, - catalogs, - resolved_fte_config, - ) - .join("\n"), + command::container_trino_args(trino_authentication_config, catalogs).join("\n"), ]) .add_env_vars(env) .add_volume_mount("config", CONFIG_DIR_NAME) @@ -1352,6 +1457,8 @@ fn validated_product_config( PropertyNameKind::File(LOG_PROPERTIES.to_string()), PropertyNameKind::File(JVM_SECURITY_PROPERTIES.to_string()), PropertyNameKind::File(ACCESS_CONTROL_PROPERTIES.to_string()), + PropertyNameKind::File(SPOOLING_MANAGER_PROPERTIES.to_string()), + PropertyNameKind::File(EXCHANGE_MANAGER_PROPERTIES.to_string()), ]; let coordinator_role = TrinoRole::Coordinator; @@ -1404,11 +1511,28 @@ fn build_recommended_labels<'a>( } } -async fn create_shared_internal_secret( +async fn create_random_secret( + secret_name: &str, + secret_key: &str, + secret_byte_size: usize, trino: &v1alpha1::TrinoCluster, client: &Client, ) -> Result<()> { - let secret = build_shared_internal_secret(trino)?; + let mut internal_secret = BTreeMap::new(); + internal_secret.insert(secret_key.to_string(), get_random_base64(secret_byte_size)); + + let secret = Secret { + immutable: Some(true), + metadata: ObjectMetaBuilder::new() + .name(secret_name) + .namespace_opt(trino.namespace()) + .ownerreference_from_resource(trino, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? 
+ .build(), + string_data: Some(internal_secret), + ..Secret::default() + }; + if client .get_opt::( &secret.name_any(), @@ -1430,29 +1554,18 @@ async fn create_shared_internal_secret( Ok(()) } -fn build_shared_internal_secret(trino: &v1alpha1::TrinoCluster) -> Result { - let mut internal_secret = BTreeMap::new(); - internal_secret.insert(ENV_INTERNAL_SECRET.to_string(), get_random_base64()); - - Ok(Secret { - immutable: Some(true), - metadata: ObjectMetaBuilder::new() - .name(build_shared_internal_secret_name(trino)) - .namespace_opt(trino.namespace()) - .ownerreference_from_resource(trino, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .build(), - string_data: Some(internal_secret), - ..Secret::default() - }) +fn shared_internal_secret_name(trino: &v1alpha1::TrinoCluster) -> String { + format!("{}-internal-secret", trino.name_any()) } -fn build_shared_internal_secret_name(trino: &v1alpha1::TrinoCluster) -> String { - format!("{}-internal-secret", trino.name_any()) +fn shared_spooling_secret_name(trino: &v1alpha1::TrinoCluster) -> String { + format!("{}-spooling-secret", trino.name_any()) } -fn get_random_base64() -> String { - let mut buf = [0; 512]; +// TODO: Maybe switch to something non-openssl. +// See https://github.com/stackabletech/airflow-operator/pull/686#discussion_r2348354468 (which is currently under discussion) +fn get_random_base64(byte_size: usize) -> String { + let mut buf: Vec = vec![0; byte_size]; openssl::rand::rand_bytes(&mut buf).unwrap(); openssl::base64::encode_block(&buf) } @@ -1600,7 +1713,8 @@ fn tls_volume_mounts( cb_trino: &mut ContainerBuilder, catalogs: &[CatalogConfig], requested_secret_lifetime: &Duration, - resolved_fte_config: &Option, + resolved_fte_config: &Option, + resolved_spooling_config: &Option, ) -> Result<()> { if let Some(server_tls) = trino.get_server_tls() { cb_prepare @@ -1693,6 +1807,19 @@ fn tls_volume_mounts( .context(AddVolumeSnafu)?; } + // client spooling S3 credentials and other resources + if let Some(resolved_spooling) = resolved_spooling_config { + cb_prepare + .add_volume_mounts(resolved_spooling.volume_mounts.clone()) + .context(AddVolumeMountSnafu)?; + cb_trino + .add_volume_mounts(resolved_spooling.volume_mounts.clone()) + .context(AddVolumeMountSnafu)?; + pod_builder + .add_volumes(resolved_spooling.volumes.clone()) + .context(AddVolumeSnafu)?; + } + Ok(()) } @@ -1701,9 +1828,16 @@ mod tests { use stackable_operator::commons::networking::DomainName; use super::*; + use crate::{ + config::{ + client_protocol::ResolvedClientProtocolConfig, + fault_tolerant_execution::ResolvedFaultTolerantExecutionConfig, + }, + crd::v1alpha1::TrinoCluster, + }; - #[test] - fn test_config_overrides() { + #[tokio::test] + async fn test_config_overrides() { let trino_yaml = r#" apiVersion: trino.stackable.tech/v1alpha1 kind: TrinoCluster @@ -1737,7 +1871,7 @@ mod tests { default: replicas: 1 "#; - let cm = build_config_map(trino_yaml).data.unwrap(); + let cm = build_config_map(trino_yaml).await.data.unwrap(); let config = cm.get("config.properties").unwrap(); assert!(config.contains("foo=bar")); assert!(config.contains("level=role-group")); @@ -1760,9 +1894,63 @@ mod tests { assert!(cm.contains_key("access-control.properties")); } - fn build_config_map(trino_yaml: &str) -> ConfigMap { - let mut trino: v1alpha1::TrinoCluster = - serde_yaml::from_str(trino_yaml).expect("illegal test input"); + #[tokio::test] + async fn test_client_protocol_config_overrides() { + let trino_yaml = r#" + apiVersion: trino.stackable.tech/v1alpha1 
+ kind: TrinoCluster + metadata: + name: simple-trino + spec: + image: + productVersion: "470" + clusterConfig: + catalogLabelSelector: + matchLabels: + trino: simple-trino + clientProtocol: + spooling: + location: s3://my-bucket/spooling + filesystem: + s3: + connection: + reference: test-s3-connection + coordinators: + configOverrides: + config.properties: + foo: bar + spooling-manager.properties: + fs.location: s3a://role-level + roleGroups: + default: + replicas: 1 + configOverrides: + spooling-manager.properties: + fs.location: s3a://role-group-level + workers: + roleGroups: + default: + replicas: 1 + "#; + + let cm = build_config_map(trino_yaml).await.data.unwrap(); + let config = cm.get("config.properties").unwrap(); + assert!(config.contains("protocol.spooling.enabled=true")); + assert!(config.contains(&format!( + "protocol.spooling.shared-secret-key=${{ENV\\:{ENV_SPOOLING_SECRET}}}" + ))); + assert!(config.contains("foo=bar")); + + let config = cm.get("spooling-manager.properties").unwrap(); + assert!(config.contains("fs.location=s3a\\://role-group-level")); + assert!(config.contains("spooling-manager.name=filesystem")); + } + + async fn build_config_map(trino_yaml: &str) -> ConfigMap { + let deserializer = serde_yaml::Deserializer::from_str(trino_yaml); + let mut trino: TrinoCluster = + serde_yaml::with::singleton_map_recursive::deserialize(deserializer) + .expect("invalid test input"); trino.metadata.namespace = Some("default".to_owned()); trino.metadata.uid = Some("42".to_owned()); let cluster_info = KubernetesClusterInfo { @@ -1781,6 +1969,8 @@ mod tests { PropertyNameKind::File(LOG_PROPERTIES.to_string()), PropertyNameKind::File(JVM_SECURITY_PROPERTIES.to_string()), PropertyNameKind::File(ACCESS_CONTROL_PROPERTIES.to_string()), + PropertyNameKind::File(SPOOLING_MANAGER_PROPERTIES.to_string()), + PropertyNameKind::File(EXCHANGE_MANAGER_PROPERTIES.to_string()), ]; let validated_config = validate_all_roles_and_groups_config( // The Trino version is a single number like 396. 
@@ -1844,6 +2034,30 @@ mod tests { ), allow_permission_management_operations: true, }); + let resolved_fte_config = match &trino.spec.cluster_config.fault_tolerant_execution { + Some(fault_tolerant_execution) => Some( + ResolvedFaultTolerantExecutionConfig::from_config( + fault_tolerant_execution, + None, + &trino.namespace().unwrap(), + ) + .await + .unwrap(), + ), + None => None, + }; + let resolved_spooling_config = match &trino.spec.cluster_config.client_protocol { + Some(client_protocol) => Some( + ResolvedClientProtocolConfig::from_config( + client_protocol, + None, + &trino.namespace().unwrap(), + ) + .await + .unwrap(), + ), + None => None, + }; let merged_config = trino .merged_config(&trino_role, &rolegroup_ref, &[]) .unwrap(); @@ -1863,13 +2077,14 @@ mod tests { &trino_authentication_config, &trino_opa_config, &cluster_info, - &None, + &resolved_fte_config, + &resolved_spooling_config, ) .unwrap() } - #[test] - fn test_access_control_overrides() { + #[tokio::test] + async fn test_access_control_overrides() { let trino_yaml = r#" apiVersion: trino.stackable.tech/v1alpha1 kind: TrinoCluster @@ -1906,7 +2121,7 @@ mod tests { replicas: 1 "#; - let cm = build_config_map(trino_yaml).data.unwrap(); + let cm = build_config_map(trino_yaml).await.data.unwrap(); let access_control_config = cm.get("access-control.properties").unwrap(); assert!(access_control_config.contains("access-control.name=opa")); diff --git a/rust/operator-binary/src/crd/authentication.rs b/rust/operator-binary/src/crd/authentication.rs index 8c6fd227..f7b31262 100644 --- a/rust/operator-binary/src/crd/authentication.rs +++ b/rust/operator-binary/src/crd/authentication.rs @@ -7,12 +7,12 @@ use stackable_operator::{ #[derive(Snafu, Debug)] pub enum Error { - #[snafu(display("Failed to retrieve AuthenticationClass"))] + #[snafu(display("failed to retrieve AuthenticationClass"))] AuthenticationClassRetrieval { source: stackable_operator::client::Error, }, - #[snafu(display("Invalid OIDC configuration"))] + #[snafu(display("invalid OIDC configuration"))] InvalidOidcConfiguration { source: stackable_operator::crd::authentication::core::v1alpha1::Error, }, diff --git a/rust/operator-binary/src/crd/client_protocol.rs b/rust/operator-binary/src/crd/client_protocol.rs new file mode 100644 index 00000000..19d4dcae --- /dev/null +++ b/rust/operator-binary/src/crd/client_protocol.rs @@ -0,0 +1,35 @@ +/// This module manages the client protocol properties, especially for spooling. +/// Trino documentation is available here: https://trino.io/docs/current/client/client-protocol.html +use serde::{Deserialize, Serialize}; +use stackable_operator::schemars::{self, JsonSchema}; + +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum ClientProtocolConfig { + Spooling(ClientSpoolingProtocolConfig), +} +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ClientSpoolingProtocolConfig { + // Spool segment location. Each Trino cluster must have its own + // location independent of any other clusters. + pub location: String, + + // Spooling filesystem properties. Only S3 is supported.
+ pub filesystem: SpoolingFileSystemConfig, +} + +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum SpoolingFileSystemConfig { + S3(S3FilesystemConfig), +} + +// This wraps the S3 connection in a `connection` property to keep the structure consistent with the +// fault-tolerant execution config. It is similar to `crate::crd::fault_tolerant_execution::S3ExchangeConfig`; +// the two structures could be merged in the future. +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct S3FilesystemConfig { + pub connection: stackable_operator::crd::s3::v1alpha1::InlineConnectionOrReference, +} diff --git a/rust/operator-binary/src/crd/fault_tolerant_execution.rs b/rust/operator-binary/src/crd/fault_tolerant_execution.rs index a58ab302..595cea50 100644 --- a/rust/operator-binary/src/crd/fault_tolerant_execution.rs +++ b/rust/operator-binary/src/crd/fault_tolerant_execution.rs @@ -5,28 +5,14 @@ //! //! Based on the Trino documentation: -use std::collections::{BTreeMap, HashMap}; - use serde::{Deserialize, Serialize}; -use snafu::Snafu; use stackable_operator::{ - builder::pod::volume::{VolumeBuilder, VolumeMountBuilder}, - client::Client, - commons::tls_verification::{CaCert, TlsServerVerification, TlsVerification}, - crd::s3, - k8s_openapi::{ - api::core::v1::{Volume, VolumeMount}, - apimachinery::pkg::api::resource::Quantity, - }, + k8s_openapi::apimachinery::pkg::api::resource::Quantity, schemars::{self, JsonSchema}, shared::time::Duration, }; use super::catalog::commons::HdfsConnection; -use crate::{ - command, - crd::{CONFIG_DIR_NAME, STACKABLE_CLIENT_TLS_DIR}, -}; #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] @@ -124,10 +110,6 @@ pub struct ExchangeManagerConfig { /// Backend-specific configuration. #[serde(flatten)] pub backend: ExchangeManagerBackend, - - /// The `configOverrides` allow overriding arbitrary exchange manager properties. - #[serde(default)] - pub config_overrides: HashMap<String, String>, } #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] @@ -149,18 +131,6 @@ pub struct S3ExchangeConfig { /// S3 bucket URIs for spooling data (e.g., s3://bucket1,s3://bucket2). pub base_directories: Vec<String>, - /// S3 connection configuration. - /// Learn more about S3 configuration in the [S3 concept docs](DOCS_BASE_URL_PLACEHOLDER/concepts/s3). - pub connection: stackable_operator::crd::s3::v1alpha1::InlineConnectionOrReference, - - /// IAM role to assume for S3 access. - #[serde(skip_serializing_if = "Option::is_none")] - pub iam_role: Option<String>, - - /// External ID for the IAM role trust policy. - #[serde(skip_serializing_if = "Option::is_none")] - pub external_id: Option<String>, - /// Maximum number of times the S3 client should retry a request. #[serde(skip_serializing_if = "Option::is_none")] pub max_error_retries: Option, @@ -168,6 +138,10 @@ pub struct S3ExchangeConfig { /// Part data size for S3 multi-part upload. #[serde(skip_serializing_if = "Option::is_none")] pub upload_part_size: Option<Quantity>, + + /// S3 connection configuration. + /// Learn more about S3 configuration in the [S3 concept docs](DOCS_BASE_URL_PLACEHOLDER/concepts/s3). + pub connection: stackable_operator::crd::s3::v1alpha1::InlineConnectionOrReference, } #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] @@ -194,665 +168,3 @@ pub struct LocalExchangeConfig { /// Local filesystem paths for exchange storage.
pub base_directories: Vec, } - -#[derive(Snafu, Debug)] -pub enum Error { - #[snafu(display("Failed to resolve S3 connection"))] - S3Connection { - source: s3::v1alpha1::ConnectionError, - }, - - #[snafu(display("trino does not support disabling the TLS verification of S3 servers"))] - S3TlsNoVerificationNotSupported, - - #[snafu(display("failed to convert data size for [{field}] to bytes"))] - QuantityConversion { - source: stackable_operator::memory::Error, - field: &'static str, - }, -} - -/// Fault tolerant execution configuration with external resources resolved -pub struct ResolvedFaultTolerantExecutionConfig { - /// Properties to add to config.properties - pub config_properties: BTreeMap, - - /// Properties to add to exchange-manager.properties (if needed) - pub exchange_manager_properties: BTreeMap, - - /// Volumes required for the configuration (e.g., for S3 credentials) - pub volumes: Vec, - - /// Volume mounts required for the configuration - pub volume_mounts: Vec, - - /// Env-Vars that should be exported from files. - /// You can think of it like `export ="$(cat )"` - pub load_env_from_files: BTreeMap, - - /// Additional commands that need to be executed before starting Trino - pub init_container_extra_start_commands: Vec, -} - -impl ResolvedFaultTolerantExecutionConfig { - /// Helper function to insert optional values into properties map - fn insert_if_present( - properties: &mut BTreeMap, - key: &str, - value: Option, - ) { - if let Some(v) = value { - properties.insert(key.to_string(), v.to_string()); - } - } - - /// Helper function to insert optional Quantity values after converting to Trino bytes string - fn insert_quantity_if_present( - properties: &mut BTreeMap, - key: &'static str, - quantity: Option<&Quantity>, - ) -> Result<(), Error> { - if let Some(q) = quantity { - use snafu::ResultExt; - let v = crate::crd::quantity_to_trino_bytes(q) - .context(QuantityConversionSnafu { field: key })?; - properties.insert(key.to_string(), v); - } - Ok(()) - } - - /// Create a resolved fault tolerant execution configuration from the cluster config - pub async fn from_config( - config: &FaultTolerantExecutionConfig, - client: Option<&Client>, - namespace: &str, - ) -> Result { - let mut config_properties = BTreeMap::new(); - - // Handle different retry policies and their configurations - let (retry_policy_str, exchange_manager_opt) = match config { - FaultTolerantExecutionConfig::Query(query_config) => { - // Set query-specific properties - Self::insert_if_present( - &mut config_properties, - "query-retry-attempts", - query_config.retry_attempts, - ); - Self::insert_if_present( - &mut config_properties, - "retry-initial-delay", - query_config - .retry_initial_delay - .as_ref() - .map(|d| format!("{}s", d.as_secs())), - ); - Self::insert_if_present( - &mut config_properties, - "retry-max-delay", - query_config - .retry_max_delay - .as_ref() - .map(|d| format!("{}s", d.as_secs())), - ); - Self::insert_if_present( - &mut config_properties, - "retry-delay-scale-factor", - query_config.retry_delay_scale_factor.as_ref(), - ); - Self::insert_quantity_if_present( - &mut config_properties, - "exchange.deduplication-buffer-size", - query_config.exchange_deduplication_buffer_size.as_ref(), - )?; - - ("QUERY", query_config.exchange_manager.as_ref()) - } - FaultTolerantExecutionConfig::Task(task_config) => { - // Set task-specific properties - Self::insert_if_present( - &mut config_properties, - "task-retry-attempts-per-task", - task_config.retry_attempts_per_task, - ); - 
Self::insert_if_present( - &mut config_properties, - "retry-initial-delay", - task_config - .retry_initial_delay - .as_ref() - .map(|d| format!("{}s", d.as_secs())), - ); - Self::insert_if_present( - &mut config_properties, - "retry-max-delay", - task_config - .retry_max_delay - .as_ref() - .map(|d| format!("{}s", d.as_secs())), - ); - Self::insert_if_present( - &mut config_properties, - "retry-delay-scale-factor", - task_config.retry_delay_scale_factor.as_ref(), - ); - Self::insert_quantity_if_present( - &mut config_properties, - "exchange.deduplication-buffer-size", - task_config.exchange_deduplication_buffer_size.as_ref(), - )?; - - ("TASK", Some(&task_config.exchange_manager)) - } - }; - - config_properties.insert("retry-policy".to_string(), retry_policy_str.to_string()); - - let mut exchange_manager_properties = BTreeMap::new(); - if let Some(exchange_config) = exchange_manager_opt { - Self::insert_if_present( - &mut config_properties, - "fault-tolerant-execution.exchange-encryption-enabled", - exchange_config.encryption_enabled, - ); - Self::insert_if_present( - &mut exchange_manager_properties, - "exchange.sink-buffer-pool-min-size", - exchange_config.sink_buffer_pool_min_size, - ); - Self::insert_if_present( - &mut exchange_manager_properties, - "exchange.sink-buffers-per-partition", - exchange_config.sink_buffers_per_partition, - ); - Self::insert_quantity_if_present( - &mut exchange_manager_properties, - "exchange.sink-max-file-size", - exchange_config.sink_max_file_size.as_ref(), - )?; - Self::insert_if_present( - &mut exchange_manager_properties, - "exchange.source-concurrent-readers", - exchange_config.source_concurrent_readers, - ); - - // Add backend-specific configuration - match &exchange_config.backend { - ExchangeManagerBackend::S3(s3_config) => { - exchange_manager_properties.insert( - "exchange-manager.name".to_string(), - "filesystem".to_string(), - ); - exchange_manager_properties.insert( - "exchange.base-directories".to_string(), - s3_config.base_directories.join(","), - ); - - Self::insert_if_present( - &mut exchange_manager_properties, - "exchange.s3.iam-role", - s3_config.iam_role.as_ref(), - ); - Self::insert_if_present( - &mut exchange_manager_properties, - "exchange.s3.external-id", - s3_config.external_id.as_ref(), - ); - Self::insert_if_present( - &mut exchange_manager_properties, - "exchange.s3.max-error-retries", - s3_config.max_error_retries, - ); - Self::insert_quantity_if_present( - &mut exchange_manager_properties, - "exchange.s3.upload.part-size", - s3_config.upload_part_size.as_ref(), - )?; - } - ExchangeManagerBackend::Hdfs(hdfs_config) => { - exchange_manager_properties - .insert("exchange-manager.name".to_string(), "hdfs".to_string()); - exchange_manager_properties.insert( - "exchange.base-directories".to_string(), - hdfs_config.base_directories.join(","), - ); - - Self::insert_quantity_if_present( - &mut exchange_manager_properties, - "exchange.hdfs.block-size", - hdfs_config.block_size.as_ref(), - )?; - Self::insert_if_present( - &mut exchange_manager_properties, - "exchange.hdfs.skip-directory-scheme-validation", - hdfs_config.skip_directory_scheme_validation, - ); - - let hdfs_config_dir = format!("{CONFIG_DIR_NAME}/exchange-hdfs-config"); - exchange_manager_properties.insert( - "hdfs.config.resources".to_string(), - format!("{hdfs_config_dir}/core-site.xml,{hdfs_config_dir}/hdfs-site.xml"), - ); - } - ExchangeManagerBackend::Local(local_config) => { - exchange_manager_properties.insert( - "exchange-manager.name".to_string(), - 
"filesystem".to_string(), - ); - exchange_manager_properties.insert( - "exchange.base-directories".to_string(), - local_config.base_directories.join(","), - ); - } - } - - exchange_manager_properties.extend(exchange_config.config_overrides.clone()); - } - - let mut resolved_config = Self { - config_properties, - exchange_manager_properties, - volumes: Vec::new(), - volume_mounts: Vec::new(), - load_env_from_files: BTreeMap::new(), - init_container_extra_start_commands: Vec::new(), - }; - - // Resolve external resources if Kubernetes client is available - // This should always be the case, except for when this function is called during unit tests - if let (Some(client), Some(exchange_config)) = (client, exchange_manager_opt) { - match &exchange_config.backend { - ExchangeManagerBackend::S3(s3_config) => { - resolved_config - .resolve_s3_backend(s3_config, client, namespace) - .await?; - } - ExchangeManagerBackend::Hdfs(hdfs_config) => { - resolved_config.resolve_hdfs_backend(hdfs_config); - } - ExchangeManagerBackend::Local(_) => { - // Local backend requires no external resource resolution - } - } - } - - Ok(resolved_config) - } - - async fn resolve_s3_backend( - &mut self, - s3_config: &S3ExchangeConfig, - client: &Client, - namespace: &str, - ) -> Result<(), Error> { - use snafu::ResultExt; - - let s3_connection = s3_config - .connection - .clone() - .resolve(client, namespace) - .await - .context(S3ConnectionSnafu)?; - - let (volumes, mounts) = s3_connection - .volumes_and_mounts() - .context(S3ConnectionSnafu)?; - self.volumes.extend(volumes); - self.volume_mounts.extend(mounts); - - self.exchange_manager_properties.insert( - "exchange.s3.region".to_string(), - s3_connection.region.name.clone(), - ); - self.exchange_manager_properties.insert( - "exchange.s3.endpoint".to_string(), - s3_connection - .endpoint() - .context(S3ConnectionSnafu)? 
- .to_string(), - ); - self.exchange_manager_properties.insert( - "exchange.s3.path-style-access".to_string(), - (s3_connection.access_style == s3::v1alpha1::S3AccessStyle::Path).to_string(), - ); - - if let Some((access_key_path, secret_key_path)) = s3_connection.credentials_mount_paths() { - let access_key_env = "EXCHANGE_S3_AWS_ACCESS_KEY".to_string(); - let secret_key_env = "EXCHANGE_S3_AWS_SECRET_KEY".to_string(); - - self.exchange_manager_properties.insert( - "exchange.s3.aws-access-key".to_string(), - format!("${{ENV:{access_key_env}}}"), - ); - self.exchange_manager_properties.insert( - "exchange.s3.aws-secret-key".to_string(), - format!("${{ENV:{secret_key_env}}}"), - ); - - self.load_env_from_files - .insert(access_key_env, access_key_path); - self.load_env_from_files - .insert(secret_key_env, secret_key_path); - } - - if let Some(tls) = s3_connection.tls.tls.as_ref() { - match &tls.verification { - TlsVerification::None {} => return S3TlsNoVerificationNotSupportedSnafu.fail(), - TlsVerification::Server(TlsServerVerification { - ca_cert: CaCert::WebPki {}, - }) => {} - TlsVerification::Server(TlsServerVerification { - ca_cert: CaCert::SecretClass(_), - }) => { - if let Some(ca_cert) = s3_connection.tls.tls_ca_cert_mount_path() { - self.init_container_extra_start_commands.extend( - command::add_cert_to_truststore( - &ca_cert, - STACKABLE_CLIENT_TLS_DIR, - "exchange-s3-ca-cert", - ), - ); - } - } - } - } - - Ok(()) - } - - fn resolve_hdfs_backend(&mut self, hdfs_config: &HdfsExchangeConfig) { - let hdfs_config_dir = format!("{CONFIG_DIR_NAME}/exchange-hdfs-config"); - let volume_name = "exchange-hdfs-config".to_string(); - - self.volumes.push( - VolumeBuilder::new(&volume_name) - .with_config_map(&hdfs_config.hdfs.config_map) - .build(), - ); - self.volume_mounts - .push(VolumeMountBuilder::new(&volume_name, &hdfs_config_dir).build()); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_query_retry_policy_without_exchange_manager() { - let config = FaultTolerantExecutionConfig::Query(QueryRetryConfig { - retry_attempts: Some(5), - retry_initial_delay: Some(Duration::from_secs(15)), - retry_max_delay: Some(Duration::from_secs(90)), - retry_delay_scale_factor: Some(3.0), - exchange_deduplication_buffer_size: Some(Quantity("64Mi".to_string())), - exchange_manager: None, - }); - - let fte_config = - ResolvedFaultTolerantExecutionConfig::from_config(&config, None, "default") - .await - .unwrap(); - - assert_eq!( - fte_config.config_properties.get("retry-policy"), - Some(&"QUERY".to_string()) - ); - assert_eq!( - fte_config.config_properties.get("query-retry-attempts"), - Some(&"5".to_string()) - ); - assert_eq!( - fte_config.config_properties.get("retry-initial-delay"), - Some(&"15s".to_string()) - ); - assert_eq!( - fte_config.config_properties.get("retry-max-delay"), - Some(&"90s".to_string()) - ); - assert_eq!( - fte_config.config_properties.get("retry-delay-scale-factor"), - Some(&"3".to_string()) - ); - assert_eq!( - fte_config - .config_properties - .get("exchange.deduplication-buffer-size"), - Some(&"67108864B".to_string()) - ); - } - - #[tokio::test] - async fn test_query_retry_policy_with_exchange_manager() { - let config = FaultTolerantExecutionConfig::Query(QueryRetryConfig { - retry_attempts: Some(3), - retry_initial_delay: Some(Duration::from_secs(10)), - retry_max_delay: Some(Duration::from_secs(60)), - retry_delay_scale_factor: Some(2.0), - exchange_deduplication_buffer_size: Some(Quantity("100Mi".to_string())), - exchange_manager: 
Some(ExchangeManagerConfig { - encryption_enabled: Some(true), - sink_buffer_pool_min_size: Some(10), - sink_buffers_per_partition: Some(2), - sink_max_file_size: Some(Quantity("1Gi".to_string())), - source_concurrent_readers: Some(4), - backend: ExchangeManagerBackend::Local(LocalExchangeConfig { - base_directories: vec!["/tmp/exchange".to_string()], - }), - config_overrides: HashMap::new(), - }), - }); - - let fte_config = - ResolvedFaultTolerantExecutionConfig::from_config(&config, None, "default") - .await - .unwrap(); - - assert_eq!( - fte_config.config_properties.get("retry-policy"), - Some(&"QUERY".to_string()) - ); - assert_eq!( - fte_config.config_properties.get("query-retry-attempts"), - Some(&"3".to_string()) - ); - assert_eq!( - fte_config.config_properties.get("retry-initial-delay"), - Some(&"10s".to_string()) - ); - assert_eq!( - fte_config.config_properties.get("retry-max-delay"), - Some(&"60s".to_string()) - ); - assert_eq!( - fte_config.config_properties.get("retry-delay-scale-factor"), - Some(&"2".to_string()) - ); - - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange-manager.name"), - Some(&"filesystem".to_string()) - ); - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange.base-directories"), - Some(&"/tmp/exchange".to_string()) - ); - assert_eq!( - fte_config - .config_properties - .get("exchange.deduplication-buffer-size"), - Some(&"104857600B".to_string()) - ); - assert_eq!( - fte_config - .config_properties - .get("fault-tolerant-execution.exchange-encryption-enabled"), - Some(&"true".to_string()) - ); - } - - #[tokio::test] - async fn test_task_retry_policy_with_s3_exchange_manager() { - let config = FaultTolerantExecutionConfig::Task(TaskRetryConfig { - retry_attempts_per_task: Some(2), - retry_initial_delay: None, - retry_max_delay: None, - retry_delay_scale_factor: None, - exchange_deduplication_buffer_size: None, - exchange_manager: ExchangeManagerConfig { - encryption_enabled: None, - sink_buffer_pool_min_size: Some(20), - sink_buffers_per_partition: Some(4), - sink_max_file_size: Some(Quantity("2Gi".to_string())), - source_concurrent_readers: Some(8), - backend: ExchangeManagerBackend::S3(S3ExchangeConfig { - base_directories: vec!["s3://my-bucket/exchange".to_string()], - connection: stackable_operator::crd::s3::v1alpha1::InlineConnectionOrReference::Reference( - "test-s3-connection".to_string() - ), - iam_role: Some("arn:aws:iam::123456789012:role/TrinoRole".to_string()), - external_id: Some("external-id-123".to_string()), - max_error_retries: Some(5), - upload_part_size: Some(Quantity("10Mi".to_string())), - }), - config_overrides: std::collections::HashMap::new(), - }, - }); - - let fte_config = - ResolvedFaultTolerantExecutionConfig::from_config(&config, None, "default") - .await - .unwrap(); - - assert_eq!( - fte_config.config_properties.get("retry-policy"), - Some(&"TASK".to_string()) - ); - assert_eq!( - fte_config - .config_properties - .get("task-retry-attempts-per-task"), - Some(&"2".to_string()) - ); - - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange-manager.name"), - Some(&"filesystem".to_string()) - ); - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange.base-directories"), - Some(&"s3://my-bucket/exchange".to_string()) - ); - - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange.s3.iam-role"), - Some(&"arn:aws:iam::123456789012:role/TrinoRole".to_string()) - ); - assert_eq!( - fte_config - .exchange_manager_properties - 
.get("exchange.s3.external-id"), - Some(&"external-id-123".to_string()) - ); - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange.s3.max-error-retries"), - Some(&"5".to_string()) - ); - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange.s3.upload.part-size"), - Some(&"10485760B".to_string()) - ); - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange.sink-buffer-pool-min-size"), - Some(&"20".to_string()) - ); - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange.sink-buffers-per-partition"), - Some(&"4".to_string()) - ); - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange.sink-max-file-size"), - Some(&"2147483648B".to_string()) - ); - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange.source-concurrent-readers"), - Some(&"8".to_string()) - ); - } - - #[tokio::test] - async fn test_exchange_manager_config_overrides() { - let mut config_overrides = HashMap::new(); - config_overrides.insert("custom.property".to_string(), "custom-value".to_string()); - config_overrides.insert( - "exchange.s3.upload.part-size".to_string(), - "overridden-value".to_string(), - ); - - let config = FaultTolerantExecutionConfig::Task(TaskRetryConfig { - retry_attempts_per_task: Some(2), - retry_initial_delay: None, - retry_max_delay: None, - retry_delay_scale_factor: None, - exchange_deduplication_buffer_size: None, - exchange_manager: ExchangeManagerConfig { - encryption_enabled: None, - sink_buffer_pool_min_size: None, - sink_buffers_per_partition: None, - sink_max_file_size: None, - source_concurrent_readers: None, - backend: ExchangeManagerBackend::S3(S3ExchangeConfig { - base_directories: vec!["s3://my-bucket/exchange".to_string()], - connection: stackable_operator::crd::s3::v1alpha1::InlineConnectionOrReference::Reference( - "test-s3-connection".to_string() - ), - iam_role: None, - external_id: None, - max_error_retries: None, - upload_part_size: Some(Quantity("10Mi".to_string())), - }), - config_overrides, - }, - }); - - let fte_config = - ResolvedFaultTolerantExecutionConfig::from_config(&config, None, "default") - .await - .unwrap(); - - assert_eq!( - fte_config - .exchange_manager_properties - .get("custom.property"), - Some(&"custom-value".to_string()) - ); - - assert_eq!( - fte_config - .exchange_manager_properties - .get("exchange.s3.upload.part-size"), - Some(&"overridden-value".to_string()) - ); - } -} diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index a5c69932..e3cd0aa9 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -1,6 +1,7 @@ pub mod affinity; pub mod authentication; pub mod catalog; +pub mod client_protocol; pub mod discovery; pub mod fault_tolerant_execution; @@ -61,6 +62,7 @@ pub const LOG_PROPERTIES: &str = "log.properties"; pub const ACCESS_CONTROL_PROPERTIES: &str = "access-control.properties"; pub const JVM_SECURITY_PROPERTIES: &str = "security.properties"; pub const EXCHANGE_MANAGER_PROPERTIES: &str = "exchange-manager.properties"; +pub const SPOOLING_MANAGER_PROPERTIES: &str = "spooling-manager.properties"; // node.properties pub const NODE_ENVIRONMENT: &str = "node.environment"; // config.properties @@ -105,6 +107,7 @@ pub const STACKABLE_TLS_STORE_PASSWORD: &str = "changeit"; pub const SYSTEM_TRUST_STORE_PASSWORD: &str = "changeit"; // secret vars pub const ENV_INTERNAL_SECRET: &str = "INTERNAL_SECRET"; +pub const ENV_SPOOLING_SECRET: &str = "SPOOLING_SECRET"; // TLS pub 
const TLS_DEFAULT_SECRET_CLASS: &str = "tls"; // Logging @@ -300,6 +303,10 @@ pub mod versioned { pub fault_tolerant_execution: Option<FaultTolerantExecutionConfig>, + /// Client spooling protocol configuration. + #[serde(skip_serializing_if = "Option::is_none")] + pub client_protocol: Option<ClientProtocolConfig>, + /// Name of the Vector aggregator [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery). /// It must contain the key `ADDRESS` with the address of the Vector aggregator. /// Follow the [logging tutorial](DOCS_BASE_URL_PLACEHOLDER/tutorials/logging-vector-aggregator) diff --git a/rust/operator-binary/src/operations/graceful_shutdown.rs b/rust/operator-binary/src/operations/graceful_shutdown.rs index 5b00bbab..50bad7ae 100644 --- a/rust/operator-binary/src/operations/graceful_shutdown.rs +++ b/rust/operator-binary/src/operations/graceful_shutdown.rs @@ -15,7 +15,7 @@ use crate::crd::{ #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Failed to set terminationGracePeriod"))] + #[snafu(display("failed to set terminationGracePeriod"))] SetTerminationGracePeriod { source: stackable_operator::builder::pod::Error, }, diff --git a/rust/operator-binary/src/operations/pdb.rs b/rust/operator-binary/src/operations/pdb.rs index bee6bac0..4f8a7bb7 100644 --- a/rust/operator-binary/src/operations/pdb.rs +++ b/rust/operator-binary/src/operations/pdb.rs @@ -13,13 +13,13 @@ use crate::{ #[derive(Snafu, Debug)] pub enum Error { - #[snafu(display("Cannot create PodDisruptionBudget for role [{role}]"))] + #[snafu(display("cannot create PodDisruptionBudget for role [{role}]"))] CreatePdb { source: stackable_operator::builder::pdb::Error, role: String, }, - #[snafu(display("Cannot apply PodDisruptionBudget [{name}]"))] + #[snafu(display("cannot apply PodDisruptionBudget [{name}]"))] ApplyPdb { source: stackable_operator::cluster_resources::Error, name: String, diff --git a/tests/templates/kuttl/client-spooling/00-assert.yaml b/tests/templates/kuttl/client-spooling/00-assert.yaml new file mode 100644 index 00000000..47bfe1ea --- /dev/null +++ b/tests/templates/kuttl/client-spooling/00-assert.yaml @@ -0,0 +1,19 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-credentials +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-tls-certificates +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Connection +metadata: + name: minio diff --git a/tests/templates/kuttl/client-spooling/00-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/client-spooling/00-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/client-spooling/00-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/client-spooling/00-patch-ns.yaml.j2 b/tests/templates/kuttl/client-spooling/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/client-spooling/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p
'{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/client-spooling/00-rbac.yaml.j2 b/tests/templates/kuttl/client-spooling/00-rbac.yaml.j2 new file mode 100644 index 00000000..9cbf0351 --- /dev/null +++ b/tests/templates/kuttl/client-spooling/00-rbac.yaml.j2 @@ -0,0 +1,29 @@ +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +rules: +{% if test_scenario['values']['openshift'] == "true" %} + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + resourceNames: ["privileged"] + verbs: ["use"] +{% endif %} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: integration-tests-sa +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +subjects: + - kind: ServiceAccount + name: integration-tests-sa +roleRef: + kind: Role + name: use-integration-tests-scc + apiGroup: rbac.authorization.k8s.io diff --git a/tests/templates/kuttl/client-spooling/00-secrets.yaml b/tests/templates/kuttl/client-spooling/00-secrets.yaml new file mode 100644 index 00000000..b9dd795f --- /dev/null +++ b/tests/templates/kuttl/client-spooling/00-secrets.yaml @@ -0,0 +1,62 @@ +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-credentials + labels: + secrets.stackable.tech/class: s3-credentials-class +stringData: + accessKey: minioAccessKey + secretKey: minioSecretKey + # The following two entries are used by the Bitnami chart for MinIO to + # set up credentials for accessing buckets managed by the MinIO tenant. + root-user: minioAccessKey + root-password: minioSecretKey +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: s3-credentials-class +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: minio-tls-certificates +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-tls-certificates + labels: + secrets.stackable.tech/class: minio-tls-certificates +# Have a look at the folder certs on how to create this +data: + ca.crt: 
LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQyVENDQXNHZ0F3SUJBZ0lVTmpxdUdZV3R5SjVhNnd5MjNIejJHUmNNbHdNd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2V6RUxNQWtHQTFVRUJoTUNSRVV4R3pBWkJnTlZCQWdNRWxOamFHeGxjM2RwWnkxSWIyeHpkR1ZwYmpFTwpNQXdHQTFVRUJ3d0ZWMlZrWld3eEtEQW1CZ05WQkFvTUgxTjBZV05yWVdKc1pTQlRhV2R1YVc1bklFRjFkR2h2CmNtbDBlU0JKYm1NeEZUQVRCZ05WQkFNTURITjBZV05yWVdKc1pTNWtaVEFnRncweU16QTJNVFl4TWpVeE1ESmEKR0E4eU1USXpNRFV5TXpFeU5URXdNbG93ZXpFTE1Ba0dBMVVFQmhNQ1JFVXhHekFaQmdOVkJBZ01FbE5qYUd4bApjM2RwWnkxSWIyeHpkR1ZwYmpFT01Bd0dBMVVFQnd3RlYyVmtaV3d4S0RBbUJnTlZCQW9NSDFOMFlXTnJZV0pzClpTQlRhV2R1YVc1bklFRjFkR2h2Y21sMGVTQkpibU14RlRBVEJnTlZCQU1NREhOMFlXTnJZV0pzWlM1a1pUQ0MKQVNJd0RRWUpLb1pJaHZjTkFRRUJCUUFEZ2dFUEFEQ0NBUW9DZ2dFQkFOblYvdmJ5M1JvNTdhMnF2UVJubjBqZQplS01VMitGMCtsWk5DQXZpR1VENWJtOGprOTFvUFpuazBiaFFxZXlFcm1EUzRXVDB6ZXZFUklCSkpEamZMMEQ4CjQ2QmU3UGlNS2UwZEdqb3FJM3o1Y09JZWpjOGFMUEhTSWxnTjZsVDNmSXJ1UzE2Y29RZ0c0dWFLaUhGNStlV0YKRFJVTGR1NmRzWXV6NmRLanFSaVVPaEh3RHd0VUprRHdQditFSXRxbzBIK01MRkxMWU0wK2xFSWFlN2RONUNRNQpTbzVXaEwyY3l2NVZKN2xqL0VBS0NWaUlFZ0NtekRSRGNSZ1NTald5SDRibjZ5WDIwMjZmUEl5V0pGeUVkTC82CmpBT0pBRERSMEd5aE5PWHJFZXFob2NTTW5JYlFWcXdBVDBrTWh1WFN2d3Zscm5MeVRwRzVqWm00bFVNMzRrTUMKQXdFQUFhTlRNRkV3SFFZRFZSME9CQllFRkVJM1JNTWl5aUJqeVExUlM4bmxPUkpWZDFwQk1COEdBMVVkSXdRWQpNQmFBRkVJM1JNTWl5aUJqeVExUlM4bmxPUkpWZDFwQk1BOEdBMVVkRXdFQi93UUZNQU1CQWY4d0RRWUpLb1pJCmh2Y05BUUVMQlFBRGdnRUJBSHRLUlhkRmR0VWh0VWpvZG1ZUWNlZEFEaEhaT2hCcEtpbnpvdTRicmRrNEhmaEYKTHIvV0ZsY1JlbWxWNm1Cc0xweU11SytUZDhaVUVRNkpFUkx5NmxTL2M2cE9HeG5CNGFDbEU4YXQrQytUakpBTwpWbTNXU0k2VlIxY0ZYR2VaamxkVlE2eGtRc2tNSnpPN2RmNmlNVFB0VjVSa01lSlh0TDZYYW1FaTU0ckJvZ05ICk5yYStFSkJRQmwvWmU5ME5qZVlidjIwdVFwWmFhWkZhYVNtVm9OSERwQndsYTBvdXkrTWpPYkMzU3BnT3ExSUMKUGwzTnV3TkxWOFZiT3I1SHJoUUFvS21nU05iM1A4dmFUVnV4L1gwWWZqeS9TN045a1BCYUs5bUZqNzR6d1Y5dwpxU1ExNEtsNWpPM1YzaHJHV1laRWpET2diWnJyRVgxS1hFdXN0K1E9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + tls.crt: 
LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUR5RENDQXJDZ0F3SUJBZ0lVQ0kyUE5OcnR6cDZRbDdHa3VhRnhtRGE2VUJvd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2V6RUxNQWtHQTFVRUJoTUNSRVV4R3pBWkJnTlZCQWdNRWxOamFHeGxjM2RwWnkxSWIyeHpkR1ZwYmpFTwpNQXdHQTFVRUJ3d0ZWMlZrWld3eEtEQW1CZ05WQkFvTUgxTjBZV05yWVdKc1pTQlRhV2R1YVc1bklFRjFkR2h2CmNtbDBlU0JKYm1NeEZUQVRCZ05WQkFNTURITjBZV05yWVdKc1pTNWtaVEFnRncweU16QTJNVFl4TWpVeE1ESmEKR0E4eU1USXpNRFV5TXpFeU5URXdNbG93WGpFTE1Ba0dBMVVFQmhNQ1JFVXhHekFaQmdOVkJBZ01FbE5qYUd4bApjM2RwWnkxSWIyeHpkR1ZwYmpFT01Bd0dBMVVFQnd3RlYyVmtaV3d4RWpBUUJnTlZCQW9NQ1ZOMFlXTnJZV0pzClpURU9NQXdHQTFVRUF3d0ZiV2x1YVc4d2dnRWlNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJEd0F3Z2dFS0FvSUIKQVFDanluVnorWEhCOE9DWTRwc0VFWW1qb2JwZHpUbG93d2NTUU4rWURQQ2tCZW9yMFRiODdFZ0x6SksrSllidQpwb1hCbE5JSlBRYW93SkVvL1N6U2s4ZnUyWFNNeXZBWlk0RldHeEp5Mnl4SXh2UC9pYk9HT1l1aVBHWEsyNHQ2ClpjR1RVVmhhdWlaR1Nna1dyZWpXV2g3TWpGUytjMXZhWVpxQitRMXpQczVQRk1sYzhsNVYvK2I4WjdqTUppODQKbU9mSVB4amt2SXlKcjVVa2VGM1VmTHFKUzV5NExGNHR5NEZ0MmlBZDdiYmZIYW5mdlltdjZVb0RWdE1YdFdvMQpvUVBmdjNzaFdybVJMenc2ZXVJQXRiWGM1Q2pCeUlha0NiaURuQVU4cktnK0IxSjRtdlFnckx3bzNxUHJ5Smd4ClNkaWRtWjJtRVI3RXorYzVCMG0vTGlJaEFnTUJBQUdqWHpCZE1Cc0dBMVVkRVFRVU1CS0NCVzFwYm1sdmdnbHMKYjJOaGJHaHZjM1F3SFFZRFZSME9CQllFRkpRMGdENWtFdFFyK3REcERTWjdrd1o4SDVoR01COEdBMVVkSXdRWQpNQmFBRkVJM1JNTWl5aUJqeVExUlM4bmxPUkpWZDFwQk1BMEdDU3FHU0liM0RRRUJDd1VBQTRJQkFRQmNkaGQrClI0Sm9HdnFMQms1OWRxSVVlY2N0dUZzcmRQeHNCaU9GaFlOZ1pxZWRMTTBVTDVEenlmQUhmVk8wTGZTRURkZFgKUkpMOXlMNytrTVUwVDc2Y3ZkQzlYVkFJRTZIVXdUbzlHWXNQcXN1eVpvVmpOcEVESkN3WTNDdm9ubEpWZTRkcQovZ0FiSk1ZQitUU21ZNXlEUHovSkZZL1haellhUGI3T2RlR3VqYlZUNUl4cDk3QXBTOFlJaXY3M0Mwd1ViYzZSCmgwcmNmUmJ5a1NRVWg5dmdWZFhSU1I4RFQzV0NmZHFOek5CWVh2OW1xZlc1ejRzYkdqK2wzd1VsL0kzRi9tSXcKZnlPNEN0aTRha2lHVkhsZmZFeTB3a3pWYUJ4aGNYajJJM0JVVGhCNFpxamxzc2llVmFGa3d2WG1teVJUMG9FVwo1SCtOUEhjcXVTMXpQc2NsCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + tls.key: 
LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2QUlCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktZd2dnU2lBZ0VBQW9JQkFRQ2p5blZ6K1hIQjhPQ1kKNHBzRUVZbWpvYnBkelRsb3d3Y1NRTitZRFBDa0Jlb3IwVGI4N0VnTHpKSytKWWJ1cG9YQmxOSUpQUWFvd0pFbwovU3pTazhmdTJYU015dkFaWTRGV0d4SnkyeXhJeHZQL2liT0dPWXVpUEdYSzI0dDZaY0dUVVZoYXVpWkdTZ2tXCnJlaldXaDdNakZTK2MxdmFZWnFCK1ExelBzNVBGTWxjOGw1Vi8rYjhaN2pNSmk4NG1PZklQeGprdkl5SnI1VWsKZUYzVWZMcUpTNXk0TEY0dHk0RnQyaUFkN2JiZkhhbmZ2WW12NlVvRFZ0TVh0V28xb1FQZnYzc2hXcm1STHp3NgpldUlBdGJYYzVDakJ5SWFrQ2JpRG5BVThyS2crQjFKNG12UWdyTHdvM3FQcnlKZ3hTZGlkbVoybUVSN0V6K2M1CkIwbS9MaUloQWdNQkFBRUNnZ0VBQWQzdDVzdUNFMjdXY0llc3NxZ3NoSFAwZHRzKyswVzF6K3h6WC8xTnhPRFkKWVhWNkJmbi9mRHJ4dFQ4aVFaZ2VVQzJORTFQaHZveXJXdWMvMm9xYXJjdEd1OUFZV29HNjJLdG9VMnpTSFdZLwpJN3VERTFXV2xOdlJZVFdOYW5DOGV4eGpRRzE4d0RKWjFpdFhTeEl0NWJEM3lrL3dUUlh0dCt1SnpyVjVqb2N1CmNoeERMd293aXUxQWo2ZFJDWk5CejlUSnh5TnI1ME5ZVzJVWEJhVC84N1hyRkZkSndNVFZUMEI3SE9uRzdSQlYKUWxLdzhtcVZiYU5lbmhjdk1qUjI5c3hUekhSK2p4SU8zQndPNk9Hai9PRmhGQllVN1RMWGVsZDFxb2UwdmIyRwpiOGhQcEd1cHRyNUF0OWx3MXc1d1EzSWdpdXRQTkg1cXlEeUNwRWw2RVFLQmdRRGNkYnNsT2ZLSmo3TzJMQXlZCkZ0a1RwaWxFMFYzajBxbVE5M0lqclY0K0RSbUxNRUIyOTk0MDdCVVlRUWoxL0RJYlFjb1oyRUVjVUI1cGRlSHMKN0RNRUQ2WExIYjJKVTEyK2E3c1d5Q05kS2VjZStUNy9JYmxJOFR0MzQwVWxIUTZ6U01TRGNqdmZjRkhWZ3YwcwpDYWpoRng3TmtMRVhUWnI4ZlQzWUloajR2UUtCZ1FDK01nWjFVbW9KdzlJQVFqMnVJVTVDeTl4aldlWURUQU8vCllhWEl6d2xnZTQzOE1jYmI0Y04yU2FOU0dEZ1Y3bnU1a3FpaWhwalBZV0lpaU9CcDlrVFJIWE9kUFc0N3N5ZUkKdDNrd3JwMnpWbFVnbGNNWlo2bW1WM1FWYUFOWmdqVTRSU3Y0ZS9WeFVMamJaYWZqUHRaUnNqWkdwSzBZVTFvdApWajhJZVE3Zk5RS0JnQ1ArWk11ekpsSW5VQ1FTRlF4UHpxbFNtN0pNckpPaHRXV2h3TlRxWFZTc050dHV5VmVqCktIaGpneDR1b0JQcFZSVDJMTlVEWmI0RnByRjVPYVhBK3FOVEdyS0s3SU1iUlZidHArSVVVeEhHNGFGQStIUVgKUVhVVFRhNUpRT1RLVmJnWHpWM1lyTVhTUk1valZNcDMyVWJHeTVTc1p2MXpBamJ2QzhYWjYxSFJBb0dBZEJjUQp2aGU1eFpBUzVEbUtjSGkvemlHa3ViZXJuNk9NUGdxYUtJSEdsVytVOExScFR0ajBkNFRtL1Rydk1PUEovVEU1CllVcUtoenBIcmhDaCtjdHBvY0k2U1dXdm5SenpLbzNpbVFaY0Y1VEFqUTBjY3F0RmI5UzlkRHR5bi9YTUNqYWUKYWlNdll5VUVVRll5TFpDelBGWnNycDNoVVpHKzN5RmZoQXB3TzJrQ2dZQkh3WWFQSWRXNld3NytCMmhpbjBvdwpqYTNjZXN2QTRqYU1Qd1NMVDhPTnRVMUdCU01md2N6TWJuUEhMclJ2Qjg3bjlnUGFSMndRR1VtckZFTzNMUFgvCmtSY09HcFlCSHBEWEVqRGhLa1dkUnVMT0ZnNEhMWmRWOEFOWmxRMFZTY0U4dTNkRERVTzg5cEdEbjA4cVRBcmwKeDlreHN1ZEVWcmtlclpiNVV4RlZxUT09Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0K +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Connection +metadata: + name: minio +spec: + host: minio + port: 9000 + accessStyle: Path + credentials: + secretClass: s3-credentials-class + tls: + verification: + server: + caCert: + secretClass: minio-tls-certificates diff --git a/tests/templates/kuttl/client-spooling/01-assert.yaml b/tests/templates/kuttl/client-spooling/01-assert.yaml new file mode 100644 index 00000000..4d24ed7d --- /dev/null +++ b/tests/templates/kuttl/client-spooling/01-assert.yaml @@ -0,0 +1,17 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: minio +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: v1 +kind: Service +metadata: + name: minio diff --git a/tests/templates/kuttl/client-spooling/01-install-minio.yaml b/tests/templates/kuttl/client-spooling/01-install-minio.yaml new file mode 100644 index 00000000..7a063b49 --- /dev/null +++ b/tests/templates/kuttl/client-spooling/01-install-minio.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install minio + --namespace $NAMESPACE + --version 17.0.19 + -f 01_helm-bitnami-minio-values.yaml + 
oci://registry-1.docker.io/bitnamicharts/minio + timeout: 240 diff --git a/tests/templates/kuttl/client-spooling/01_helm-bitnami-minio-values.yaml b/tests/templates/kuttl/client-spooling/01_helm-bitnami-minio-values.yaml new file mode 100644 index 00000000..72b41c0c --- /dev/null +++ b/tests/templates/kuttl/client-spooling/01_helm-bitnami-minio-values.yaml @@ -0,0 +1,79 @@ +--- +global: + security: + allowInsecureImages: true + +image: + repository: bitnamilegacy/minio +clientImage: + repository: bitnamilegacy/minio-client +defaultInitContainers: + volumePermissions: # volumePermissions moved under defaultInitContainers starting with Chart version 17.0.0 + enabled: false + image: + repository: bitnamilegacy/os-shell +console: + image: + repository: bitnamilegacy/minio-object-browser + +mode: standalone +disableWebUI: false +extraEnvVars: + - name: BITNAMI_DEBUG + value: "true" + - name: MINIO_LOG_LEVEL + value: DEBUG + +provisioning: + enabled: true + buckets: + - name: spooling-bucket + resources: + requests: + memory: 1Gi + cpu: "512m" + limits: + memory: "1Gi" + cpu: "1" + podSecurityContext: + enabled: false + containerSecurityContext: + enabled: false + +# volumePermissions can be removed starting with Chart version 17.0.0, moved under defaultInitContainers +volumePermissions: + enabled: false + image: + repository: bitnamilegacy/os-shell + +podSecurityContext: + enabled: false + +containerSecurityContext: + enabled: false + +persistence: + enabled: false + +resources: + requests: + memory: 1Gi + cpu: "512m" + limits: + memory: "1Gi" + cpu: "1" + +auth: + existingSecret: minio-credentials + +service: + type: NodePort + +tls: + enabled: true + autoGenerated: + enabled: false + existingCASecret: minio-tls-certificates + existingSecret: minio-tls-certificates + server: + existingSecret: minio-tls-certificates diff --git a/tests/templates/kuttl/client-spooling/02-assert.yaml b/tests/templates/kuttl/client-spooling/02-assert.yaml new file mode 100644 index 00000000..77a5cac0 --- /dev/null +++ b/tests/templates/kuttl/client-spooling/02-assert.yaml @@ -0,0 +1,25 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-coordinator-default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-worker-default +status: + readyReplicas: 2 + replicas: 2 +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCatalog +metadata: + name: tpch diff --git a/tests/templates/kuttl/client-spooling/02-install-trino.yaml.j2 b/tests/templates/kuttl/client-spooling/02-install-trino.yaml.j2 new file mode 100644 index 00000000..5608e6cb --- /dev/null +++ b/tests/templates/kuttl/client-spooling/02-install-trino.yaml.j2 @@ -0,0 +1,55 @@ +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCluster +metadata: + name: trino +spec: + image: +{% if test_scenario['values']['trino-latest'].find(",") > 0 %} + custom: "{{ test_scenario['values']['trino-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['trino-latest'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['trino-latest'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: + catalogLabelSelector: + matchLabels: + trino: trino + clientProtocol: + spooling: + location: "s3://spooling-bucket/trino/" + filesystem: + s3: + connection: + reference: "minio" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery 
+{% endif %} + coordinators: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 1 + config: {} + workers: + config: + gracefulShutdownTimeout: 5s # Let the test run faster + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 2 + config: {} +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCatalog +metadata: + name: tpch + labels: + trino: trino +spec: + connector: + tpch: {} diff --git a/tests/templates/kuttl/client-spooling/03-assert.yaml b/tests/templates/kuttl/client-spooling/03-assert.yaml new file mode 100644 index 00000000..71abb09d --- /dev/null +++ b/tests/templates/kuttl/client-spooling/03-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: v1 +kind: Pod +metadata: + name: trino-test-helper +status: + phase: Running diff --git a/tests/templates/kuttl/client-spooling/03-install-test-helper.yaml b/tests/templates/kuttl/client-spooling/03-install-test-helper.yaml new file mode 100644 index 00000000..bff62e61 --- /dev/null +++ b/tests/templates/kuttl/client-spooling/03-install-test-helper.yaml @@ -0,0 +1,22 @@ +--- +apiVersion: v1 +kind: Pod +metadata: + name: trino-test-helper + labels: + app: trino-test-helper +spec: + serviceAccount: integration-tests-sa + containers: + - name: trino-test-helper + image: python:3.13.7-slim-trixie + command: ["bash", "-c"] + args: + - "sleep infinity" + resources: + requests: + cpu: "250m" + memory: "1024Mi" + limits: + cpu: "250m" + memory: "2048Mi" diff --git a/tests/templates/kuttl/client-spooling/04-copy-scripts.yaml b/tests/templates/kuttl/client-spooling/04-copy-scripts.yaml new file mode 100644 index 00000000..b6a3a518 --- /dev/null +++ b/tests/templates/kuttl/client-spooling/04-copy-scripts.yaml @@ -0,0 +1,6 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/check-active-workers.py trino-test-helper:/tmp || true + - script: kubectl cp -n $NAMESPACE 04_query.py trino-test-helper:/tmp/ diff --git a/tests/templates/kuttl/client-spooling/04_query.py b/tests/templates/kuttl/client-spooling/04_query.py new file mode 100644 index 00000000..7cd1fc7e --- /dev/null +++ b/tests/templates/kuttl/client-spooling/04_query.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python +import trino +import argparse +import urllib3 + + +def get_connection(coordinator): + """Create anonymous connection for basic cluster health check""" + conn = trino.dbapi.connect( + host=coordinator, + port=8443, + user="test", + http_scheme="https", + verify=False, + session_properties={"query_max_execution_time": "60s"}, + ) + return conn + + +if __name__ == "__main__": + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + # Construct an argument parser + all_args = argparse.ArgumentParser() + + # Add arguments to the parser + all_args.add_argument( + "-c", + "--coordinator", + required=True, + help="Trino Coordinator Host to connect to", + ) + + args = vars(all_args.parse_args()) + + conn = get_connection(args["coordinator"]) + + try: + cursor = conn.cursor() + + # The table tpch.sf100.customer has 15 million rows but Python consumes + # too much memory to retrieve all of them at once. + # Fetching them one by one is too slow, so we fetch enough rows + # for Trino to use the spooling protocol. 
+ # Fetching too few rows is risky as Trino might decide to not use spooling. + + print("🚜 fetching many rows from Trino to trigger spooling...") + + customer_count = 0 + batch_count = 50 + batch_size = 1_000 + expected_customers = batch_count * 1_000 + + cursor.execute("SELECT * FROM tpch.sf100.customer") + while batch_count > 0: + print(f"⏳ fetching batch {batch_count} of {batch_size} rows...") + _ = cursor.fetchmany(batch_size) + customer_count += batch_size + batch_count = batch_count - 1 + + assert customer_count == expected_customers, ( + f"💀 crap! expected {expected_customers} customers, got {customer_count}" + ) + + print("🎉 major success!") + + cursor.close() + + except Exception as e: + print(f"💀 oh noes! cannot fetch customers from Trino: {e}") + raise e + + finally: + conn.close() diff --git a/tests/templates/kuttl/client-spooling/05-assert.yaml b/tests/templates/kuttl/client-spooling/05-assert.yaml new file mode 100644 index 00000000..5c2dd20c --- /dev/null +++ b/tests/templates/kuttl/client-spooling/05-assert.yaml @@ -0,0 +1,30 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +commands: + - script: kubectl exec -n $NAMESPACE trino-test-helper -- python /tmp/check-active-workers.py -u admin -p "" -c trino-coordinator -w 2 + - script: kubectl exec -n $NAMESPACE deployment/minio -- mc alias set local https://localhost:9000 minioAccessKey minioSecretKey --api S3v4 + # Verify that the spooling bucket contains data + - script: | + count=$(kubectl exec -n $NAMESPACE deployment/minio -- mc ls --insecure local/spooling-bucket/trino | wc -l) + echo "Number of spool segments is [$count] (should be > 0)" + if [ "$count" -eq 0 ]; then + exit 1 + fi +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-coordinator-default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-worker-default +status: + readyReplicas: 2 + replicas: 2 diff --git a/tests/templates/kuttl/client-spooling/05-run-tests.yaml b/tests/templates/kuttl/client-spooling/05-run-tests.yaml new file mode 100644 index 00000000..1e872b38 --- /dev/null +++ b/tests/templates/kuttl/client-spooling/05-run-tests.yaml @@ -0,0 +1,6 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl exec -n $NAMESPACE trino-test-helper -- pip install --no-cache-dir trino==0.336.0 + - script: kubectl exec -n $NAMESPACE trino-test-helper -- python /tmp/04_query.py -c trino-coordinator diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 6102c5ca..393d23cd 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -115,6 +115,12 @@ tests: dimensions: - trino - openshift + - name: client-spooling + dimensions: + # The client spooling protocol was introduced in Trino 466 but it only works reliably starting with Trino 476. + # Long term we should test all versions + - trino-latest + - openshift - name: listener dimensions: - trino
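Reviewer note: based on the assertions in `test_client_protocol_config_overrides` and the kuttl test above, a cluster with `clientProtocol.spooling` configured is expected to end up with roughly the following rendered properties. This is only a sketch for orientation: the keys shown are exactly the ones asserted in the test (including the escaped `:` produced by the Java properties writer), while any additional S3 filesystem keys derived from the resolved `S3Connection` are not spelled out in this diff and are therefore omitted here.

```properties
# config.properties (coordinator and worker)
protocol.spooling.enabled=true
protocol.spooling.shared-secret-key=${ENV\:SPOOLING_SECRET}

# spooling-manager.properties
spooling-manager.name=filesystem
fs.location=s3a\://role-group-level
```

The shared secret referenced via `SPOOLING_SECRET` appears to be generated by `get_random_base64` and stored in the `<cluster name>-spooling-secret` Secret (see `shared_spooling_secret_name`); the `fs.location` value above is the role-group level `configOverrides` value from the test fixture, which takes precedence over the role-level override.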