From c3ac85bdffb446261974725f6c1e80b73a6c39f8 Mon Sep 17 00:00:00 2001 From: Jake Petroules Date: Mon, 18 Aug 2025 11:49:04 -0700 Subject: [PATCH] rdar://158256632 (FreeBSD: CommandPlugin Tests Hang waiting on FileDescriptor) Speculative fix for the hang... --- .../Plugins/DefaultPluginScriptRunner.swift | 306 +++++++++++------- 1 file changed, 196 insertions(+), 110 deletions(-) diff --git a/Sources/SPMBuildCore/Plugins/DefaultPluginScriptRunner.swift b/Sources/SPMBuildCore/Plugins/DefaultPluginScriptRunner.swift index f8a98a664e5..cf6f132bfaf 100644 --- a/Sources/SPMBuildCore/Plugins/DefaultPluginScriptRunner.swift +++ b/Sources/SPMBuildCore/Plugins/DefaultPluginScriptRunner.swift @@ -419,7 +419,7 @@ public struct DefaultPluginScriptRunner: PluginScriptRunner, Cancellable { return sdkRootPath } - + /// Private function that invokes a compiled plugin executable and communicates with it until it finishes. fileprivate func invoke( compiledExec: Basics.AbsolutePath, @@ -433,30 +433,47 @@ public struct DefaultPluginScriptRunner: PluginScriptRunner, Cancellable { delegate: PluginScriptRunnerDelegate, completion: @escaping (Result) -> Void ) { -#if canImport(Darwin) && !os(macOS) - callbackQueue.async { - completion(.failure(DefaultPluginScriptRunnerError.pluginUnavailable(reason: "subprocess invocations are unavailable on this platform"))) + Task { + let result: Result + do { + result = try await .success(invoke(compiledExec: compiledExec, workingDirectory: workingDirectory, writableDirectories: writableDirectories, readOnlyDirectories: readOnlyDirectories, allowNetworkConnections: allowNetworkConnections, initialMessage: initialMessage, observabilityScope: observabilityScope, callbackQueue: callbackQueue, delegate: delegate)) + } catch { + result = .failure(error) + } + callbackQueue.async { + completion(result) + } } + } + + /// Private function that invokes a compiled plugin executable and communicates with it until it finishes. + fileprivate func invoke( + compiledExec: Basics.AbsolutePath, + workingDirectory: Basics.AbsolutePath, + writableDirectories: [Basics.AbsolutePath], + readOnlyDirectories: [Basics.AbsolutePath], + allowNetworkConnections: [SandboxNetworkPermission], + initialMessage: Data, + observabilityScope: ObservabilityScope, + callbackQueue: DispatchQueue, + delegate: PluginScriptRunnerDelegate + ) async throws -> Int32 { +#if canImport(Darwin) && !os(macOS) + throw DefaultPluginScriptRunnerError.pluginUnavailable(reason: "subprocess invocations are unavailable on this platform") #else // Construct the command line. Currently we just invoke the executable built from the plugin without any parameters. var command = [compiledExec.pathString] // Optionally wrap the command in a sandbox, which places some limits on what it can do. In particular, it blocks network access and restricts the paths to which the plugin can make file system changes. It does allow writing to temporary directories. if self.enableSandbox { - do { - command = try Sandbox.apply( - command: command, - fileSystem: self.fileSystem, - strictness: .writableTemporaryDirectory, - writableDirectories: writableDirectories + [self.cacheDir], - readOnlyDirectories: readOnlyDirectories, - allowNetworkConnections: allowNetworkConnections - ) - } catch { - return callbackQueue.async { - completion(.failure(error)) - } - } + command = try Sandbox.apply( + command: command, + fileSystem: self.fileSystem, + strictness: .writableTemporaryDirectory, + writableDirectories: writableDirectories + [self.cacheDir], + readOnlyDirectories: readOnlyDirectories, + allowNetworkConnections: allowNetworkConnections + ) } // Create and configure a Process. We set the working directory to the cache directory, so that relative paths end up there. @@ -484,121 +501,88 @@ public struct DefaultPluginScriptRunner: PluginScriptRunner, Cancellable { // Set up a pipe for sending structured messages to the plugin on its stdin. let stdinPipe = Pipe() let outputHandle = stdinPipe.fileHandleForWriting + defer { + // Close the output handle through which we talked to the plugin. + try? outputHandle.close() + } let outputQueue = DispatchQueue(label: "plugin-send-queue") process.standardInput = stdinPipe // Set up a pipe for receiving messages from the plugin on its stdout. let stdoutPipe = Pipe() - let stdoutLock = NSLock() - stdoutPipe.fileHandleForReading.readabilityHandler = { fileHandle in - // Receive the next message and pass it on to the delegate. - stdoutLock.withLock { - do { - while let message = try fileHandle.readPluginMessage() { - // FIXME: We should handle errors here. - callbackQueue.async { - do { - try delegate.handleMessage(data: message, responder: { data in - outputQueue.async { - do { - try outputHandle.writePluginMessage(data) - } - catch { - print("error while trying to send message to plugin: \(error.interpolationDescription)") - } - } - }) - } - catch DecodingError.keyNotFound(let key, _) where key.stringValue == "version" { - print("message from plugin did not contain a 'version' key, likely an incompatible plugin library is being loaded by the plugin") - } - catch { - print("error while trying to handle message from plugin: \(error.interpolationDescription)") + process.standardOutput = stdoutPipe + async let stdout: () = { + while let message = try await stdoutPipe.fileHandleForReading.readPluginMessage() { + // FIXME: We should handle errors here. + callbackQueue.async { + do { + try delegate.handleMessage(data: message, responder: { data in + outputQueue.async { + do { + try outputHandle.writePluginMessage(data) + } + catch { + print("error while trying to send message to plugin: \(error.interpolationDescription)") + } } - } + }) + } + catch DecodingError.keyNotFound(let key, _) where key.stringValue == "version" { + print("message from plugin did not contain a 'version' key, likely an incompatible plugin library is being loaded by the plugin") + } + catch { + print("error while trying to handle message from plugin: \(error.interpolationDescription)") } - } - catch { - print("error while trying to read message from plugin: \(error.interpolationDescription)") } } - } - process.standardOutput = stdoutPipe + }() // Set up a pipe for receiving free-form text output from the plugin on its stderr. let stderrPipe = Pipe() - let stderrLock = NSLock() - var stderrData = Data() - let stderrHandler = { (data: Data) in - // Pass on any available data to the delegate. - if data.isEmpty { return } - stderrData.append(contentsOf: data) - callbackQueue.async { delegate.handleOutput(data: data) } - } - stderrPipe.fileHandleForReading.readabilityHandler = { fileHandle in - // Read and pass on any available free-form text output from the plugin. - // We need the lock since we could run concurrently with the termination handler. - stderrLock.withLock { stderrHandler(fileHandle.availableData) } - } process.standardError = stderrPipe - + async let stderrData = { + var accumulatedData = Data() + for try await chunk in stderrPipe.fileHandleForReading._dataStream() { + callbackQueue.async { delegate.handleOutput(data: Data(chunk)) } + accumulatedData.append(contentsOf: chunk) + } + return accumulatedData + }() + // Add it to the list of currently running plugin processes, so it can be cancelled if the host is interrupted. guard let cancellationKey = self.cancellator.register(process) else { - return callbackQueue.async { - completion(.failure(CancellationError())) - } + throw CancellationError() } - // Set up a handler to deal with the exit of the plugin process. - process.terminationHandler = { process in + defer { // Remove the process from the list of currently running ones. self.cancellator.deregister(cancellationKey) + } - // Close the output handle through which we talked to the plugin. - try? outputHandle.close() - - // Read and pass on any remaining free-form text output from the plugin. - // We need the lock since we could run concurrently with the readability handler. - stderrLock.withLock { - try? stderrPipe.fileHandleForReading.readToEnd().map{ stderrHandler($0) } - } - - // Read and pass on any remaining messages from the plugin. - let handle = stdoutPipe.fileHandleForReading - if let handler = handle.readabilityHandler { - handler(handle) - } - - // Call the completion block with a result that depends on how the process ended. - callbackQueue.async { - completion(Result { - // We throw an error if the plugin ended with a signal. - if process.terminationReason == .uncaughtSignal { - throw DefaultPluginScriptRunnerError.invocationEndedBySignal( - signal: process.terminationStatus, - command: command, - output: String(decoding: stderrData, as: UTF8.self)) - } - // Otherwise return the termination satatus. - return process.terminationStatus - }) - } + /// Send the initial message to the plugin. + outputQueue.async { + try? outputHandle.writePluginMessage(initialMessage) } - + // Start the plugin process. do { - try process.run() - } - catch { - callbackQueue.async { - completion(.failure(DefaultPluginScriptRunnerError.invocationFailed(error: error, command: command))) - } + try await process.run() + } catch { + throw DefaultPluginScriptRunnerError.invocationFailed(error: error, command: command) } - /// Send the initial message to the plugin. - outputQueue.async { - try? outputHandle.writePluginMessage(initialMessage) + _ = try await stdout + let stderr = try await stderrData + + // We throw an error if the plugin ended with a signal. + if process.terminationReason == .uncaughtSignal { + throw DefaultPluginScriptRunnerError.invocationEndedBySignal( + signal: process.terminationStatus, + command: command, + output: String(decoding: stderr, as: UTF8.self)) } + // Otherwise return the termination satatus. + return process.terminationStatus #endif } @@ -667,9 +651,12 @@ fileprivate extension FileHandle { try self.write(contentsOf: message) } - func readPluginMessage() throws -> Data? { + func readPluginMessage() async throws -> Data? { // Read the header (a 64-bit length field in little endian byte order). - guard let header = try self.read(upToCount: 8) else { return nil } + let header = try await Data(self.readChunk(upToLength: 8)) + if header.isEmpty { + return nil + } guard header.count == 8 else { throw PluginMessageError.truncatedHeader } @@ -679,7 +666,8 @@ fileprivate extension FileHandle { } // Read and return the message. - guard let message = try self.read(upToCount: Int(length)), message.count == length else { + let message = try await Data(self.readChunk(upToLength: Int(length))) + guard message.count == length else { throw PluginMessageError.truncatedPayload } return message @@ -691,3 +679,101 @@ fileprivate extension FileHandle { case truncatedPayload } } + +extension Process { + /// Runs the process and suspends until completion. + /// + /// - parameter interruptible: Whether the process should respond to task cancellation. If `true`, task cancellation will cause `SIGTERM` to be sent to the process if it starts running by the time the task is cancelled. If `false`, the process will always run to completion regardless of the task's cancellation status. + /// + /// - note: This method sets the process's termination handler, if one is set. + /// - throws: ``CancellationError`` if the task was cancelled. Applies only when `interruptible` is true. + /// - throws: Rethrows the error from ``Process/run`` if the task could not be launched. + public func run(interruptible: Bool = true) async throws { + @Sendable func cancelIfRunning() { + // Only send the termination signal if the process is already running. + // We might have created the termination monitoring continuation at this + // point but not yet completed run(), and terminate() will raise an + // Objective-C exception if the process has not yet started. + if interruptible && isRunning { + terminate() + } + } + try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + terminationHandler = { _ in + continuation.resume() + } + + do { + // Check for cancellation -- if we entered the cancellation + // handler before the process actually started, and therefore + // didn't call terminate(), don't actually start it. + if interruptible { + try Task.checkCancellation() + } + + try run() + } catch { + terminationHandler = nil + + // If `run` throws, the terminationHandler won't be called, + // so resume the continuation with this error. + continuation.resume(throwing: error) + } + + if Task.isCancelled { + cancelIfRunning() + } + } + + // Check for cancellation -- if terminate() was called, the termination + // handler will have resumed the previous continuation without throwing + // an error; this distinguishes cooperative cancellation from successful + // execution of the task to completion (even if that task otherwise exited + // with a non-zero exit code or terminated due to an uncaught signal). + if interruptible { + try Task.checkCancellation() + } + } onCancel: { + cancelIfRunning() + } + } +} + +extension FileHandle { + public func _dataStream() -> AsyncThrowingStream { + AsyncThrowingStream { + while !Task.isCancelled { + let chunk = try await self.readChunk(upToLength: 4096) + if chunk.isEmpty { + return nil + } + return chunk + } + throw CancellationError() + } + } +} + +extension FileHandle { + public func readChunk(upToLength maxLength: Int) async throws -> DispatchData { + return try await withCheckedThrowingContinuation { continuation in + #if os(Windows) + let fd = _get_osfhandle(fileDescriptor.rawValue) + #else + let fd = self.fileDescriptor + #endif + DispatchIO.read( + fromFileDescriptor: fd, + maxLength: maxLength, + runningHandlerOn: .global() + ) { data, error in + if error != 0 { + continuation.resume(throwing: POSIXError(POSIXError.Code(rawValue: error)!)) + return + } + continuation.resume(returning: data) + } + } + } +}