Skip to content

Commit 0c9b05a

Browse files
authored
enh(ws): reduce mem copy used with taos.client() (#499)
Also downgrade tracing levels in consumer cancellation.
1 parent 35233d0 commit 0c9b05a

File tree

5 files changed

+32
-24
lines changed

5 files changed

+32
-24
lines changed

taos-ws-sys/src/ws/stmt.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ unsafe fn stmt_init(
189189
.as_mut()
190190
.ok_or(TaosError::new(Code::FAILED, "taos is null"))?;
191191

192-
let stmt2 = Stmt2::new(taos.client());
192+
let stmt2 = Stmt2::new(taos.client_cloned());
193193

194194
block_in_place_or_global(stmt2.init_with_options(
195195
req_id,

taos-ws-sys/src/ws/stmt2.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ unsafe fn stmt2_init(taos: *mut TAOS, option: *mut TAOS_STMT2_OPTION) -> TaosRes
8686
.as_mut()
8787
.ok_or(TaosError::new(Code::INVALID_PARA, "taos is null"))?;
8888

89-
let stmt2 = Stmt2::new(taos.client());
89+
let stmt2 = Stmt2::new(taos.client_cloned());
9090

9191
let (req_id, single_stb_insert, single_table_bind_once, async_exec_fn, userdata) =
9292
match option.as_ref() {

taos-ws/src/consumer/conn.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ async fn send_messages(
188188
}
189189
}
190190
_ = close_reader.changed() => {
191-
tracing::info!("WebSocket sender received close signal");
191+
tracing::trace!("WebSocket sender received close signal");
192192
send_close_message(&mut ws_stream_sender).await;
193193
break;
194194
}
@@ -287,7 +287,7 @@ async fn read_messages(
287287
}
288288
}
289289
_ = close_reader.changed() => {
290-
tracing::info!("WebSocket reader received close signal");
290+
tracing::trace!("WebSocket reader received close signal");
291291
closed_normally = true;
292292
break;
293293
}
@@ -297,7 +297,11 @@ async fn read_messages(
297297
drop(message_tx);
298298

299299
if let Err(err) = message_handle.await {
300-
tracing::error!("handle messages task failed: {err:?}");
300+
if err.is_cancelled() {
301+
tracing::trace!("handle messages task was cancelled");
302+
} else {
303+
tracing::error!("handle messages task panicked: {err:?}");
304+
}
301305
}
302306

303307
if closed_normally {

taos-ws/src/query/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ impl Taos {
3838
self.client.get_req_id()
3939
}
4040

41-
pub fn client(&self) -> Arc<WsTaos> {
41+
pub fn client(&self) -> &Arc<WsTaos> {
42+
&self.client
43+
}
44+
45+
pub fn client_cloned(&self) -> Arc<WsTaos> {
4246
self.client.clone()
4347
}
4448
}

taos-ws/src/stmt2/mod.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ impl Stmt2 {
605605

606606
impl Stmt2Bindable<super::Taos> for Stmt2 {
607607
fn init(taos: &super::Taos) -> RawResult<Self> {
608-
let stmt2 = Self::new(taos.client());
608+
let stmt2 = Self::new(taos.client_cloned());
609609
block_in_place_or_global(stmt2.init())?;
610610
Ok(stmt2)
611611
}
@@ -636,7 +636,7 @@ impl Stmt2Bindable<super::Taos> for Stmt2 {
636636
#[async_trait::async_trait]
637637
impl Stmt2AsyncBindable<super::Taos> for Stmt2 {
638638
async fn init(taos: &super::Taos) -> RawResult<Self> {
639-
let stmt2 = Self::new(taos.client());
639+
let stmt2 = Self::new(taos.client_cloned());
640640
stmt2.init().await?;
641641
Ok(stmt2)
642642
}
@@ -691,7 +691,7 @@ mod tests {
691691
])
692692
.await?;
693693

694-
let stmt2 = Stmt2::new(taos.client());
694+
let stmt2 = Stmt2::new(taos.client_cloned());
695695
stmt2.init().await?;
696696
stmt2
697697
.prepare("insert into t0 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
@@ -783,7 +783,7 @@ mod tests {
783783
])
784784
.await?;
785785

786-
let stmt2 = Stmt2::new(taos.client());
786+
let stmt2 = Stmt2::new(taos.client_cloned());
787787
stmt2.init().await?;
788788
stmt2.prepare("insert into t0 values(?, ?)").await?;
789789

@@ -852,7 +852,7 @@ mod tests {
852852
])
853853
.await?;
854854

855-
let stmt2 = Stmt2::new(taos.client());
855+
let stmt2 = Stmt2::new(taos.client_cloned());
856856
stmt2.init().await?;
857857
stmt2
858858
.prepare("select * from t0 where c8 > ? and c10 > ? and c12 = ?")
@@ -937,7 +937,7 @@ mod tests {
937937
])
938938
.await?;
939939

940-
let stmt2 = Stmt2::new(taos.client());
940+
let stmt2 = Stmt2::new(taos.client_cloned());
941941
stmt2.init().await?;
942942
stmt2.prepare("select * from t0 where c1 > ?").await?;
943943

@@ -988,7 +988,7 @@ mod tests {
988988
])
989989
.await?;
990990

991-
let stmt2 = Stmt2::new(taos.client());
991+
let stmt2 = Stmt2::new(taos.client_cloned());
992992
stmt2.init().await?;
993993
stmt2
994994
.prepare("insert into ? using s0 tags(?) values(?, ?)")
@@ -1073,7 +1073,7 @@ mod tests {
10731073
])
10741074
.await?;
10751075

1076-
let stmt2 = Stmt2::new(taos.client());
1076+
let stmt2 = Stmt2::new(taos.client_cloned());
10771077
stmt2.init().await?;
10781078
stmt2
10791079
.prepare("insert into s0 (tbname, ts, c1, t1) values(?, ?, ?, ?)")
@@ -1158,7 +1158,7 @@ mod tests {
11581158
])
11591159
.await?;
11601160

1161-
let stmt2 = Stmt2::new(taos.client());
1161+
let stmt2 = Stmt2::new(taos.client_cloned());
11621162
stmt2.init().await?;
11631163
stmt2.prepare("insert into t0 values(?, ?)").await?;
11641164

@@ -1185,7 +1185,7 @@ mod tests {
11851185
])
11861186
.await?;
11871187

1188-
let mut stmt2 = Stmt2::new(taos.client());
1188+
let mut stmt2 = Stmt2::new(taos.client_cloned());
11891189
stmt2.init().await?;
11901190
stmt2.prepare("insert into t0 values(?, ?)").await?;
11911191

@@ -1298,7 +1298,7 @@ mod cloud_tests {
12981298
])
12991299
.await?;
13001300

1301-
let stmt2 = Stmt2::new(taos.client());
1301+
let stmt2 = Stmt2::new(taos.client_cloned());
13021302
stmt2.init().await?;
13031303

13041304
stmt2
@@ -1315,7 +1315,7 @@ mod cloud_tests {
13151315
let affected = stmt2.exec().await?;
13161316
assert_eq!(affected, 1);
13171317

1318-
let stmt2 = Stmt2::new(taos.client());
1318+
let stmt2 = Stmt2::new(taos.client_cloned());
13191319
stmt2.init().await?;
13201320

13211321
stmt2
@@ -1567,7 +1567,7 @@ mod recover_tests {
15671567
.build()
15681568
.await?;
15691569

1570-
let stmt2 = Stmt2::new(taos.client());
1570+
let stmt2 = Stmt2::new(taos.client_cloned());
15711571
stmt2.init().await?;
15721572

15731573
Ok(())
@@ -1614,7 +1614,7 @@ mod recover_tests {
16141614
])
16151615
.await?;
16161616

1617-
let stmt2 = Stmt2::new(taos.client());
1617+
let stmt2 = Stmt2::new(taos.client_cloned());
16181618
stmt2.init().await?;
16191619
stmt2
16201620
.prepare("select * from test_1755136975.t0 where c1 > ?")
@@ -1663,7 +1663,7 @@ mod recover_tests {
16631663
])
16641664
.await?;
16651665

1666-
let stmt2 = Stmt2::new(taos.client());
1666+
let stmt2 = Stmt2::new(taos.client_cloned());
16671667
stmt2.init().await?;
16681668
stmt2
16691669
.prepare("insert into test_1755137215.t0 values(?, ?)")
@@ -1720,7 +1720,7 @@ mod recover_tests {
17201720
])
17211721
.await?;
17221722

1723-
let stmt2 = Stmt2::new(taos.client());
1723+
let stmt2 = Stmt2::new(taos.client_cloned());
17241724
stmt2.init().await?;
17251725
stmt2
17261726
.prepare("insert into test_1755137720.t0 values(?, ?)")
@@ -1789,7 +1789,7 @@ mod recover_tests {
17891789
])
17901790
.await?;
17911791

1792-
let stmt2 = Stmt2::new(taos.client());
1792+
let stmt2 = Stmt2::new(taos.client_cloned());
17931793
stmt2.init().await?;
17941794
stmt2
17951795
.prepare("select * from test_1755138202.t0 where c1 > ?")
@@ -1862,7 +1862,7 @@ mod recover_tests {
18621862
let n = 3;
18631863
let mut tasks = Vec::with_capacity(n);
18641864
for i in 0..n {
1865-
let client = taos.client();
1865+
let client = taos.client_cloned();
18661866
tasks.push(tokio::spawn(async move {
18671867
let stmt2 = Stmt2::new(client);
18681868
stmt2.init().await?;

0 commit comments

Comments
 (0)