Skip to content

Commit 7583c64

Browse files
image: T5455: Add migration of SSH known_hosts files during image upgrade
During upgrade, the script now checks if any `known_hosts` files exist. If so, it prompts the user to save these SSH fingerprints, and upon confirmation, copies the files to the new image persistence directory.
1 parent 9684092 commit 7583c64

File tree

4 files changed

+152
-27
lines changed

4 files changed

+152
-27
lines changed

python/vyos/utils/auth.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,34 @@
2020

2121
from enum import StrEnum
2222
from decimal import Decimal
23+
from pwd import getpwall, getpwnam
2324
from vyos.utils.process import cmd
2425

25-
26+
# Minimum UID used when adding system users
27+
MIN_USER_UID: int = 1000
28+
# Maximim UID used when adding system users
29+
MAX_USER_UID: int = 59999
30+
# List of local user accounts that must be preserved
31+
SYSTEM_USER_SKIP_LIST: frozenset = {
32+
'radius_user',
33+
'radius_priv_user',
34+
'tacacs0',
35+
'tacacs1',
36+
'tacacs2',
37+
'tacacs3',
38+
'tacacs4',
39+
'tacacs5',
40+
'tacacs6',
41+
'tacacs7',
42+
'tacacs8',
43+
'tacacs9',
44+
'tacacs10',
45+
'tacacs11',
46+
'tacacs12',
47+
'tacacs13',
48+
'tacacs14',
49+
'tacacs15',
50+
}
2651
DEFAULT_PASSWORD: str = 'vyos'
2752
LOW_ENTROPY_MSG: str = 'should be at least 8 characters long;'
2853
WEAK_PASSWORD_MSG: str = 'The password complexity is too low - @MSG@'
@@ -119,3 +144,24 @@ def get_current_user() -> str:
119144
elif 'USER' in os.environ:
120145
current_user = os.environ['USER']
121146
return current_user
147+
148+
149+
def get_local_users(min_uid=MIN_USER_UID, max_uid=MAX_USER_UID) -> list:
150+
"""Return list of dynamically allocated users (see Debian Policy Manual)"""
151+
local_users = []
152+
153+
for s_user in getpwall():
154+
if s_user.pw_uid < min_uid:
155+
continue
156+
if s_user.pw_uid > max_uid:
157+
continue
158+
if s_user.pw_name in SYSTEM_USER_SKIP_LIST:
159+
continue
160+
local_users.append(s_user.pw_name)
161+
162+
return local_users
163+
164+
165+
def get_user_home_dir(user: str) -> str:
166+
"""Return user's home directory"""
167+
return getpwnam(user).pw_dir

src/conf_mode/system_login.py

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from passlib.hosts import linux_context
2020
from psutil import users
2121
from pwd import getpwall
22-
from pwd import getpwnam
2322
from pwd import getpwuid
2423
from sys import exit
2524
from time import sleep
@@ -34,6 +33,7 @@
3433
from vyos.utils.auth import EPasswdStrength
3534
from vyos.utils.auth import evaluate_strength
3635
from vyos.utils.auth import get_current_user
36+
from vyos.utils.auth import MIN_USER_UID, get_local_users, get_user_home_dir
3737
from vyos.utils.configfs import delete_cli_node
3838
from vyos.utils.configfs import add_cli_node
3939
from vyos.utils.dict import dict_search
@@ -53,10 +53,6 @@
5353
tacacs_nss_config_file = "/etc/tacplus_nss.conf"
5454
nss_config_file = "/etc/nsswitch.conf"
5555

56-
# Minimum UID used when adding system users
57-
MIN_USER_UID: int = 1000
58-
# Maximim UID used when adding system users
59-
MAX_USER_UID: int = 59999
6056
# LOGIN_TIMEOUT from /etc/loign.defs minus 10 sec
6157
MAX_RADIUS_TIMEOUT: int = 50
6258
# MAX_RADIUS_TIMEOUT divided by 2 sec (minimum recomended timeout)
@@ -65,25 +61,7 @@
6561
MAX_TACACS_COUNT: int = 8
6662
# Minimum USER id for TACACS users
6763
MIN_TACACS_UID = 900
68-
# List of local user accounts that must be preserved
69-
SYSTEM_USER_SKIP_LIST: list = ['radius_user', 'radius_priv_user', 'tacacs0', 'tacacs1',
70-
'tacacs2', 'tacacs3', 'tacacs4', 'tacacs5', 'tacacs6',
71-
'tacacs7', 'tacacs8', 'tacacs9', 'tacacs10',' tacacs11',
72-
'tacacs12', 'tacacs13', 'tacacs14', 'tacacs15']
73-
74-
def get_local_users(min_uid=MIN_USER_UID, max_uid=MAX_USER_UID):
75-
"""Return list of dynamically allocated users (see Debian Policy Manual)"""
76-
local_users = []
77-
for s_user in getpwall():
78-
if getpwnam(s_user.pw_name).pw_uid < min_uid:
79-
continue
80-
if getpwnam(s_user.pw_name).pw_uid > max_uid:
81-
continue
82-
if s_user.pw_name in SYSTEM_USER_SKIP_LIST:
83-
continue
84-
local_users.append(s_user.pw_name)
85-
86-
return local_users
64+
8765

8866
def get_shadow_password(username):
8967
with open('/etc/shadow') as f:
@@ -342,7 +320,7 @@ def apply(login):
342320
# crazy user will choose username root or any other system user which will fail.
343321
#
344322
# XXX: Should we deny using root at all?
345-
home_dir = getpwnam(user).pw_dir
323+
home_dir = get_user_home_dir(user)
346324
# always re-render SSH keys with appropriate permissions
347325
render(f'{home_dir}/.ssh/authorized_keys', 'login/authorized_keys.j2',
348326
user_config, permission=0o600,
@@ -397,7 +375,7 @@ def apply(login):
397375
# Disable user to prevent re-login
398376
call(f'usermod -s /sbin/nologin {user}')
399377

400-
home_dir = getpwnam(user).pw_dir
378+
home_dir = get_user_home_dir(user)
401379
# Remove SSH authorized keys file
402380
authorized_keys_file = f'{home_dir}/.ssh/authorized_keys'
403381
if os.path.exists(authorized_keys_file):

src/op_mode/image_installer.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
from vyos.utils.file import read_file
5858
from vyos.utils.file import write_file
5959
from vyos.utils.process import cmd, run, rc_cmd
60+
from vyos.utils.auth import get_local_users, get_user_home_dir
6061
from vyos.version import get_version_data
6162

6263
# define text messages
@@ -715,6 +716,20 @@ def copy_ssh_host_keys() -> bool:
715716
return False
716717

717718

719+
def copy_ssh_known_hosts() -> bool:
720+
"""Ask user to copy SSH `known_hosts` files
721+
722+
Returns:
723+
bool: user's decision
724+
"""
725+
known_hosts_files = get_known_hosts_files()
726+
msg = (
727+
'Would you like to save the SSH known hosts (fingerprints) '
728+
'from your current configuration?'
729+
)
730+
return known_hosts_files and ask_yes_no(msg, default=True)
731+
732+
718733
def console_hint() -> str:
719734
pid = getppid() if 'SUDO_USER' in environ else getpid()
720735
try:
@@ -1010,6 +1025,40 @@ def install_image() -> None:
10101025
exit(1)
10111026

10121027

1028+
def get_known_hosts_files() -> list:
1029+
"""Collect all existing `known_hosts` files for root and users under /home"""
1030+
1031+
files = []
1032+
1033+
# for root
1034+
base_files = ('/root/.ssh/known_hosts', '/etc/ssh/ssh_known_hosts')
1035+
for file_path in base_files:
1036+
root_known_hosts = Path(file_path)
1037+
if root_known_hosts.exists():
1038+
files.append(root_known_hosts)
1039+
1040+
# for each non-system user
1041+
for user in get_local_users():
1042+
home_dir = Path(get_user_home_dir(user))
1043+
if home_dir.exists():
1044+
known_hosts = home_dir / '.ssh' / 'known_hosts'
1045+
if known_hosts.exists():
1046+
files.append(known_hosts)
1047+
1048+
return files
1049+
1050+
1051+
def migrate_known_hosts(target_dir: str):
1052+
"""Copy `known_hosts` for root and all users to the new image directory"""
1053+
1054+
known_hosts_files = get_known_hosts_files()
1055+
for known_hosts_file in known_hosts_files:
1056+
# copy file to target directory
1057+
target_known_hosts = Path(f'{target_dir}{known_hosts_file}')
1058+
target_known_hosts.parent.mkdir(parents=True, exist_ok=True)
1059+
copy(known_hosts_file, target_known_hosts)
1060+
1061+
10131062
@compat.grub_cfg_update
10141063
def add_image(image_path: str, vrf: str = None, username: str = '',
10151064
password: str = '', no_prompt: bool = False, force: bool = False) -> None:
@@ -1115,6 +1164,11 @@ def add_image(image_path: str, vrf: str = None, username: str = '',
11151164
for host_key in host_keys:
11161165
copy(host_key, target_ssh_dir)
11171166

1167+
target_ssh_known_hosts_dir: str = f'{root_dir}/boot/{image_name}/rw'
1168+
if no_prompt or copy_ssh_known_hosts():
1169+
print('Copying SSH known_hosts files')
1170+
migrate_known_hosts(target_ssh_known_hosts_dir)
1171+
11181172
# copy system image and kernel files
11191173
print('Copying system image files')
11201174
for file in Path(f'{DIR_ISO_MOUNT}/live').iterdir():

src/tests/test_utils.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@
1212
# You should have received a copy of the GNU General Public License
1313
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1414

15+
import pwd
1516
from unittest import TestCase
17+
18+
from vyos.utils import auth
19+
20+
1621
class TestVyOSUtils(TestCase):
1722
def test_key_mangling(self):
1823
from vyos.utils.dict import mangle_dict_keys
@@ -24,3 +29,45 @@ def test_key_mangling(self):
2429
def test_sysctl_read(self):
2530
from vyos.utils.system import sysctl_read
2631
self.assertEqual(sysctl_read('net.ipv4.conf.lo.forwarding'), '1')
32+
33+
34+
class TestVyOSUtilsAuth(TestCase):
35+
36+
def test_get_local_users_returns_existing_usernames(self):
37+
# Returned users exist, skip list is excluded, and UIDs are in range
38+
39+
all_users = set(s_user.pw_name for s_user in pwd.getpwall())
40+
local_users = auth.get_local_users()
41+
42+
# All returned users must really exist
43+
for user in local_users:
44+
self.assertIn(user, all_users)
45+
46+
# Nobody in the skip list
47+
for skipped in auth.SYSTEM_USER_SKIP_LIST:
48+
self.assertNotIn(skipped, local_users)
49+
50+
# All are within UID range
51+
for s_user in pwd.getpwall():
52+
if s_user.pw_name in local_users:
53+
self.assertGreaterEqual(s_user.pw_uid, auth.MIN_USER_UID)
54+
self.assertLessEqual(s_user.pw_uid, auth.MAX_USER_UID)
55+
56+
def test_get_user_home_dir_for_real_user(self):
57+
# User's homedir is a non-empty string for a valid user
58+
59+
local_users = auth.get_local_users()
60+
if local_users:
61+
for user in local_users:
62+
home_dir = auth.get_user_home_dir(user)
63+
self.assertIsInstance(home_dir, str)
64+
self.assertTrue(bool(home_dir)) # Should not be empty
65+
else:
66+
self.skipTest("No suitable non-system users found on this system")
67+
68+
def test_get_user_home_dir_invalid_user(self):
69+
# Raises KeyError for nonexistent username
70+
71+
user = "__this_user_does_not_exist__" # Test using unlikely username
72+
with self.assertRaises(KeyError):
73+
auth.get_user_home_dir(user)

0 commit comments

Comments
 (0)