Skip to content

Commit eed5c73

Browse files
authored
Merge pull request #1395 from hzxuzhonghu/graceful-shutdown
Fix authz during shutdown
2 parents d4a7ce1 + 897be44 commit eed5c73

File tree

4 files changed

+39
-7
lines changed

4 files changed

+39
-7
lines changed

bpf/kmesh/workload/include/frontend.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ static inline int frontend_manager(struct kmesh_context *kmesh_ctx, frontend_val
3434
direct_backend = true;
3535
}
3636

37-
if (direct_backend) {
37+
if (direct_backend) { // in this case, we donot check the healthy status of the backend, just let it go
3838
// For pod direct access, if a pod has waypoint captured, we will redirect to waypoint, otherwise we do nothing.
3939
if (backend_v->waypoint_port != 0) {
4040
BPF_LOG(

bpf/kmesh/workload/xdp.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ static inline wl_policies_v *get_workload_policies(struct xdp_info *info, struct
7070
}
7171
frontend_v = kmesh_map_lookup_elem(&map_of_frontend, &frontend_k);
7272
if (!frontend_v) {
73-
BPF_LOG(INFO, XDP, "failed to get frontend in xdp");
73+
BPF_LOG(DEBUG, XDP, "failed to get frontend in xdp");
7474
return AUTH_ALLOW;
7575
}
7676
workload_uid = frontend_v->upstream_id;

pkg/controller/workload/workload_processor.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,24 @@ func (p *Processor) removeWorkload(uid string) error {
201201
return p.removeWorkloadFromBpfMap(wl)
202202
}
203203

204+
// handleUnhealthyWorkload is used to handle unhealthy workload, we only leave it in the frontend and backend map.
205+
// Because we rely on the backend map to be aware of direct access in `frontend_manager`,
206+
// and xdp rely on frontend map to get the authz policy and do authz.
207+
func (p *Processor) handleUnhealthyWorkload(workload *workloadapi.Workload) error {
208+
backendUid := p.hashName.Hash(workload.Uid)
209+
210+
// 1. find all endpoint keys related to this workload and delete them
211+
if eks := p.bpf.GetEndpointKeys(backendUid); len(eks) > 0 {
212+
if err := p.deleteEndpointRecords(eks.UnsortedList()); err != nil {
213+
return fmt.Errorf("handleUnhealthyWorkload: deleteEndpointRecords for %s failed: %v", workload.Uid, err)
214+
}
215+
}
216+
217+
// TODO(hzxuzhonghu), maybe need to update backend map to update the workload status
218+
219+
return nil
220+
}
221+
204222
func (p *Processor) removeWorkloadFromBpfMap(workload *workloadapi.Workload) error {
205223
var (
206224
err error
@@ -486,11 +504,11 @@ func (p *Processor) handleWorkload(workload *workloadapi.Workload) error {
486504
p.locality.SetLocality(p.nodeName, workload.GetClusterId(), workload.GetNetwork(), workload.GetLocality())
487505
}
488506

489-
// Exclude unhealthy workload, which is not ready to serve traffic
507+
// Exclude unhealthy workload, which is not ready to serve traffic, but keep it in the frontend
508+
// backend map for authz
490509
if workload.Status == workloadapi.WorkloadStatus_UNHEALTHY {
491510
log.Debugf("workload %s is unhealthy", workload.ResourceName())
492-
// If the workload is updated to unhealthy, we should remove it from the bpf map
493-
return p.removeWorkloadFromBpfMap(workload)
511+
return p.handleUnhealthyWorkload(workload)
494512
}
495513

496514
// 1. update workload in backend map

pkg/controller/workload/workload_processor_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,12 @@ func Test_handleWorkload(t *testing.T) {
160160
assert.Equal(t, got.Status, workloadapi.WorkloadStatus_UNHEALTHY)
161161
checkNotExistInFrontEndMap(t, workload3.Addresses[0], p)
162162

163-
// 8. update workload from healthy to unhealthy, should remove it from bpf map
163+
// 8. update workload from healthy to unhealthy, should remove it from bpf endpoint map
164164
workload2.Status = workloadapi.WorkloadStatus_UNHEALTHY
165165
_ = p.handleWorkload(workload2)
166-
checkNotExistInFrontEndMap(t, workload2.Addresses[0], p)
166+
checkNotExistInEndpointMap(t, p, fakeSvc, []uint32{workload2ID})
167+
checkFrontEndMap(t, workload2.Addresses[0], p)
168+
checkBackendMap(t, p, workload2ID, workload2)
167169

168170
// 9. delete service
169171
p.handleRemovedAddresses([]string{fakeSvc.ResourceName()})
@@ -276,6 +278,18 @@ func checkServiceMap(t *testing.T, p *Processor, svcId uint32, fakeSvc *workload
276278
assert.Equal(t, sv.WaypointPort, nets.ConvertPortToBigEndian(fakeSvc.Waypoint.GetHboneMtlsPort()))
277279
}
278280

281+
func checkNotExistInEndpointMap(t *testing.T, p *Processor, fakeSvc *workloadapi.Service, backendUid []uint32) {
282+
endpoints := p.bpf.GetAllEndpointsForService(p.hashName.Hash(fakeSvc.ResourceName()))
283+
t.Logf("endpoints number: %v", len(endpoints))
284+
285+
all := sets.New[uint32](backendUid...)
286+
for _, endpoint := range endpoints {
287+
if all.Contains(endpoint.BackendUid) {
288+
t.Fatalf("endpoint %v still exists", endpoint.BackendUid)
289+
}
290+
}
291+
}
292+
279293
func checkEndpointMap(t *testing.T, p *Processor, fakeSvc *workloadapi.Service, backendUid []uint32) {
280294
endpoints := p.bpf.GetAllEndpointsForService(p.hashName.Hash(fakeSvc.ResourceName()))
281295
assert.Equal(t, len(endpoints), len(backendUid))

0 commit comments

Comments
 (0)