Skip to content

Commit 4c70073

Browse files
committed
cover
1 parent 6fbded3 commit 4c70073

File tree

1 file changed

+290
-0
lines changed

1 file changed

+290
-0
lines changed
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
"""Test ServerDisconnectedError retry mechanism with a mock HTTP server."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
7+
from collections.abc import AsyncGenerator
8+
import pytest
9+
import pytest_asyncio
10+
from aiohttp import web, ClientSession
11+
from lxml import etree
12+
from onvif.client import AsyncTransportProtocolErrorHandler
13+
14+
15+
class DisconnectingHTTPProtocol(asyncio.Protocol):
16+
"""HTTP protocol that disconnects after each response without sending Connection: close."""
17+
18+
def __init__(self, server: DisconnectingServer) -> None:
19+
self.server: DisconnectingServer = server
20+
self.transport: asyncio.Transport | None = None
21+
self.buffer: bytes = b""
22+
23+
def connection_made(self, transport: asyncio.Transport) -> None:
24+
"""Called when a connection is established."""
25+
self.transport = transport
26+
27+
def data_received(self, data: bytes) -> None:
28+
"""Handle incoming HTTP request."""
29+
self.buffer += data
30+
31+
# Simple HTTP parsing - look for double CRLF indicating end of headers
32+
if b"\r\n\r\n" not in self.buffer:
33+
return # Wait for more data
34+
35+
headers_end = self.buffer.index(b"\r\n\r\n") + 4
36+
headers = self.buffer[:headers_end].decode("utf-8", errors="ignore")
37+
38+
# Check if we have Content-Length header
39+
if b"Content-Length:" in self.buffer:
40+
# Extract content length
41+
for line in headers.split("\r\n"):
42+
if line.startswith("Content-Length:"):
43+
content_length = int(line.split(":")[1].strip())
44+
45+
# Check if we have the full body
46+
if len(self.buffer) >= headers_end + content_length:
47+
self.process_request()
48+
# else wait for more data
49+
break
50+
else:
51+
# No body expected
52+
self.process_request()
53+
54+
def process_request(self) -> None:
55+
"""Process the HTTP request and send response."""
56+
self.server.request_count += 1
57+
58+
# Create HTTP response without Connection: close header
59+
response = (
60+
b"HTTP/1.1 200 OK\r\n"
61+
b"Content-Type: application/soap+xml\r\n"
62+
b"Content-Length: %d\r\n"
63+
b"\r\n"
64+
)
65+
66+
body = (
67+
b"""<?xml version="1.0" encoding="UTF-8"?>
68+
<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://www.w3.org/2003/05/soap-envelope">
69+
<SOAP-ENV:Body>
70+
<tds:TestResponse>
71+
<tds:RequestNumber>%d</tds:RequestNumber>
72+
</tds:TestResponse>
73+
</SOAP-ENV:Body>
74+
</SOAP-ENV:Envelope>"""
75+
% self.server.request_count
76+
)
77+
78+
response = response % len(body) + body
79+
80+
# Send response
81+
self.transport.write(response)
82+
83+
# Clear buffer for next request
84+
self.buffer = b""
85+
86+
# Abruptly close connection after sending response
87+
# This simulates the problematic camera behavior
88+
asyncio.get_event_loop().call_later(0.01, self.transport.close)
89+
90+
def connection_lost(self, exc: Exception | None) -> None:
91+
"""Called when the connection is lost."""
92+
pass
93+
94+
95+
class DisconnectingServer:
96+
"""Mock server that closes connection after each response without Connection: close header."""
97+
98+
def __init__(self) -> None:
99+
self.request_count: int = 0
100+
self.server: asyncio.Server | None = None
101+
self.port: int | None = None
102+
103+
async def start(self, port: int = 0) -> str:
104+
"""Start the mock server."""
105+
loop = asyncio.get_event_loop()
106+
107+
# Create server with custom protocol
108+
self.server = await loop.create_server(
109+
lambda: DisconnectingHTTPProtocol(self), "localhost", port
110+
)
111+
112+
# Get the actual port if 0 was specified
113+
self.port = self.server.sockets[0].getsockname()[1]
114+
115+
return f"http://localhost:{self.port}"
116+
117+
async def stop(self) -> None:
118+
"""Stop the mock server."""
119+
if self.server:
120+
self.server.close()
121+
await self.server.wait_closed()
122+
123+
124+
class ProperServer:
125+
"""Server that properly indicates connection closure with Connection: close header."""
126+
127+
def __init__(self) -> None:
128+
self.request_count: int = 0
129+
self.app: web.Application = web.Application()
130+
self.app.router.add_post("/onvif/device_service", self.handle_request)
131+
self.runner: web.AppRunner | None = None
132+
self.site: web.TCPSite | None = None
133+
134+
async def handle_request(self, request: web.Request) -> web.Response:
135+
"""Handle request and properly indicate connection will close."""
136+
self.request_count += 1
137+
await request.read()
138+
139+
# Properly indicate connection will close
140+
return web.Response(
141+
body=f"<response>{self.request_count}</response>".encode(),
142+
content_type="application/soap+xml",
143+
headers={"Connection": "close"},
144+
)
145+
146+
async def start(self, port: int = 8889) -> str:
147+
"""Start the server."""
148+
self.runner = web.AppRunner(self.app)
149+
await self.runner.setup()
150+
self.site = web.TCPSite(self.runner, "localhost", port)
151+
await self.site.start()
152+
return f"http://localhost:{port}"
153+
154+
async def stop(self) -> None:
155+
"""Stop the server."""
156+
if self.site:
157+
await self.site.stop()
158+
if self.runner:
159+
await self.runner.cleanup()
160+
161+
162+
@pytest_asyncio.fixture
163+
async def disconnecting_server() -> AsyncGenerator[tuple[DisconnectingServer, str]]:
164+
"""Fixture that provides a server that disconnects without notice."""
165+
server = DisconnectingServer()
166+
base_url = await server.start()
167+
yield server, base_url
168+
await server.stop()
169+
170+
171+
@pytest_asyncio.fixture
172+
async def proper_server() -> AsyncGenerator[tuple[ProperServer, str]]:
173+
"""Fixture that provides a server that properly sends Connection: close."""
174+
server = ProperServer()
175+
base_url = await server.start()
176+
yield server, base_url
177+
await server.stop()
178+
179+
180+
@pytest.mark.asyncio
181+
async def test_retry_on_server_disconnect_with_mock_server(
182+
disconnecting_server: tuple[DisconnectingServer, str],
183+
) -> None:
184+
"""Test that the client retries when server closes connection without notice."""
185+
186+
server, base_url = disconnecting_server
187+
188+
# Create client session with connection pooling
189+
# This will try to reuse connections
190+
async with ClientSession() as session:
191+
transport = AsyncTransportProtocolErrorHandler(
192+
session=session, verify_ssl=False
193+
)
194+
195+
# Create real XML envelope
196+
envelope = etree.Element(
197+
"{http://www.w3.org/2003/05/soap-envelope}Envelope",
198+
nsmap={
199+
"soap-env": "http://www.w3.org/2003/05/soap-envelope",
200+
"ns0": "http://www.onvif.org/ver10/device/wsdl",
201+
},
202+
)
203+
body = etree.SubElement(
204+
envelope, "{http://www.w3.org/2003/05/soap-envelope}Body"
205+
)
206+
etree.SubElement(
207+
body, "{http://www.onvif.org/ver10/device/wsdl}GetDeviceInformation"
208+
)
209+
210+
# First request should succeed
211+
result1 = await transport.post_xml(
212+
f"{base_url}/onvif/device_service", envelope, {}
213+
)
214+
assert result1.status_code == 200
215+
assert b"RequestNumber>1<" in result1._content
216+
217+
# Small delay to ensure connection is fully closed
218+
await asyncio.sleep(0.02)
219+
220+
# Second request will fail initially due to closed connection
221+
# but should succeed on retry
222+
result2 = await transport.post_xml(
223+
f"{base_url}/onvif/device_service", envelope, {}
224+
)
225+
assert result2.status_code == 200
226+
227+
# Should have made 3 total requests (1st success, 2nd fail, 3rd retry success)
228+
# Note: The exact behavior depends on connection pooling timing
229+
assert server.request_count >= 2
230+
231+
232+
@pytest.mark.asyncio
233+
async def test_multiple_sequential_requests_with_disconnects(
234+
disconnecting_server: tuple[DisconnectingServer, str],
235+
) -> None:
236+
"""Test multiple sequential requests with server disconnecting each time."""
237+
238+
server, base_url = disconnecting_server
239+
240+
async with ClientSession() as session:
241+
transport = AsyncTransportProtocolErrorHandler(
242+
session=session, verify_ssl=False
243+
)
244+
245+
# Create simple test envelope
246+
envelope = etree.Element("{http://test}TestRequest")
247+
248+
# Make 5 sequential requests with small delays
249+
for _ in range(5):
250+
await asyncio.sleep(0) # Ensure previous connection is closed
251+
result = await transport.post_xml(
252+
f"{base_url}/onvif/device_service", envelope, {}
253+
)
254+
assert result.status_code == 200
255+
256+
# Each request should succeed, potentially with retries
257+
assert server.request_count >= 5
258+
259+
260+
@pytest.mark.asyncio
261+
async def test_no_retry_with_proper_connection_close(
262+
proper_server: tuple[ProperServer, str],
263+
) -> None:
264+
"""Test that no retry occurs when server properly sends Connection: close."""
265+
266+
server, base_url = proper_server
267+
268+
async with ClientSession() as session:
269+
transport = AsyncTransportProtocolErrorHandler(
270+
session=session, verify_ssl=False
271+
)
272+
273+
# Create simple test envelope
274+
envelope = etree.Element("{http://test}TestRequest")
275+
276+
# Make 3 requests - no retries should occur
277+
for i in range(3):
278+
result = await transport.post_xml(
279+
f"{base_url}/onvif/device_service", envelope, {}
280+
)
281+
assert result.status_code == 200
282+
283+
# Should be exactly 3 requests (no retries)
284+
assert server.request_count == 3
285+
286+
287+
if __name__ == "__main__":
288+
print(
289+
"Tests should be run with pytest: pytest tests/test_server_disconnected_retry.py"
290+
)

0 commit comments

Comments
 (0)