@@ -11381,22 +11381,6 @@ func (l *fastProdLogger) Debugf(format string, args ...any) {
1138111381	}
1138211382}
1138311383
11384- type  slowWriteConn  struct  {
11385- 	sync.RWMutex 
11386- 	net.Conn 
11387- 	delay  bool 
11388- }
11389- 
11390- func  (c  * slowWriteConn ) Write (b  []byte ) (int , error ) {
11391- 	c .RLock ()
11392- 	delay  :=  c .delay 
11393- 	c .RUnlock ()
11394- 	if  delay  {
11395- 		time .Sleep (100  *  time .Millisecond )
11396- 	}
11397- 	return  c .Conn .Write (b )
11398- }
11399- 
1140011384func  TestNoRaceNoFastProducerStall (t  * testing.T ) {
1140111385	tmpl  :=  ` 
1140211386		listen: "127.0.0.1:-1" 
@@ -11409,173 +11393,86 @@ func TestNoRaceNoFastProducerStall(t *testing.T) {
1140911393	l  :=  & fastProdLogger {gotIt : make (chan  struct {}, 1 )}
1141011394	s .SetLogger (l , true , false )
1141111395
11412- 	ncSlow  :=  natsConnect (t , s .ClientURL (),  nats . ReconnectWait ( 10 * time . Millisecond ) )
11396+ 	ncSlow  :=  natsConnect (t , s .ClientURL ())
1141311397	defer  ncSlow .Close ()
1141411398	natsSub (t , ncSlow , "foo" , func (_  * nats.Msg ) {})
1141511399	natsFlush (t , ncSlow )
1141611400
11401+ 	nc  :=  natsConnect (t , s .ClientURL ())
11402+ 	defer  nc .Close ()
11403+ 	traceSub  :=  natsSubSync (t , nc , "my.trace.subj" )
11404+ 	natsFlush (t , nc )
11405+ 
11406+ 	ncProd  :=  natsConnect (t , s .ClientURL ())
11407+ 	defer  ncProd .Close ()
11408+ 
11409+ 	payload  :=  make ([]byte , 256 )
11410+ 
1141711411	cid , err  :=  ncSlow .GetClientID ()
1141811412	require_NoError (t , err )
1141911413	c  :=  s .GetClient (cid )
1142011414	require_True (t , c  !=  nil )
1142111415
11422- 	c .mu .Lock ()
11423- 	swc  :=  & slowWriteConn {Conn : c .nc , delay : true }
11424- 	c .nc  =  swc 
11425- 	c .mu .Unlock ()
11416+ 	wg  :=  sync.WaitGroup {}
11417+ 	pub  :=  func () {
11418+ 		wg .Add (1 )
11419+ 		go  func () {
11420+ 			defer  wg .Done ()
1142611421
11427- 	defer  func () {
11428- 		swc .Lock ()
11429- 		swc .delay  =  false 
11430- 		swc .Unlock ()
11431- 	}()
11422+ 			msg  :=  nats .NewMsg ("foo" )
11423+ 			msg .Header .Set (MsgTraceDest , traceSub .Subject )
11424+ 			msg .Data  =  payload 
11425+ 			ncProd .PublishMsg (msg )
11426+ 		}()
11427+ 	}
1143211428
11433- 	// The producer could still overwhelm the "fast" consumer. 
11434- 	// So we will send more than what it will use as a target 
11435- 	// to consider the test done. 
11436- 	total  :=  2_000_000 
11437- 	target   :=   total   /   2 
11438- 	ch  :=  make ( chan   struct {},  1 )
11439- 	var   count   int 
11440- 
11441- 	ncFast   :=   natsConnect (t , s . ClientURL (),  nats . ReconnectWait ( 10 * time . Millisecond ) )
11442- 	defer   ncFast . Close () 
11443- 	fastSub   :=   natsSub (t , ncFast ,  "foo" ,  func ( _   * nats. Msg ) { 
11444- 		if   count ++ ;  count   ==   target  {
11445- 			ch   <-   struct {}{} 
11429+ 	checkTraceMsg   :=   func ( err   string ) { 
11430+ 		 t . Helper () 
11431+ 		 var   e   MsgTraceEvent 
11432+ 		 traceMsg  :=  natsNexMsg ( t ,  traceSub ,  time . Second ) 
11433+ 		 json . Unmarshal ( traceMsg . Data ,  & e ) 
11434+ 		 egresses  :=  e . Egresses ( )
11435+ 		 require_Equal ( t ,  len ( egresses ),  1 ) 
11436+ 		 eg   :=   egresses [ 0 ] 
11437+ 		 require_Equal (t , eg . CID ,  cid )
11438+ 		 if   err   !=   _EMPTY_  { 
11439+ 			 require_Contains (t , eg . Error ,  err ) 
11440+ 		}  else  {
11441+ 			require_Equal ( t ,  eg . Error ,  _EMPTY_ ) 
1144611442		}
11447- 	})
11448- 	err  =  fastSub .SetPendingLimits (- 1 , - 1 )
11449- 	require_NoError (t , err )
11450- 	natsFlush (t , ncFast )
11451- 
11452- 	ncProd  :=  natsConnect (t , s .ClientURL (), nats .ReconnectWait (10 * time .Millisecond ))
11453- 	defer  ncProd .Close ()
11454- 
11455- 	cid , err  =  ncProd .GetClientID ()
11456- 	require_NoError (t , err )
11457- 	pc  :=  s .GetClient (cid )
11458- 	pc .mu .Lock ()
11459- 	pcnc  :=  pc .nc 
11460- 	pc .mu .Unlock ()
11443+ 	}
1146111444
11462- 	payload  :=  make ([]byte , 256 )
11445+ 	// Artificially set a stall channel. 
11446+ 	c .mu .Lock ()
11447+ 	c .out .stc  =  make (chan  struct {})
11448+ 	c .mu .Unlock ()
1146311449
11464- 	for  i  :=  0 ; i  <  total ; i ++  {
11465- 		natsPub (t , ncProd , "foo" , payload )
11466- 	}
11467- 	select  {
11468- 	case  <- ch :
11469- 		// OK 
11470- 	case  <- time .After (10  *  time .Second ):
11471- 		t .Fatal ("Test timed-out" )
11472- 	}
11473- 	// Now wait a bit and make sure we did not get any fast producer debug statements. 
11450+ 	// Publish a message, it should not stall the producer. 
11451+ 	pub ()
11452+ 	// Now  make sure we did not get any fast producer debug statements. 
1147411453	select  {
1147511454	case  <- l .gotIt :
1147611455		t .Fatal ("Got debug logs about fast producer" )
11477- 	case  <- time .After (time .Second ):
11478- 		// OK 
11456+ 	case  <- time .After (250   *   time .Millisecond ):
11457+ 		// OK!  
1147911458	}
11459+ 	wg .Wait ()
11460+ 
11461+ 	checkTraceMsg (errMsgTraceFastProdNoStall )
1148011462
11481- 	// We don't need that one anymore 
11482- 	ncFast .Close ()
1148311463	// Now we will conf reload to enable fast producer stalling. 
1148411464	reloadUpdateConfig (t , s , conf , fmt .Sprintf (tmpl , "false" ))
1148511465
11486- 	// Since the producer can block, we will publish from a different routine, 
11487- 	// and check for the debug trace from the main. 
11488- 	wg  :=  sync.WaitGroup {}
11489- 	wg .Add (1 )
11490- 	doneCh  :=  make (chan  struct {})
11491- 	go  func () {
11492- 		defer  wg .Done ()
11493- 		for  i  :=  0 ; ; i ++  {
11494- 			ncProd .Publish ("foo" , payload )
11495- 			select  {
11496- 			case  <- doneCh :
11497- 				return 
11498- 			default :
11499- 			}
11500- 			if  i % 1000  ==  0  {
11501- 				time .Sleep (time .Millisecond )
11502- 			}
11503- 		}
11504- 	}()
11466+ 	// Publish, this time the prod should be stalled. 
11467+ 	pub ()
1150511468	select  {
1150611469	case  <- l .gotIt :
11507- 		pcnc .Close ()
11508- 		close (doneCh )
11509- 	case  <- time .After (20  *  time .Second ):
11470+ 		// OK! 
11471+ 	case  <- time .After (500  *  time .Millisecond ):
1151011472		t .Fatal ("Timed-out waiting for a warning" )
1151111473	}
1151211474	wg .Wait ()
11513- }
11514- 
11515- func  TestNoRaceNoFastProducerStallAndMsgTrace (t  * testing.T ) {
11516- 	o  :=  DefaultOptions ()
11517- 	o .NoFastProducerStall  =  true 
11518- 	o .WriteDeadline  =  2  *  time .Second 
11519- 	s  :=  RunServer (o )
11520- 	defer  s .Shutdown ()
11521- 
11522- 	ncSlow  :=  natsConnect (t , s .ClientURL (), nats .ReconnectWait (10 * time .Millisecond ))
11523- 	defer  ncSlow .Close ()
11524- 	natsSub (t , ncSlow , "foo" , func (_  * nats.Msg ) {})
11525- 	natsFlush (t , ncSlow )
1152611475
11527- 	cid , err  :=  ncSlow .GetClientID ()
11528- 	require_NoError (t , err )
11529- 	c  :=  s .GetClient (cid )
11530- 	require_True (t , c  !=  nil )
11531- 
11532- 	c .mu .Lock ()
11533- 	swc  :=  & slowWriteConn {Conn : c .nc , delay : true }
11534- 	c .nc  =  swc 
11535- 	c .mu .Unlock ()
11536- 
11537- 	defer  func () {
11538- 		swc .Lock ()
11539- 		swc .delay  =  false 
11540- 		swc .Unlock ()
11541- 	}()
11542- 
11543- 	ncProd  :=  natsConnect (t , s .ClientURL (), nats .ReconnectWait (10 * time .Millisecond ))
11544- 	defer  ncProd .Close ()
11545- 
11546- 	payload  :=  make ([]byte , 256 )
11547- 
11548- 	nc  :=  natsConnect (t , s .ClientURL (), nats .ReconnectWait (10 * time .Millisecond ))
11549- 	defer  nc .Close ()
11550- 	doneCh  :=  make (chan  struct {})
11551- 	traceSub  :=  natsSub (t , nc , "my.trace.subj" , func (traceMsg  * nats.Msg ) {
11552- 		var  e  MsgTraceEvent 
11553- 		json .Unmarshal (traceMsg .Data , & e )
11554- 		egresses  :=  e .Egresses ()
11555- 		if  len (egresses ) ==  1  {
11556- 			if  err  :=  egresses [0 ].Error ; err  ==  errMsgTraceFastProdNoStall  {
11557- 				close (doneCh )
11558- 				traceMsg .Sub .Unsubscribe ()
11559- 			}
11560- 		}
11561- 	})
11562- 	err  =  traceSub .SetPendingLimits (- 1 , - 1 )
11563- 	require_NoError (t , err )
11564- 	natsFlush (t , nc )
11565- 
11566- 	msg  :=  nats .NewMsg ("foo" )
11567- 	msg .Header .Set (MsgTraceDest , traceSub .Subject )
11568- 	msg .Data  =  payload 
11569- 	for  i , done  :=  0 , false ; ! done ; i ++  {
11570- 		ncProd .PublishMsg (msg )
11571- 		select  {
11572- 		case  <- doneCh :
11573- 			// OK 
11574- 			return 
11575- 		default :
11576- 		}
11577- 		if  i % 1000  ==  0  {
11578- 			time .Sleep (time .Millisecond )
11579- 		}
11580- 	}
11476+ 	// Should have been delivered to the trace subscription. 
11477+ 	checkTraceMsg (_EMPTY_ )
1158111478}
0 commit comments