Skip to content

Commit ed85210

Browse files
committed
Regroup modules
1 parent ad0f0c7 commit ed85210

File tree

9 files changed

+497
-511
lines changed

9 files changed

+497
-511
lines changed

src/client/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,28 @@ impl Writer for Context {
316316
}
317317
}
318318

319+
#[cfg(any(feature = "rtu", feature = "tcp"))]
320+
pub(crate) async fn disconnect_framed<T, C>(
321+
framed: tokio_util::codec::Framed<T, C>,
322+
) -> std::io::Result<()>
323+
where
324+
T: tokio::io::AsyncWrite + Unpin,
325+
{
326+
use tokio::io::AsyncWriteExt as _;
327+
328+
framed
329+
.into_inner()
330+
.shutdown()
331+
.await
332+
.or_else(|err| match err.kind() {
333+
std::io::ErrorKind::NotConnected | std::io::ErrorKind::BrokenPipe => {
334+
// Already disconnected.
335+
Ok(())
336+
}
337+
_ => Err(err),
338+
})
339+
}
340+
319341
#[cfg(test)]
320342
mod tests {
321343
use crate::{Error, Result};

src/client/rtu.rs

Lines changed: 255 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,278 @@
33

44
//! RTU client connections
55
6+
use std::{fmt, io};
7+
8+
use futures_util::{SinkExt as _, StreamExt as _};
69
use tokio::io::{AsyncRead, AsyncWrite};
10+
use tokio_util::codec::Framed;
711

8-
use crate::rtu::{Client, ClientContext};
12+
use crate::{
13+
codec::rtu::ClientCodec,
14+
frame::{
15+
rtu::{Header, RequestAdu},
16+
RequestPdu,
17+
},
18+
slave::SlaveContext,
19+
FunctionCode, Request, Response, Result, Slave,
20+
};
921

10-
use super::*;
22+
use super::{disconnect_framed, Context};
1123

12-
/// Connect to no particular Modbus slave device for sending
24+
/// Connect to no particular _Modbus_ slave device for sending
1325
/// broadcast messages.
1426
pub fn attach<T>(transport: T) -> Context
1527
where
16-
T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static,
28+
T: AsyncRead + AsyncWrite + fmt::Debug + Unpin + Send + 'static,
1729
{
1830
attach_slave(transport, Slave::broadcast())
1931
}
2032

21-
/// Connect to any kind of Modbus slave device.
33+
/// Connect to any kind of _Modbus_ slave device.
2234
pub fn attach_slave<T>(transport: T, slave: Slave) -> Context
2335
where
24-
T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static,
36+
T: AsyncRead + AsyncWrite + fmt::Debug + Unpin + Send + 'static,
2537
{
2638
let client = Client::new(transport);
2739
let context = ClientContext::new(client, slave);
2840
Context {
2941
client: Box::new(context),
3042
}
3143
}
44+
45+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
46+
pub struct RequestContext {
47+
pub(crate) function_code: FunctionCode,
48+
pub(crate) header: Header,
49+
}
50+
51+
impl RequestContext {
52+
#[must_use]
53+
pub const fn function_code(&self) -> FunctionCode {
54+
self.function_code
55+
}
56+
}
57+
58+
/// _Modbus_ RTU client.
59+
#[derive(Debug)]
60+
pub struct Client<T> {
61+
framed: Framed<T, ClientCodec>,
62+
}
63+
64+
impl<T> Client<T>
65+
where
66+
T: AsyncRead + AsyncWrite + Unpin,
67+
{
68+
pub fn new(transport: T) -> Self {
69+
let framed = Framed::new(transport, ClientCodec::default());
70+
Self { framed }
71+
}
72+
73+
pub async fn disconnect(self) -> io::Result<()> {
74+
let Self { framed } = self;
75+
disconnect_framed(framed).await
76+
}
77+
78+
pub async fn call<'a>(&mut self, server: Slave, request: Request<'a>) -> Result<Response> {
79+
let request_context = self.send_request(server, request).await?;
80+
self.recv_response(request_context).await
81+
}
82+
83+
pub async fn send_request<'a>(
84+
&mut self,
85+
server: Slave,
86+
request: Request<'a>,
87+
) -> io::Result<RequestContext> {
88+
self.send_request_pdu(server, request).await
89+
}
90+
91+
async fn send_request_pdu<'a, R>(
92+
&mut self,
93+
server: Slave,
94+
request_pdu: R,
95+
) -> io::Result<RequestContext>
96+
where
97+
R: Into<RequestPdu<'a>>,
98+
{
99+
let request_adu = request_adu(server, request_pdu);
100+
self.send_request_adu(request_adu).await
101+
}
102+
103+
async fn send_request_adu<'a>(
104+
&mut self,
105+
request_adu: RequestAdu<'a>,
106+
) -> io::Result<RequestContext> {
107+
let request_context = request_adu.context();
108+
109+
self.framed.read_buffer_mut().clear();
110+
self.framed.send(request_adu).await?;
111+
112+
Ok(request_context)
113+
}
114+
115+
pub async fn recv_response(&mut self, request_context: RequestContext) -> Result<Response> {
116+
let response_adu = self
117+
.framed
118+
.next()
119+
.await
120+
.unwrap_or_else(|| Err(io::Error::from(io::ErrorKind::BrokenPipe)))?;
121+
122+
response_adu.try_into_response(request_context)
123+
}
124+
}
125+
126+
/// _Modbus_ RTU client with (server) context and connection state.
127+
///
128+
/// Client that invokes methods (request/response) on a single or many (broadcast) server(s).
129+
///
130+
/// The server can be switched between method calls.
131+
#[derive(Debug)]
132+
pub struct ClientContext<T> {
133+
client: Option<Client<T>>,
134+
server: Slave,
135+
}
136+
137+
impl<T> ClientContext<T> {
138+
pub fn new(client: Client<T>, server: Slave) -> Self {
139+
Self {
140+
client: Some(client),
141+
server,
142+
}
143+
}
144+
145+
#[must_use]
146+
pub const fn is_connected(&self) -> bool {
147+
self.client.is_some()
148+
}
149+
150+
#[must_use]
151+
pub const fn server(&self) -> Slave {
152+
self.server
153+
}
154+
155+
pub fn set_server(&mut self, server: Slave) {
156+
self.server = server;
157+
}
158+
}
159+
160+
impl<T> ClientContext<T>
161+
where
162+
T: AsyncWrite + Unpin,
163+
{
164+
pub async fn disconnect(&mut self) -> io::Result<()> {
165+
let Some(client) = self.client.take() else {
166+
// Already disconnected.
167+
return Ok(());
168+
};
169+
disconnect_framed(client.framed).await
170+
}
171+
}
172+
173+
impl<T> ClientContext<T>
174+
where
175+
T: AsyncRead + AsyncWrite + Unpin,
176+
{
177+
pub async fn call(&mut self, request: Request<'_>) -> Result<Response> {
178+
log::debug!("Call {:?}", request);
179+
180+
let Some(client) = &mut self.client else {
181+
return Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected").into());
182+
};
183+
184+
client.call(self.server, request).await
185+
}
186+
}
187+
188+
impl<T> ClientContext<T>
189+
where
190+
T: AsyncRead + AsyncWrite + Unpin + fmt::Debug + Send + 'static,
191+
{
192+
#[must_use]
193+
pub fn boxed(self) -> Box<dyn crate::client::Client> {
194+
Box::new(self)
195+
}
196+
}
197+
198+
impl<T> SlaveContext for ClientContext<T> {
199+
fn set_slave(&mut self, slave: Slave) {
200+
self.set_server(slave);
201+
}
202+
}
203+
204+
#[async_trait::async_trait]
205+
impl<T> crate::client::Client for ClientContext<T>
206+
where
207+
T: fmt::Debug + AsyncRead + AsyncWrite + Send + Unpin,
208+
{
209+
async fn call(&mut self, req: Request<'_>) -> Result<Response> {
210+
self.call(req).await
211+
}
212+
213+
async fn disconnect(&mut self) -> io::Result<()> {
214+
self.disconnect().await
215+
}
216+
}
217+
218+
fn request_adu<'a, R>(server: Slave, request_pdu: R) -> RequestAdu<'a>
219+
where
220+
R: Into<RequestPdu<'a>>,
221+
{
222+
let hdr = Header { slave: server };
223+
let pdu = request_pdu.into();
224+
RequestAdu { hdr, pdu }
225+
}
226+
227+
#[cfg(test)]
228+
mod tests {
229+
use core::{
230+
pin::Pin,
231+
task::{Context, Poll},
232+
};
233+
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf, Result};
234+
235+
use crate::Error;
236+
237+
use super::*;
238+
239+
#[derive(Debug)]
240+
struct MockTransport;
241+
242+
impl Unpin for MockTransport {}
243+
244+
impl AsyncRead for MockTransport {
245+
fn poll_read(
246+
self: Pin<&mut Self>,
247+
_: &mut Context<'_>,
248+
_: &mut ReadBuf<'_>,
249+
) -> Poll<Result<()>> {
250+
Poll::Ready(Ok(()))
251+
}
252+
}
253+
254+
impl AsyncWrite for MockTransport {
255+
fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, _: &[u8]) -> Poll<Result<usize>> {
256+
Poll::Ready(Ok(2))
257+
}
258+
259+
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
260+
Poll::Ready(Ok(()))
261+
}
262+
263+
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
264+
unimplemented!()
265+
}
266+
}
267+
268+
#[tokio::test]
269+
async fn handle_broken_pipe() {
270+
let transport = MockTransport;
271+
let client = Client::new(transport);
272+
let mut context = ClientContext::new(client, Slave::broadcast());
273+
let res = context.call(Request::ReadCoils(0x00, 5)).await;
274+
assert!(res.is_err());
275+
let err = res.err().unwrap();
276+
assert!(
277+
matches!(err, Error::Transport(err) if err.kind() == std::io::ErrorKind::BrokenPipe)
278+
);
279+
}
280+
}

0 commit comments

Comments
 (0)