diff --git a/src/main/scala/shark/execution/HadoopTableReader.scala b/src/main/scala/shark/execution/HadoopTableReader.scala
index d96596f5..3580cc1a 100644
--- a/src/main/scala/shark/execution/HadoopTableReader.scala
+++ b/src/main/scala/shark/execution/HadoopTableReader.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazyStruct
 import org.apache.hadoop.hive.serde2.objectinspector.{StructObjectInspector, ObjectInspectorConverters}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.IdentityConverter
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
@@ -194,30 +195,39 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
         partSerDe.getObjectInspector(), tblConvertedOI)
       val rowWithPartArr = new Array[Object](2)
       // Map each tuple to a row object
-      iter.map { value =>
-        val deserializedRow = {
-
-          // If partition schema does not match table schema, update the row to match
-          val convertedRow = partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
-
-          // If conversion was performed, convertedRow will be a standard Object, but if
-          // conversion wasn't necessary, it will still be lazy. We can't have both across
-          // partitions, so we serialize and deserialize again to make it lazy.
-          if (tableSerDe.isInstanceOf[OrcSerde]) {
-            convertedRow
-          } else {
-            convertedRow match {
-              case _: LazyStruct => convertedRow
-              case _: HiveColumnarStruct => convertedRow
-              case _ => tableSerDe.deserialize(
-                tableSerDe.asInstanceOf[Serializer].serialize(
-                  convertedRow, tblConvertedOI))
+
+      // This only depends on the partition, so do it once here rather than per row in iter.map.
+      rowWithPartArr.update(1, partValues)
+      if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
+        iter.map { value =>
+          rowWithPartArr.update(0, partSerDe.deserialize(value))
+          rowWithPartArr.asInstanceOf[Object]
+        }
+      } else {
+        iter.map { value =>
+          val deserializedRow = {
+
+            // If partition schema does not match table schema, update the row to match
+            val convertedRow = partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
+
+            // If conversion was performed, convertedRow will be a standard Object, but if
+            // conversion wasn't necessary, it will still be lazy. We can't have both across
+            // partitions, so we serialize and deserialize again to make it lazy.
+            if (tableSerDe.isInstanceOf[OrcSerde]) {
+              convertedRow
+            } else {
+              convertedRow match {
+                case _: LazyStruct => convertedRow
+                case _: HiveColumnarStruct => convertedRow
+                case _ => tableSerDe.deserialize(
+                  tableSerDe.asInstanceOf[Serializer].serialize(
+                    convertedRow, tblConvertedOI))
+              }
             }
           }
+          rowWithPartArr.update(0, deserializedRow)
+          rowWithPartArr.asInstanceOf[Object]
         }
-        rowWithPartArr.update(0, deserializedRow)
-        rowWithPartArr.update(1, partValues)
-        rowWithPartArr.asInstanceOf[Object]
       }
     }
   }.toSeq
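
For context on the intent of the patch: `ObjectInspectorConverters.getConverter` returns an `IdentityConverter` when the partition's object inspector already matches the table's converted object inspector, so in that case the per-row convert/serialize/deserialize round trip can be skipped, and the partition values only need to be written into the two-element output array once per partition. The snippet below is a minimal, self-contained sketch of that fast-path shape; the `Converter`/`IdentityConverter` stand-ins and the `mapPartition`/`SuffixingConverter` names are illustrative only, not the real Hive SerDe API.

```scala
// Sketch (not the real Hive/Shark API) of the control flow in the patch above:
// the identity check and the partition-value write are hoisted out of the per-row loop.
object IdentityFastPathSketch {

  // Hypothetical stand-in for Hive's ObjectInspectorConverters.Converter hierarchy.
  sealed trait Converter { def convert(row: AnyRef): AnyRef }
  case object IdentityConverter extends Converter {
    def convert(row: AnyRef): AnyRef = row // schemas already match; nothing to do
  }
  final case class SuffixingConverter(suffix: String) extends Converter {
    def convert(row: AnyRef): AnyRef = s"$row$suffix" // placeholder for a real schema conversion
  }

  def mapPartition(
      iter: Iterator[AnyRef],
      partValues: AnyRef,
      converter: Converter): Iterator[Array[AnyRef]] = {
    val rowWithPartArr = new Array[AnyRef](2)
    // Partition values are constant within a partition: write them once, not per row.
    rowWithPartArr.update(1, partValues)
    converter match {
      case IdentityConverter =>
        // Fast path: skip the per-row conversion entirely.
        iter.map { value =>
          rowWithPartArr.update(0, value)
          rowWithPartArr
        }
      case other =>
        // Slow path: convert every row to the table schema.
        iter.map { value =>
          rowWithPartArr.update(0, other.convert(value))
          rowWithPartArr
        }
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator[AnyRef]("row1", "row2")
    mapPartition(rows, partValues = "ds=2014-01-01", IdentityConverter)
      .foreach(arr => println(arr.mkString("[", ", ", "]")))
  }
}
```

Note that, as in the existing code, the same two-element array is reused for every row in the partition; the sketch keeps that behaviour rather than allocating a fresh array per row.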