Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -405,23 +405,15 @@ public void onFailure(Exception e) {
}
}

// Package-access for testing.
@FixForMultiProject(description = "Refactor to supply the project ID associated with the alias and settings, or eliminate this method.")
void updateRemoteCluster(String clusterAlias, Settings newSettings, ActionListener<RemoteClusterConnectionStatus> listener) {
final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build();
@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, mergedSettings);
updateRemoteCluster(config, false, listener);
}

/**
* Adds, rebuilds, or closes and removes the connection for the specified remote cluster.
*
* @param config The linked project configuration.
* @param forceRebuild Forces an existing connection to be closed and reconnected even if the connection strategy does not require it.
* @param listener The listener invoked once the configured cluster has been connected.
*/
private synchronized void updateRemoteCluster(
// Package-access for testing.
synchronized void updateRemoteCluster(
LinkedProjectConfig config,
boolean forceRebuild,
ActionListener<RemoteClusterConnectionStatus> listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.VersionInformation;
import org.elasticsearch.cluster.project.DefaultProjectResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
Expand Down Expand Up @@ -67,6 +70,7 @@
public class RemoteClusterServiceTests extends ESTestCase {

private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
private final ProjectResolver projectResolver = DefaultProjectResolver.INSTANCE;
private LinkedProjectConfigService linkedProjectConfigService = null;

@Override
Expand All @@ -84,12 +88,8 @@ private RemoteClusterService createRemoteClusterService(
ClusterSettings clusterSettings,
MockTransportService transportService
) {
linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService(
settings,
clusterSettings,
DefaultProjectResolver.INSTANCE
);
return new RemoteClusterService(settings, transportService, DefaultProjectResolver.INSTANCE);
linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, projectResolver);
return new RemoteClusterService(settings, transportService, projectResolver);
}

private MockTransportService startTransport(
Expand Down Expand Up @@ -776,19 +776,11 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException {
assertFalse(hasRegisteredClusters(service));

final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
createSettings("cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString())),
connectionListener(firstLatch)
);
updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch));
firstLatch.await();

final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
createSettings("cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString())),
connectionListener(secondLatch)
);
updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch));
secondLatch.await();

assertTrue(hasRegisteredClusters(service));
Expand Down Expand Up @@ -866,19 +858,11 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException {
assertFalse(hasRegisteredClusters(service));

final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
createSettings("cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString())),
connectionListener(firstLatch)
);
updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch));
firstLatch.await();

final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
createSettings("cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString())),
connectionListener(secondLatch)
);
updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch));
secondLatch.await();

assertTrue(hasRegisteredClusters(service));
Expand Down Expand Up @@ -961,20 +945,11 @@ public void testCollectNodes() throws InterruptedException, IOException {
assertFalse(hasRegisteredClusters(service));

final CountDownLatch firstLatch = new CountDownLatch(1);

service.updateRemoteCluster(
"cluster_1",
createSettings("cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString())),
connectionListener(firstLatch)
);
updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch));
firstLatch.await();

final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
createSettings("cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString())),
connectionListener(secondLatch)
);
updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch));
secondLatch.await();
CountDownLatch latch = new CountDownLatch(1);
service.collectNodes(
Expand Down Expand Up @@ -1108,8 +1083,9 @@ public void testCollectNodesConcurrentWithSettingsChanges() throws IOException {
final var seedList = List.of(c1N1Node.getAddress().toString());
transportService.start();
transportService.acceptIncomingRequests();
final var initialSettings = createSettings("cluster_1", seedList);

try (RemoteClusterService service = createRemoteClusterService(createSettings("cluster_1", seedList), transportService)) {
try (RemoteClusterService service = createRemoteClusterService(initialSettings, transportService)) {
initializeRemoteClusters(service);
assertTrue(hasRegisteredClusters(service));
final var numTasks = between(3, 5);
Expand All @@ -1122,7 +1098,7 @@ public void testCollectNodesConcurrentWithSettingsChanges() throws IOException {
while (taskLatch.getCount() != 0) {
final var future = new PlainActionFuture<RemoteClusterService.RemoteClusterConnectionStatus>();
final var settings = createSettings("cluster_1", isLinked ? Collections.emptyList() : seedList);
service.updateRemoteCluster("cluster_1", settings, future);
updateRemoteCluster(service, "cluster_1", initialSettings, settings, future);
safeGet(future);
isLinked = isLinked == false;
}
Expand Down Expand Up @@ -1296,7 +1272,8 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception {

final Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString()));
try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) {
final var initialSettings = builder.build();
try (RemoteClusterService service = createRemoteClusterService(initialSettings, transportService)) {
assertFalse(hasRegisteredClusters(service));
initializeRemoteClusters(service);
assertTrue(hasRegisteredClusters(service));
Expand All @@ -1308,11 +1285,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception {
assertFalse(firstRemoteClusterConnection.isClosed());

final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_test",
createSettings("cluster_test", Collections.singletonList(node0.getAddress().toString())),
connectionListener(firstLatch)
);
updateRemoteCluster(service, "cluster_test", initialSettings, List.of(node0), connectionListener(firstLatch));
firstLatch.await();

assertTrue(hasRegisteredClusters(service));
Expand All @@ -1322,15 +1295,15 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception {
assertFalse(firstRemoteClusterConnection.isClosed());
assertSame(firstRemoteClusterConnection, service.getRemoteClusterConnection("cluster_test"));

final List<String> newSeeds = new ArrayList<>();
newSeeds.add(node1.getAddress().toString());
final List<DiscoveryNode> newSeeds = new ArrayList<>();
newSeeds.add(node1);
if (randomBoolean()) {
newSeeds.add(node0.getAddress().toString());
newSeeds.add(node0);
Collections.shuffle(newSeeds, random());
}

final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster("cluster_test", createSettings("cluster_test", newSeeds), connectionListener(secondLatch));
updateRemoteCluster(service, "cluster_test", initialSettings, newSeeds, connectionListener(secondLatch));
secondLatch.await();

assertTrue(hasRegisteredClusters(service));
Expand Down Expand Up @@ -1575,7 +1548,8 @@ public void testUseDifferentTransportProfileForCredentialsProtectedRemoteCluster
} else {
firstRemoteClusterSettingsBuilder.put("cluster.remote.cluster_1.seeds", c1Node.getAddress().toString());
}
service.updateRemoteCluster("cluster_1", firstRemoteClusterSettingsBuilder.build(), connectionListener(firstLatch));
final var updatedSettings1 = firstRemoteClusterSettingsBuilder.build();
updateRemoteCluster(service, "cluster_1", settings, updatedSettings1, connectionListener(firstLatch));
firstLatch.await();

final CountDownLatch secondLatch = new CountDownLatch(1);
Expand All @@ -1587,7 +1561,8 @@ public void testUseDifferentTransportProfileForCredentialsProtectedRemoteCluster
} else {
secondRemoteClusterSettingsBuilder.put("cluster.remote.cluster_2.seeds", c2Node.getAddress().toString());
}
service.updateRemoteCluster("cluster_2", secondRemoteClusterSettingsBuilder.build(), connectionListener(secondLatch));
final var updatedSettings2 = secondRemoteClusterSettingsBuilder.build();
updateRemoteCluster(service, "cluster_2", settings, updatedSettings2, connectionListener(secondLatch));
secondLatch.await();

assertTrue(hasRegisteredClusters(service));
Expand Down Expand Up @@ -1642,7 +1617,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi

final Settings clusterSettings = buildRemoteClusterSettings("cluster_1", discoNode.getAddress().toString());
final CountDownLatch latch = new CountDownLatch(1);
service.updateRemoteCluster("cluster_1", clusterSettings, connectionListener(latch));
updateRemoteCluster(service, "cluster_1", Settings.EMPTY, clusterSettings, connectionListener(latch));
latch.await();

assertConnectionHasProfile(service.getRemoteClusterConnection("cluster_1"), "default");
Expand Down Expand Up @@ -1731,12 +1706,12 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite

final Settings cluster1Settings = buildRemoteClusterSettings(goodCluster, c1DiscoNode.getAddress().toString());
final var latch = new CountDownLatch(1);
service.updateRemoteCluster(goodCluster, cluster1Settings, connectionListener(latch));
updateRemoteCluster(service, goodCluster, Settings.EMPTY, cluster1Settings, connectionListener(latch));
latch.await();

final Settings cluster2Settings = buildRemoteClusterSettings(badCluster, c2DiscoNode.getAddress().toString());
final PlainActionFuture<RemoteClusterService.RemoteClusterConnectionStatus> future = new PlainActionFuture<>();
service.updateRemoteCluster(badCluster, cluster2Settings, future);
updateRemoteCluster(service, badCluster, Settings.EMPTY, cluster2Settings, future);
final var ex = expectThrows(Exception.class, () -> future.actionGet(10, TimeUnit.SECONDS));
assertThat(ex.getMessage(), containsString("bad cluster"));

Expand Down Expand Up @@ -1853,6 +1828,30 @@ public void testLogsConnectionResult() throws IOException {
}
}

private void updateRemoteCluster(
RemoteClusterService service,
String alias,
Settings settings,
List<DiscoveryNode> seedNodes,
ActionListener<RemoteClusterService.RemoteClusterConnectionStatus> listener
) {
final var newSettings = createSettings(alias, seedNodes.stream().map(n -> n.getAddress().toString()).toList());
updateRemoteCluster(service, alias, settings, newSettings, listener);
}

private void updateRemoteCluster(
RemoteClusterService service,
String alias,
Settings settings,
Settings newSettings,
ActionListener<RemoteClusterService.RemoteClusterConnectionStatus> listener
) {
final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build();
@FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.")
final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, alias, mergedSettings);
service.updateRemoteCluster(config, false, listener);
}

private void initializeRemoteClusters(RemoteClusterService remoteClusterService) {
remoteClusterService.initializeRemoteClusters(linkedProjectConfigService.getInitialLinkedProjectConfigs());
}
Expand Down