@@ -292,7 +292,7 @@ check_incoming_control_channel(struct context *c)
292292
293293 struct gc_arena gc = gc_new ();
294294 struct buffer buf = alloc_buf_gc (len , & gc );
295- if (tls_rec_payload (c -> c2 .tls_multi , & buf ))
295+ while (tls_rec_payload (c -> c2 .tls_multi , & buf ))
296296 {
297297 while (BLEN (& buf ) > 1 )
298298 {
@@ -304,10 +304,6 @@ check_incoming_control_channel(struct context *c)
304304 }
305305 }
306306 }
307- else
308- {
309- msg (D_PUSH_ERRORS , "WARNING: Receive control message failed" );
310- }
311307
312308 gc_free (& gc );
313309}
@@ -376,20 +372,18 @@ check_connection_established(struct context *c)
376372#endif
377373
378374bool
379- send_control_channel_string_dowork (struct tls_session * session , const char * str ,
380- msglvl_t msglevel )
375+ send_control_channel_string_dowork (struct tls_multi * multi , struct key_state * ks , const char * str , msglvl_t msglevel )
381376{
382377 struct gc_arena gc = gc_new ();
383378 bool stat ;
384379
385- ASSERT (session );
386- struct key_state * ks = & session -> key [KS_PRIMARY ];
380+ ASSERT (multi );
387381
388382 /* buffered cleartext write onto TLS control channel */
389383 stat = tls_send_payload (ks , (uint8_t * )str , strlen (str ) + 1 );
390384
391385 msg (msglevel , "SENT CONTROL [%s]: '%s' (status=%d)" ,
392- session -> common_name ? session -> common_name : "UNDEF" , sanitize_control_message (str , & gc ),
386+ multi -> common_name ? multi -> common_name : "UNDEF" , sanitize_control_message (str , & gc ),
393387 (int )stat );
394388
395389 gc_free (& gc );
@@ -406,12 +400,12 @@ reschedule_multi_process(struct context *c)
406400bool
407401send_control_channel_string (struct context * c , const char * str , msglvl_t msglevel )
408402{
409- if (c -> c2 .tls_multi )
403+ struct tls_multi * multi = c -> c2 .tls_multi ;
404+ if (multi )
410405 {
411- struct tls_session * session = & c -> c2 . tls_multi -> session [ TM_ACTIVE ] ;
412- bool ret = send_control_channel_string_dowork (session , str , msglevel );
406+ struct key_state * ks = tls_select_encryption_key_init ( multi ) ;
407+ bool ret = send_control_channel_string_dowork (multi , ks , str , msglevel );
413408 reschedule_multi_process (c );
414-
415409 return ret ;
416410 }
417411 return true;
@@ -2358,11 +2352,13 @@ get_io_flags_udp(struct context *c, struct multi_io *multi_io, const unsigned in
23582352}
23592353
23602354void
2361- io_wait_dowork (struct context * c , const unsigned int flags )
2355+ io_wait_dowork (struct context * c , const unsigned int flags , int t )
23622356{
23632357 unsigned int out_socket ;
23642358 unsigned int out_tuntap ;
23652359 struct event_set_return esr [4 ];
2360+ struct event_set * event_set = ((t & THREAD_RTWL ) != 0 ) ? c -> c2 .event_set : c -> c2 .event_set2 ;
2361+ unsigned int * event_set_status = ((t & THREAD_RTWL ) != 0 ) ? & (c -> c2 .event_set_status ) : & (c -> c2 .event_set_status2 );
23662362
23672363 /* These shifts all depend on EVENT_READ and EVENT_WRITE */
23682364 static uintptr_t socket_shift = SOCKET_SHIFT ; /* depends on SOCKET_READ and SOCKET_WRITE */
@@ -2380,29 +2376,29 @@ io_wait_dowork(struct context *c, const unsigned int flags)
23802376 /*
23812377 * Decide what kind of events we want to wait for.
23822378 */
2383- event_reset (c -> c2 . event_set );
2379+ event_reset (event_set );
23842380
2385- multi_io_process_flags (c , c -> c2 . event_set , flags , & out_socket , & out_tuntap );
2381+ multi_io_process_flags (c , event_set , flags , & out_socket , & out_tuntap );
23862382
23872383#if defined(TARGET_LINUX ) || defined(TARGET_FREEBSD )
23882384 if (out_socket & EVENT_READ && c -> c2 .did_open_tun )
23892385 {
2390- dco_event_set (& c -> c1 .tuntap -> dco , c -> c2 . event_set , (void * )dco_shift );
2386+ dco_event_set (& c -> c1 .tuntap -> dco , event_set , (void * )dco_shift );
23912387 }
23922388#endif
23932389
23942390#ifdef ENABLE_MANAGEMENT
23952391 if (management )
23962392 {
2397- management_socket_set (management , c -> c2 . event_set , (void * )management_shift , NULL );
2393+ management_socket_set (management , event_set , (void * )management_shift , NULL );
23982394 }
23992395#endif
24002396
24012397#ifdef ENABLE_ASYNC_PUSH
24022398 /* arm inotify watcher */
24032399 if (c -> options .mode == MODE_SERVER )
24042400 {
2405- event_ctl (c -> c2 . event_set , c -> c2 .inotify_fd , EVENT_READ , (void * )file_shift );
2401+ event_ctl (event_set , c -> c2 .inotify_fd , EVENT_READ , (void * )file_shift );
24062402 }
24072403#endif
24082404
@@ -2416,7 +2412,7 @@ io_wait_dowork(struct context *c, const unsigned int flags)
24162412 * (6) timeout (tv) expired
24172413 */
24182414
2419- c -> c2 . event_set_status = ES_ERROR ;
2415+ * event_set_status = ES_ERROR ;
24202416
24212417 if (!c -> sig -> signal_received )
24222418 {
@@ -2434,14 +2430,14 @@ io_wait_dowork(struct context *c, const unsigned int flags)
24342430 /*
24352431 * Wait for something to happen.
24362432 */
2437- status = event_wait (c -> c2 . event_set , & c -> c2 .timeval , esr , SIZE (esr ));
2433+ status = event_wait (event_set , & c -> c2 .timeval , esr , SIZE (esr ));
24382434
24392435 check_status (status , "event_wait" , NULL , NULL );
24402436
24412437 if (status > 0 )
24422438 {
24432439 int i ;
2444- c -> c2 . event_set_status = 0 ;
2440+ * event_set_status = 0 ;
24452441 for (i = 0 ; i < status ; ++ i )
24462442 {
24472443 const struct event_set_return * e = & esr [i ];
@@ -2452,7 +2448,7 @@ io_wait_dowork(struct context *c, const unsigned int flags)
24522448 struct event_arg * ev_arg = (struct event_arg * )e -> arg ;
24532449 if (ev_arg -> type != EVENT_ARG_LINK_SOCKET )
24542450 {
2455- c -> c2 . event_set_status = ES_ERROR ;
2451+ * event_set_status = ES_ERROR ;
24562452 msg (D_LINK_ERRORS , "io_work: non socket event delivered" );
24572453 return ;
24582454 }
@@ -2464,30 +2460,30 @@ io_wait_dowork(struct context *c, const unsigned int flags)
24642460 shift = (uintptr_t )e -> arg ;
24652461 }
24662462
2467- c -> c2 . event_set_status |= ((e -> rwflags & 3 ) << shift );
2463+ * event_set_status |= ((e -> rwflags & 3 ) << shift );
24682464 }
24692465 }
24702466 else if (status == 0 )
24712467 {
2472- c -> c2 . event_set_status = ES_TIMEOUT ;
2468+ * event_set_status = ES_TIMEOUT ;
24732469 }
24742470 }
24752471 else
24762472 {
2477- c -> c2 . event_set_status = SOCKET_READ ;
2473+ * event_set_status = SOCKET_READ ;
24782474 }
24792475 }
24802476
24812477 /* 'now' should always be a reasonably up-to-date timestamp */
24822478 update_time ();
24832479
24842480 /* set signal_received if a signal was received */
2485- if (c -> c2 . event_set_status & ES_ERROR )
2481+ if (* event_set_status & ES_ERROR )
24862482 {
24872483 get_signal (& c -> sig -> signal_received );
24882484 }
24892485
2490- dmsg (D_EVENT_WAIT , "I/O WAIT status=0x%04x" , c -> c2 . event_set_status );
2486+ dmsg (D_EVENT_WAIT , "I/O WAIT status=0x%04x" , * event_set_status );
24912487}
24922488
24932489void threaded_fwd_inp_intf (struct context * c , struct link_socket * sock , struct thread_pointer * b )
@@ -2507,7 +2503,7 @@ void threaded_fwd_inp_intf(struct context *c, struct link_socket *sock, struct t
25072503}
25082504
25092505void
2510- process_io (struct context * c , struct link_socket * sock , struct thread_pointer * b )
2506+ process_io (struct context * c , struct link_socket * sock , struct thread_pointer * b , int t )
25112507{
25122508 const unsigned int status = c -> c2 .event_set_status ;
25132509
@@ -2520,17 +2516,17 @@ process_io(struct context *c, struct link_socket *sock, struct thread_pointer *b
25202516#endif
25212517
25222518 /* TCP/UDP port ready to accept write */
2523- if (status & SOCKET_WRITE )
2519+ if (( status & SOCKET_WRITE ) && (( t & THREAD_RTWL ) != 0 ) )
25242520 {
25252521 process_outgoing_link (c , sock );
25262522 }
25272523 /* TUN device ready to accept write */
2528- else if (status & TUN_WRITE )
2524+ else if (( status & TUN_WRITE ) && (( t & THREAD_RLWT ) != 0 ) )
25292525 {
25302526 process_outgoing_tun (c , sock );
25312527 }
25322528 /* Incoming data on TCP/UDP port */
2533- else if (status & SOCKET_READ )
2529+ else if (( status & SOCKET_READ ) && (( t & THREAD_RLWT ) != 0 ) )
25342530 {
25352531 read_incoming_link (c , sock );
25362532 if (!IS_SIG (c ))
@@ -2539,15 +2535,32 @@ process_io(struct context *c, struct link_socket *sock, struct thread_pointer *b
25392535 }
25402536 }
25412537 /* Incoming data on TUN device */
2542- else if (status & TUN_READ )
2538+ else if (( status & TUN_READ ) && (( t & THREAD_RTWL ) != 0 ) )
25432539 {
25442540 threaded_fwd_inp_intf (c , sock , b );
25452541 }
2546- else if (status & DCO_READ )
2542+ else if (( status & DCO_READ ) && (( t & THREAD_RTWL ) != 0 ) )
25472543 {
25482544 if (!IS_SIG (c ))
25492545 {
25502546 process_incoming_dco (c );
25512547 }
25522548 }
25532549}
2550+
2551+ void * threaded_process_io (void * a )
2552+ {
2553+ struct dual_args * d = (struct dual_args * )a ;
2554+ struct thread_pointer * b = d -> b ;
2555+ int t = d -> t ;
2556+ while (true)
2557+ {
2558+ if (b -> p -> z != 1 ) { break ; }
2559+ pthread_mutex_lock (& (d -> i ));
2560+ if (b -> p -> z != 1 ) { break ; }
2561+ struct context * c = d -> c ;
2562+ process_io (c , c -> c2 .link_sockets [0 ], b , t );
2563+ pthread_mutex_unlock (& (d -> o ));
2564+ }
2565+ return NULL ;
2566+ }
0 commit comments