Skip to content

Commit 79c2f58

Browse files
authored
graceful handling of non-cloudevent requests (#59)
* fix: handle non-cloudevent requests gracefully * add cloudevent test cmd * update changelog
1 parent 0a2c062 commit 79c2f58

File tree

7 files changed

+281
-59
lines changed

7 files changed

+281
-59
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
__pycache__
2+
3+
CLAUDE.md

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1010

1111
### Added
1212
### Changed
13+
14+
- Improved test function to cover additional CloudEvent cases
15+
1316
### Deprecated
1417
### Removed
1518
### Fixed
19+
20+
- CloudEvent handler gracefully fails on non-cloudevent requests
21+
1622
### Security
1723

1824
## [0.5.1] - 2025-03-10

cmd/fcloudevent/README.md

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,61 @@
1-
# Function CloudEvents Test Command
1+
# CloudEvent Function Example
22

3-
fcloudevent is a command which illustrates how the func-python library middleware
4-
wraps a function and exposes it as a service. Useful for development.
3+
This directory contains an example CloudEvent function that demonstrates how to use the `func_python.cloudevent` middleware to handle CloudEvents.
54

65
This is an example usage of the Functions CloudEvents middleware.
76

87
Run the function
98
```
9+
10+
## Start the Function
11+
12+
Run the instanced handler (default):
13+
```bash
1014
poetry run python cmd/fcloudevent/main.py
1115
```
1216

13-
Send a CloudEvent against it:
17+
Run the static handler:
18+
```bash
19+
poetry run python cmd/fcloudevent/main.py --static
20+
```
21+
22+
Change the listen address (default is [::]:8080):
23+
```bash
24+
LISTEN_ADDRESS=127.0.0.1:8081 poetry run python cmd/fcloudevent/main.py
1425
```
15-
curl -v -X POST http://127.0.0.1:8080/ \
26+
27+
## Invoke the Function
28+
29+
You can send a CloudEvent to the function using curl with structured encoding:
30+
31+
```bash
32+
curl -X POST http://127.0.0.1:8080 \
33+
-H "Ce-Specversion: 1.0" \
34+
-H "Ce-Type: com.example.test" \
35+
-H "Ce-Source: /test/source" \
36+
-H "Ce-Id: test-123" \
1637
-H "Content-Type: application/json" \
17-
-H "ce-specversion: 1.0" \
18-
-H "ce-type: com.example.event.submit" \
19-
-H "ce-source: /applications/user-service" \
20-
-H "ce-id: $(uuidgen)" \
21-
-H "ce-time: $(date -u +"%Y-%m-%dT%H:%M:%SZ")" \
38+
-d '{"message": "Hello CloudEvents!"}'
39+
```
40+
41+
Or with a full CloudEvent in structured format:
42+
43+
```bash
44+
curl -X POST http://127.0.0.1:8080 \
45+
-H "Content-Type: application/cloudevents+json" \
2246
-d '{
23-
"message": "Hello CloudEvents",
24-
"username": "testuser",
25-
"action": "submit",
47+
"specversion": "1.0",
48+
"type": "com.example.test",
49+
"source": "/test/source",
50+
"id": "test-456",
51+
"datacontenttype": "application/json",
52+
"data": {
53+
"message": "Hello from structured CloudEvent!",
54+
"value": 42
55+
}
2656
}'
2757
```
2858

59+
2960
To see the actual middleware which is used when building a Python Function,
3061
see the [Functions Python Scaffolding](https://github.com/knative/func/tree/main/templates/python/cloudevents)

cmd/fcloudevent/main.py

Lines changed: 88 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,125 @@
11
import argparse
22
import logging
3-
import json
43
from func_python.cloudevent import serve
54
from cloudevents.http import CloudEvent
65

76
# Set the default logging level to INFO
87
logging.basicConfig(level=logging.INFO)
98

10-
# Parse command line arguments
11-
parser = argparse.ArgumentParser(description='Serve a CloudEvents Function')
9+
# Allow this test to be either instanced (default) or --static
10+
# to test the two different primary method signatures supported in the
11+
# final Function.
12+
parser = argparse.ArgumentParser(description='Serve a Test CloudEvent Function')
1213
parser.add_argument('--static', action='store_true',
13-
help='Serve the static handler (default is instanced)')
14+
help='Serve the example static handler (default is to '
15+
'instantiate and serve the example class)')
1416
args = parser.parse_args()
1517

1618

17-
# Static handler implementation
19+
# Example static handler.
1820
# Enable with --static
1921
# Must be named exactly "handle"
2022
async def handle(scope, receive, send):
21-
"""Static handler for CloudEvents"""
22-
logging.info("CloudEvent static handler called")
23+
""" handle is an example of a static handler which can be sent to the
24+
middleware as a function. It will be wrapped in a default Function
25+
instance before being served as an ASGI application.
26+
"""
27+
logging.info("Static CloudEvent handler invoked")
2328

24-
# Process the CloudEvent from the scope
25-
event = scope.get("event")
29+
# Access the CloudEvent from the scope
30+
event = scope["event"]
2631
if not event:
2732
error_event = CloudEvent(
28-
{"type": "dev.functions.error", "source": "/cloudevent/error"},
29-
{"message": "No CloudEvent found in request"}
33+
{"type": "dev.functions.error", "source": "/fcloudevent/error"},
34+
{"message": "No CloudEvent found in scope"}
3035
)
31-
await send(error_event, 400)
36+
await send(error_event, 500)
3237
return
3338

34-
# Log the received event
35-
logging.info(f"Received event type: {event['type']}")
36-
logging.info(f"Received event source: {event['source']}")
37-
logging.info(f"Received event data: {json.dumps(event.data)}")
39+
logging.info(f"Received CloudEvent: type={event['type']}, source={event['source']}")
40+
41+
# Handle event data - it might be bytes or dict
42+
event_data = event.data
43+
if isinstance(event_data, bytes):
44+
import json
45+
event_data = json.loads(event_data)
46+
logging.info(f"CloudEvent data: {event_data}")
3847

39-
# Create and send response CloudEvent
4048
response_event = CloudEvent(
41-
{"type": "dev.functions.response", "source": "/cloudevent/processor"},
42-
{"message": "Processed static", "original_data": event.data}
49+
{
50+
"type": "com.example.response.static",
51+
"source": "/fcloudevent/static",
52+
},
53+
{
54+
"message": "OK: static CloudEvent handler",
55+
"received_event_type": event['type'],
56+
"received_event_source": event['source'],
57+
"received_data": event_data
58+
}
4359
)
60+
4461
await send(response_event)
4562

4663

47-
# Instanced handler implementation
64+
# Example instanced handler
65+
# This is the default expected by this test.
66+
# The class can be named anything, but there must be a constructor named "new"
67+
# which returns an object with an async method "handle" conforming to the ASGI
68+
# callable's method signature.
4869
class MyCloudEventFunction:
4970
def __init__(self):
5071
self.event_count = 0
5172
logging.info("CloudEvent function instance created")
5273

5374
async def handle(self, scope, receive, send):
54-
"""Handle CloudEvents in an instanced function"""
55-
event = scope.get("event")
75+
logging.info("Instanced CloudEvent handler invoked")
76+
77+
# Access the CloudEvent from the scope
78+
event = scope["event"]
5679
if not event:
80+
# This shouldn't happen with our fix, but let's handle it gracefully
5781
error_event = CloudEvent(
58-
{"type": "dev.functions.error", "source": "/cloudevent/error"},
59-
{"message": "No CloudEvent found in request"}
82+
{"type": "dev.functions.error", "source": "/fcloudevent/error"},
83+
{"message": "No CloudEvent found in scope"}
6084
)
61-
await send(error_event, 400)
85+
await send(error_event, 500)
6286
return
6387

6488
self.event_count += 1
6589

66-
# Log the received event
67-
logging.info(f"Received event type: {event['type']}")
68-
logging.info(f"Received event source: {event['source']}")
69-
logging.info(f"Received event data: {json.dumps(event.data)}")
90+
logging.info(f"Received CloudEvent #{self.event_count}: type={event['type']}, source={event['source']}")
91+
92+
# Handle event data - it might be bytes or dict
93+
event_data = event.data
94+
if isinstance(event_data, bytes):
95+
import json
96+
event_data = json.loads(event_data)
97+
logging.info(f"CloudEvent data: {event_data}")
7098
logging.info(f"Total events processed: {self.event_count}")
7199

72-
# Create and send response CloudEvent
73100
response_event = CloudEvent(
74-
{"type": "dev.functions.response", "source": "/cloudevent/processor"},
75101
{
76-
"message": "Processed instanced",
77-
"original_data": event.data,
78-
"count": self.event_count
102+
"type": "com.example.response.instanced",
103+
"source": "/fcloudevent/instanced",
104+
},
105+
{
106+
"message": "OK: instanced CloudEvent handler",
107+
"event_count": self.event_count,
108+
"received_event_type": event['type'],
109+
"received_event_source": event['source'],
110+
"received_data": event_data
79111
}
80112
)
81-
await send(response_event)
113+
114+
# Demonstrate different sending methods
115+
if self.event_count % 2 == 0:
116+
# Use structured encoding (default)
117+
logging.info("Sending structured CloudEvent response")
118+
await send.structured(response_event)
119+
else:
120+
# Use binary encoding
121+
logging.info("Sending binary CloudEvent response")
122+
await send.binary(response_event)
82123

83124
def alive(self):
84125
logging.info("Liveness checked")
@@ -93,16 +134,26 @@ def stop(self):
93134

94135

95136
# Function instance constructor
137+
# expected to be named exactly "new"
138+
# Must return a object which exposes a method "handle" which conforms to the
139+
# ASGI specification's method signature.
96140
def new():
97-
"""Create a new function instance"""
141+
""" new is the factory function (or constructor) which will create
142+
a new function instance when invoked. This must be named "new", and the
143+
structure returned must include a method named "handle" which implements
144+
the ASGI specification's method signature. The name of the class itself
145+
can be changed.
146+
"""
98147
return MyCloudEventFunction()
99148

100149

101-
# Run the example
150+
# Run the example.
151+
# Start either the static or instanced handler depending on flag --static
102152
if __name__ == "__main__":
103153
if args.static:
104154
logging.info("Starting static CloudEvent handler")
105155
serve(handle)
106156
else:
107157
logging.info("Starting instanced CloudEvent handler")
108158
serve(new)
159+

cmd/fhttp/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ async def handle(scope, receive, send):
2323
middleware as a function. It will be wrapped in a default Function
2424
instance before being served as an ASGI application.
2525
"""
26-
logging.info("OK: static!!")
26+
logging.info("Static HTTP handler invoked")
2727

2828
await send({
2929
'type': 'http.response.start',

src/func_python/cloudevent.py

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from cloudevents.http import from_http, CloudEvent
99
from cloudevents.conversion import to_structured, to_binary
10+
from cloudevents.exceptions import MissingRequiredFields, InvalidRequiredFields
1011

1112
DEFAULT_LOG_LEVEL = logging.INFO
1213
DEFAULT_LISTEN_ADDRESS = "[::]:8080"
@@ -143,14 +144,39 @@ async def __call__(self, scope, receive, send):
143144
# interstitial encode/decode, and thus avoid the approx. 200
144145
# lines of shared server boilerplate.
145146
#
146-
# Decode the event and make it available in the scope
147-
scope["event"] = await decode_event(scope, receive)
148-
# Wrap the sender in a CloudEventSender
149-
send = CloudEventSender(send)
150-
# Delegate processing to user's Function
151-
await self.f.handle(scope, receive, send)
147+
try:
148+
# Decode the event and make it available in the scope
149+
scope["event"] = await decode_event(scope, receive)
150+
# Wrap the sender in a CloudEventSender
151+
send = CloudEventSender(send)
152+
# Delegate processing to user's Function
153+
await self.f.handle(scope, receive, send)
154+
except (MissingRequiredFields, InvalidRequiredFields) as e:
155+
# Log the non-CloudEvent request for debugging
156+
logging.warning(f"Received non-CloudEvent request: {scope['method']} {scope['path']}")
157+
headers_dict = {k.decode('utf-8'): v.decode('utf-8') for k, v in scope.get('headers', [])}
158+
logging.debug(f"Request headers: {headers_dict}")
159+
160+
# Return 400 Bad Request for non-CloudEvent requests
161+
await send({
162+
'type': 'http.response.start',
163+
'status': 400,
164+
'headers': [[b'content-type', b'text/plain']]
165+
})
166+
await send({
167+
'type': 'http.response.body',
168+
'body': b'Bad Request: This endpoint expects CloudEvent requests. '
169+
})
170+
return
152171
except Exception as e:
153-
await send_exception_cloudevent(send, 500, f"Error: {e}")
172+
# For other unexpected errors, try to send a CloudEvent error response
173+
# But check if send is already a CloudEventSender
174+
if hasattr(send, 'structured'):
175+
await send_exception_cloudevent(send, 500, f"Error: {e}")
176+
else:
177+
# Fallback to plain HTTP error
178+
logging.error(f"Unexpected error: {e}")
179+
await send_exception(send, 500, f"Internal Server Error: {e}".encode())
154180

155181
async def handle_liveness(self, scope, receive, send):
156182
alive = True
@@ -234,7 +260,23 @@ async def send_exception_cloudevent(send, status, message):
234260
}
235261
data = {"message": message}
236262

237-
await send.structured(CloudEvent(attributes, data), status)
263+
# Check if send is a CloudEventSender with structured method
264+
if hasattr(send, 'structured'):
265+
await send.structured(CloudEvent(attributes, data), status)
266+
else:
267+
# Fallback to plain HTTP error response if send is not a CloudEventSender
268+
logging.warning("send_exception_cloudevent called with non-CloudEventSender, falling back to HTTP response")
269+
await send({
270+
'type': 'http.response.start',
271+
'status': status,
272+
'headers': [[b'content-type', b'application/json']]
273+
})
274+
import json
275+
error_body = json.dumps({"error": {"message": message, "type": "dev.functions.error"}})
276+
await send({
277+
'type': 'http.response.body',
278+
'body': error_body.encode()
279+
})
238280

239281

240282
class CloudEventSender:

0 commit comments

Comments
 (0)