Skip to content

Commit eb146e1

Browse files
image: T5455: Add migration of SSH known_hosts files during image upgrade
During upgrade, the script now checks if any `known_hosts` files exist. If so, it prompts the user to save these SSH fingerprints, and upon confirmation, copies the files to the new image persistence directory.
1 parent b1b4545 commit eb146e1

File tree

6 files changed

+319
-30
lines changed

6 files changed

+319
-30
lines changed

python/vyos/utils/auth.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,35 @@
2020

2121
from enum import StrEnum
2222
from decimal import Decimal
23+
from pwd import getpwall
24+
from pwd import getpwnam
2325
from vyos.utils.process import cmd
2426

25-
27+
# Minimum UID used when adding system users
28+
MIN_USER_UID: int = 1000
29+
# Maximim UID used when adding system users
30+
MAX_USER_UID: int = 59999
31+
# List of local user accounts that must be preserved
32+
SYSTEM_USER_SKIP_LIST: frozenset = {
33+
'radius_user',
34+
'radius_priv_user',
35+
'tacacs0',
36+
'tacacs1',
37+
'tacacs2',
38+
'tacacs3',
39+
'tacacs4',
40+
'tacacs5',
41+
'tacacs6',
42+
'tacacs7',
43+
'tacacs8',
44+
'tacacs9',
45+
'tacacs10',
46+
'tacacs11',
47+
'tacacs12',
48+
'tacacs13',
49+
'tacacs14',
50+
'tacacs15',
51+
}
2652
DEFAULT_PASSWORD: str = 'vyos'
2753
LOW_ENTROPY_MSG: str = 'should be at least 8 characters long;'
2854
WEAK_PASSWORD_MSG: str = 'The password complexity is too low - @MSG@'
@@ -119,3 +145,24 @@ def get_current_user() -> str:
119145
elif 'USER' in os.environ:
120146
current_user = os.environ['USER']
121147
return current_user
148+
149+
150+
def get_local_users(min_uid=MIN_USER_UID, max_uid=MAX_USER_UID) -> list:
151+
"""Return list of dynamically allocated users (see Debian Policy Manual)"""
152+
local_users = []
153+
154+
for s_user in getpwall():
155+
if s_user.pw_uid < min_uid:
156+
continue
157+
if s_user.pw_uid > max_uid:
158+
continue
159+
if s_user.pw_name in SYSTEM_USER_SKIP_LIST:
160+
continue
161+
local_users.append(s_user.pw_name)
162+
163+
return local_users
164+
165+
166+
def get_user_home_dir(user: str) -> str:
167+
"""Return user's home directory"""
168+
return getpwnam(user).pw_dir

python/vyos/utils/file.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
1515

1616
import os
17+
import shutil
18+
1719
from vyos.utils.permission import chown
1820

1921
def makedir(path, user=None, group=None):
@@ -185,3 +187,47 @@ def wait_for_file_write_complete(file_path, pre_hook=None, timeout=None, sleep_i
185187
""" Waits for a process to close a file after opening it in write mode. """
186188
wait_for_inotify(file_path,
187189
event_type='IN_CLOSE_WRITE', pre_hook=pre_hook, timeout=timeout, sleep_interval=sleep_interval)
190+
191+
192+
def copy_recursive(src: str, dst: str, overwrite: bool = False):
193+
"""
194+
Recursively copy files from `src` to `dst`.
195+
196+
:param src: Source directory
197+
:param dst: Destination directory
198+
:param overwrite: If True, overwrite existing files. If False, skip them.
199+
"""
200+
201+
if not os.path.exists(src):
202+
raise FileNotFoundError(f"Source path does not exist: {src}")
203+
204+
os.makedirs(dst, exist_ok=True) # Create destination directory if not exists
205+
206+
for root, _, files in os.walk(src):
207+
# Find relative path to maintain directory structure
208+
rel_path = os.path.relpath(root, src)
209+
target_dir = os.path.join(dst, rel_path) if rel_path != "." else dst
210+
211+
os.makedirs(target_dir, exist_ok=True)
212+
213+
for file in files:
214+
src_file = os.path.join(root, file)
215+
dst_file = os.path.join(target_dir, file)
216+
217+
if not os.path.exists(dst_file) or overwrite:
218+
shutil.copy2(src_file, dst_file)
219+
220+
221+
def move_recursive(src: str, dst: str, overwrite=False):
222+
"""
223+
Recursively move files from `src` to `dst` and removing the source.
224+
225+
:param src: Source directory
226+
:param dst: Destination directory
227+
:param overwrite: If True, overwrite existing files. If False, skip them.
228+
"""
229+
if not os.path.exists(src):
230+
raise FileNotFoundError(f"Source path does not exist: {src}")
231+
232+
copy_recursive(src, dst, overwrite=overwrite)
233+
shutil.rmtree(src)

src/conf_mode/system_login.py

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from passlib.hosts import linux_context
2222
from psutil import users
2323
from pwd import getpwall
24-
from pwd import getpwnam
2524
from pwd import getpwuid
2625
from sys import exit
2726
from time import sleep
@@ -36,9 +35,13 @@
3635
from vyos.utils.auth import EPasswdStrength
3736
from vyos.utils.auth import evaluate_strength
3837
from vyos.utils.auth import get_current_user
38+
from vyos.utils.auth import get_local_users
39+
from vyos.utils.auth import get_user_home_dir
40+
from vyos.utils.auth import MIN_USER_UID
3941
from vyos.utils.configfs import delete_cli_node
4042
from vyos.utils.configfs import add_cli_node
4143
from vyos.utils.dict import dict_search
44+
from vyos.utils.file import move_recursive
4245
from vyos.utils.permission import chown
4346
from vyos.utils.process import cmd
4447
from vyos.utils.process import call
@@ -55,10 +58,6 @@
5558
tacacs_nss_config_file = "/etc/tacplus_nss.conf"
5659
nss_config_file = "/etc/nsswitch.conf"
5760

58-
# Minimum UID used when adding system users
59-
MIN_USER_UID: int = 1000
60-
# Maximim UID used when adding system users
61-
MAX_USER_UID: int = 59999
6261
# LOGIN_TIMEOUT from /etc/loign.defs minus 10 sec
6362
MAX_RADIUS_TIMEOUT: int = 50
6463
# MAX_RADIUS_TIMEOUT divided by 2 sec (minimum recomended timeout)
@@ -67,25 +66,7 @@
6766
MAX_TACACS_COUNT: int = 8
6867
# Minimum USER id for TACACS users
6968
MIN_TACACS_UID = 900
70-
# List of local user accounts that must be preserved
71-
SYSTEM_USER_SKIP_LIST: list = ['radius_user', 'radius_priv_user', 'tacacs0', 'tacacs1',
72-
'tacacs2', 'tacacs3', 'tacacs4', 'tacacs5', 'tacacs6',
73-
'tacacs7', 'tacacs8', 'tacacs9', 'tacacs10',' tacacs11',
74-
'tacacs12', 'tacacs13', 'tacacs14', 'tacacs15']
75-
76-
def get_local_users(min_uid=MIN_USER_UID, max_uid=MAX_USER_UID):
77-
"""Return list of dynamically allocated users (see Debian Policy Manual)"""
78-
local_users = []
79-
for s_user in getpwall():
80-
if getpwnam(s_user.pw_name).pw_uid < min_uid:
81-
continue
82-
if getpwnam(s_user.pw_name).pw_uid > max_uid:
83-
continue
84-
if s_user.pw_name in SYSTEM_USER_SKIP_LIST:
85-
continue
86-
local_users.append(s_user.pw_name)
87-
88-
return local_users
69+
8970

9071
def get_shadow_password(username):
9172
with open('/etc/shadow') as f:
@@ -364,9 +345,10 @@ def apply(login):
364345
tmp = dict_search('full_name', user_config)
365346
if tmp: command += f" --comment '{tmp}'"
366347

367-
tmp = dict_search('home_directory', user_config)
368-
if tmp: command += f" --home '{tmp}'"
369-
else: command += f" --home '/home/{user}'"
348+
home_directory = dict_search('home_directory', user_config)
349+
if not home_directory:
350+
home_directory = f'/home/{user}'
351+
command += f" --home '{home_directory}'"
370352

371353
if 'operator' not in user_config:
372354
command += f' --groups frr,frrvty,vyattacfg,sudo,adm,dip,disk,_kea'
@@ -379,7 +361,7 @@ def apply(login):
379361
# crazy user will choose username root or any other system user which will fail.
380362
#
381363
# XXX: Should we deny using root at all?
382-
home_dir = getpwnam(user).pw_dir
364+
home_dir = get_user_home_dir(user)
383365
# always re-render SSH keys with appropriate permissions
384366
render(f'{home_dir}/.ssh/authorized_keys', 'login/authorized_keys.j2',
385367
user_config, permission=0o600,
@@ -399,6 +381,17 @@ def apply(login):
399381
except Exception as e:
400382
raise ConfigError(f'Adding user "{user}" raised exception: "{e}"')
401383

384+
# After invoking 'useradd' for each user, if /var/.users_backups/{user} exists, restore the
385+
# backed up files to the newly created home directory. This reinstates the user's
386+
# SSH environment and avoids loss of access or trust relationships due to the user
387+
# creation process, which does not copy such custom files by default.
388+
#
389+
# More details: https://github.com/vyos/vyos-1x/pull/4678#pullrequestreview-3169648265
390+
backup_directory = f"/var/.users_backups/{user}"
391+
if command.startswith('useradd') and os.path.exists(backup_directory):
392+
move_recursive(backup_directory, home_dir)
393+
chown(home_dir, user=user, group='users', recursive=True)
394+
402395
# T5875: ensure UID is properly set on home directory if user is re-added
403396
# the home directory will always exist, as it's created above by --create-home,
404397
# retrieve current owner of home directory and adjust on demand
@@ -434,7 +427,7 @@ def apply(login):
434427
# Disable user to prevent re-login
435428
call(f'usermod -s /sbin/nologin {user}')
436429

437-
home_dir = getpwnam(user).pw_dir
430+
home_dir = get_user_home_dir(user)
438431
# Remove SSH authorized keys file
439432
authorized_keys_file = f'{home_dir}/.ssh/authorized_keys'
440433
if os.path.exists(authorized_keys_file):

src/op_mode/image_installer.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
from vyos.utils.file import read_file
5858
from vyos.utils.file import write_file
5959
from vyos.utils.process import cmd, run, rc_cmd
60+
from vyos.utils.auth import get_local_users
61+
from vyos.utils.auth import get_user_home_dir
6062
from vyos.version import get_version_data
6163

6264
# define text messages
@@ -715,6 +717,20 @@ def copy_ssh_host_keys() -> bool:
715717
return False
716718

717719

720+
def copy_ssh_known_hosts() -> bool:
721+
"""Ask user to copy SSH `known_hosts` files
722+
723+
Returns:
724+
bool: user's decision
725+
"""
726+
known_hosts_files = get_known_hosts_files()
727+
msg = (
728+
'Would you like to save the SSH known hosts (fingerprints) '
729+
'from your current configuration?'
730+
)
731+
return known_hosts_files and ask_yes_no(msg, default=True)
732+
733+
718734
def console_hint() -> str:
719735
pid = getppid() if 'SUDO_USER' in environ else getpid()
720736
try:
@@ -1010,6 +1026,55 @@ def install_image() -> None:
10101026
exit(1)
10111027

10121028

1029+
def get_known_hosts_files(for_root=True, for_users=True) -> list:
1030+
"""Collect all existing `known_hosts` files for root and/or users under /home"""
1031+
1032+
files = []
1033+
1034+
if for_root:
1035+
base_files = ('/root/.ssh/known_hosts', '/etc/ssh/ssh_known_hosts')
1036+
for file_path in base_files:
1037+
root_known_hosts = Path(file_path)
1038+
if root_known_hosts.exists():
1039+
files.append(root_known_hosts)
1040+
1041+
if for_users: # for each non-system user
1042+
for user in get_local_users():
1043+
home_dir = Path(get_user_home_dir(user))
1044+
if home_dir.exists():
1045+
known_hosts = home_dir / '.ssh' / 'known_hosts'
1046+
if known_hosts.exists():
1047+
files.append(known_hosts)
1048+
1049+
return files
1050+
1051+
1052+
def migrate_known_hosts(target_dir: str):
1053+
"""Copy `known_hosts` for root and all users to the new image directory"""
1054+
1055+
def _mkdir_and_copy_file(known_hosts_file, target_known_hosts):
1056+
target_known_hosts.parent.mkdir(parents=True, exist_ok=True)
1057+
copy(known_hosts_file, target_known_hosts)
1058+
1059+
# Copy root only files using default path
1060+
known_hosts_files = get_known_hosts_files(for_root=True, for_users=False)
1061+
for known_hosts_file in known_hosts_files:
1062+
target_known_hosts = Path(f'{target_dir}{known_hosts_file}')
1063+
_mkdir_and_copy_file(known_hosts_file, target_known_hosts)
1064+
1065+
# During image installation, backup critical user-specific files (e.g., known_hosts)
1066+
# from each user's home directory into /var/.users_backups/{user}. This ensures that their
1067+
# SSH configuration and trust relationships are preserved across system re-installations
1068+
# or provisioning.
1069+
# More details: https://github.com/vyos/vyos-1x/pull/4678#pullrequestreview-3169648265
1070+
known_hosts_files = get_known_hosts_files(for_root=False, for_users=True)
1071+
for known_hosts_file in known_hosts_files:
1072+
username = known_hosts_file.parent.parent.name
1073+
base_dir = Path(f'{target_dir}/var/.users_backups/{username}')
1074+
target_known_hosts = base_dir / '.ssh' / 'known_hosts'
1075+
_mkdir_and_copy_file(known_hosts_file, target_known_hosts)
1076+
1077+
10131078
@compat.grub_cfg_update
10141079
def add_image(image_path: str, vrf: str = None, username: str = '',
10151080
password: str = '', no_prompt: bool = False, force: bool = False) -> None:
@@ -1115,6 +1180,11 @@ def add_image(image_path: str, vrf: str = None, username: str = '',
11151180
for host_key in host_keys:
11161181
copy(host_key, target_ssh_dir)
11171182

1183+
target_ssh_known_hosts_dir: str = f'{root_dir}/boot/{image_name}/rw'
1184+
if no_prompt or copy_ssh_known_hosts():
1185+
print('Copying SSH known_hosts files')
1186+
migrate_known_hosts(target_ssh_known_hosts_dir)
1187+
11181188
# copy system image and kernel files
11191189
print('Copying system image files')
11201190
for file in Path(f'{DIR_ISO_MOUNT}/live').iterdir():

src/tests/test_utils.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@
1212
# You should have received a copy of the GNU General Public License
1313
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1414

15+
import pwd
1516
from unittest import TestCase
17+
18+
from vyos.utils import auth
19+
20+
1621
class TestVyOSUtils(TestCase):
1722
def test_key_mangling(self):
1823
from vyos.utils.dict import mangle_dict_keys
@@ -24,3 +29,45 @@ def test_key_mangling(self):
2429
def test_sysctl_read(self):
2530
from vyos.utils.system import sysctl_read
2631
self.assertEqual(sysctl_read('net.ipv4.conf.lo.forwarding'), '1')
32+
33+
34+
class TestVyOSUtilsAuth(TestCase):
35+
36+
def test_get_local_users_returns_existing_usernames(self):
37+
# Returned users exist, skip list is excluded, and UIDs are in range
38+
39+
all_users = set(s_user.pw_name for s_user in pwd.getpwall())
40+
local_users = auth.get_local_users()
41+
42+
# All returned users must really exist
43+
for user in local_users:
44+
self.assertIn(user, all_users)
45+
46+
# Nobody in the skip list
47+
for skipped in auth.SYSTEM_USER_SKIP_LIST:
48+
self.assertNotIn(skipped, local_users)
49+
50+
# All are within UID range
51+
for s_user in pwd.getpwall():
52+
if s_user.pw_name in local_users:
53+
self.assertGreaterEqual(s_user.pw_uid, auth.MIN_USER_UID)
54+
self.assertLessEqual(s_user.pw_uid, auth.MAX_USER_UID)
55+
56+
def test_get_user_home_dir_for_real_user(self):
57+
# User's homedir is a non-empty string for a valid user
58+
59+
local_users = auth.get_local_users()
60+
if local_users:
61+
for user in local_users:
62+
home_dir = auth.get_user_home_dir(user)
63+
self.assertIsInstance(home_dir, str)
64+
self.assertTrue(bool(home_dir)) # Should not be empty
65+
else:
66+
self.skipTest("No suitable non-system users found on this system")
67+
68+
def test_get_user_home_dir_invalid_user(self):
69+
# Raises KeyError for nonexistent username
70+
71+
user = "__this_user_does_not_exist__" # Test using unlikely username
72+
with self.assertRaises(KeyError):
73+
auth.get_user_home_dir(user)

0 commit comments

Comments
 (0)