libmemif源码分析

libmemif

memif是Mmeory Interface的缩写,memif库实现了进程间高性能数据包传输

为什么高性能?

  • 共享内存实现进程间通信
  • 无锁环形缓冲区 + 批量收发 + 双缓冲实现数据包传输
  • epoll实现事件模式

环形缓冲区也称为循环队列

特点

  • 支持多线程
  • 支持0拷贝
  • 支持epoll外部处理
  • 支持中断(事件)模式或轮询模式

常用结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/** \brief Memif连接参数
@param socket - Memif socket 句柄, 为 NULL 时使用默认 socket.
默认套接字仅在全局数据库中受支持(参见memif_init)。
自定义数据库不创建默认套接字(参见memif_per_thread_init)。
Memif连接与套接字存储在同一数据库中.
@param secret - 用作身份验证的可选参数
@param num_s2m_rings - 主机到从机的环形缓冲区数量
@param num_m2s_rings - 从机到主机的环形缓冲区数量
@param buffer_size - 共享内存中缓冲区的大小
@param log2_ring_size - 环形缓冲区的大小(以2为底的对数)
@param is_master - 0 == 主机, 1 == 从机
@param interface_id - 用于标识对等连接的id
@param interface_name - 接口名
@param mode - 0 == 以太网(运行在2层), 1 == ip(运行在3层) , 2 == punt/inject
*/
typedef struct
{
memif_socket_handle_t socket; /*!< 默认 = /run/vpp/memif.sock */
uint8_t secret[24]; /*!< 可选 (身份验证) */

uint8_t num_s2m_rings; /*!< 默认 = 1 */
uint8_t num_m2s_rings; /*!< 默认 = 1 */
uint16_t buffer_size; /*!< 默认 = 2048 */
uint8_t log2_ring_size; /*!< 默认 = 10 (1 << 10 = 1024) */
uint8_t is_master;

uint32_t interface_id;
uint8_t interface_name[32];
memif_interface_mode_t mode:8;
} memif_conn_args_t;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** \brief Memif 数据包缓冲区
@param desc_index - 环形缓冲区描述符索引
@param ring - 指向包含此缓冲区描述符的环形缓冲区的指针
@param len - 数据有效长度
@param flags - 标志位
@param data - 指向共享内存数据的指针
*/
typedef struct
{
uint16_t desc_index;
void *ring;
uint32_t len;
/** 表示buffer不够大,无法包含整个数据包,因此下一个缓冲器包含包的其余部分(链式缓冲区) */
#define MEMIF_BUFFER_FLAG_NEXT (1 << 0)
/** 表示buffer来自Rx环形缓冲区 */
#define MEMIF_BUFFER_FLAG_RX (1 << 1)
uint8_t flags;
void *data;
} memif_buffer_t;

常用API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/** \brief Memif 初始化
@param on_control_fd_update - 文件描述符事件轮询回调函数(用于自定义epoll)
@param app_name - 应用程序名称(被截断为32个字符)
@param memif_alloc - 自定义内存分配器, NULL = 默认(malloc)
@param memif_realloc - 自定义内存重分配器, NULL = 默认(realloc)
@param memif_free - 自定义内存释放器, NULL = 默认(free)

如果参数on_control_fd_update设置为NULL,libmemif将在内部处理文件描述符事件轮询如果设置了有效的回调,则需要通过用户应用程序,所有文件描述符和事件类型都将传入此回调将返回到用户应用程序

初始化内部libmemif结构。创建timerfd(用于在从属模式下通过断开连接的memifs定期请求连接,无需额外的API调用)。此fd通过memif_control_fd_update_t传递给用户

计时器在此状态下处于非活动状态。如果在从属模式下至少有一个memif,则会激活。

\return memif_err_t
*/
int memif_init (memif_control_fd_update_t * on_control_fd_update,
char *app_name, memif_alloc_t * memif_alloc,
memif_realloc_t * memif_realloc, memif_free_t * memif_free);

/** \brief 创建内存接口并连接
@param conn - 返回连接句柄 [out]
@param args - 连接参数
@param on_connect - 连接上触发的回调函数
@param on_disconnect - 断开连接触发的回调函数
@param on_interrupt - 通知用户中断,如果设置为null,则不会通知用户中断,用户可以使用memif_get_queue_efd调用获取中断fd以轮询事件
@param private_ctx - 通过回调传递回用户的上下文指针

主机模式 -
Start timer that will send events to timerfd. If this fd is passed to memif_control_fd_handler
every disconnected memif in slave mode will send connection request.
On success new fd is passed to user with memif_control_fd_update_t.

从机模式 -
Create listener socket and pass fd to user with memif_control_fd_update_t.
If this fd is passed to memif_control_fd_handler accept will be called and
new fd will be passed to user with memif_control_fd_update_t.


\return memif_err_t
*/
int memif_create (memif_conn_handle_t * conn, memif_conn_args_t * args,
memif_connection_update_t * on_connect,
memif_connection_update_t * on_disconnect,
memif_interrupt_t * on_interrupt, void *private_ctx);

/** \brief 发送缓冲区脉冲
@param conn - 连接句柄
@param qid - 队列ID
@param bufs - 数据包缓冲区(数组)
@param count - 要传输的数据包缓冲区数量
@param tx - 返回已传输的数据包缓冲区数量 [out]

\return memif_err_t
*/
int memif_tx_burst (memif_conn_handle_t conn, uint16_t qid,
memif_buffer_t * bufs, uint16_t count, uint16_t * tx);

/** \brief 接受缓冲区脉冲
@param conn - 连接句柄
@param qid - 队列ID
@param bufs - 数据包缓冲区(数组)
@param count - 要接收的数据包缓冲区数量
@param rx - 返回已接收的数据包缓冲区数量 [out]

为接收队列消耗中断事件.
如果memif_rx_burst失败,则事件不会被消费.

\return memif_err_t
*/
int memif_rx_burst (memif_conn_handle_t conn, uint16_t qid,
memif_buffer_t * bufs, uint16_t count, uint16_t * rx);

源码分析

下文中的消息队列指的是建立socket连接后,libmemif使用单链表实现的简单队列,用于进行建立连接类的消息通讯

下文中的数据传输队列指的是建立memif连接后,libmemif使用共享内存实现的环形缓冲区,用于进行数据包的传输

初始化memif_init流程

  1. 注册自定义内存管理函数
  2. 注册自定义epoll轮询回调函数
  3. 初始化连接列表、中断列表、socket列表等
  4. 创建定时器,并初始化为2秒(用于重连)
  5. 创建默认memif_socket_t结构体(注意!不是socket)

创建memif_create流程

  1. 将连接参数、回调函数等添加到memif_socke的interface_list里
  2. 如果是主机,就开始监听连接(监听到的从机socket连接添加socket_list里,并挂到epoll上)
  3. 如果是从机,就将连接参数添加到control_list里(由于还没连上主机,所以此时contort_list里的fd是-1),并尝试连接到主机(如果连接失败就启动定时器一直尝试重连)

请求连接memif_request_connection流程

  1. 创建AF_UNIX域socket,尝试连接到主机
  2. 注册socket的读、写回调(关键!读回调用于与主机进行简单的通信。协商创建共享内存域、环形缓冲区等等,写回调用于发送消息队列中的消息给对端)
  3. 修改control_list里的fd为socket的fd,并将fd挂到epoll上(用于通知读、写回调)
  4. (如果连接失败就启动定时器一直尝试重连)

主机accept后流程

  1. 直接发送HELLO消息(不使用消息队列)

HELLO消息处理流程

  1. 协商队列数量、单个数据包大小。获取对端接口名等
  2. 初始化共享内存域和循坏队列(数据传输队列、消息传输队列)
  3. 接口密码(可选)消息入队
  4. 共享内存域、环形缓冲区的信息入队

INIT消息处理流程

  1. 处理接口密码(可选)入队
  2. ACK入队

创建数据传输队列流程

  1. 根据连接参数,先创建合适大小的共享内存
  2. 将共享内存分为2个区域,区域0存储环形缓冲区的信息;区域1存储具体的数据包内容
  3. 创建数据传输队列,每个数据传输队列有2个环形缓冲区:Rx和Tx
  4. 为每个环形缓冲区都创建一个eventfd

eventfd用于写入数据后,触发对端的中断on_interrupt回调

传送数据包memif_tx_burst过程

  1. 根据队列ID拿到对应的Tx环形缓冲区(下文称为ring)
  2. 计算掩码(用于确定数据包缓冲区在ring中的位置)
  3. 循环要传输的数据包
  4. 使用内存屏障保证所有数据包已经正确写入长度
  5. 触发对端的中断on_interrupt回调

接收数据包memif_rx_burst过程

  1. 根据队列ID拿到对应的Rx环形缓冲区(下文称为ring)
  2. 计算掩码(用于确定数据包缓冲区在ring中的位置)
  3. 循环取出数据包
  4. 如果是从机,在取完数据后需要重置数据包缓冲区大小
  5. 记录下这次已经处理完ring的索引(用于下次确定从ring的哪里开始读取数据包)

扩展知识

为啥环形缓冲区高性能?

当一个数据元素被用掉后,其余数据元素不需要移动其存储位置,从而减少拷贝提高效率

为啥循环缓冲区的大小只能是以2为底的对数?

实现环形缓冲区需要使用取模运算(用于确定头、尾指针在环中的位置)

而众所周知位运算是最快的一组运算,当然也比取模运算快

而所有以2为第的对数,都可以将取模运算转换成位运算

即:若 $ M = 2^x $且 \(x\) 为自然数,则以下公式成立 \[ M \bmod N = M\ \\\&\ (\ N\ -\ 1\ ) \]

所以8 % 7就可以写成 8 & (7 - 1),提高了运算性能

等式证明

假设$ M = 8 = 2^3 \(,那么\) M - 1 = 7 $,二进制为0000 0111b

  1. 若 $ M < 8 $, $ M = M $ , $ M = M $,等式成立

  2. 若 $ M > 8 \(,\) M = 2a+2b+2^c+... $ 比如,$ 51 = 1+2+16+32 = 20+21+24+25 $ ,求 $ 51 $时,由于7的二进制是0000 0111b,所以2的幂只要大于等于\(2^3\)的数,与上7结果都是0,所以$ 2^4 = 0 , 2^5 \& 7 = 0, (20+21+24+25) \& (7) = 20+21=3 \(。而根据结论1,\)(20+21) \& 7 = (20+21) \(,所以\) 51 \& 7=51 $

综上得证。

为啥可以实现无锁并发?

  1. volatile关键字

  2. 内存屏障

  3. volatile关键字阻止了编译器为了提高速度将变量缓存起来,而修改后不写回

  4. 内存屏障阻止了现代CPU的乱序执行,保证在屏障之前的指令不会在屏障之后执行

这样就保证了传输数据时,每个数据包的长度都被正确写入后,再通知对端

如果不加内存屏障,可能数据、数据长度还没写完,由于CPU的乱序执行优化,就通知对端,这样读到的缓冲区内的数据就不完整

只能在单生产/单消费模式时不需要加锁同步

为啥源码中关于socket消息的结构体都有__attribute__((packed))

这个编译指令是什么?

告诉编译器按照实际占用字节数进行内存对齐

存在的意义?

其实这里的做法和Linux内核中关于网络协议的实现一致

因为不同平台(指操作系统、编译器等)的内存对齐方式不同,如果使用结构体进行平台间的通信,就可能会有问题

举例: 假设没有使用 __attribute__((packed))

发送消息的程序是GCC编译的,xxx_msg_t结构体默认的内存对齐策略为24字节,而接受消息的程序是另一个编译器,而另一个编译器下xxx_msg_t结构体默认的内存对齐策略为32字节(只是随便举个例子),那么每个变量对应的值就不对了。

memif_ring_t里的cacheline是个什么玩意?

Cache Line是什么?

由于CPU高速缓存的存在,CPU不再是按字节访问内存,而是以64字节为单位的块(Chunk)拿取,称为一个缓存行(Cache Line)

而当你读一个特定的内存地址,整个缓存行将从主存/低速缓存换入高速缓存,以提供高速访问

这时候有大聪明要问了,为啥CPU不能直接读主存呢,多省事啊?因为CPU运行速度极快,而主存完全跟不上CPU的速度,直接读主存会拖累CPU的运行速度(CPU长时间处于等待IO状态中)

Cache Line对齐是什么?

默认情况下,结构体内存会按照数据大小对齐, 而Cache Line对齐就是将结构体的内存对齐到与Cache Line的大小一致

存在的意义?

Cache Line对齐是对CPU缓存优化的一种方式

一般来说

如果按数据大小对齐,结构体里的数据可能会跨Cache行存放,CPU读取时就需要访问多次缓存行,影响性能

而我们可以将结构体按照一个缓存行的大小(64字节)进行内存对齐

这样就可以一次将整个结构体读入Cache中,减少CPU高级缓存与低级缓存、内存的数据交换次数

但在memif_ring_t中,其实另有其他更重要的作用

memif_ring_t里使用Cache Line对齐意义?

  • cacheline0的作用

这行的意义不明,在GCC下有没有这行都一样。个人猜测应该是为了GDB调试时方便查看结构体的内存布局

  • cacheline1和cacheline2的作用
1
2
3
4
5
6
7
8
9
typedef struct
{
....
volatile uint16_t head;
MEMIF_CACHELINE_ALIGN_MARK (cacheline1);
volatile uint16_t tail;
MEMIF_CACHELINE_ALIGN_MARK (cacheline2);
....
} memif_ring_t;

强制head与tail不在同的Cache Line中

为什么需要将两个volatile变量存在不同的Cache Line中?

为了解决伪共享的问题

在多CPU环境下,首先明确volatile变量在某个CPU修改后,在写回时,会通知其他CPU重新读取缓存,以确保一致性

如果两个volatile变量XY同一个Cache Line,而这两个变量某一个被修改时,由于volatile的特性,另一个CPU就会被通知重读缓存。这种无谓的通知就浪费了性能。这种现象就叫伪共享

而将两个volatile的变量放到不同的Cache Line中,就不需要一直通知另一个CPU更新数据了,因为另一个CPU根本没有也不需要这个数据

Linux下查看Cache Linux大小$ cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size

如何进一步提高性能?

  1. memif的最关键的性能瓶颈在于数据包的拷贝,所以可以尝试使用SIMD(AVX2、AVX512)指令集提高拷贝性能
  2. 在流量密集情况下使用轮询模式,而非中断模式,减少中断开销(实现NAPI)
  3. 使用自定义的内存管理模块(比如内存池)。以提高缓存命中、减低内存碎片化
  4. 如果你的程序中有其他模块需要使用到epoll,请使用自定义的epoll,以减少多余的内存消耗、以及多个线程epoll_wait的开销
  5. 实现0拷贝(直接修改接收到的数据包,再直接重新入队)
  6. 使用线程池实现多线程(不要开太多工作线程,一般与CPU核心数一致即可,减少CPU在线程上下文的切换成本)

epoll使用红黑树实现,提供O(logN)的查询时间复杂度,即使大量挂载也比多开几个epoll的成本(额外的CPU线程上下文切换成本、内存成本)小

常见Q&A

Q: 连接不上,抛出Connection refused异常,或刚连就直接触发Disconnect事件

A: 检查两个进程之间连接参数是否一致!比如Socket的位置、Rx / Tx队列数是否一致等等

Q: 无法向共享内存缓冲区memif_buffer_t中拷贝数据,抛出segmentation fault异常

A: 检查是否手动为共享内存缓冲区memif_buffer_t分配内存。注意:这里的手动分配内存不是指调用memif_buffer_alloc

Q: 连接正常,但发出去的数据对面收不到,也都不抛出异常

A: 检查对方是否正确的实现了on_interrupt或轮询

Q: 我想用我自己的epoll(自己处理epoll事件)怎么办?

A: memif_init时传入epoll事件的回调,可以参考/examples/icmp_responder-epoll中的实现

Q: memif_init报错怎么办?

A: 确保你的memif_init整个进程运行时只被调用过一次