Skip to content

Commit 48d3608

Browse files
authored
Merge pull request #827 from Okabe-Rintarou-0/feat/dump_bpf
Support ads bpf map lookup all
2 parents aeba81e + e3c37f8 commit 48d3608

File tree

16 files changed

+453
-70
lines changed

16 files changed

+453
-70
lines changed

bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c

Lines changed: 131 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,15 @@
1919
#include "deserialization_to_bpf_map.h"
2020
#include "../../config/kmesh_marcos_def.h"
2121

22-
#define LOG_ERR(fmt, args...) printf(fmt, ##args)
23-
#define LOG_WARN(fmt, args...) printf(fmt, ##args)
24-
#define LOG_INFO(fmt, args...) printf(fmt, ##args)
22+
#define PRINTF(fmt, args...) \
23+
do { \
24+
printf(fmt, ##args); \
25+
fflush(stdout); \
26+
} while (0)
27+
28+
#define LOG_ERR(fmt, args...) PRINTF(fmt, ##args)
29+
#define LOG_WARN(fmt, args...) PRINTF(fmt, ##args)
30+
#define LOG_INFO(fmt, args...) PRINTF(fmt, ##args)
2531

2632
struct op_context {
2733
void *key;
@@ -37,10 +43,10 @@ struct op_context {
3743
const ProtobufCMessageDescriptor *desc;
3844
};
3945

40-
#define init_op_context(context, key, val, desc, o_fd, fd, o_info, i_info, m_info) \
46+
#define init_op_context(context, k, v, desc, o_fd, fd, o_info, i_info, m_info) \
4147
do { \
42-
(context).key = (key); \
43-
(context).value = (val); \
48+
(context).key = (k); \
49+
(context).value = (v); \
4450
(context).desc = (desc); \
4551
(context).outter_fd = (o_fd); \
4652
(context).map_fd = (fd); \
@@ -51,6 +57,16 @@ struct op_context {
5157
(context).curr_fd = (fd); \
5258
} while (0)
5359

60+
#define append_new_node(elem_list_head, curr_elem_list_node, new_node) \
61+
do { \
62+
if (curr_elem_list_node == NULL) { \
63+
curr_elem_list_node = elem_list_head = new_node; \
64+
} else { \
65+
curr_elem_list_node->next = new_node; \
66+
curr_elem_list_node = new_node; \
67+
} \
68+
} while (0)
69+
5470
#define TASK_SIZE (100)
5571
struct inner_map_stat {
5672
int map_fd;
@@ -771,6 +787,55 @@ static int repeat_field_query(struct op_context *ctx, const ProtobufCFieldDescri
771787
return ret;
772788
}
773789

790+
void deserial_free_elem_list(struct element_list_node *head)
791+
{
792+
while (head != NULL) {
793+
struct element_list_node *n = head;
794+
deserial_free_elem(n->elem);
795+
head = n->next;
796+
free(n);
797+
}
798+
}
799+
800+
static void *create_struct_list(struct op_context *ctx, int *err)
801+
{
802+
void *prev_key = NULL;
803+
void *value;
804+
struct element_list_node *elem_list_head = NULL;
805+
struct element_list_node *curr_elem_list_node = NULL;
806+
807+
*err = 0;
808+
ctx->key = calloc(1, ctx->curr_info->key_size);
809+
while (!bpf_map_get_next_key(ctx->curr_fd, prev_key, ctx->key)) {
810+
prev_key = ctx->key;
811+
812+
value = create_struct(ctx, err);
813+
if (*err) {
814+
LOG_ERR("create_struct failed, err = %d\n", err);
815+
break;
816+
}
817+
818+
if (value == NULL) {
819+
continue;
820+
}
821+
822+
struct element_list_node *new_node = (struct element_list_node *)calloc(1, sizeof(struct element_list_node));
823+
if (!new_node) {
824+
*err = -1;
825+
break;
826+
}
827+
828+
new_node->elem = value;
829+
new_node->next = NULL;
830+
append_new_node(elem_list_head, curr_elem_list_node, new_node);
831+
}
832+
if (*err) {
833+
deserial_free_elem_list(elem_list_head);
834+
return NULL;
835+
}
836+
return elem_list_head;
837+
}
838+
774839
static void *create_struct(struct op_context *ctx, int *err)
775840
{
776841
void *value;
@@ -814,13 +879,71 @@ static void *create_struct(struct op_context *ctx, int *err)
814879
if (ret) {
815880
LOG_INFO("field[%d] query fail\n", i);
816881
*err = 1;
817-
return value;
882+
break;
818883
}
819884
}
820885

886+
if (*err) {
887+
deserial_free_elem(value);
888+
return NULL;
889+
}
890+
821891
return value;
822892
}
823893

894+
struct element_list_node *deserial_lookup_all_elems(const void *msg_desciptor)
895+
{
896+
int ret, err;
897+
struct element_list_node *value_list_head = NULL;
898+
const char *map_name = NULL;
899+
struct op_context context = {.inner_map_object = NULL};
900+
const ProtobufCMessageDescriptor *desc;
901+
struct bpf_map_info outter_info = {0}, inner_info = {0}, info = {0};
902+
int map_fd, outter_fd = 0, inner_fd = 0;
903+
unsigned int id, outter_id = 0, inner_id = 0;
904+
905+
if (msg_desciptor == NULL)
906+
return NULL;
907+
908+
desc = (ProtobufCMessageDescriptor *)msg_desciptor;
909+
if (desc->magic != PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC)
910+
return NULL;
911+
912+
map_name = desc->short_name;
913+
ret = get_map_ids(map_name, &id, &outter_id, &inner_id);
914+
if (ret)
915+
return NULL;
916+
917+
ret = get_map_fd_info(id, &map_fd, &info);
918+
if (ret < 0) {
919+
LOG_ERR("invalid MAP_ID: %d\n", id);
920+
return NULL;
921+
}
922+
923+
ret = get_map_fd_info(inner_id, &inner_fd, &inner_info);
924+
ret |= get_map_fd_info(outter_id, &outter_fd, &outter_info);
925+
if (ret < 0 || map_info_check(&outter_info, &inner_info))
926+
goto end;
927+
928+
init_op_context(context, NULL, NULL, desc, outter_fd, map_fd, &outter_info, &inner_info, &info);
929+
930+
value_list_head = create_struct_list(&context, &err);
931+
if (err != 0) {
932+
LOG_ERR("create_struct_list failed, err = %d", err);
933+
}
934+
935+
end:
936+
if (context.key != NULL)
937+
free(context.key);
938+
if (map_fd > 0)
939+
close(map_fd);
940+
if (outter_fd > 0)
941+
close(outter_fd);
942+
if (inner_fd > 0)
943+
close(inner_fd);
944+
return value_list_head;
945+
}
946+
824947
void *deserial_lookup_elem(void *key, const void *msg_desciptor)
825948
{
826949
int ret, err;
@@ -860,8 +983,7 @@ void *deserial_lookup_elem(void *key, const void *msg_desciptor)
860983
normalize_key(&context, key, map_name);
861984
value = create_struct(&context, &err);
862985
if (err != 0) {
863-
deserial_free_elem(value);
864-
value = NULL;
986+
LOG_ERR("create_struct failed, err = %d\n", err);
865987
}
866988

867989
end:

bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,16 @@
77
/* equal MAP_SIZE_OF_OUTTER_MAP */
88
#define MAX_OUTTER_MAP_ENTRIES (8192)
99

10+
struct element_list_node {
11+
void *elem;
12+
struct element_list_node *next;
13+
};
14+
1015
int deserial_update_elem(void *key, void *value);
1116
void *deserial_lookup_elem(void *key, const void *msg_desciptor);
17+
struct element_list_node *deserial_lookup_all_elems(const void *msg_desciptor);
1218
void deserial_free_elem(void *value);
19+
void deserial_free_elem_list(struct element_list_node *head);
1320
int deserial_delete_elem(void *key, const void *msg_desciptor);
1421

1522
int deserial_init();

pkg/bpf/bpf_kmesh.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,3 +441,7 @@ func (sc *BpfKmesh) Detach() error {
441441
}
442442
return nil
443443
}
444+
445+
func AdsL7Enabled() bool {
446+
return true
447+
}

pkg/bpf/bpf_kmesh_l4.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,3 +111,7 @@ func (sc *BpfKmesh) Detach() error {
111111
}
112112
return nil
113113
}
114+
115+
func AdsL7Enabled() bool {
116+
return false
117+
}

pkg/cache/v2/cluster.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -164,24 +164,6 @@ func (cache *ClusterCache) Delete() {
164164
}
165165
}
166166

167-
func (cache *ClusterCache) DumpBpf() []*cluster_v2.Cluster {
168-
cache.mutex.RLock()
169-
defer cache.mutex.RUnlock()
170-
clusters := make([]*cluster_v2.Cluster, 0, len(cache.apiClusterCache))
171-
for name, c := range cache.apiClusterCache {
172-
tmp := &cluster_v2.Cluster{}
173-
if err := maps_v2.ClusterLookup(name, tmp); err != nil {
174-
log.Errorf("ClusterLookup failed, %s", name)
175-
continue
176-
}
177-
178-
tmp.ApiStatus = c.ApiStatus
179-
clusters = append(clusters, tmp)
180-
}
181-
182-
return clusters
183-
}
184-
185167
func (cache *ClusterCache) Dump() []*cluster_v2.Cluster {
186168
cache.mutex.RLock()
187169
defer cache.mutex.RUnlock()

pkg/cache/v2/cluster_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package cache_v2
1818

1919
import (
20+
"sort"
2021
"testing"
2122

2223
"github.com/agiledragon/gomonkey/v2"
@@ -177,6 +178,33 @@ func TestClusterFlush(t *testing.T) {
177178
})
178179
}
179180

181+
func TestClusterLookupAll(t *testing.T) {
182+
config := options.BpfConfig{
183+
Mode: "ads",
184+
BpfFsPath: "/sys/fs/bpf",
185+
Cgroup2Path: "/mnt/kmesh_cgroup2",
186+
}
187+
cleanup, _ := test.InitBpfMap(t, config)
188+
t.Cleanup(cleanup)
189+
testClusterNames := []string{"ut-cluster-1", "ut-cluster-2", "ut-cluster-3"}
190+
for _, testClusterName := range testClusterNames {
191+
err := maps_v2.ClusterUpdate(testClusterName, &cluster_v2.Cluster{Name: testClusterName})
192+
assert.Nil(t, err)
193+
}
194+
195+
clusters, err := maps_v2.ClusterLookupAll()
196+
assert.Nil(t, err)
197+
198+
var actualClusterNames []string
199+
200+
for _, cluster := range clusters {
201+
actualClusterNames = append(actualClusterNames, cluster.Name)
202+
}
203+
204+
sort.Strings(actualClusterNames)
205+
assert.Equal(t, actualClusterNames, testClusterNames)
206+
}
207+
180208
func BenchmarkClusterFlush(b *testing.B) {
181209
t := &testing.T{}
182210
config := options.BpfConfig{

pkg/cache/v2/listener.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -127,24 +127,6 @@ func (cache *ListenerCache) Flush() {
127127
}
128128
}
129129

130-
func (cache *ListenerCache) DumpBpf() []*listener_v2.Listener {
131-
cache.mutex.RLock()
132-
defer cache.mutex.RUnlock()
133-
listeners := make([]*listener_v2.Listener, 0, len(cache.apiListenerCache))
134-
for name, listener := range cache.apiListenerCache {
135-
tmp := &listener_v2.Listener{}
136-
if err := maps_v2.ListenerLookup(listener.GetAddress(), tmp); err != nil {
137-
log.Errorf("ListenerLookup failed, %s", name)
138-
continue
139-
}
140-
141-
tmp.ApiStatus = listener.ApiStatus
142-
listeners = append(listeners, tmp)
143-
}
144-
145-
return listeners
146-
}
147-
148130
func (cache *ListenerCache) Dump() []*listener_v2.Listener {
149131
cache.mutex.RLock()
150132
defer cache.mutex.RUnlock()

pkg/cache/v2/listener_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package cache_v2
1818

1919
import (
2020
"fmt"
21+
"sort"
2122
"testing"
2223

2324
"github.com/agiledragon/gomonkey/v2"
@@ -36,6 +37,35 @@ import (
3637
"kmesh.net/kmesh/pkg/utils/test"
3738
)
3839

40+
func TestListenerLookupAll(t *testing.T) {
41+
config := options.BpfConfig{
42+
Mode: "ads",
43+
BpfFsPath: "/sys/fs/bpf",
44+
Cgroup2Path: "/mnt/kmesh_cgroup2",
45+
}
46+
cleanup, _ := test.InitBpfMap(t, config)
47+
t.Cleanup(cleanup)
48+
testListenerNames := []string{"ut-listener-1", "ut-listener-2", "ut-listener-3"}
49+
for i, testListenerName := range testListenerNames {
50+
err := maps_v2.ListenerUpdate(&core_v2.SocketAddress{
51+
Port: uint32(i + 1),
52+
}, &listener_v2.Listener{Name: testListenerName})
53+
assert.Nil(t, err)
54+
}
55+
56+
listeners, err := maps_v2.ListenerLookupAll()
57+
assert.Nil(t, err)
58+
59+
var actualListenerNames []string
60+
61+
for _, listener := range listeners {
62+
actualListenerNames = append(actualListenerNames, listener.Name)
63+
}
64+
65+
sort.Strings(actualListenerNames)
66+
assert.Equal(t, actualListenerNames, testListenerNames)
67+
}
68+
3969
func TestListenerFlush(t *testing.T) {
4070
t.Run("listener status is UPDATE", func(t *testing.T) {
4171
updateListenerAddress := []*core_v2.SocketAddress{}

0 commit comments

Comments
 (0)