Skip to content

Commit 55032da

Browse files
authored
Merge pull request #552 from serverlessworkflow/fix-runner-configuration-monitoring
Fixed all runtime implementations to properly resolve the `RunnerConfiguration` to use
2 parents c6f02b4 + 6e862be commit 55032da

File tree

8 files changed

+165
-73
lines changed

8 files changed

+165
-73
lines changed

src/core/Synapse.Core/Resources/KubernetesRuntimeConfiguration.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14+
using k8s;
1415
using k8s.Models;
1516
using Neuroglia.Serialization.Yaml;
1617

@@ -108,7 +109,7 @@ public static V1PodTemplateSpec LoadPodTemplate()
108109
var templateFilePath = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runtime.Kubernetes.Pod);
109110
if (string.IsNullOrWhiteSpace(templateFilePath) || !File.Exists(templateFilePath)) return DefaultPodTemplate;
110111
var yaml = File.ReadAllText(templateFilePath);
111-
return YamlSerializer.Default.Deserialize<V1PodTemplateSpec>(yaml)!;
112+
return KubernetesYaml.Deserialize<V1PodTemplateSpec>(yaml);
112113
}
113114

114115
}

src/operator/Synapse.Operator/Program.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@
2828
.ConfigureServices((context, services) =>
2929
{
3030
services.Configure<OperatorOptions>(context.Configuration);
31-
services.AddSingleton(provider =>
32-
{
33-
var options = provider.GetRequiredService<IOptionsMonitor<OperatorOptions>>().CurrentValue;
34-
return Options.Create(options.Runner);
35-
});
3631
services.AddLogging(builder =>
3732
{
3833
builder.AddSimpleConsole(options =>
@@ -67,9 +62,13 @@
6762
services.AddScoped<WorkflowInstanceController>();
6863
services.AddScoped<IResourceController<WorkflowInstance>>(provider => provider.GetRequiredService<WorkflowInstanceController>());
6964

70-
services.AddHostedService<OperatorApplication>();
65+
services.AddSingleton<OperatorApplication>();
66+
services.AddHostedService(provider => provider.GetRequiredService<OperatorApplication>());
67+
services.AddSingleton<RunnerConfigurationMonitor>();
68+
services.AddHostedService(provider => provider.GetRequiredService<RunnerConfigurationMonitor>());
69+
services.AddSingleton<IOptionsMonitor<RunnerConfiguration>>(provider => provider.GetRequiredService<RunnerConfigurationMonitor>());
7170
});
7271

7372
using var app = builder.Build();
7473

75-
await app.RunAsync();
74+
await app.RunAsync();

src/operator/Synapse.Operator/Services/OperatorApplication.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,17 @@ internal class OperatorApplication(IServiceProvider serviceProvider)
2020
readonly IServiceScope _scope = serviceProvider.CreateScope();
2121
IServiceProvider ServiceProvider => this._scope.ServiceProvider;
2222

23-
OperatorController _operatorController = null!;
2423
WorkflowController _workflowController = null!;
2524
WorkflowInstanceController _workflowInstanceController = null!;
2625

26+
internal OperatorController OperatorController { get; private set; } = null!;
27+
2728
public async Task StartAsync(CancellationToken cancellationToken)
2829
{
29-
this._operatorController = this.ServiceProvider.GetRequiredService<OperatorController>();
30+
this.OperatorController = this.ServiceProvider.GetRequiredService<OperatorController>();
3031
this._workflowController = this.ServiceProvider.GetRequiredService<WorkflowController>();
3132
this._workflowInstanceController = this.ServiceProvider.GetRequiredService<WorkflowInstanceController>();
32-
await this._operatorController.StartAsync(cancellationToken).ConfigureAwait(false);
33+
await this.OperatorController.StartAsync(cancellationToken).ConfigureAwait(false);
3334
await Task.WhenAll(
3435
[
3536
this._workflowController.StartAsync(cancellationToken),
@@ -41,12 +42,12 @@ public async Task StopAsync(CancellationToken cancellationToken)
4142
{
4243
await Task.WhenAll(
4344
[
44-
this._operatorController.StopAsync(cancellationToken),
45+
this.OperatorController.StopAsync(cancellationToken),
4546
this._workflowController.StopAsync(cancellationToken),
4647
this._workflowInstanceController.StopAsync(cancellationToken)
4748
]).ConfigureAwait(false);
4849
}
4950

5051
void IDisposable.Dispose() => this._scope.Dispose();
5152

52-
}
53+
}

src/operator/Synapse.Operator/Services/OperatorController.cs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ namespace Synapse.Operator.Services;
1818
/// </summary>
1919
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
2020
/// <param name="options">The current <see cref="OperatorOptions"/></param>
21-
/// <param name="runnerOptions">The current <see cref="RunnerConfiguration"/></param>
22-
public class OperatorController(IResourceRepository repository, IOptionsMonitor<OperatorOptions> options, IOptionsMonitor<RunnerConfiguration> runnerOptions)
21+
public class OperatorController(IResourceRepository repository, IOptionsMonitor<OperatorOptions> options)
2322
: IOperatorController
2423
{
2524

@@ -33,11 +32,6 @@ public class OperatorController(IResourceRepository repository, IOptionsMonitor<
3332
/// </summary>
3433
protected OperatorOptions Options => options.CurrentValue;
3534

36-
/// <summary>
37-
/// Gets the current <see cref="RunnerConfiguration"/>
38-
/// </summary>
39-
protected RunnerConfiguration RunnerOptions => runnerOptions.CurrentValue;
40-
4135
/// <inheritdoc/>
4236
public IResourceMonitor<Resources.Operator> Operator { get; protected set; } = null!;
4337

@@ -90,9 +84,6 @@ protected virtual async Task SetOperatorStatusPhaseAsync(string phase, Cancellat
9084
protected virtual void OnOperatorSpecChanged()
9185
{
9286
this.Options.Runner = this.Operator.Resource.Spec.Runner;
93-
this.RunnerOptions.Api = this.Options.Runner.Api;
94-
this.RunnerOptions.Runtime = this.Options.Runner.Runtime;
95-
this.RunnerOptions.Certificates = this.Options.Runner.Certificates;
9687
}
9788

9889
/// <inheritdoc/>
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright © 2024-Present The Synapse Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
namespace Synapse.Operator.Services;
15+
16+
/// <summary>
17+
/// Represents the service used to monitor the operator's <see cref="RunnerConfiguration"/>.
18+
/// </summary>
19+
/// <param name="application">The service used to monitor the current operator resource</param>
20+
internal sealed class RunnerConfigurationMonitor(OperatorApplication application)
21+
: IHostedService, IOptionsMonitor<RunnerConfiguration>
22+
{
23+
24+
ConcurrentHashSet<RunnerConfigurationChangeTokenSource> _changeTokenSources = [];
25+
IDisposable? _subscription;
26+
IResourceMonitor<Resources.Operator> _operator = null!;
27+
28+
/// <inheritdoc/>
29+
public RunnerConfiguration CurrentValue => _operator.Resource.Spec.Runner;
30+
31+
/// <inheritdoc/>
32+
public Task StartAsync(CancellationToken cancellationToken)
33+
{
34+
_operator = application.OperatorController.Operator;
35+
_operator.Where(e => e.Type == ResourceWatchEventType.Updated).Select(e => e.Resource.Spec.Runner).Distinct().Subscribe(OnConfigurationChanged);
36+
return Task.CompletedTask;
37+
}
38+
39+
/// <inheritdoc/>
40+
public Task StopAsync(CancellationToken cancellationToken)
41+
{
42+
_subscription?.Dispose();
43+
return Task.CompletedTask;
44+
}
45+
46+
/// <inheritdoc/>
47+
public RunnerConfiguration Get(string? name) => CurrentValue;
48+
49+
void OnConfigurationChanged(RunnerConfiguration configuration)
50+
{
51+
foreach (var changeTokenSource in _changeTokenSources) changeTokenSource.Invoke(configuration, null);
52+
}
53+
54+
IDisposable? IOptionsMonitor<RunnerConfiguration>.OnChange(Action<RunnerConfiguration, string?> listener)
55+
{
56+
var changeTokenSource = new RunnerConfigurationChangeTokenSource(listener);
57+
changeTokenSource.OnDisposed += OnChangeTokenSourceDisposed;
58+
_changeTokenSources.Add(changeTokenSource);
59+
return changeTokenSource;
60+
}
61+
62+
void OnChangeTokenSourceDisposed(object? sender, EventArgs e)
63+
{
64+
if (sender is not RunnerConfigurationChangeTokenSource changeTokenSource) return;
65+
changeTokenSource.OnDisposed -= OnChangeTokenSourceDisposed;
66+
_changeTokenSources.Remove(changeTokenSource);
67+
}
68+
69+
class RunnerConfigurationChangeTokenSource(Action<RunnerConfiguration, string?> listener)
70+
: IDisposable
71+
{
72+
73+
public event EventHandler? OnDisposed;
74+
75+
public void Invoke(RunnerConfiguration configuration, string? name) => listener(configuration, name);
76+
77+
public void Dispose()
78+
{
79+
OnDisposed?.Invoke(this, EventArgs.Empty);
80+
GC.SuppressFinalize(this);
81+
}
82+
83+
}
84+
85+
}

src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ namespace Synapse.Runtime.Docker.Services;
2525
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
2626
/// <param name="loggerFactory">The service used to create <see cref="ILogger"/>s</param>
2727
/// <param name="environment">The current <see cref="IHostEnvironment"/></param>
28-
/// <param name="runner">The service used to access the current <see cref="RunnerConfiguration"/></param>
29-
public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IHostEnvironment environment, IOptions<RunnerConfiguration> runner)
28+
/// <param name="runnerConfigurationMonitor">The service used to access the current <see cref="Resources.RunnerConfiguration"/></param>
29+
public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IHostEnvironment environment, IOptionsMonitor<RunnerConfiguration> runnerConfigurationMonitor)
3030
: WorkflowRuntimeBase(loggerFactory)
3131
{
3232

@@ -41,9 +41,14 @@ public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory logg
4141
protected IHostEnvironment Environment { get; } = environment;
4242

4343
/// <summary>
44-
/// Gets the current <see cref="RunnerConfiguration"/>
44+
/// Gets the service used to access the current <see cref="Resources.RunnerConfiguration"/>
4545
/// </summary>
46-
protected RunnerConfiguration Runner => runner.Value;
46+
protected IOptionsMonitor<RunnerConfiguration> RunnerConfigurationMonitor { get; } = runnerConfigurationMonitor;
47+
48+
/// <summary>
49+
/// Gets the current <see cref="Resources.RunnerConfiguration"/>
50+
/// </summary>
51+
protected RunnerConfiguration RunnerConfiguration => RunnerConfigurationMonitor.CurrentValue;
4752

4853
/// <summary>
4954
/// Gets the service used to interact with the Docker API
@@ -62,9 +67,9 @@ public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory logg
6267
/// <returns>A new awaitable <see cref="Task"/></returns>
6368
protected virtual Task InitializeAsync(CancellationToken cancellationToken = default)
6469
{
65-
if (this.Runner.Runtime.Docker == null) throw new NullReferenceException($"Failed to initialize the Docker Runtime because the operator is not configured to use Docker as a runtime");
66-
var dockerConfiguration = new DockerClientConfiguration(this.Runner.Runtime.Docker.Api.Endpoint);
67-
this.Docker = dockerConfiguration.CreateClient(string.IsNullOrWhiteSpace(this.Runner.Runtime.Docker.Api.Version) ? null : System.Version.Parse(this.Runner.Runtime.Docker.Api.Version!));
70+
if (this.RunnerConfiguration.Runtime.Docker == null) throw new NullReferenceException($"Failed to initialize the Docker Runtime because the operator is not configured to use Docker as a runtime");
71+
var dockerConfiguration = new DockerClientConfiguration(this.RunnerConfiguration.Runtime.Docker.Api.Endpoint);
72+
this.Docker = dockerConfiguration.CreateClient(string.IsNullOrWhiteSpace(this.RunnerConfiguration.Runtime.Docker.Api.Version) ? null : System.Version.Parse(this.RunnerConfiguration.Runtime.Docker.Api.Version!));
6873
return Task.CompletedTask;
6974
}
7075

@@ -78,7 +83,7 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
7883
{
7984
this.Logger.LogDebug("Creating a new Docker container for workflow instance '{workflowInstance}'...", workflowInstance.GetQualifiedName());
8085
if (this.Docker == null) await this.InitializeAsync(cancellationToken).ConfigureAwait(false);
81-
var container = this.Runner.Runtime.Docker!.ContainerTemplate.Clone()!;
86+
var container = this.RunnerConfiguration.Runtime.Docker!.ContainerTemplate.Clone()!;
8287
try
8388
{
8489
await this.Docker!.Images.InspectImageAsync(container.Image, cancellationToken).ConfigureAwait(false);
@@ -93,24 +98,24 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
9398
}
9499
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Namespace, workflowInstance.GetNamespace()!);
95100
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Name, $"{workflowInstance.GetName()}-{Guid.NewGuid().ToString("N")[..12].ToLowerInvariant()}");
96-
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri, this.Runner.Api.Uri.OriginalString);
97-
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform, this.Runner.ContainerPlatform);
98-
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents, (this.Runner.PublishLifecycleEvents ?? true).ToString());
99-
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Secrets.Directory, this.Runner.Runtime.Docker.Secrets.MountPath);
101+
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri, this.RunnerConfiguration.Api.Uri.OriginalString);
102+
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform, this.RunnerConfiguration.ContainerPlatform);
103+
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents, (this.RunnerConfiguration.PublishLifecycleEvents ?? true).ToString());
104+
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Secrets.Directory, this.RunnerConfiguration.Runtime.Docker.Secrets.MountPath);
100105
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.ServiceAccount.Name, serviceAccount.GetQualifiedName());
101106
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.ServiceAccount.Key, serviceAccount.Spec.Key);
102107
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Workflow.Instance, workflowInstance.GetQualifiedName());
103108
container.SetEnvironmentVariable("DOCKER_HOST", "unix:///var/run/docker.sock");
104109
container.User = "root";
105-
if (this.Runner.Certificates?.Validate == false) container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true");
106-
var hostConfig = this.Runner.Runtime.Docker.HostConfig?.Clone()! ?? new();
107-
if (!Directory.Exists(this.Runner.Runtime.Docker.Secrets.Directory)) Directory.CreateDirectory(this.Runner.Runtime.Docker.Secrets.Directory);
110+
if (this.RunnerConfiguration.Certificates?.Validate == false) container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true");
111+
var hostConfig = this.RunnerConfiguration.Runtime.Docker.HostConfig?.Clone()! ?? new();
112+
if (!Directory.Exists(this.RunnerConfiguration.Runtime.Docker.Secrets.Directory)) Directory.CreateDirectory(this.RunnerConfiguration.Runtime.Docker.Secrets.Directory);
108113
hostConfig.Mounts ??= [];
109114
hostConfig.Mounts.Insert(0, new()
110115
{
111116
Type = "bind",
112-
Source = this.Runner.Runtime.Docker.Secrets.Directory,
113-
Target = this.Runner.Runtime.Docker.Secrets.MountPath
117+
Source = this.RunnerConfiguration.Runtime.Docker.Secrets.Directory,
118+
Target = this.RunnerConfiguration.Runtime.Docker.Secrets.MountPath
114119
});
115120
hostConfig.Mounts.Insert(1, new()
116121
{
@@ -128,7 +133,7 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
128133
HostConfig = hostConfig
129134
};
130135
var result = await this.Docker!.Containers.CreateContainerAsync(parameters, cancellationToken).ConfigureAwait(false);
131-
if (this.Environment.RunsInDocker()) await this.Docker.Networks.ConnectNetworkAsync(this.Runner.Runtime.Docker.Network, new NetworkConnectParameters() { Container = result.ID }, cancellationToken);
136+
if (this.Environment.RunsInDocker()) await this.Docker.Networks.ConnectNetworkAsync(this.RunnerConfiguration.Runtime.Docker.Network, new NetworkConnectParameters() { Container = result.ID }, cancellationToken);
132137
if (result.Warnings.Count > 0) this.Logger.LogWarning("Warnings have been raised during container creation: {warnings}", string.Join(System.Environment.NewLine, result.Warnings));
133138
var process = ActivatorUtilities.CreateInstance<DockerWorkflowProcess>(this.ServiceProvider, this.Docker!, result.ID);
134139
this.Processes.TryAdd(process.Id, process);

0 commit comments

Comments
 (0)