Skip to content

Commit 3aa7629

Browse files
image: T5455: Add migration of SSH known_hosts files during image upgrade
During upgrade, the script now checks if any `known_hosts` files exist. If so, it prompts the user to save these SSH fingerprints, and upon confirmation, copies the files to the new image persistence directory.
1 parent 9684092 commit 3aa7629

File tree

4 files changed

+156
-27
lines changed

4 files changed

+156
-27
lines changed

python/vyos/utils/auth.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,35 @@
2020

2121
from enum import StrEnum
2222
from decimal import Decimal
23+
from pwd import getpwall
24+
from pwd import getpwnam
2325
from vyos.utils.process import cmd
2426

25-
27+
# Minimum UID used when adding system users
28+
MIN_USER_UID: int = 1000
29+
# Maximim UID used when adding system users
30+
MAX_USER_UID: int = 59999
31+
# List of local user accounts that must be preserved
32+
SYSTEM_USER_SKIP_LIST: frozenset = {
33+
'radius_user',
34+
'radius_priv_user',
35+
'tacacs0',
36+
'tacacs1',
37+
'tacacs2',
38+
'tacacs3',
39+
'tacacs4',
40+
'tacacs5',
41+
'tacacs6',
42+
'tacacs7',
43+
'tacacs8',
44+
'tacacs9',
45+
'tacacs10',
46+
'tacacs11',
47+
'tacacs12',
48+
'tacacs13',
49+
'tacacs14',
50+
'tacacs15',
51+
}
2652
DEFAULT_PASSWORD: str = 'vyos'
2753
LOW_ENTROPY_MSG: str = 'should be at least 8 characters long;'
2854
WEAK_PASSWORD_MSG: str = 'The password complexity is too low - @MSG@'
@@ -119,3 +145,24 @@ def get_current_user() -> str:
119145
elif 'USER' in os.environ:
120146
current_user = os.environ['USER']
121147
return current_user
148+
149+
150+
def get_local_users(min_uid=MIN_USER_UID, max_uid=MAX_USER_UID) -> list:
151+
"""Return list of dynamically allocated users (see Debian Policy Manual)"""
152+
local_users = []
153+
154+
for s_user in getpwall():
155+
if s_user.pw_uid < min_uid:
156+
continue
157+
if s_user.pw_uid > max_uid:
158+
continue
159+
if s_user.pw_name in SYSTEM_USER_SKIP_LIST:
160+
continue
161+
local_users.append(s_user.pw_name)
162+
163+
return local_users
164+
165+
166+
def get_user_home_dir(user: str) -> str:
167+
"""Return user's home directory"""
168+
return getpwnam(user).pw_dir

src/conf_mode/system_login.py

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from passlib.hosts import linux_context
2020
from psutil import users
2121
from pwd import getpwall
22-
from pwd import getpwnam
2322
from pwd import getpwuid
2423
from sys import exit
2524
from time import sleep
@@ -34,6 +33,9 @@
3433
from vyos.utils.auth import EPasswdStrength
3534
from vyos.utils.auth import evaluate_strength
3635
from vyos.utils.auth import get_current_user
36+
from vyos.utils.auth import get_local_users
37+
from vyos.utils.auth import get_user_home_dir
38+
from vyos.utils.auth import MIN_USER_UID
3739
from vyos.utils.configfs import delete_cli_node
3840
from vyos.utils.configfs import add_cli_node
3941
from vyos.utils.dict import dict_search
@@ -53,10 +55,6 @@
5355
tacacs_nss_config_file = "/etc/tacplus_nss.conf"
5456
nss_config_file = "/etc/nsswitch.conf"
5557

56-
# Minimum UID used when adding system users
57-
MIN_USER_UID: int = 1000
58-
# Maximim UID used when adding system users
59-
MAX_USER_UID: int = 59999
6058
# LOGIN_TIMEOUT from /etc/loign.defs minus 10 sec
6159
MAX_RADIUS_TIMEOUT: int = 50
6260
# MAX_RADIUS_TIMEOUT divided by 2 sec (minimum recomended timeout)
@@ -65,25 +63,7 @@
6563
MAX_TACACS_COUNT: int = 8
6664
# Minimum USER id for TACACS users
6765
MIN_TACACS_UID = 900
68-
# List of local user accounts that must be preserved
69-
SYSTEM_USER_SKIP_LIST: list = ['radius_user', 'radius_priv_user', 'tacacs0', 'tacacs1',
70-
'tacacs2', 'tacacs3', 'tacacs4', 'tacacs5', 'tacacs6',
71-
'tacacs7', 'tacacs8', 'tacacs9', 'tacacs10',' tacacs11',
72-
'tacacs12', 'tacacs13', 'tacacs14', 'tacacs15']
73-
74-
def get_local_users(min_uid=MIN_USER_UID, max_uid=MAX_USER_UID):
75-
"""Return list of dynamically allocated users (see Debian Policy Manual)"""
76-
local_users = []
77-
for s_user in getpwall():
78-
if getpwnam(s_user.pw_name).pw_uid < min_uid:
79-
continue
80-
if getpwnam(s_user.pw_name).pw_uid > max_uid:
81-
continue
82-
if s_user.pw_name in SYSTEM_USER_SKIP_LIST:
83-
continue
84-
local_users.append(s_user.pw_name)
85-
86-
return local_users
66+
8767

8868
def get_shadow_password(username):
8969
with open('/etc/shadow') as f:
@@ -342,7 +322,7 @@ def apply(login):
342322
# crazy user will choose username root or any other system user which will fail.
343323
#
344324
# XXX: Should we deny using root at all?
345-
home_dir = getpwnam(user).pw_dir
325+
home_dir = get_user_home_dir(user)
346326
# always re-render SSH keys with appropriate permissions
347327
render(f'{home_dir}/.ssh/authorized_keys', 'login/authorized_keys.j2',
348328
user_config, permission=0o600,
@@ -397,7 +377,7 @@ def apply(login):
397377
# Disable user to prevent re-login
398378
call(f'usermod -s /sbin/nologin {user}')
399379

400-
home_dir = getpwnam(user).pw_dir
380+
home_dir = get_user_home_dir(user)
401381
# Remove SSH authorized keys file
402382
authorized_keys_file = f'{home_dir}/.ssh/authorized_keys'
403383
if os.path.exists(authorized_keys_file):

src/op_mode/image_installer.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
from vyos.utils.file import read_file
5858
from vyos.utils.file import write_file
5959
from vyos.utils.process import cmd, run, rc_cmd
60+
from vyos.utils.auth import get_local_users
61+
from vyos.utils.auth import get_user_home_dir
6062
from vyos.version import get_version_data
6163

6264
# define text messages
@@ -715,6 +717,20 @@ def copy_ssh_host_keys() -> bool:
715717
return False
716718

717719

720+
def copy_ssh_known_hosts() -> bool:
721+
"""Ask user to copy SSH `known_hosts` files
722+
723+
Returns:
724+
bool: user's decision
725+
"""
726+
known_hosts_files = get_known_hosts_files()
727+
msg = (
728+
'Would you like to save the SSH known hosts (fingerprints) '
729+
'from your current configuration?'
730+
)
731+
return known_hosts_files and ask_yes_no(msg, default=True)
732+
733+
718734
def console_hint() -> str:
719735
pid = getppid() if 'SUDO_USER' in environ else getpid()
720736
try:
@@ -1010,6 +1026,40 @@ def install_image() -> None:
10101026
exit(1)
10111027

10121028

1029+
def get_known_hosts_files() -> list:
1030+
"""Collect all existing `known_hosts` files for root and users under /home"""
1031+
1032+
files = []
1033+
1034+
# for root
1035+
base_files = ('/root/.ssh/known_hosts', '/etc/ssh/ssh_known_hosts')
1036+
for file_path in base_files:
1037+
root_known_hosts = Path(file_path)
1038+
if root_known_hosts.exists():
1039+
files.append(root_known_hosts)
1040+
1041+
# for each non-system user
1042+
for user in get_local_users():
1043+
home_dir = Path(get_user_home_dir(user))
1044+
if home_dir.exists():
1045+
known_hosts = home_dir / '.ssh' / 'known_hosts'
1046+
if known_hosts.exists():
1047+
files.append(known_hosts)
1048+
1049+
return files
1050+
1051+
1052+
def migrate_known_hosts(target_dir: str):
1053+
"""Copy `known_hosts` for root and all users to the new image directory"""
1054+
1055+
known_hosts_files = get_known_hosts_files()
1056+
for known_hosts_file in known_hosts_files:
1057+
# copy file to target directory
1058+
target_known_hosts = Path(f'{target_dir}{known_hosts_file}')
1059+
target_known_hosts.parent.mkdir(parents=True, exist_ok=True)
1060+
copy(known_hosts_file, target_known_hosts)
1061+
1062+
10131063
@compat.grub_cfg_update
10141064
def add_image(image_path: str, vrf: str = None, username: str = '',
10151065
password: str = '', no_prompt: bool = False, force: bool = False) -> None:
@@ -1115,6 +1165,11 @@ def add_image(image_path: str, vrf: str = None, username: str = '',
11151165
for host_key in host_keys:
11161166
copy(host_key, target_ssh_dir)
11171167

1168+
target_ssh_known_hosts_dir: str = f'{root_dir}/boot/{image_name}/rw'
1169+
if no_prompt or copy_ssh_known_hosts():
1170+
print('Copying SSH known_hosts files')
1171+
migrate_known_hosts(target_ssh_known_hosts_dir)
1172+
11181173
# copy system image and kernel files
11191174
print('Copying system image files')
11201175
for file in Path(f'{DIR_ISO_MOUNT}/live').iterdir():

src/tests/test_utils.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@
1212
# You should have received a copy of the GNU General Public License
1313
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1414

15+
import pwd
1516
from unittest import TestCase
17+
18+
from vyos.utils import auth
19+
20+
1621
class TestVyOSUtils(TestCase):
1722
def test_key_mangling(self):
1823
from vyos.utils.dict import mangle_dict_keys
@@ -24,3 +29,45 @@ def test_key_mangling(self):
2429
def test_sysctl_read(self):
2530
from vyos.utils.system import sysctl_read
2631
self.assertEqual(sysctl_read('net.ipv4.conf.lo.forwarding'), '1')
32+
33+
34+
class TestVyOSUtilsAuth(TestCase):
35+
36+
def test_get_local_users_returns_existing_usernames(self):
37+
# Returned users exist, skip list is excluded, and UIDs are in range
38+
39+
all_users = set(s_user.pw_name for s_user in pwd.getpwall())
40+
local_users = auth.get_local_users()
41+
42+
# All returned users must really exist
43+
for user in local_users:
44+
self.assertIn(user, all_users)
45+
46+
# Nobody in the skip list
47+
for skipped in auth.SYSTEM_USER_SKIP_LIST:
48+
self.assertNotIn(skipped, local_users)
49+
50+
# All are within UID range
51+
for s_user in pwd.getpwall():
52+
if s_user.pw_name in local_users:
53+
self.assertGreaterEqual(s_user.pw_uid, auth.MIN_USER_UID)
54+
self.assertLessEqual(s_user.pw_uid, auth.MAX_USER_UID)
55+
56+
def test_get_user_home_dir_for_real_user(self):
57+
# User's homedir is a non-empty string for a valid user
58+
59+
local_users = auth.get_local_users()
60+
if local_users:
61+
for user in local_users:
62+
home_dir = auth.get_user_home_dir(user)
63+
self.assertIsInstance(home_dir, str)
64+
self.assertTrue(bool(home_dir)) # Should not be empty
65+
else:
66+
self.skipTest("No suitable non-system users found on this system")
67+
68+
def test_get_user_home_dir_invalid_user(self):
69+
# Raises KeyError for nonexistent username
70+
71+
user = "__this_user_does_not_exist__" # Test using unlikely username
72+
with self.assertRaises(KeyError):
73+
auth.get_user_home_dir(user)

0 commit comments

Comments
 (0)