2 changes: 2 additions & 0 deletions .gitignore
@@ -1,4 +1,6 @@
.venv
/venv
__pycache__
**/__pycache__
*.egg-info
.pytest_cache
4 changes: 4 additions & 0 deletions changelog_entry.yaml
@@ -0,0 +1,4 @@
- bump: patch
changes:
added:
- Added GCP logs in Household and Metadata services to assist further investigation of the 502 errors.
141 changes: 135 additions & 6 deletions policyengine_api/endpoints/household.py
@@ -11,6 +11,9 @@
import json
import logging
from datetime import date
from policyengine_api.structured_logger import get_logger, log_struct

logger = get_logger()
from policyengine_api.utils.payload_validators import validate_country


@@ -88,14 +91,61 @@ def get_household_under_policy(

api_version = COUNTRY_PACKAGE_VERSIONS.get(country_id)

# Look in computed_households to see if already computed
# Log start of request
log_struct(
event="get_household_under_policy_start",
input_data={
"country_id": country_id,
Collaborator: issue, blocking: Could you create a request_id value (or something like that) that we can then pass all the way through the logs to make debugging the entire flow easier?

Collaborator: This comment applies throughout the PR.

Collaborator Author: Yes, we can generate a request_id.
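A minimal sketch of that idea, assuming Flask's request-scoped g object and uuid4; the helper name get_request_id is illustrative, not part of this PR:

import uuid

from flask import g


def get_request_id() -> str:
    # Generate one id per request and reuse it, so every log line
    # emitted while handling that request carries the same value.
    if not hasattr(g, "request_id"):
        g.request_id = str(uuid.uuid4())
    return g.request_id


# Each log_struct call would then include it alongside the other fields:
# input_data={"request_id": get_request_id(), "country_id": country_id, ...}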
"household_id": household_id,
"policy_id": policy_id,
"api_version": api_version,
"request_path": request.path,
},
message="Started processing household under policy request.",
severity="INFO",
logger=logger, # optional if you've already called get_logger()
)

row = local_database.query(
f"SELECT * FROM computed_household WHERE household_id = ? AND policy_id = ? AND api_version = ?",
(household_id, policy_id, api_version),
).fetchone()
# Look in computed_household cache table
try:
row = local_database.query(
f"SELECT * FROM computed_household WHERE household_id = ? AND policy_id = ? AND api_version = ?",
(household_id, policy_id, api_version),
).fetchone()
except Exception as e:
log_struct(
event="computed_household_query_failed",
input_data={
"household_id": household_id,
"policy_id": policy_id,
"api_version": api_version,
},
message=f"Database query failed: {e}",
severity="ERROR",
)
return Response(
json.dumps(
{
"status": "error",
"message": "Internal server error while querying computed_household.",
}
),
status=500,
mimetype="application/json",
)

if row is not None:
log_struct(
event="cached_computed_household_found",
input_data={
"household_id": household_id,
"policy_id": policy_id,
"api_version": api_version,
},
message="Found precomputed household result in cache.",
severity="INFO",
)

result = dict(
policy_id=row["policy_id"],
household_id=row["household_id"],
@@ -122,7 +172,27 @@ def get_household_under_policy(
if row is not None:
household = dict(row)
household["household_json"] = json.loads(household["household_json"])
log_struct(
event="household_data_loaded",
input_data={
"household_id": household_id,
"country_id": country_id,
Collaborator: suggestion: I'd prefer if we defined all of the values to be included here in one place, then consistently passed them into all logs. E.g., here we have only household_id and country_id, but at another point, some logs don't have country_id and do have policy_id and household_id.

Also, doing so and just deconstructing an object of values into input_data would remove some duplicative code and improve readability.

Collaborator: This comment applies throughout the changes.

Collaborator Author: Noted - we've implemented a centralized context object containing all relevant IDs, which we now pass into input_data for every log, eliminating duplication and improving readability.
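A sketch of that centralized approach; the PR's actual context object isn't shown in this diff, so the log_context name is an assumption:

# Build the shared fields once, near the top of the endpoint...
log_context = {
    "country_id": country_id,
    "household_id": household_id,
    "policy_id": policy_id,
    "api_version": api_version,
}

# ...then deconstruct it into input_data for every log call.
log_struct(
    event="household_data_loaded",
    input_data={**log_context},
    message="Loaded household data from DB.",
    severity="INFO",
)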
},
message="Loaded household data from DB.",
severity="INFO",
)

else:
log_struct(
event="household_not_found",
input_data={
"household_id": household_id,
"country_id": country_id,
},
message=f"Household #{household_id} not found.",
severity="WARNING",
)

response_body = dict(
status="error",
message=f"Household #{household_id} not found.",
@@ -168,7 +238,28 @@ def get_household_under_policy(
household_id,
policy_id,
)

log_struct(
event="calculation_success",
input_data={
"household_id": household_id,
"policy_id": policy_id,
},
message="Household calculation succeeded.",
severity="INFO",
)

except Exception as e:
log_struct(
event="calculation_failed",
input_data={
"household_id": household_id,
"policy_id": policy_id,
},
message=f"Calculation failed: {e}",
severity="ERROR",
)

logging.exception(e)
response_body = dict(
status="error",
@@ -193,7 +284,27 @@ def get_household_under_policy(
api_version,
),
)
except Exception:
log_struct(
event="computed_household_inserted",
input_data={
"household_id": household_id,
"policy_id": policy_id,
},
message="Inserted new computed_household record.",
severity="INFO",
)

except Exception as e:
log_struct(
event="computed_household_insert_failed_updating",
input_data={
"household_id": household_id,
"policy_id": policy_id,
},
message=f"Insert failed; updated existing record instead. Error: {e}",
severity="ERROR",
)

# Update the result if it already exists
local_database.query(
f"UPDATE computed_household SET computed_household_json = ? WHERE country_id = ? AND household_id = ? AND policy_id = ?",
@@ -227,7 +338,25 @@ def get_calculate(country_id: str, add_missing: bool = False) -> dict:

try:
result = country.calculate(household_json, policy_json)
log_struct(
event="calculation_success",
Collaborator: suggestion: Distinguish between endpoint types in the event name. E.g., calculation_success refers to two different events, one with database storage underneath and one without.

Collaborator: This comment applies throughout the PR.

Collaborator Author: Agreed - we've updated the event names to clearly distinguish endpoint types.
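One way the renamed events might look; these names are hypothetical, since the follow-up commit isn't part of this diff:

# Stateful endpoint, where the result is also cached in computed_household:
log_struct(
    event="household_under_policy.calculation_success",
    input_data={"household_id": household_id, "policy_id": policy_id},
    message="Household calculation succeeded.",
    severity="INFO",
)

# Stateless /calculate endpoint, with no database storage underneath:
log_struct(
    event="calculate.calculation_success",
    input_data={"country_id": country_id},
    message="Calculation completed successfully.",
    severity="INFO",
)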
input_data={
"country_id": country_id,
},
message="Calculation completed successfully.",
severity="INFO",
)

except Exception as e:
log_struct(
event="calculation_failed",
input_data={
"country_id": country_id,
},
message=f"Error calculating household under policy: {e}",
severity="ERROR",
)

logging.exception(e)
response_body = dict(
status="error",
60 changes: 55 additions & 5 deletions policyengine_api/services/metadata_service.py
@@ -1,12 +1,62 @@
from policyengine_api.country import COUNTRIES
from policyengine_api.structured_logger import get_logger, log_struct

logger = get_logger()


class MetadataService:
def get_metadata(self, country_id: str) -> dict:
country = COUNTRIES.get(country_id)
if country == None:
raise RuntimeError(
f"Attempted to get metadata for a nonexistant country: '{country_id}'"

# Log the metadata retrieval attempt
log_struct(
event="MetadataService.get_metadata_called",
input_data={
"country_id": country_id,
},
message="Metadata retrieval called.",
severity="INFO",
)

try:
country = COUNTRIES.get(country_id)
if country == None:
error_msg = f"Attempted to get metadata for a nonexistant country: '{country_id}'"
log_struct(
event="MetadataService.get_metadata_failed",
input_data={
"country_id": country_id,
"error": error_msg,
},
message=f"Metadata successfully retrieved for country_id '{country_id}'",
Collaborator: issue, blocking: I believe this error message is incorrect.
severity="ERROR",
)

raise RuntimeError(error_msg)
Collaborator: issue, blocking: I'm not sure I'd raise here. This will return a 500 SERVER ERROR instead of the more apt 404 NOT FOUND. I'd recommend returning a structured response instead.

Collaborator Author: Understood - for now, we'll revert to the existing production code, since changes in /metadata_routes.py are also required; we can implement and test the update once the lower environment is ready.
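A sketch of the structured response the reviewer suggests, reusing the Response pattern from household.py; the helper name and route wiring are assumptions, since metadata_routes.py isn't part of this diff:

import json

from flask import Response


def metadata_not_found(country_id: str) -> Response:
    # Return a structured 404 instead of raising, so an unknown
    # country_id doesn't surface as a 500 SERVER ERROR.
    return Response(
        json.dumps(
            {
                "status": "error",
                "message": f"Country '{country_id}' not found.",
            }
        ),
        status=404,
        mimetype="application/json",
    )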

metadata = country.metadata

log_struct(
event="MetadataService.get_metadata_success",
input_data={
"country_id": country_id,
},
message="Metadata successfully retrieved.",
severity="INFO",
)

return country.metadata
return metadata

except Exception as e:
log_struct(
event="MetadataService.get_metadata_exception",
input_data={
"country_id": country_id,
"error": str(e),
},
message="Exception occurred while retrieving metadata.",
severity="ERROR",
)

raise RuntimeError(
f"Unexpected error retrieving metadata for country_id '{country_id}': {e}"
) from e
58 changes: 58 additions & 0 deletions policyengine_api/structured_logger.py
@@ -0,0 +1,58 @@
import logging
import json
import sys

# Only import if using GCP logging
try:
from google.cloud import logging as gcp_logging
from google.cloud.logging.handlers import CloudLoggingHandler
except ImportError:
gcp_logging = None
CloudLoggingHandler = None


class JsonFormatter(logging.Formatter):
"""Formatter that outputs logs as structured JSON."""

def format(self, record):
log_record = {
"severity": record.levelname,
"event": getattr(record, "event", None),
"input": getattr(record, "input", None),
"message": record.getMessage(),
}
if record.exc_info:
log_record["exception"] = self.formatException(record.exc_info)
return json.dumps(log_record)


def get_logger(name="policyengine-api", level=logging.INFO):
logger = logging.getLogger(name)
logger.setLevel(level)

# If no handlers are set, add a StreamHandler with JSON formatting
if not logger.handlers:
handler = logging.StreamHandler(sys.stdout)
Collaborator: question: I'm curious if this worked in GCP correctly. I believe you said you tested locally and it does; is that correct?

Collaborator Author: I've tested it locally and confirmed the logger outputs JSON in the terminal; if you want, we can also verify the logs in GCP Logs Explorer once the lower environment is set up.

Collaborator: I would argue that creating and sending a GCP log to the prod server is a low-risk activity for the following reasons:

  • It shouldn't impact the service itself
  • If configured properly, it shouldn't delete any logs

Prior to the deployment of any QA environments, and assuming you have the necessary permissions, could you write a Python script using the relevant log-writing snippet to confirm that this structure logs correctly to GCP? I'd have my money on it logging everything as a massive piece of text.

Collaborator Author: @anth-volk - I tested it by sending logs from local and it's working correctly; the logs are showing up as structured JSON with the expected fields instead of one large text string in Logs Explorer. (screenshot: log)
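A sketch of the verification script discussed above, assuming google-cloud-logging is installed and application-default credentials are configured; the logger name is arbitrary:

from google.cloud import logging as gcp_logging

client = gcp_logging.Client()  # resolves the project from the environment
gcp_logger = client.logger("policyengine-api-smoke-test")

# log_struct sends a jsonPayload rather than a flat textPayload, so the
# fields remain individually queryable in Logs Explorer.
gcp_logger.log_struct(
    {
        "event": "structured_logging_smoke_test",
        "input": {"country_id": "us"},
        "message": "Verifying structured JSON logging from local.",
    },
    severity="INFO",
)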
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)

# If using GCP logging, add a CloudLoggingHandler
# For more advanced GCP integration, consider enabling CloudLoggingHandler.
# if gcp_logging and CloudLoggingHandler:
# client = gcp_logging.Client()
# gcp_handler = CloudLoggingHandler(client, name=name)
# gcp_handler.setFormatter(JsonFormatter()) # Optional
# logger.addHandler(gcp_handler)
Collaborator: comment: I'd recommend getting rid of this if not reusing somehow.

Collaborator Author: done

return logger


def log_struct(event, input_data, message, severity="INFO", logger=None):
"""
Implementation-agnostic structured logger.
"""
if logger is None:
logger = get_logger()

log_func = getattr(logger, severity.lower(), logger.info)
log_func(message, extra={"event": event, "input": input_data})
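For reference, a minimal local run of the module as written; each call emits one JSON object per line on stdout:

logger = get_logger()
log_struct(
    event="example_event",
    input_data={"country_id": "uk"},
    message="Hello from the structured logger.",
    severity="INFO",
    logger=logger,
)
# -> {"severity": "INFO", "event": "example_event", "input": {"country_id": "uk"}, "message": "Hello from the structured logger."}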