diff --git a/.gitignore b/.gitignore index ab7a9be..b1013c8 100644 --- a/.gitignore +++ b/.gitignore @@ -31,4 +31,5 @@ coverage.xml env/ venv/ /site -.idea/ \ No newline at end of file +.idea/ +.DS_Store diff --git a/maskerlogger/__init__.py b/maskerlogger/__init__.py index 28e8803..675bdcd 100644 --- a/maskerlogger/__init__.py +++ b/maskerlogger/__init__.py @@ -1,5 +1,18 @@ """ Init file for oxformatter package. """ -from maskerlogger.masker_formatter import MaskerFormatter, MaskerFormatterJson # noqa -__version__ = '0.4.0-beta.1' + +from maskerlogger.masker_formatter import ( + MaskerFormatter, + MaskerFormatterJson, + mask_string, +) + +# Expose the classes and main function +__all__ = [ + "MaskerFormatter", + "MaskerFormatterJson", + "mask_string", +] + +__version__ = "0.4.0-beta.1" diff --git a/maskerlogger/ahocorasick_regex_match.py b/maskerlogger/ahocorasick_regex_match.py index 0f93f2d..bc4b820 100644 --- a/maskerlogger/ahocorasick_regex_match.py +++ b/maskerlogger/ahocorasick_regex_match.py @@ -3,6 +3,7 @@ from typing import List import ahocorasick from maskerlogger.utils import timeout +from collections import defaultdict MAX_MATCH_TIMEOUT = 1 @@ -19,47 +20,57 @@ def _initialize_automaton(self) -> ahocorasick.Automaton: for keyword, regexs in self.keyword_to_patterns.items(): keyword_automaton.add_word(keyword, (regexs)) keyword_automaton.make_automaton() + return keyword_automaton @staticmethod def _load_config(config_path: str) -> dict: - with open(config_path, 'rb') as f: + with open(config_path, "rb") as f: return toml.load(f) - def _extract_keywords_and_patterns(self, config) -> dict: - keyword_to_patterns = {} - for rule in config['rules']: - for keyword in rule.get('keywords', []): - if keyword not in keyword_to_patterns: - keyword_to_patterns[keyword] = [] + def _extract_keywords_and_patterns( + self, config: dict + ) -> dict[str, List[re.Pattern]]: + """Extracts keywords and their corresponding regex patterns from the configuration file.""" + 
keyword_to_patterns = defaultdict(list) - keyword_to_patterns[keyword].append(self._get_compiled_regex( - rule['regex'])) + for rule in config["rules"]: + for keyword in rule.get("keywords", []): + keyword_to_patterns[keyword].append( + self._get_compiled_regex(rule["regex"]) + ) - return keyword_to_patterns + return dict(keyword_to_patterns) - def _get_compiled_regex(self, regex: str) -> str: - if '(?i)' in regex: - regex = regex.replace('(?i)', '') + def _get_compiled_regex(self, regex: str) -> re.Pattern[str]: + """Compiles the regex pattern and returns the compiled pattern.""" + if "(?i)" in regex: + regex = regex.replace("(?i)", "") return re.compile(regex, re.IGNORECASE) return re.compile(regex) - def _filter_by_keywords(self, line): + def _filter_by_keywords(self, line: str) -> list[re.Pattern[str]]: + """Filters the regex patterns based on the keywords present in the line.""" + matched_regexes = set() - for end_index, regex_values in self.automaton.iter(line): + for _, regex_values in self.automaton.iter(line): matched_regexes.update(regex_values) - return matched_regexes + + return list(matched_regexes) @timeout(MAX_MATCH_TIMEOUT) - def _get_match_regex(self, line: str, - matched_regex: List[re.Pattern]) -> List[re.Match]: - matches = [] - for regex in matched_regex: - if match := regex.search(line): - matches.append(match) - return matches - - def match_regex_to_line(self, line: str) -> re.Match: + def _get_match_regex( + self, + line: str, + matched_regex: List[re.Pattern[str]], + ) -> List[re.Match]: + """Gets the matches of the regex patterns in the given line.""" + return [match for regex in matched_regex for match in regex.finditer(line)] + + def match_regex_to_line(self, line: str) -> list[re.Match[str]]: + """Matches the regex patterns to the given line.""" lower_case_line = line.lower() - if matched_regxes := self._filter_by_keywords(lower_case_line): - return self._get_match_regex(line, matched_regxes) + + if matched_regexes := 
self._filter_by_keywords(lower_case_line): + return self._get_match_regex(line, matched_regexes) + return [] diff --git a/maskerlogger/config/gitleaks.toml b/maskerlogger/config/gitleaks.toml index b2a2634..91ccd3e 100644 --- a/maskerlogger/config/gitleaks.toml +++ b/maskerlogger/config/gitleaks.toml @@ -505,7 +505,7 @@ keywords = [ [[rules]] id = "generic-api-key" description = "Detected a Generic API Key, potentially exposing access to various services and sensitive operations." -regex = '''(?i)(?:key|api|token|secret|client|passwd|password|auth|access)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([0-9a-z\-_.=]{10,150})(?:['|\"|\n|\r|\s|\x60|;]|$)''' +regex = '''(?i)(?:key|api|token|secret|client|passwd|password|auth|access)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([0-9a-zA-Z\-_.!$&'*+/=?^_`{|}~#@,;:%^]{8,150})(?:['|\"|\n|\r|\s|\x60|;]|$)''' entropy = 3.5 keywords = [ "key","api","token","secret","client","passwd","password","auth","access", diff --git a/maskerlogger/masker_formatter.py b/maskerlogger/masker_formatter.py index 5b26d95..e3ea4b1 100644 --- a/maskerlogger/masker_formatter.py +++ b/maskerlogger/masker_formatter.py @@ -2,7 +2,7 @@ import os import re from abc import ABC -from typing import List +from typing import List, Optional from pythonjsonlogger import jsonlogger @@ -11,15 +11,56 @@ DEFAULT_SECRETS_CONFIG_PATH = os.path.join( os.path.dirname(__file__), "config/gitleaks.toml" ) -_APPLY_MASK = 'apply_mask' +_APPLY_MASK = "apply_mask" SKIP_MASK = {_APPLY_MASK: False} +__all__ = [ + "mask_string", + "MaskerFormatter", + "MaskerFormatterJson", +] + + +def _apply_asterisk_mask(msg: str, matches: List[re.Match[str]], redact: int) -> str: + """Replace the sensitive data with asterisks in the given message.""" + for match in matches: + match_groups = match.groups() if match.groups() else [match.group()] # noqa + for group in 
match_groups: + redact_length = int((len(group) / 100) * redact) + msg = msg.replace(group[:redact_length], "*" * redact_length, 1) + + return msg + + +def mask_string( + msg: str, + redact: int = 100, + regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH, +) -> str: + """Masks the sensitive data in the given string. + + Args: + msg (str): The string to mask. + redact (int): Percentage of the sensitive data to + redact. + regex_config_path (str): Path to the configuration file for regex patterns. + + Returns: + str: The masked string. + """ + regex_matcher = RegexMatcher(regex_config_path) + if found_matching_regexes := regex_matcher.match_regex_to_line(msg): + msg = _apply_asterisk_mask(msg, found_matching_regexes, redact=redact) + + return msg + + +class AbstractMaskedLogger(ABC): def __init__( - self, - regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH, - redact=100 + self, + regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH, + redact: int = 100, ): """Initializes the AbstractMaskedLogger. @@ -27,40 +68,22 @@ def __init__( regex_config_path (str): Path to the configuration file for regex patterns. redact (int): Percentage of the sensitive data to redact. 
""" - self.regex_matcher = RegexMatcher(regex_config_path) + self.regex_config_path = regex_config_path self.redact = redact - @staticmethod - def _validate_redact(redact: int) -> int: - if not (0 <= int(redact) <= 100): - raise ValueError("Redact value must be between 0 and 100") - - return int(redact) - - def _mask_secret(self, msg: str, matches: List[re.Match]) -> str: - """Masks the sensitive data in the log message.""" - for match in matches: - match_groups = match.groups() if match.groups() else [match.group()] # noqa - for group in match_groups: - redact_length = int((len(group) / 100) * self.redact) - msg = msg.replace( - group[:redact_length], "*" * redact_length, 1) - - return msg - def _mask_sensitive_data(self, record: logging.LogRecord) -> None: """Applies masking to the sensitive data in the log message.""" - if found_matching_regex := self.regex_matcher.match_regex_to_line(record.msg): # noqa - record.msg = self._mask_secret(record.msg, found_matching_regex) + record.msg = mask_string(record.msg, self.redact, self.regex_config_path) -# Normal Masked Logger - Text-Based Log Formatter class MaskerFormatter(logging.Formatter, AbstractMaskedLogger): + """A log formatter that masks sensitive data in text-based logs.""" + def __init__( - self, - fmt: str, - regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH, - redact=100 + self, + fmt: Optional[str] = None, + regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH, + redact: int = 100, ): """Initializes the MaskerFormatter. 
@@ -80,13 +103,14 @@ def format(self, record: logging.LogRecord) -> str: return super().format(record) -# JSON Masked Logger - JSON-Based Log Formatter class MaskerFormatterJson(jsonlogger.JsonFormatter, AbstractMaskedLogger): + """A JSON log formatter that masks sensitive data in json-based logs.""" + def __init__( - self, - fmt: str, - regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH, - redact=100 + self, + fmt: Optional[str] = None, + regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH, + redact: int = 100, ): """Initializes the MaskerFormatterJson. diff --git a/maskerlogger/secrets_in_logs_example.py b/maskerlogger/secrets_in_logs_example.py index 522988a..f02035e 100644 --- a/maskerlogger/secrets_in_logs_example.py +++ b/maskerlogger/secrets_in_logs_example.py @@ -10,19 +10,21 @@ def main(): """ Main function to demonstrate logging with secrets. """ - logger = logging.getLogger('mylogger') + logger = logging.getLogger("mylogger") logger.setLevel(logging.DEBUG) handler = logging.StreamHandler() handler.setFormatter( - MaskerFormatter("%(asctime)s %(name)s %(levelname)s %(message)s", - redact=50)) + MaskerFormatter("%(asctime)s %(name)s %(levelname)s %(message)s", redact=50) + ) logger.addHandler(handler) - logger.info('"current_key": "AIzaSOHbouG6DDa6DOcRGEgOMayAXYXcw6la3c"', extra=SKIP_MASK) # noqa + logger.info( + '"current_key": "AIzaSOHbouG6DDa6DOcRGEgOMayAXYXcw6la3c"', extra=SKIP_MASK + ) # noqa logger.info('"AKIAI44QH8DHBEXAMPLE" and then more text.') logger.info("Datadog access token: 'abcdef1234567890abcdef1234567890'") logger.info('"password": "password123"') -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/maskerlogger/utils.py b/maskerlogger/utils.py index 5ba8041..addaa2f 100644 --- a/maskerlogger/utils.py +++ b/maskerlogger/utils.py @@ -19,8 +19,9 @@ def target(): thread.start() thread.join(seconds) if thread.is_alive(): - raise TimeoutException( - f"Function call exceeded {seconds} seconds") + raise 
TimeoutException(f"Function call exceeded {seconds} seconds") return result[0] + return wrapper + return decorator diff --git a/tests/test_masked_logger.py b/tests/test_masked_logger.py index b500217..907e312 100644 --- a/tests/test_masked_logger.py +++ b/tests/test_masked_logger.py @@ -5,6 +5,53 @@ from maskerlogger import MaskerFormatter, MaskerFormatterJson +ELASTIC_PW = "^_h6yCZKuadboPDfSa7pmN2tdWPCbZPWq!!" +MASKED_ELASTIC_PW = len(ELASTIC_PW) * "*" +API_TOKEN = "dqu0oJU45UMbrhJ1eNfVdSQ9Yf6wj6u@!^_" +MASKED_API_TOKEN = len(API_TOKEN) * "*" +GCP_API_KEY = "AIzaSyabcdefghijklmnopqrstuvwxyz1234567" +MASKED_GCP_API_KEY = len(GCP_API_KEY) * "*" + +SENSITIVE_STRING = json.dumps( + { + "ELASTIC_SEARCH": { + "URL": "https://example.com:9200", + "USERNAME": "", + "PASSWORD": ELASTIC_PW, + }, + "ANOTHER_ELASTIC_SEARCH": { + "URL": "https://example.com:9200", + "USERNAME": "", + "PASSWORD": ELASTIC_PW, + }, + "API": { + "HOST": "https://api.example.com", + "USERNAME": "", + "TOKEN": API_TOKEN, + }, + "GCP": { + "PROJECT_ID": "my-gcp-project", + "SERVICE_ACCOUNT": "my-service", + "API_KEY": GCP_API_KEY, + }, + } +) + + +def common_assertions(log_output: str) -> None: + # ELASTIC_SEARCH password should be masked + assert ELASTIC_PW not in log_output + assert f'"PASSWORD": "{MASKED_ELASTIC_PW}"' in log_output + + # API token should be masked + assert API_TOKEN not in log_output + assert f'"TOKEN": "{MASKED_API_TOKEN}"' in log_output + + # GCP API key should be masked + assert GCP_API_KEY not in log_output + assert f'"API_KEY": "{MASKED_GCP_API_KEY}"' in log_output + + @pytest.fixture def logger_and_log_stream(): """ @@ -13,7 +60,7 @@ def logger_and_log_stream(): Returns: tuple: A logger instance and a StringIO object to capture the log output. 
""" - logger = logging.getLogger('test_logger') + logger = logging.getLogger("test_logger") logger.setLevel(logging.DEBUG) logger.handlers.clear() log_stream = StringIO() @@ -45,14 +92,12 @@ def test_masked_logger_text(logger_and_log_stream, log_format): logger.handlers[0].setFormatter(formatter) # Log a sensitive message - logger.info("User login with password=secretpassword") + logger.info(SENSITIVE_STRING) # Read and parse the log output log_output = log_stream.getvalue().strip() - # Validate that the password is masked in the text log output - assert "password=*****" in log_output - assert "secretpassword" not in log_output + common_assertions(log_output) def test_masked_logger_json(logger_and_log_stream, log_format): @@ -70,15 +115,14 @@ def test_masked_logger_json(logger_and_log_stream, log_format): logger.handlers[0].setFormatter(formatter) # Log a sensitive message - logger.info("User login with password=secretpassword") + logger.info(SENSITIVE_STRING) # Read and parse the log output log_output = log_stream.getvalue().strip() log_json = json.loads(log_output) # Parse the JSON log output # Validate that the password is masked in the JSON log output - assert "password=*****" in log_json["message"] - assert "secretpassword" not in log_json["message"] + common_assertions(log_json["message"]) def test_masked_logger_text_format_after_masking(logger_and_log_stream, log_format): @@ -96,14 +140,13 @@ def test_masked_logger_text_format_after_masking(logger_and_log_stream, log_form logger.handlers[0].setFormatter(formatter) # Log a sensitive message - logger.info("Sensitive data: password=secretpassword and other info") + logger.info(SENSITIVE_STRING) # Read and parse the log output log_output = log_stream.getvalue().strip() # Validate that the password is masked and the log format is correct - assert "password=*****" in log_output - assert "secretpassword" not in log_output + common_assertions(log_output) def 
test_masked_logger_json_format_after_masking(logger_and_log_stream, log_format): @@ -122,15 +165,14 @@ def test_masked_logger_json_format_after_masking(logger_and_log_stream, log_form logger.handlers[0].setFormatter(formatter) # Log a sensitive message - logger.info("Sensitive data: password=secretpassword and other info") + logger.info(SENSITIVE_STRING) # Read and parse the log output log_output = log_stream.getvalue().strip() log_json = json.loads(log_output) # Parse the JSON log output # Validate that the password is masked and the JSON log format is correct - assert "password=*****" in log_json["message"] - assert "secretpassword" not in log_json["message"] + common_assertions(log_json["message"]) def test_masked_logger_non_sensitive_data(logger_and_log_stream, log_format):