Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions lib/sink/connection/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,15 @@ defmodule Sink.Connection.Server do
"""
@spec connected_clients() :: list(String.t())
def connected_clients do
Registry.select(@registry, [{{:"$1", :_, :_}, [], [:"$1"]}])
Registry.select(@registry, [{{:"$1", :_, :"$2"}, [{:"/=", :"$2", nil}], [:"$1"]}])
end

@doc """
Returns the number of currently connected clients.

For large numbers of connected clients this will be more performant than `connected_clients`
"""
@spec connected_clients_count() :: non_neg_integer()
def connected_clients_count do
Registry.count(@registry)
length(connected_clients())
end

@doc """
Expand All @@ -29,7 +27,7 @@ defmodule Sink.Connection.Server do
|> Registry.lookup(client_id)
|> case do
[] -> nil
[{_pid, connected_at}] -> connected_at
[{_pid, connected_at_or_nil}] -> connected_at_or_nil
end
end
end
8 changes: 6 additions & 2 deletions lib/sink/connection/server_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ defmodule Sink.Connection.ServerHandler do
)

:ok =
case Registry.register(@registry, client_id, DateTime.utc_now()) do
case Registry.register(@registry, client_id, nil) do
{:ok, _} ->
:ok

Expand Down Expand Up @@ -401,6 +401,10 @@ defmodule Sink.Connection.ServerHandler do
handler.handle_connection_response(new_state.client, other)
end

if ConnectionStatus.connected?(new_state.connection_status) do
Registry.update_value(@registry, client_id, fn _ -> DateTime.utc_now() end)
end

{new_state, {:connection_response, {:connection_response, encodeable_response}}}

{:ack, message_id} ->
Expand Down Expand Up @@ -554,7 +558,7 @@ defmodule Sink.Connection.ServerHandler do
end

defp register_when_clear(client_id) do
case Registry.register(@registry, client_id, DateTime.utc_now()) do
case Registry.register(@registry, client_id, nil) do
{:ok, _} ->
:ok

Expand Down
18 changes: 18 additions & 0 deletions test/sink/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ defmodule Sink.ConnectionTest do

assert Sink.Connection.Client.connected?()
assert Sink.Connection.ServerHandler.connected?("abc123")
assert %DateTime{} = Sink.Connection.Server.connected_at("abc123")
assert "abc123" in Sink.Connection.Server.connected_clients()

assert_received msg
assert {:handle_connection_response, :connected} = msg
Expand Down Expand Up @@ -157,6 +159,8 @@ defmodule Sink.ConnectionTest do

assert Sink.Connection.Client.connected?()
assert Sink.Connection.ServerHandler.connected?("abc123")
assert %DateTime{} = Sink.Connection.Server.connected_at("abc123")
assert "abc123" in Sink.Connection.Server.connected_clients()

stop_supervised!(Sink.Connection.Client)
stop_supervised!(Sink.Connection.ServerListener)
Expand Down Expand Up @@ -197,6 +201,8 @@ defmodule Sink.ConnectionTest do

assert Sink.Connection.Client.connected?()
assert Sink.Connection.ServerHandler.connected?("abc123")
assert %DateTime{} = Sink.Connection.Server.connected_at("abc123")
assert "abc123" in Sink.Connection.Server.connected_clients()

stop_supervised!(Sink.Connection.Client)
stop_supervised!(Sink.Connection.ServerListener)
Expand Down Expand Up @@ -235,13 +241,17 @@ defmodule Sink.ConnectionTest do

refute Sink.Connection.Client.connected?()
refute Sink.Connection.ServerHandler.connected?("abc123")
refute Sink.Connection.Server.connected_at("abc123")
refute "abc123" in Sink.Connection.Server.connected_clients()

# # give it time to connect

Process.sleep(@time_to_connect)

assert Sink.Connection.Client.connected?()
assert Sink.Connection.ServerHandler.connected?("abc123")
assert %DateTime{} = Sink.Connection.Server.connected_at("abc123")
assert "abc123" in Sink.Connection.Server.connected_clients()

stop_supervised!(Sink.Connection.Client)
stop_supervised!(Sink.Connection.ServerListener)
Expand Down Expand Up @@ -282,6 +292,8 @@ defmodule Sink.ConnectionTest do

refute Sink.Connection.Client.connected?()
refute Sink.Connection.ServerHandler.connected?("abc123")
refute Sink.Connection.Server.connected_at("abc123")
refute "abc123" in Sink.Connection.Server.connected_clients()

assert {:error, :no_connection} == Sink.Connection.Client.publish(@event, @ack_key)

Expand Down Expand Up @@ -327,6 +339,8 @@ defmodule Sink.ConnectionTest do

refute Sink.Connection.Client.connected?()
refute Sink.Connection.ServerHandler.connected?("abc123")
refute Sink.Connection.Server.connected_at("abc123")
refute "abc123" in Sink.Connection.Server.connected_clients()

assert {:error, :no_connection} == Sink.Connection.Client.publish(@event, @ack_key)

Expand Down Expand Up @@ -368,6 +382,8 @@ defmodule Sink.ConnectionTest do

refute Sink.Connection.Client.connected?()
refute Sink.Connection.ServerHandler.connected?("abc123")
refute Sink.Connection.Server.connected_at("abc123")
refute "abc123" in Sink.Connection.Server.connected_clients()

stop_supervised!(Sink.Connection.Client)
stop_supervised!(Sink.Connection.ServerListener)
Expand Down Expand Up @@ -408,6 +424,8 @@ defmodule Sink.ConnectionTest do

refute Sink.Connection.Client.connected?()
refute Sink.Connection.ServerHandler.connected?("abc123")
refute Sink.Connection.Server.connected_at("abc123")
refute "abc123" in Sink.Connection.Server.connected_clients()

stop_supervised!(Sink.Connection.Client)
stop_supervised!(Sink.Connection.ServerListener)
Expand Down