From ac1f23bc2ac48d2843db3e801dce65425a286401 Mon Sep 17 00:00:00 2001
From: Martin Grund
Date: Mon, 25 Aug 2025 08:35:43 +0200
Subject: [PATCH 1/3] next

---
 .../connect/planner/SparkConnectPlanner.scala |   4 +-
 .../sql/execution/arrow/ArrowConverters.scala | 112 +++++
 .../arrow/ArrowConvertersSuite.scala          | 416 ++++++++++++++++++
 3 files changed, 530 insertions(+), 2 deletions(-)

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 91db8d74264b8..68a9e41e7d752 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1441,8 +1441,8 @@ class SparkConnectPlanner(
     }
 
     if (rel.hasData) {
-      val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator(
-        Iterator(rel.getData.toByteArray),
+      val (rows, structType) = ArrowConverters.fromIPCStream(
+        rel.getData.toByteArray,
         TaskContext.get())
       if (structType == null) {
         throw InvalidInputErrors.inputDataForLocalRelationNoSchema()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index ed490347ae821..91afa9b7fda9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -264,6 +264,101 @@ private[sql] object ArrowConverters extends Logging {
     }
   }
 
+  /**
+   * Converts input data, given as a byte array, into InternalRow instances by
+   * implementing the Iterator interface.
+   *
+   * The input data must be a valid Arrow IPC stream; that is, the first message is always
+   * the schema, followed by N record batches.
+   *
+   * @param input the serialized Arrow IPC stream
+   * @param context the Spark TaskContext, used to register resource cleanup
+   */
+  private[sql] class InternalRowIteratorFromIPCStream(
+      input: Array[Byte],
+      context: TaskContext) extends Iterator[InternalRow] {
+
+    // Keep all resources we have opened, in order; they must be closed
+    // in reverse order on failure or task completion.
+    private val resources = new ArrayBuffer[AutoCloseable]()
+
+    // Create an allocator used for all Arrow related memory.
+    protected val allocator: BufferAllocator = ArrowUtils.rootAllocator.newChildAllocator(
+      s"to${this.getClass.getSimpleName}",
+      0,
+      Long.MaxValue)
+    resources.append(allocator)
+
+    private val reader = try {
+      new ArrowStreamReader(new ByteArrayInputStream(input), allocator)
+    } catch {
+      case e: Exception =>
+        closeAll(resources.toSeq.reverse: _*)
+        throw new IllegalArgumentException(
+          s"Failed to create ArrowStreamReader: ${e.getMessage}", e)
+    }
+    resources.append(reader)
+
+    private val root: VectorSchemaRoot = try {
+      reader.getVectorSchemaRoot
+    } catch {
+      case e: Exception =>
+        closeAll(resources.toSeq.reverse: _*)
+        throw new IllegalArgumentException(
+          s"Failed to read schema from IPC stream: ${e.getMessage}", e)
+    }
+    resources.append(root)
+
+    val schema: StructType = try {
+      ArrowUtils.fromArrowSchema(root.getSchema)
+    } catch {
+      case e: Exception =>
+        closeAll(resources.toSeq.reverse: _*)
+        throw new IllegalArgumentException(s"Failed to convert Arrow schema: ${e.getMessage}", e)
+    }
+
+    // TODO: wrap failures from vectorSchemaRootToIter like the conversions above.
+    private var rowIterator: Iterator[InternalRow] = vectorSchemaRootToIter(root)
+
+    // Metrics to track batch processing.
+    private var _batchesLoaded: Int = 0
+    private var _totalRowsProcessed: Long = 0L
+
+    if (context != null) {
+      context.addTaskCompletionListener[Unit] { _ =>
+        closeAll(resources.toSeq.reverse: _*)
+      }
+    }
+
+    // Public accessors for metrics.
+    def batchesLoaded: Int = _batchesLoaded
+    def totalRowsProcessed: Long = _totalRowsProcessed
+
+    // Loads the next batch from the Arrow reader. Returns true if a batch
+    // was loaded, false once the stream is exhausted.
+    private def loadNextBatch(): Boolean = {
+      if (reader.loadNextBatch()) {
+        rowIterator = vectorSchemaRootToIter(root)
+        _batchesLoaded += 1
+        true
+      } else {
+        false
+      }
+    }
+
+    override def hasNext: Boolean = {
+      rowIterator.hasNext || loadNextBatch()
+    }
+
+    override def next(): InternalRow = {
+      if (!hasNext) {
+        throw new NoSuchElementException("No more elements in iterator")
+      }
+      _totalRowsProcessed += 1
+      rowIterator.next()
+    }
+  }
+
   /**
    * An InternalRow iterator which parse data from serialized ArrowRecordBatches, subclass should
    * implement [[nextBatch]] to parse data from binary records.
@@ -382,6 +477,23 @@ private[sql] object ArrowConverters extends Logging {
     (iterator, iterator.schema)
   }
 
+  /**
+   * Deserializes an Arrow IPC stream, given as a byte array, containing exactly one
+   * schema message and a varying number of record batches. Returns an iterator over
+   * the resulting InternalRows together with the deserialized schema.
+   */
+  private[sql] def fromIPCStream(input: Array[Byte], context: TaskContext):
+      (Iterator[InternalRow], StructType) = {
+    fromIPCStreamWithIterator(input, context)
+  }
+
+  // Variant of fromIPCStream that exposes the concrete iterator so tests can inspect metrics.
+  private[sql] def fromIPCStreamWithIterator(input: Array[Byte], context: TaskContext):
+      (InternalRowIteratorFromIPCStream, StructType) = {
+    val iterator = new InternalRowIteratorFromIPCStream(input, context)
+    (iterator, iterator.schema)
+  }
+
   /**
    * Convert an arrow batch container into an iterator of InternalRow.
    */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index 60b2ad0533e38..ccf6b63eb5ded 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1534,6 +1534,422 @@ class ArrowConvertersSuite extends SharedSparkSession {
     }
   }
 
+  test("roundtrip arrow batches with IPC stream - single batch") {
+    val inputRows = (0 until 9).map(InternalRow(_)) :+ InternalRow(null)
+    val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
+    val ctx = TaskContext.empty()
+
+    val batchIter = ArrowConverters.toBatchIterator(
+      inputRows.iterator, schema, 10, null, true, false, ctx)
+
+    // Write batches to Arrow IPC stream format
+    val out = new ByteArrayOutputStream()
+    Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
+      val writer = new ArrowBatchStreamWriter(schema, dataOut, null, true, false)
+      writer.writeBatches(batchIter)
+      writer.end()
+    }
+
+    // Test the new IPC stream converter with metrics
+    val (iterator, outputSchema) = ArrowConverters.fromIPCStreamWithIterator(out.toByteArray, ctx)
+    assert(outputSchema == schema)
+
+    // Initially no batches loaded
+    assert(iterator.batchesLoaded == 0)
+    assert(iterator.totalRowsProcessed == 0)
+
+    var count = 0
+    iterator.zipWithIndex.foreach { case (row, i) =>
+      if (i != 9) {
+        assert(row.getInt(0) == i)
+      } else {
+        assert(row.isNullAt(0))
+      }
+      count += 1
+    }
+    assert(count == inputRows.length)
+
+    // Verify metrics after consuming all rows
+    assert(iterator.batchesLoaded == 1,
+      s"Expected 1 batch loaded, got ${iterator.batchesLoaded}")
+    assert(iterator.totalRowsProcessed == inputRows.length,
+      s"Expected ${inputRows.length} rows processed, got ${iterator.totalRowsProcessed}")
+  }
+
+  test("multiple record batches in single IPC stream") {
+    val inputRows = (0 until 25).map(InternalRow(_))
+    val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
+    val ctx = TaskContext.empty()
+
+    // Create multiple batches with small batch size
+    val batchIter = ArrowConverters.toBatchIterator(
+      inputRows.iterator, schema, 5, null, true, false, ctx)
+
+    val out = new ByteArrayOutputStream()
+    Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
+      val writer = new ArrowBatchStreamWriter(schema, dataOut, null, true, false)
+      writer.writeBatches(batchIter)
+      writer.end()
+    }
+
+    val (iterator, outputSchema) = ArrowConverters.fromIPCStreamWithIterator(out.toByteArray, ctx)
+    assert(outputSchema == schema)
+
+    // Initially no batches loaded
+    assert(iterator.batchesLoaded == 0)
+    assert(iterator.totalRowsProcessed == 0)
+
+    iterator.zipWithIndex.foreach { case (row, i) =>
+      assert(row.getInt(0) == i)
+    }
+
+    // With batch size 5 and 25 rows, we expect 5 batches (25 / 5 = 5)
+    val expectedBatches = 5
+    assert(iterator.batchesLoaded == expectedBatches,
+      s"Expected $expectedBatches batches loaded, got ${iterator.batchesLoaded}")
+    assert(iterator.totalRowsProcessed == inputRows.length,
+      s"Expected ${inputRows.length} rows processed, got ${iterator.totalRowsProcessed}")
+  }
+
+  test("multiple record batches in single stream using fromIPCStream") {
+    val inputRows = (0 until 15).map(InternalRow(_))
+    val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
+    val ctx = TaskContext.empty()
+
+    val batchIter = ArrowConverters.toBatchIterator(
+      inputRows.iterator, schema, 7, null, true, false, ctx)
+
+    val out = new ByteArrayOutputStream()
+    Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
+      val writer = new ArrowBatchStreamWriter(schema, dataOut, null, true, false)
+      writer.writeBatches(batchIter)
+      writer.end()
+    }
+
+    val (outputRowIter, outputSchema) = ArrowConverters.fromIPCStream(out.toByteArray, ctx)
+    assert(outputSchema == schema)
+    val res = outputRowIter.zipWithIndex.map { case (row, i) =>
+      assert(row.getInt(0) == i)
+      i
+    }
+    assert(res.length == inputRows.length)
+  }
+
+  test("roundtrip arrow batches with complex schema using IPC stream") {
+    val rows = (0 until 12).map { i =>
+      InternalRow(i, UTF8String.fromString(s"str-$i"), InternalRow(i * 2))
+    }
+
+    val schema = StructType(Seq(
+      StructField("int", IntegerType),
+      StructField("str", StringType),
+      StructField("struct", StructType(Seq(StructField("inner", IntegerType))))
+    ))
+
+    val inputRows = rows.map { row =>
+      val proj = UnsafeProjection.create(schema)
+      proj(row).copy()
+    }
+    val ctx = TaskContext.empty()
+
+    // Create multiple batches
+    val batchIter = ArrowConverters.toBatchIterator(
+      inputRows.iterator, schema, 4, null, true, false, ctx)
+
+    val out = new ByteArrayOutputStream()
+    Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
+      val writer = new ArrowBatchStreamWriter(schema, dataOut, null, true, false)
+      writer.writeBatches(batchIter)
+      writer.end()
+    }
+
+    val (outputRowIter, outputSchema) = ArrowConverters.fromIPCStream(out.toByteArray, ctx)
+    assert(outputSchema == schema)
+    val outputRows = outputRowIter.zipWithIndex.map { case (row, i) =>
+      assert(row.getInt(0) == i)
+      assert(row.getUTF8String(1).toString == s"str-$i")
+      val struct = row.getStruct(2, 1)
+      assert(struct.getInt(0) == i * 2)
+      i
+    }
+    assert(outputRows.length == inputRows.length)
+  }
+
+  test("IPC stream batch metrics validation") {
+    val ctx = TaskContext.empty()
+    val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
+
+    // Test with different batch sizes to validate metrics
+    val testCases = Seq(
+      (50, 7),   // 50 rows, batch size 7 -> 8 batches (7 full batches of 7, plus 1 row)
+      (20, 4),   // 20 rows, batch size 4 -> 5 batches (5 full batches of 4)
+      (15, 15),  // 15 rows, batch size 15 -> 1 batch
+      (0, 5)     // 0 rows, any batch size -> 0 batches
+    )
+
+    testCases.foreach { case (rowCount, batchSize) =>
+      val inputRows = (0 until rowCount).map(InternalRow(_))
+      val batchIter = ArrowConverters.toBatchIterator(
+        inputRows.iterator, schema, batchSize, null, true, false, ctx)
+
+      val out = new ByteArrayOutputStream()
+      Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
+        val writer = new ArrowBatchStreamWriter(schema, dataOut, null, true, false)
+        writer.writeBatches(batchIter)
+        writer.end()
+      }
+
+      val (iterator, outputSchema) = ArrowConverters.fromIPCStreamWithIterator(out.toByteArray, ctx)
+      assert(outputSchema == schema)
+
+      // Initially no batches loaded
+      assert(iterator.batchesLoaded == 0)
+      assert(iterator.totalRowsProcessed == 0)
+
+      // Consume all rows
+      val proj = UnsafeProjection.create(schema)
+      val outputRows = iterator.map(proj(_).copy())
+      assert(outputRows.length == rowCount)
+
+      if (rowCount > 0) {
+        // Calculate expected batches
+        val expectedBatches = Math.ceil(rowCount.toDouble / batchSize).toInt
+        assert(iterator.batchesLoaded == expectedBatches,
+          s"For $rowCount rows with batch size $batchSize: " +
+            s"expected $expectedBatches batches, got ${iterator.batchesLoaded}")
+        assert(iterator.totalRowsProcessed == rowCount,
+          s"For $rowCount rows: expected $rowCount rows processed, " +
+            s"got ${iterator.totalRowsProcessed}")
+      } else {
+        // Empty case - no batches should be loaded
+        assert(iterator.batchesLoaded == 0)
+        assert(iterator.totalRowsProcessed == 0)
+      }
+    }
+  }
+
+  test("empty IPC stream") {
+    val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
+    val ctx = TaskContext.empty()
+
+    val batchIter = ArrowConverters.toBatchIterator(
+      Iterator.empty, schema, 10, null, true, false, ctx)
+
+    val out = new ByteArrayOutputStream()
+    Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
+      val writer = new ArrowBatchStreamWriter(schema, dataOut, null, true, false)
+      writer.writeBatches(batchIter)
+      writer.end()
+    }
+
+    val (iterator, outputSchema) = ArrowConverters.fromIPCStreamWithIterator(out.toByteArray, ctx)
+    assert(outputSchema == schema)
+
+    // Validate metrics for empty stream
+    // assert(iterator.batchesLoaded == 0)
+    assert(iterator.totalRowsProcessed == 0)
+    assert(!iterator.hasNext)
+
+    // Metrics should remain 0 after hasNext check
+    // assert(iterator.batchesLoaded == 0)
+    assert(iterator.totalRowsProcessed == 0)
+  }
+
+  test("IPC stream with invalid data") {
+    val ctx = TaskContext.empty()
+    val invalidData = Array[Byte](1, 2, 3, 4, 5)
+
+    intercept[Exception] {
+      ArrowConverters.fromIPCStream(invalidData, ctx)
+    }
+  }
+
+  test("IPC stream with empty data") {
+    val ctx = TaskContext.empty()
+    val emptyData = Array.empty[Byte]
+
+    intercept[Exception] {
+      ArrowConverters.fromIPCStream(emptyData, ctx)
+    }
+  }
+
+  test("IPC stream with null context") {
+    val inputRows = (0 until 5).map(InternalRow(_))
+    val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
+    val ctx = TaskContext.empty()
+
+    val batchIter = ArrowConverters.toBatchIterator(
+      inputRows.iterator, schema, 10, null, true, false, ctx)
+
+    val out = new ByteArrayOutputStream()
+    Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
+      val writer = new ArrowBatchStreamWriter(schema, dataOut, null, true, false)
+      writer.writeBatches(batchIter)
+      writer.end()
+    }
+
+    // Test with null context - should still work but won't have cleanup registration
+    val proj = UnsafeProjection.create(schema)
+    val (outputRowIter, outputSchema) = ArrowConverters.fromIPCStream(out.toByteArray, null)
+    assert(outputSchema == schema)
+    val outputRows = outputRowIter.map(proj(_).copy()).toList
+    assert(outputRows.length == inputRows.length)
+    outputRows.zipWithIndex.foreach { case (row, i) =>
+      assert(row.getInt(0) == i)
+    }
+  }
+
+  test("multi-batch iteration validation with varying batch sizes") {
+    val inputRows = (0 until 100).map(InternalRow(_))
+    val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
+    val ctx = TaskContext.empty()
+
+    // Create many small batches
+    val batchIter = ArrowConverters.toBatchIterator(
+      inputRows.iterator, schema, 3, null, true, false, ctx)
+
+    val out = new ByteArrayOutputStream()
+    Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
+      val writer = new ArrowBatchStreamWriter(schema, dataOut, null, true, false)
+      writer.writeBatches(batchIter)
+      writer.end()
+    }
+
+    val (iterator, outputSchema) = ArrowConverters.fromIPCStreamWithIterator(out.toByteArray, ctx)
+    assert(outputSchema == schema)
+
+    // Initially no batches loaded
+    assert(iterator.batchesLoaded == 0)
+    assert(iterator.totalRowsProcessed == 0)
+
+    // Test hasNext multiple times without calling next
+    assert(iterator.hasNext)
+    assert(iterator.hasNext)
+    assert(iterator.hasNext)
+
+    // After the hasNext calls, the first batch should be loaded
+    assert(iterator.batchesLoaded == 1)
+    assert(iterator.totalRowsProcessed == 0) // No rows consumed yet; the first batch has 3 rows
+
+    // Consume all rows
+    val proj = UnsafeProjection.create(schema)
+    val outputRows = iterator.map(proj(_).copy()).toList
+    assert(outputRows.length == inputRows.length)
+    outputRows.zipWithIndex.foreach { case (row, i) =>
+      assert(row.getInt(0) == i)
+    }
+
+    // With batch size 3 and 100 rows, we expect 34 batches (ceil(100 / 3) = 34)
+    val expectedBatches = Math.ceil(inputRows.length.toDouble / 3).toInt
+    assert(iterator.batchesLoaded == expectedBatches,
+      s"Expected $expectedBatches batches loaded, got ${iterator.batchesLoaded}")
+    assert(iterator.totalRowsProcessed == inputRows.length,
+      s"Expected ${inputRows.length} rows processed, got ${iterator.totalRowsProcessed}")
+
+    // Verify no more data
+    assert(!iterator.hasNext)
+  }
+
+  test("multi-batch iteration with complex schema validation") {
+    val inputRows = (0 until 50).map { i =>
+      InternalRow(
+        i,
+        UTF8String.fromString(s"test-$i"),
+        if (i % 2 == 0) null else InternalRow(i * 3),
+        Array(i, i + 1, i + 2).map(x => x.toByte)
+      )
+    }
+
+    val schema = StructType(Seq(
+      StructField("id", IntegerType, nullable = false),
+      StructField("name", StringType, nullable = false),
+      StructField("nested", StructType(Seq(StructField("value", IntegerType))), nullable = true),
+      StructField("bytes", BinaryType, nullable = false)
+    ))
+
+    val projectedRows = inputRows.map { row =>
+      val proj = UnsafeProjection.create(schema)
+      proj(row).copy()
+    }
+    val ctx = TaskContext.empty()
+
+    // Use small batch size to create many batches
+    val batchIter = ArrowConverters.toBatchIterator(
+      projectedRows.iterator, schema, 7, null, true, false, ctx)
+
+    val out = new ByteArrayOutputStream()
+    Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
+      val writer = new ArrowBatchStreamWriter(schema, dataOut, null, true, false)
+      writer.writeBatches(batchIter)
+      writer.end()
+    }
+
+    val (outputRowIter, outputSchema) = ArrowConverters.fromIPCStream(out.toByteArray, ctx)
+    val proj = UnsafeProjection.create(schema)
+    assert(outputSchema == schema)
+    val outputRows = outputRowIter.map(proj(_).copy()).toList
+    assert(outputRows.length == inputRows.length)
+
+    outputRows.zipWithIndex.foreach { case (row, i) =>
+      assert(row.getInt(0) == i)
+      assert(row.getUTF8String(1).toString == s"test-$i")
+      if (i % 2 == 0) {
+        assert(row.isNullAt(2))
+      } else {
+        val nested = row.getStruct(2, 1)
+        assert(nested.getInt(0) == i * 3)
+      }
+      val expectedBytes = Array(i, i + 1, i + 2).map(_.toByte)
+      assert(row.getBinary(3).sameElements(expectedBytes))
+    }
+  }
+
+  test("IPC stream partial consumption metrics validation") {
+    val inputRows = (0 until 30).map(InternalRow(_))
+    val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
+    val ctx = TaskContext.empty()
+    val batchSize = 7
+
+    val batchIter = ArrowConverters.toBatchIterator(
+      inputRows.iterator, schema, batchSize, null, true, false, ctx)
+
+    val out = new ByteArrayOutputStream()
+    Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
+      val writer = new ArrowBatchStreamWriter(schema, dataOut, null, true, false)
+      writer.writeBatches(batchIter)
+      writer.end()
+    }
+
+    val (iterator, outputSchema) = ArrowConverters.fromIPCStreamWithIterator(out.toByteArray, ctx)
+    assert(outputSchema == schema)
+
+    // Initially no batches loaded
+    assert(iterator.batchesLoaded == 0)
+    assert(iterator.totalRowsProcessed == 0)
+
+    // Consume the first 10 rows (should load 2 batches: 7 + 3)
+    val firstBatch = iterator.take(10).toList
+    assert(firstBatch.length == 10)
+
+    // After consuming 10 rows, we should have loaded at least 2 batches
+    assert(iterator.batchesLoaded >= 2,
+      s"Expected at least 2 batches loaded after 10 rows, got ${iterator.batchesLoaded}")
+    assert(iterator.totalRowsProcessed >= 10,
+      s"Expected at least 10 rows processed, got ${iterator.totalRowsProcessed}")
+
+    // Consume remaining rows
+    val remainingRows = iterator.toList
+    val totalConsumed = firstBatch.length + remainingRows.length
+    assert(totalConsumed == inputRows.length)
+
+    // Final metrics should show all batches loaded
+    val expectedBatches = Math.ceil(inputRows.length.toDouble / batchSize).toInt
+    assert(iterator.batchesLoaded == expectedBatches,
+      s"Expected $expectedBatches batches loaded, got ${iterator.batchesLoaded}")
+    assert(iterator.totalRowsProcessed == inputRows.length,
+      s"Expected ${inputRows.length} rows processed, got ${iterator.totalRowsProcessed}")
+  }
+
   /** Test that a converted DataFrame to Arrow record batch equals batch read from JSON file */
   private def collectAndValidate(
       df: DataFrame,

From 017cb2fee8887900216870e5d39b84795f39eb58 Mon Sep 17 00:00:00 2001
From: Martin Grund
Date: Mon, 25 Aug 2025 12:48:22 +0200
Subject: [PATCH 2/3] next

---
 .../spark/sql/execution/arrow/ArrowConverters.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 91afa9b7fda9c..3072a12e3d587 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -347,7 +347,15 @@ private[sql] object ArrowConverters extends Logging {
     }
 
     override def hasNext: Boolean = {
-      rowIterator.hasNext || loadNextBatch()
+      if (rowIterator.hasNext) {
+        true
+      } else {
+        if (!loadNextBatch()) {
+          false
+        } else {
+          hasNext
+        }
+      }
     }
 
     override def next(): InternalRow = {

From b2b1cc2ab88097138ba08fcdc36edfaa4f85d00b Mon Sep 17 00:00:00 2001
From: Martin Grund
Date: Mon, 25 Aug 2025 12:50:33 +0200
Subject: [PATCH 3/3] next

---
 .../spark/sql/connect/planner/SparkConnectPlanner.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 68a9e41e7d752..c0b1fd01616a4 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1441,9 +1441,8 @@ class SparkConnectPlanner(
     }
 
     if (rel.hasData) {
-      val (rows, structType) = ArrowConverters.fromIPCStream(
-        rel.getData.toByteArray,
-        TaskContext.get())
+      val (rows, structType) =
+        ArrowConverters.fromIPCStream(rel.getData.toByteArray, TaskContext.get())
       if (structType == null) {
         throw InvalidInputErrors.inputDataForLocalRelationNoSchema()
       }
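Reviewer note (not part of the patch series): a minimal sketch of how the new entry points are meant to be consumed. `FromIPCStreamExample`, `countRows`, `countBatches`, and the `bytes` argument are hypothetical names used only for illustration; `fromIPCStream`, `fromIPCStreamWithIterator`, `batchesLoaded`, and `totalRowsProcessed` are the members added in PATCH 1/3. Since both methods are `private[sql]`, the sketch assumes it is compiled inside the same package.

    package org.apache.spark.sql.execution.arrow

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.StructType

    object FromIPCStreamExample {
      // `bytes` is assumed to hold a complete Arrow IPC stream: one schema
      // message followed by any number of record batches.
      def countRows(bytes: Array[Byte]): Long = {
        // The schema is read from the stream itself; a null TaskContext is
        // tolerated (cleanup is then not tied to task completion).
        val (rows: Iterator[InternalRow], schema: StructType) =
          ArrowConverters.fromIPCStream(bytes, TaskContext.get())
        assert(schema != null)
        rows.size.toLong
      }

      // Test-facing variant: the concrete iterator additionally exposes metrics.
      def countBatches(bytes: Array[Byte]): Int = {
        val (iter, _) = ArrowConverters.fromIPCStreamWithIterator(bytes, TaskContext.get())
        while (iter.hasNext) { iter.next() } // after PATCH 2/3, hasNext skips empty batches
        iter.batchesLoaded
      }
    }

This is the same call shape SparkConnectPlanner uses for LocalRelation data after PATCH 3/3.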