Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 124 additions & 74 deletions packages/client/lib/tests/test-scenario/connection-handoff.e2e.ts
Original file line number Diff line number Diff line change
@@ -1,126 +1,176 @@
import diagnostics_channel from "node:diagnostics_channel";
import { FaultInjectorClient } from "./fault-injector-client";
import {
createTestClient,
getDatabaseConfig,
getDatabaseConfigFromEnv,
getEnvConfig,
RedisConnectionConfig,
} from "./test-scenario.util";
import { createClient } from "../../..";
import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
import { createClient, RedisClientOptions } from "../../..";
import { before } from "mocha";
import { spy } from "sinon";
import Sinon, { SinonSpy, spy, stub } from "sinon";
import assert from "node:assert";
import { TestCommandRunner } from "./test-command-runner";
import net from "node:net";

describe("Connection Handoff", () => {
const diagnosticsLog: DiagnosticsEvent[] = [];
/**
* Creates a spy on a duplicated client method
* @param client - The Redis client instance
* @param funcName - The name of the method to spy on
* @returns Object containing the promise that resolves with the spy and restore function
*/
const spyOnTemporaryClientInstanceMethod = (
client: ReturnType<typeof createClient<any, any, any, any>>,
methodName: string
) => {
const { promise, resolve } = (
Promise as typeof Promise & {
withResolvers: () => {
promise: Promise<{ spy: SinonSpy<any[], any>; restore: () => void }>;
resolve: (value: any) => void;
};
}
).withResolvers();

const originalDuplicate = client.duplicate.bind(client);

const duplicateStub: Sinon.SinonStub<any[], any> = stub(
// Temporary clients (in the context of hitless upgrade)
// are created by calling the duplicate method on the client.
Object.getPrototypeOf(client),
"duplicate"
).callsFake((opts) => {
const tmpClient = originalDuplicate(opts);
resolve({
spy: spy(tmpClient, methodName),
restore: duplicateStub.restore,
});

const onMessageHandler = (message: unknown) => {
diagnosticsLog.push(message as DiagnosticsEvent);
return tmpClient;
});

return {
getSpy: () => promise,
};
};

describe("Connection Handoff", () => {
let clientConfig: RedisConnectionConfig;
let client: ReturnType<typeof createClient<any, any, any, 3>>;
let client: ReturnType<typeof createClient<any, any, any, any>>;
let faultInjectorClient: FaultInjectorClient;
let connectSpy = spy(net, "createConnection");

before(() => {
const envConfig = getEnvConfig();
const redisConfig = getDatabaseConfigFromEnv(
envConfig.redisEndpointsConfigPath,
envConfig.redisEndpointsConfigPath
);

faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl);
clientConfig = getDatabaseConfig(redisConfig);
});

beforeEach(async () => {
diagnosticsLog.length = 0;
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);

connectSpy.resetHistory();
afterEach(async () => {
if (client && client.isOpen) {
await client.flushAll();
client.destroy();
}
});

client = createClient({
socket: {
host: clientConfig.host,
port: clientConfig.port,
...(clientConfig.tls === true ? { tls: true } : {}),
describe("New Connection Establishment & Traffic Resumption", () => {
const cases: Array<{
name: string;
clientOptions: Partial<RedisClientOptions>;
}> = [
{
name: "default options",
clientOptions: {},
},
password: clientConfig.password,
username: clientConfig.username,
RESP: 3,
maintPushNotifications: "auto",
maintMovingEndpointType: "external-ip",
maintRelaxedCommandTimeout: 10000,
maintRelaxedSocketTimeout: 10000,
});
{
name: "external-ip",
clientOptions: {
maintMovingEndpointType: "external-ip",
},
},
{
name: "external-fqdn",
clientOptions: {
maintMovingEndpointType: "external-fqdn",
},
},
{
name: "auto",
clientOptions: {
maintMovingEndpointType: "auto",
},
},
{
name: "none",
clientOptions: {
maintMovingEndpointType: "none",
},
},
];

client.on("error", (err: Error) => {
throw new Error(`Client error: ${err.message}`);
});
for (const { name, clientOptions } of cases) {
it(`should establish new connection and resume traffic afterwards - ${name}`, async () => {
client = await createTestClient(clientConfig, clientOptions);

await client.connect();
await client.flushAll();
});
const spyObject = spyOnTemporaryClientInstanceMethod(client, "connect");

afterEach(() => {
diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler);
client.destroy();
});
// PART 1 Establish initial connection
const { action_id: lowTimeoutBindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});

describe("New Connection Establishment", () => {
it("should establish new connection", async () => {
assert.equal(connectSpy.callCount, 1);
await faultInjectorClient.waitForAction(
lowTimeoutBindAndMigrateActionId
);

const { action_id: lowTimeoutBindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
const spyResult = await spyObject.getSpy();

const lowTimeoutWaitPromise = faultInjectorClient.waitForAction(
lowTimeoutBindAndMigrateActionId,
);
assert.strictEqual(spyResult.spy.callCount, 1);

await lowTimeoutWaitPromise;
assert.equal(connectSpy.callCount, 2);
});
// PART 2 Verify traffic resumption
const currentTime = Date.now().toString();
await client.set("key", currentTime);
const result = await client.get("key");

assert.strictEqual(result, currentTime);

spyResult.restore();
});
}
});

describe("TLS Connection Handoff", () => {
it("TODO receiveMessagesWithTLSEnabledTest", async () => {
it.skip("TODO receiveMessagesWithTLSEnabledTest", async () => {
//
});
it("TODO connectionHandoffWithStaticInternalNameTest", async () => {
it.skip("TODO connectionHandoffWithStaticInternalNameTest", async () => {
//
});
it("TODO connectionHandoffWithStaticExternalNameTest", async () => {
it.skip("TODO connectionHandoffWithStaticExternalNameTest", async () => {
//
});
});

describe("Traffic Resumption", () => {
it("Traffic resumed after handoff", async () => {
const { action_id } = await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
describe("Connection Cleanup", () => {
it("should shut down old connection", async () => {
const spyObject = spyOnTemporaryClientInstanceMethod(client, "destroy");

const { action_id: lowTimeoutBindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});

const workloadPromise = faultInjectorClient.waitForAction(action_id);
await faultInjectorClient.waitForAction(lowTimeoutBindAndMigrateActionId);

const commandPromises =
await TestCommandRunner.fireCommandsUntilStopSignal(
client,
workloadPromise,
);
const spyResult = await spyObject.getSpy();

const rejected = (
await Promise.all(commandPromises.commandPromises)
).filter((result) => result.status === "rejected");
assert.equal(spyResult.spy.callCount, 1);

assert.ok(rejected.length === 0);
spyResult.restore();
});
});
});
21 changes: 5 additions & 16 deletions packages/client/lib/tests/test-scenario/fault-injector-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ export type ActionType =
| "execute_rlutil_command"
| "execute_rladmin_command"
| "migrate"
| "bind";
| "bind"
| "update_cluster_config";

export interface ActionRequest {
type: ActionType;
Expand Down Expand Up @@ -47,7 +48,9 @@ export class FaultInjectorClient {
* @param action The action request to trigger
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
*/
public triggerAction<T = unknown>(action: ActionRequest): Promise<T> {
public triggerAction<T extends { action_id: string }>(
action: ActionRequest
): Promise<T> {
return this.#request<T>("POST", "/action", action);
}

Expand All @@ -60,20 +63,6 @@ export class FaultInjectorClient {
return this.#request<T>("GET", `/action/${actionId}`);
}

/**
* Executes an rladmin command.
* @param command The rladmin command to execute
* @param bdbId Optional database ID to target
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
*/
public executeRladminCommand<T = unknown>(
command: string,
bdbId?: string
): Promise<T> {
const cmd = bdbId ? `rladmin -b ${bdbId} ${command}` : `rladmin ${command}`;
return this.#request<T>("POST", "/rladmin", cmd);
}

/**
* Waits for an action to complete.
* @param actionId The ID of the action to wait for
Expand Down
15 changes: 15 additions & 0 deletions packages/client/lib/tests/test-scenario/negative-tests.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import assert from "assert";
import { createClient } from "../../..";

describe("Negative tests", () => {
it("should only be enabled with RESP3", () => {
assert.throws(
() =>
createClient({
RESP: 2,
maintPushNotifications: "enabled",
}),
"Error: Graceful Maintenance is only supported with RESP3",
);
});
});
Loading
Loading