36
36
#include " dwio/nimble/velox/SchemaSerialization.h"
37
37
#include " dwio/nimble/velox/SchemaTypes.h"
38
38
#include " dwio/nimble/velox/StatsGenerated.h"
39
+ #include " dwio/nimble/velox/StreamLabels.h"
39
40
#include " folly/ScopeGuard.h"
40
41
#include " velox/common/time/CpuWallTimer.h"
41
42
#include " velox/dwio/common/ExecutorBarrier.h"
@@ -47,6 +48,11 @@ namespace detail {
47
48
48
49
class WriterContext : public FieldWriterContext {
49
50
public:
51
+ struct ColumnStats {
52
+ uint64_t logicalSize{0 };
53
+ uint64_t nullCount{0 };
54
+ };
55
+
50
56
const VeloxWriterOptions options;
51
57
std::unique_ptr<FlushPolicy> flushPolicy;
52
58
velox::CpuWallTiming totalFlushTiming;
@@ -63,6 +69,8 @@ class WriterContext : public FieldWriterContext {
63
69
uint64_t stripeSize{0 };
64
70
uint64_t rawSize{0 };
65
71
std::vector<uint64_t > rowsPerStripe;
72
+ std::unordered_map<offset_size, std::atomic<uint64_t >> streamPhysicalSize;
73
+ std::vector<ColumnStats> columnStats;
66
74
67
75
WriterContext (
68
76
velox::memory::MemoryPool& memoryPool,
@@ -516,10 +524,20 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
516
524
auto size = vector->size ();
517
525
518
526
// Calculate raw size.
519
- auto rawSize = nimble::getRawSizeFromVector (
520
- vector, velox::common::Ranges::of (0 , size));
527
+ RawSizeContext context;
528
+ auto rawSize = nimble::getRawSizeFromRowVector (
529
+ vector, velox::common::Ranges::of (0 , size), context, /* topLevel=*/ true );
521
530
DWIO_ENSURE_GE (rawSize, 0 , " Invalid raw size" );
522
531
context_->rawSize += rawSize;
532
+ auto columnCount = context.columnCount ();
533
+ if (context_->columnStats .empty ()) {
534
+ context_->columnStats =
535
+ std::vector<detail::WriterContext::ColumnStats>(columnCount);
536
+ }
537
+ for (auto i = 0 ; i < columnCount; ++i) {
538
+ context_->columnStats [i].logicalSize += context.sizeAt (i);
539
+ context_->columnStats [i].nullCount += context.nullsAt (i);
540
+ }
523
541
524
542
if (context_->options .writeExecutor ) {
525
543
velox::dwio::common::ExecutorBarrier barrier{
@@ -580,6 +598,34 @@ void VeloxWriter::close() {
580
598
builder.GetSize ()});
581
599
}
582
600
601
+ {
602
+ // Accumulate column physical size.
603
+ std::vector<uint64_t > columnPhysicalSize (
604
+ context_->columnStats .size (), 0 );
605
+ nimble::StreamLabels streamLabels{nimble::SchemaReader::getSchema (
606
+ context_->schemaBuilder .getSchemaNodes ())};
607
+ for (const auto & [offset, streamSize] : context_->streamPhysicalSize ) {
608
+ if (offset == 0 ) {
609
+ continue ;
610
+ }
611
+ std::vector<std::string> streamLabel;
612
+ folly::split (
613
+ ' /' ,
614
+ streamLabels.streamLabel (offset),
615
+ streamLabel,
616
+ /* ignoreEmpty=*/ true );
617
+ NIMBLE_ASSERT (!streamLabel.empty (), " Invalid stream label" );
618
+ auto column = std::stoi (streamLabel[0 ]);
619
+ NIMBLE_ASSERT (
620
+ column < columnPhysicalSize.size (),
621
+ fmt::format (
622
+ " Index {} is out of range for physical size vector of size {}" ,
623
+ column,
624
+ columnPhysicalSize.size ()));
625
+ columnPhysicalSize[column] += streamSize;
626
+ }
627
+ }
628
+
583
629
{
584
630
flatbuffers::FlatBufferBuilder builder;
585
631
builder.Finish (serialization::CreateStats (builder, context_->rawSize ));
@@ -691,14 +737,16 @@ void VeloxWriter::writeChunk(bool lastChunk) {
691
737
StreamData& streamData_;
692
738
};
693
739
694
- auto encode = [&](StreamData& streamData) {
740
+ auto encode = [&](StreamData& streamData,
741
+ std::atomic<uint64_t >& streamSize) {
695
742
const auto offset = streamData.descriptor ().offset ();
696
743
auto encoded = encodeStream (*context_, *encodingBuffer_, streamData);
697
744
if (!encoded.empty ()) {
698
745
ChunkedStreamWriter chunkWriter{*encodingBuffer_};
699
746
NIMBLE_DASSERT (offset < streams_.size (), " Stream offset out of range." );
700
747
auto & stream = streams_[offset];
701
748
for (auto & buffer : chunkWriter.encode (encoded)) {
749
+ streamSize += buffer.size ();
702
750
chunkSize += buffer.size ();
703
751
stream.content .push_back (std::move (buffer));
704
752
}
@@ -739,29 +787,35 @@ void VeloxWriter::writeChunk(bool lastChunk) {
739
787
velox::dwio::common::ExecutorBarrier barrier{
740
788
context_->options .encodingExecutor };
741
789
for (auto & streamData : context_->streams ()) {
790
+ auto & streamSize =
791
+ context_->streamPhysicalSize [streamData->descriptor ().offset ()];
742
792
processStream (
743
793
*streamData, [&](StreamData& innerStreamData, bool isNullStream) {
744
- barrier.add ([&innerStreamData, isNullStream, &encode]() {
745
- if (isNullStream) {
746
- NullsAsDataStreamData nullsStreamData{innerStreamData};
747
- encode (nullsStreamData);
748
- } else {
749
- encode (innerStreamData);
750
- }
751
- });
794
+ barrier.add (
795
+ [&innerStreamData, isNullStream, &encode, &streamSize]() {
796
+ if (isNullStream) {
797
+ NullsAsDataStreamData nullsStreamData{innerStreamData};
798
+ encode (nullsStreamData, streamSize);
799
+ } else {
800
+ encode (innerStreamData, streamSize);
801
+ }
802
+ });
752
803
});
753
804
}
754
805
barrier.waitAll ();
755
806
} else {
756
807
for (auto & streamData : context_->streams ()) {
808
+ auto & streamSize =
809
+ context_->streamPhysicalSize [streamData->descriptor ().offset ()];
757
810
processStream (
758
811
*streamData,
759
- [&encode](StreamData& innerStreamData, bool isNullStream) {
812
+ [&encode, &streamSize](
813
+ StreamData& innerStreamData, bool isNullStream) {
760
814
if (isNullStream) {
761
815
NullsAsDataStreamData nullsStreamData{innerStreamData};
762
- encode (nullsStreamData);
816
+ encode (nullsStreamData, streamSize );
763
817
} else {
764
- encode (innerStreamData);
818
+ encode (innerStreamData, streamSize );
765
819
}
766
820
});
767
821
}
0 commit comments