Skip to content

Commit 74a68ff

Browse files
committed
WIP
1 parent 3d043e8 commit 74a68ff

File tree

1 file changed

+69
-19
lines changed

1 file changed

+69
-19
lines changed

deps/rabbit/src/rabbit_db_cluster.erl

Lines changed: 69 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
-module(rabbit_db_cluster).
1010

1111
-include_lib("kernel/include/logger.hrl").
12+
-include_lib("stdlib/include/assert.hrl").
1213

1314
-include_lib("rabbit_common/include/logging.hrl").
1415

@@ -253,27 +254,30 @@ join_using_khepri(_ClusterNodes, ram = NodeType) ->
253254
RemoveWhenOffline :: boolean().
254255
%% @doc Removes `Node' from the cluster.
255256

256-
forget_member(Node, RemoveWhenOffline) ->
257-
case forget_member0(Node, RemoveWhenOffline) of
258-
ok ->
259-
rabbit_node_monitor:notify_left_cluster(Node);
260-
Error ->
261-
Error
257+
forget_member(Node, RemoveWhenOffline)
258+
when is_atom(Node) andalso Node =/= node() ->
259+
{ok, InitialState} = lock_cluster_changes(Node),
260+
try
261+
forget_member_locked(Node, RemoveWhenOffline)
262+
after
263+
unlock_cluster_changes(InitialState)
262264
end.
263265

264-
forget_member0(Node, RemoveWhenOffline) ->
265-
case rabbit:is_running(Node) of
266-
false ->
267-
?LOG_DEBUG(
268-
"DB: removing cluster member `~ts`", [Node],
269-
#{domain => ?RMQLOG_DOMAIN_DB}),
270-
case rabbit_khepri:is_enabled() of
271-
true -> forget_member_using_khepri(Node, RemoveWhenOffline);
272-
false -> forget_member_using_mnesia(Node, RemoveWhenOffline)
273-
end;
274-
true ->
275-
{error, {failed_to_remove_node, Node, rabbit_still_running}}
276-
end.
266+
forget_member_locked(Node, RemoveWhenOffline) ->
267+
?LOG_DEBUG(
268+
"DB: removing cluster member `~ts`", [Node],
269+
#{domain => ?RMQLOG_DOMAIN_DB}),
270+
?assertNot(rabbit:is_running(Node)),
271+
case rabbit_khepri:is_enabled() of
272+
true -> forget_member_using_khepri(Node, RemoveWhenOffline);
273+
false -> forget_member_using_mnesia(Node, RemoveWhenOffline)
274+
end,
275+
276+
rabbit_amqqueue:forget_all(Node),
277+
rabbit_quorum_queue:shrink_all(Node),
278+
rabbit_stream_queue:delete_all_replicas(Node),
279+
rabbit_stream_coordinator:forget_node(Node),
280+
rabbit_node_monitor:notify_left_cluster(Node).
277281

278282
forget_member_using_mnesia(Node, RemoveWhenOffline) ->
279283
rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline).
@@ -287,6 +291,52 @@ forget_member_using_khepri(_Node, true) ->
287291
forget_member_using_khepri(Node, false = _RemoveWhenOffline) ->
288292
rabbit_khepri:remove_member(Node).
289293

294+
lock_cluster_changes(ChangingNode) ->
295+
RabbitWasRunning = stop_rabbit_if_running(ChangingNode),
296+
InitialState = #{changing_node => ChangingNode,
297+
rabbit_was_running => RabbitWasRunning},
298+
299+
%% We acquire the feature flags registry reload lock because between
300+
%% the time we reset the registry (as part of `rabbit_db:reset/0' and
301+
%% the states copy from the remote node, there could be a concurrent
302+
%% reload of the registry (for instance because of peer discovery on
303+
%% another node) with the default/empty states.
304+
%%
305+
%% To make this work, the lock is also acquired from
306+
%% `rabbit_ff_registry_wrapper'.
307+
rabbit_ff_registry_factory:acquire_state_change_lock(),
308+
{ok, InitialState}.
309+
310+
stop_rabbit_if_running(ThisNode) when ThisNode =:= node() ->
311+
RabbitWasRunning = rabbit:is_running(),
312+
case RabbitWasRunning of
313+
true -> ok = rabbit:stop();
314+
false -> ok
315+
end,
316+
RabbitWasRunning;
317+
stop_rabbit_if_running(RemoteNode) when is_atom(RemoteNode) ->
318+
RabbitWasRunning = erpc:call(RemoteNode, rabbit, is_running, []),
319+
case RabbitWasRunning of
320+
true -> ok = erpc:call(RemoteNode, rabbit, stop, []);
321+
false -> ok
322+
end,
323+
RabbitWasRunning.
324+
325+
unlock_cluster_changes(
326+
#{changing_node := ChangingNode,
327+
rabbit_was_running := RabbitWasRunning}) ->
328+
rabbit_ff_registry_factory:release_state_change_lock(),
329+
start_rabbit_if_was_running(ChangingNode, RabbitWasRunning),
330+
ok.
331+
332+
start_rabbit_if_was_running(_ChangingNode, false = _RabbitWasRunning) ->
333+
ok;
334+
start_rabbit_if_was_running(ThisNode, true = _RabbitWasRunning)
335+
when ThisNode =:= node() ->
336+
rabbit:start();
337+
start_rabbit_if_was_running(RemoteNode, true = _RabbitWasRunning) ->
338+
erpc:call(RemoteNode, rabbit, start, []).
339+
290340
%% -------------------------------------------------------------------
291341
%% Cluster update.
292342
%% -------------------------------------------------------------------

0 commit comments

Comments
 (0)