NetStream Code Analysis
Overview
The Core Architecture
1. Imports and Dependencies

from enum import Enum, auto
import logging
from typing import TYPE_CHECKING
import trio

Code Breakdown:
Key Dependencies:
2. StreamState Enum

class StreamState(Enum):
    INIT = auto()  # Stream created but not established
    OPEN = auto()  # Stream ready for I/O operations
    CLOSE_READ = auto()  # Read side closed, write may be open
    CLOSE_WRITE = auto()  # Write side closed, read may be open
    CLOSE_BOTH = auto()  # Both sides closed, stream terminated
    RESET = auto()  # Stream reset by peer or locally
    ERROR = auto()  # Unrecoverable error condition

Code Breakdown:
State Machine Design:
State Relationships:
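One way to read the relationships between these states is as a small transition table. The sketch below collects the graceful transitions that appear in the read, write, and close paths quoted in this analysis. The `ALLOWED` table and the `can_transition` helper are hypothetical names used for illustration and are not part of py-libp2p, and the INIT to OPEN edge is assumed from the enum comments rather than shown in the quoted code.

```python
from enum import Enum, auto


class StreamState(Enum):
    INIT = auto()
    OPEN = auto()
    CLOSE_READ = auto()
    CLOSE_WRITE = auto()
    CLOSE_BOTH = auto()
    RESET = auto()
    ERROR = auto()


# Hypothetical table of graceful transitions (assumption: INIT -> OPEN).
ALLOWED = {
    StreamState.INIT: {StreamState.OPEN},
    StreamState.OPEN: {
        StreamState.CLOSE_READ,
        StreamState.CLOSE_WRITE,
        StreamState.CLOSE_BOTH,
    },
    StreamState.CLOSE_READ: {StreamState.CLOSE_BOTH},
    StreamState.CLOSE_WRITE: {StreamState.CLOSE_BOTH},
    StreamState.CLOSE_BOTH: set(),
    StreamState.RESET: set(),
    StreamState.ERROR: set(),
}


def can_transition(old: StreamState, new: StreamState) -> bool:
    """Return True if moving from `old` to `new` fits the table above."""
    # RESET and ERROR are treated as reachable from any state, since the
    # quoted reset() is allowed even when the stream is already in ERROR.
    if new in (StreamState.RESET, StreamState.ERROR):
        return True
    return new in ALLOWED[old]
```

The quoted `close()` sets CLOSE_BOTH unconditionally, so a table like this models only the half-close paths and would need relaxing before it could be enforced inside `set_state`.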
Class Implementation
3. Constructor and Initialization

def __init__(self, muxed_stream: IMuxedStream, swarm_conn: "SwarmConn | None") -> None:
    self.muxed_stream = muxed_stream
    self.muxed_conn = muxed_stream.muxed_conn
    self.protocol_id = None
    self._state = StreamState.INIT
    self.swarm_conn = swarm_conn
    self.logger = logging.getLogger(__name__)
    # Thread safety for state operations (following AkMo3's approach)
    self._state_lock = trio.Lock()

Code Breakdown:
Key Features:
Thread Safety Design:
4. Protocol Management

def get_protocol(self) -> TProtocol | None:
    return self.protocol_id

def set_protocol(self, protocol_id: TProtocol) -> None:
    self.protocol_id = protocol_id

Code Breakdown:
Purpose: Manages the protocol identifier for the stream, allowing protocol-specific handling.
Usage Pattern:
5. State Management
State Property (Async)

@property
async def state(self) -> StreamState:
    async with self._state_lock:
        return self._state

Code Breakdown:
Thread Safety: State access is protected by a lock to prevent race conditions between concurrent tasks.
Usage Pattern:

current_state = await stream.state
if current_state == StreamState.OPEN:
    pass  # Stream is ready for I/O

State Setting with Logging

async def set_state(self, state: StreamState) -> None:
    async with self._state_lock:
        old_state = self._state
        self._state = state
        # Log state transition for debugging and monitoring
        self.logger.debug(f"Stream state transition: {old_state.name} → {state.name}")
        # Log important state changes at info level
        if state in [StreamState.ERROR, StreamState.RESET, StreamState.CLOSE_BOTH]:
            self.logger.info(f"Stream entered {state.name} state (from {old_state.name})")

Code Breakdown:
Features:
Logging Strategy:
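The two log levels used by `set_state` can be observed with a small stand-alone sketch. It mirrors the quoted method but makes several assumptions so that it runs with the standard library alone: `asyncio.Lock` stands in for `trio.Lock`, and `DemoStream`, `ListHandler`, and `run_demo` are hypothetical names that do not exist in py-libp2p.

```python
import asyncio
import logging
from enum import Enum, auto


class StreamState(Enum):
    OPEN = auto()
    CLOSE_BOTH = auto()
    RESET = auto()
    ERROR = auto()


class ListHandler(logging.Handler):
    """Collects (level, message) pairs so the log levels can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.lines = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append((record.levelname, record.getMessage()))


class DemoStream:
    def __init__(self, logger: logging.Logger) -> None:
        self._state = StreamState.OPEN
        self._state_lock = asyncio.Lock()  # assumption: stand-in for trio.Lock
        self.logger = logger

    async def set_state(self, state: StreamState) -> None:
        async with self._state_lock:
            old_state = self._state
            self._state = state
            # Every transition is logged at debug level
            self.logger.debug("Stream state transition: %s -> %s", old_state.name, state.name)
            # Terminal states are additionally logged at info level
            if state in (StreamState.ERROR, StreamState.RESET, StreamState.CLOSE_BOTH):
                self.logger.info("Stream entered %s state (from %s)", state.name, old_state.name)


async def _demo(logger: logging.Logger) -> None:
    stream = DemoStream(logger)
    await stream.set_state(StreamState.RESET)


def run_demo():
    logger = logging.getLogger("netstream_demo")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    handler = ListHandler()
    logger.addHandler(handler)
    try:
        asyncio.run(_demo(logger))
    finally:
        logger.removeHandler(handler)
    return handler.lines
```

Calling `run_demo()` returns one DEBUG entry for the transition and one INFO entry for entering RESET, which is the split between routine and terminal events that the quoted method makes.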
I/O Operations
6. Read Operation

async def read(self, n: int | None = None) -> bytes:
    # Check state atomically to prevent race conditions
    async with self._state_lock:
        if self._state == StreamState.ERROR:
            raise StreamError("Cannot read from stream; stream is in error state")
        elif self._state == StreamState.RESET:
            raise StreamReset("Cannot read from stream; stream is reset")
        elif self._state == StreamState.CLOSE_READ:
            # Allow reads from streams that might still have buffered data
            pass
        # Note: Allow reading from CLOSE_BOTH state - buffered data may exist

    # Perform I/O operation without holding the lock to prevent deadlocks
    try:
        return await self.muxed_stream.read(n)
    except MuxedStreamEOF as error:
        # Handle state transitions when EOF is encountered
        async with self._state_lock:
            if self._state == StreamState.CLOSE_WRITE:
                self._state = StreamState.CLOSE_BOTH
            elif self._state == StreamState.OPEN:
                self._state = StreamState.CLOSE_READ
        raise StreamEOF() from error
    # ... additional exception handling

Code Breakdown:
State Validation Phase:
I/O Operation Phase:
EOF Handling:
Key Design Decisions:
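The three phases of `read` (validate under the lock, perform I/O without it, re-acquire it to record EOF) can be exercised against an in-memory stream. This is a minimal sketch under stated assumptions: `asyncio.Lock` replaces `trio.Lock`, and `FakeMuxedStream`, `FakeEOF`, `DemoStream`, and `demo` are hypothetical stand-ins rather than py-libp2p classes.

```python
import asyncio
from enum import Enum, auto


class StreamState(Enum):
    OPEN = auto()
    CLOSE_READ = auto()
    CLOSE_WRITE = auto()
    CLOSE_BOTH = auto()


class FakeEOF(Exception):
    """Stand-in for MuxedStreamEOF."""


class StreamEOF(Exception):
    """Stand-in for the wrapper-level EOF error."""


class FakeMuxedStream:
    """Hypothetical in-memory stream: serves buffered bytes, then signals EOF."""

    def __init__(self, data: bytes) -> None:
        self._buf = bytearray(data)

    async def read(self, n=None) -> bytes:
        await asyncio.sleep(0)  # yield once, as real I/O would
        if not self._buf:
            raise FakeEOF()
        n = len(self._buf) if n is None else n
        chunk = bytes(self._buf[:n])
        del self._buf[:n]
        return chunk


class DemoStream:
    def __init__(self, muxed: FakeMuxedStream, state: StreamState) -> None:
        self.muxed_stream = muxed
        self._state = state
        self._state_lock = asyncio.Lock()  # assumption: stand-in for trio.Lock

    async def read(self, n=None) -> bytes:
        async with self._state_lock:
            pass  # the ERROR / RESET checks from the quoted code would go here
        try:
            # The lock is not held while waiting on I/O
            return await self.muxed_stream.read(n)
        except FakeEOF as error:
            async with self._state_lock:
                if self._state == StreamState.CLOSE_WRITE:
                    self._state = StreamState.CLOSE_BOTH
                elif self._state == StreamState.OPEN:
                    self._state = StreamState.CLOSE_READ
            raise StreamEOF() from error


async def demo(initial: StreamState):
    stream = DemoStream(FakeMuxedStream(b"hi"), initial)
    first = await stream.read()
    try:
        await stream.read()  # buffer is empty now, so this hits EOF
    except StreamEOF:
        pass
    return first, stream._state
```

Starting from OPEN, the second read leaves the stream in CLOSE_READ; starting from CLOSE_WRITE, the same EOF completes the shutdown and leaves it in CLOSE_BOTH.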
7. Write Operation

async def write(self, data: bytes) -> None:
    # Check state atomically to prevent race conditions
    async with self._state_lock:
        if self._state == StreamState.ERROR:
            raise StreamError("Cannot write to stream; stream is in error state")
        elif self._state == StreamState.RESET:
            raise StreamReset("Cannot write to stream; stream is reset")
        elif self._state in [StreamState.CLOSE_WRITE, StreamState.CLOSE_BOTH]:
            raise StreamClosed("Cannot write to stream; closed for writing")

    # Perform I/O operation without holding the lock to prevent deadlocks
    try:
        await self.muxed_stream.write(data)
    except (MuxedStreamClosed, QUICStreamClosedError, QUICStreamResetError) as error:
        async with self._state_lock:
            if self._state == StreamState.CLOSE_READ:
                self._state = StreamState.CLOSE_BOTH
            elif self._state == StreamState.OPEN:
                self._state = StreamState.CLOSE_WRITE
        raise StreamClosed() from error
    # ... additional exception handling

Code Breakdown:
State Validation Phase:
I/O Operation Phase:
Stream Closure Handling:
Features:
Stream Lifecycle Management
8. Close Operations
Complete Close

async def close(self) -> None:
    """Close stream completely and clean up resources."""
    await self.muxed_stream.close()
    await self.set_state(StreamState.CLOSE_BOTH)
    await self.remove()

Half-close Operations

async def close_read(self) -> None:
    """Close the stream for reading only."""
    async with self._state_lock:
        if self._state == StreamState.ERROR:
            raise StreamError("Cannot close read on stream; stream is in error state")
        elif self._state == StreamState.OPEN:
            self._state = StreamState.CLOSE_READ
        elif self._state == StreamState.CLOSE_WRITE:
            self._state = StreamState.CLOSE_BOTH
            await self.remove()

async def close_write(self) -> None:
    """Close the stream for writing only."""
    async with self._state_lock:
        if self._state == StreamState.ERROR:
            raise StreamError("Cannot close write on stream; stream is in error state")
    await self.muxed_stream.close()
    async with self._state_lock:
        if self._state == StreamState.OPEN:
            self._state = StreamState.CLOSE_WRITE
        elif self._state == StreamState.CLOSE_READ:
            self._state = StreamState.CLOSE_BOTH
            await self.remove()

Half-close Support: Essential for QUIC transport, where streams can close their read and write sides independently.
9. Reset Operation

async def reset(self) -> None:
    """Reset stream."""
    # Allow reset even from ERROR state for cleanup purposes
    try:
        await self.muxed_stream.reset()
    except Exception:
        # If reset fails, we still want to mark the stream as reset
        # This allows cleanup to proceed even if the underlying stream is broken
        pass
    async with self._state_lock:
        self._state = StreamState.RESET
    await self.remove()

Code Breakdown:
Robust Cleanup Design:
State Transition Logic:
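The effect of swallowing the reset failure can be shown with a muxed stream whose `reset()` always raises. The sketch below is illustrative only: `BrokenMuxedStream`, `DemoStream`, the `removed` flag, and `demo` are hypothetical names, and `asyncio.Lock` is assumed as a stand-in for `trio.Lock`.

```python
import asyncio
from enum import Enum, auto


class StreamState(Enum):
    OPEN = auto()
    RESET = auto()
    ERROR = auto()


class BrokenMuxedStream:
    """Hypothetical muxed stream whose transport is already gone."""

    async def reset(self) -> None:
        raise ConnectionError("underlying transport already closed")


class DemoStream:
    def __init__(self, muxed, state: StreamState) -> None:
        self.muxed_stream = muxed
        self._state = state
        self._state_lock = asyncio.Lock()  # assumption: stand-in for trio.Lock
        self.removed = False  # records whether cleanup ran

    async def remove(self) -> None:
        self.removed = True

    async def reset(self) -> None:
        try:
            await self.muxed_stream.reset()
        except Exception:
            # A failed reset must not block the state change or the cleanup
            pass
        async with self._state_lock:
            self._state = StreamState.RESET
        await self.remove()


async def demo():
    # Start from ERROR to show that reset is still permitted there
    stream = DemoStream(BrokenMuxedStream(), StreamState.ERROR)
    await stream.reset()
    return stream._state, stream.removed
```

Even though the underlying reset raises, the wrapper ends in RESET and the cleanup step still runs, which is the behavior the comments in the quoted method describe.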
10. Stream Removal

async def remove(self) -> None:
    """Remove the stream from the connection and notify swarm that stream was closed."""
    if self.swarm_conn is not None:
        self.swarm_conn.remove_stream(self)
        await self.swarm_conn.swarm.notify_closed_stream(self)

Code Breakdown:
Event Notification Design:
Resource Cleanup:
Utility Methods
11. Remote Address

def get_remote_address(self) -> tuple[str, int] | None:
    """Delegate to the underlying muxed stream."""
    return self.muxed_stream.get_remote_address()

Delegation Pattern: Simple delegation to the underlying muxed stream.
12. Operational Check

async def is_operational(self) -> bool:
    """Check if stream is in an operational state."""
    async with self._state_lock:
        return self._state not in [
            StreamState.ERROR,
            StreamState.RESET,
            StreamState.CLOSE_BOTH,
        ]

Code Breakdown:
State-based Validation: Determines whether the stream can perform I/O operations based on its current state.
Operational States:
Non-operational States:
Usage Pattern:

if await stream.is_operational():
    # Stream can perform I/O operations
    data = await stream.read(1024)
else:
    # Stream is in terminal state, cannot perform I/O
    # Handle accordingly
    pass

Thread Safety and Concurrency
13. Lock Strategy
Key Principles:
Race Condition Prevention:

# Example: Read operation
async with self._state_lock:
    # Atomic state validation
    if self._state == StreamState.ERROR:
        raise StreamError(...)
# Lock released during I/O
result = await self.muxed_stream.read(n)
# Lock re-acquired for state transition
async with self._state_lock:
    if self._state == StreamState.OPEN:
        self._state = StreamState.CLOSE_READ

Error Handling Strategy
14. Exception Hierarchy
Known Exceptions (Handled Gracefully):
Unknown Exceptions (Trigger ERROR State):
15. Detailed Exception Handling in Read Operation

except MuxedStreamEOF as error:
    # Handle state transitions when EOF is encountered
    async with self._state_lock:
        if self._state == StreamState.CLOSE_WRITE:
            self._state = StreamState.CLOSE_BOTH
        elif self._state == StreamState.OPEN:
            self._state = StreamState.CLOSE_READ
    raise StreamEOF() from error
except (
    MuxedStreamReset,
    QUICStreamClosedError,
    QUICStreamResetError,
) as error:
    async with self._state_lock:
        self._state = StreamState.RESET
    raise StreamReset() from error
except Exception as error:
    # Only set ERROR state for truly unexpected errors
    # Known exceptions (MuxedStreamEOF, MuxedStreamReset, etc.)
    # are handled above
    if not isinstance(
        error,
        (
            MuxedStreamEOF,
            MuxedStreamReset,
            QUICStreamClosedError,
            QUICStreamResetError,
            StreamEOF,
            StreamReset,
            StreamClosed,
            ValueError,  # QUIC stream errors
        ),
    ):
        async with self._state_lock:
            self._state = StreamState.ERROR
        raise StreamError(f"Read operation failed: {error}") from error
    # Re-raise known exceptions as-is
    raise

Code Breakdown:
EOF Exception Handler:
Reset Exception Handler:
Catch-All Exception Handler:
Exception Filtering Logic:
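The filtering in the handlers above can be summarized as a pure function from (current state, exception) to the resulting state. This is a simplified sketch, not the library's code: the exception classes are local stand-ins with the same names, the QUIC-specific exceptions are omitted, and `PASS_THROUGH` and `state_after_read_error` are hypothetical names.

```python
from enum import Enum, auto


class StreamState(Enum):
    OPEN = auto()
    CLOSE_READ = auto()
    CLOSE_WRITE = auto()
    CLOSE_BOTH = auto()
    RESET = auto()
    ERROR = auto()


# Local stand-ins for the exceptions named in the handlers above.
class MuxedStreamEOF(Exception):
    pass


class MuxedStreamReset(Exception):
    pass


class StreamClosed(Exception):
    pass


# Exceptions that the catch-all re-raises without touching the state.
PASS_THROUGH = (StreamClosed, ValueError)


def state_after_read_error(state: StreamState, error: Exception) -> StreamState:
    """Return the state the stream would be left in after `error`."""
    if isinstance(error, MuxedStreamEOF):
        if state == StreamState.CLOSE_WRITE:
            return StreamState.CLOSE_BOTH
        if state == StreamState.OPEN:
            return StreamState.CLOSE_READ
        return state
    if isinstance(error, MuxedStreamReset):
        return StreamState.RESET
    if isinstance(error, PASS_THROUGH):
        return state  # known error: re-raised as-is, state unchanged
    return StreamState.ERROR  # anything else is treated as unexpected
```

Because the EOF and reset types are already caught by the earlier `except` clauses, only the remaining entries of the `isinstance` tuple (such as `StreamClosed` and `ValueError`) can actually reach the catch-all branch; the sketch reflects that by listing just those in `PASS_THROUGH`.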
15. State Transition Logic
EOF Handling:

except MuxedStreamEOF as error:
    async with self._state_lock:
        if self._state == StreamState.CLOSE_WRITE:
            self._state = StreamState.CLOSE_BOTH
        elif self._state == StreamState.OPEN:
            self._state = StreamState.CLOSE_READ
    raise StreamEOF() from error

Reset Handling:

except (MuxedStreamReset, QUICStreamResetError) as error:
    async with self._state_lock:
        self._state = StreamState.RESET
    raise StreamReset() from error

Documentation and Monitoring
16. Comprehensive Docstring
The class includes extensive documentation covering:
17. Logging Strategy
Debug Level: All state transitions
Key Design Decisions
18. AkMo3's Feedback Implementation
Concurrency Safety:
Documentation:
Terminal States:
19. QUIC Compatibility
Half-close Support:
20. Performance Considerations
Lock Optimization:
Exception Handling:
Summary
This implementation addresses all concerns raised by AkMo3 while maintaining compatibility with existing functionality and providing robust stream management capabilities.
Description
We propose introducing a formal Stream State mechanism in py-libp2p to track the lifecycle of a Stream. This would allow both internal components and external consumers to determine whether a stream is open, closed, reset, or has errored.
This mirrors behavior in go-libp2p and js-libp2p, and is aligned with good practices from transport protocols like TCP and QUIC.
Motivation
Currently, py-libp2p has no explicit notion of a stream’s state. As a result:
Introducing a structured StreamState will:
libp2p implementations
Current Implementation
Stream class.
asyncio reader/writer objects.
Proposed Change
Step 1: Define a StreamState Enum
Step 2: Add State to Stream Class
Update transitions in:
start() → OPEN
reset() → RESET
close() → CLOSED
Exception → ERRORED
Expose via:
Step 3: Guard Operations
Add guards in critical methods:
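A guard of this kind might look like the following sketch. It is not the proposal's original code: the state names OPEN, CLOSED, RESET, and ERRORED are taken from the transition list in Step 2, while the INIT state, the `StreamStateError` exception, the in-memory payload, and the `demo` coroutine are assumptions introduced for illustration.

```python
import asyncio
from enum import Enum, auto


class StreamState(Enum):
    INIT = auto()  # assumption: state before start() is called
    OPEN = auto()
    CLOSED = auto()
    RESET = auto()
    ERRORED = auto()


class StreamStateError(Exception):
    """Hypothetical error for operations that are invalid in the current state."""


class Stream:
    def __init__(self, payload: bytes = b"") -> None:
        self._state = StreamState.INIT
        self._payload = payload  # in-memory stand-in for the real reader

    @property
    def state(self) -> StreamState:
        return self._state

    def start(self) -> None:
        self._state = StreamState.OPEN

    async def read(self) -> bytes:
        # Guard: only an OPEN stream may be read
        if self._state is not StreamState.OPEN:
            raise StreamStateError(f"Cannot read: stream is {self._state.name}")
        data, self._payload = self._payload, b""
        return data

    async def reset(self) -> None:
        self._state = StreamState.RESET


async def demo():
    stream = Stream(b"ping")
    stream.start()
    data = await stream.read()
    await stream.reset()
    try:
        await stream.read()  # rejected by the guard after reset
    except StreamStateError as exc:
        return data, str(exc)
```

Running `asyncio.run(demo())` reads the payload once while the stream is OPEN and then shows the guard rejecting a read after `reset()`; callers can also inspect `stream.state` directly before attempting I/O.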
Apply similar logic to write(), reset(), and close().
Example Usage
Design Notes
go-libp2p-core/network.Stream semantics.
Testing Plan
Next Steps
StreamState enum and transition logic
Stream class
Related Topics
yamux-py or multiplexers for clean lifecycle management