1
1
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
2
2
3
- use crate :: {
4
- pd:: { RetryClient , RetryClientTrait } ,
5
- region:: { RegionId , RegionVerId , RegionWithLeader , StoreId } ,
6
- Key , Result ,
7
- } ;
8
3
use std:: {
9
4
collections:: { BTreeMap , HashMap , HashSet } ,
10
5
sync:: Arc ,
11
6
} ;
7
+
8
+ use tokio:: sync:: { Notify , RwLock } ;
9
+
12
10
use tikv_client_common:: Error ;
13
11
use tikv_client_pd:: Cluster ;
14
12
use tikv_client_proto:: metapb:: { self , Store } ;
15
- use tokio:: sync:: { Notify , RwLock } ;
13
+
14
+ use crate :: {
15
+ pd:: { RetryClient , RetryClientTrait } ,
16
+ region:: { RegionId , RegionVerId , RegionWithLeader , StoreId } ,
17
+ request:: request_codec:: RequestCodec ,
18
+ Key , Result ,
19
+ } ;
16
20
17
21
const MAX_RETRY_WAITING_CONCURRENT_REQUEST : usize = 4 ;
18
22
@@ -44,23 +48,25 @@ impl RegionCacheMap {
44
48
}
45
49
}
46
50
47
- pub struct RegionCache < Client = RetryClient < Cluster > > {
51
+ pub struct RegionCache < C , Client = RetryClient < Cluster > > {
48
52
region_cache : RwLock < RegionCacheMap > ,
49
53
store_cache : RwLock < HashMap < StoreId , Store > > ,
50
54
inner_client : Arc < Client > ,
55
+ codec : C ,
51
56
}
52
57
53
- impl < Client > RegionCache < Client > {
54
- pub fn new ( inner_client : Arc < Client > ) -> RegionCache < Client > {
58
+ impl < C , Client > RegionCache < C , Client > {
59
+ pub fn new ( codec : C , inner_client : Arc < Client > ) -> Self {
55
60
RegionCache {
56
61
region_cache : RwLock :: new ( RegionCacheMap :: new ( ) ) ,
57
62
store_cache : RwLock :: new ( HashMap :: new ( ) ) ,
58
63
inner_client,
64
+ codec,
59
65
}
60
66
}
61
67
}
62
68
63
- impl < C : RetryClientTrait > RegionCache < C > {
69
+ impl < C : RequestCodec , R : RetryClientTrait > RegionCache < C , R > {
64
70
// Retrieve cache entry by key. If there's no entry, query PD and update cache.
65
71
pub async fn get_region_by_key ( & self , key : & Key ) -> Result < RegionWithLeader > {
66
72
let region_cache_guard = self . region_cache . read ( ) . await ;
@@ -126,9 +132,14 @@ impl<C: RetryClientTrait> RegionCache<C> {
126
132
127
133
/// Force read through (query from PD) and update cache
128
134
pub async fn read_through_region_by_key ( & self , key : Key ) -> Result < RegionWithLeader > {
129
- let region = self . inner_client . clone ( ) . get_region ( key. into ( ) ) . await ?;
130
- self . add_region ( region. clone ( ) ) . await ;
131
- Ok ( region)
135
+ let mut r = self
136
+ . inner_client
137
+ . clone ( )
138
+ . get_region ( self . codec . encode_pd_query ( key) . into ( ) )
139
+ . await ?;
140
+ r. region = self . codec . decode_region ( r. region ) ?;
141
+ self . add_region ( r. clone ( ) ) . await ;
142
+ Ok ( r)
132
143
}
133
144
134
145
/// Force read through (query from PD) and update cache
@@ -140,7 +151,8 @@ impl<C: RetryClientTrait> RegionCache<C> {
140
151
region_cache_guard. on_my_way_id . insert ( id, notify. clone ( ) ) ;
141
152
}
142
153
143
- let region = self . inner_client . clone ( ) . get_region_by_id ( id) . await ?;
154
+ let mut region = self . inner_client . clone ( ) . get_region_by_id ( id) . await ?;
155
+ region. region = self . codec . decode_region ( region. region ) ?;
144
156
self . add_region ( region. clone ( ) ) . await ;
145
157
146
158
// notify others
@@ -226,27 +238,35 @@ impl<C: RetryClientTrait> RegionCache<C> {
226
238
cache. key_to_ver_id . remove ( & start_key) ;
227
239
}
228
240
}
241
+
242
+ pub fn get_request_codec ( & self ) -> C {
243
+ self . codec . clone ( )
244
+ }
229
245
}
230
246
231
247
#[ cfg( test) ]
232
248
mod test {
233
- use super :: RegionCache ;
234
- use crate :: {
235
- pd:: RetryClientTrait ,
236
- region:: { RegionId , RegionWithLeader } ,
237
- Key , Result ,
238
- } ;
239
- use async_trait:: async_trait;
240
249
use std:: {
241
250
collections:: { BTreeMap , HashMap , HashSet } ,
242
251
sync:: {
243
252
atomic:: { AtomicU64 , Ordering :: SeqCst } ,
244
253
Arc ,
245
254
} ,
246
255
} ;
256
+
257
+ use async_trait:: async_trait;
258
+ use tokio:: sync:: Mutex ;
259
+
247
260
use tikv_client_common:: Error ;
248
261
use tikv_client_proto:: metapb;
249
- use tokio:: sync:: Mutex ;
262
+
263
+ use crate :: {
264
+ pd:: RetryClientTrait ,
265
+ region:: { RegionId , RegionWithLeader } ,
266
+ Key , Result ,
267
+ } ;
268
+
269
+ use super :: RegionCache ;
250
270
251
271
#[ derive( Default ) ]
252
272
struct MockRetryClient {
0 commit comments