Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Sources/ClientRuntime/Networking/Http/NIO/NIOHTTPClientError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

/// Errors that are particular to the NIO-based Smithy HTTP client.
public enum NIOHTTPClientError: Error {

case streamingError(Error)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import AsyncHTTPClient
import Foundation
import NIO
import Smithy
import SmithyStreams

/// Handles streaming between Smithy streams and AsyncHTTPClient
final class NIOHTTPClientStreamBridge {

/// Convert Smithy ByteStream to AsyncHTTPClient request body
static func convertRequestBody(
from body: ByteStream,
allocator: ByteBufferAllocator,
chunkSize: Int = CHUNK_SIZE_BYTES
) async throws -> AsyncHTTPClient.HTTPClientRequest.Body {
switch body {
case .noStream:
// No body to send
return .bytes(allocator.buffer(capacity: 0))

case .data(let data):
// Convert Data to ByteBuffer
if let data = data {
var buffer = allocator.buffer(capacity: data.count)
buffer.writeBytes(data)
return .bytes(buffer)
} else {
return .bytes(allocator.buffer(capacity: 0))
}

case .stream(let stream):
// Handle streaming request body
return try await convertStreamToRequestBody(stream: stream, allocator: allocator, chunkSize: chunkSize)
}
}

/// Convert AsyncHTTPClient response body to Smithy ByteStream
static func convertResponseBody(
from response: AsyncHTTPClient.HTTPClientResponse
) async -> ByteStream {
let bufferedStream = BufferedStream()

do {
var iterator = response.body.makeAsyncIterator()
while let buffer = try await iterator.next() {
// Convert ByteBuffer to Data and write to buffered stream
if let bytes = buffer.getBytes(at: buffer.readerIndex, length: buffer.readableBytes) {
let data = Data(bytes)
try bufferedStream.write(contentsOf: data)
}
}
bufferedStream.close()
} catch {
bufferedStream.closeWithError(error)
}

return .stream(bufferedStream)
}

/// Convert a Smithy Stream to AsyncHTTPClient request body
private static func convertStreamToRequestBody(
stream: Smithy.Stream,
allocator: ByteBufferAllocator,
chunkSize: Int = CHUNK_SIZE_BYTES
) async throws -> AsyncHTTPClient.HTTPClientRequest.Body {
if let streamLength = stream.length {
let asyncSequence = StreamToAsyncSequence(stream: stream, allocator: allocator, chunkSize: chunkSize)
return .stream(asyncSequence, length: .known(Int64(streamLength)))
} else {
do {
let data = try await stream.readToEndAsync()
if let data = data {
var buffer = allocator.buffer(capacity: data.count)
buffer.writeBytes(data)
return .bytes(buffer)
} else {
return .bytes(allocator.buffer(capacity: 0))
}
} catch {
throw NIOHTTPClientError.streamingError(error)
}
}
}
}

/// AsyncSequence adapter that converts a Smithy Stream to ByteBuffer sequence for AsyncHTTPClient
internal struct StreamToAsyncSequence: AsyncSequence, Sendable {
typealias Element = ByteBuffer

private let stream: Smithy.Stream
private let allocator: ByteBufferAllocator
private let chunkSize: Int

init(stream: Smithy.Stream, allocator: ByteBufferAllocator, chunkSize: Int = CHUNK_SIZE_BYTES) {
self.stream = stream
self.allocator = allocator
self.chunkSize = chunkSize
}

func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(stream: stream, allocator: allocator, chunkSize: chunkSize)
}

struct AsyncIterator: AsyncIteratorProtocol {
private let stream: Smithy.Stream
private let allocator: ByteBufferAllocator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where do we get this allocator from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ByteBufferAllocator is a struct from NIOCore package utilized by async-http-client

private let chunkSize: Int
private var isFinished = false

init(stream: Smithy.Stream, allocator: ByteBufferAllocator, chunkSize: Int) {
self.stream = stream
self.allocator = allocator
self.chunkSize = chunkSize
}

mutating func next() async throws -> ByteBuffer? {
guard !isFinished else { return nil }

do {
// Read a chunk from the stream (using configurable chunk size)
let data = try await stream.readAsync(upToCount: chunkSize)

if let data = data, !data.isEmpty {
var buffer = allocator.buffer(capacity: data.count)
buffer.writeBytes(data)
return buffer
} else {
isFinished = true
stream.close()
return nil
}
} catch {
isFinished = true
stream.close()
throw NIOHTTPClientError.streamingError(error)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import AsyncHTTPClient
import Foundation
import NIO
import SmithyTestUtil
import XCTest
import class SmithyStreams.BufferedStream
import enum Smithy.LogAgentLevel
import protocol Smithy.LogAgent
import enum Smithy.ByteStream
@testable import ClientRuntime

class NIOHTTPClientStreamBridgeTests: XCTestCase {

func test_convertResponseBody_streamsAllDataCorrectly() async throws {

// The maximum size of input streaming data in the tests
let maxDataSize = 65_536 // 64 kb

// Create & fill a buffer with random bytes, for use in later test setup
// Random buffer is reused because creating random data is slow
// We are responsible for deallocating it
let randomBuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: maxDataSize)
defer { randomBuffer.deallocate() }

for i in 0..<maxDataSize {
randomBuffer[i] = UInt8.random(in: UInt8.min...UInt8.max)
}

// Run this test repeatedly to help uncover any problems
let numberOfRuns = 100

for run in 1...numberOfRuns {
// Test with varying data sizes up to the maximum
let dataSize = min(run * 1000, maxDataSize)

// Create original test data
let originalData = Data(bytes: randomBuffer, count: dataSize)

// Create a mock AsyncHTTPClient response with the test data
let allocator = ByteBufferAllocator()
var buffer = allocator.buffer(capacity: originalData.count)
buffer.writeBytes(originalData)

let response = AsyncHTTPClient.HTTPClientResponse(
version: .http1_1,
status: .ok,
headers: [:],
body: .bytes(buffer)
)

let resultStream = await NIOHTTPClientStreamBridge.convertResponseBody(from: response)
let convertedData = try await readAllData(from: resultStream)

XCTAssertEqual(convertedData, originalData,
"Run \(run) failed (dataSize: \(dataSize))")
}
}

func test_convertRequestBody_withNoStream() async throws {
let allocator = ByteBufferAllocator()
let byteStream = ByteStream.noStream

let result = try await NIOHTTPClientStreamBridge.convertRequestBody(
from: byteStream,
allocator: allocator
)

// Convert the body to an async sequence and verify it's empty
var totalBytes = 0
for try await buffer in result {
totalBytes += buffer.readableBytes
}

XCTAssertEqual(totalBytes, 0)
}

func test_convertRequestBody_withData() async throws {
let allocator = ByteBufferAllocator()
let testData = "Hello, World!".data(using: .utf8)!
let byteStream = ByteStream.data(testData)

let result = try await NIOHTTPClientStreamBridge.convertRequestBody(
from: byteStream,
allocator: allocator
)

var collectedData = Data()
for try await buffer in result {
collectedData.append(Data(buffer: buffer))
}

XCTAssertEqual(collectedData, testData)
}

func test_convertRequestBody_withStream() async throws {
let allocator = ByteBufferAllocator()
let testData = Data(repeating: 42, count: 1000)
let bufferedStream = BufferedStream(data: testData, isClosed: true)
let byteStream = ByteStream.stream(bufferedStream)

let result = try await NIOHTTPClientStreamBridge.convertRequestBody(
from: byteStream,
allocator: allocator,
chunkSize: 100 // try a non-default chunk size
)

var collectedData = Data()
for try await buffer in result {
collectedData.append(Data(buffer: buffer))
}

XCTAssertEqual(collectedData, testData)
}

private func readAllData(from byteStream: ByteStream) async throws -> Data {
switch byteStream {
case .stream(let stream):
return try await stream.readToEndAsync() ?? Data()
case .data(let data):
return data ?? Data()
case .noStream:
return Data()
}
}
}

private extension Data {
init(buffer: ByteBuffer) {
if let bytes = buffer.getBytes(at: buffer.readerIndex, length: buffer.readableBytes) {
self.init(bytes)
} else {
self.init()
}
}
}
Loading