7 changes: 3 additions & 4 deletions bpf/common/common.h
@@ -48,7 +48,7 @@ enum large_buf_action : u8 {
};

enum {
k_dns_max_len = 516,
k_dns_max_len = 512, // must be a power of 2
};

#define MAX_SPAN_NAME_LEN 64
@@ -246,17 +246,16 @@ typedef struct mongo_go_client_req {

typedef struct dns_req {
u8 flags; // Must be first we use it to tell what kind of packet we have on the ring buffer
u8 p_type;
u8 dns_q;
u8 _pad1[1];
u8 _pad1[2];
PR author comment: I removed an unused field.

u32 len;
connection_info_t conn;
u16 id;
u8 _pad2[2];
tp_info_t tp;
u64 ts;
// we need this to filter traces from unsolicited processes that share the executable
// with other instrumented processes
pid_info pid;
unsigned char buf[k_dns_max_len];
u8 _pad3[4];
PR author comment: I had to clamp (bpf_clamp_umax) the size of the BPF buffer read, which meant I needed a power of 2, so I've switched to 512 and added padding.

} dns_req_t;
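
Not part of the diff, but a small hypothetical sketch of how the power-of-2 constraint mentioned in the comment above could be locked in at compile time (k_dns_max_len and dns_req_t are the existing names from common.h; the assert itself is illustration only):

```c
// Hypothetical compile-time guard (not in the PR): the skb read into req->buf is
// clamped with bpf_clamp_umax, which is why k_dns_max_len has to stay a power of 2.
_Static_assert((k_dns_max_len & (k_dns_max_len - 1)) == 0,
               "k_dns_max_len must be a power of 2");
// The explicit _pad1/_pad2/_pad3 members keep the struct layout stable and explicit,
// so the BPF side and the Go code decoding the ring buffer record agree on offsets.
```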
99 changes: 75 additions & 24 deletions bpf/common/dns.h
@@ -97,6 +97,36 @@ static __always_inline u8 is_dns(connection_info_t *conn) {
return is_dns_port(conn->s_port) || is_dns_port(conn->d_port);
}

static __always_inline void populate_dns_record(dns_req_t *req,
const pid_connection_info_t *p_conn,
const u16 orig_dport,
const u32 size,
const u8 qr,
const u16 id,
const conn_pid_t *conn_pid) {
__builtin_memcpy(&req->conn, &p_conn->conn, sizeof(connection_info_t));

req->flags = EVENT_DNS_REQUEST;
req->len = size;
req->dns_q = qr;
req->id = bpf_ntohs(id);
req->tp.ts = bpf_ktime_get_ns();
req->pid = conn_pid->p_info;

trace_key_t t_key = {0};
trace_key_from_pid_tid_with_p_key(&t_key, &conn_pid->p_key, conn_pid->id);

const u8 found = find_trace_for_client_request_with_t_key(
p_conn, orig_dport, &t_key, conn_pid->id, &req->tp);

bpf_dbg_printk("handle_dns: looking up client trace info, found %d", found);
if (found) {
urand_bytes(req->tp.span_id, SPAN_ID_SIZE_BYTES);
} else {
init_new_trace(&req->tp);
}
}

static __always_inline u8 handle_dns(struct __sk_buff *skb,
connection_info_t *conn,
protocol_info_t *p_info) {
@@ -158,30 +188,51 @@ static __always_inline u8 handle_dns(struct __sk_buff *skb,
dns_req_t *req = bpf_ringbuf_reserve(&events, sizeof(dns_req_t), 0);

if (req) {
__builtin_memcpy(&req->conn, conn, sizeof(connection_info_t));

req->flags = EVENT_DNS_REQUEST;
req->p_type = skb->pkt_type;
req->len = skb->len;
req->dns_q = qr;
req->id = bpf_ntohs(hdr.id);
req->ts = bpf_ktime_get_ns();
req->tp.ts = bpf_ktime_get_ns();
req->pid = conn_pid->p_info;

trace_key_t t_key = {0};
trace_key_from_pid_tid_with_p_key(&t_key, &conn_pid->p_key, conn_pid->id);

const u8 found = find_trace_for_client_request_with_t_key(
&p_conn, orig_dport, &t_key, conn_pid->id, &req->tp);

bpf_dbg_printk("handle_dns: looking up client trace info, found %d", found);
if (found) {
urand_bytes(req->tp.span_id, SPAN_ID_SIZE_BYTES);
} else {
init_new_trace(&req->tp);
}
read_skb_bytes(skb, dns_off, req->buf, sizeof(req->buf));
u32 len = skb->len - dns_off;
bpf_clamp_umax(len, 512);
populate_dns_record(req, &p_conn, orig_dport, len, qr, hdr.id, conn_pid);

read_skb_bytes(skb, dns_off, req->buf, len);
PR author comment: This was a nasty bug. We were reading up to the full size of req->buf, but read_skb_bytes failed to grab the last 16-byte chunk because we weren't reading exactly the number of bytes we should, which left garbage at the end of the buffer.
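
For clarity, a minimal before/after sketch of the read described above — the "before" line is the removed call from the old code, the "after" lines are the ones added by this PR:

```c
// Before: always ask for sizeof(req->buf) bytes, even when the DNS payload is
// shorter; the last chunk was not copied and the tail of req->buf ended up as garbage.
read_skb_bytes(skb, dns_off, req->buf, sizeof(req->buf));

// After: copy exactly the payload length, clamped to the power-of-2 buffer size
// so the verifier can prove the access stays within req->buf.
u32 len = skb->len - dns_off;
bpf_clamp_umax(len, 512);
read_skb_bytes(skb, dns_off, req->buf, len);
```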

bpf_d_printk("sending dns trace");
bpf_ringbuf_submit(req, get_flags());
}

return 1;
}

return 0;
}

static __always_inline u8 handle_dns_buf(const unsigned char *buf,
const int size,
pid_connection_info_t *p_conn,
u16 orig_dport) {

if (size < sizeof(struct dnshdr)) {
PR author comment: I had to add an additional way to read DNS traffic: if OBI is configured with host networking, it may not be able to see some internal pod traffic. In this case it couldn't see the Docker-internal DNS traffic between the Docker network and the service itself. I reproduced it in our test examples by setting network_mode: host for OBI.

In this scenario the socket filter doesn't see the traffic, but the udp_sendmsg and sock_recvmsg kprobes fire. I added a userspace-buffer helper here so we can capture those DNS requests too.

If OBI is running in the same network as the target process, we'll see the event twice: once from the socket filter and once from the kprobes. This isn't a problem, because we deduplicate the DNS requests in user space and don't emit them twice.
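
As a rough illustration of the user-space deduplication mentioned above (a hypothetical C sketch with made-up names; the real dedup lives in the Go pipeline and is not part of this file):

```c
// Hypothetical sketch of the dedup idea: remember the last few DNS transactions
// and drop a second sighting of the same one (e.g. seen once by the socket filter
// and once by the kprobes).
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define SEEN_SLOTS 64

struct dns_seen_key {
    uint16_t dns_id;   // DNS transaction id
    uint16_t src_port; // client ephemeral port
    uint16_t dst_port; // usually 53
    uint16_t qr;       // 0 = query, 1 = response
};

static struct dns_seen_key seen[SEEN_SLOTS];
static size_t seen_next;

// Returns true the first time a transaction is observed, false for duplicates.
static bool dns_first_sighting(const struct dns_seen_key *k) {
    for (size_t i = 0; i < SEEN_SLOTS; i++) {
        if (memcmp(&seen[i], k, sizeof(*k)) == 0)
            return false; // already emitted by the other capture path
    }
    seen[seen_next] = *k;
    seen_next = (seen_next + 1) % SEEN_SLOTS;
    return true;
}
```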

bpf_d_printk("dns packet too small");
return 0;
}

struct dnshdr hdr;
bpf_probe_read_user(&hdr, sizeof(struct dnshdr), buf);

const u16 flags = bpf_ntohs(hdr.flags);
const u8 qr = dns_qr(flags);

bpf_d_printk("QR type: %d", qr);

if (qr == k_dns_qr_query || qr == k_dns_qr_resp) {
conn_pid_t *conn_pid = bpf_map_lookup_elem(&sock_pids, &p_conn->conn);
if (!conn_pid) {
bpf_d_printk("can't find connection info for dns call");
return 0;
}

dns_req_t *req = bpf_ringbuf_reserve(&events, sizeof(dns_req_t), 0);
if (req) {
populate_dns_record(req, p_conn, orig_dport, size, qr, hdr.id, conn_pid);

bpf_probe_read(req->buf, sizeof(req->buf), buf);
bpf_d_printk("sending dns trace");
bpf_ringbuf_submit(req, get_flags());
}
2 changes: 1 addition & 1 deletion bpf/common/trace_common.h
@@ -58,7 +58,7 @@ static __always_inline void trace_key_from_pid_tid(trace_key_t *t_key) {
}

static __always_inline void
trace_key_from_pid_tid_with_p_key(trace_key_t *t_key, pid_key_t *p_key, u64 id) {
trace_key_from_pid_tid_with_p_key(trace_key_t *t_key, const pid_key_t *p_key, u64 id) {
t_key->p_key = *p_key;

u64 extra_id = extra_runtime_id_with_task_id(id);
49 changes: 47 additions & 2 deletions bpf/generictracer/k_tracer.c
@@ -210,7 +210,7 @@ int BPF_KPROBE(obi_kprobe_tcp_connect, struct sock *sk) {
}

SEC("kprobe/udp_sendmsg")
int BPF_KPROBE(obi_kprobe_udp_sendmsg, struct sock *sk) {
int BPF_KPROBE(obi_kprobe_udp_sendmsg, struct sock *sk, struct msghdr *msg, size_t len) {
(void)ctx;

u64 id = bpf_get_current_pid_tgid();
@@ -219,10 +219,31 @@ int BPF_KPROBE(obi_kprobe_udp_sendmsg, struct sock *sk) {
return 0;
}

bpf_dbg_printk("=== udp_sendmsg %llx sock %llx ===", id, sk);
bpf_dbg_printk("=== udp_sendmsg %llx sock %llx len %d ===", id, sk, len);

store_sock_pid(sk);

send_args_t s_args = {.size = len};

if (parse_sock_info(sk, &s_args.p_conn.conn)) {
u16 orig_dport = s_args.p_conn.conn.d_port;
dbg_print_http_connection_info(&s_args.p_conn.conn);
if (is_dns(&s_args.p_conn.conn)) {
sort_connection_info(&s_args.p_conn.conn);
s_args.p_conn.pid = pid_from_pid_tgid(id);
s_args.orig_dport = orig_dport;

unsigned char *buf = iovec_memory();
if (buf) {
len = read_msghdr_buf(msg, buf, len);
if (len) {
bpf_dbg_printk("Got buffer with len %d", len);
handle_dns_buf(buf, len, &s_args.p_conn, orig_dport);
}
}
}
}

return 0;
}

@@ -806,9 +827,33 @@ int BPF_KRETPROBE(obi_kretprobe_sock_recvmsg, int copied_len) {
info.pid = pid_from_pid_tgid(id);
setup_cp_support_conn_info(&info, false);
setup_connection_to_pid_mapping(id, &info, orig_dport);

if (is_dns(&info.conn)) {
sort_connection_info(&info.conn);

iovec_iter_ctx *iov_ctx = (iovec_iter_ctx *)&args->iovec_ctx;

if (!iov_ctx->iov && !iov_ctx->ubuf) {
bpf_dbg_printk("iovec_ptr found in kprobe is NULL, ignoring this sock_recvmsg");

goto done;
}

unsigned char *buf = iovec_memory();
if (buf) {
copied_len = read_iovec_ctx(iov_ctx, buf, copied_len);
if (!copied_len) {
bpf_dbg_printk("Not copied anything");
} else {
bpf_d_printk("Got potential dns buffer with len %d", copied_len);
handle_dns_buf(buf, copied_len, &info, orig_dport);
}
}
}
}
}

done:
bpf_map_delete_elem(&active_recv_args, &id);

return 0;
1 change: 1 addition & 0 deletions internal/test/oats/sql/docker-compose-beyla-sql.yml
@@ -46,6 +46,7 @@ services:
OTEL_EBPF_BPF_DEBUG: "true"
OTEL_EXPORTER_OTLP_ENDPOINT: "http://collector:4318"
OTEL_EBPF_BPF_BUFFER_SIZE_POSTGRES: 512
OTEL_EBPF_NAME_RESOLVER_SOURCES: "rdns"
depends_on:
testserver:
condition: service_started
6 changes: 3 additions & 3 deletions internal/test/oats/sql/yaml/oats_sql_other_langs.yaml
@@ -3,14 +3,14 @@ docker-compose:
files:
- ../docker-compose-beyla-sql.yml
input:
- path: '/query'
- path: "/query"

interval: 500ms
expected:
traces:
- traceql: '{ .db.operation.name = "SELECT" && .db.system.name = "postgresql"}'
spans:
- name: 'SELECT accounting.contacts'
- name: "SELECT accounting.contacts"
attributes:
db.operation.name: SELECT
db.collection.name: accounting.contacts
@@ -23,5 +23,5 @@ expected:
value: "== 0"
- promql: 'db_client_operation_duration_bucket{le="10", db_system_name="postgresql"}'
value: "> 0"
- promql: 'db_client_operation_duration_count{db_system_name="postgresql"}'
- promql: 'db_client_operation_duration_count{db_system_name="postgresql", server_address="sqlserver"}'
value: "> 0"
8 changes: 4 additions & 4 deletions pkg/ebpf/common/dns_request_transform.go
@@ -64,9 +64,9 @@ func readDNSEventIntoSpan(parseCtx *EBPFParseContext, record *ringbuf.Record) (r
Host: hostname,
HostPort: hostPort,
ContentLength: 0,
RequestStart: int64(event.Ts),
Start: int64(event.Ts),
End: int64(event.Ts + 1),
RequestStart: int64(event.Tp.Ts),
Start: int64(event.Tp.Ts),
End: int64(event.Tp.Ts + 1),
TraceID: trace.TraceID(event.Tp.TraceId),
SpanID: trace.SpanID(event.Tp.SpanId),
ParentSpanID: trace.SpanID(event.Tp.ParentId),
@@ -122,7 +122,7 @@ func readDNSEventIntoSpan(parseCtx *EBPFParseContext, record *ringbuf.Record) (r
if msg.Response {
responseCode = uint16(msg.RCode)
span.Status = int(responseCode)
span.End = int64(event.Ts)
span.End = int64(event.Tp.Ts)
} else {
return *span, true, nil // ignore until we get a response or never hear back
}
2 changes: 2 additions & 0 deletions pkg/export/attributes/attr_defs.go
@@ -229,6 +229,7 @@ func getDefinitions(
map[attr.Name]Default{
attr.MessagingSystem: true,
attr.MessagingDestination: true,
attr.ServerAddr: true,
},
extraGroupAttributes[GroupMessaging],
)
@@ -277,6 +278,7 @@ DBClientDuration.Section: {
DBClientDuration.Section: {
SubGroups: []*AttrReportGroup{&appAttributes, &appKubeAttributes},
Attributes: map[attr.Name]Default{
attr.ServerAddr: true,
attr.DBOperation: true,
attr.DBSystemName: true,
attr.ErrorType: true,
5 changes: 4 additions & 1 deletion pkg/internal/netolly/flow/reverse_dns.go
@@ -91,7 +91,10 @@ func ReverseDNSProvider(cfg *ReverseDNS, input, output *msg.Queue[[]*ebpf.Record
func checkEBPFReverseDNS(ctx context.Context, cfg *ReverseDNS) error {
if cfg.Type == ReverseDNSEBPF {
// overriding netLookupAddr by an eBPF-based alternative
ipToHosts := store.NewInMemory()
ipToHosts, err := store.NewInMemory(cfg.CacheLen)
if err != nil {
return fmt.Errorf("initializing eBPF-based reverse DNS cache: %w", err)
}
if err := xdp.StartDNSPacketInspector(ctx, ipToHosts); err != nil {
return fmt.Errorf("starting eBPF-based reverse DNS: %w", err)
}
25 changes: 19 additions & 6 deletions pkg/internal/rdns/store/memory.go
@@ -5,6 +5,8 @@ package store

import (
"sync"

"github.com/hashicorp/golang-lru/v2/simplelru"
)

type DNSEntry struct {
@@ -18,26 +20,37 @@ type InMemory struct {
access sync.RWMutex
// key: IP address, values: hostname
// TODO: address scenarios where different hostnames point to a same IP
entries map[string][]string
entries *simplelru.LRU[string, []string]
}

func NewInMemory() *InMemory {
return &InMemory{
entries: map[string][]string{},
func NewInMemory(cacheSize int) (*InMemory, error) {
cache, err := simplelru.NewLRU[string, []string](cacheSize, nil)
PR author comment: I changed this implementation to ensure this cache is memory-capped.

if err != nil {
return nil, err
}
return &InMemory{
entries: cache,
}, nil
}

func (im *InMemory) Store(entry *DNSEntry) {
im.access.Lock()
defer im.access.Unlock()
for _, ip := range entry.IPs {
// TODO: store IPv4 also with its IPv6 representation
im.entries[ip] = []string{entry.HostName}
im.entries.Add(ip, []string{entry.HostName})
}
}

func (im *InMemory) StorePair(ip, name string) {
im.access.Lock()
defer im.access.Unlock()
im.entries.Add(ip, []string{name})
}

func (im *InMemory) GetHostnames(ip string) ([]string, error) {
im.access.RLock()
defer im.access.RUnlock()
return im.entries[ip], nil
r, _ := im.entries.Get(ip)
return r, nil
}