|
| 1 | +# Copyright VyOS maintainers and contributors <[email protected]> |
| 2 | +# |
| 3 | +# This library is free software; you can redistribute it and/or |
| 4 | +# modify it under the terms of the GNU Lesser General Public |
| 5 | +# License as published by the Free Software Foundation; either |
| 6 | +# version 2.1 of the License, or (at your option) any later version. |
| 7 | +# |
| 8 | +# This library is distributed in the hope that it will be useful, |
| 9 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 10 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| 11 | +# Lesser General Public License for more details. |
| 12 | +# |
| 13 | +# You should have received a copy of the GNU Lesser General Public |
| 14 | +# License along with this library. If not, see <http://www.gnu.org/licenses/>. |
| 15 | + |
| 16 | +from vyos.utils.dict import dict_search |
| 17 | +from vyos.utils.dict import dict_search_recursive |
| 18 | +from vyos.utils.dict import dict_search_recursive_values |
| 19 | +from vyos.utils.dict import dict_set_nested |
| 20 | + |
| 21 | + |
| 22 | +def is_subsequence(check, path): |
| 23 | + """ |
| 24 | + Return True if all items in 'check' appear in order within 'path'. |
| 25 | +
|
| 26 | + Items do not need to be contiguous; only their relative order must match. |
| 27 | + This implementation performs a single forward pass over 'path'. |
| 28 | +
|
| 29 | + Parameters: |
| 30 | + check: Sequence of items to find, in the given order. |
| 31 | + path: Iterable to search for the ordered items. |
| 32 | +
|
| 33 | + Returns: |
| 34 | + bool: True if 'check' is an ordered subsequence of 'path', otherwise False. |
| 35 | + """ |
| 36 | + it = iter(path) |
| 37 | + return all(item in it for item in check) |
| 38 | + |
| 39 | + |
| 40 | +def check_warning_or_error( |
| 41 | + match: list[str], |
| 42 | + dep: dict, |
| 43 | + phrases: list[str] = [], |
| 44 | + ignore: str = '', |
| 45 | + is_warning: bool = False, |
| 46 | +): |
| 47 | + """ |
| 48 | + Classify a dependency match as a warning or an error and record it. |
| 49 | +
|
| 50 | + If 'is_warning' is True, the match is appended to |
| 51 | + dep['dependencies']['warnings'] and no further checks are performed. |
| 52 | + Otherwise, if any phrase in 'phrases' appears as an ordered subsequence |
| 53 | + within 'match' (space-splitting each phrase), the match is recorded as a |
| 54 | + warning. If no phrase matches, the match is recorded as an error unless |
| 55 | + 'ignore' is set and appears as an ordered subsequence within 'match'. |
| 56 | +
|
| 57 | + Parameters: |
| 58 | + match: List of tokens describing the configuration path for the hit. |
| 59 | + dep: Accumulator dict with 'dependencies.warnings' and |
| 60 | + 'dependencies.errors' lists to be mutated in-place. |
| 61 | + phrases: List of phrases that downgrade a hit to a warning when their |
| 62 | + tokens appear in order within 'match'. |
| 63 | + ignore: Optional phrase; if its tokens appear in order within 'match', |
| 64 | + suppresses adding the hit to errors. |
| 65 | + is_warning: Force classification as a warning (bypasses phrase/ignore checks). |
| 66 | +
|
| 67 | + Returns: |
| 68 | + updates 'dep' in-place. |
| 69 | + """ |
| 70 | + if is_warning: |
| 71 | + dep['dependencies']['warnings'].append(match) |
| 72 | + return |
| 73 | + |
| 74 | + for phrase in phrases: |
| 75 | + if is_subsequence(phrase.split(), match): |
| 76 | + dep['dependencies']['warnings'].append(match) |
| 77 | + break |
| 78 | + else: |
| 79 | + if not ignore or not is_subsequence(ignore.split(), match): |
| 80 | + dep['dependencies']['errors'].append(match) |
| 81 | + |
| 82 | + |
| 83 | +def verify_interface_dependencies(conf: dict, interface: str, ignore: str = ''): |
| 84 | + """ |
| 85 | + Analyze configuration to find and classify references to an interface. |
| 86 | +
|
| 87 | + The function scans relevant top-level sections (container, firewall, |
| 88 | + interfaces, nat, nat66, policy, protocols, qos, service, system) for any |
| 89 | + occurrences of 'interface'. Each hit is routed through |
| 90 | + check_warning_or_error to determine whether it should be recorded as a |
| 91 | + warning or an error. The optional 'ignore' phrase can suppress specific |
| 92 | + error entries when its tokens appear in order within a matched path. |
| 93 | +
|
| 94 | + Parameters: |
| 95 | + conf: Configuration dictionary to search. |
| 96 | + interface: Interface name to look for (for example, "br0"). |
| 97 | + ignore: Optional phrase whose ordered tokens suppress adding a hit to errors. |
| 98 | +
|
| 99 | + Returns: |
| 100 | + dict: Summary with optional keys: |
| 101 | + - 'warnings' (bool) and 'warnings_msg' (str) |
| 102 | + - 'errors' (bool) and 'errors_msg' (str) |
| 103 | + """ |
| 104 | + container = dict_search('container', conf) |
| 105 | + firewall = dict_search('firewall', conf) |
| 106 | + interfaces = dict_search('interfaces', conf) |
| 107 | + nat = dict_search('nat', conf) |
| 108 | + nat66 = dict_search('nat66', conf) |
| 109 | + policy = dict_search('policy', conf) |
| 110 | + protocols = dict_search('protocols', conf) |
| 111 | + qos = dict_search('qos', conf) |
| 112 | + service = dict_search('service', conf) |
| 113 | + system = dict_search('system', conf) |
| 114 | + |
| 115 | + dep = {} |
| 116 | + |
| 117 | + dict_set_nested(f'dependencies.warnings', [], dep) |
| 118 | + dict_set_nested(f'dependencies.errors', [], dep) |
| 119 | + |
| 120 | + ########## Container ########## |
| 121 | + for container_match in dict_search_recursive_values(container, interface): |
| 122 | + check_warning_or_error(['container', *container_match], dep, is_warning=True) |
| 123 | + for found_name, found_path in dict_search_recursive(container, interface): |
| 124 | + check_warning_or_error(['container', *found_path], dep) |
| 125 | + |
| 126 | + ########## Firewall ########## |
| 127 | + for fw_match in dict_search_recursive_values(firewall, interface): |
| 128 | + check_warning_or_error(['firewall', *fw_match], dep, is_warning=True) |
| 129 | + for found_name, found_path in dict_search_recursive(firewall, interface): |
| 130 | + check_warning_or_error(['firewall', *found_path], dep) |
| 131 | + |
| 132 | + ########## Interfaces ########## |
| 133 | + for int_match in dict_search_recursive_values(interfaces, interface): |
| 134 | + check_warning_or_error( |
| 135 | + ['interfaces', *int_match], dep, ignore=ignore, is_warning=True |
| 136 | + ) |
| 137 | + for found_name, found_path in dict_search_recursive(interfaces, interface): |
| 138 | + check_warning_or_error(['interfaces', *found_path], dep, ignore=ignore) |
| 139 | + |
| 140 | + ########## Nat ########## |
| 141 | + nat_warning_list = ["nat source", "nat destination"] |
| 142 | + for nat_match in dict_search_recursive_values(nat, interface): |
| 143 | + check_warning_or_error(['nat', *nat_match], dep, is_warning=True) |
| 144 | + for found_name, found_path in dict_search_recursive(nat, interface): |
| 145 | + check_warning_or_error(['nat', *found_path], dep, nat_warning_list) |
| 146 | + |
| 147 | + ########## Nat66 ########## |
| 148 | + nat66_warning_list = ["nat66 source", "nat66 destination"] |
| 149 | + for nat66_match in dict_search_recursive_values(nat66, interface): |
| 150 | + check_warning_or_error(['nat66', *nat66_match], dep, is_warning=True) |
| 151 | + for found_name, found_path in dict_search_recursive(nat66, interface): |
| 152 | + check_warning_or_error(['nat66', *found_path], dep, nat66_warning_list) |
| 153 | + |
| 154 | + ########## Policy ########## |
| 155 | + policy_warning_list = ["policy route interface"] |
| 156 | + for policy_match in dict_search_recursive_values(policy, interface): |
| 157 | + check_warning_or_error(['policy', *policy_match], dep, is_warning=True) |
| 158 | + for found_name, found_path in dict_search_recursive(policy, interface): |
| 159 | + check_warning_or_error(['policy', *found_path], dep, policy_warning_list) |
| 160 | + |
| 161 | + ########## Protocols ########## |
| 162 | + proto_warning_list = [ |
| 163 | + "protocols static", |
| 164 | + "protocols babel", |
| 165 | + "protocols bfd", |
| 166 | + "protocols bgp", |
| 167 | + "protocols failover", |
| 168 | + "protocols rip", |
| 169 | + ] |
| 170 | + for proto_match in dict_search_recursive_values(protocols, interface): |
| 171 | + check_warning_or_error(['protocols', *proto_match], dep, is_warning=True) |
| 172 | + for found_name, found_path in dict_search_recursive(protocols, interface): |
| 173 | + check_warning_or_error(['protocols', *found_path], dep, proto_warning_list) |
| 174 | + |
| 175 | + ########## QoS ########## |
| 176 | + qos_warning_list = ["qos source", "qos destination"] |
| 177 | + for qos_match in dict_search_recursive_values(qos, interface): |
| 178 | + check_warning_or_error(['qos', *qos_match], dep, is_warning=True) |
| 179 | + for found_name, found_path in dict_search_recursive(qos, interface): |
| 180 | + check_warning_or_error(['qos', *found_path], dep, qos_warning_list) |
| 181 | + |
| 182 | + ########## Services ########## |
| 183 | + service_warning_list = [ |
| 184 | + "service dns dynamic", |
| 185 | + "service pppoe_server", |
| 186 | + "service lldp", |
| 187 | + "service suricata", |
| 188 | + ] |
| 189 | + for service_match in dict_search_recursive_values(service, interface): |
| 190 | + check_warning_or_error(['service', *service_match], dep, is_warning=True) |
| 191 | + for found_name, found_path in dict_search_recursive(service, interface): |
| 192 | + check_warning_or_error(['service', *found_path], dep, service_warning_list) |
| 193 | + |
| 194 | + ########## System ########## |
| 195 | + system_warning_list = [ |
| 196 | + "system flow_accounting", |
| 197 | + "system name-server", |
| 198 | + "system sflow", |
| 199 | + ] |
| 200 | + for system_match in dict_search_recursive_values(system, interface): |
| 201 | + check_warning_or_error(['system', *system_match], dep, is_warning=True) |
| 202 | + for found_name, found_path in dict_search_recursive(system, interface): |
| 203 | + check_warning_or_error(['system', *found_path], dep, system_warning_list) |
| 204 | + |
| 205 | + out = {} |
| 206 | + dependency_warnings = dict_search('dependencies.warnings', dep) |
| 207 | + dependency_errors = dict_search('dependencies.errors', dep) |
| 208 | + |
| 209 | + if dependency_warnings: |
| 210 | + warning_paths = "\n".join("- " + " ".join(dep) for dep in dependency_warnings) |
| 211 | + msg = f"{interface} is configured in the following configuration paths:\n{warning_paths}" |
| 212 | + out['warnings'] = True |
| 213 | + out['warnings_msg'] = msg |
| 214 | + if dependency_errors: |
| 215 | + error_paths = "\n".join("- " + " ".join(dep) for dep in dependency_errors) |
| 216 | + msg = ( |
| 217 | + f"{interface} can't be deleted while configured in the following configuration paths:" |
| 218 | + "\n" |
| 219 | + f"{error_paths}" |
| 220 | + ) |
| 221 | + out['errors'] = True |
| 222 | + out['errors_msg'] = msg |
| 223 | + return out |
0 commit comments