11use super :: utils:: * ;
22use crate :: colink_proto:: * ;
3+ pub use colink_registry_proto:: UserRecord ;
34use openssl:: sha:: sha256;
45use prost:: Message ;
56use secp256k1:: { ecdsa:: Signature , PublicKey , Secp256k1 } ;
67use tonic:: { Request , Response , Status } ;
78use uuid:: Uuid ;
89
10+ mod colink_registry_proto {
11+ include ! ( concat!( env!( "OUT_DIR" ) , "/colink_registry.rs" ) ) ;
12+ }
13+
914impl crate :: server:: MyService {
1015 pub async fn _create_task ( & self , request : Request < Task > ) -> Result < Response < Task > , Status > {
1116 Self :: check_privilege_in ( request. metadata ( ) , & [ "user" ] ) ?;
@@ -28,24 +33,13 @@ impl crate::server::MyService {
2833 self . _internal_storage_update ( & user_id, & format ! ( "tasks:{}" , task_id) , & payload)
2934 . await ?;
3035 for i in 1 ..task. participants . len ( ) {
31- let core_addr = self
32- . _internal_storage_read (
33- & user_id,
34- & format ! ( "known_users:{}:core_addr" , & task. participants[ i] . user_id) ,
35- )
36+ let ( core_addr, guest_jwt) = self
37+ . query_user_record ( & user_id, & task. participants [ i] . user_id )
3638 . await ?;
37- let core_addr = String :: from_utf8 ( core_addr) . unwrap ( ) ;
3839 let mut client = match self . _grpc_connect ( & core_addr) . await {
3940 Ok ( client) => client,
4041 Err ( e) => return Err ( Status :: internal ( format ! ( "{}" , e) ) ) ,
4142 } ;
42- let guest_jwt = self
43- . _internal_storage_read (
44- & user_id,
45- & format ! ( "known_users:{}:guest_jwt" , & task. participants[ i] . user_id) ,
46- )
47- . await ?;
48- let guest_jwt = String :: from_utf8 ( guest_jwt) . unwrap ( ) ;
4943 client
5044 . inter_core_sync_task ( generate_request ( & guest_jwt, task. clone ( ) ) )
5145 . await ?;
@@ -136,24 +130,13 @@ impl crate::server::MyService {
136130 drop ( task_storage_mutex) ;
137131
138132 if task. require_agreement && user_status != "ignored" {
139- let core_addr = self
140- . _internal_storage_read (
141- & user_id,
142- & format ! ( "known_users:{}:core_addr" , & task. participants[ 0 ] . user_id) ,
143- )
133+ let ( core_addr, guest_jwt) = self
134+ . query_user_record ( & user_id, & task. participants [ 0 ] . user_id )
144135 . await ?;
145- let core_addr = String :: from_utf8 ( core_addr) . unwrap ( ) ;
146136 let mut client = match self . _grpc_connect ( & core_addr) . await {
147137 Ok ( client) => client,
148138 Err ( e) => return Err ( Status :: internal ( format ! ( "{}" , e) ) ) ,
149139 } ;
150- let guest_jwt = self
151- . _internal_storage_read (
152- & user_id,
153- & format ! ( "known_users:{}:guest_jwt" , & task. participants[ 0 ] . user_id) ,
154- )
155- . await ?;
156- let guest_jwt = String :: from_utf8 ( guest_jwt) . unwrap ( ) ;
157140 client
158141 . inter_core_sync_task ( generate_request (
159142 & guest_jwt,
@@ -304,24 +287,13 @@ impl crate::server::MyService {
304287 // The initiator should broadcast the status change.
305288 if task. participants [ 0 ] . user_id == user_id && task. participants . len ( ) > 2 {
306289 for i in 1 ..task. participants . len ( ) {
307- let core_addr = self
308- . _internal_storage_read (
309- & user_id,
310- & format ! ( "known_users:{}:core_addr" , & task. participants[ i] . user_id) ,
311- )
290+ let ( core_addr, guest_jwt) = self
291+ . query_user_record ( & user_id, & task. participants [ i] . user_id )
312292 . await ?;
313- let core_addr = String :: from_utf8 ( core_addr) . unwrap ( ) ;
314293 let mut client = match self . _grpc_connect ( & core_addr) . await {
315294 Ok ( client) => client,
316295 Err ( e) => return Err ( Status :: internal ( format ! ( "{}" , e) ) ) ,
317296 } ;
318- let guest_jwt = self
319- . _internal_storage_read (
320- & user_id,
321- & format ! ( "known_users:{}:guest_jwt" , & task. participants[ i] . user_id) ,
322- )
323- . await ?;
324- let guest_jwt = String :: from_utf8 ( guest_jwt) . unwrap ( ) ;
325297 client
326298 . inter_core_sync_task ( generate_request ( & guest_jwt, task. clone ( ) ) )
327299 . await ?;
@@ -437,6 +409,89 @@ impl crate::server::MyService {
437409 Ok ( ( ) )
438410 }
439411
412+ async fn query_user_record (
413+ & self ,
414+ user_id : & str ,
415+ query_user_id : & str ,
416+ ) -> Result < ( String , String ) , Status > {
417+ let mut counter = 0 ;
418+ while self
419+ . _internal_storage_read (
420+ user_id,
421+ & format ! ( "known_users:{}:core_addr" , & query_user_id) ,
422+ )
423+ . await
424+ . is_err ( )
425+ || self
426+ . _internal_storage_read (
427+ user_id,
428+ & format ! ( "known_users:{}:guest_jwt" , & query_user_id) ,
429+ )
430+ . await
431+ . is_err ( )
432+ {
433+ if counter == 0 {
434+ let user = UserRecord {
435+ user_id : query_user_id. to_string ( ) ,
436+ ..Default :: default ( )
437+ } ;
438+ let mut payload = vec ! [ ] ;
439+ user. encode ( & mut payload) . unwrap ( ) ;
440+ let mut local_task = Task {
441+ task_id : Uuid :: new_v4 ( ) . to_string ( ) ,
442+ protocol_name : "registry" . to_string ( ) ,
443+ protocol_param : payload,
444+ participants : vec ! [ Participant {
445+ user_id: user_id. to_string( ) ,
446+ role: "query" . to_string( ) ,
447+ } ] ,
448+ require_agreement : false ,
449+ status : "started" . to_string ( ) ,
450+ expiration_time : chrono:: Utc :: now ( ) . timestamp ( ) + 86400 ,
451+ ..Default :: default ( )
452+ } ;
453+ local_task
454+ . decisions
455+ . resize ( local_task. participants . len ( ) , Default :: default ( ) ) ;
456+ local_task. decisions [ 0 ] = self
457+ . generate_decision ( true , false , "" , user_id, & local_task)
458+ . await ?;
459+ let mut payload = vec ! [ ] ;
460+ local_task. encode ( & mut payload) . unwrap ( ) ;
461+ self . _internal_storage_update (
462+ user_id,
463+ & format ! ( "tasks:{}" , local_task. task_id) ,
464+ & payload,
465+ )
466+ . await ?;
467+ let task_storage_mutex = self . task_storage_mutex . lock ( ) . await ;
468+ self . add_task_new_status ( user_id, & local_task) . await ?;
469+ drop ( task_storage_mutex) ;
470+ }
471+ // We choose 1 second as the retry interval and retry 30 times.
472+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 1 ) ) . await ;
473+ counter += 1 ;
474+ if counter > 30 {
475+ break ;
476+ }
477+ }
478+ let core_addr = self
479+ . _internal_storage_read (
480+ user_id,
481+ & format ! ( "known_users:{}:core_addr" , & query_user_id) ,
482+ )
483+ . await ?;
484+ let core_addr = String :: from_utf8 ( core_addr) . unwrap ( ) ;
485+ let guest_jwt = self
486+ . _internal_storage_read (
487+ user_id,
488+ & format ! ( "known_users:{}:guest_jwt" , & query_user_id) ,
489+ )
490+ . await ?;
491+ let guest_jwt = String :: from_utf8 ( guest_jwt) . unwrap ( ) ;
492+ Ok ( ( core_addr, guest_jwt) )
493+ }
494+
440495 /**
441496 * This function only checks the validity of the signature, user's decision will not be checked.
442497 */
0 commit comments