-
Notifications
You must be signed in to change notification settings - Fork 840
Initialize AuTest port queue concurrently #12291
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import asyncio | ||
from typing import Set | ||
import socket | ||
import subprocess | ||
|
@@ -30,6 +31,72 @@ | |
g_ports = None # ports we can use | ||
|
||
|
||
class AsyncPortQueue(OrderedSetQueue): | ||
|
||
def __init__(self): | ||
super().__init__() | ||
self._listening_ports = _get_listening_ports() | ||
|
||
async def select_available(self, amount, dmin, dmax): | ||
rmin = dmin - 2000 | ||
rmax = 65536 - dmax | ||
|
||
port_tasks = [] | ||
await asyncio.gather(*port_tasks) | ||
if rmax > amount: | ||
# Fill in ports, starting above the upper OS-usable port range. | ||
port = dmax + 1 | ||
while port < 65536 and self.qsize() < amount: | ||
port_tasks.append(self._check_port(port)) | ||
port += 1 | ||
if rmin > amount and self.qsize() < amount: | ||
port = 2001 | ||
# Fill in more ports, starting at 2001, well above well known ports, | ||
# and going up until the minimum port range used by the OS. | ||
while port < dmin and self.qsize() < amount: | ||
port_tasks.append(self._check_port(port)) | ||
port += 1 | ||
|
||
await asyncio.gather(*port_tasks) | ||
Comment on lines
+46
to
+60
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe we're doing more port checks than originally here. On my system, this adds about 35k or so port_tasks values here, each of which are done even though we only need 1k. Originally, we we essentially short circuit the loop once There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think of this? It iteratively adds tasks until we get the desired amount of free ports: diff --git a/tests/gold_tests/autest-site/ports.py b/tests/gold_tests/autest-site/ports.py
index c5259634c..951bcc3fe 100644
--- a/tests/gold_tests/autest-site/ports.py
+++ b/tests/gold_tests/autest-site/ports.py
@@ -17,7 +17,7 @@
# limitations under the License.
import asyncio
-from typing import Set
+from typing import Generator, Set
import socket
import subprocess
import os
@@ -37,27 +37,45 @@ class AsyncPortQueue(OrderedSetQueue):
super().__init__()
self._listening_ports = _get_listening_ports()
- async def select_available(self, amount, dmin, dmax):
- rmin = dmin - 2000
- rmax = 65536 - dmax
+ async def select_available(self, amount: int, dmin: int, dmax: int) -> None:
+ '''Populate the port queue with ports that are not in use by the OS.
- port_tasks = []
- await asyncio.gather(*port_tasks)
- if rmax > amount:
+ This method fills in the queue with at least `amount` ports that are not
+ within `dmin` and `dmax`.
+
+ :param amount: The number of ports to populate the queue with.
+ :param dmin: The minimum port number we expect that the OS uses.
+ :param dmax: The maximum port number we expect that the OS uses.
+ '''
+ def task_generator(amount: int, dmin: int, dmax: int) -> Generator:
+ task_counter = 0
# Fill in ports, starting above the upper OS-usable port range.
port = dmax + 1
- while port < 65536 and self.qsize() < amount:
- port_tasks.append(self._check_port(port))
+ while port < 65536 and task_counter < amount:
+ yield self._check_port(port)
port += 1
- if rmin > amount and self.qsize() < amount:
- port = 2001
+ task_counter += 1
# Fill in more ports, starting at 2001, well above well known ports,
# and going up until the minimum port range used by the OS.
- while port < dmin and self.qsize() < amount:
- port_tasks.append(self._check_port(port))
+ port = 2001
+ while port < dmin and task_counter < amount:
+ yield self._check_port(port)
port += 1
-
- await asyncio.gather(*port_tasks)
+ task_counter += 1
+
+ tasks = task_generator(amount, dmin, dmax)
+ while self.qsize() < amount:
+ port_tasks = []
+ ports_still_needed = amount - self.qsize()
+ while len(port_tasks) < ports_still_needed:
+ try:
+ port_tasks.append(next(tasks))
+ except StopIteration:
+ # No more tasks to generate.
+ host.WriteWarning('Ran out of ports to check, stopping port queue setup.')
+ return
+ host.WriteDebug('_setup_port_queue', f"Gathering {len(port_tasks)} port tasks out of {ports_still_needed} desired.")
+ await asyncio.gather(*port_tasks)
async def _check_port(self, port):
if await self._is_port_open(port): With this change:
0.58 seconds is still more than 0.41 seconds (the amount of time before any of the patches on this branch), but the value is so close and small no one will notice it. Can you check that it sill makes your system's check efficient? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have timed this patch. (The test failed because the plugin it needed wasn't compiled.)
It's fast enough that working on an AuTest is possible if you have a lot of patience. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @bneradt am I reading the code wrong, or will the generator not yield more than 1000 tasks, which may be too few to find 1000 open ports? Does the |
||
|
||
async def _check_port(self, port): | ||
if await self._is_port_open(port): | ||
host.WriteDebug('_setup_port_queue', f"Rejecting an already open port: {port}") | ||
else: | ||
host.WriteDebug('_setup_port_queue', f"Adding a possible port to connect to: {port}") | ||
self.put(port) | ||
|
||
async def _is_port_open(self, port, address=None): | ||
ret = False | ||
if address is None: | ||
address = "localhost" | ||
|
||
if port in self._listening_ports: | ||
host.WriteDebug('PortOpen', f"{port} is open because it is in the listening sockets set.") | ||
return True | ||
|
||
try: | ||
# Try to connect on that port. If we can connect on it, then someone is | ||
# listening on that port and therefore the port is open. | ||
reader, writer = await asyncio.open_connection(address, port, limit=1) | ||
writer.close() | ||
await writer.wait_closed() | ||
ret = True | ||
host.WriteDebug( | ||
'PortOpen', f"Connection to port {port} succeeded, the port is open, " | ||
"and a future connection cannot use it") | ||
except ConnectionRefusedError: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Running this locally, in a Rancher docker vm on my Apple silicon macbook pro, I get the following exception:
It seems like we need to add OSError to the list of handles exceptions: diff --git a/tests/gold_tests/autest-site/ports.py b/tests/gold_tests/autest-site/ports.py
index 0b8ec00e5..c5259634c 100644
--- a/tests/gold_tests/autest-site/ports.py
+++ b/tests/gold_tests/autest-site/ports.py
@@ -85,7 +85,7 @@ class AsyncPortQueue(OrderedSetQueue):
host.WriteDebug(
'PortOpen', f"Connection to port {port} succeeded, the port is open, "
"and a future connection cannot use it")
- except ConnectionRefusedError:
+ except (ConnectionRefusedError, OSError):
host.WriteDebug(
'PortOpen', f"socket error for port {port}, port is closed, "
"and therefore a future connection can use it") |
||
host.WriteDebug( | ||
'PortOpen', f"socket error for port {port}, port is closed, " | ||
"and therefore a future connection can use it") | ||
except TimeoutError: | ||
host.WriteDebug( | ||
'PortOpen', f"Timeout error for port {port}, port is closed, " | ||
"and therefore a future connection can use it") | ||
|
||
return ret | ||
|
||
|
||
class PortQueueSelectionError(Exception): | ||
""" | ||
An exception for when there are problems selecting a port from the port | ||
|
@@ -147,13 +214,11 @@ def _setup_port_queue(amount=1000): | |
Build up the set of ports that the OS in theory will not use. | ||
""" | ||
global g_ports | ||
if g_ports is None: | ||
host.WriteDebug('_setup_port_queue', "Populating the port queue.") | ||
g_ports = OrderedSetQueue() | ||
else: | ||
if g_ports is not None: | ||
# The queue has already been populated. | ||
host.WriteDebug('_setup_port_queue', f"Queue was previously populated. Queue size: {g_ports.qsize()}") | ||
return | ||
|
||
try: | ||
# Use sysctl to find the range of ports that the OS publishes it uses. | ||
# some docker setups don't have sbin setup correctly | ||
|
@@ -171,31 +236,17 @@ def _setup_port_queue(amount=1000): | |
host.WriteWarning("Unable to call sysctrl!\n Tests may fail because of bad port selection!") | ||
return | ||
|
||
rmin = dmin - 2000 | ||
rmax = 65536 - dmax | ||
host.WriteDebug('_setup_port_queue', "Populating the port queue.") | ||
g_ports = AsyncPortQueue() | ||
|
||
listening_ports = _get_listening_ports() | ||
if rmax > amount: | ||
# Fill in ports, starting above the upper OS-usable port range. | ||
port = dmax + 1 | ||
while port < 65536 and g_ports.qsize() < amount: | ||
if PortOpen(port, listening_ports=listening_ports): | ||
host.WriteDebug('_setup_port_queue', f"Rejecting an already open port: {port}") | ||
else: | ||
host.WriteDebug('_setup_port_queue', f"Adding a possible port to connect to: {port}") | ||
g_ports.put(port) | ||
port += 1 | ||
if rmin > amount and g_ports.qsize() < amount: | ||
port = 2001 | ||
# Fill in more ports, starting at 2001, well above well known ports, | ||
# and going up until the minimum port range used by the OS. | ||
while port < dmin and g_ports.qsize() < amount: | ||
if PortOpen(port, listening_ports=listening_ports): | ||
host.WriteDebug('_setup_port_queue', f"Rejecting an already open port: {port}") | ||
else: | ||
host.WriteDebug('_setup_port_queue', f"Adding a possible port to connect to: {port}") | ||
g_ports.put(port) | ||
port += 1 | ||
async def async_setup(): | ||
await g_ports.select_available(amount, dmin, dmax) | ||
|
||
try: | ||
loop = asyncio.get_running_loop() | ||
loop.call_soon(async_setup()) | ||
except RuntimeError: | ||
asyncio.run(async_setup()) | ||
|
||
|
||
def _get_port_by_bind(): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this await here on an empty port_tasks intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope.