@@ -23,10 +23,9 @@ import (
2323 "sync"
2424 "time"
2525
26- "github.com/gofrs/uuid/v5"
27-
2826 "github.com/elastic/elastic-agent-libs/monitoring"
2927 "github.com/elastic/go-sysinfo"
28+ "github.com/elastic/go-sysinfo/types"
3029
3130 "github.com/elastic/beats/v7/libbeat/beat"
3231 "github.com/elastic/beats/v7/libbeat/features"
@@ -57,16 +56,28 @@ type metrics struct {
5756 FQDNLookupFailed * monitoring.Int
5857}
5958
59+ // hostInfo is an interface that makes it easier to mock how the hostname is retrieved
60+ type hostInfo interface {
61+ Info () types.HostInfo
62+ FQDNWithContext (context.Context ) (string , error )
63+ }
64+
65+ type hostInfoFactory func () (hostInfo , error )
66+
67+ type hostMetadataCache struct {
68+ sync.Mutex
69+ lastUpdate time.Time
70+ data mapstr.Pointer
71+ }
72+
6073type addHostMetadata struct {
61- lastUpdate struct {
62- time.Time
63- sync.Mutex
64- }
65- data mapstr.Pointer
66- geoData mapstr.M
67- config Config
68- logger * logp.Logger
69- metrics metrics
74+ // One cache for standard hostname, one for FQDN
75+ caches [2 ]hostMetadataCache
76+ geoData mapstr.M
77+ config Config
78+ logger * logp.Logger
79+ metrics metrics
80+ hostInfoFactory hostInfoFactory
7081}
7182
7283// New constructs a new add_host_metadata processor.
@@ -77,14 +88,19 @@ func New(cfg *config.C, log *logp.Logger) (beat.Processor, error) {
7788 }
7889
7990 p := & addHostMetadata {
91+ caches : [2 ]hostMetadataCache {
92+ {data : mapstr .NewPointer (nil )},
93+ {data : mapstr .NewPointer (nil )},
94+ },
8095 config : c ,
81- data : mapstr .NewPointer (nil ),
8296 logger : log .Named (logName ),
8397 metrics : metrics {
8498 FQDNLookupFailed : monitoring .NewInt (reg , "fqdn_lookup_failed" ),
8599 },
100+ hostInfoFactory : func () (hostInfo , error ) { return sysinfo .Host () },
86101 }
87- if err := p .loadData (true , features .FQDN ()); err != nil {
102+ // Fetch and cache the initial host data.
103+ if _ , err := p .loadData (features .FQDN ()); err != nil {
88104 return nil , fmt .Errorf ("failed to load data: %w" , err )
89105 }
90106
@@ -96,28 +112,6 @@ func New(cfg *config.C, log *logp.Logger) (beat.Processor, error) {
96112 p .geoData = mapstr.M {"host" : mapstr.M {"geo" : geoFields }}
97113 }
98114
99- // create a unique ID for this instance of the processor
100- var cbIDStr string
101- cbID , err := uuid .NewV4 ()
102- // if we fail, fall back to the processor name, hope for the best.
103- if err != nil {
104- p .logger .Errorf ("error generating ID for FQDN callback, reverting to processor name: %v" , err )
105- cbIDStr = processorName
106- } else {
107- cbIDStr = cbID .String ()
108- }
109-
110- // this is safe as New() returns a pointer, not the actual object.
111- // This matters as other pieces of code in libbeat, like libbeat/processors/processor.go,
112- // will do weird stuff like copy the entire list of global processors.
113- err = features .AddFQDNOnChangeCallback (p .handleFQDNReportingChange , cbIDStr )
114- if err != nil {
115- return nil , fmt .Errorf (
116- "could not register callback for FQDN reporting onChange from %s processor: %w" ,
117- processorName , err ,
118- )
119- }
120-
121115 return p , nil
122116}
123117
@@ -128,12 +122,15 @@ func (p *addHostMetadata) Run(event *beat.Event) (*beat.Event, error) {
128122 return event , nil
129123 }
130124
131- err := p .loadData (true , features .FQDN ())
125+ data , err := p .loadData (features .FQDN ())
132126 if err != nil {
133127 return nil , fmt .Errorf ("error loading data during event update: %w" , err )
134128 }
135129
136- event .Fields .DeepUpdate (p .data .Get ().Clone ())
130+ // Superficially this clone seems unnecessary, but it seems to have been
131+ // applied as a fix a long time ago -- possibly there can be later processors
132+ // or changes to an event that would affect the cached data?
133+ event .Fields .DeepUpdate (data .Clone ())
137134
138135 if len (p .geoData ) > 0 {
139136 event .Fields .DeepUpdate (p .geoData )
@@ -150,34 +147,50 @@ func (p *addHostMetadata) Run(event *beat.Event) (*beat.Event, error) {
150147// return nil
151148//}
152149
153- func (p * addHostMetadata ) expired () bool {
154-
155- p .lastUpdate .Lock ()
156- defer p .lastUpdate .Unlock ()
157-
158- if p .config .CacheTTL <= 0 {
159- return true
150+ func (p * addHostMetadata ) cacheForFQDN (useFQDN bool ) * hostMetadataCache {
151+ if useFQDN {
152+ return & p .caches [1 ]
160153 }
154+ return & p .caches [0 ]
155+ }
161156
162- if p .lastUpdate .Add (p .config .CacheTTL ).After (time .Now ()) {
163- return false
157+ func timestampExpired (timestamp time.Time , ttl time.Duration ) bool {
158+ if ttl <= 0 {
159+ return true
164160 }
165- p .lastUpdate .Time = time .Now ()
166- return true
161+ return timestamp .Add (ttl ).Before (time .Now ())
167162}
168163
169164// loadData updates the processor's associated host metadata
170- func (p * addHostMetadata ) loadData (checkCache bool , useFQDN bool ) error {
171- if checkCache && ! p .expired () {
172- return nil
165+ func (p * addHostMetadata ) loadData (useFQDN bool ) (mapstr.M , error ) {
166+ cache := p .cacheForFQDN (useFQDN )
167+ cache .Lock ()
168+ defer cache .Unlock ()
169+
170+ data := cache .data .Get ()
171+ var err error
172+ if data == nil || timestampExpired (cache .lastUpdate , p .config .CacheTTL ) {
173+ // Data is absent or expired, refresh it.
174+ data , err = p .fetchData (useFQDN )
175+ if err == nil {
176+ cache .data .Set (data )
177+ }
178+ // Backwards compatibility (for now): cache timestamp is updated even if
179+ // the update fails (falls back on the last successful update, and avoids
180+ // blocking the pipeline when there are issues with the hostname).
181+ cache .lastUpdate = time .Now ()
173182 }
183+ return data , err
184+ }
174185
175- h , err := sysinfo .Host ()
186+ func (p * addHostMetadata ) fetchData (useFQDN bool ) (mapstr.M , error ) {
187+ h , err := p .hostInfoFactory ()
176188 if err != nil {
177- return fmt .Errorf ("error collecting host info: %w" , err )
189+ return nil , fmt .Errorf ("error collecting host info: %w" , err )
178190 }
179191
180- hostname := h .Info ().Hostname
192+ hInfo := h .Info ()
193+ hostname := hInfo .Hostname
181194 if useFQDN {
182195 ctx , cancel := context .WithTimeout (context .Background (), 1 * time .Minute )
183196 defer cancel ()
@@ -198,7 +211,7 @@ func (p *addHostMetadata) loadData(checkCache bool, useFQDN bool) error {
198211 }
199212 }
200213
201- data := host .MapHostInfo (h . Info () , hostname )
214+ data := host .MapHostInfo (hInfo , hostname )
202215 if p .config .NetInfoEnabled {
203216 // IP-address and MAC-address
204217 var ipList , hwList , err = util .GetNetInfo ()
@@ -208,85 +221,30 @@ func (p *addHostMetadata) loadData(checkCache bool, useFQDN bool) error {
208221
209222 if len (ipList ) > 0 {
210223 if _ , err := data .Put ("host.ip" , ipList ); err != nil {
211- return fmt .Errorf ("could not set host.ip: %w" , err )
224+ return nil , fmt .Errorf ("could not set host.ip: %w" , err )
212225 }
213226 }
214227 if len (hwList ) > 0 {
215228 if _ , err := data .Put ("host.mac" , hwList ); err != nil {
216- return fmt .Errorf ("could not set host.mac: %w" , err )
229+ return nil , fmt .Errorf ("could not set host.mac: %w" , err )
217230 }
218231 }
219232 }
220233
221234 if p .config .Name != "" {
222235 if _ , err := data .Put ("host.name" , p .config .Name ); err != nil {
223- return fmt .Errorf ("could not set host.name: %w" , err )
236+ return nil , fmt .Errorf ("could not set host.name: %w" , err )
224237 }
225238 }
226239
227- p .data .Set (data )
228- return nil
240+ return data , nil
229241}
230242
231243func (p * addHostMetadata ) String () string {
232244 return fmt .Sprintf ("%v=[netinfo.enabled=[%v], cache.ttl=[%v]]" ,
233245 processorName , p .config .NetInfoEnabled , p .config .CacheTTL )
234246}
235247
236- func (p * addHostMetadata ) handleFQDNReportingChange (new , old bool ) {
237- if new == old {
238- // Nothing to do
239- return
240- }
241-
242- // update the data for the processor
243- p .updateOrExpire (new )
244- }
245-
246- // updateOrExpire will attempt to update the data for the processor, or expire the cache
247- // if the config update fails, or times out
248- func (p * addHostMetadata ) updateOrExpire (useFQDN bool ) {
249- if p .config .CacheTTL <= 0 {
250- return
251- }
252-
253- p .lastUpdate .Lock ()
254- defer p .lastUpdate .Unlock ()
255-
256- // while holding the mutex, attempt to update loadData()
257- // doing this with the mutex means other events must wait until we have the correct host data, as we assume that
258- // a call to this function means something else wants to force an update, and thus all events must sync.
259-
260- updateChanSuccess := make (chan bool )
261- timeout := time .After (p .config .ExpireUpdateTimeout )
262- go func () {
263- err := p .loadData (false , useFQDN )
264- if err != nil {
265- p .logger .Errorf ("error updating data for processor: %v" , err )
266- updateChanSuccess <- false
267- return
268- }
269- updateChanSuccess <- true
270- }()
271-
272- // this additional timeout check is paranoid, but when it's method is called from handleFQDNReportingChange(),
273- // it's blocking, which means we can hold a mutex in features. In addition, we don't want to break the processor by
274- // having all the events wait for too long.
275- select {
276- case <- timeout :
277- p .logger .Errorf ("got timeout while trying to update metadata" )
278- p .lastUpdate .Time = time.Time {}
279- case success := <- updateChanSuccess :
280- // only expire the cache if update was failed
281- if ! success {
282- p .lastUpdate .Time = time.Time {}
283- } else {
284- p .lastUpdate .Time = time .Now ()
285- }
286- }
287-
288- }
289-
290248func skipAddingHostMetadata (event * beat.Event ) bool {
291249 // If host fields exist(besides host.name added by libbeat) in event, skip add_host_metadata.
292250 hostFields , err := event .Fields .GetValue ("host" )
0 commit comments