From 7a14bf35685a526afc4740b15b9bb31d5e56bdc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Thu, 17 Jul 2025 14:17:56 +0200 Subject: [PATCH 01/13] Add credential manager for external secret providers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement unified credential management system that supports retrieving secrets from Bitwarden, HashiCorp Vault, and AWS Secrets Manager. Key features: - Factory pattern - Command-line interface - Python API - Full test coverage The system allows secure credential management without hardcoding sensitive information in configuration files. Signed-off-by: [Your Name] <[your-email]> This is a combination of 2 commits. Added credential_manager tool and tests. Updated README.md to include credential_manager instructions,. Signed-off-by: Alberto Ferrer Sánchez --- README.md | 117 ++++++ .../credential_manager/__init__.py | 25 ++ .../credential_manager/__main__.py | 4 + .../credential_manager/aws_manager.py | 115 ++++++ .../credential_manager/bw_manager.py | 344 ++++++++++++++++++ .../credential_manager/credential_manager.py | 96 +++++ .../credential_manager/hc_manager.py | 124 +++++++ .../secrets_manager_factory.py | 123 +++++++ .../credential_manager/utils.py | 29 ++ tests/test_aws_manager.py | 132 +++++++ tests/test_bw_manager.py | 187 ++++++++++ tests/test_hc_manager.py | 142 ++++++++ 12 files changed, 1438 insertions(+) create mode 100644 grimoirelab_toolkit/credential_manager/__init__.py create mode 100644 grimoirelab_toolkit/credential_manager/__main__.py create mode 100644 grimoirelab_toolkit/credential_manager/aws_manager.py create mode 100644 grimoirelab_toolkit/credential_manager/bw_manager.py create mode 100755 grimoirelab_toolkit/credential_manager/credential_manager.py create mode 100644 grimoirelab_toolkit/credential_manager/hc_manager.py create mode 100644 grimoirelab_toolkit/credential_manager/secrets_manager_factory.py create mode 100644 grimoirelab_toolkit/credential_manager/utils.py create mode 100644 tests/test_aws_manager.py create mode 100644 tests/test_bw_manager.py create mode 100644 tests/test_hc_manager.py diff --git a/README.md b/README.md index ec2c41d..5d4c519 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,10 @@ URIs/URLs, among other topics. ## Requirements * Python >= 3.8 + * hvac >= 2.3.0 + * boto3 >= 1.35.63 + +(Hvac and boto3 are used for credential_manager) You will also need some other libraries for running the tool, you can find the whole list of dependencies in [pyproject.toml](pyproject.toml) file. @@ -59,6 +63,119 @@ To spaw a new shell within the virtual environment use: $ poetry shell ``` +## Credential Manager + +This is a module made to retrieve credentials from different secrets management systems like Bitwarden. +It accesses the secrets management service, looks for the desired credential and returns it in String form. + + +There are two ways of using this module. + + +### Terminal + +To use this, any of these two is valid: + +Command-Line Interface: + +``` +$ python -m credential_manager +``` + +Where: + +- manager → credential manager used to store the credentials (Bitwarden, aws, Hashicorp Vault) +- service → the platform to which you want to connect (github, gitlab, bugzilla). It is the name of the secret in the credential storage, it does not have to be the same as the service. +- credential → the field inside the secret that you want to retrieve (username, password, api-token) + +Examples: + +``` +$ python -m credential_manager bitwarden gmail password +$ python -m credential_manager hashicorp github api_token +$ python -m credential_manager aws production db_password +``` + +In each case, the script will log / access into the corresponding vault, search for the secret with the name of the service that wants to be accessed and then retrieve, from that secret, the value with the name inserted as credential. + +That is, in the first case, it will log into Bitwarden, access the secret called "bugzilla", and from it retrieve the value of the field "username". + +Each of the secrets management services are accessed in different forms and need different configurations to work, as specified in the [[#Managers]] section. + + +### Python API + +To use the module in your python code + + +``` +# Retrieve a secret from Bitwarden +username = get_secret("bitwarden", "bugzilla", "username") + +# Retrieve a secret from AWS Secrets Manager +api_token = get_secret("aws", "github", "api-token") + +# Retrieve a secret from HashiCorp Vault +password = get_secret("hashicorp", "gitlab", "password") +``` + +For more advaced usage, you can directly use the factory to get a specific manager: + +``` +from credential_manager.secrets_manager_factory import SecretsManagerFactory + +# Get a Bitwarden manager instance +bw_manager = SecretsManagerFactory.get_bitwarden_manager() +username = bw_manager.get_secret("bugzilla", "username") + +# Get an AWS Secrets Manager instance +aws_manager = SecretsManagerFactory.get_aws_manager() +api_token = aws_manager.get_secret("github", "api-token") + +``` + +### Supported Managers + + +This section explains the different things to consider when using each of the supported secrets management services, like where to store the credentials to access the secrets manager. + +#### AWS + +The module uses [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html), and this looks for your credentials in the .aws folder, in the files "credentials" and "config". + +Configuration: + +- Credentials are read from the standard AWS credentials file (~/.aws/credentials) +- Region configuration is read from ~/.aws/config +- Ensure your IAM user/role has appropriate permissions to access Secrets Manager + +More about this [here](https://docs.aws.amazon.com/sdkref/latest/guide/file-location.html). + +#### Hashicorp Vault + +The module uses [hvac](https://hvac.readthedocs.io/en/stable/overview.html) to interact with Hashicorp Vault. + +The function will look for the following environment variables to get into the vault, and prompt the user for them if not found: + +- VAULT_ADDR → Address of the Vault server. +- VAULT_TOKEN → A Vault-issued service token that authenticates the CLI user to Vault. +- VAULT_CACERT → Path to a PEM-encoded CA certificate file on the local disk. Used to verify SSL certificates for the server + +If environment variables are not found, the user will be prompted to introduce the data manually. + +More info on this can be found [here](https://developer.hashicorp.com/vault/docs/commands). + +#### Bitwarden + +The module uses the [Bitwarden CLI](https://bitwarden.com/help/cli/) to interact with Bitwarden. + +Required environment variables: + +- BW_EMAIL → the email used to log into the bitwarden account +- BW_PASSWORD + +If environment variables are not found, the user will be prompted to introduce the data manually. + ## License Licensed under GNU General Public License (GPL), version 3 or later. diff --git a/grimoirelab_toolkit/credential_manager/__init__.py b/grimoirelab_toolkit/credential_manager/__init__.py new file mode 100644 index 0000000..17ddfa5 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/__init__.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +from .credential_manager import get_secret +from .secrets_manager_factory import SecretsManagerFactory + +__all__ = ['get_secret', 'SecretsManagerFactory'] \ No newline at end of file diff --git a/grimoirelab_toolkit/credential_manager/__main__.py b/grimoirelab_toolkit/credential_manager/__main__.py new file mode 100644 index 0000000..7b83158 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/__main__.py @@ -0,0 +1,4 @@ +from .credential_manager import main + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/grimoirelab_toolkit/credential_manager/aws_manager.py b/grimoirelab_toolkit/credential_manager/aws_manager.py new file mode 100644 index 0000000..870e47c --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/aws_manager.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +# +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +import logging +import json +import boto3 +from botocore.exceptions import EndpointConnectionError, SSLError, ClientError + +logging.getLogger("urllib3").setLevel(logging.WARNING) +logging.getLogger("botocore").setLevel(logging.WARNING) +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) +_logger = logging.getLogger(__name__) + + +class AwsManager: + + def __init__(self): + """ + Initializes the client that will access to the credentials management service. + + This takes the credentials to log into aws from the .aws folder. + This constructor also takes other relevant information from that folder if it exists. + + Raises: + Exception: If there's a connection error. + """ + + # Creates a client using the credentials found in the .aws folder + try: + _logger.info("Initializing client and login in") + self.client = boto3.client("secretsmanager") + + except (EndpointConnectionError, SSLError, ClientError, Exception) as e: + _logger.error("Problem starting the client: %s", e) + raise e + + def _retrieve_and_format_credentials(self, service_name: str) -> dict: + """ + Retrieves credentials using the class client. + + Args: + service_name (str): Name of the service to retrieve credentials for.(or name of the secret) + + Returns: + formatted_credentials (dict): Dictionary containing the credentials retrieved and formatted as a dict + + Raises: + Exception: If there's a connection error. + """ + try: + _logger.info("Retrieving credentials: %s", service_name) + secret_value_response = self.client.get_secret_value(SecretId=service_name) + formatted_credentials = json.loads(secret_value_response["SecretString"]) + return formatted_credentials + except (ClientError, json.JSONDecodeError) as e: + _logger.error("Error retrieving the secret: %s", str(e)) + raise e + + def get_secret(self, service_name: str, credential_name: str) -> str: + """ + Gets a secret based on the service name and the desired credential. + + Args: + service_name (str): Name of the service to retrieve credentials for + credential_name (str): Name of the credential + + Returns: + str: The credential value if found, empty string if not found + + Raises: + Exception: If there's a connection error. + """ + try: + formatted_credentials = self._retrieve_and_format_credentials(service_name) + credential = formatted_credentials[credential_name] + return credential + except KeyError: + # This handles when the credential doesn't exist in the secret + _logger.error("The secret %s:%s, was not found.", service_name, credential_name) + _logger.error( + "Please check the secret name and the credential name. For now here you have an empty string.") + return "" + except ClientError as e: + # This handles AWS-specific errors like ResourceNotFoundException + if e.response['Error']['Code'] == 'ResourceNotFoundException': + _logger.error("The secret %s:%s, was not found.", service_name, credential_name) + _logger.error(e) + _logger.error( + "Please check the secret name and the credential name. For now here you have an empty string.") + return "" + _logger.error("There was a problem getting the secret") + raise e + except Exception as e: + _logger.error("There was a problem getting the secret") + raise e diff --git a/grimoirelab_toolkit/credential_manager/bw_manager.py b/grimoirelab_toolkit/credential_manager/bw_manager.py new file mode 100644 index 0000000..0b362f6 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/bw_manager.py @@ -0,0 +1,344 @@ +# -*- coding: utf-8 -*- +# +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +import json +import subprocess +import logging +from datetime import datetime, timedelta + +logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" +) +_logger = logging.getLogger(__name__) + + +class BitwardenManager: + + def __init__(self, email: str, password: str): + """ + Logs in bitwarden if not already. + + Args: + email (str): The email of the user + password (str): The password of the user + + Raises: + FileNotFoundError: If no credentials file is found + """ + # Session key of the bw session + self.session_key = None + self.formatted_credentials = {} + # store email for session validation + self._email = email + self.last_sync_time = None + self.sync_interval = timedelta(minutes=3) + + try: + self._login(email, password) + except FileNotFoundError: + _logger.error("File not found") + + def _login(self, bw_email: str, bw_password: str) -> str: + """ + Logs into Bitwarden and obtains a session key. + + Checks the current Bitwarden session status, unlocking or logging in as necessary. + If successful, synchronizes the vault. + + Args: + bw_email (str): Bitwarden account email. + bw_password (str): Bitwarden account password. + + Returns: + str: The session key for the current Bitwarden session. + + Raises: + Exception: If unlocking or logging into Bitwarden fails. + """ + try: + # If we have a session key, check if sync is needed + if self.session_key and self._validate_session(): + if self._should_sync(): + self._sync_vault() + return self.session_key + + _logger.info("Checking Bitwarden login status") + status_result = subprocess.run( + ["/snap/bin/bw", "status"], capture_output=True, text=True, check=False + ) + + if status_result.returncode == 0: + _logger.info("Checking vault status") + status = json.loads(status_result.stdout) + + if status.get("userEmail") == bw_email: + _logger.info("User was already authenticated: %s", bw_email) + + if status.get("status") == "unlocked": + _logger.info("Vault unlocked, getting session key") + self.session_key = status.get("sessionKey") + + elif status.get("status") == "locked": + _logger.info("Vault locked, unlocking") + unlock_result = subprocess.run( + ["/snap/bin/bw", "unlock", bw_password, "--raw"], + capture_output=True, + text=True, + check=False, + ) + + if unlock_result.returncode != 0: + error_msg = unlock_result.stderr.strip() if unlock_result.stderr else "Unknown error" + _logger.error("Error unlocking vault: %s", error_msg) + # Handle specific authentication errors + if "invalid_grant" in error_msg.lower(): + _logger.error("Invalid credentials provided for Bitwarden") + return "" + + session_key = unlock_result.stdout.strip() if unlock_result.stdout else "" + if not session_key: + _logger.error("Empty session key received from unlock command") + return "" + self.session_key = session_key + + if not self.session_key: + _logger.info("Couldn't obtain session key during login") + return "" + + else: + _logger.info("Login in: %s", bw_email) + result = subprocess.run( + ["/snap/bin/bw", "login", bw_email, bw_password, "--raw"], + capture_output=True, + text=True, + check=False, + ) + + if result.returncode != 0: + error_msg = result.stderr.strip() if result.stderr else "Unknown error" + _logger.error("Error logging in: %s", error_msg) + # Handle specific authentication errors + if "invalid_grant" in error_msg.lower(): + _logger.error("Invalid credentials provided for Bitwarden") + return "" + + _logger.info("Setting session key") + session_key = result.stdout.strip() if result.stdout else "" + if not session_key: + _logger.error("Empty session key received from login command") + return "" + self.session_key = session_key + + if self.session_key: + # Only sync if needed based on time interval + if self._should_sync(): + _logger.info("Syncing local vault with Bitwarden") + sync_result = subprocess.run( + ["/snap/bin/bw", "sync", "--session", self.session_key], + capture_output=True, + text=True, + check=False, + ) + if sync_result.returncode == 0: + self.last_sync_time = datetime.now() + else: + error_msg = sync_result.stderr.strip() if sync_result.stderr else "Unknown sync error" + _logger.warning("Sync failed but continuing: %s", error_msg) + return self.session_key + + _logger.info("Session key not found cause could not log in") + return "" + + except Exception as e: + _logger.error("There was a problem login in: %s", e) + raise e + + def _validate_session(self) -> bool: + """Checks current session.""" + try: + status_result = subprocess.run( + ["/snap/bin/bw", "status"], capture_output=True, text=True, check=False + ) + + if status_result.returncode != 0: + return False + + status = json.loads(status_result.stdout) + return ( + status.get("status") == "unlocked" + and status.get("userEmail") == self._email + and status.get("sessionKey") == self.session_key + ) + except: + return False + + def _should_sync(self) -> bool: + """Determines if vault sync is needed based on last sync time.""" + return ( + not self.last_sync_time + or datetime.now() - self.last_sync_time > self.sync_interval + ) + + def _sync_vault(self) -> None: + """Syncs the vault and updates last sync time.""" + try: + _logger.info("Syncing vault") + subprocess.run( + ["/snap/bin/bw", "sync", "--session", self.session_key], check=True + ) + self.last_sync_time = datetime.now() + except subprocess.CalledProcessError as e: + _logger.error("Sync failed: %s", e) + + def _retrieve_credentials(self, service_name: str) -> dict: + """ + Retrieves a secret from a particular service from the Bitwarden vault. + + Args: + service_name (str): The name of the data source for which to retrieve the secret. + + Returns: + dict: The secret item retrieved from Bitwarden as a dictionary. + + Raises: + Exception: If retrieval of the secret fails. + """ + try: + _logger.info("Retrieving credential from Bitwarden CLI: %s", service_name) + + bw_path = "/snap/bin/bw" + + _logger.debug("Session key = %s", self.session_key or "None") + + # Check if session key is available + if not self.session_key: + _logger.error("No valid session key available") + return {} + + # Get list of items + list_items = subprocess.run( + [bw_path, "list", "items", "--session", self.session_key], + capture_output=True, + text=True, + check=False, + ) + + + if list_items.returncode != 0: + _logger.error(f"Failed to list items: {list_items.stderr}") + return {} + + + items = json.loads(list_items.stdout) + + # Find exact match + match_item = None + for item in items: + if item.get("name","").lower() == service_name.lower(): + match_item = item + break + + if not match_item: + if not match_item: + _logger.error(f"No exact match found for item: {service_name}") + return {} + + # Retrieve exact match by id + item_id = match_item.get("id") + result = subprocess.run( + [bw_path, "get", "item", item_id, "--session", self.session_key], + capture_output=True, + text=True, + check=False, + ) + + + + if result.returncode != 0: + _logger.error("Failed to retrieve secret: %s", result.stderr) + return {} + + retrieved_secrets = json.loads(result.stdout) + _logger.info("Secrets successfully retrieved") + return retrieved_secrets + + except Exception as e: + _logger.error("There was a problem retrieving secret: %s", e) + raise e + + def _format_credentials(self, credentials: dict) -> dict: + """ + Formats the credentials retrieved from Bitwarden into a standardized format. + + Args: + credentials (dict): Raw credentials from Bitwarden + + Returns: + dict: Formatted credentials with standardized keys + """ + formatted = {"service_name": credentials["name"].lower()} + + # Get username and password from login section + login = credentials.get("login", {}) + if login.get("username"): + formatted["username"] = login["username"] + if login.get("password"): + formatted["password"] = login["password"] + + # Get custom fields + for field in credentials.get("fields", []): + formatted[field["name"]] = field["value"] + + return formatted + + def get_secret(self, service_name: str, credential_name: str) -> str: + """ + Retrieves a secret by name from the Bitwarden vault. + + Args: + service_name (str): The name of the secret to retrieve. + credential_name (str): The concrete credential to retrieve. + + Returns: + str: The secret value retrieved. + """ + + # If stored credentials are not available or belong to a different service + if ( + not self.formatted_credentials + or self.formatted_credentials.get("service_name") != service_name + ): + unformatted_credentials = self._retrieve_credentials(service_name) + self.formatted_credentials = self._format_credentials( + unformatted_credentials + ) + + secret = self.formatted_credentials.get(credential_name) + # in case nothing was found + if not secret: + _logger.error( + "The credential %s:%s, was not found.", service_name, credential_name + ) + _logger.error("In the meantime here you got an empty string") + return "" + else: + # Return the requested credential + return secret diff --git a/grimoirelab_toolkit/credential_manager/credential_manager.py b/grimoirelab_toolkit/credential_manager/credential_manager.py new file mode 100755 index 0000000..dbcf62f --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/credential_manager.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +import argparse +import logging +import sys + +from .secrets_manager_factory import SecretsManagerFactory + +logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" +) +_logger = logging.getLogger(__name__) + + +def get_secret( + secrets_manager_name: str, service_name: str, credential_name: str +) -> str: + """ + Retrieve a secret from the secrets manager. + + Args: + secrets_manager_name (str): The name of the secrets manager to be used + service_name (str): The name of the service we want to access + credential_name (str): The name of the credential we want to retrieve + + Returns: + str: The credential retrieved + + Raises: + ValueError: If the secrets manager is not supported or initialization fails + """ + try: + if secrets_manager_name == "bitwarden": + manager = SecretsManagerFactory.get_bitwarden_manager() + return manager.get_secret(service_name, credential_name) + + elif secrets_manager_name == "hashicorp": + manager = SecretsManagerFactory.get_hashicorp_manager() + return manager.get_secret(service_name, credential_name) + + elif secrets_manager_name == "aws": + manager = SecretsManagerFactory.get_aws_manager() + return manager.get_secret(service_name, credential_name) + + else: + raise ValueError(f"Unsupported secrets manager: {secrets_manager_name}") + + except Exception as e: + _logger.error("Error retrieving secret: %s", e) + raise + + +def main(): + """ + Main entry point for the command line interface. + Parses arguments and retrieves secrets using the appropriate manager. + """ + parser = argparse.ArgumentParser( + description="Retrieve a secret from a specified secrets manager." + ) + parser.add_argument( + "manager", + choices=["bitwarden", "hashicorp", "aws"], + help="The name of the secrets manager to use.", + ) + parser.add_argument( + "service", help="The name of the service for which to retrieve the credential." + ) + parser.add_argument("credential", help="The name of the credential to retrieve.") + + args = parser.parse_args() + + try: + secret = get_secret(args.manager, args.service, args.credential) + print(f"Retrieved {args.credential} for {args.service}: {secret}") + except Exception as e: + _logger.error("Failed to retrieve secret: %s", e) + sys.exit(1) \ No newline at end of file diff --git a/grimoirelab_toolkit/credential_manager/hc_manager.py b/grimoirelab_toolkit/credential_manager/hc_manager.py new file mode 100644 index 0000000..7e835e4 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/hc_manager.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +# +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +import logging +import hvac +import hvac.exceptions + +logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" +) +_logger = logging.getLogger(__name__) + + +class HashicorpManager: + """ + A class to retrieve secrets from HashicorpVault + """ + + def __init__(self, vault_url: str, token: str, certificate: str): + """ + Initializes the client with the corresponding token to interact with the vault, so no login + is required in vault. + + Args: + vault_url (str): The vault URL. + token (str): The access token. + certificate (str): The tls certificate. + + Raises: + Exception: If couldn't inizialize the client + """ + try: + _logger.info("Creating client and logging in.") + self.client = hvac.Client(url=vault_url, token=token, verify=certificate) + + except Exception as e: + _logger.error("An error ocurred initializing the client: %s", str(e)) + # this is dealt with in the get_secret function + raise e + + if self.client.sys.is_initialized(): + _logger.info("Client is initialized") + + if self.client.is_authenticated(): + _logger.info("Client is authenticated") + + def _retrieve_credentials(self, service_name: str) -> dict: + """ + Function responsible for retrieving credentials from vault + + Args: + service_name (str): The name of the service to retrieve credentials for + + Returns: + a dict containing all the data for that service. Includes metadata + and other information stored in the vault + + Raises: + Exception: If couldn't retrieve credentials' + """ + try: + _logger.info("Retrieving credentials from vault.") + secret = self.client.secrets.kv.read_secret(path=service_name) + return secret + except Exception as e: + _logger.error("Error retrieving the secret: %s", str(e)) + # this is dealt with in the get_secret function + raise e + + def get_secret(self, service_name: str, credential_name: str) -> str: + """ + Retrieves the value of the service + credential named. + + Args: + service_name (str): The name of the service to retrieve credentials for + credential_name (str): The name of the credential to retrieve + + Returns: + str: The value of the credential + + Raises: + Exception: If couldn't retrieve credentials' + """ + try: + credentials = self._retrieve_credentials(service_name) + # We get the exact credential from the dict returned by the retrieval + credential = credentials["data"]["data"][credential_name] + return credential + except ( + hvac.exceptions.Forbidden, + hvac.exceptions.InternalServerError, + hvac.exceptions.InvalidRequest, + hvac.exceptions.RateLimitExceeded, + hvac.exceptions.Unauthorized, + hvac.exceptions.UnsupportedOperation, + hvac.exceptions.VaultDown, + hvac.exceptions.VaultError, + ) as e: + _logger.error("There was an error retrieving the secret: %s", e) + return "" + except KeyError: + _logger.error("The credential %s was not found", credential_name) + return "" + except hvac.exceptions.InvalidPath: + _logger.error("The path %s does not exist in the vault", service_name) + return "" diff --git a/grimoirelab_toolkit/credential_manager/secrets_manager_factory.py b/grimoirelab_toolkit/credential_manager/secrets_manager_factory.py new file mode 100644 index 0000000..91c5b85 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/secrets_manager_factory.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +import getpass +import logging +import os + +from .aws_manager import AwsManager +from .bw_manager import BitwardenManager +from .hc_manager import HashicorpManager + +logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" +) +_logger = logging.getLogger(__name__) + + +class SecretsManagerFactory: + + @staticmethod + def get_bitwarden_manager(email=None, password=None): + """ + Gets or creates a BitwardenManager instance. + + Args: + email (str, optional): Bitwarden email. If not provided, + will try environment variables or prompt. + password (str, optional): Bitwarden password. If not provided, + will try environment variables or prompt. + + Returns: + BitwardenManager: The singleton BitwardenManager instance + + Raises: + ValueError: If credentials cannot be obtained + """ + _logger.debug("Creating new Bitwarden manager") + + if email is None: + email = os.environ.get("GRIMOIRELAB_BW_EMAIL") + if password is None: + password = os.environ.get("GRIMOIRELAB_BW_PASSWORD") + + if not email or not password: + email = input("Bitwarden email: ") + password = getpass.getpass("Bitwarden master password: ") + + if not email or not password: + raise ValueError("Bitwarden credentials are required") + + return BitwardenManager(email, password) + + @staticmethod + def get_aws_manager(): + """ + Gets or creates an AwsManager instance. + + Returns: + AwsManager: The singleton AwsManager instance + """ + _logger.debug("Creating new AWS manager") + + return AwsManager() + + + @staticmethod + def get_hashicorp_manager( + vault_addr=None, token=None, certificate=None + ): + """ + Gets or creates a HashicorpManager instance. + + Args: + vault_addr (str, optional): Vault address. + + token (str, optional): Vault token. + + certificate (str, optional): Path to CA certificate. + + Returns: + HashicorpManager: The singleton HashicorpManager instance + + Raises: + ValueError: If required credentials cannot be obtained + """ + _logger.debug("Creating new Hashicorp manager") + + if vault_addr is None: + vault_addr = os.environ.get("GRIMOIRELAB_VAULT_ADDR") + if token is None: + token = os.environ.get("GRIMOIRELAB_VAULT_TOKEN") + if certificate is None: + certificate = os.environ.get("GRIMOIRELAB_VAULT_CACERT") + + if not vault_addr: + vault_addr = input("Please enter vault address: ") + if not token: + token = input("Please enter vault token: ") + if not certificate: + certificate = input( + "Please enter path to a PEM-encoded CA certificate file: " + ) + + if not all([vault_addr, token, certificate]): + raise ValueError("All Hashicorp Vault credentials are required") + + return HashicorpManager(vault_addr, token, certificate) diff --git a/grimoirelab_toolkit/credential_manager/utils.py b/grimoirelab_toolkit/credential_manager/utils.py new file mode 100644 index 0000000..ae27ae2 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/utils.py @@ -0,0 +1,29 @@ +from urllib.parse import urlparse, urlunparse + +def build_url(base_url, username, password=None, token=None): + """Build URL with credentials, preserving the original path and query. + Used to build the URLs for kibiter and elasticsearch if secrets manager is active in the config. + + Args: + base_url: Original URL from config + username: Username + password: Password + token : Token api token, access token + Returns: + str: The url formatted with the corresponding credentials if provided + """ + # If there's no username or password/token, return the original URL + if not (username and (password or token)): + return base_url + + parsed = urlparse(base_url) + # Prefer token over password if both are somehow provided + auth_password = token or password + + # Build the 'netloc' part of the URL, which includes credentials + new_netloc = f"{username}:{auth_password}@{parsed.hostname}" + if parsed.port: + new_netloc += f":{parsed.port}" + + # Reconstruct the full URL, preserving the path, query, etc. + return urlunparse(parsed._replace(netloc=new_netloc)) \ No newline at end of file diff --git a/tests/test_aws_manager.py b/tests/test_aws_manager.py new file mode 100644 index 0000000..84be5ce --- /dev/null +++ b/tests/test_aws_manager.py @@ -0,0 +1,132 @@ +import pytest +from unittest.mock import patch, MagicMock +import json +from botocore.exceptions import ClientError, EndpointConnectionError, SSLError + +from grimoirelab_toolkit.credential_manager.aws_manager import AwsManager + +MOCK_SECRET_RESPONSE = { + "ARN": "arn:aws:secretsmanager:region:account:secret:test-secret-123456", + "Name": "test-secret", + "VersionId": "12345678-1234-1234-1234-123456789012", + "SecretString": '{"username": "test_user", "password": "test_pass", "api_key": "test_key"}', + "VersionStages": ["AWSCURRENT"] +} + +def test_initialization(): + """Test successful initialization""" + with patch('boto3.client') as mock_boto: + mock_boto.return_value = MagicMock() + manager = AwsManager() + mock_boto.assert_called_once_with('secretsmanager') + assert manager.client is not None + +def test_initialization_endpoint_error(): + """Test initialization failure due to endpoint error""" + with patch('boto3.client') as mock_boto: + mock_boto.side_effect = EndpointConnectionError(endpoint_url="http://example.com") + with pytest.raises(EndpointConnectionError): + AwsManager() + +def test_initialization_ssl_error(): + """Test initialization failure due to SSL error""" + with patch('boto3.client') as mock_boto: + mock_boto.side_effect = SSLError( + error="SSL Validation failed", + endpoint_url="http://example.com" + ) + with pytest.raises(SSLError): + AwsManager() + +def test_retrieve_and_format_credentials_success(): + """Test successful retrieval and formatting of credentials""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + mock_client.get_secret_value.return_value = MOCK_SECRET_RESPONSE + mock_boto.return_value = mock_client + + manager = AwsManager() + result = manager._retrieve_and_format_credentials("test-secret") + + assert result["username"] == "test_user" + assert result["password"] == "test_pass" + assert result["api_key"] == "test_key" + +def test_retrieve_and_format_credentials_not_found(): + """Test handling of non-existent secrets""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + error_response = { + 'Error': { + 'Code': 'ResourceNotFoundException', + 'Message': 'Secret not found' + } + } + + mock_client.get_secret_value.side_effect = ClientError( + error_response, 'GetSecretValue' + ) + mock_boto.return_value = mock_client + + manager = AwsManager() + + with pytest.raises(Exception): + manager._retrieve_and_format_credentials("nonexistent-secret") + +def test_retrieve_and_format_credentials_invalid_json(): + """Test handling of invalid JSON in secret value""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + invalid_response = MOCK_SECRET_RESPONSE.copy() + invalid_response['SecretString'] = 'invalid json' + mock_client.get_secret_value.return_value = invalid_response + mock_boto.return_value = mock_client + + manager = AwsManager() + + with pytest.raises(json.JSONDecodeError): + manager._retrieve_and_format_credentials("test-secret") + +def test_get_secret_success(): + """Test successful secret retrieval""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + mock_client.get_secret_value.return_value = MOCK_SECRET_RESPONSE + mock_boto.return_value = mock_client + + manager = AwsManager() + + result = manager.get_secret("test-secret", "api_key") + assert result == "test_key" + +def test_get_secret_missing_credential(): + """Test handling of non existant credential""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + mock_client.get_secret_value.return_value = MOCK_SECRET_RESPONSE + mock_boto.return_value = mock_client + + manager = AwsManager() + + result = manager.get_secret("test-secret", "nonexistent_credential") + assert result == "" + +def test_get_secret_service_error(): + """Test handling of AWS service errors""" + with patch('boto3.client') as mock_boto: + mock_client = MagicMock() + error_response = { + 'Error': { + 'Code': 'InternalServiceError', + 'Message': 'Internal service error' + } + } + mock_client.get_secret_value.side_effect = ClientError( + error_response, 'GetSecretValue' + ) + mock_boto.return_value = mock_client + + manager = AwsManager() + + with pytest.raises(Exception): + manager.get_secret("test-secret", "api_key") \ No newline at end of file diff --git a/tests/test_bw_manager.py b/tests/test_bw_manager.py new file mode 100644 index 0000000..20a9b77 --- /dev/null +++ b/tests/test_bw_manager.py @@ -0,0 +1,187 @@ +import unittest +import subprocess +import datetime +from datetime import timedelta +from unittest.mock import patch, MagicMock + +from grimoirelab_toolkit.credential_manager.bw_manager import BitwardenManager + + +class TestBitwardenManager(unittest.TestCase): + """BitwardenManager unit tests""" + + def setUp(self): + self.email = "test@example.com" + self.password = "test_password" + self.manager = BitwardenManager(self.email, self.password) + + def tearDown(self): + """Clean up after each test""" + self.manager = None + + def test_initialization(self): + """Test initialization of attributes""" + self.assertEqual(self.manager._email, self.email) + self.assertEqual(self.manager.session_key, None) + self.assertEqual(self.manager.last_sync_time, None) + self.assertEqual(self.manager.sync_interval, timedelta(minutes=3)) + self.assertEqual(self.manager.formatted_credentials, {}) + + @patch("subprocess.run") + def test_login_success(self, mock_run): + """Test successful login""" + + # First call checks status - user not logged in + mock_status = MagicMock() + mock_status.returncode = 0 + mock_status.stdout = '{"status": "unauthenticated"}' # User needs to log in + + # Second call simulates login, returns just the session key + mock_login = MagicMock() + mock_login.returncode = 0 + mock_login.stdout = "test_session_key" + + # Third call might be sync + mock_sync = MagicMock() + mock_sync.returncode = 0 + + mock_run.side_effect = [mock_status, mock_login, mock_sync] + + session_key = self.manager._login(self.email, self.password) + self.assertEqual(session_key, "test_session_key") + + @patch("subprocess.run") + def test_login_locked_vault(self, mock_run): + """Test login with locked vault""" + + # First call returns status check showing locked + mock_status = MagicMock() + mock_status.returncode = 0 + mock_status.stdout = '{"status": "locked", "userEmail": "test@example.com"}' + + # Second call is unlock attempt + mock_unlock = MagicMock() + mock_unlock.returncode = 0 + mock_unlock.stdout = "unlocked_session_key" + + # Third call is sync after unlock + mock_sync = MagicMock() + mock_sync.returncode = 0 + + mock_run.side_effect = [mock_status, mock_unlock, mock_sync] + + session_key = self.manager._login(self.email, self.password) + self.assertEqual(session_key, "unlocked_session_key") + + @patch("subprocess.run") + def test_login_already_logged_in(self, mock_run): + """Test login attempt when user is already logged in""" + + # First call to check status - showing user is authenticated + mock_status = MagicMock() + mock_status.returncode = 0 + mock_status.stdout = '{"status": "unlocked", "userEmail": "test@example.com"}' + + # If login is attempted, it would return the "already logged in" message + mock_login = MagicMock() + mock_login.returncode = 1 + mock_login.stderr = "You are already logged in as test@example.com." + + mock_run.side_effect = [mock_status] + + @patch("subprocess.run") + def test_login_failure(self, mock_run): + """Test login fail scenario""" + mock_run.return_value = MagicMock(returncode=1, stderr="Login failed") + + session_key = self.manager._login(self.email, self.password) + self.assertEqual(session_key, "") + + def test_validate_session_no_session(self): + """Test session validation when there's no active session""" + self.assertFalse(self.manager._validate_session()) + + @patch("subprocess.run") + def test_validate_session_valid(self, mock_run): + """Test validation of valid session""" + mock_response = MagicMock() + mock_response.returncode = 0 + mock_response.stdout = ( + '{"status": "unlocked", ' + '"userEmail": "test@example.com", ' + '"sessionKey": "test_key"}' + ) + mock_run.return_value = mock_response + + self.manager.session_key = "test_key" + self.assertTrue(self.manager._validate_session()) + + def test_should_sync_initial(self): + """Test sync decision with no previous sync""" + self.assertTrue(self.manager._should_sync()) + + def test_should_sync_recent(self): + """Test sync decision with recent sync""" + self.manager.last_sync_time = datetime.datetime.now() + self.assertFalse(self.manager._should_sync()) + + def test_should_sync_old(self): + """Test sync decision with too old sync""" + self.manager.last_sync_time = datetime.datetime.now() - timedelta(minutes=5) + self.assertTrue(self.manager._should_sync()) + + @patch("subprocess.run") + def test_sync_vault_success(self, mock_run): + """Test successful vault sync""" + mock_run.return_value = MagicMock(returncode=0) + self.manager.session_key = "test_key" + + self.manager._sync_vault() + self.assertIsNotNone(self.manager.last_sync_time) + mock_run.assert_called_once() + + @patch("subprocess.run") + def test_sync_vault_failure(self, mock_run): + """Test vault sync failure""" + mock_run.side_effect = subprocess.CalledProcessError(1, "cmd") + self.manager.session_key = "test_key" + + self.manager._sync_vault() + self.assertIsNone(self.manager.last_sync_time) + + + def test_format_credentials_complete(self): + """Test formatting credentials""" + raw_creds = { + "name": "test_service", + "login": {"username": "user", "password": "pass"}, + "fields": [{"name": "api_key", "value": "xyz"}], + } + + formatted = self.manager._format_credentials(raw_creds) + + self.assertEqual(formatted["service_name"], "test_service") + self.assertEqual(formatted["username"], "user") + self.assertEqual(formatted["password"], "pass") + self.assertEqual(formatted["api_key"], "xyz") + + def test_get_secret_success(self): + """Test successful secret retrieval""" + self.manager.formatted_credentials = { + "service_name": "test_service", + "test_credential": "test_value", + } + + result = self.manager.get_secret("test_service", "test_credential") + self.assertEqual(result, "test_value") + + def test_get_secret_missing(self): + """Test secret retrieval with non existant credential""" + self.manager.formatted_credentials = {"service_name": "test_service"} + + result = self.manager.get_secret("test_service", "missing_credential") + self.assertEqual(result, "") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_hc_manager.py b/tests/test_hc_manager.py new file mode 100644 index 0000000..905d561 --- /dev/null +++ b/tests/test_hc_manager.py @@ -0,0 +1,142 @@ +import pytest +from unittest.mock import patch +import hvac.exceptions + +from grimoirelab_toolkit.credential_manager.hc_manager import HashicorpManager + +MOCK_SECRET_RESPONSE = { + "auth": None, + "data": { + "data": {"password": "pass", "username": "user", "api_key": "test_key"}, + "metadata": { + "created_time": "2024-11-23T12:20:59.985132927Z", + "custom_metadata": None, + "deletion_time": "", + "destroyed": False, + "version": 1, + }, + }, + "lease_duration": 0, + "lease_id": "", + "mount_type": "kv", + "renewable": False, + "request_id": "d09e2bb5-00ee-576b-6078-5d291d35ccc3", + "warnings": None, + "wrap_info": None, +} + + +@pytest.fixture +def mock_hvac_client(): + with patch("hvac.Client") as mock_client: + mock_instance = mock_client.return_value + mock_instance.sys.is_initialized.return_value = True + mock_instance.is_authenticated.return_value = True + yield mock_client + + +def test_initialization(mock_hvac_client): + """Test successful initialization of HashicorpManager.""" + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + + assert manager.client is not None + mock_hvac_client.assert_called_once_with( + url="http://vault-url", token="test-token", verify="test-certificate" + ) + assert manager.client.sys.is_initialized() + assert manager.client.is_authenticated() + + +def test_initialization_failure(mock_hvac_client): + """Test handling of initialization failures.""" + + mock_hvac_client.side_effect = hvac.exceptions.VaultError("Connection failed") + + with pytest.raises(hvac.exceptions.VaultError) as exception_info: + HashicorpManager("http://vault-url", "test-token", "test-certificate") + assert "Connection failed" in str(exception_info.value) + + +def test_get_secret_success(mock_hvac_client): + """Test successful secret retrieval.""" + + # Mock that returns the test secret + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.return_value = MOCK_SECRET_RESPONSE + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + result = manager.get_secret("test_service", "api_key") + + assert result == "test_key" + mock_instance.secrets.kv.read_secret.assert_called_once_with(path="test_service") + + +def test_get_secret_not_found(mock_hvac_client): + """Test handling of non existant secrets.""" + + # Mock that simulates non existant secret + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.side_effect = hvac.exceptions.InvalidPath() + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + result = manager.get_secret("test_service", "nonexistent") + + assert result == "" + + +def test_get_secret_permission_denied(mock_hvac_client): + """Test handling of permission denied errors.""" + + # Mock to simulate permission denied + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.side_effect = hvac.exceptions.Forbidden() + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + result = manager.get_secret("test_service", "api_key") + + assert result == "" + + +def test_retrieve_credentials_success(mock_hvac_client): + """Test successful retrieval of raw credentials.""" + + # Mock that returns the test secret + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.return_value = MOCK_SECRET_RESPONSE + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + result = manager._retrieve_credentials("test_service") + + assert result == MOCK_SECRET_RESPONSE + mock_instance.secrets.kv.read_secret.assert_called_once_with(path="test_service") + + +def test_retrieve_credentials_failure(mock_hvac_client): + """Test handling of credential retrieval failures.""" + + # Mock simulate Vault error + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.side_effect = hvac.exceptions.VaultError( + "Vault error" + ) + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + + with pytest.raises(hvac.exceptions.VaultError): + manager._retrieve_credentials("test_service") + + +def test_vault_connection_error(mock_hvac_client): + """Test handling of Vault connection errors.""" + + # Mock that simulates connection error + mock_instance = mock_hvac_client.return_value + mock_instance.secrets.kv.read_secret.side_effect = hvac.exceptions.VaultDown( + "Vault is sealed" + ) + + manager = HashicorpManager("http://vault-url", "test-token", "test-certificate") + result = manager.get_secret("test_service", "api_key") + + assert result == "" From 90e439d2b63e96286795016e5014c46d462137b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 18:59:07 +0200 Subject: [PATCH 02/13] Updated logging bw_manager.py --- .../credential_manager/bw_manager.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/grimoirelab_toolkit/credential_manager/bw_manager.py b/grimoirelab_toolkit/credential_manager/bw_manager.py index 0b362f6..9853c70 100644 --- a/grimoirelab_toolkit/credential_manager/bw_manager.py +++ b/grimoirelab_toolkit/credential_manager/bw_manager.py @@ -80,24 +80,24 @@ def _login(self, bw_email: str, bw_password: str) -> str: self._sync_vault() return self.session_key - _logger.info("Checking Bitwarden login status") + _logger.debug("Checking Bitwarden login status") status_result = subprocess.run( ["/snap/bin/bw", "status"], capture_output=True, text=True, check=False ) if status_result.returncode == 0: - _logger.info("Checking vault status") + _logger.debug("Checking vault status") status = json.loads(status_result.stdout) if status.get("userEmail") == bw_email: - _logger.info("User was already authenticated: %s", bw_email) + _logger.debug("User was already authenticated: %s", bw_email) if status.get("status") == "unlocked": - _logger.info("Vault unlocked, getting session key") + _logger.debug("Vault unlocked, getting session key") self.session_key = status.get("sessionKey") elif status.get("status") == "locked": - _logger.info("Vault locked, unlocking") + _logger.debug("Vault locked, unlocking") unlock_result = subprocess.run( ["/snap/bin/bw", "unlock", bw_password, "--raw"], capture_output=True, @@ -120,11 +120,11 @@ def _login(self, bw_email: str, bw_password: str) -> str: self.session_key = session_key if not self.session_key: - _logger.info("Couldn't obtain session key during login") + _logger.debug("Couldn't obtain session key during login") return "" else: - _logger.info("Login in: %s", bw_email) + _logger.debug("Login in: %s", bw_email) result = subprocess.run( ["/snap/bin/bw", "login", bw_email, bw_password, "--raw"], capture_output=True, @@ -140,7 +140,7 @@ def _login(self, bw_email: str, bw_password: str) -> str: _logger.error("Invalid credentials provided for Bitwarden") return "" - _logger.info("Setting session key") + _logger.debug("Setting session key") session_key = result.stdout.strip() if result.stdout else "" if not session_key: _logger.error("Empty session key received from login command") @@ -150,7 +150,7 @@ def _login(self, bw_email: str, bw_password: str) -> str: if self.session_key: # Only sync if needed based on time interval if self._should_sync(): - _logger.info("Syncing local vault with Bitwarden") + _logger.debug("Syncing local vault with Bitwarden") sync_result = subprocess.run( ["/snap/bin/bw", "sync", "--session", self.session_key], capture_output=True, @@ -161,10 +161,10 @@ def _login(self, bw_email: str, bw_password: str) -> str: self.last_sync_time = datetime.now() else: error_msg = sync_result.stderr.strip() if sync_result.stderr else "Unknown sync error" - _logger.warning("Sync failed but continuing: %s", error_msg) + _logger.debug("Sync failed but continuing: %s", error_msg) return self.session_key - _logger.info("Session key not found cause could not log in") + _logger.debug("Session key not found cause could not log in") return "" except Exception as e: @@ -200,7 +200,7 @@ def _should_sync(self) -> bool: def _sync_vault(self) -> None: """Syncs the vault and updates last sync time.""" try: - _logger.info("Syncing vault") + _logger.debug("Syncing vault") subprocess.run( ["/snap/bin/bw", "sync", "--session", self.session_key], check=True ) From a6116323e0f6c3dee801a292f506e0f77459a3a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 19:00:59 +0200 Subject: [PATCH 03/13] Updated logging hc_manager.py --- grimoirelab_toolkit/credential_manager/hc_manager.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/grimoirelab_toolkit/credential_manager/hc_manager.py b/grimoirelab_toolkit/credential_manager/hc_manager.py index 7e835e4..f4bed6f 100644 --- a/grimoirelab_toolkit/credential_manager/hc_manager.py +++ b/grimoirelab_toolkit/credential_manager/hc_manager.py @@ -48,7 +48,7 @@ def __init__(self, vault_url: str, token: str, certificate: str): Exception: If couldn't inizialize the client """ try: - _logger.info("Creating client and logging in.") + _logger.debug("Creating client and logging in.") self.client = hvac.Client(url=vault_url, token=token, verify=certificate) except Exception as e: @@ -57,10 +57,10 @@ def __init__(self, vault_url: str, token: str, certificate: str): raise e if self.client.sys.is_initialized(): - _logger.info("Client is initialized") + _logger.debug("Client is initialized") if self.client.is_authenticated(): - _logger.info("Client is authenticated") + _logger.debug("Client is authenticated") def _retrieve_credentials(self, service_name: str) -> dict: """ @@ -103,6 +103,7 @@ def get_secret(self, service_name: str, credential_name: str) -> str: credentials = self._retrieve_credentials(service_name) # We get the exact credential from the dict returned by the retrieval credential = credentials["data"]["data"][credential_name] + _logger.info("Credentials retrieved succesfully") return credential except ( hvac.exceptions.Forbidden, From 78388249ebda3a0a466a40131da8baa89df9070d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 19:05:31 +0200 Subject: [PATCH 04/13] Bitwarden now has to be installed and in path for the program to use it. --- grimoirelab_toolkit/credential_manager/bw_manager.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/grimoirelab_toolkit/credential_manager/bw_manager.py b/grimoirelab_toolkit/credential_manager/bw_manager.py index 9853c70..a33d8b8 100644 --- a/grimoirelab_toolkit/credential_manager/bw_manager.py +++ b/grimoirelab_toolkit/credential_manager/bw_manager.py @@ -81,8 +81,9 @@ def _login(self, bw_email: str, bw_password: str) -> str: return self.session_key _logger.debug("Checking Bitwarden login status") + # bw has to be in PATH status_result = subprocess.run( - ["/snap/bin/bw", "status"], capture_output=True, text=True, check=False + ["bw", "status"], capture_output=True, text=True, check=False ) if status_result.returncode == 0: @@ -175,7 +176,7 @@ def _validate_session(self) -> bool: """Checks current session.""" try: status_result = subprocess.run( - ["/snap/bin/bw", "status"], capture_output=True, text=True, check=False + ["bw", "status"], capture_output=True, text=True, check=False ) if status_result.returncode != 0: @@ -202,7 +203,7 @@ def _sync_vault(self) -> None: try: _logger.debug("Syncing vault") subprocess.run( - ["/snap/bin/bw", "sync", "--session", self.session_key], check=True + ["bw", "sync", "--session", self.session_key], check=True ) self.last_sync_time = datetime.now() except subprocess.CalledProcessError as e: @@ -224,7 +225,6 @@ def _retrieve_credentials(self, service_name: str) -> dict: try: _logger.info("Retrieving credential from Bitwarden CLI: %s", service_name) - bw_path = "/snap/bin/bw" _logger.debug("Session key = %s", self.session_key or "None") @@ -235,7 +235,7 @@ def _retrieve_credentials(self, service_name: str) -> dict: # Get list of items list_items = subprocess.run( - [bw_path, "list", "items", "--session", self.session_key], + ["bw", "list", "items", "--session", self.session_key], capture_output=True, text=True, check=False, @@ -264,7 +264,7 @@ def _retrieve_credentials(self, service_name: str) -> dict: # Retrieve exact match by id item_id = match_item.get("id") result = subprocess.run( - [bw_path, "get", "item", item_id, "--session", self.session_key], + ["bw", "get", "item", item_id, "--session", self.session_key], capture_output=True, text=True, check=False, From e3612eae802cb3a8d9dfa9da1fd4a569da190272 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 19:09:39 +0200 Subject: [PATCH 05/13] hc_manager.py raises exception if authentication fails --- grimoirelab_toolkit/credential_manager/hc_manager.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/grimoirelab_toolkit/credential_manager/hc_manager.py b/grimoirelab_toolkit/credential_manager/hc_manager.py index f4bed6f..11df6f2 100644 --- a/grimoirelab_toolkit/credential_manager/hc_manager.py +++ b/grimoirelab_toolkit/credential_manager/hc_manager.py @@ -58,9 +58,13 @@ def __init__(self, vault_url: str, token: str, certificate: str): if self.client.sys.is_initialized(): _logger.debug("Client is initialized") + else: + raise Exception("Vault client is not initialized") if self.client.is_authenticated(): _logger.debug("Client is authenticated") + else: + raise Exception("Client authentication failed") def _retrieve_credentials(self, service_name: str) -> dict: """ From 0bfaecd678fbbcbaa7a9c9cc071c5436c32e98dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 19:15:11 +0200 Subject: [PATCH 06/13] It does not return an empty string anymore. --- .../credential_manager/bw_manager.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/grimoirelab_toolkit/credential_manager/bw_manager.py b/grimoirelab_toolkit/credential_manager/bw_manager.py index a33d8b8..a9e70c9 100644 --- a/grimoirelab_toolkit/credential_manager/bw_manager.py +++ b/grimoirelab_toolkit/credential_manager/bw_manager.py @@ -56,7 +56,7 @@ def __init__(self, email: str, password: str): except FileNotFoundError: _logger.error("File not found") - def _login(self, bw_email: str, bw_password: str) -> str: + def _login(self, bw_email: str, bw_password: str) -> str | None: """ Logs into Bitwarden and obtains a session key. @@ -112,17 +112,17 @@ def _login(self, bw_email: str, bw_password: str) -> str: # Handle specific authentication errors if "invalid_grant" in error_msg.lower(): _logger.error("Invalid credentials provided for Bitwarden") - return "" + return None session_key = unlock_result.stdout.strip() if unlock_result.stdout else "" if not session_key: _logger.error("Empty session key received from unlock command") - return "" + return self.session_key = session_key if not self.session_key: _logger.debug("Couldn't obtain session key during login") - return "" + return None else: _logger.debug("Login in: %s", bw_email) @@ -139,13 +139,13 @@ def _login(self, bw_email: str, bw_password: str) -> str: # Handle specific authentication errors if "invalid_grant" in error_msg.lower(): _logger.error("Invalid credentials provided for Bitwarden") - return "" + return None _logger.debug("Setting session key") session_key = result.stdout.strip() if result.stdout else "" if not session_key: _logger.error("Empty session key received from login command") - return "" + return None self.session_key = session_key if self.session_key: @@ -166,7 +166,7 @@ def _login(self, bw_email: str, bw_password: str) -> str: return self.session_key _logger.debug("Session key not found cause could not log in") - return "" + return None except Exception as e: _logger.error("There was a problem login in: %s", e) @@ -338,7 +338,7 @@ def get_secret(self, service_name: str, credential_name: str) -> str: "The credential %s:%s, was not found.", service_name, credential_name ) _logger.error("In the meantime here you got an empty string") - return "" + return None else: # Return the requested credential return secret From 9673050886f24294f6d3a5978588c2138ffd69fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 19:27:02 +0200 Subject: [PATCH 07/13] commented line that prints credentials retrieved --- grimoirelab_toolkit/credential_manager/credential_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grimoirelab_toolkit/credential_manager/credential_manager.py b/grimoirelab_toolkit/credential_manager/credential_manager.py index dbcf62f..f5a690b 100755 --- a/grimoirelab_toolkit/credential_manager/credential_manager.py +++ b/grimoirelab_toolkit/credential_manager/credential_manager.py @@ -90,7 +90,7 @@ def main(): try: secret = get_secret(args.manager, args.service, args.credential) - print(f"Retrieved {args.credential} for {args.service}: {secret}") + # print(f"Retrieved {args.credential} for {args.service}: {secret}") except Exception as e: _logger.error("Failed to retrieve secret: %s", e) sys.exit(1) \ No newline at end of file From efdc24543c5aa501d6f09f9da22f2f22219a4c83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 19:27:31 +0200 Subject: [PATCH 08/13] added return to main() --- grimoirelab_toolkit/credential_manager/credential_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/grimoirelab_toolkit/credential_manager/credential_manager.py b/grimoirelab_toolkit/credential_manager/credential_manager.py index f5a690b..11958a4 100755 --- a/grimoirelab_toolkit/credential_manager/credential_manager.py +++ b/grimoirelab_toolkit/credential_manager/credential_manager.py @@ -90,6 +90,7 @@ def main(): try: secret = get_secret(args.manager, args.service, args.credential) + return secret # print(f"Retrieved {args.credential} for {args.service}: {secret}") except Exception as e: _logger.error("Failed to retrieve secret: %s", e) From 2d4306b3bcb8e565e86cae6f964a64614a4e0540 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 19:34:32 +0200 Subject: [PATCH 09/13] Unified logger. --- .../credential_manager/aws_manager.py | 5 ---- .../credential_manager/bw_manager.py | 4 +-- .../credential_manager/credential_manager.py | 30 +++++++++++++++++-- .../credential_manager/hc_manager.py | 3 -- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/grimoirelab_toolkit/credential_manager/aws_manager.py b/grimoirelab_toolkit/credential_manager/aws_manager.py index 870e47c..bcd03c9 100644 --- a/grimoirelab_toolkit/credential_manager/aws_manager.py +++ b/grimoirelab_toolkit/credential_manager/aws_manager.py @@ -24,11 +24,6 @@ import boto3 from botocore.exceptions import EndpointConnectionError, SSLError, ClientError -logging.getLogger("urllib3").setLevel(logging.WARNING) -logging.getLogger("botocore").setLevel(logging.WARNING) -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) _logger = logging.getLogger(__name__) diff --git a/grimoirelab_toolkit/credential_manager/bw_manager.py b/grimoirelab_toolkit/credential_manager/bw_manager.py index a9e70c9..9a76953 100644 --- a/grimoirelab_toolkit/credential_manager/bw_manager.py +++ b/grimoirelab_toolkit/credential_manager/bw_manager.py @@ -24,9 +24,7 @@ import logging from datetime import datetime, timedelta -logging.basicConfig( - level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" -) + _logger = logging.getLogger(__name__) diff --git a/grimoirelab_toolkit/credential_manager/credential_manager.py b/grimoirelab_toolkit/credential_manager/credential_manager.py index 11958a4..880f563 100755 --- a/grimoirelab_toolkit/credential_manager/credential_manager.py +++ b/grimoirelab_toolkit/credential_manager/credential_manager.py @@ -24,9 +24,9 @@ from .secrets_manager_factory import SecretsManagerFactory -logging.basicConfig( - level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" -) +CREDENTIAL_MANAGER_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s" +CREDENTIAL_MANAGER_DEBUG_LOG_FORMAT = "[%(asctime)s - %(name)s - %(levelname)s] - %(message)s" + _logger = logging.getLogger(__name__) @@ -68,6 +68,26 @@ def get_secret( raise +def configure_logging(debug=False): + """Configure credential_manager logging + + The function configures the log messages produced by Credential_manager. + By default, log messages are sent to stderr. Set the parameter + `debug` to activate the debug mode. + + :param debug: set the debug mode + """ + if not debug: + logging.basicConfig(level=logging.INFO, + format=CREDENTIAL_MANAGER_LOG_FORMAT) + logging.getLogger('requests').setLevel(logging.WARNING) + logging.getLogger('urllib3').setLevel(logging.WARNING) + logging.getLogger('boto3').setLevel(logging.WARNING) + logging.getLogger('botocore').setLevel(logging.WARNING) + else: + logging.basicConfig(level=logging.DEBUG, + format=CREDENTIAL_MANAGER_DEBUG_LOG_FORMAT) + def main(): """ Main entry point for the command line interface. @@ -88,6 +108,10 @@ def main(): args = parser.parse_args() + configure_logging(args.debug) + + logging.info("Starting credental manager.") + try: secret = get_secret(args.manager, args.service, args.credential) return secret diff --git a/grimoirelab_toolkit/credential_manager/hc_manager.py b/grimoirelab_toolkit/credential_manager/hc_manager.py index 11df6f2..1c390ab 100644 --- a/grimoirelab_toolkit/credential_manager/hc_manager.py +++ b/grimoirelab_toolkit/credential_manager/hc_manager.py @@ -23,9 +23,6 @@ import hvac import hvac.exceptions -logging.basicConfig( - level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" -) _logger = logging.getLogger(__name__) From 308697c48c001a9fd52d389d2f850184eccfe510 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 19:37:35 +0200 Subject: [PATCH 10/13] fixed bw snap path --- grimoirelab_toolkit/credential_manager/bw_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grimoirelab_toolkit/credential_manager/bw_manager.py b/grimoirelab_toolkit/credential_manager/bw_manager.py index 9a76953..baa1d9b 100644 --- a/grimoirelab_toolkit/credential_manager/bw_manager.py +++ b/grimoirelab_toolkit/credential_manager/bw_manager.py @@ -125,7 +125,7 @@ def _login(self, bw_email: str, bw_password: str) -> str | None: else: _logger.debug("Login in: %s", bw_email) result = subprocess.run( - ["/snap/bin/bw", "login", bw_email, bw_password, "--raw"], + ["bw", "login", bw_email, bw_password, "--raw"], capture_output=True, text=True, check=False, From 972b5acc0116fd15e1ef099d4c699e55e08fa49a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 19:38:20 +0200 Subject: [PATCH 11/13] fixed bw snap path --- grimoirelab_toolkit/credential_manager/bw_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/grimoirelab_toolkit/credential_manager/bw_manager.py b/grimoirelab_toolkit/credential_manager/bw_manager.py index baa1d9b..f1e4302 100644 --- a/grimoirelab_toolkit/credential_manager/bw_manager.py +++ b/grimoirelab_toolkit/credential_manager/bw_manager.py @@ -98,7 +98,7 @@ def _login(self, bw_email: str, bw_password: str) -> str | None: elif status.get("status") == "locked": _logger.debug("Vault locked, unlocking") unlock_result = subprocess.run( - ["/snap/bin/bw", "unlock", bw_password, "--raw"], + ["bw", "unlock", bw_password, "--raw"], capture_output=True, text=True, check=False, @@ -151,7 +151,7 @@ def _login(self, bw_email: str, bw_password: str) -> str | None: if self._should_sync(): _logger.debug("Syncing local vault with Bitwarden") sync_result = subprocess.run( - ["/snap/bin/bw", "sync", "--session", self.session_key], + ["bw", "sync", "--session", self.session_key], capture_output=True, text=True, check=False, From 08386d520c78e34aa8001e1cf687451a11f15c81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 20:02:21 +0200 Subject: [PATCH 12/13] fixed debug lvl argument --- .../credential_manager/credential_manager.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/grimoirelab_toolkit/credential_manager/credential_manager.py b/grimoirelab_toolkit/credential_manager/credential_manager.py index 880f563..50f5e80 100755 --- a/grimoirelab_toolkit/credential_manager/credential_manager.py +++ b/grimoirelab_toolkit/credential_manager/credential_manager.py @@ -105,6 +105,11 @@ def main(): "service", help="The name of the service for which to retrieve the credential." ) parser.add_argument("credential", help="The name of the credential to retrieve.") + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging" + ) args = parser.parse_args() @@ -114,8 +119,8 @@ def main(): try: secret = get_secret(args.manager, args.service, args.credential) - return secret - # print(f"Retrieved {args.credential} for {args.service}: {secret}") + # return secret + print(f"Retrieved {args.credential} for {args.service}: {secret}") except Exception as e: _logger.error("Failed to retrieve secret: %s", e) sys.exit(1) \ No newline at end of file From 6bb93b550b5c597ef1c933b8b34df0cc6ce003f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Ferrer=20S=C3=A1nchez?= Date: Mon, 15 Sep 2025 20:04:08 +0200 Subject: [PATCH 13/13] Renamed datetime to datetime_toolkit so it does not cause import problems. --- grimoirelab_toolkit/{datetime.py => datetime_toolkit.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename grimoirelab_toolkit/{datetime.py => datetime_toolkit.py} (100%) diff --git a/grimoirelab_toolkit/datetime.py b/grimoirelab_toolkit/datetime_toolkit.py similarity index 100% rename from grimoirelab_toolkit/datetime.py rename to grimoirelab_toolkit/datetime_toolkit.py