
Commit 7e4da36

Merge pull request #106 from Altinity/backport/21.3_rework_22988_23976_24311_24885_26249_27176_27484
2 parents 7185b6b + 7201864 commit 7e4da36

17 files changed (+6559 / -54 lines)


src/Core/Settings.h

Lines changed: 1 addition & 0 deletions
@@ -70,6 +70,7 @@ class IColumn;
     M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
     M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
     M(UInt64, s3_max_single_part_upload_size, 64*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
+    M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
     M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
     M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
     M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
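
The new s3_max_single_read_retries entry (default 4) follows this file's X-macro convention: a single M(...) row declares the setting's type, name, default value, and documentation at once, and the rest of this commit reads the value back via context.getSettingsRef().s3_max_single_read_retries. For readers unfamiliar with the pattern, here is a toy, self-contained illustration (simplified macros of my own, not ClickHouse's actual settings machinery):

#include <cstdint>
#include <iostream>

/// Toy X-macro settings list in the style of Settings.h: each M(...) row
/// carries a type, name, default value and a description string.
#define APPLY_FOR_TOY_SETTINGS(M) \
    M(uint64_t, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.") \
    M(uint64_t, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.")

struct ToySettings
{
    /// Expand each row into a struct member initialized to its default.
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME = DEFAULT;
    APPLY_FOR_TOY_SETTINGS(DECLARE)
#undef DECLARE
};

int main()
{
    ToySettings settings;
    std::cout << settings.s3_max_single_read_retries << '\n'; /// prints 4
}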

src/Disks/S3/DiskS3.cpp

Lines changed: 38 additions & 14 deletions
@@ -238,9 +238,13 @@ DiskS3::Metadata DiskS3::createMeta(const String & path) const
 class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase
 {
 public:
-    ReadIndirectBufferFromS3(
-        std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, DiskS3::Metadata metadata_, size_t buf_size_)
-        : client_ptr(std::move(client_ptr_)), bucket(bucket_), metadata(std::move(metadata_)), buf_size(buf_size_)
+    explicit ReadIndirectBufferFromS3(
+        std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, DiskS3::Metadata metadata_, size_t max_single_read_retries_, size_t buf_size_)
+        : client_ptr(std::move(client_ptr_))
+        , bucket(bucket_)
+        , metadata(std::move(metadata_))
+        , max_single_read_retries(max_single_read_retries_)
+        , buf_size(buf_size_)
     {
     }

@@ -296,7 +300,7 @@ class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase
             const auto & [path, size] = metadata.s3_objects[i];
             if (size > offset)
             {
-                auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, buf_size);
+                auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, max_single_read_retries, buf_size);
                 buf->seek(offset, SEEK_SET);
                 return buf;
             }
@@ -312,11 +316,11 @@ class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase
             current_buf = initialize();

         /// If current buffer has remaining data - use it.
-        if (current_buf && current_buf->next())
+        if (current_buf)
         {
-            working_buffer = current_buf->buffer();
-            absolute_position += working_buffer.size();
-            return true;
+            bool result = nextAndShiftPosition();
+            if (result)
+                return true;
         }

         /// If there is no available buffers - nothing to read.
@@ -325,17 +329,32 @@ class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase

         ++current_buf_idx;
         const auto & path = metadata.s3_objects[current_buf_idx].first;
-        current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, buf_size);
-        current_buf->next();
-        working_buffer = current_buf->buffer();
-        absolute_position += working_buffer.size();

-        return true;
+        current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, max_single_read_retries, buf_size);
+
+        return nextAndShiftPosition();
+    }
+
+    bool nextAndShiftPosition()
+    {
+        /// Transfer current position and working_buffer to actual ReadBuffer
+        swap(*current_buf);
+        /// Position and working_buffer will be updated in next() call
+        auto result = current_buf->next();
+        /// and assigned to current buffer.
+        swap(*current_buf);
+
+        /// absolute position is shifted by a data size that was read in next() call above.
+        if (result)
+            absolute_position += working_buffer.size();
+
+        return result;
     }

     std::shared_ptr<Aws::S3::S3Client> client_ptr;
     const String & bucket;
     DiskS3::Metadata metadata;
+    UInt64 max_single_read_retries;
     size_t buf_size;

     size_t absolute_position = 0;
@@ -549,6 +568,7 @@ DiskS3::DiskS3(
     String bucket_,
     String s3_root_path_,
     String metadata_path_,
+    UInt64 max_single_read_retries_,
     size_t min_upload_part_size_,
     size_t max_single_part_upload_size_,
     size_t min_bytes_for_seek_,
@@ -562,6 +582,7 @@ DiskS3::DiskS3(
     , bucket(std::move(bucket_))
     , s3_root_path(std::move(s3_root_path_))
     , metadata_path(std::move(metadata_path_))
+    , max_single_read_retries(max_single_read_retries_)
     , min_upload_part_size(min_upload_part_size_)
     , max_single_part_upload_size(max_single_part_upload_size_)
     , min_bytes_for_seek(min_bytes_for_seek_)
@@ -659,7 +680,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
     LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Read from file by path: {}. Existing S3 objects: {}",
         backQuote(metadata_path + path), metadata.s3_objects.size());

-    auto reader = std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, buf_size);
+    auto reader = std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, max_single_read_retries, buf_size);
     return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), min_bytes_for_seek);
 }

@@ -916,6 +937,9 @@ void DiskS3::createFileOperationObject(const String & operation_name, UInt64 rev

 void DiskS3::startup()
 {
+    /// Need to be enabled if it was disabled during shutdown() call.
+    client->EnableRequestProcessing();
+
     if (!send_metadata)
         return;

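
The subtle part of this file's change is the new nextAndShiftPosition() helper: the wrapper and the inner ReadBufferFromS3 are both buffer objects, and the pair of swap(*current_buf) calls temporarily hands the wrapper's working buffer and position to the inner buffer so that its next() refills them in place; absolute_position then advances by the bytes just read. A simplified, self-contained sketch of the idiom follows (toy types of my own; ClickHouse's real BufferBase::swap exchanges more state than this):

#include <cstddef>
#include <iostream>
#include <utility>

/// Toy stand-in for BufferBase: a data window of `size` bytes.
struct ToyBuffer
{
    const char * data = nullptr;
    size_t size = 0;

    /// Exchange buffer state, like BufferBase::swap() does for
    /// working_buffer and position in ClickHouse.
    void swap(ToyBuffer & other)
    {
        std::swap(data, other.data);
        std::swap(size, other.size);
    }

    virtual bool next() { return false; } /// Refill the window; false = EOF.
    virtual ~ToyBuffer() = default;
};

/// Inner buffer that serves one fixed chunk, like a single S3 object read.
struct ChunkBuffer : ToyBuffer
{
    bool served = false;
    bool next() override
    {
        if (served)
            return false;
        data = "hello";
        size = 5;
        served = true;
        return true;
    }
};

/// Wrapper mimicking ReadIndirectBufferFromS3::nextAndShiftPosition().
struct Wrapper : ToyBuffer
{
    ToyBuffer & inner;
    size_t absolute_position = 0;

    explicit Wrapper(ToyBuffer & inner_) : inner(inner_) {}

    bool nextAndShiftPosition()
    {
        /// Hand our window to the inner buffer, let it refill, take it back.
        swap(inner);
        bool result = inner.next();
        swap(inner);

        /// Absolute position is shifted by the amount just read.
        if (result)
            absolute_position += size;
        return result;
    }
};

int main()
{
    ChunkBuffer chunk;
    Wrapper wrapper(chunk);
    while (wrapper.nextAndShiftPosition())
        std::cout << "read " << wrapper.size << " bytes\n";
    std::cout << "absolute_position = " << wrapper.absolute_position << '\n'; /// 5
}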

src/Disks/S3/DiskS3.h

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,7 @@ class DiskS3 : public IDisk
         String bucket_,
         String s3_root_path_,
         String metadata_path_,
+        UInt64 max_single_read_retries_,
         size_t min_upload_part_size_,
         size_t max_single_part_upload_size_,
         size_t min_bytes_for_seek_,
@@ -158,6 +159,7 @@ class DiskS3 : public IDisk
     const String bucket;
     const String s3_root_path;
    const String metadata_path;
+    UInt64 max_single_read_retries;
     size_t min_upload_part_size;
     size_t max_single_part_upload_size;
     size_t min_bytes_for_seek;

src/Disks/S3/registerDiskS3.cpp

Lines changed: 1 addition & 0 deletions
@@ -152,6 +152,7 @@ void registerDiskS3(DiskFactory & factory)
             uri.bucket,
             uri.key,
             metadata_path,
+            context.getSettingsRef().s3_max_single_read_retries,
             context.getSettingsRef().s3_min_upload_part_size,
             context.getSettingsRef().s3_max_single_part_upload_size,
             config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),

src/IO/ReadBufferFromS3.cpp

Lines changed: 62 additions & 17 deletions
@@ -16,6 +16,7 @@ namespace ProfileEvents
 {
     extern const Event S3ReadMicroseconds;
     extern const Event S3ReadBytes;
+    extern const Event S3ReadRequestsErrors;
 }

 namespace DB
@@ -29,38 +30,83 @@ namespace ErrorCodes


 ReadBufferFromS3::ReadBufferFromS3(
-    std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_, size_t buffer_size_)
-    : SeekableReadBuffer(nullptr, 0), client_ptr(std::move(client_ptr_)), bucket(bucket_), key(key_), buffer_size(buffer_size_)
+    std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_, UInt64 max_single_read_retries_, size_t buffer_size_)
+    : SeekableReadBuffer(nullptr, 0)
+    , client_ptr(std::move(client_ptr_))
+    , bucket(bucket_)
+    , key(key_)
+    , max_single_read_retries(max_single_read_retries_)
+    , buffer_size(buffer_size_)
 {
 }


 bool ReadBufferFromS3::nextImpl()
 {
-    if (!initialized)
+    bool next_result = false;
+
+    if (impl)
+    {
+        /// `impl` has been initialized earlier and now we're at the end of the current portion of data.
+        impl->position() = position();
+        assert(!impl->hasPendingData());
+    }
+    else
     {
+        /// `impl` is not initialized and we're about to read the first portion of data.
         impl = initialize();
-        initialized = true;
+        next_result = impl->hasPendingData();
     }

-    Stopwatch watch;
-    auto res = impl->next();
-    watch.stop();
-    ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
+    auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100);
+    for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
+    {
+        Stopwatch watch;
+        try
+        {
+            /// Try to read a next portion of data.
+            next_result = impl->next();
+            watch.stop();
+            ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
+            break;
+        }
+        catch (const Exception & e)
+        {
+            watch.stop();
+            ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
+            ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);
+
+            LOG_INFO(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}",
+                    bucket, key, getPosition(), attempt, e.message());
+
+            if (attempt + 1 == max_single_read_retries)
+                throw;
+
+            /// Pause before next attempt.
+            std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds);
+            sleep_time_with_backoff_milliseconds *= 2;
+
+            /// Try to reinitialize `impl`.
+            impl.reset();
+            impl = initialize();
+            next_result = impl->hasPendingData();
+        }
+    }

-    if (!res)
+    if (!next_result)
         return false;
-    internal_buffer = impl->buffer();

-    ProfileEvents::increment(ProfileEvents::S3ReadBytes, internal_buffer.size());
+    BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl`

-    working_buffer = internal_buffer;
+    ProfileEvents::increment(ProfileEvents::S3ReadBytes, working_buffer.size());
+    offset += working_buffer.size();
     return true;
 }

 off_t ReadBufferFromS3::seek(off_t offset_, int whence)
 {
-    if (initialized)
+    if (impl)
         throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);

     if (whence != SEEK_SET)
@@ -77,18 +123,17 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)

 off_t ReadBufferFromS3::getPosition()
 {
-    return offset + count();
+    return offset - available();
 }

 std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
 {
-    LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, std::to_string(offset));
+    LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, offset);

     Aws::S3::Model::GetObjectRequest req;
     req.SetBucket(bucket);
     req.SetKey(key);
-    if (offset != 0)
-        req.SetRange("bytes=" + std::to_string(offset) + "-");
+    req.SetRange(fmt::format("bytes={}-", offset));

     Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);

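Two details here are easy to miss. First, `offset` now counts how far the working buffer has been fetched rather than how much has been consumed, so the externally visible position becomes offset - available() (bytes fetched minus bytes not yet read out). Second, the retry loop doubles the pause after each failed attempt and rebuilds `impl` from the current `offset`, which is exactly why the Range header is now set unconditionally. A distilled, self-contained sketch of that backoff loop follows (generic callables of my own, not the real ClickHouse classes):

#include <chrono>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <thread>

/// Distilled form of the retry loop added to ReadBufferFromS3::nextImpl():
/// attempt a read up to max_retries times, doubling the pause after each
/// failure and re-opening the stream before the next attempt; rethrow the
/// last exception once retries are exhausted.
bool readWithRetries(const std::function<bool()> & try_read,
                     const std::function<void()> & reinitialize,
                     size_t max_retries)
{
    auto sleep_time = std::chrono::milliseconds(100);
    for (size_t attempt = 0; attempt < max_retries; ++attempt)
    {
        try
        {
            return try_read(); /// Success (or clean EOF) ends the loop.
        }
        catch (const std::exception & e)
        {
            std::cerr << "attempt " << attempt << " failed: " << e.what() << '\n';
            if (attempt + 1 == max_retries)
                throw; /// Out of retries: propagate the error.

            std::this_thread::sleep_for(sleep_time);
            sleep_time *= 2; /// Exponential backoff: 100ms, 200ms, 400ms, ...
            reinitialize();  /// Re-open the read from the current offset.
        }
    }
    return false;
}

int main()
{
    int failures_left = 2;
    bool ok = readWithRetries(
        [&] {
            if (failures_left-- > 0)
                throw std::runtime_error("transient S3 error");
            return true;
        },
        [] { std::cerr << "reinitializing stream\n"; },
        /*max_retries=*/4);
    std::cout << (ok ? "read succeeded\n" : "eof\n");
}
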
src/IO/ReadBufferFromS3.h

Lines changed: 2 additions & 1 deletion
@@ -27,8 +27,8 @@ class ReadBufferFromS3 : public SeekableReadBuffer
     std::shared_ptr<Aws::S3::S3Client> client_ptr;
     String bucket;
     String key;
+    UInt64 max_single_read_retries;
     size_t buffer_size;
-    bool initialized = false;
     off_t offset = 0;
     Aws::S3::Model::GetObjectResult read_result;
     std::unique_ptr<ReadBuffer> impl;
@@ -40,6 +40,7 @@ class ReadBufferFromS3 : public SeekableReadBuffer
         std::shared_ptr<Aws::S3::S3Client> client_ptr_,
         const String & bucket_,
         const String & key_,
+        UInt64 max_single_read_retries_,
         size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);

     bool nextImpl() override;
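
Note that the header also drops the separate `bool initialized` flag: since `impl` is a std::unique_ptr that is only set on the first read, its null state already encodes "not yet initialized", which is what the `if (impl)` checks in the .cpp above rely on. A trivial sketch of that idiom, with hypothetical names:

#include <memory>

struct Stream {}; /// placeholder for the underlying read stream

struct Reader
{
    std::unique_ptr<Stream> impl; /// null until the first read

    /// Replaces a separate `bool initialized = false;` member.
    bool initialized() const { return impl != nullptr; }

    void readOnce()
    {
        if (!impl)
            impl = std::make_unique<Stream>(); /// first read initializes
    }
};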

src/Storages/StorageS3.cpp

Lines changed: 7 additions & 1 deletion
@@ -72,6 +72,7 @@ namespace
             const Context & context,
             const ColumnsDescription & columns,
             UInt64 max_block_size,
+            UInt64 max_single_read_retries_,
             const CompressionMethod compression_method,
             const std::shared_ptr<Aws::S3::S3Client> & client,
             const String & bucket,
@@ -82,7 +83,7 @@ namespace
             , with_path_column(need_path)
             , file_path(bucket + "/" + key)
         {
-            read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
+            read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key, max_single_read_retries_), compression_method);
             auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
             reader = std::make_shared<InputStreamFromInputFormat>(input_format);

@@ -199,6 +200,7 @@ StorageS3::StorageS3(
     const String & secret_access_key_,
     const StorageID & table_id_,
     const String & format_name_,
+    UInt64 max_single_read_retries_,
     UInt64 min_upload_part_size_,
     UInt64 max_single_part_upload_size_,
     UInt64 max_connections_,
@@ -213,6 +215,7 @@ StorageS3::StorageS3(
     , max_connections(max_connections_)
     , global_context(context_.getGlobalContext())
     , format_name(format_name_)
+    , max_single_read_retries(max_single_read_retries_)
     , min_upload_part_size(min_upload_part_size_)
     , max_single_part_upload_size(max_single_part_upload_size_)
     , compression_method(compression_method_)
@@ -318,6 +321,7 @@ Pipe StorageS3::read(
             context,
             metadata_snapshot->getColumns(),
             max_block_size,
+            max_single_read_retries,
             chooseCompressionMethod(uri.key, compression_method),
             client,
             uri.bucket,
@@ -402,6 +406,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
            secret_access_key = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
         }

+        UInt64 max_single_read_retries = args.local_context.getSettingsRef().s3_max_single_read_retries;
         UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;
         UInt64 max_single_part_upload_size = args.local_context.getSettingsRef().s3_max_single_part_upload_size;
         UInt64 max_connections = args.local_context.getSettingsRef().s3_max_connections;
@@ -425,6 +430,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
             secret_access_key,
             args.table_id,
             format_name,
+            max_single_read_retries,
             min_upload_part_size,
             max_single_part_upload_size,
             max_connections,

src/Storages/StorageS3.h

Lines changed: 2 additions & 0 deletions
@@ -31,6 +31,7 @@ class StorageS3 : public ext::shared_ptr_helper<StorageS3>, public IStorage
         const String & secret_access_key,
         const StorageID & table_id_,
         const String & format_name_,
+        UInt64 max_single_read_retries_,
         UInt64 min_upload_part_size_,
         UInt64 max_single_part_upload_size_,
         UInt64 max_connections_,
@@ -65,6 +66,7 @@ class StorageS3 : public ext::shared_ptr_helper<StorageS3>, public IStorage
     const Context & global_context;

     String format_name;
+    UInt64 max_single_read_retries;
     size_t min_upload_part_size;
     size_t max_single_part_upload_size;
     String compression_method;

src/TableFunctions/TableFunctionS3.cpp

Lines changed: 2 additions & 0 deletions
@@ -66,6 +66,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
 {
     Poco::URI uri (filename);
     S3::URI s3_uri (uri);
+    UInt64 max_single_read_retries = context.getSettingsRef().s3_max_single_read_retries;
     UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size;
     UInt64 max_single_part_upload_size = context.getSettingsRef().s3_max_single_part_upload_size;
     UInt64 max_connections = context.getSettingsRef().s3_max_connections;
@@ -76,6 +77,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
         secret_access_key,
         StorageID(getDatabaseName(), table_name),
         format,
+        max_single_read_retries,
         min_upload_part_size,
         max_single_part_upload_size,
         max_connections,
