Skip to content

Commit e17f8a8

Browse files
aggregate data before write
1 parent 61a0c13 commit e17f8a8

File tree

3 files changed

+35
-6
lines changed

3 files changed

+35
-6
lines changed

vmetrics/message.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"reflect"
8+
"strconv"
89
"time"
910
)
1011

@@ -20,6 +21,10 @@ func NewMessage() *Message {
2021

2122
func toMap(intf interface{}) (map[string]interface{}, error) {
2223
var m map[string]interface{}
24+
kind := reflect.ValueOf(intf).Kind()
25+
if kind != reflect.Struct && kind != reflect.Ptr {
26+
return m, errors.New("message should be type of struct: " + reflect.ValueOf(intf).Kind().String())
27+
}
2328
b, err := json.Marshal(&intf)
2429
if err != nil {
2530
return m, err
@@ -29,10 +34,6 @@ func toMap(intf interface{}) (map[string]interface{}, error) {
2934
}
3035

3136
func (m *Message) jsonString(inf interface{}) (string, error) {
32-
kind := reflect.ValueOf(inf).Kind()
33-
if kind != reflect.Struct && kind != reflect.Ptr {
34-
return "", errors.New("message should be type of struct: " + reflect.ValueOf(inf).Kind().String())
35-
}
3637
infMap, err := toMap(inf)
3738
if err != nil {
3839
return "", errors.New("message should be type of struct: " + reflect.ValueOf(inf).Kind().String())
@@ -62,6 +63,32 @@ func (m *Message) Consume() []string {
6263
return messages
6364
}
6465

66+
func (m *Message) Aggregated() []string {
67+
var messages []string
68+
var uniqueMap = make(map[string]int64)
69+
for _, msg := range m.messages {
70+
infMap, err := toMap(msg)
71+
if err != nil {
72+
fmt.Println(err)
73+
} else {
74+
var kstr = ""
75+
for k := range infMap {
76+
kstr += k + "_"
77+
}
78+
if val, ok := uniqueMap[kstr]; ok {
79+
uniqueMap[kstr] = val + 1
80+
} else {
81+
uniqueMap[kstr] = 1
82+
}
83+
}
84+
}
85+
for msg := range uniqueMap {
86+
count := strconv.Itoa(int(uniqueMap[msg]))
87+
messages = append(messages, "{ \"key\": \""+msg+"\" \"count\": "+count+"}")
88+
}
89+
return messages
90+
}
91+
6592
func (m *Message) Clear() {
6693
m.messages = nil
6794
m.messages = make([]interface{}, 0)

vmetrics/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ package vmetrics
33
type Metric interface {
44
Record(inf interface{})
55
Consume() []string
6+
Aggregated() []string
67
Clear()
78
}

vmetrics/registry.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,9 @@ func (r *Registry) Start() {
8181
go func() {
8282
for {
8383
for _, metric := range r.Metrics {
84-
messages := metric.Consume()
85-
go r.writeToKafka(messages)
84+
//messages := metric.Consume()
85+
agMessages := metric.Aggregated()
86+
go r.writeToKafka(agMessages)
8687
metric.Clear()
8788
}
8889
time.Sleep(r.Config.Cycle)

0 commit comments

Comments
 (0)