diff --git a/blob/lib/azure/storage/blob/blob.rb b/blob/lib/azure/storage/blob/blob.rb
index 30dcfff1..47f0a291 100644
--- a/blob/lib/azure/storage/blob/blob.rb
+++ b/blob/lib/azure/storage/blob/blob.rb
@@ -23,6 +23,7 @@
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 # THE SOFTWARE.
 #--------------------------------------------------------------------------
+require 'thread'
 
 module Azure::Storage
   module Blob
@@ -82,11 +83,18 @@ def initialize
     # - The lease ID specified in the request matches that of the blob.
     # If this header is specified and both of these conditions are not met, the request will fail
     # and the Get Blob operation will fail with status code 412 (Precondition Failed).
+    # * +:parallel_threshold+ - Integer. Complete the request concurrently if end_range - start_range >= this threshold.
+    # Takes precedence over the storage_blob_parallel_threshold client option.
     #
     # See http://msdn.microsoft.com/en-us/library/azure/dd179440.aspx
     #
     # Returns a blob and the blob body
     def get_blob(container, blob, options = {})
+      # Ranges at or above the threshold are fetched as concurrent sub-range requests
+      if options[:end_range].to_i - options[:start_range].to_i >= parallel_threshold(options)
+        return get_blob_parallel(container, blob, options)
+      end
+
       query = {}
       StorageService.with_query query, "snapshot", options[:snapshot]
       StorageService.with_query query, "timeout", options[:timeout] if options[:timeout]
@@ -928,5 +936,65 @@ def delete_blob(container, blob, options = {})
       call(:delete, uri, nil, headers, options)
       nil
     end
+
+    # Private: Downloads a byte range of a blob as concurrent sub-range requests
+    # and joins the pieces back together in order.
+    #
+    # ==== Attributes
+    #
+    # * +container+ - String. The container name.
+    # * +blob+ - String. The blob name.
+    # * +options+ - Hash. The get_blob options; +:start_range+ and +:end_range+ are
+    #   inclusive byte offsets.
+    #
+    # Returns a blob and the blob body
+    private
+    def get_blob_parallel(container, blob, options)
+      blob_size = get_blob_properties(container, blob, options).properties[:content_length]
+      # Range offsets are inclusive, so the last addressable byte is blob_size - 1.
+      first_byte = options[:start_range].to_i
+      last_byte = [options[:end_range] || blob_size - 1, blob_size - 1].min
+      total_bytes = last_byte - first_byte + 1
+      thread_count = [client.storage_blob_parallel_threads.to_i, 1].max
+      serial_options = options.merge(parallel_threshold: Float::INFINITY)
+
+      # Nothing addressable (empty blob or start_range past the end): issue the caller's
+      # request unchanged so the service produces its normal response or error.
+      return get_blob(container, blob, serial_options) if total_bytes < 1
+
+      # if after calculating the real end_range it is below threshold, return
+      if total_bytes < parallel_threshold(options) || thread_count == 1
+        return get_blob(container, blob, serial_options.merge(start_range: first_byte, end_range: last_byte))
+      end
+
+      # Split into at most thread_count contiguous chunks; the last chunk is clamped
+      # to last_byte so no request reaches past the requested range or the blob.
+      chunk_size = (total_bytes.to_f / thread_count).ceil
+      ranges = (first_byte..last_byte).step(chunk_size).map do |chunk_start|
+        [chunk_start, [chunk_start + chunk_size - 1, last_byte].min]
+      end
+
+      # Thread#value re-raises a failed chunk in the calling thread, so abort_on_exception
+      # (which would raise in the process's main thread instead) is deliberately not set.
+      threads = ranges.map do |chunk_start, chunk_end|
+        Thread.new do
+          get_blob(container, blob, serial_options.merge(start_range: chunk_start, end_range: chunk_end))
+        end
+      end
+
+      # each get_blob returns a [result, body]. Transpose into results, bodies
+      results, bodies = threads.map(&:value).transpose
+      result = results.first
+      result.properties[:content_length] = total_bytes if result
+      return result, bodies.join
+    end
+
+    # Private: The number of bytes used to determine if a request should be parallelized.
+    # The per-call +:parallel_threshold+ option wins over the client setting; with neither
+    # set the threshold is infinite, so requests are never parallelized.
+    private
+    def parallel_threshold(options)
+      options[:parallel_threshold] || client.storage_blob_parallel_threshold || Float::INFINITY
+    end
   end
 end
diff --git a/blob/lib/azure/storage/blob/block.rb b/blob/lib/azure/storage/blob/block.rb
index 007dd20d..b9feb0a7 100644
--- a/blob/lib/azure/storage/blob/block.rb
+++ b/blob/lib/azure/storage/blob/block.rb
@@ -468,10 +468,25 @@ def create_block_blob_multiple_put(container, blob, content, size, options = {})
       # Get the number of blocks
       block_count = (Float(size) / Float(block_size)).ceil
       block_list = []
-      for block_id in 0...block_count
-        id = block_id.to_s.rjust(6, "0")
-        put_blob_block(container, blob, id, content.read(block_size), timeout: options[:timeout], lease_id: options[:lease_id])
-        block_list.push([id])
+
+      # Split the block list into groups of threads and upload in parallel (always at least one thread)
+      max_thread_count = [client.options[:storage_blob_parallel_threads].to_i, 1].max
+      (0..block_count - 
1).to_a.each_slice(max_thread_count) do |block_slice|
+        threads = []
+        thread_count = block_slice.length
+        block_slice_data = content.read(block_size * thread_count)
+
+        block_slice.each_with_index do |block_id, index_in_block_slice|
+          # Upload failures are re-raised in the calling thread by Thread#value below
+          thread = Thread.new do
+            id = block_id.to_s.rjust(6, "0")
+            put_blob_block(container, blob, id, block_slice_data.byteslice(index_in_block_slice * block_size, block_size), timeout: options[:timeout], lease_id: options[:lease_id])
+            [id]
+          end
+          threads << thread
+        end
+
+        block_list.concat threads.map(&:value)
       end
 
       # Commit the blocks put
@@ -521,7 +536,10 @@ def get_block_size(size)
       if size > BlobConstants::MAX_BLOCK_BLOB_SIZE
         raise ArgumentError, "Block blob size should be less than #{BlobConstants::MAX_BLOCK_BLOB_SIZE} bytes in size"
       elsif (size / BlobConstants::MAX_BLOCK_COUNT) < BlobConstants::DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES
-        BlobConstants::DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES
+        # Honor the configured block size, but keep the block count within MAX_BLOCK_COUNT
+        # and each block within MAX_BLOCK_SIZE so the block list can still be committed.
+        configured_size = client.options[:storage_blob_write_block_size] || BlobConstants::DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES
+        [[configured_size, (Float(size) / BlobConstants::MAX_BLOCK_COUNT).ceil].max, BlobConstants::MAX_BLOCK_SIZE].min
       else
         BlobConstants::MAX_BLOCK_SIZE
       end
diff --git a/blob/lib/azure/storage/blob/default.rb b/blob/lib/azure/storage/blob/default.rb
index 35338747..5be27fe4 100644
--- a/blob/lib/azure/storage/blob/default.rb
+++ b/blob/lib/azure/storage/blob/default.rb
@@ -86,7 +86,7 @@ module BlobConstants
     DEFAULT_SINGLE_BLOB_PUT_THRESHOLD_IN_BYTES = 128 * 1024 * 1024
 
     # The default write block size, in bytes, used by blob streams.
-    DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES = 4 * 1024 * 1024
+    DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES = 5 * 1024 * 1024
 
     # The maximum size of a single block.
     MAX_BLOCK_SIZE = 100 * 1024 * 1024
diff --git a/common/lib/azure/storage/common/client.rb b/common/lib/azure/storage/common/client.rb
index 09a77460..c775b24a 100644
--- a/common/lib/azure/storage/common/client.rb
+++ b/common/lib/azure/storage/common/client.rb
@@ -47,9 +47,12 @@ class Client
     # * +:storage_access_key+ - Base64 String. 
The access key of the storage account.
     # * +:storage_sas_token+ - String. The signed access signature for the storage account or one of its service.
     # * +:storage_blob_host+ - String. Specified Blob serivce endpoint or hostname
+    # * +:storage_blob_parallel_threshold+ - Integer. Complete requests concurrently if the range is greater than or equal to this value.
+    # * +:storage_blob_parallel_threads+ - Integer. Number of threads for parallel operations. Should be less than http_pool_size.
     # * +:storage_table_host+ - String. Specified Table serivce endpoint or hostname
     # * +:storage_queue_host+ - String. Specified Queue serivce endpoint or hostname
     # * +:storage_dns_suffix+ - String. The suffix of a regional Storage Serivce, to
+    # * +:http_pool_size+ - Integer. Persistent HTTP Client pool size. Default is 5.
     # * +:default_endpoints_protocol+ - String. http or https
     # * +:use_path_style_uri+ - String. Whether use path style URI for specified endpoints
     # * +:ca_file+ - String. File path of the CA file if having issue with SSL
@@ -97,9 +100,12 @@ class << self
       # * +:storage_access_key+ - Base64 String. The access key of the storage account.
       # * +:storage_sas_token+ - String. The signed access signature for the storage account or one of its service.
       # * +:storage_blob_host+ - String. Specified Blob service endpoint or hostname
+      # * +:storage_blob_parallel_threshold+ - Integer. Complete requests concurrently if the range is greater than or equal to this value.
+      # * +:storage_blob_parallel_threads+ - Integer. Number of threads for parallel operations. Should be less than http_pool_size.
       # * +:storage_table_host+ - String. Specified Table service endpoint or hostname
       # * +:storage_queue_host+ - String. Specified Queue service endpoint or hostname
       # * +:storage_dns_suffix+ - String. The suffix of a regional Storage Service, to
+      # * +:http_pool_size+ - Integer. Persistent HTTP Client pool size. Default is 5.
       # * +:default_endpoints_protocol+ - String. http or https
       # * +:use_path_style_uri+ - String. Whether use path style URI for specified endpoints
       # * +:ca_file+ - String. File path of the CA file if having issue with SSL
diff --git a/common/lib/azure/storage/common/client_options.rb b/common/lib/azure/storage/common/client_options.rb
index cb768300..bf5153ba 100644
--- a/common/lib/azure/storage/common/client_options.rb
+++ b/common/lib/azure/storage/common/client_options.rb
@@ -30,7 +30,7 @@
 
 module Azure::Storage::Common
   module ClientOptions
-    attr_accessor :ca_file, :ssl_version, :ssl_min_version, :ssl_max_version
+    attr_accessor :ca_file, :ssl_version, :ssl_min_version, :ssl_max_version, :http_pool_size
 
     # Public: Reset options for [Azure::Storage::Common::Client]
     #
@@ -49,9 +49,13 @@ module ClientOptions
     # * +:storage_access_key+ - Base64 String. The access key of the storage account.
     # * +:storage_sas_token+ - String. The signed access signature for the storage account or one of its service.
     # * +:storage_blob_host+ - String. Specified Blob serivce endpoint or hostname
+    # * +:storage_blob_write_block_size+ - Integer. Block size in bytes for blob writes. Default is 5MB.
+    # * +:storage_blob_parallel_threshold+ - Integer. Complete requests concurrently if the range is greater than or equal to this value.
+    # * +:storage_blob_parallel_threads+ - Integer. Number of threads for parallel operations. Should be less than http_pool_size.
     # * +:storage_table_host+ - String. Specified Table serivce endpoint or hostname
     # * +:storage_queue_host+ - String. Specified Queue serivce endpoint or hostname
     # * +:storage_dns_suffix+ - String. The suffix of a regional Storage Serivce, to
+    # * +:http_pool_size+ - Integer. Persistent HTTP Client pool size. Default is 5.
     # * +:default_endpoints_protocol+ - String. http or https
     # * +:use_path_style_uri+ - String. Whether use path style URI for specified endpoints
     # * +:ca_file+ - String. 
File path of the CA file if having issue with SSL
@@ -91,6 +95,7 @@ def reset!(options = {})
       @ssl_version = options.delete(:ssl_version)
       @ssl_min_version = options.delete(:ssl_min_version)
       @ssl_max_version = options.delete(:ssl_max_version)
+      @http_pool_size = options.delete(:http_pool_size)
       @options = filter(options)
       self.send(:reset_config!, @options) if self.respond_to?(:reset_config!)
       self
@@ -120,6 +125,9 @@ def self.valid_options
         :storage_connection_string,
         :storage_sas_token,
         :storage_blob_host,
+        :storage_blob_write_block_size,
+        :storage_blob_parallel_threshold,
+        :storage_blob_parallel_threads,
         :storage_table_host,
         :storage_queue_host,
         :storage_file_host,
@@ -139,6 +147,9 @@ def self.env_vars_mapping
         "AZURE_STORAGE_ACCESS_KEY" => :storage_access_key,
         "AZURE_STORAGE_CONNECTION_STRING" => :storage_connection_string,
         "AZURE_STORAGE_BLOB_HOST" => :storage_blob_host,
+        "AZURE_STORAGE_BLOB_WRITE_BLOCK_SIZE" => :storage_blob_write_block_size,
+        "AZURE_STORAGE_BLOB_PARALLEL_THRESHOLD" => :storage_blob_parallel_threshold,
+        "AZURE_STORAGE_BLOB_PARALLEL_THREADS" => :storage_blob_parallel_threads,
         "AZURE_STORAGE_TABLE_HOST" => :storage_table_host,
         "AZURE_STORAGE_QUEUE_HOST" => :storage_queue_host,
         "AZURE_STORAGE_FILE_HOST" => :storage_file_host,
@@ -154,10 +165,14 @@ def self.connection_string_mapping
       @connection_string_mapping ||= {
         "UseDevelopmentStorage" => :use_development_storage,
         "DevelopmentStorageProxyUri" => :development_storage_proxy_uri,
+        "HttpPoolSize" => :http_pool_size,
         "DefaultEndpointsProtocol" => :default_endpoints_protocol,
         "AccountName" => :storage_account_name,
         "AccountKey" => :storage_access_key,
         "BlobEndpoint" => :storage_blob_host,
+        "BlobParallelThreshold" => :storage_blob_parallel_threshold,
+        "BlobParallelThreads" => :storage_blob_parallel_threads,
+        "BlobWriteBlockSize" => :storage_blob_write_block_size,
         "TableEndpoint" => :storage_table_host,
         "QueueEndpoint" => :storage_queue_host,
         "FileEndpoint" => :storage_file_host,
@@ -300,7 +315,7 @@ def validated_options(opts, requirements = {})
       required = requirements[:required] || []
       at_least_one = requirements[:at_least_one] || []
       only_one = requirements[:only_one] || []
-      optional = requirements[:optional] || []
+      optional = (requirements[:optional] || []) | [:storage_blob_write_block_size, :storage_blob_parallel_threshold, :storage_blob_parallel_threads]
 
       raise InvalidOptionsError, "Not all required keys are provided: #{required}" if required.any? { |k| !opts.key? k }
       raise InvalidOptionsError, "Only one of #{only_one} is required" unless only_one.length == 0 || only_one.count { |k| opts.key? k } == 1
@@ -313,6 +328,9 @@ def validated_options(opts, requirements = {})
         storage_access_key: is_base64_encoded,
         storage_sas_token: lambda { |i| i.is_a?(String) },
         storage_blob_host: is_url,
+        storage_blob_write_block_size: lambda { |i| i.is_a?(Integer) && i > 0 },
+        storage_blob_parallel_threshold: lambda { |i| i.is_a?(Integer) },
+        storage_blob_parallel_threads: lambda { |i| i.is_a?(Integer) && i > 0 },
         storage_table_host: is_url,
         storage_queue_host: is_url,
         storage_file_host: is_url,
diff --git a/common/lib/azure/storage/common/configurable.rb b/common/lib/azure/storage/common/configurable.rb
index 6a79ceee..f6d935c7 100644
--- a/common/lib/azure/storage/common/configurable.rb
+++ b/common/lib/azure/storage/common/configurable.rb
@@ -39,6 +39,12 @@ module Configurable
     # emulator). This should be the complete host, including http:// at the
     # start. When using the emulator, make sure to include your account name at
     # the end.
+    # @!attribute storage_blob_write_block_size
+    #   @return [Integer] Set the block size in bytes for blob writes
+    # @!attribute storage_blob_parallel_threshold
+    #   @return [Integer] Set the range threshold in bytes for blob operation parallelization.
+    # @!attribute storage_blob_parallel_threads
+    #   @return [Integer] Set the number of threads for blob operation parallelization.
     # @!attribute storage_table_host
     # @return [String] Set the host for the Table service. Only set this if you want
     # something custom (like, for example, to point this to a LocalStorage
@@ -55,7 +61,10 @@ module Configurable
     attr_accessor :storage_access_key,
                   :storage_account_name,
                   :storage_connection_string,
-                  :storage_sas_token
+                  :storage_sas_token,
+                  :storage_blob_write_block_size,
+                  :storage_blob_parallel_threshold,
+                  :storage_blob_parallel_threads
 
     attr_writer :storage_table_host,
                 :storage_blob_host,
@@ -79,6 +88,9 @@ def keys
           :storage_sas_token,
           :storage_table_host,
           :storage_blob_host,
+          :storage_blob_write_block_size,
+          :storage_blob_parallel_threshold,
+          :storage_blob_parallel_threads,
           :storage_queue_host,
           :storage_file_host,
           :signer
diff --git a/common/lib/azure/storage/common/core/http_client.rb b/common/lib/azure/storage/common/core/http_client.rb
index 4fdc06f4..9c35b52d 100644
--- a/common/lib/azure/storage/common/core/http_client.rb
+++ b/common/lib/azure/storage/common/core/http_client.rb
@@ -72,7 +72,7 @@ def build_http(uri)
       end || nil
       Faraday.new(uri, ssl: ssl_options, proxy: proxy_options) do |conn|
         conn.use FaradayMiddleware::FollowRedirects
-        conn.adapter :net_http_persistent, pool_size: 5 do |http|
+        conn.adapter :net_http_persistent, pool_size: (self.http_pool_size || 5) do |http|
           # yields Net::HTTP::Persistent
           http.idle_timeout = 100
         end
diff --git a/common/lib/azure/storage/common/default.rb b/common/lib/azure/storage/common/default.rb
index 197d3c4c..52c8a098 100644
--- a/common/lib/azure/storage/common/default.rb
+++ b/common/lib/azure/storage/common/default.rb
@@ -115,6 +115,27 @@ def storage_blob_host
         ENV["AZURE_STORAGE_BLOB_HOST"]
       end
 
+      # Default blob write block size, in bytes
+      # @return [Integer, nil] nil unless AZURE_STORAGE_BLOB_WRITE_BLOCK_SIZE holds a positive integer
+      def storage_blob_write_block_size
+        # A bare to_i would turn "" or "abc" into 0; ignore such values instead
+        ENV["AZURE_STORAGE_BLOB_WRITE_BLOCK_SIZE"].to_s[/\A0*[1-9]\d*\z/]&.to_i
+      end
+
+      # Default number of threads for parallel blob operations
+      # @return [Integer, nil] nil unless AZURE_STORAGE_BLOB_PARALLEL_THREADS holds a positive integer
+      def storage_blob_parallel_threads
+        # A bare to_i would turn "" or "abc" into 0 threads; ignore such values instead
+        ENV["AZURE_STORAGE_BLOB_PARALLEL_THREADS"].to_s[/\A0*[1-9]\d*\z/]&.to_i
+      end
+
+      # Default range threshold, in bytes, for parallel blob operations
+      # @return [Integer, nil] nil unless AZURE_STORAGE_BLOB_PARALLEL_THRESHOLD holds a non-negative integer
+      def storage_blob_parallel_threshold
+        # A bare to_i would turn "" or "abc" into a threshold of 0; ignore such values instead
+        ENV["AZURE_STORAGE_BLOB_PARALLEL_THRESHOLD"].to_s[/\A\d+\z/]&.to_i
+      end
+
       # Default storage 
queue host
       # @return [String]
       def storage_queue_host
diff --git a/test/integration/blob/block_blob_test.rb b/test/integration/blob/block_blob_test.rb
index 740ad41c..618748d6 100644
--- a/test/integration/blob/block_blob_test.rb
+++ b/test/integration/blob/block_blob_test.rb
@@ -28,6 +28,16 @@
 describe Azure::Storage::Blob::BlobService do
   subject { Azure::Storage::Blob::BlobService.create(SERVICE_CREATE_OPTIONS()) }
 
+  # Same service options as +subject+ (so it can reach the test container), with parallel transfers enabled
+  let(:parallel_subject) {
+    Azure::Storage::Blob::BlobService::create(
+      SERVICE_CREATE_OPTIONS().merge(
+        storage_blob_parallel_threads: 2,
+        storage_blob_parallel_threshold: 4,
+        storage_blob_write_block_size: 4 * 1024 * 1024
+      ))
+  }
+
   after { ContainerNameHelper.clean }
 
   let(:container_name) { ContainerNameHelper.name }
@@ -87,6 +97,19 @@
       _(blob.properties[:content_type]).must_equal "text/plain; charset=UTF-8"
     end
 
+    it "creates a block that is larger than single upload in parallel" do
+      options = {}
+      options[:single_upload_threshold] = Azure::Storage::Blob::BlobConstants::DEFAULT_WRITE_BLOCK_SIZE_IN_BYTES
+      content_50_mb = SecureRandom.random_bytes(50 * 1024 * 1024)
+      content_50_mb.force_encoding "utf-8"
+      blob_name = BlobNameHelper.name
+      blob = parallel_subject.create_block_blob container_name, blob_name, content_50_mb, options
+      _(blob.name).must_equal blob_name
+      # Content exceeds the single upload threshold; the returned blob must report the full length
+      _(blob.properties[:content_length]).must_equal 50 * 1024 * 1024
+      _(blob.properties[:content_type]).must_equal "text/plain; charset=UTF-8"
+    end
+
     it "should create a block blob with spaces in name" do
       blob_name = "blob with spaces"
       blob = subject.create_block_blob container_name, blob_name, "content"
diff --git a/test/unit/blob/blob_service_test.rb b/test/unit/blob/blob_service_test.rb
index 860fd74d..91d35d60 100644
--- a/test/unit/blob/blob_service_test.rb
+++ b/test/unit/blob/blob_service_test.rb
@@ -29,6 +29,16 @@
   subject {
     Azure::Storage::Blob::BlobService::create({ storage_account_name: "mockaccount", storage_access_key: "YWNjZXNzLWtleQ==" })
   }
+  let(:parallel_subject) {
+    Azure::Storage::Blob::BlobService::create(
+      {
+        storage_account_name: "mockaccount",
+        storage_access_key: "YWNjZXNzLWtleQ==",
+        storage_blob_parallel_threads: 2,
+        storage_blob_parallel_threshold: 4,
+        storage_blob_write_block_size: 4
+      })
+  }
   let(:serialization) { Azure::Storage::Blob::Serialization }
   let(:uri) { URI.parse "http://foo.com" }
   let(:query) { {} }
@@ -41,10 +51,17 @@
   let(:response_body) { mock() }
   let(:response) { mock() }
 
+  let(:parallel_response_headers) { {} }
+  let(:parallel_response_body) { "AB" }
+  let(:parallel_response) { mock() }
+
   before {
     response.stubs(:body).returns(response_body)
     response.stubs(:headers).returns(response_headers)
     subject.stubs(:call).returns(response)
+
+    parallel_response.stubs(:body).returns(parallel_response_body)
+    parallel_response.stubs(:headers).returns(parallel_response_headers)
   }
 
   describe "#create_from_connection_string" do
@@ -2062,6 +2079,35 @@
         _(returned_blob_contents).must_equal response_body
       end
 
+      it "fetches in parallel when parallel options are set" do
+        blob_length = 4
+        end_range = 4
+        parallel_subject.expects(:call).twice.returns(parallel_response)
+        parallel_subject.stubs(:get_blob_properties).returns(OpenStruct.new(properties: {content_length: blob_length}))
+        _, returned_blob_contents = parallel_subject.get_blob container_name, blob_name, {start_range: 0, end_range: end_range}
+
+        # The response is stubbed to return "AB" so fetching 4 bytes returns "AB" + "AB"
+        _(returned_blob_contents).must_equal (parallel_response_body + parallel_response_body)
+      end
+
+      it "fetches in parallel limited to blob size" do
+        blob_length = 4
+        end_range = 6
+        parallel_subject.expects(:call).twice.returns(parallel_response)
+        parallel_subject.stubs(:get_blob_properties).returns(OpenStruct.new(properties: {content_length: blob_length}))
+        _, returned_blob_contents = parallel_subject.get_blob container_name, blob_name, {start_range: 0, end_range: end_range}
+
+        # The response is stubbed to return "AB" so fetching 4 bytes returns "AB" + "AB"
+        _(returned_blob_contents).must_equal (parallel_response_body + parallel_response_body)
+      end
+
+      it "fetches in serial when below parallel threshold" do
+        parallel_subject.expects(:call).once.returns(parallel_response)
+        _, returned_blob_contents = parallel_subject.get_blob container_name, blob_name, {start_range: 0, end_range: 3}
+
+        _(returned_blob_contents).must_equal parallel_response_body
+      end
+
       describe "when snapshot is provided" do
         let(:source_snapshot) { "source-snapshot" }
         before { query["snapshot"] = source_snapshot }
diff --git a/test/unit/config/client_services_test.rb b/test/unit/config/client_services_test.rb
index 74b84a2d..967d66ad 100644
--- a/test/unit/config/client_services_test.rb
+++ b/test/unit/config/client_services_test.rb
@@ -85,5 +85,19 @@
       _(subjectA.storage_queue_host).must_equal subjectB.storage_queue_host
       _(subjectA.storage_file_host).must_equal subjectB.storage_file_host
     end
+
+    it "storage with optional blob options works" do
+      subject = Azure::Storage::Common::Client.create(
+        storage_account_name: azure_storage_account,
+        storage_access_key: azure_storage_access_key,
+        storage_blob_parallel_threads: 3,
+        storage_blob_parallel_threshold: 6 * 1024 * 1024,
+        storage_blob_write_block_size: 9 * 1024 * 1024,
+      )
+
+      _(subject.storage_blob_parallel_threads).must_equal 3
+      _(subject.storage_blob_parallel_threshold).must_equal 6 * 1024 * 1024
+      _(subject.storage_blob_write_block_size).must_equal 9 * 1024 * 1024
+    end
   end
 end