diff --git a/README.md b/README.md index ec2c41d..5d4c519 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,10 @@ URIs/URLs, among other topics. ## Requirements * Python >= 3.8 + * hvac >= 2.3.0 + * boto3 >= 1.35.63 + +(Hvac and boto3 are used for credential_manager) You will also need some other libraries for running the tool, you can find the whole list of dependencies in [pyproject.toml](pyproject.toml) file. @@ -59,6 +63,119 @@ To spaw a new shell within the virtual environment use: $ poetry shell ``` +## Credential Manager + +This is a module made to retrieve credentials from different secrets management systems like Bitwarden. +It accesses the secrets management service, looks for the desired credential and returns it in String form. + + +There are two ways of using this module. + + +### Terminal + +To use this, any of these two is valid: + +Command-Line Interface: + +``` +$ python -m credential_manager +``` + +Where: + +- manager → credential manager used to store the credentials (Bitwarden, aws, Hashicorp Vault) +- service → the platform to which you want to connect (github, gitlab, bugzilla). It is the name of the secret in the credential storage, it does not have to be the same as the service. +- credential → the field inside the secret that you want to retrieve (username, password, api-token) + +Examples: + +``` +$ python -m credential_manager bitwarden gmail password +$ python -m credential_manager hashicorp github api_token +$ python -m credential_manager aws production db_password +``` + +In each case, the script will log / access into the corresponding vault, search for the secret with the name of the service that wants to be accessed and then retrieve, from that secret, the value with the name inserted as credential. + +That is, in the first case, it will log into Bitwarden, access the secret called "bugzilla", and from it retrieve the value of the field "username". + +Each of the secrets management services are accessed in different forms and need different configurations to work, as specified in the [[#Managers]] section. + + +### Python API + +To use the module in your python code + + +``` +# Retrieve a secret from Bitwarden +username = get_secret("bitwarden", "bugzilla", "username") + +# Retrieve a secret from AWS Secrets Manager +api_token = get_secret("aws", "github", "api-token") + +# Retrieve a secret from HashiCorp Vault +password = get_secret("hashicorp", "gitlab", "password") +``` + +For more advaced usage, you can directly use the factory to get a specific manager: + +``` +from credential_manager.secrets_manager_factory import SecretsManagerFactory + +# Get a Bitwarden manager instance +bw_manager = SecretsManagerFactory.get_bitwarden_manager() +username = bw_manager.get_secret("bugzilla", "username") + +# Get an AWS Secrets Manager instance +aws_manager = SecretsManagerFactory.get_aws_manager() +api_token = aws_manager.get_secret("github", "api-token") + +``` + +### Supported Managers + + +This section explains the different things to consider when using each of the supported secrets management services, like where to store the credentials to access the secrets manager. + +#### AWS + +The module uses [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html), and this looks for your credentials in the .aws folder, in the files "credentials" and "config". + +Configuration: + +- Credentials are read from the standard AWS credentials file (~/.aws/credentials) +- Region configuration is read from ~/.aws/config +- Ensure your IAM user/role has appropriate permissions to access Secrets Manager + +More about this [here](https://docs.aws.amazon.com/sdkref/latest/guide/file-location.html). + +#### Hashicorp Vault + +The module uses [hvac](https://hvac.readthedocs.io/en/stable/overview.html) to interact with Hashicorp Vault. + +The function will look for the following environment variables to get into the vault, and prompt the user for them if not found: + +- VAULT_ADDR → Address of the Vault server. +- VAULT_TOKEN → A Vault-issued service token that authenticates the CLI user to Vault. +- VAULT_CACERT → Path to a PEM-encoded CA certificate file on the local disk. Used to verify SSL certificates for the server + +If environment variables are not found, the user will be prompted to introduce the data manually. + +More info on this can be found [here](https://developer.hashicorp.com/vault/docs/commands). + +#### Bitwarden + +The module uses the [Bitwarden CLI](https://bitwarden.com/help/cli/) to interact with Bitwarden. + +Required environment variables: + +- BW_EMAIL → the email used to log into the bitwarden account +- BW_PASSWORD + +If environment variables are not found, the user will be prompted to introduce the data manually. + ## License Licensed under GNU General Public License (GPL), version 3 or later. diff --git a/grimoirelab_toolkit/credential_manager/__init__.py b/grimoirelab_toolkit/credential_manager/__init__.py new file mode 100644 index 0000000..17ddfa5 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/__init__.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +from .credential_manager import get_secret +from .secrets_manager_factory import SecretsManagerFactory + +__all__ = ['get_secret', 'SecretsManagerFactory'] \ No newline at end of file diff --git a/grimoirelab_toolkit/credential_manager/__main__.py b/grimoirelab_toolkit/credential_manager/__main__.py new file mode 100644 index 0000000..7b83158 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/__main__.py @@ -0,0 +1,4 @@ +from .credential_manager import main + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/grimoirelab_toolkit/credential_manager/aws_manager.py b/grimoirelab_toolkit/credential_manager/aws_manager.py new file mode 100644 index 0000000..bcd03c9 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/aws_manager.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +# +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +import logging +import json +import boto3 +from botocore.exceptions import EndpointConnectionError, SSLError, ClientError + +_logger = logging.getLogger(__name__) + + +class AwsManager: + + def __init__(self): + """ + Initializes the client that will access to the credentials management service. + + This takes the credentials to log into aws from the .aws folder. + This constructor also takes other relevant information from that folder if it exists. + + Raises: + Exception: If there's a connection error. + """ + + # Creates a client using the credentials found in the .aws folder + try: + _logger.info("Initializing client and login in") + self.client = boto3.client("secretsmanager") + + except (EndpointConnectionError, SSLError, ClientError, Exception) as e: + _logger.error("Problem starting the client: %s", e) + raise e + + def _retrieve_and_format_credentials(self, service_name: str) -> dict: + """ + Retrieves credentials using the class client. + + Args: + service_name (str): Name of the service to retrieve credentials for.(or name of the secret) + + Returns: + formatted_credentials (dict): Dictionary containing the credentials retrieved and formatted as a dict + + Raises: + Exception: If there's a connection error. + """ + try: + _logger.info("Retrieving credentials: %s", service_name) + secret_value_response = self.client.get_secret_value(SecretId=service_name) + formatted_credentials = json.loads(secret_value_response["SecretString"]) + return formatted_credentials + except (ClientError, json.JSONDecodeError) as e: + _logger.error("Error retrieving the secret: %s", str(e)) + raise e + + def get_secret(self, service_name: str, credential_name: str) -> str: + """ + Gets a secret based on the service name and the desired credential. + + Args: + service_name (str): Name of the service to retrieve credentials for + credential_name (str): Name of the credential + + Returns: + str: The credential value if found, empty string if not found + + Raises: + Exception: If there's a connection error. + """ + try: + formatted_credentials = self._retrieve_and_format_credentials(service_name) + credential = formatted_credentials[credential_name] + return credential + except KeyError: + # This handles when the credential doesn't exist in the secret + _logger.error("The secret %s:%s, was not found.", service_name, credential_name) + _logger.error( + "Please check the secret name and the credential name. For now here you have an empty string.") + return "" + except ClientError as e: + # This handles AWS-specific errors like ResourceNotFoundException + if e.response['Error']['Code'] == 'ResourceNotFoundException': + _logger.error("The secret %s:%s, was not found.", service_name, credential_name) + _logger.error(e) + _logger.error( + "Please check the secret name and the credential name. For now here you have an empty string.") + return "" + _logger.error("There was a problem getting the secret") + raise e + except Exception as e: + _logger.error("There was a problem getting the secret") + raise e diff --git a/grimoirelab_toolkit/credential_manager/bw_manager.py b/grimoirelab_toolkit/credential_manager/bw_manager.py new file mode 100644 index 0000000..f1e4302 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/bw_manager.py @@ -0,0 +1,342 @@ +# -*- coding: utf-8 -*- +# +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +import json +import subprocess +import logging +from datetime import datetime, timedelta + + +_logger = logging.getLogger(__name__) + + +class BitwardenManager: + + def __init__(self, email: str, password: str): + """ + Logs in bitwarden if not already. + + Args: + email (str): The email of the user + password (str): The password of the user + + Raises: + FileNotFoundError: If no credentials file is found + """ + # Session key of the bw session + self.session_key = None + self.formatted_credentials = {} + # store email for session validation + self._email = email + self.last_sync_time = None + self.sync_interval = timedelta(minutes=3) + + try: + self._login(email, password) + except FileNotFoundError: + _logger.error("File not found") + + def _login(self, bw_email: str, bw_password: str) -> str | None: + """ + Logs into Bitwarden and obtains a session key. + + Checks the current Bitwarden session status, unlocking or logging in as necessary. + If successful, synchronizes the vault. + + Args: + bw_email (str): Bitwarden account email. + bw_password (str): Bitwarden account password. + + Returns: + str: The session key for the current Bitwarden session. + + Raises: + Exception: If unlocking or logging into Bitwarden fails. + """ + try: + # If we have a session key, check if sync is needed + if self.session_key and self._validate_session(): + if self._should_sync(): + self._sync_vault() + return self.session_key + + _logger.debug("Checking Bitwarden login status") + # bw has to be in PATH + status_result = subprocess.run( + ["bw", "status"], capture_output=True, text=True, check=False + ) + + if status_result.returncode == 0: + _logger.debug("Checking vault status") + status = json.loads(status_result.stdout) + + if status.get("userEmail") == bw_email: + _logger.debug("User was already authenticated: %s", bw_email) + + if status.get("status") == "unlocked": + _logger.debug("Vault unlocked, getting session key") + self.session_key = status.get("sessionKey") + + elif status.get("status") == "locked": + _logger.debug("Vault locked, unlocking") + unlock_result = subprocess.run( + ["bw", "unlock", bw_password, "--raw"], + capture_output=True, + text=True, + check=False, + ) + + if unlock_result.returncode != 0: + error_msg = unlock_result.stderr.strip() if unlock_result.stderr else "Unknown error" + _logger.error("Error unlocking vault: %s", error_msg) + # Handle specific authentication errors + if "invalid_grant" in error_msg.lower(): + _logger.error("Invalid credentials provided for Bitwarden") + return None + + session_key = unlock_result.stdout.strip() if unlock_result.stdout else "" + if not session_key: + _logger.error("Empty session key received from unlock command") + return + self.session_key = session_key + + if not self.session_key: + _logger.debug("Couldn't obtain session key during login") + return None + + else: + _logger.debug("Login in: %s", bw_email) + result = subprocess.run( + ["bw", "login", bw_email, bw_password, "--raw"], + capture_output=True, + text=True, + check=False, + ) + + if result.returncode != 0: + error_msg = result.stderr.strip() if result.stderr else "Unknown error" + _logger.error("Error logging in: %s", error_msg) + # Handle specific authentication errors + if "invalid_grant" in error_msg.lower(): + _logger.error("Invalid credentials provided for Bitwarden") + return None + + _logger.debug("Setting session key") + session_key = result.stdout.strip() if result.stdout else "" + if not session_key: + _logger.error("Empty session key received from login command") + return None + self.session_key = session_key + + if self.session_key: + # Only sync if needed based on time interval + if self._should_sync(): + _logger.debug("Syncing local vault with Bitwarden") + sync_result = subprocess.run( + ["bw", "sync", "--session", self.session_key], + capture_output=True, + text=True, + check=False, + ) + if sync_result.returncode == 0: + self.last_sync_time = datetime.now() + else: + error_msg = sync_result.stderr.strip() if sync_result.stderr else "Unknown sync error" + _logger.debug("Sync failed but continuing: %s", error_msg) + return self.session_key + + _logger.debug("Session key not found cause could not log in") + return None + + except Exception as e: + _logger.error("There was a problem login in: %s", e) + raise e + + def _validate_session(self) -> bool: + """Checks current session.""" + try: + status_result = subprocess.run( + ["bw", "status"], capture_output=True, text=True, check=False + ) + + if status_result.returncode != 0: + return False + + status = json.loads(status_result.stdout) + return ( + status.get("status") == "unlocked" + and status.get("userEmail") == self._email + and status.get("sessionKey") == self.session_key + ) + except: + return False + + def _should_sync(self) -> bool: + """Determines if vault sync is needed based on last sync time.""" + return ( + not self.last_sync_time + or datetime.now() - self.last_sync_time > self.sync_interval + ) + + def _sync_vault(self) -> None: + """Syncs the vault and updates last sync time.""" + try: + _logger.debug("Syncing vault") + subprocess.run( + ["bw", "sync", "--session", self.session_key], check=True + ) + self.last_sync_time = datetime.now() + except subprocess.CalledProcessError as e: + _logger.error("Sync failed: %s", e) + + def _retrieve_credentials(self, service_name: str) -> dict: + """ + Retrieves a secret from a particular service from the Bitwarden vault. + + Args: + service_name (str): The name of the data source for which to retrieve the secret. + + Returns: + dict: The secret item retrieved from Bitwarden as a dictionary. + + Raises: + Exception: If retrieval of the secret fails. + """ + try: + _logger.info("Retrieving credential from Bitwarden CLI: %s", service_name) + + + _logger.debug("Session key = %s", self.session_key or "None") + + # Check if session key is available + if not self.session_key: + _logger.error("No valid session key available") + return {} + + # Get list of items + list_items = subprocess.run( + ["bw", "list", "items", "--session", self.session_key], + capture_output=True, + text=True, + check=False, + ) + + + if list_items.returncode != 0: + _logger.error(f"Failed to list items: {list_items.stderr}") + return {} + + + items = json.loads(list_items.stdout) + + # Find exact match + match_item = None + for item in items: + if item.get("name","").lower() == service_name.lower(): + match_item = item + break + + if not match_item: + if not match_item: + _logger.error(f"No exact match found for item: {service_name}") + return {} + + # Retrieve exact match by id + item_id = match_item.get("id") + result = subprocess.run( + ["bw", "get", "item", item_id, "--session", self.session_key], + capture_output=True, + text=True, + check=False, + ) + + + + if result.returncode != 0: + _logger.error("Failed to retrieve secret: %s", result.stderr) + return {} + + retrieved_secrets = json.loads(result.stdout) + _logger.info("Secrets successfully retrieved") + return retrieved_secrets + + except Exception as e: + _logger.error("There was a problem retrieving secret: %s", e) + raise e + + def _format_credentials(self, credentials: dict) -> dict: + """ + Formats the credentials retrieved from Bitwarden into a standardized format. + + Args: + credentials (dict): Raw credentials from Bitwarden + + Returns: + dict: Formatted credentials with standardized keys + """ + formatted = {"service_name": credentials["name"].lower()} + + # Get username and password from login section + login = credentials.get("login", {}) + if login.get("username"): + formatted["username"] = login["username"] + if login.get("password"): + formatted["password"] = login["password"] + + # Get custom fields + for field in credentials.get("fields", []): + formatted[field["name"]] = field["value"] + + return formatted + + def get_secret(self, service_name: str, credential_name: str) -> str: + """ + Retrieves a secret by name from the Bitwarden vault. + + Args: + service_name (str): The name of the secret to retrieve. + credential_name (str): The concrete credential to retrieve. + + Returns: + str: The secret value retrieved. + """ + + # If stored credentials are not available or belong to a different service + if ( + not self.formatted_credentials + or self.formatted_credentials.get("service_name") != service_name + ): + unformatted_credentials = self._retrieve_credentials(service_name) + self.formatted_credentials = self._format_credentials( + unformatted_credentials + ) + + secret = self.formatted_credentials.get(credential_name) + # in case nothing was found + if not secret: + _logger.error( + "The credential %s:%s, was not found.", service_name, credential_name + ) + _logger.error("In the meantime here you got an empty string") + return None + else: + # Return the requested credential + return secret diff --git a/grimoirelab_toolkit/credential_manager/credential_manager.py b/grimoirelab_toolkit/credential_manager/credential_manager.py new file mode 100755 index 0000000..50f5e80 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/credential_manager.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +import argparse +import logging +import sys + +from .secrets_manager_factory import SecretsManagerFactory + +CREDENTIAL_MANAGER_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s" +CREDENTIAL_MANAGER_DEBUG_LOG_FORMAT = "[%(asctime)s - %(name)s - %(levelname)s] - %(message)s" + +_logger = logging.getLogger(__name__) + + +def get_secret( + secrets_manager_name: str, service_name: str, credential_name: str +) -> str: + """ + Retrieve a secret from the secrets manager. + + Args: + secrets_manager_name (str): The name of the secrets manager to be used + service_name (str): The name of the service we want to access + credential_name (str): The name of the credential we want to retrieve + + Returns: + str: The credential retrieved + + Raises: + ValueError: If the secrets manager is not supported or initialization fails + """ + try: + if secrets_manager_name == "bitwarden": + manager = SecretsManagerFactory.get_bitwarden_manager() + return manager.get_secret(service_name, credential_name) + + elif secrets_manager_name == "hashicorp": + manager = SecretsManagerFactory.get_hashicorp_manager() + return manager.get_secret(service_name, credential_name) + + elif secrets_manager_name == "aws": + manager = SecretsManagerFactory.get_aws_manager() + return manager.get_secret(service_name, credential_name) + + else: + raise ValueError(f"Unsupported secrets manager: {secrets_manager_name}") + + except Exception as e: + _logger.error("Error retrieving secret: %s", e) + raise + + +def configure_logging(debug=False): + """Configure credential_manager logging + + The function configures the log messages produced by Credential_manager. + By default, log messages are sent to stderr. Set the parameter + `debug` to activate the debug mode. + + :param debug: set the debug mode + """ + if not debug: + logging.basicConfig(level=logging.INFO, + format=CREDENTIAL_MANAGER_LOG_FORMAT) + logging.getLogger('requests').setLevel(logging.WARNING) + logging.getLogger('urllib3').setLevel(logging.WARNING) + logging.getLogger('boto3').setLevel(logging.WARNING) + logging.getLogger('botocore').setLevel(logging.WARNING) + else: + logging.basicConfig(level=logging.DEBUG, + format=CREDENTIAL_MANAGER_DEBUG_LOG_FORMAT) + +def main(): + """ + Main entry point for the command line interface. + Parses arguments and retrieves secrets using the appropriate manager. + """ + parser = argparse.ArgumentParser( + description="Retrieve a secret from a specified secrets manager." + ) + parser.add_argument( + "manager", + choices=["bitwarden", "hashicorp", "aws"], + help="The name of the secrets manager to use.", + ) + parser.add_argument( + "service", help="The name of the service for which to retrieve the credential." + ) + parser.add_argument("credential", help="The name of the credential to retrieve.") + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging" + ) + + args = parser.parse_args() + + configure_logging(args.debug) + + logging.info("Starting credental manager.") + + try: + secret = get_secret(args.manager, args.service, args.credential) + # return secret + print(f"Retrieved {args.credential} for {args.service}: {secret}") + except Exception as e: + _logger.error("Failed to retrieve secret: %s", e) + sys.exit(1) \ No newline at end of file diff --git a/grimoirelab_toolkit/credential_manager/hc_manager.py b/grimoirelab_toolkit/credential_manager/hc_manager.py new file mode 100644 index 0000000..1c390ab --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/hc_manager.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +import logging +import hvac +import hvac.exceptions + +_logger = logging.getLogger(__name__) + + +class HashicorpManager: + """ + A class to retrieve secrets from HashicorpVault + """ + + def __init__(self, vault_url: str, token: str, certificate: str): + """ + Initializes the client with the corresponding token to interact with the vault, so no login + is required in vault. + + Args: + vault_url (str): The vault URL. + token (str): The access token. + certificate (str): The tls certificate. + + Raises: + Exception: If couldn't inizialize the client + """ + try: + _logger.debug("Creating client and logging in.") + self.client = hvac.Client(url=vault_url, token=token, verify=certificate) + + except Exception as e: + _logger.error("An error ocurred initializing the client: %s", str(e)) + # this is dealt with in the get_secret function + raise e + + if self.client.sys.is_initialized(): + _logger.debug("Client is initialized") + else: + raise Exception("Vault client is not initialized") + + if self.client.is_authenticated(): + _logger.debug("Client is authenticated") + else: + raise Exception("Client authentication failed") + + def _retrieve_credentials(self, service_name: str) -> dict: + """ + Function responsible for retrieving credentials from vault + + Args: + service_name (str): The name of the service to retrieve credentials for + + Returns: + a dict containing all the data for that service. Includes metadata + and other information stored in the vault + + Raises: + Exception: If couldn't retrieve credentials' + """ + try: + _logger.info("Retrieving credentials from vault.") + secret = self.client.secrets.kv.read_secret(path=service_name) + return secret + except Exception as e: + _logger.error("Error retrieving the secret: %s", str(e)) + # this is dealt with in the get_secret function + raise e + + def get_secret(self, service_name: str, credential_name: str) -> str: + """ + Retrieves the value of the service + credential named. + + Args: + service_name (str): The name of the service to retrieve credentials for + credential_name (str): The name of the credential to retrieve + + Returns: + str: The value of the credential + + Raises: + Exception: If couldn't retrieve credentials' + """ + try: + credentials = self._retrieve_credentials(service_name) + # We get the exact credential from the dict returned by the retrieval + credential = credentials["data"]["data"][credential_name] + _logger.info("Credentials retrieved succesfully") + return credential + except ( + hvac.exceptions.Forbidden, + hvac.exceptions.InternalServerError, + hvac.exceptions.InvalidRequest, + hvac.exceptions.RateLimitExceeded, + hvac.exceptions.Unauthorized, + hvac.exceptions.UnsupportedOperation, + hvac.exceptions.VaultDown, + hvac.exceptions.VaultError, + ) as e: + _logger.error("There was an error retrieving the secret: %s", e) + return "" + except KeyError: + _logger.error("The credential %s was not found", credential_name) + return "" + except hvac.exceptions.InvalidPath: + _logger.error("The path %s does not exist in the vault", service_name) + return "" diff --git a/grimoirelab_toolkit/credential_manager/secrets_manager_factory.py b/grimoirelab_toolkit/credential_manager/secrets_manager_factory.py new file mode 100644 index 0000000..91c5b85 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/secrets_manager_factory.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +import getpass +import logging +import os + +from .aws_manager import AwsManager +from .bw_manager import BitwardenManager +from .hc_manager import HashicorpManager + +logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" +) +_logger = logging.getLogger(__name__) + + +class SecretsManagerFactory: + + @staticmethod + def get_bitwarden_manager(email=None, password=None): + """ + Gets or creates a BitwardenManager instance. + + Args: + email (str, optional): Bitwarden email. If not provided, + will try environment variables or prompt. + password (str, optional): Bitwarden password. If not provided, + will try environment variables or prompt. + + Returns: + BitwardenManager: The singleton BitwardenManager instance + + Raises: + ValueError: If credentials cannot be obtained + """ + _logger.debug("Creating new Bitwarden manager") + + if email is None: + email = os.environ.get("GRIMOIRELAB_BW_EMAIL") + if password is None: + password = os.environ.get("GRIMOIRELAB_BW_PASSWORD") + + if not email or not password: + email = input("Bitwarden email: ") + password = getpass.getpass("Bitwarden master password: ") + + if not email or not password: + raise ValueError("Bitwarden credentials are required") + + return BitwardenManager(email, password) + + @staticmethod + def get_aws_manager(): + """ + Gets or creates an AwsManager instance. + + Returns: + AwsManager: The singleton AwsManager instance + """ + _logger.debug("Creating new AWS manager") + + return AwsManager() + + + @staticmethod + def get_hashicorp_manager( + vault_addr=None, token=None, certificate=None + ): + """ + Gets or creates a HashicorpManager instance. + + Args: + vault_addr (str, optional): Vault address. + + token (str, optional): Vault token. + + certificate (str, optional): Path to CA certificate. + + Returns: + HashicorpManager: The singleton HashicorpManager instance + + Raises: + ValueError: If required credentials cannot be obtained + """ + _logger.debug("Creating new Hashicorp manager") + + if vault_addr is None: + vault_addr = os.environ.get("GRIMOIRELAB_VAULT_ADDR") + if token is None: + token = os.environ.get("GRIMOIRELAB_VAULT_TOKEN") + if certificate is None: + certificate = os.environ.get("GRIMOIRELAB_VAULT_CACERT") + + if not vault_addr: + vault_addr = input("Please enter vault address: ") + if not token: + token = input("Please enter vault token: ") + if not certificate: + certificate = input( + "Please enter path to a PEM-encoded CA certificate file: " + ) + + if not all([vault_addr, token, certificate]): + raise ValueError("All Hashicorp Vault credentials are required") + + return HashicorpManager(vault_addr, token, certificate) diff --git a/grimoirelab_toolkit/credential_manager/utils.py b/grimoirelab_toolkit/credential_manager/utils.py new file mode 100644 index 0000000..ae27ae2 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/utils.py @@ -0,0 +1,29 @@ +from urllib.parse import urlparse, urlunparse + +def build_url(base_url, username, password=None, token=None): + """Build URL with credentials, preserving the original path and query. + Used to build the URLs for kibiter and elasticsearch if secrets manager is active in the config. + + Args: + base_url: Original URL from config + username: Username + password: Password + token : Token api token, access token + Returns: + str: The url formatted with the corresponding credentials if provided + """ + # If there's no username or password/token, return the original URL + if not (username and (password or token)): + return base_url + + parsed = urlparse(base_url) + # Prefer token over password if both are somehow provided + auth_password = token or password + + # Build the 'netloc' part of the URL, which includes credentials + new_netloc = f"{username}:{auth_password}@{parsed.hostname}" + if parsed.port: + new_netloc += f":{parsed.port}" + + # Reconstruct the full URL, preserving the path, query, etc. + return urlunparse(parsed._replace(netloc=new_netloc)) \ No newline at end of file diff --git a/grimoirelab_toolkit/datetime.py b/grimoirelab_toolkit/datetime_toolkit.py similarity index 100% rename from grimoirelab_toolkit/datetime.py rename to grimoirelab_toolkit/datetime_toolkit.py diff --git a/tests/test_aws_manager.py b/tests/test_aws_manager.py new file mode 100644 index 0000000..84be5ce --- /dev/null +++ b/tests/test_aws_manager.py @@ -0,0 +1,132 @@ +import pytest +from unittest.mock import patch, MagicMock +import json +from botocore.exceptions import ClientError, EndpointConnectionError, SSLError + +from grimoirelab_toolkit.credential_manager.aws_manager import AwsManager + +MOCK_SECRET_RESPONSE = { + "ARN": "arn:aws:secretsmanager:region:account:secret:test-secret-123456", + "Name": "test-secret", + "VersionId": "12345678-1234-1234-1234-123456789012", + "SecretString": '{"username": "test_user", "password": "test_pass", "api_key": "test_key"}', + "VersionStages": ["AWSCURRENT"] +} + +def test_initialization(): + """Test successful initialization""" + with patch('boto3.client') as mock_boto: + mock_boto.return_value = MagicMock() + manager = AwsManager() + mock_boto.assert_called_once_with('secretsmanager') + assert manager.client is not None + +def test_initialization_endpoint_error(): + """Test initialization failure due to endpoint error""" + with patch('boto3.client') as mock_boto: + mock_boto.side_effect = EndpointConnectionError(endpoint_url="http://example.com") + with pytest.raises(EndpointConnectionError): + AwsManager() + +def test_initialization_ssl_error(): + """Test initialization failure due to SSL error""" + with patch('boto3.client') as mock_boto: + mock_boto.side_effect = SSLError( + error="SSL Validation failed", + endpoint_url="http://example.com" + ) + with pytest.raises(SSLError): + AwsManager() + +def test_retrieve_and_format_credentials_success(): + """Test successful retrieval and formatting of credentials""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + mock_client.get_secret_value.return_value = MOCK_SECRET_RESPONSE + mock_boto.return_value = mock_client + + manager = AwsManager() + result = manager._retrieve_and_format_credentials("test-secret") + + assert result["username"] == "test_user" + assert result["password"] == "test_pass" + assert result["api_key"] == "test_key" + +def test_retrieve_and_format_credentials_not_found(): + """Test handling of non-existent secrets""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + error_response = { + 'Error': { + 'Code': 'ResourceNotFoundException', + 'Message': 'Secret not found' + } + } + + mock_client.get_secret_value.side_effect = ClientError( + error_response, 'GetSecretValue' + ) + mock_boto.return_value = mock_client + + manager = AwsManager() + + with pytest.raises(Exception): + manager._retrieve_and_format_credentials("nonexistent-secret") + +def test_retrieve_and_format_credentials_invalid_json(): + """Test handling of invalid JSON in secret value""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + invalid_response = MOCK_SECRET_RESPONSE.copy() + invalid_response['SecretString'] = 'invalid json' + mock_client.get_secret_value.return_value = invalid_response + mock_boto.return_value = mock_client + + manager = AwsManager() + + with pytest.raises(json.JSONDecodeError): + manager._retrieve_and_format_credentials("test-secret") + +def test_get_secret_success(): + """Test successful secret retrieval""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + mock_client.get_secret_value.return_value = MOCK_SECRET_RESPONSE + mock_boto.return_value = mock_client + + manager = AwsManager() + + result = manager.get_secret("test-secret", "api_key") + assert result == "test_key" + +def test_get_secret_missing_credential(): + """Test handling of non existant credential""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + mock_client.get_secret_value.return_value = MOCK_SECRET_RESPONSE + mock_boto.return_value = mock_client + + manager = AwsManager() + + result = manager.get_secret("test-secret", "nonexistent_credential") + assert result == "" + +def test_get_secret_service_error(): + """Test handling of AWS service errors""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + error_response = { + 'Error': { + 'Code': 'InternalServiceError', + 'Message': 'Internal service error' + } + } + mock_client.get_secret_value.side_effect = ClientError( + error_response, 'GetSecretValue' + ) + mock_boto.return_value = mock_client + + manager = AwsManager() + + with pytest.raises(Exception): + manager.get_secret("test-secret", "api_key") \ No newline at end of file diff --git a/tests/test_bw_manager.py b/tests/test_bw_manager.py new file mode 100644 index 0000000..20a9b77 --- /dev/null +++ b/tests/test_bw_manager.py @@ -0,0 +1,187 @@ +import unittest +import subprocess +import datetime +from datetime import timedelta +from unittest.mock import patch, MagicMock + +from grimoirelab_toolkit.credential_manager.bw_manager import BitwardenManager + + +class TestBitwardenManager(unittest.TestCase): + """BitwardenManager unit tests""" + + def setUp(self): + self.email = "test@example.com" + self.password = "test_password" + self.manager = BitwardenManager(self.email, self.password) + + def tearDown(self): + """Clean up after each test""" + self.manager = None + + def test_initialization(self): + """Test initialization of attributes""" + self.assertEqual(self.manager._email, self.email) + self.assertEqual(self.manager.session_key, None) + self.assertEqual(self.manager.last_sync_time, None) + self.assertEqual(self.manager.sync_interval, timedelta(minutes=3)) + self.assertEqual(self.manager.formatted_credentials, {}) + + @patch("subprocess.run") + def test_login_success(self, mock_run): + """Test successful login""" + + # First call checks status - user not logged in + mock_status = MagicMock() + mock_status.returncode = 0 + mock_status.stdout = '{"status": "unauthenticated"}' # User needs to log in + + # Second call simulates login, returns just the session key + mock_login = MagicMock() + mock_login.returncode = 0 + mock_login.stdout = "test_session_key" + + # Third call might be sync + mock_sync = MagicMock() + mock_sync.returncode = 0 + + mock_run.side_effect = [mock_status, mock_login, mock_sync] + + session_key = self.manager._login(self.email, self.password) + self.assertEqual(session_key, "test_session_key") + + @patch("subprocess.run") + def test_login_locked_vault(self, mock_run): + """Test login with locked vault""" + + # First call returns status check showing locked + mock_status = MagicMock() + mock_status.returncode = 0 + mock_status.stdout = '{"status": "locked", "userEmail": "test@example.com"}' + + # Second call is unlock attempt + mock_unlock = MagicMock() + mock_unlock.returncode = 0 + mock_unlock.stdout = "unlocked_session_key" + + # Third call is sync after unlock + mock_sync = MagicMock() + mock_sync.returncode = 0 + + mock_run.side_effect = [mock_status, mock_unlock, mock_sync] + + session_key = self.manager._login(self.email, self.password) + self.assertEqual(session_key, "unlocked_session_key") + + @patch("subprocess.run") + def test_login_already_logged_in(self, mock_run): + """Test login attempt when user is already logged in""" + + # First call to check status - showing user is authenticated + mock_status = MagicMock() + mock_status.returncode = 0 + mock_status.stdout = '{"status": "unlocked", "userEmail": "test@example.com"}' + + # If login is attempted, it would return the "already logged in" message + mock_login = MagicMock() + mock_login.returncode = 1 + mock_login.stderr = "You are already logged in as test@example.com." + + mock_run.side_effect = [mock_status] + + @patch("subprocess.run") + def test_login_failure(self, mock_run): + """Test login fail scenario""" + mock_run.return_value = MagicMock(returncode=1, stderr="Login failed") + + session_key = self.manager._login(self.email, self.password) + self.assertEqual(session_key, "") + + def test_validate_session_no_session(self): + """Test session validation when there's no active session""" + self.assertFalse(self.manager._validate_session()) + + @patch("subprocess.run") + def test_validate_session_valid(self, mock_run): + """Test validation of valid session""" + mock_response = MagicMock() + mock_response.returncode = 0 + mock_response.stdout = ( + '{"status": "unlocked", ' + '"userEmail": "test@example.com", ' + '"sessionKey": "test_key"}' + ) + mock_run.return_value = mock_response + + self.manager.session_key = "test_key" + self.assertTrue(self.manager._validate_session()) + + def test_should_sync_initial(self): + """Test sync decision with no previous sync""" + self.assertTrue(self.manager._should_sync()) + + def test_should_sync_recent(self): + """Test sync decision with recent sync""" + self.manager.last_sync_time = datetime.datetime.now() + self.assertFalse(self.manager._should_sync()) + + def test_should_sync_old(self): + """Test sync decision with too old sync""" + self.manager.last_sync_time = datetime.datetime.now() - timedelta(minutes=5) + self.assertTrue(self.manager._should_sync()) + + @patch("subprocess.run") + def test_sync_vault_success(self, mock_run): + """Test successful vault sync""" + mock_run.return_value = MagicMock(returncode=0) + self.manager.session_key = "test_key" + + self.manager._sync_vault() + self.assertIsNotNone(self.manager.last_sync_time) + mock_run.assert_called_once() + + @patch("subprocess.run") + def test_sync_vault_failure(self, mock_run): + """Test vault sync failure""" + mock_run.side_effect = subprocess.CalledProcessError(1, "cmd") + self.manager.session_key = "test_key" + + self.manager._sync_vault() + self.assertIsNone(self.manager.last_sync_time) + + + def test_format_credentials_complete(self): + """Test formatting credentials""" + raw_creds = { + "name": "test_service", + "login": {"username": "user", "password": "pass"}, + "fields": [{"name": "api_key", "value": "xyz"}], + } + + formatted = self.manager._format_credentials(raw_creds) + + self.assertEqual(formatted["service_name"], "test_service") + self.assertEqual(formatted["username"], "user") + self.assertEqual(formatted["password"], "pass") + self.assertEqual(formatted["api_key"], "xyz") + + def test_get_secret_success(self): + """Test successful secret retrieval""" + self.manager.formatted_credentials = { + "service_name": "test_service", + "test_credential": "test_value", + } + + result = self.manager.get_secret("test_service", "test_credential") + self.assertEqual(result, "test_value") + + def test_get_secret_missing(self): + """Test secret retrieval with non existant credential""" + self.manager.formatted_credentials = {"service_name": "test_service"} + + result = self.manager.get_secret("test_service", "missing_credential") + self.assertEqual(result, "") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_hc_manager.py b/tests/test_hc_manager.py new file mode 100644 index 0000000..905d561 --- /dev/null +++ b/tests/test_hc_manager.py @@ -0,0 +1,142 @@ +import pytest +from unittest.mock import patch +import hvac.exceptions + +from grimoirelab_toolkit.credential_manager.hc_manager import HashicorpManager + +MOCK_SECRET_RESPONSE = { + "auth": None, + "data": { + "data": {"password": "pass", "username": "user", "api_key": "test_key"}, + "metadata": { + "created_time": "2024-11-23T12:20:59.985132927Z", + "custom_metadata": None, + "deletion_time": "", + "destroyed": False, + "version": 1, + }, + }, + "lease_duration": 0, + "lease_id": "", + "mount_type": "kv", + "renewable": False, + "request_id": "d09e2bb5-00ee-576b-6078-5d291d35ccc3", + "warnings": None, + "wrap_info": None, +} + + +@pytest.fixture +def mock_hvac_client(): + with patch("hvac.Client") as mock_client: + mock_instance = mock_client.return_value + mock_instance.sys.is_initialized.return_value = True + mock_instance.is_authenticated.return_value = True + yield mock_client + + +def test_initialization(mock_hvac_client): + """Test successful initialization of HashicorpManager.""" + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + + assert manager.client is not None + mock_hvac_client.assert_called_once_with( + url="http://vault-url", token="test-token", verify="test-certificate" + ) + assert manager.client.sys.is_initialized() + assert manager.client.is_authenticated() + + +def test_initialization_failure(mock_hvac_client): + """Test handling of initialization failures.""" + + mock_hvac_client.side_effect = hvac.exceptions.VaultError("Connection failed") + + with pytest.raises(hvac.exceptions.VaultError) as exception_info: + HashicorpManager("http://vault-url", "test-token", "test-certificate") + assert "Connection failed" in str(exception_info.value) + + +def test_get_secret_success(mock_hvac_client): + """Test successful secret retrieval.""" + + # Mock that returns the test secret + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.return_value = MOCK_SECRET_RESPONSE + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + result = manager.get_secret("test_service", "api_key") + + assert result == "test_key" + mock_instance.secrets.kv.read_secret.assert_called_once_with(path="test_service") + + +def test_get_secret_not_found(mock_hvac_client): + """Test handling of non existant secrets.""" + + # Mock that simulates non existant secret + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.side_effect = hvac.exceptions.InvalidPath() + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + result = manager.get_secret("test_service", "nonexistent") + + assert result == "" + + +def test_get_secret_permission_denied(mock_hvac_client): + """Test handling of permission denied errors.""" + + # Mock to simulate permission denied + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.side_effect = hvac.exceptions.Forbidden() + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + result = manager.get_secret("test_service", "api_key") + + assert result == "" + + +def test_retrieve_credentials_success(mock_hvac_client): + """Test successful retrieval of raw credentials.""" + + # Mock that returns the test secret + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.return_value = MOCK_SECRET_RESPONSE + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + result = manager._retrieve_credentials("test_service") + + assert result == MOCK_SECRET_RESPONSE + mock_instance.secrets.kv.read_secret.assert_called_once_with(path="test_service") + + +def test_retrieve_credentials_failure(mock_hvac_client): + """Test handling of credential retrieval failures.""" + + # Mock simulate Vault error + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.side_effect = hvac.exceptions.VaultError( + "Vault error" + ) + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + + with pytest.raises(hvac.exceptions.VaultError): + manager._retrieve_credentials("test_service") + + +def test_vault_connection_error(mock_hvac_client): + """Test handling of Vault connection errors.""" + + # Mock that simulates connection error + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.side_effect = hvac.exceptions.VaultDown( + "Vault is sealed" + ) + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + result = manager.get_secret("test_service", "api_key") + + assert result == ""