@@ -5,18 +5,18 @@ use log::{debug, error, info};
5
5
use nix:: sys:: signal:: { self , Signal } ;
6
6
use nix:: unistd:: Pid ;
7
7
use std:: collections:: HashMap ;
8
+ use std:: marker:: Unpin ;
8
9
/// Admin database.
9
10
use std:: sync:: atomic:: Ordering ;
11
+ use tokio:: io:: AsyncWrite ;
10
12
use tokio:: time:: Instant ;
11
13
12
14
use crate :: config:: { get_config, reload_config, VERSION } ;
13
- use crate :: errors:: Error ;
15
+ use crate :: errors:: { Error , ProtocolSyncError , ServerError } ;
14
16
use crate :: messages:: * ;
15
17
use crate :: pool:: get_all_pools;
16
18
use crate :: pool:: ClientServerMap ;
17
19
use crate :: stats:: client:: { CLIENT_STATE_ACTIVE , CLIENT_STATE_IDLE } ;
18
- #[ cfg( target_os = "linux" ) ]
19
- use crate :: stats:: get_socket_states_count;
20
20
use crate :: stats:: server:: { SERVER_STATE_ACTIVE , SERVER_STATE_IDLE } ;
21
21
use crate :: stats:: {
22
22
get_client_stats, get_server_stats, CANCEL_CONNECTION_COUNTER , PLAIN_CONNECTION_COUNTER ,
@@ -42,15 +42,15 @@ pub async fn handle_admin<T>(
42
42
client_server_map : ClientServerMap ,
43
43
) -> Result < ( ) , Error >
44
44
where
45
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
45
+ T : AsyncWrite + Unpin ,
46
46
{
47
- let code = query. get_u8 ( ) as char ;
48
-
49
- if code != 'Q' {
50
- return Err ( Error :: ProtocolSyncError ( format ! (
51
- "Invalid code, expected 'Q' but got '{}'" ,
52
- code
53
- ) ) ) ;
47
+ let code = query. get_u8 ( ) ;
48
+ if code != b'Q' {
49
+ return Err ( ProtocolSyncError :: InvalidCode {
50
+ expected : b'Q' ,
51
+ actual : code,
52
+ }
53
+ . into ( ) ) ;
54
54
}
55
55
56
56
let len = query. get_i32 ( ) as usize ;
@@ -110,7 +110,7 @@ where
110
110
/// Column-oriented statistics.
111
111
async fn show_lists < T > ( stream : & mut T ) -> Result < ( ) , Error >
112
112
where
113
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
113
+ T : AsyncWrite + Unpin ,
114
114
{
115
115
let client_stats = get_client_stats ( ) ;
116
116
let server_stats = get_server_stats ( ) ;
@@ -206,13 +206,13 @@ where
206
206
res. put_i32 ( 5 ) ;
207
207
res. put_u8 ( b'I' ) ;
208
208
209
- write_all_half ( stream, & res) . await
209
+ Ok ( write_all_half ( stream, & res) . await ? )
210
210
}
211
211
212
212
/// Show PgDoorman version.
213
213
async fn show_version < T > ( stream : & mut T ) -> Result < ( ) , Error >
214
214
where
215
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
215
+ T : AsyncWrite + Unpin ,
216
216
{
217
217
let mut res = BytesMut :: new ( ) ;
218
218
@@ -224,13 +224,13 @@ where
224
224
res. put_i32 ( 5 ) ;
225
225
res. put_u8 ( b'I' ) ;
226
226
227
- write_all_half ( stream, & res) . await
227
+ Ok ( write_all_half ( stream, & res) . await ? )
228
228
}
229
229
230
230
/// Show utilization of connection pools for each pool.
231
231
async fn show_pools < T > ( stream : & mut T ) -> Result < ( ) , Error >
232
232
where
233
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
233
+ T : AsyncWrite + Unpin ,
234
234
{
235
235
let pool_lookup = PoolStats :: construct_pool_lookup ( ) ;
236
236
let mut res = BytesMut :: new ( ) ;
@@ -245,13 +245,13 @@ where
245
245
res. put_i32 ( 5 ) ;
246
246
res. put_u8 ( b'I' ) ;
247
247
248
- write_all_half ( stream, & res) . await
248
+ Ok ( write_all_half ( stream, & res) . await ? )
249
249
}
250
250
251
251
/// Show extended utilization of connection pools for each pool.
252
252
async fn show_pools_extended < T > ( stream : & mut T ) -> Result < ( ) , Error >
253
253
where
254
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
254
+ T : AsyncWrite + Unpin ,
255
255
{
256
256
let pool_lookup = PoolStats :: construct_pool_lookup ( ) ;
257
257
let mut res = BytesMut :: new ( ) ;
@@ -268,13 +268,13 @@ where
268
268
res. put_i32 ( 5 ) ;
269
269
res. put_u8 ( b'I' ) ;
270
270
271
- write_all_half ( stream, & res) . await
271
+ Ok ( write_all_half ( stream, & res) . await ? )
272
272
}
273
273
274
274
/// Show all available options.
275
275
async fn show_help < T > ( stream : & mut T ) -> Result < ( ) , Error >
276
276
where
277
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
277
+ T : AsyncWrite + Unpin ,
278
278
{
279
279
let mut res = BytesMut :: new ( ) ;
280
280
@@ -307,13 +307,13 @@ where
307
307
res. put_i32 ( 5 ) ;
308
308
res. put_u8 ( b'I' ) ;
309
309
310
- write_all_half ( stream, & res) . await
310
+ Ok ( write_all_half ( stream, & res) . await ? )
311
311
}
312
312
313
313
/// Show databases.
314
314
async fn show_databases < T > ( stream : & mut T ) -> Result < ( ) , Error >
315
315
where
316
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
316
+ T : AsyncWrite + Unpin ,
317
317
{
318
318
// Columns
319
319
let columns = vec ! [
@@ -361,22 +361,22 @@ where
361
361
res. put_i32 ( 5 ) ;
362
362
res. put_u8 ( b'I' ) ;
363
363
364
- write_all_half ( stream, & res) . await
364
+ Ok ( write_all_half ( stream, & res) . await ? )
365
365
}
366
366
367
367
/// Ignore any SET commands the client sends.
368
368
/// This is common initialization done by ORMs.
369
369
async fn ignore_set < T > ( stream : & mut T ) -> Result < ( ) , Error >
370
370
where
371
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
371
+ T : AsyncWrite + Unpin ,
372
372
{
373
373
custom_protocol_response_ok ( stream, "SET" ) . await
374
374
}
375
375
376
376
/// Reload the configuration file without restarting the process.
377
377
async fn reload < T > ( stream : & mut T , client_server_map : ClientServerMap ) -> Result < ( ) , Error >
378
378
where
379
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
379
+ T : AsyncWrite + Unpin ,
380
380
{
381
381
info ! ( "Reloading config" ) ;
382
382
@@ -393,13 +393,13 @@ where
393
393
res. put_i32 ( 5 ) ;
394
394
res. put_u8 ( b'I' ) ;
395
395
396
- write_all_half ( stream, & res) . await
396
+ Ok ( write_all_half ( stream, & res) . await ? )
397
397
}
398
398
399
399
/// Shows current configuration.
400
400
async fn show_config < T > ( stream : & mut T ) -> Result < ( ) , Error >
401
401
where
402
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
402
+ T : AsyncWrite + Unpin ,
403
403
{
404
404
let config = & get_config ( ) ;
405
405
let config: HashMap < String , String > = config. into ( ) ;
@@ -439,13 +439,13 @@ where
439
439
res. put_i32 ( 5 ) ;
440
440
res. put_u8 ( b'I' ) ;
441
441
442
- write_all_half ( stream, & res) . await
442
+ Ok ( write_all_half ( stream, & res) . await ? )
443
443
}
444
444
445
445
/// Show stats.
446
446
async fn show_stats < T > ( stream : & mut T ) -> Result < ( ) , Error >
447
447
where
448
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
448
+ T : AsyncWrite + Unpin ,
449
449
{
450
450
let pool_lookup = PoolStats :: construct_pool_lookup ( ) ;
451
451
let mut res = BytesMut :: new ( ) ;
@@ -461,13 +461,13 @@ where
461
461
res. put_i32 ( 5 ) ;
462
462
res. put_u8 ( b'I' ) ;
463
463
464
- write_all_half ( stream, & res) . await
464
+ Ok ( write_all_half ( stream, & res) . await ? )
465
465
}
466
466
467
467
/// Show currently connected clients
468
468
async fn show_clients < T > ( stream : & mut T ) -> Result < ( ) , Error >
469
469
where
470
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
470
+ T : AsyncWrite + Unpin ,
471
471
{
472
472
let columns = vec ! [
473
473
( "client_id" , DataType :: Text ) ,
@@ -517,12 +517,12 @@ where
517
517
res. put_i32 ( 5 ) ;
518
518
res. put_u8 ( b'I' ) ;
519
519
520
- write_all_half ( stream, & res) . await
520
+ Ok ( write_all_half ( stream, & res) . await ? )
521
521
}
522
522
523
523
async fn show_connections < T > ( stream : & mut T ) -> Result < ( ) , Error >
524
524
where
525
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
525
+ T : AsyncWrite + Unpin ,
526
526
{
527
527
let columns = vec ! [
528
528
( "total" , DataType :: Numeric ) ,
@@ -556,12 +556,13 @@ where
556
556
res. put_i32 ( 5 ) ;
557
557
res. put_u8 ( b'I' ) ;
558
558
559
- write_all_half ( stream, & res) . await
559
+ Ok ( write_all_half ( stream, & res) . await ? )
560
560
}
561
+
561
562
/// Show currently connected servers
562
563
async fn show_servers < T > ( stream : & mut T ) -> Result < ( ) , Error >
563
564
where
564
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
565
+ T : AsyncWrite + Unpin ,
565
566
{
566
567
let columns = vec ! [
567
568
( "server_id" , DataType :: Text ) ,
@@ -627,13 +628,13 @@ where
627
628
res. put_i32 ( 5 ) ;
628
629
res. put_u8 ( b'I' ) ;
629
630
630
- write_all_half ( stream, & res) . await
631
+ Ok ( write_all_half ( stream, & res) . await ? )
631
632
}
632
633
633
634
/// Send response packets for shutdown.
634
635
async fn shutdown < T > ( stream : & mut T ) -> Result < ( ) , Error >
635
636
where
636
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
637
+ T : AsyncWrite + Unpin ,
637
638
{
638
639
let mut res = BytesMut :: new ( ) ;
639
640
@@ -655,13 +656,13 @@ where
655
656
res. put_i32 ( 5 ) ;
656
657
res. put_u8 ( b'I' ) ;
657
658
658
- write_all_half ( stream, & res) . await
659
+ Ok ( write_all_half ( stream, & res) . await ? )
659
660
}
660
661
661
662
/// Show Users.
662
663
async fn show_users < T > ( stream : & mut T ) -> Result < ( ) , Error >
663
664
where
664
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
665
+ T : AsyncWrite + Unpin ,
665
666
{
666
667
let mut res = BytesMut :: new ( ) ;
667
668
@@ -684,20 +685,19 @@ where
684
685
res. put_i32 ( 5 ) ;
685
686
res. put_u8 ( b'I' ) ;
686
687
687
- write_all_half ( stream, & res) . await
688
+ Ok ( write_all_half ( stream, & res) . await ? )
688
689
}
689
690
690
691
#[ cfg( target_os = "linux" ) ]
691
692
async fn show_sockets < T > ( stream : & mut T ) -> Result < ( ) , Error >
692
693
where
693
- T : tokio :: io :: AsyncWrite + std :: marker :: Unpin ,
694
+ T : AsyncWrite + Unpin ,
694
695
{
696
+ use crate :: stats:: get_socket_states_count;
697
+
695
698
let mut res = BytesMut :: new ( ) ;
696
699
697
- let sockets_info = match get_socket_states_count ( std:: process:: id ( ) ) {
698
- Ok ( info) => info,
699
- Err ( _) => return Err ( Error :: ServerError ) ,
700
- } ;
700
+ let sockets_info = get_socket_states_count ( std:: process:: id ( ) ) . map_err ( ServerError :: from) ?;
701
701
702
702
res. put ( row_description ( & vec ! [
703
703
// tcp
@@ -747,5 +747,5 @@ where
747
747
res. put_i32 ( 5 ) ;
748
748
res. put_u8 ( b'I' ) ;
749
749
750
- write_all_half ( stream, & res) . await
750
+ Ok ( write_all_half ( stream, & res) . await ? )
751
751
}
0 commit comments