diff --git a/docs/release_7_0_0.md b/docs/release_7_0_0.md new file mode 100644 index 00000000..4c2cb39c --- /dev/null +++ b/docs/release_7_0_0.md @@ -0,0 +1,46 @@ +# Removed usage of btool command from solnlib + +As of version 7.0.0, the `btool` command has been removed from solnlib. Configuration stanzas and keys should now be accessed via the REST API. +Additionally, the `splunkenv` module can only be used in environments where Splunk is installed, as it relies on Splunk-specific methods for making internal calls. + +## Session key is now mandatory in some of the functions + +The affected functions now require a valid `session_key` to operate correctly. While solnlib attempts to retrieve the `session_key` automatically, +there are scenarios—such as within a modular input script—where this is not possible. In such cases, you must explicitly provide the `session_key` +to ensure proper authorization. Affected functions are: + +* `get_splunk_host_info` +* `get_splunkd_access_info` +* `get_scheme_from_hec_settings` +* `get_splunkd_uri` +* `get_conf_key_value` +* `get_conf_stanza` + +## Changed arguments in `get_conf_key_value` and `get_conf_stanza` + +As of version 7.0.0, the following changes have been made to the function: + +`get_conf_key_value` now requires 4 mandatory arguments: + +* `conf_name` +* `stanza` +* `key` +* `app_name` (new) + +`get_conf_stanza` now requires 3 mandatory arguments: + +* `conf_name` +* `stanza` +* `app_name` (new) + +Both functions also accept the following optional arguments: + +* `session_key` - Used for authentication. If not provided, a 401 Unauthorized error may occur depending on the context. +* `users` - Limits results returned by the configuration endpoint. Defaults to `nobody`. +* `raw_output` - If set to `True`, the full decoded JSON response is returned. +This should be enabled when `app_name` is set to the global context `(/-/)`, as the Splunk REST API may return multiple entries in that case. + +## The `get_session_key` function has been removed from solnlib + +This function relied on reading the `scheme`, `host` and `port` using the deprecated btool utility, which is no longer supported. + diff --git a/mkdocs.yml b/mkdocs.yml index f0742b0e..5e24644d 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -31,6 +31,7 @@ plugins: nav: - Home: index.md - Release 6.0.0: release_6_0_0.md + - Release 7.0.0: release_7_0_0.md - References: - modular_input: - "checkpointer.py": modular_input/checkpointer.md diff --git a/pyproject.toml b/pyproject.toml index 3654662c..2b39efe9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ [tool.poetry] name = "solnlib" -version = "6.3.0" +version = "7.0.0-beta.1" description = "The Splunk Software Development Kit for Splunk Solutions" authors = ["Splunk "] license = "Apache-2.0" diff --git a/solnlib/__init__.py b/solnlib/__init__.py index e14cb03b..1ba3236d 100644 --- a/solnlib/__init__.py +++ b/solnlib/__init__.py @@ -56,4 +56,4 @@ "utils", ] -__version__ = "6.3.0" +__version__ = "7.0.0-beta.1" diff --git a/solnlib/_settings.py b/solnlib/_settings.py new file mode 100644 index 00000000..1f3501de --- /dev/null +++ b/solnlib/_settings.py @@ -0,0 +1,18 @@ +# +# Copyright 2025 Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""This module provide settings that can be used to disable/switch features.""" + +use_btool = False diff --git a/solnlib/credentials.py b/solnlib/credentials.py index b9ab5b4a..29ee1ff6 100644 --- a/solnlib/credentials.py +++ b/solnlib/credentials.py @@ -16,7 +16,6 @@ """This module contains Splunk credential related interfaces.""" -import json import re import warnings from typing import Dict, List @@ -24,15 +23,12 @@ from splunklib import binding, client from . import splunk_rest_client as rest_client -from .net_utils import validate_scheme_host_port -from .splunkenv import get_splunkd_access_info from .utils import retry __all__ = [ "CredentialException", "CredentialNotExistException", "CredentialManager", - "get_session_key", ] @@ -339,56 +335,3 @@ def _get_all_passwords(self) -> List[Dict[str, str]]: ) passwords = self._storage_passwords.list(count=-1) return self._get_clear_passwords(passwords) - - -@retry(exceptions=[binding.HTTPError]) -def get_session_key( - username: str, - password: str, - scheme: str = None, - host: str = None, - port: int = None, - **context: dict, -) -> str: - """Get splunkd access token. - - Arguments: - username: The Splunk account username, which is used to authenticate the Splunk instance. - password: The Splunk account password. - scheme: (optional) The access scheme, default is None. - host: (optional) The host name, default is None. - port: (optional) The port number, default is None. - context: Other configurations for Splunk rest client. - - Returns: - Splunk session key. - - Raises: - CredentialException: If username/password are invalid. - ValueError: if scheme, host or port are invalid. - - Examples: - >>> get_session_key('user', 'password') - """ - validate_scheme_host_port(scheme, host, port) - - if any([scheme is None, host is None, port is None]): - scheme, host, port = get_splunkd_access_info() - - uri = "{scheme}://{host}:{port}/{endpoint}".format( - scheme=scheme, host=host, port=port, endpoint="services/auth/login" - ) - _rest_client = rest_client.SplunkRestClient( - None, "-", "nobody", scheme, host, port, **context - ) - try: - response = _rest_client.http.post( - uri, username=username, password=password, output_mode="json" - ) - except binding.HTTPError as e: - if e.status != 401: - raise - - raise CredentialException("Invalid username/password.") - - return json.loads(response.body.read())["sessionKey"] diff --git a/solnlib/modular_input/event_writer.py b/solnlib/modular_input/event_writer.py index 1b10e4a6..06a14ae8 100644 --- a/solnlib/modular_input/event_writer.py +++ b/solnlib/modular_input/event_writer.py @@ -23,6 +23,7 @@ import threading import time import traceback +import warnings from abc import ABCMeta, abstractmethod from random import randint from typing import List, Union @@ -39,6 +40,16 @@ __all__ = ["ClassicEventWriter", "HECEventWriter"] +deprecation_msg = ( + "Function 'create_from_token' is deprecated and incompatible with 'global_settings_schema=True'. " + "Use 'create_from_token_with_session_key' instead." +) + + +class FunctionDeprecated(Exception): + pass + + class EventWriter(metaclass=ABCMeta): """Base class of event writer.""" @@ -233,13 +244,13 @@ def __init__( scheme, host, hec_port = utils.extract_http_scheme_host_port(hec_uri) else: if not all([scheme, host, port]): - scheme, host, port = get_splunkd_access_info() + scheme, host, port = get_splunkd_access_info(self._session_key) hec_port, hec_token = self._get_hec_config( hec_input_name, session_key, scheme, host, port, **context ) if global_settings_schema: - scheme = get_scheme_from_hec_settings() + scheme = get_scheme_from_hec_settings(self._session_key) if not context.get("pool_connections"): context["pool_connections"] = 10 @@ -272,6 +283,11 @@ def create_from_token( Created HECEventWriter. """ + warnings.warn(deprecation_msg, DeprecationWarning, stacklevel=2) + + if global_settings_schema: + raise FunctionDeprecated(deprecation_msg) + return HECEventWriter( None, None, diff --git a/solnlib/server_info.py b/solnlib/server_info.py index daa7fa26..56f2c006 100644 --- a/solnlib/server_info.py +++ b/solnlib/server_info.py @@ -78,7 +78,7 @@ def __init__( """ is_localhost = False if not all([scheme, host, port]) and os.environ.get("SPLUNK_HOME"): - scheme, host, port = get_splunkd_access_info() + scheme, host, port = get_splunkd_access_info(session_key) is_localhost = ( host == "localhost" or host == "127.0.0.1" or host in ("::1", "[::1]") ) diff --git a/solnlib/splunk_rest_client.py b/solnlib/splunk_rest_client.py index 7e858248..d960a2e3 100644 --- a/solnlib/splunk_rest_client.py +++ b/solnlib/splunk_rest_client.py @@ -221,7 +221,7 @@ def __init__( """ # Only do splunkd URI discovery in SPLUNK env (SPLUNK_HOME is set). if not all([scheme, host, port]) and os.environ.get("SPLUNK_HOME"): - scheme, host, port = get_splunkd_access_info() + scheme, host, port = get_splunkd_access_info(session_key) if os.environ.get("SPLUNK_HOME") is None: if not all([scheme, host, port]): raise ValueError( diff --git a/solnlib/splunkenv.py b/solnlib/splunkenv.py index d27ef882..09ce12de 100644 --- a/solnlib/splunkenv.py +++ b/solnlib/splunkenv.py @@ -16,16 +16,43 @@ """Splunk platform related utilities.""" + import os import os.path as op import socket import subprocess +import json from configparser import ConfigParser from io import StringIO from typing import List, Optional, Tuple, Union - +import __main__ +from solnlib._settings import use_btool from .utils import is_true +try: + from splunk.rest import simpleRequest +except ImportError: + + def simpleRequest(*args, **kwargs): + raise ImportError("This module requires Splunk to be installed.") + + +try: + from splunk import getSessionKey +except ImportError: + + def getSessionKey(*args, **kwargs): + raise ImportError("This module requires Splunk to be installed.") + + +try: + from splunk.clilib.bundle_paths import make_splunkhome_path as msp +except ImportError: + + def msp(*args, **kwargs): + raise ImportError("This module requires Splunk to be installed.") + + __all__ = [ "make_splunkhome_path", "get_splunk_host_info", @@ -42,56 +69,9 @@ APP_SYSTEM = "system" APP_HEC = "splunk_httpinput" -# See validateSearchHeadPooling() in src/libbundle/ConfSettings.cpp -on_shared_storage = [ - os.path.join(ETC_LEAF, "apps"), - os.path.join(ETC_LEAF, "users"), - os.path.join("var", "run", "splunk", "dispatch"), - os.path.join("var", "run", "splunk", "srtemp"), - os.path.join("var", "run", "splunk", "rss"), - os.path.join("var", "run", "splunk", "scheduler"), - os.path.join("var", "run", "splunk", "lookup_tmp"), -] - - -def _splunk_home(): - return os.path.normpath(os.environ["SPLUNK_HOME"]) - - -def _splunk_etc(): - try: - result = os.environ["SPLUNK_ETC"] - except KeyError: - result = op.join(_splunk_home(), ETC_LEAF) - - return os.path.normpath(result) - - -def _get_shared_storage() -> Optional[str]: - """Get splunk shared storage name. - - Returns: - Splunk shared storage name. - """ - - try: - state = get_conf_key_value("server", "pooling", "state", APP_SYSTEM) - storage = get_conf_key_value("server", "pooling", "storage", APP_SYSTEM) - except KeyError: - state = "disabled" - storage = None - - if state == "enabled" and storage: - return storage - - return None - -# Verify path prefix and return true if both paths have drives -def _verify_path_prefix(path, start): - path_drive = os.path.splitdrive(path)[0] - start_drive = os.path.splitdrive(start)[0] - return len(path_drive) == len(start_drive) +class SessionKeyNotFound(Exception): + pass def make_splunkhome_path(parts: Union[List, Tuple]) -> str: @@ -111,53 +91,28 @@ def make_splunkhome_path(parts: Union[List, Tuple]) -> str: Raises: ValueError: Escape from intended parent directories. """ - - relpath = os.path.normpath(os.path.join(*parts)) - - basepath = None - shared_storage = _get_shared_storage() - if shared_storage: - for candidate in on_shared_storage: - # SPL-100508 On windows if the path is missing the drive letter, - # construct fullpath manually and call relpath - if os.name == "nt" and not _verify_path_prefix(relpath, candidate): - break - - if os.path.relpath(relpath, candidate)[0:2] != "..": - basepath = shared_storage - break - - if basepath is None: - etc_with_trailing_sep = os.path.join(ETC_LEAF, "") - if relpath == ETC_LEAF or relpath.startswith(etc_with_trailing_sep): - # Redirect $SPLUNK_HOME/etc to $SPLUNK_ETC. - basepath = _splunk_etc() - # Remove leading etc (and path separator, if present). Note: when - # emitting $SPLUNK_ETC exactly, with no additional path parts, we - # set to the empty string. - relpath = relpath[4:] - else: - basepath = _splunk_home() - - fullpath = os.path.normpath(os.path.join(basepath, relpath)) - - # Check that we haven't escaped from intended parent directories. - if os.path.relpath(fullpath, basepath)[0:2] == "..": - raise ValueError( - f'Illegal escape from parent directory "{basepath}": {fullpath}' - ) - return fullpath + return msp(parts) -def get_splunk_host_info() -> Tuple: +def get_splunk_host_info(session_key: Optional[str] = None) -> Tuple: """Get splunk host info. + Arguments: + session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from + splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised. Returns: Tuple of (server_name, host_name). """ - server_name = get_conf_key_value("server", "general", "serverName", APP_SYSTEM) + server_name = get_conf_key_value( + "server", + "general", + "serverName", + APP_SYSTEM, + session_key=session_key, + ) host_name = socket.gethostname() + return server_name, host_name @@ -175,21 +130,35 @@ def get_splunk_bin() -> str: return make_splunkhome_path(("bin", splunk_bin)) -def get_splunkd_access_info() -> Tuple[str, str, int]: +def get_splunkd_access_info(session_key: Optional[str] = None) -> Tuple[str, str, int]: """Get splunkd server access info. + Arguments: + session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from + splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised. Returns: Tuple of (scheme, host, port). """ + enable_splunkd_ssl = get_conf_key_value( + "server", + "sslConfig", + "enableSplunkdSSL", + APP_SYSTEM, + session_key=session_key, + ) - if is_true( - get_conf_key_value("server", "sslConfig", "enableSplunkdSSL", APP_SYSTEM) - ): + if is_true(enable_splunkd_ssl): scheme = "https" else: scheme = "http" - host_port = get_conf_key_value("web", "settings", "mgmtHostPort", APP_SYSTEM) + host_port = get_conf_key_value( + "web", + "settings", + "mgmtHostPort", + APP_SYSTEM, + session_key=session_key, + ) host_port = host_port.strip() host_port_split_parts = host_port.split(":") host = ":".join(host_port_split_parts[:-1]) @@ -203,14 +172,23 @@ def get_splunkd_access_info() -> Tuple[str, str, int]: return scheme, host, port -def get_scheme_from_hec_settings() -> str: +def get_scheme_from_hec_settings(session_key: Optional[str] = None) -> str: """Get scheme from HEC global settings. + Arguments: + session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from + splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised. Returns: scheme (str) """ try: - ssl_enabled = get_conf_key_value("inputs", "http", "enableSSL", APP_HEC) + ssl_enabled = get_conf_key_value( + "inputs", + "http", + "enableSSL", + APP_HEC, + session_key=session_key, + ) except KeyError: raise KeyError( "Cannot get enableSSL setting form conf: 'inputs' and stanza: '[http]'. " @@ -227,9 +205,12 @@ def get_scheme_from_hec_settings() -> str: return scheme -def get_splunkd_uri() -> str: +def get_splunkd_uri(session_key: Optional[str] = None) -> str: """Get splunkd uri. + Arguments: + session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from + splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised. Returns: Splunkd uri. """ @@ -237,20 +218,32 @@ def get_splunkd_uri() -> str: if os.environ.get("SPLUNKD_URI"): return os.environ["SPLUNKD_URI"] - scheme, host, port = get_splunkd_access_info() + scheme, host, port = get_splunkd_access_info(session_key) return f"{scheme}://{host}:{port}" def get_conf_key_value( - conf_name: str, stanza: str, key: str, app_name: Optional[str] = None + conf_name: str, + stanza: str, + key: str, + app_name: str, + session_key: Optional[str] = None, + user: str = "nobody", + raw_output: Optional[bool] = False, ) -> Union[str, List, dict]: """Get value of `key` of `stanza` in `conf_name`. Arguments: conf_name: Config file. stanza: Stanza name. - key: Key name. - app_name: Application name. Optional. + key: Key name in the stanza. + app_name: Application name. To make a call to global context use '-' as app_name and set raw_output=True. + In that case manual parsing is needed as response may be the list with multiple entries. + session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from + splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised. + user: used for set user context in API call. Optional. + raw_output: if 'true' full, decoded response in json format will be returned. It should be set to True when + app_name is a global context '/-/'. In that case splunk API may return multiple entries. Returns: Config value. @@ -259,19 +252,43 @@ def get_conf_key_value( KeyError: If `stanza` or `key` doesn't exist. """ - stanzas = get_conf_stanzas(conf_name, app_name) - return stanzas[stanza][key] + if use_btool: + app = None if app_name == "-" else app_name + stanzas = get_conf_stanzas(conf_name, app) + return stanzas[stanza][key] + + stanzas = _get_conf_stanzas_from_splunk_api( + conf_name, app_name, session_key=session_key, user=user, stanza=stanza + ) + + if raw_output: + return stanzas + + stanza = stanzas.get("entry")[0].get("content") + requested_key = stanza[key] + return requested_key def get_conf_stanza( - conf_name: str, stanza: str, app_name: Optional[str] = None + conf_name: str, + stanza: str, + app_name: str, + session_key: Optional[str] = None, + user: str = "nobody", + raw_output: Optional[bool] = False, ) -> dict: """Get `stanza` in `conf_name`. Arguments: conf_name: Config file. stanza: Stanza name. - app_name: Application name. Optional. + app_name: Application name. To make a call to global context use '-' as app_name and set raw_output=True. + In that case manual parsing is needed as response may be the list with multiple entries. + session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from + splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised. + user: used for set user context in API call. Optional. + raw_output: if 'true' full, decoded response in json format will be returned. It should be set to True when + app_name is a global context '/-/'. In that case splunk API may return multiple entries. Returns: Config stanza. @@ -280,8 +297,20 @@ def get_conf_stanza( KeyError: If stanza doesn't exist. """ - stanzas = get_conf_stanzas(conf_name, app_name) - return stanzas[stanza] + if use_btool: + app = None if app_name == "-" else app_name + stanzas = get_conf_stanzas(conf_name, app) + return stanzas[stanza] + + stanzas = _get_conf_stanzas_from_splunk_api( + conf_name, app_name, session_key=session_key, user=user, stanza=stanza + ) + + if raw_output: + return stanzas + + stanza = stanzas.get("entry")[0].get("content") + return stanza def get_conf_stanzas(conf_name: str, app_name: Optional[str] = None) -> dict: @@ -330,3 +359,56 @@ def get_conf_stanzas(conf_name: str, app_name: Optional[str] = None) -> dict: for section in parser.sections(): out[section] = {item[0]: item[1] for item in parser.items(section, raw=True)} return out + + +def _get_conf_stanzas_from_splunk_api( + conf_name: str, + app_name: str, + session_key: Optional[str] = None, + user: str = "nobody", + stanza: Optional[str] = None, +) -> dict: + """Get stanzas of `conf_name` using splunk API: + + /servicesNS/{user}/{app_name}/configs/conf-{conf_name}/{stanza} + + Arguments: + conf_name: Config file. + app_name: Application name. To make a call to global context use '-' as app_name and set raw_output=True. + In that case manual parsing is needed as response may be the list with multiple entries. + session_key: Needed to make a call to config endpoint. If 'None', solnlib will try to get it from + splunk.getSessionKey() and/or __main__ module and if it won't get it, SessionKeyNotFound will be raised. + user: used for set user context in API call. Optional. + stanza: Stanza name. Optional. + + Returns: + json response. + """ + + url = f"/servicesNS/{user}/{app_name}/configs/conf-{conf_name}" + + if stanza: + url = url + "/" + stanza + + if not session_key: + session_key = getSessionKey() + + if not session_key and hasattr(__main__, "___sessionKey"): + session_key = getattr(__main__, "___sessionKey") + + if not session_key: + raise SessionKeyNotFound( + "Session key is missing. If you are using 'splunkenv' module in your TA, please ensure you are " + "providing session_key to it's functions. For more information " + "please see: https://splunk.github.io/addonfactory-solutions-library-python/release_7_0_0/" + ) + + server_response, server_content = simpleRequest( + url, + sessionKey=session_key, + getargs={"output_mode": "json"}, + ) + + result = json.loads(server_content.decode()) + + return result diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c7583b64..9f9f6f0b 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -21,3 +21,18 @@ def setup_env(): @pytest.fixture(scope="session") def session_key(): return context.get_session_key() + + +def mock_splunk(monkeypatch): + def simple_requests(url, *args, **kwargs): + from splunk.rest import simpleRequest + + return simpleRequest(url, *args, **kwargs) + + def make_splunkn_home(url, *args, **kwargs): + from splunk.clilib.bundle_paths import make_splunkhome_path + + return make_splunkhome_path(url, *args, **kwargs) + + monkeypatch.setattr("solnlib.splunkenv.simpleRequest", simple_requests) + monkeypatch.setattr("solnlib.splunkenv.msp", make_splunkn_home) diff --git a/tests/integration/test_conf_manager.py b/tests/integration/test_conf_manager.py index 54907bd7..a48425d2 100644 --- a/tests/integration/test_conf_manager.py +++ b/tests/integration/test_conf_manager.py @@ -15,6 +15,7 @@ # import context +import conftest import pytest from solnlib import conf_manager, soln_exceptions from unittest import mock @@ -144,7 +145,11 @@ def test_conf_manager_update_conf_with_encrypted_keys(): assert conf_file.get("stanza")["key2"] == "value2" -def test_get_log_level(): +@mock.patch("splunk.clilib.cli_common.getMgmtUri") +def test_get_log_level(mock_mgm_uri, monkeypatch): + mock_mgm_uri.return_value = "https://127.0.0.1:8089" + conftest.mock_splunk(monkeypatch) + session_key = context.get_session_key() expected_log_level = "DEBUG" @@ -159,7 +164,9 @@ def test_get_log_level(): assert expected_log_level == log_level -def test_get_log_level_incorrect_log_level_field(): +def test_get_log_level_incorrect_log_level_field(monkeypatch): + conftest.mock_splunk(monkeypatch) + session_key = context.get_session_key() expected_log_level = "INFO" @@ -173,7 +180,9 @@ def test_get_log_level_incorrect_log_level_field(): assert expected_log_level == log_level -def test_get_proxy_dict(): +def test_get_proxy_dict(monkeypatch): + conftest.mock_splunk(monkeypatch) + session_key = context.get_session_key() expected_proxy_dict = VALID_PROXY_DICT proxy_dict = conf_manager.get_proxy_dict( @@ -185,7 +194,9 @@ def test_get_proxy_dict(): assert expected_proxy_dict == proxy_dict -def test_invalid_proxy_port(): +def test_invalid_proxy_port(monkeypatch): + conftest.mock_splunk(monkeypatch) + session_key = context.get_session_key() with pytest.raises(soln_exceptions.InvalidPortError): @@ -199,7 +210,9 @@ def test_invalid_proxy_port(): ) -def test_invalid_proxy_host(): +def test_invalid_proxy_host(monkeypatch): + conftest.mock_splunk(monkeypatch) + session_key = context.get_session_key() with pytest.raises(soln_exceptions.InvalidHostnameError): @@ -225,7 +238,9 @@ def test_conf_manager_exception(): ) -def test_conf_stanza_not_exist_exception(): +def test_conf_stanza_not_exist_exception(monkeypatch): + conftest.mock_splunk(monkeypatch) + session_key = context.get_session_key() with pytest.raises(soln_exceptions.ConfStanzaNotExistException): diff --git a/tests/integration/test_credentials.py b/tests/integration/test_credentials.py index e6999ec0..af475d75 100644 --- a/tests/integration/test_credentials.py +++ b/tests/integration/test_credentials.py @@ -27,13 +27,7 @@ def _build_credential_manager( realm: Optional[str] = None, ) -> credentials.CredentialManager: - session_key = credentials.get_session_key( - context.username, - context.password, - scheme=context.scheme, - host=context.host, - port=context.port, - ) + session_key = context.get_session_key() return credentials.CredentialManager( session_key, context.app, diff --git a/tests/integration/test_hec_config.py b/tests/integration/test_hec_config.py index 38159e60..95c8713b 100644 --- a/tests/integration/test_hec_config.py +++ b/tests/integration/test_hec_config.py @@ -15,6 +15,7 @@ # import context +import conftest import os.path as op import sys @@ -23,7 +24,9 @@ sys.path.insert(0, op.dirname(op.dirname(op.abspath(__file__)))) -def test_hec_config(): +def test_hec_config(monkeypatch): + conftest.mock_splunk(monkeypatch) + session_key = context.get_session_key() config = hec_config.HECConfig(session_key) stanza = { diff --git a/tests/integration/test_hec_event_writer.py b/tests/integration/test_hec_event_writer.py index 3a19ef74..11316bc3 100644 --- a/tests/integration/test_hec_event_writer.py +++ b/tests/integration/test_hec_event_writer.py @@ -14,6 +14,7 @@ # limitations under the License. # import context +import conftest import os.path as op import sys import time @@ -25,7 +26,9 @@ sys.path.insert(0, op.dirname(op.dirname(op.abspath(__file__)))) -def test_hec_event_writer(): +def test_hec_event_writer(monkeypatch): + conftest.mock_splunk(monkeypatch) + session_key = context.get_session_key() ew = hew.HECEventWriter("test", session_key) @@ -40,7 +43,9 @@ def test_hec_event_writer(): ew.write_events([e1, e2]) -def test_hec_event_writes_with_non_utf_8(): +def test_hec_event_writes_with_non_utf_8(monkeypatch): + conftest.mock_splunk(monkeypatch) + # To test scenario listed in https://github.com/splunk/addonfactory-solutions-library-python/pull/112. test_name = "test_hec_event_writes_with_non_utf_8" session_key = context.get_session_key() diff --git a/tests/integration/test_splunkenv.py b/tests/integration/test_splunkenv.py index d9a0c47f..d8fe2b51 100644 --- a/tests/integration/test_splunkenv.py +++ b/tests/integration/test_splunkenv.py @@ -13,22 +13,32 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import pytest +import context +import conftest import os import os.path as op import sys from solnlib import splunkenv +from unittest import mock sys.path.insert(0, op.dirname(op.dirname(op.abspath(__file__)))) -def test_splunkenv(): +@mock.patch("solnlib.splunkenv.getSessionKey") +def test_splunkenv(mock_get_session_key, monkeypatch): + conftest.mock_splunk(monkeypatch) + mock_get_session_key.return_value = None + assert "SPLUNK_HOME" in os.environ + sk = context.get_session_key() + splunkhome_path = splunkenv.make_splunkhome_path(["etc", "apps"]) assert splunkhome_path == op.join(os.environ["SPLUNK_HOME"], "etc", "apps") - server_name, host_name = splunkenv.get_splunk_host_info() + server_name, host_name = splunkenv.get_splunk_host_info(sk) assert server_name assert host_name @@ -38,10 +48,20 @@ def test_splunkenv(): op.join(os.environ["SPLUNK_HOME"], "bin", "splunk.exe"), ] - scheme, host, port = splunkenv.get_splunkd_access_info() + scheme, host, port = splunkenv.get_splunkd_access_info(sk) assert scheme assert host assert port - uri = splunkenv.get_splunkd_uri() + uri = splunkenv.get_splunkd_uri(sk) assert uri == f"{scheme}://{host}:{port}" + + exp_msg = ( + "Session key is missing. If you are using 'splunkenv' module in your TA, please ensure you are " + "providing session_key to it's functions. For more information " + "please see: https://splunk.github.io/addonfactory-solutions-library-python/release_7_0_0/" + ) + + with pytest.raises(splunkenv.SessionKeyNotFound) as e: + splunkenv.get_splunk_host_info() + assert e.value.args[0] == exp_msg diff --git a/tests/integration/test_time_parser.py b/tests/integration/test_time_parser.py index e21b0a67..4c2192e4 100644 --- a/tests/integration/test_time_parser.py +++ b/tests/integration/test_time_parser.py @@ -15,6 +15,7 @@ # import context +import conftest import datetime import os.path as op import sys @@ -24,7 +25,9 @@ sys.path.insert(0, op.dirname(op.dirname(op.abspath(__file__)))) -def test_time_parser(): +def test_time_parser(monkeypatch): + conftest.mock_splunk(monkeypatch) + session_key = context.get_session_key() tp = time_parser.TimeParser(session_key) diff --git a/tests/integration/test_user_access.py b/tests/integration/test_user_access.py index 62a8cfe0..09ba9857 100644 --- a/tests/integration/test_user_access.py +++ b/tests/integration/test_user_access.py @@ -15,6 +15,7 @@ # import context +import conftest import os.path as op import sys @@ -114,7 +115,9 @@ def test_app_capability_manager(): assert not acm.capabilities_are_registered() -def test_check_user_access(): +def test_check_user_access(monkeypatch): + conftest.mock_splunk(monkeypatch) + session_key = context.get_session_key() app_capabilities = { "object_type1": { @@ -179,10 +182,10 @@ def test_get_user_roles(): ) -def test_user_access(): +def test_user_access(monkeypatch): test_object_acl_manager() test_app_capability_manager() - test_check_user_access() + test_check_user_access(monkeypatch) test_get_current_username() test_get_user_capabilities() test_user_is_capable() diff --git a/tests/unit/common.py b/tests/unit/common.py index fa79619f..659fb228 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # - import os.path as op import socket import subprocess @@ -72,9 +71,40 @@ def communicate(self, input=None): with open(file_path) as fp: return fp.read(), None + def simple_requests(url, *args, **kwargs): + if "conf-server/sslConfig" in url: + file_path = op.sep.join( + [cur_dir, "data/mock_splunk/etc/system/default/server-sslconfig.json"] + ) + elif "conf-server/general" in url: + file_path = op.sep.join( + [cur_dir, "data/mock_splunk/etc/system/default/server-general.json"] + ) + elif "conf-inputs" in url: + file_path = op.sep.join( + [ + cur_dir, + "data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.json", + ] + ) + else: + file_path = op.sep.join( + [cur_dir, "data/mock_splunk/etc/system/default/web.json"] + ) + with open(file_path) as fp: + data = fp.read(), None + + out = data[0] + return "200", out.encode("utf8") + + def get_session_key(): + return None + splunk_home = op.join(cur_dir, "data/mock_splunk/") monkeypatch.setenv("SPLUNK_HOME", splunk_home) monkeypatch.setenv("SPLUNK_ETC", op.join(splunk_home, "etc")) + monkeypatch.setattr("solnlib.splunkenv.simpleRequest", simple_requests) + monkeypatch.setattr("solnlib.splunkenv.getSessionKey", get_session_key) monkeypatch.setattr(subprocess, "Popen", MockPopen) diff --git a/tests/unit/data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.conf b/tests/unit/data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.conf index 6e74c6dc..d224460a 100644 --- a/tests/unit/data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.conf +++ b/tests/unit/data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.conf @@ -1,3 +1,3 @@ [http] -disabled = 0 -enableSSL = 0 +disabled = 1 +enableSSL = 1 diff --git a/tests/unit/data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.json b/tests/unit/data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.json new file mode 100644 index 00000000..825b5c5a --- /dev/null +++ b/tests/unit/data/mock_splunk/etc/apps/splunk_httpinput/local/inputs.json @@ -0,0 +1,21 @@ +{ + "links": { + "create": "/servicesNS/nobody/-/configs/conf-server/_new", + "_reload": "/servicesNS/nobody/-/configs/conf-server/_reload", + "_acl": "/servicesNS/nobody/-/configs/conf-server/_acl" + }, + "origin": "https://127.0.0.1:8089/servicesNS/nobody/-/configs/conf-server", + "updated": "2025-05-29T20:31:49+02:00", + "generator": { + "build": "51ccf43db5bd", + "version": "9.3.0" + }, + "entry": [ + { + "content": { + "disabled": 0, + "enableSSL": 0 + } + } + ] +} \ No newline at end of file diff --git a/tests/unit/data/mock_splunk/etc/system/default/server-general.json b/tests/unit/data/mock_splunk/etc/system/default/server-general.json new file mode 100644 index 00000000..15e6be2d --- /dev/null +++ b/tests/unit/data/mock_splunk/etc/system/default/server-general.json @@ -0,0 +1,33 @@ +{ + "links": { + "create": "/servicesNS/nobody/-/configs/conf-server/_new", + "_reload": "/servicesNS/nobody/-/configs/conf-server/_reload", + "_acl": "/servicesNS/nobody/-/configs/conf-server/_acl" + }, + "origin": "https://127.0.0.1:8089/servicesNS/nobody/-/configs/conf-server", + "updated": "2025-05-29T20:31:49+02:00", + "generator": { + "build": "51ccf43db5bd", + "version": "9.3.0" + }, + "entry": [ + { + "content": { + "serverName": "unittestServer", + "sessionTimeout": "1h", + "pass4SymmKey": "changeme", + "allowRemoteLogin": "requireSetPassword", + "access_logging_for_phonehome": "true", + "hangup_after_phonehome": "false", + "listenOnIPv6": "no", + "connectUsingIpVersion": "auto", + "useHTTPServerCompression": "true", + "useHTTPClientCompression": "false", + "defaultHTTPServerCompressionLevel": "6", + "skipHTTPCompressionAcl": "127.0.0.1 ::1", + "parallelIngestionPipelines": "1", + "instanceType": "download" + } + } + ] +} \ No newline at end of file diff --git a/tests/unit/data/mock_splunk/etc/system/default/server-sslconfig.json b/tests/unit/data/mock_splunk/etc/system/default/server-sslconfig.json new file mode 100644 index 00000000..403f50a6 --- /dev/null +++ b/tests/unit/data/mock_splunk/etc/system/default/server-sslconfig.json @@ -0,0 +1,32 @@ +{ + "links": { + "create": "/servicesNS/nobody/-/configs/conf-server/_new", + "_reload": "/servicesNS/nobody/-/configs/conf-server/_reload", + "_acl": "/servicesNS/nobody/-/configs/conf-server/_acl" + }, + "origin": "https://127.0.0.1:8089/servicesNS/nobody/-/configs/conf-server", + "updated": "2025-05-29T20:31:49+02:00", + "generator": { + "build": "51ccf43db5bd", + "version": "9.3.0" + }, + "entry": [ + { + "content": { + "allowSslCompression": true, + "allowSslRenegotiation": true, + "caCertFile": "cacert.pem", + "caPath": "$SPLUNK_HOME/etc/auth", + "certCreateScript": "$SPLUNK_HOME/bin/splunk, createssl, server-cert", + "cipherSuite": "TLSv1+HIGH:@STRENGTH", + "enableSplunkdSSL": true, + "sendStrictTransportSecurityHeader": "false", + "sslKeysfile": "server.pem", + "sslKeysfilePassword": "password", + "sslVersions": "*,-ssl2", + "useClientSSLCompression": true, + "useSplunkdClientSSLCompression": true + } + } + ] +} \ No newline at end of file diff --git a/tests/unit/data/mock_splunk/etc/system/default/server.conf b/tests/unit/data/mock_splunk/etc/system/default/server.conf index dec71e1a..42a9a7ac 100644 --- a/tests/unit/data/mock_splunk/etc/system/default/server.conf +++ b/tests/unit/data/mock_splunk/etc/system/default/server.conf @@ -16,7 +16,7 @@ [general] -serverName=unittestServer +serverName=unittestServerBtool sessionTimeout=1h pass4SymmKey = changeme @@ -46,7 +46,7 @@ parallelIngestionPipelines = 1 instanceType = download [sslConfig] -enableSplunkdSSL = true +enableSplunkdSSL = false useClientSSLCompression = true useSplunkdClientSSLCompression = true # enableSplunkSearchSSL has been moved to web.conf, and changed to enableSplunkWebSSL diff --git a/tests/unit/data/mock_splunk/etc/system/default/web.conf b/tests/unit/data/mock_splunk/etc/system/default/web.conf index e21f2d49..a192f56b 100644 --- a/tests/unit/data/mock_splunk/etc/system/default/web.conf +++ b/tests/unit/data/mock_splunk/etc/system/default/web.conf @@ -1,4 +1,5 @@ # Version 6.3.0 +# Version 6.3.0 # DO NOT EDIT THIS FILE! # Changes to default files will be lost on update and are difficult to # manage and support. @@ -29,7 +30,7 @@ httpport = 8000 enableSplunkWebSSL = false # location of splunkd; don't include http[s]:// in this anymore. -mgmtHostPort = 127.0.0.1:8089 +mgmtHostPort = 127.0.0.2:8079 # list of ports to start python application servers on (although usually # one port is enough) Set to 0 to instead run the application server diff --git a/tests/unit/data/mock_splunk/etc/system/default/web.json b/tests/unit/data/mock_splunk/etc/system/default/web.json new file mode 100644 index 00000000..e100c30c --- /dev/null +++ b/tests/unit/data/mock_splunk/etc/system/default/web.json @@ -0,0 +1,122 @@ +{ + "links": { + "create": "/servicesNS/nobody/-/configs/conf-server/_new", + "_reload": "/servicesNS/nobody/-/configs/conf-server/_reload", + "_acl": "/servicesNS/nobody/-/configs/conf-server/_acl" + }, + "origin": "https://127.0.0.1:8089/servicesNS/nobody/-/configs/conf-server", + "updated": "2025-05-29T20:31:49+02:00", + "generator": { + "build": "51ccf43db5bd", + "version": "9.3.0" + }, + "entry": [ + { + "content": { + "SSOMode": "strict", + "acceptFrom": "*", + "allowSslCompression": "false", + "allowSslRenegotiation": "true", + "allowSsoWithoutChangingServerConf": "0", + "appServerPorts": "8065", + "caCertPath": "etc/auth/splunkweb/cert.pem", + "cacheBytesLimit": "4194304", + "cacheEntriesLimit": "16384", + "cipherSuite": "TLSv1+HIGH:@STRENGTH", + "crossOriginSharingPolicy": "", + "dedicatedIoThreads": "0", + "docsCheckerBaseURL": "https://quickdraw.splunk.com/help", + "embed_footer": "splunk>", + "embed_uri": "", + "enableSplunkWebSSL": "false", + "enableWebDebug": "False", + "enable_autocomplete_login": "False", + "enable_gzip": "True", + "enable_insecure_login": "False", + "enable_pivot_adhoc_acceleration": "True", + "enable_proxy_write": "True", + "enabled_decomposers": "plot", + "engine.autoreload_on": "False", + "flash_major_version": "9", + "flash_minor_version": "0", + "flash_revision_version": "124", + "forceHttp10": "auto", + "httpport": "8000", + "job_max_polling_interval": "1000", + "job_min_polling_interval": "100", + "js_logger_mode": "None", + "js_logger_mode_server_end_point": "util/log/js", + "js_logger_mode_server_max_buffer": "100", + "js_logger_mode_server_poll_buffer": "1000", + "js_no_cache": "False", + "jschart_test_mode": "False", + "jschart_truncation_limit.chrome": "20000", + "jschart_truncation_limit.firefox": "20000", + "jschart_truncation_limit.ie10": "20000", + "jschart_truncation_limit.ie11": "20000", + "jschart_truncation_limit.ie7": "2000", + "jschart_truncation_limit.ie8": "2000", + "jschart_truncation_limit.ie9": "20000", + "jschart_truncation_limit.safari": "20000", + "listenOnIPv6": "no", + "log.access_file": "web_access.log", + "log.access_maxfiles": "5", + "log.access_maxsize": "25000000", + "log.error_maxfiles": "5", + "log.error_maxsize": "25000000", + "log.screen": "True", + "login_content": "", + "maxSockets": "0", + "maxThreads": "0", + "max_view_cache_size": "1000", + "mgmtHostPort": "127.0.0.1:8089", + "minify_css": "True", + "minify_js": "True", + "module_dir": "share/splunk/search_mrsparkle/modules", + "override_JSON_MIME_type_with_text_plain": "True", + "pdfgen_is_available": "1", + "pivot_adhoc_acceleration_mode": "Elastic", + "privKeyPath": "etc/auth/splunkweb/privkey.pem", + "productMenuLabel": "My Splunk", + "productMenuUriPrefix": "https://splunkcommunities.force.com", + "remoteUser": "REMOTE_USER", + "remoteUserMatchExact": "0", + "request.show_tracebacks": "True", + "response.timeout": "7200", + "root_endpoint": "/", + "rss_endpoint": "/rss", + "sendStrictTransportSecurityHeader": "false", + "showProductMenu": "False", + "showUserMenuProfile": "False", + "simple_xml_force_flash_charting": "False", + "simple_xml_module_render": "False", + "splunkdConnectionTimeout": "30", + "sslVersions": "ssl3, tls", + "startwebserver": "1", + "staticCompressionLevel": "9", + "static_dir": "share/splunk/search_mrsparkle/exposed", + "static_endpoint": "/static", + "template_dir": "share/splunk/search_mrsparkle/templates", + "testing_dir": "share/splunk/testing", + "testing_endpoint": "/testing", + "tools.decode.on": "True", + "tools.encode.encoding": "utf-8", + "tools.encode.on": "True", + "tools.sessions.httponly": "True", + "tools.sessions.on": "True", + "tools.sessions.restart_persist": "True", + "tools.sessions.secure": "True", + "tools.sessions.storage_path": "var/run/splunk", + "tools.sessions.storage_type": "file", + "tools.sessions.timeout": "60", + "trap_module_exceptions": "True", + "ui_inactivity_timeout": "60", + "updateCheckerBaseURL": "https://quickdraw.splunk.com/js/", + "use_future_expires": "True", + "userRegistrationURL": "https://www.splunk.com/page/sign_up", + "verifyCookiesWorkDuringLogin": "True", + "x_frame_options_sameorigin": "True" + } + } + ] +} \ No newline at end of file diff --git a/tests/unit/test_credentials.py b/tests/unit/test_credentials.py deleted file mode 100644 index b2e1341c..00000000 --- a/tests/unit/test_credentials.py +++ /dev/null @@ -1,49 +0,0 @@ -# -# Copyright 2023 Splunk Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import common -import pytest -from splunklib import binding - -from solnlib import credentials - - -def test_get_session_key(monkeypatch): - def _mock_session_key_post(self, url, headers=None, **kwargs): - return common.make_response_record( - '{"sessionKey":"' + common.SESSION_KEY + '"}' - ) - - common.mock_splunkhome(monkeypatch) - monkeypatch.setattr(binding.HttpLib, "post", _mock_session_key_post) - - assert credentials.get_session_key("user", "password") == common.SESSION_KEY - - with pytest.raises(ValueError): - credentials.get_session_key("user", "password", scheme="non-http") - credentials.get_session_key("user", "password", scheme="http") - credentials.get_session_key("user", "password", scheme="https") - with pytest.raises(ValueError): - credentials.get_session_key("user", "password", scheme="http", host="==") - credentials.get_session_key("user", "password", scheme="http", host="localhost") - with pytest.raises(ValueError): - credentials.get_session_key( - "user", "password", scheme="http", host="localhost", port=-10 - ) - credentials.get_session_key( - "user", "password", scheme="http", host="localhost", port=10 - ) - credentials.get_session_key("user", "password", scheme="HTTP") - credentials.get_session_key("user", "password", scheme="HTTPS") diff --git a/tests/unit/test_modular_input_event_writer.py b/tests/unit/test_modular_input_event_writer.py index dd9902e7..1d006a57 100644 --- a/tests/unit/test_modular_input_event_writer.py +++ b/tests/unit/test_modular_input_event_writer.py @@ -20,8 +20,10 @@ import common import pytest from splunklib import binding +from unittest.mock import patch from solnlib.modular_input import ClassicEventWriter, HECEventWriter +from solnlib.modular_input.event_writer import FunctionDeprecated, deprecation_msg def test_classic_event_writer(monkeypatch): @@ -269,7 +271,6 @@ def mock_post_2( (create_hec_event_writer__create_from_input, False, "https"), (create_hec_event_writer__create_from_token_with_session_key, True, "http"), (create_hec_event_writer__create_from_token_with_session_key, False, "https"), - (create_hec_event_writer__create_from_token, True, "http"), (create_hec_event_writer__create_from_token, False, "https"), ], ) @@ -287,3 +288,54 @@ def mock_get_hec_config( ev = create_hec_event_writer(hec) assert ev._rest_client.scheme == expected_scheme + + +@patch("solnlib.splunkenv.use_btool") +@pytest.mark.parametrize( + "create_hec_event_writer, hec, expected_scheme", + [ + (create_hec_event_writer__constructor, True, "https"), + (create_hec_event_writer__constructor, False, "http"), + (create_hec_event_writer__create_from_input, True, "https"), + (create_hec_event_writer__create_from_input, False, "https"), + (create_hec_event_writer__create_from_token_with_session_key, True, "https"), + (create_hec_event_writer__create_from_token_with_session_key, False, "https"), + (create_hec_event_writer__create_from_token, False, "https"), + ], +) +def test_hec_event_writer_gets_scheme_from_global_settings_if_requested_backdoor( + mock_flag, monkeypatch, create_hec_event_writer, hec, expected_scheme +): + mock_flag.return_value = True + common.mock_splunkhome(monkeypatch) + + def mock_get_hec_config( + self, hec_input_name, session_key, scheme, host, port, **context + ): + return "8088", "87de04d1-0823-11e6-9c94-a45e60e" + + monkeypatch.setattr(HECEventWriter, "_get_hec_config", mock_get_hec_config) + + ev = create_hec_event_writer(hec) + assert ev._rest_client.scheme == expected_scheme + + +@pytest.mark.parametrize("hec", [False, True]) +def test_hec_event_writer_create_from_token_deprecation(hec, monkeypatch, caplog): + common.mock_splunkhome(monkeypatch) + + def mock_get_hec_config( + self, hec_input_name, session_key, scheme, host, port, **context + ): + return "8088", "87de04d1-0823-11e6-9c94-a45e60e" + + monkeypatch.setattr(HECEventWriter, "_get_hec_config", mock_get_hec_config) + + if hec: + with pytest.raises(FunctionDeprecated) as e: + create_hec_event_writer__create_from_token(hec=True) + assert e.value.args[0] == deprecation_msg + else: + with pytest.warns(DeprecationWarning, match=deprecation_msg): + ev = create_hec_event_writer__create_from_token(hec=False) + assert ev._rest_client.scheme == "https" diff --git a/tests/unit/test_splunkenv.py b/tests/unit/test_splunkenv.py index 36834932..9580d285 100644 --- a/tests/unit/test_splunkenv.py +++ b/tests/unit/test_splunkenv.py @@ -14,7 +14,6 @@ # limitations under the License. # -import os from unittest import mock import common @@ -23,30 +22,24 @@ from solnlib import splunkenv -def test_splunkhome_path(monkeypatch): - common.mock_splunkhome(monkeypatch) - - splunkhome_path = splunkenv.make_splunkhome_path(["etc", "apps"]) - assert splunkhome_path == os.environ["SPLUNK_HOME"] + "etc/apps" - - def test_get_splunk_host_info(monkeypatch): common.mock_splunkhome(monkeypatch) common.mock_gethostname(monkeypatch) - server_name, host_name = splunkenv.get_splunk_host_info() + server_name, host_name = splunkenv.get_splunk_host_info("session_key") assert server_name == "unittestServer" assert host_name == "unittestServer" -def test_splunk_bin(monkeypatch): +@mock.patch("solnlib.splunkenv.use_btool") +def test_get_splunk_host_info_backdoor(mock_flag, monkeypatch): + mock_flag.return_value = True common.mock_splunkhome(monkeypatch) + common.mock_gethostname(monkeypatch) - splunk_bin = splunkenv.get_splunk_bin() - assert splunk_bin in ( - os.environ["SPLUNK_HOME"] + "bin/splunk", - os.environ["SPLUNK_HOME"] + "bin/splunk.exe", - ) + server_name, host_name = splunkenv.get_splunk_host_info("session_key") + assert server_name == "unittestServerBtool" + assert host_name == "unittestServer" @mock.patch.object(splunkenv, "get_conf_key_value") @@ -127,17 +120,47 @@ def test_get_splunkd_access_info( def test_splunkd_uri(monkeypatch): common.mock_splunkhome(monkeypatch) - uri = splunkenv.get_splunkd_uri() + uri = splunkenv.get_splunkd_uri("session_key") assert uri == "https://127.0.0.1:8089" monkeypatch.setenv("SPLUNK_BINDIP", "10.0.0.2:7080") - uri = splunkenv.get_splunkd_uri() + uri = splunkenv.get_splunkd_uri("session_key") assert uri == "https://10.0.0.2:8089" monkeypatch.setenv("SPLUNK_BINDIP", "10.0.0.3") - uri = splunkenv.get_splunkd_uri() + uri = splunkenv.get_splunkd_uri("session_key") assert uri == "https://10.0.0.3:8089" monkeypatch.setenv("SPLUNKD_URI", "https://10.0.0.1:8089") - uri = splunkenv.get_splunkd_uri() + uri = splunkenv.get_splunkd_uri("session_key") + assert uri == "https://10.0.0.1:8089" + + +@mock.patch("solnlib.splunkenv.use_btool") +def test_splunkd_uri_backdoor(mock_flag, monkeypatch): + mock_flag.return_value = True + + common.mock_splunkhome(monkeypatch) + + uri = splunkenv.get_splunkd_uri("session_key") + assert uri == "http://127.0.0.2:8079" + + monkeypatch.setenv("SPLUNK_BINDIP", "10.0.0.2:7080") + uri = splunkenv.get_splunkd_uri("session_key") + assert uri == "http://10.0.0.2:8079" + + monkeypatch.setenv("SPLUNK_BINDIP", "10.0.0.3") + uri = splunkenv.get_splunkd_uri("session_key") + assert uri == "http://10.0.0.3:8079" + + monkeypatch.setenv("SPLUNKD_URI", "https://10.0.0.1:8089") + uri = splunkenv.get_splunkd_uri("session_key") assert uri == "https://10.0.0.1:8089" + + +def test_splunkenv_used_outside_splunk_env(): + with pytest.raises(ImportError) as exc_info: + splunkenv._get_conf_stanzas_from_splunk_api( + "server", "app", session_key="session_key" + ) + assert exc_info.value.args[0] == "This module requires Splunk to be installed."