@@ -41,11 +41,11 @@ object SparkCatalogUtils extends Logging {
val sparkTableTypes: Set[String] = Set(VIEW, TABLE)

// ///////////////////////////////////////////////////////////////////////////////////////////////
// Catalog //
// Catalog //
// ///////////////////////////////////////////////////////////////////////////////////////////////

/**
* Get all registered catalogs in Spark's `CatalogManager`
* Note that the result only contains loaded catalogs because catalogs are lazily loaded in Spark.
*/
def getCatalogs(spark: SparkSession): Seq[Row] = {

@@ -70,29 +70,34 @@ object SparkCatalogUtils extends Logging {
}
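
To illustrate the note above that the result only covers catalogs the CatalogManager has already loaded, a rough sketch (the catalog name and plugin class are hypothetical, not part of this change, and assumed to be on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
// Registered via config but never touched: the plugin is not instantiated yet,
// so getCatalogs is not expected to list it.
spark.conf.set("spark.sql.catalog.my_cat", "org.example.MyCatalogPlugin")
SparkCatalogUtils.getCatalogs(spark)                 // no row for "my_cat" yet
// Accessing the catalog (e.g. through setCurrentCatalog below) forces the CatalogManager to load it.
SparkCatalogUtils.setCurrentCatalog(spark, "my_cat")
SparkCatalogUtils.getCatalogs(spark)                 // now expected to include a row for "my_cat"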

def setCurrentCatalog(spark: SparkSession, catalog: String): Unit = {
// SPARK-36841(3.3.0) Ensure setCurrentCatalog method catalog must exist
// SPARK-36841 (3.3.0) Ensure setCurrentCatalog method catalog must exist
if (spark.sessionState.catalogManager.isCatalogRegistered(catalog)) {
spark.sessionState.catalogManager.setCurrentCatalog(catalog)
} else {
throw new IllegalArgumentException(s"Cannot find catalog plugin class for catalog '$catalog'")
}
}

// SPARK-50700 (4.0.0) adds the `builtin` magic value
private def hasCustomSessionCatalog(spark: SparkSession): Boolean = {
spark.conf.get(s"spark.sql.catalog.$SESSION_CATALOG", "builtin") != "builtin"
}
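
For context, a sketch of a setup where the new guard is true (assuming SESSION_CATALOG resolves to Spark's "spark_catalog"; the plugin class name is hypothetical):

import org.apache.spark.sql.SparkSession

// Overriding the session catalog with a custom V2 plugin: the config value is no longer
// the "builtin" magic value, so hasCustomSessionCatalog returns true and getSchemas below
// takes the generic TableCatalog path instead of spark.sessionState.catalog.
val spark = SparkSession.builder()
  .master("local[1]")
  .config("spark.sql.catalog.spark_catalog", "org.example.MySessionCatalog")
  .getOrCreate()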

// ///////////////////////////////////////////////////////////////////////////////////////////////
// Schema //
// Schema //
// ///////////////////////////////////////////////////////////////////////////////////////////////

/**
* a list of [[Row]]s, with 2 fields `schemaName: String, catalogName: String`
* Return a list of [[Row]]s, with 2 fields `schemaName: String, catalogName: String`
*/
def getSchemas(
spark: SparkSession,
catalogName: String,
schemaPattern: String): Seq[Row] = {
if (catalogName == SparkCatalogUtils.SESSION_CATALOG) {
(spark.sessionState.catalog.listDatabases(schemaPattern) ++
getGlobalTempViewManager(spark, schemaPattern))
.map(Row(_, SparkCatalogUtils.SESSION_CATALOG))
if (catalogName == SESSION_CATALOG && !hasCustomSessionCatalog(spark)) {
val dbs = spark.sessionState.catalog.listDatabases(schemaPattern)
val globalTempDb = getGlobalTempViewManager(spark, schemaPattern)
(dbs ++ globalTempDb).map(Row(_, SESSION_CATALOG))
} else {
val catalog = getCatalog(spark, catalogName)
getSchemasWithPattern(catalog, schemaPattern).map(Row(_, catalog.name))
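
A rough usage sketch of the branch above (illustrative values; assumes the built-in session catalog with a database named db1 and no custom spark_catalog plugin):

SparkCatalogUtils.getSchemas(spark, "spark_catalog", "db1")
// => Seq(Row("db1", "spark_catalog")) — schemas from the session catalog plus, when the
// pattern matches it, the global temp view database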
@@ -162,6 +167,21 @@ object SparkCatalogUtils extends Logging {
val catalog = getCatalog(spark, catalogName)
val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
catalog match {
case tc: TableCatalog =>
val tp = tablePattern.r.pattern
val identifiers = namespaces.flatMap { ns =>
tc.listTables(ns).filter(i => tp.matcher(quoteIfNeeded(i.name())).matches())
}
identifiers.map { ident =>
// TODO: restore view type for session catalog
val comment = if (ignoreTableProperties) ""
else { // load table is a time consuming operation
tc.loadTable(ident).properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
}
val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
val tableName = quoteIfNeeded(ident.name())
Row(catalog.name(), schema, tableName, TABLE, comment, null, null, null, null, null)
}
case builtin if builtin.name() == SESSION_CATALOG =>
val sessionCatalog = spark.sessionState.catalog
val databases = sessionCatalog.listDatabases(schemaPattern)
@@ -206,21 +226,6 @@ object SparkCatalogUtils extends Logging {
}
}
}
case tc: TableCatalog =>
val tp = tablePattern.r.pattern
val identifiers = namespaces.flatMap { ns =>
tc.listTables(ns).filter(i => tp.matcher(quoteIfNeeded(i.name())).matches())
}
identifiers.map { ident =>
// TODO: restore view type for session catalog
val comment = if (ignoreTableProperties) ""
else { // load table is a time consuming operation
tc.loadTable(ident).properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
}
val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
val tableName = quoteIfNeeded(ident.name())
Row(catalog.name(), schema, tableName, TABLE, comment, null, null, null, null, null)
}
case _ => Seq.empty[Row]
}
}
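
Side note on the existing pattern filter used in the TableCatalog branch above (unchanged behaviour): tablePattern.r.pattern compiles the pattern as a Java regex, and matches() anchors at both ends of the quoted table name, e.g.:

val tp = "test_.*".r.pattern                                   // same shape as tablePattern.r.pattern
Seq("test_a", "my_test_a", "other").filter(n => tp.matcher(n).matches())
// => List("test_a"): "my_test_a" is rejected because the whole name must match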
@@ -21,19 +21,19 @@ import java.nio.file.Path

trait DeltaSuiteMixin extends DataLakeSuiteMixin {

override protected def format: String = "delta"
override protected val format: String = "delta"

override protected def catalog: String = "spark_catalog"
override protected val catalog: String = "spark_catalog"

override protected def warehouse: Path = Utils.createTempDir()
override protected val warehouse: Path = Utils.createTempDir()

override protected def extraJars: String = {
override protected val extraJars: String = {
System.getProperty("java.class.path")
.split(":")
.filter(_.contains("io/delta/delta")).mkString(",")
}

override protected def extraConfigs = Map(
override protected val extraConfigs: Map[String, String] = Map(
"spark.sql.catalogImplementation" -> "in-memory",
"spark.sql.defaultCatalog" -> catalog,
"spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
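
One reading of the def -> val changes in these test mixins (inferred, not stated in the diff): members such as warehouse are now computed once when the suite is initialized instead of on every access, which matters when the body has a side effect like creating a temp directory. A minimal sketch, with java.nio.file standing in for Kyuubi's Utils.createTempDir:

import java.nio.file.{Files, Path}

trait WarehouseAsDef { def warehouse: Path = Files.createTempDirectory("wh") }
trait WarehouseAsVal { val warehouse: Path = Files.createTempDirectory("wh") }

val d = new WarehouseAsDef {}
d.warehouse == d.warehouse   // false: a fresh directory is created on every access
val v = new WarehouseAsVal {}
v.warehouse == v.warehouse   // true: the directory is created once and reused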
@@ -21,19 +21,19 @@ import java.nio.file.Path

trait IcebergSuiteMixin extends DataLakeSuiteMixin {

override protected def format: String = "iceberg"
override protected val format: String = "iceberg"

override protected def catalog: String = "hadoop_prod"
override protected val catalog: String = "hadoop_prod"

override protected def warehouse: Path = Utils.createTempDir()
override protected val warehouse: Path = Utils.createTempDir()

override protected def extraJars: String = {
override protected val extraJars: String = {
System.getProperty("java.class.path")
.split(":")
.filter(_.contains("iceberg-spark")).head
}

override protected def extraConfigs = Map(
override protected val extraConfigs: Map[String, String] = Map(
"spark.sql.catalogImplementation" -> "in-memory",
"spark.sql.defaultCatalog" -> catalog,
"spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
@@ -19,7 +19,7 @@ package org.apache.kyuubi.operation

import scala.collection.mutable.ListBuffer

import org.apache.kyuubi.{IcebergSuiteMixin, SPARK_COMPILE_VERSION}
import org.apache.kyuubi.IcebergSuiteMixin
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.util.AssertionUtils._
import org.apache.kyuubi.util.SparkVersionUtil
@@ -51,12 +51,9 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
checkGetSchemas(metaData.getSchemas("spark_catalog", pattern), dbDflts, "spark_catalog")
}

Seq(null, catalog).foreach { cg =>
Seq("spark_catalog", catalog).foreach { cg =>
matchAllPatterns foreach { pattern =>
checkGetSchemas(
metaData.getSchemas(cg, pattern),
dbs,
catalog)
checkGetSchemas(metaData.getSchemas(cg, pattern), dbs, catalog)
}
}

@@ -87,7 +84,7 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db"))
val metaData = statement.getConnection.getMetaData

Seq(null, catalog).foreach { cg =>
Seq("spark_catalog", catalog).foreach { cg =>
matchAllPatterns foreach { pattern =>
checkGetSchemas(
metaData.getSchemas(cg, pattern),
@@ -116,7 +113,7 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db"))
val metaData = statement.getConnection.getMetaData

Seq(catalog).foreach { cg =>
Seq("spark_catalog", catalog).foreach { cg =>
dbs.foreach { db =>
try {
statement.execute(
@@ -156,12 +153,9 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
"map<int, bigint>",
"date",
"timestamp",
// SPARK-37931
if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") "struct<X: bigint, Y: double>"
else "struct<`X`: bigint, `Y`: double>",
"struct<X: bigint, Y: double>",
"binary",
// SPARK-37931
if (SPARK_COMPILE_VERSION >= "3.3") "struct<X: string>" else "struct<`X`: string>")
"struct<X: string>")
val cols = dataTypes.zipWithIndex.map { case (dt, idx) => s"c$idx" -> dt }
val (colNames, _) = cols.unzip
