还在用perf Buffer? ringBuffer真王朝了
内核态向用户态的通信
在前面两篇文章的demo中,我们通过bpf_printk
输出采集到的数据,但bpf_printk
有几点不便,一是其不支持超过5个的格式化参数,二是其只能往trace_pipe
流输出数据,在查看时不方便,因此一种更可行的方式是将ebpf内核态程序采集到的数据传输到用户态空间,然后主要在用户态对这些数据进行处理和输出,在linux5.8以前,这项工作主要由perf Buffer
实现,从linux5.8开始,内核引入ringBuffer
数据结构,后者在多数情况下被证明是从内核向用户态传输数据的最佳选择
此外就是我们也希望从用户态向内核态传递一些配置参数,比如针对pid进行采集进程过滤,针对这种少量的数据我们可以通过内核态全局变量传递
什么是ringBuffer
引用bpf-developer-tutorial中对ringBuffer特性的描述(这也是一套非常好的ebpf开发教程)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| ### eBPF ringbuf vs eBPF perfbuf
只要 BPF 程序需要将收集到的数据发送到用户空间进行后处理和记录,它通常会使用 BPF perf buffer(perfbuf)来实现。Perfbuf 是每个CPU循环缓冲区的集合,它允许在内核和用户空间之间有效地交换数据。它在实践中效果很好,但由于其按CPU设计,它有两个主要的缺点,在实践中被证明是不方便的:内存的低效使用和事件的重新排序。
为了解决这些问题,从Linux 5.8开始,BPF提供了一个新的BPF数据结构(BPF map)。BPF环形缓冲区(ringbuf)。它是一个多生产者、单消费者(MPSC)队列,可以同时在多个CPU上安全共享。
BPF ringbuf 支持来自 BPF perfbuf 的熟悉的功能:
- 变长的数据记录。 - 能够通过内存映射区域有效地从用户空间读取数据,而不需要额外的内存拷贝和/或进入内核的系统调用。 - 既支持epoll通知,又能以绝对最小的延迟进行忙环操作。
同时,BPF ringbuf解决了BPF perfbuf的以下问题:
- 内存开销。 - 数据排序。 - 浪费的工作和额外的数据复制。
|
总的来说,可以将其理解为一块内核和用户的共享空间,可以借助其进行一些数据交换
如何使用
声明Rb
ringbuf
属于BPF_MAP
,通常通过匿名结构体的形式定义在.maps
段中
1 2 3 4 5 6
| const int PAGESIZE = 4096; struct { __uint(type, BPF_MAP_TYPE_RINGBUF); __uint(max_entries, PAGESIZE * 16); } ringbuf_map SEC(".maps");
|
这里使用btf宏进行定义,这是一种推荐的写法。我们指定type为BPF_MAP_TYPE_RINGBUF
,然后设置max_entries
(对于ringbuf而言就是缓冲区大小)

根据ebpf.io的说明,max_entries
必须设置为页对齐的,且大小必须是2的幂,这里就设置为4kb * 16的大小,实际使用过程中注意别设置太小导致缓冲区爆掉就行
分配空间
1 2 3 4 5 6 7
| struct mydata { char buf[256]; int pid; int syscallID; }; struct mydata *data; data = (struct mydata *)bpf_ringbuf_reserve(&ringbuf_map,sizeof(struct mydata), 0);
|
通过bpf_ringbuf_reserve
从我们声明的ringbuf中分配一块空间用于存储准备传输的数据
然后就正常的把数据写入这个data即可,最后通过bpf_ringbuf_submit
提交数据,提交时选择的flag有发送通知信号,不发送通知信号和自主决定是否发送信号(flag=0)三种,根据情况选择即可,完整的探针代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| const int PAGESIZE = 4096; struct { __uint(type, BPF_MAP_TYPE_RINGBUF); __uint(max_entries, PAGESIZE * 16); } ringbuf_map SEC(".maps");
volatile int targetPid = 0;
SEC("tp/raw_syscalls/sys_enter") int handle_sys_enter(struct trace_event_raw_sys_enter *args) { int id = args->id; struct mydata *data; if (bpf_get_current_pid_tgid() >> 32 != targetPid) { return 0; } data = (struct mydata *)bpf_ringbuf_reserve(&ringbuf_map, sizeof(struct mydata), 0); if (!data) { bpf_printk("Failed to reserve space in ring buffer\n"); return 0; } data->pid = bpf_get_current_pid_tgid() >> 32; data->syscallID = id; bpf_printk("sys_enter: id=%d, pid=%d\n", id, bpf_get_current_pid_tgid() >> 32); char buf[256]; bpf_probe_read_user_str(buf, sizeof(buf), (void *)(args->args[1])); if (id == 0x40) { __builtin_memcpy(data->buf, buf, sizeof(data->buf)); bpf_printk("count: %d", args->args[2]); bpf_printk("write : %s", buf); } bpf_ringbuf_submit(data, 0); return 0; }
|
接收数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| int rbHandle(void *ctx, void *data, unsigned long dataSize) { const struct mydata *mydata = (struct mydata *)data; if (mydata->syscallID == 0x40) { printf("write information to rb: pid=%d, buf=%s \n", mydata->pid, mydata->buf); } else { printf("other syscall : id=%d, pid=%d \n", mydata->syscallID, mydata->pid); } return 0; } struct ring_buffer *rb; rb = ring_buffer__new(bpf_map__fd(skel->maps.ringbuf_map), rbHandle, NULL, NULL); while (1) { ring_buffer__poll(rb, -1); }
|
在用户侧程序中,我们通过ring_buffer__new
在用户态空间同样声明rb,然后绑定rbHandle
作为回调,之后轮询ring_buffer__poll
不断获取数据,这里回调的的第一个参数是上下文,第二个参数是数据结构体,第三个参数是数据大小
ring_buffer__poll
是一种带超时的阻塞式轮询方法,会对获取的每一个数据触发回调,适合简单的demo
通过全局变量传递配置数据
注意到上文中在内核侧程序中声明了一个全局变量targetPid
用于指定pid过滤设置
1
| volatile int targetPid = 0;
|
我们直接在skel
加载后向这个变量赋值即可传递数据,这个等价于直接往内核态程序对应的内存位置写数据
1 2
| skel = bpf_test_bpf__open_and_load(); skel->bss->targetPid = atoi(argv[1]);
|

这样我们就不用每次都去trace_pipe
里看输出了,而且对数据处理的自由度也更高
完整demo代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| #include "mydata.h" #include "vmlinux.h" #include <bpf/bpf_core_read.h> #include <bpf/bpf_helpers.h> #include <bpf/bpf_tracing.h> const int PAGESIZE = 4096; struct { __uint(type, BPF_MAP_TYPE_RINGBUF); __uint(max_entries, PAGESIZE * 16); } ringbuf_map SEC(".maps");
volatile int targetPid = 0;
SEC("tp/raw_syscalls/sys_enter") int handle_sys_enter(struct trace_event_raw_sys_enter *args) { int id = args->id; struct mydata *data; if (bpf_get_current_pid_tgid() >> 32 != targetPid) { return 0; } data = (struct mydata *)bpf_ringbuf_reserve(&ringbuf_map, sizeof(struct mydata), 0); if (!data) { bpf_printk("Failed to reserve space in ring buffer\n"); return 0; } data->pid = bpf_get_current_pid_tgid() >> 32; data->syscallID = id; bpf_printk("sys_enter: id=%d, pid=%d\n", id, bpf_get_current_pid_tgid() >> 32); char buf[256]; bpf_probe_read_user_str(buf, sizeof(buf), (void *)(args->args[1])); if (id == 0x40) { __builtin_memcpy(data->buf, buf, sizeof(data->buf)); bpf_printk("count: %d", args->args[2]); bpf_printk("write : %s", buf); } bpf_ringbuf_submit(data, 0); return 0; } SEC("tp/raw_syscalls/sys_exit") int handle_sys_exit(struct trace_event_raw_sys_exit *args) { int id = args->id; long ret = args->ret; bpf_printk("sys_exit: id=%d, pid=%d, ret=%ld\n", id, bpf_get_current_pid_tgid() >> 32, ret);
return 0; } char _license[] SEC("license") = "GPL";
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
#include "bpf_test.skel.h" #include "mydata.h" #include <bpf/libbpf.h> #include <cerrno> #include <csignal> #include <cstdio> #include <cstring> #include <format> #include <sys/resource.h> #include <unistd.h> static int libbpf_print_fn(enum libbpf_print_level level, const char *format, va_list args) { return vfprintf(stderr, format, args); } int rbHandle(void *ctx, void *data, unsigned long dataSize) { const struct mydata *mydata = (struct mydata *)data; if (mydata->syscallID == 0x40) { printf("write information to rb: pid=%d, buf=%s \n", mydata->pid, mydata->buf); } else { printf("other syscall : id=%d, pid=%d \n", mydata->syscallID, mydata->pid); } return 0; } int main(int argc, char **argv) { struct bpf_test_bpf *skel; struct ring_buffer *rb; int err; if (argc != 2) { printf("need exact one pid"); return 0; } libbpf_set_print(libbpf_print_fn);
skel = bpf_test_bpf__open_and_load(); skel->bss->targetPid = atoi(argv[1]); if (!skel) { fprintf(stderr, "Failed to open BPF skeleton\n"); return 1; } err = bpf_test_bpf__attach(skel); if (err) { fprintf(stderr, "Failed to attach BPF skeleton\n"); goto cleanup; } rb = ring_buffer__new(bpf_map__fd(skel->maps.ringbuf_map), rbHandle, NULL, NULL); if (!rb) { fprintf(stderr, "Failed to create ring buffer\n"); goto cleanup; } printf("while 1 \n"); while (1) { ring_buffer__poll(rb, -1); } cleanup: bpf_test_bpf__destroy(skel); ring_buffer__free(rb); return -err; }
|