eBPF学习

基本使用

bpf程序加载到内核后,并不会立刻执行。ebpf程序需要事件触发后才会执行,这些事件包括:系统调用、内核跟踪点、内核函数和用户态函数的调用退出、网络事件等等。

首先写一个c程序,ebpf程序:

1
2
3
4
5
int hello_world(void *ctx)
{
bpf_trace_printk("Hello, World!");
return 0;
}

然后通过bbc的python库,加载这个ebpf程序:

1
2
3
4
5
6
7
8
#!/usr/bin/env python3
# This is a Hello World example of BPF.
from bcc import BPF

# load BPF program
b = BPF(src_file="hello.c")
b.attach_kprobe(event="do_sys_openat2", fn_name="hello_world") #内核跟踪
b.trace_print()

然后执行:sudo python3 hello.py即可启动这个ebpf程序

一个完成的ebpf程序包括用户态和内核态两个部分

用户态负责ebpf程序的加载,事件的绑定,ebpf程序运行结果的汇总输出。

内核态运行在ebpf虚拟机中,负责定制和控制系统的运行状态

img

用户态和内核态的交互通过系统调用bpf()来完成。

1
2
3
#include <linux/bpf.h>

int bpf(int cmd, union bpf_attr *attr, unsigned int size);
  • cmd ,代表操作命令,比如上 BPF_PROG_LOAD 就是加载 eBPF 程序

  • attr,代表 bpf_attr 类型的 eBPF 属性指针,不同类型的操作命令需要传入不同的属性参数

  • size ,代表属性的大小

运行原理

eBPF是一个运行在内核中的虚拟机。虽然叫虚拟机,但是跟系统虚拟化比如KVM还是有本质的不同。

系统虚拟化基于X86或arm的通用指令集,来完成计算机的所有功能。而eBPF只提供了非常有限的指令集,只用于完成一部分内核的功能。

eBPF在内核运行是的5个模块组成:

img

  • ebpf辅助函数:提供ebpf程序与内核其他模块进行交互的函数
  • ebpf验证器:确保ebpf程序的安全,保证执行的指令是一个有向无环图(DAG),确保程序不包含不可达指令,不会执行无效指令
  • 存储模块:11个64位寄存器,一个程序计数器,一个512字节的栈
  • 即时编译器:将ebpf字节码编译成本地机器指令,从而在内核中执行。bpf指令加载到内核后,即使编译器会将其编译成本地机器指令,最后才会执行编译后的机器指令。c源代码->bpf指令->机器指令。
  • bpf映射:提供给用户空间程序来访问

BPF辅助函数

ebpf程序不能随意调用内核函数,隐私内核定义了一些列的辅助函数,用来ebpf程序与内核其他模块进行交互。比如的上面例子中的bpf_trace_printk(),用来向debugfs(/sys/kernel/debug/tracing/trace_pipe)中写入调试信息。

bpftool feature probe可以查看当前系统支持的辅助函数列表。

image-20220507142123640

bpf映射

bpf映射提供大块的kv存储,可以被用户程序访问,进而获取ebpf程序的运行状态

img

bpf映射智能通过用户态程序的系统调用来创建,比如:

1
2
3
4
5
6
7
8
9
10
11
12
int bpf_create_map(enum bpf_map_type map_type,
unsigned int key_size,
unsigned int value_size, unsigned int max_entries)
{
union bpf_attr attr = {
.map_type = map_type,
.key_size = key_size,
.value_size = value_size,
.max_entries = max_entries
};
return bpf(BPF_MAP_CREATE, &attr, sizeof(attr)); //返回映射的文件描述符
}

比较关键的是设置映射类型,bpftool feature probe | grep map_type可以查看支持的映射类型

image-20220507140422151

另外,如果ebpf程序使用了bbc库,还可以通过预定义的宏来简化bpf映射的创建过程,比如对于hash表映射,bbc定义了PF_HASH(name, key_type=u64, leaf_type=u64, size=10240)

1
2
3
4
5
6
7
8
9
10
11
12

// 使用默认参数 key_type=u64, leaf_type=u64, size=10240
BPF_HASH(stats);

// 使用自定义key类型,保持默认 leaf_type=u64, size=10240
struct key_t {
char c[80];
};
BPF_HASH(counts, struct key_t);

// 自定义所有参数
BPF_HASH(cpu_time, uint64_t, uint64_t, 4096);

bpftool查看操作映射的常用指令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

//创建一个哈希表映射,并挂载到/sys/fs/bpf/stats_map(Key和Value的大小都是2字节)
bpftool map create /sys/fs/bpf/stats_map type hash key 2 value 2 entries 8 name stats_map

//查询系统中的所有映射
bpftool map
//示例输出
//340: hash name stats_map flags 0x0
//key 2B value 2B max_entries 8 memlock 4096B

//向哈希表映射中插入数据
bpftool map update name stats_map key 0xc1 0xc2 value 0xa1 0xa2

//查询哈希表映射中的所有数据

bpftool map dump name stats_map
//示例输出
//key: c1 c2 value: a1 a2
//Found 1 element

//删除哈希表映射
rm /sys/fs/bpf/stats_map

内核数据结构定义问题

bbc在编译ebpf程序时,需要从内核头文件中找到相应的内核数据结构定义,这就会带来问题。

比如内核头文件的数据结构在不同的内核版本不一样。

从内核 5.2 开始,只要开启了 CONFIG_DEBUG_INFO_BTF,在编译内核时,内核数据结构的定义就会自动内嵌在内核二进制文件 vmlinux 中。

可以借助:bpftool btf dump file /sys/kernel/btf/vmlinux format c > vmlinux.h

把这些数据结构的定义导出到一个头文件中,这样在开发ebpf程序时只要引入一个vmlinux.h即可。

事件触发

内核中不同事件会触发不同的eBPF程序。eBPF程序类型决定了一个eBPF程序可以挂载的事件类型和事件参数。

bpftool feature probe | grep program_type查看支持的程序类型

主要可以分类三大类

  • 跟踪:从内核和程序的运行状态中提取跟踪信息
  • 网络:对网络数据包进行过滤和处理
  • 其他,包括安全控制,BPF扩展等等

跟踪类

主要用于从系统中提取跟踪信息,进而为监控、排错、性能优化等提供数据支撑。

image-20220507150544640

网络类

对网络数据包进行过滤和处理,进而实现网络的观测、过滤、流量控制以及性能优化。

根据触发位置不同,有可以分为XDP(express data path)程序、TC(traffic control流量控制)程序、套接字程序以及cgroup程序。

XDP程序

在网络驱动程序刚刚收到数据包时触发执行,定义为BPF_PROG_TYPE_XDP

XDP程序并没有绕过内核协议栈。而是在内核协议栈之前处理数据包,处理后的数据包还可以正常通过内核协议栈继续处理。

根据网卡和网卡驱动是否原生支持XDP程序,XDP可以分为三种:

  • 通用模式:xdp程序运行在内核中,性能差,一般用于测试,不需要网卡和网卡驱动支持
  • 原生模式:需要网卡驱动支持
  • 卸载模式:网卡固件支持XDP卸载,XDP程序直接运行在网卡上,不需要消耗CPU资源,具有最好的性能。

XDP程序通过ip link命令加载到具体的网卡上。

内核跟踪

问题

即时编译器是事件被触发才会编译吗?

参考

XDP

Linux内核之文件系统

对于read 和 write。

在 VFS 层调用的是 vfs_read 和 vfs_write 并且调用 file_operation。

在 ext4 层调用的是 ext4_file_read_iter 和 ext4_file_write_iter。

接下来就是分缓存 I/O 和直接 I/O。直接 I/O 读写的流程是一样的,调用 ext4_direct_IO,再往下就调用块设备层了。缓存 I/O 读写的流程不一样。对于读,从块设备读取到缓存中,然后从缓存中拷贝到用户态。对于写,从用户态拷贝到缓存,设置缓存页为脏,然后启动一个线程写入块设备。

img

vmware静态ip配置及网络排查

虚拟机静态ip配置,网络排查

hft warm up

内存管理

Linux 内存管理

C++内存管理

hft低延迟技术

编程细节

硬件架构

系统性能调优

Linux低延迟服务系统调优

Linux 对CPU core 屏蔽timer interrupt

Linux获取ns时间戳

RDTSC的坑

Linux 性能优化

CPU

CPU如何读写数据、CPU如何选择线程

CPU cache

基本工具

perf、strace、netstat、lsof、vtune

理解内存性能

kungfu

易筋经

一个journal由多个page组成,一个page由page header和若干个frame组成。一个frame由一个frame header和data组成。frame是数据的最小写入单元。

一个journal只能有一个写入线程,writer在写入时,每一次都是通过一个院子操作在journal中形成一个frame。

每个写入线程对应了一种特定的应用,如行情接受,交易下单等。

image-20220418150835834

image-20220418150915067

x86指令相关

https://www.felixcloutier.com/x86/

C++ STL容器实现

queue

queue其实有两种实现,一种底层是list,一种底层是deque

list

list的实现是一个双向循环链表

deque

是维护了一个map数组,map数组里存了指向几个连续空间的指针,连续空间存放用户数据。同时deque维护了两个迭代器,start和finish。

image-20220330220306952

一个生产者一个消费者deque是线程安全的吗?

比如某个时刻push_back造成map需要重新分配,start迭代器的node需要更新到新的map,还没更新时候,发生了pop_front,需要start的node需要后移,这时push_back的node移动生效,就把pop_front的操作覆盖了。

对于list的情况呢?

由于是双向循环链表的情况,一个线程插入,一个线程删除。添加时,新增的末尾节点要指向第一个节点,还没赋值这个指针的时候,另一个线程进行了头部的删除,头节点已经修改。

为什么ringbuffer就是单生产者单消费者线程安全的了?

muduo学习笔记

调用关系分析

Muduo自顶向下

自顶向下分析muduo的调用逻辑

创建自己的业务server

  • 创建自己的server类,包含了两个私有成员:EventLoop *loop_,TcpServer server_
  • 实现on_messageon_connection,在构造函数里设置回调,设置线程数
  • main函数调用server.start()
  • server.start()会调用内部TcpServer的start,然后调用EventLoop的loop()。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    class EchoServer
    {
    public:
    EchoServer(EventLoop *loop, InetAddress &addr, string name)
    : server_(loop, addr, name), loop_(loop)
    {
    //注册回调函数
    server_.set_connection_callback(bind(&EchoServer::on_connection, this, _1));
    server_.set_message_callback(bind(&EchoServer::on_message, this, _1, _2, _3));

    //设置线程数量
    server_.set_thread_num(3);
    }
    void start()
    {
    server_.start();
    loop_->loop();
    }

    private:
    //连接建立或者断开的回调
    void on_connection(const TcpConnectionPtr &conn)
    {
    if (conn->connected())
    {
    LOG_INFO("conn up: %s", conn->get_peeraddr().get_ip_port().c_str());
    }
    else
    {
    LOG_INFO("conn down: %s", conn->get_peeraddr().get_ip_port().c_str());
    }
    }

    //可读事件回调
    void on_message(const TcpConnectionPtr &conn, Buffer *buffer, TimeStamp time)
    {
    string msg = buffer->retrieve_all_asString();
    conn->send(msg);
    //conn->shutdown();
    }

    private:
    EventLoop *loop_;
    TcpServer server_;
    };

    int main()
    {
    EventLoop loop(1);
    InetAddress addr(8000);
    EchoServer server(&loop, addr, "echo 01");
    server.start();
    //loop.loop(); //启动main loop的底层poller
    return 0;
    }

    TcpServer的start

    业务server调用TcpServer的start

TcpServer有一个main Loop,一个线程池,池子里是所有的subReactor,每个线程也有自己的loop
TcpServer的start会调用自己的线程池的start,然后在自己的main loop里调用listen

1
2
3
4
5
6
7
8
9
//开启服务器监听
void TcpServer::start()
{
if (started_++ == 0) //防止被多次启动
{
thread_pool_->start(thread_init_callback_);
loop_->run_in_loop(bind(&Acceptor::listen, acceptor_.get()));
}
}

线程池的start

创建thread_nums个EventLoopThread,每个thread有自己的一个EventLoop,调用start_loop,返回的eventloop地址然后放到线程池的loops_数组中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void EventLoopThreadPool::start(const ThreadInitCallback &callback)
{
started_ = true;
//整个服务端只有baseloop,也就是mainreactor
if (thread_nums_ == 0)
{
callback(baseloop_);
}
else
{
for (int i = 0; i < thread_nums_; ++i)
{
char buffer[name_.size() + 32] = {0};
snprintf(buffer, sizeof(buffer), "%s %d", name_.c_str(), i);
EventLoopThread *t = new EventLoopThread(callback, buffer);
threads_.push_back(unique_ptr<EventLoopThread>(t));
loops_.push_back(t->start_loop()); //底层开始创建线程,并绑定一个新的eventloop,返回其地址
}
}
}

EventLoopThread的start_loop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
EventLoop *EventLoopThread::start_loop()
{
thread_.start(); //启动线程

EventLoop *loop = nullptr;
{
unique_lock<mutex> lock(thread_mutex_);
while (loop_ == nullptr)
{
condition_.wait(lock);
}
loop = loop_;
}

return loop;
}

里面再调用Thread的start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void Thread::start()
{
started_ = true;
sem_t sem;
sem_init(&sem, false, 0);

//开启线程
thread_ = shared_ptr<thread>(new thread([&]() {
//获取线程tid值
tid_ = Current_thread::tid();
sem_post(&sem);
//执行函数
function_();
}));
//需要等待新创建的线程,获取其线程的id
sem_wait(&sem);
}

这里会创建新的线程,然后新的线程里跑function_, function_实际就是初始化EventLoopThread时传入了的thread_function,在里面会创建EventLoop,然后进入EventLoop的while 1循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//启动的线程中执行以下方法
void EventLoopThread::thread_function()
{
EventLoop loop(0); //创建一个独立的EventLoop,和上面的线程是一一对应 one loop per thread

if (callback_function_)
{
callback_function_(&loop);
}

{
unique_lock<mutex> lock(thread_mutex_);
loop_ = &loop;
condition_.notify_one();
}

loop.loop(); //开启事件循环

//结束事件循环
unique_lock<mutex> lock(thread_mutex_);
loop_ = nullptr;
}

上面说的while 1循环就是调用poller的poll,然后填充active_channel,然后在使用channel来处理不同事件,比如读写的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
while (!quit_)
{
active_channels.clear();
//监听两类fd 一种是client的fd,一种wakeup的fd
poll_return_time_ = poller_->poll(k_poll_timeout, &active_channels);

for (Channel *channel : active_channels)
{
//Poller监听哪些channel发生事件了,然后上报给eventloop,通知channel处理事件
channel->handle_event(poll_return_time_);
}

//执行当前EventLoop事件循环需要处理的回调操作
do_pending_functors();
}

然后main reactor会调用run_in_loop, 开启acceptor的listen

1
2
3
4
5
6
7
8
9
void Acceptor::listen()
{
LOG_INFO("Acceptor listen called!\n");
listenning_ = true;
accept_socket_.listen();
//借助poller进行监听
accept_channel_.enable_reading();

}

Acceptor

acceptor_是TcpServer类中的一个指针成员。包含两个关键成员:指向mainloop的指针,和用来管理listenfd的channel。

1
2
3
EventLoop *loop_; //acceptor用的用户定义的那个baseloop,也就是mainloop

Channel accept_channel_;

当有新的连接到达时,Acceptor会执行new_connection的回调,里面会选择一个subreactor,创建一个新的connection,放到TcpServer的map里,然后,在这个subreactor里调用establish_connect,establish_connect会像Poller添加关于这个新连接fd的事件监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void TcpServer::new_connection(int sockfd, const InetAddress &peeraddr)
{
LOG_INFO("new connection callback called\n");
//轮询算法,选择一个subloop管理channel
EventLoop *ioloop = thread_pool_->get_nextEventLoop();//连接均匀打到每个eventloop上

char buffer[BUFFER_SIZE64] = {0};
snprintf(buffer, sizeof(buffer), "-%s#%d", ip_port_.c_str(), next_conn_id_);
++next_conn_id_;
string conn_name = name_ + buffer;

LOG_INFO("tcp server:: new connection[%s] - new connection[%s] from %s\n", name_.c_str(), conn_name.c_str(), peeraddr.get_ip_port().c_str());

//通过sockfd,获取其绑定的端口号和ip信息
sockaddr_in local;
bzero(&local, sizeof(local));
socklen_t addrlen = sizeof(local);
if (::getsockname(sockfd, (sockaddr *)&local, &addrlen) < 0)
{
LOG_ERROR("new connection get localaddr error\n");
}

InetAddress localaddr(local);

//根据连接成功的sockfd,创建tcpc连接对象
TcpConnectionPtr conn(new TcpConnection(ioloop, conn_name, sockfd, localaddr, peeraddr));

connections_[conn_name] = conn;

//下面回调是用户设置给tcpserver-》tcpconn-》channel-》poller-》notify channel
conn->set_connection_callback(connection_callback_);
conn->set_message_callback(message_callback_);
conn->set_write_complete_callback(write_complete_callback_);

//设置如何关闭连接的回调
conn->set_close_callback(bind(&TcpServer::remove_connection, this, _1));
ioloop->run_in_loop(bind(&TcpConnection::establish_connect, conn));
}

看下run_in_loop怎么实现的:
如果调用方就是这个loop所属的线程,直接调用
否则放到这个eventloop的pending_Functors_里,而eventloop每次poll完,处理完对应channel的handle都会调用do_pending_functors()。

1
2
3
4
5
6
7
8
9
10
11
12
void EventLoop::run_in_loop(Functor cb)
{
//在当前的loop线程中执行回调
if (is_in_loopThread())
{
cb();
}
else //在其他线程执行cb,唤醒loop所在线程执行cb
{
queue_in_loop(cb);
}
}

模块分析

Reactor

image-20220328164152236

EventLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class EventLoop : boost::noncopyable {
public:
EventLoop();
~EventLoop();
void loop();
void assertInLoopThread() {
if (!isInLoopThread()) {
abortNotInLoopThread();
}
}
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
private:
void abortNotInLoopThread();
bool looping_; /* atomic */
const pid_t threadId_; //记录自己所属的线程
};

构造函数会检查当前线程是否创建了EventLoop对象,如果已经创建了就返回错误。这是one loop per thread的要求,一个IO线程只能有一个EventLoop对象。

Channel

用来管理各种callback,比如对于readcallback:

TimerQueue用它来读timerfd

EventLoop用来读eventfd

TcpServer/Acceptor用来读listening socket

TcpConnection 用它来读Tcp socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class Channel : boost::noncopyable {
public:
typedef boost::function<void()> EventCallback;
Channel(EventLoop *loop, int fd);
void handleEvent();
void setReadCallback(const EventCallback &cb) { readCallback_ = cb; }
void setWriteCallback(const EventCallback &cb) { writeCallback_ = cb; }
void setErrorCallback(const EventCallback &cb) { errorCallback_ = cb; }
int fd() const { return fd_; }
int events() const { return events_; }
void set_revents(int revt) { revents_ = revt; }
bool isNoneEvent() const { return events_ == kNoneEvent; }
void enableReading() {
events_ |= kReadEvent;
update();
}
// void enableWriting() { events_ |= kWriteEvent; update(); }
// void disableWriting() { events_ &= ~kWriteEvent; update(); }
// void disableAll() { events_ = kNoneEvent; update(); }
// for Poller
int index() { return index_; }
void set_index(int idx) { index_ = idx; }
EventLoop *ownerLoop() { return loop_; }
private:
void update();
static const int kNoneEvent;//几种事件的定义
static const int kReadEvent;
static const int kWriteEvent;
EventLoop *loop_;
const int fd_;
int events_;//关心的IO事件,由用户设置
int revents_;//目前活动的事件,由Event/Poller设置
int index_; // used by Poller.
EventCallback readCallback_;
EventCallback writeCallback_;
EventCallback errorCallback_;
};

每个channel只属于一个EventLoop,因此每个channel只属于一个IO线程,每个Channel对象自始至终只负责一个文件描述符的IO事件分发,Channel把不同的IO线程分发为不同的回调,例如ReadCallback、writeCallback等,回调用boost::function。后面的TcpConnection是对Channel的更上层封装,用户一般不使用channel。

由于channel的成员函数只能在IO线程使用,所以更新成员函数不用加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

Channel::Channel(EventLoop *loop, int fdArg)
: loop_(loop),
fd_(fdArg),
events_(0),
revents_(0),
index_(-1) {
}

void Channel::update() {
loop_->updateChannel(this);
}

void Channel::handleEvent() {
if (revents_ & POLLNVAL) {
LOG_WARN << "Channel::handle_event() POLLNVAL";
}

if (revents_ & (POLLERR | POLLNVAL)) {
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) {
if (readCallback_) readCallback_();
}
if (revents_ & POLLOUT) {
if (writeCallback_) writeCallback_();
}
}

updata()会调用eventloop的updateChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
```

void Channel::handleEvent() 是Channel的核心,由EventLoop::loop()调用,根据revents_的值分别调用不同的用户回调。

```c++
void EventLoop::loop() {
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;

while (!quit_) {
activeChannels_.clear();
poller_->poll(kPollTimeMs, &activeChannels_);
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it) {
(*it)->handleEvent();
}
}

LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}

Poller

Poller是IO 多路复用的封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Poller : boost::noncopyable {
public:
typedef std::vector<Channel *> ChannelList;

Poller(EventLoop *loop);

~Poller();

/// Polls the I/O events.
/// Must be called in the loop thread.
Timestamp poll(int timeoutMs, ChannelList *activeChannels); //核心

/// Changes the interested I/O events.
/// Must be called in the loop thread.
void updateChannel(Channel *channel);

void assertInLoopThread() { ownerLoop_->assertInLoopThread(); }

private:
void fillActiveChannels(int numEvents,
ChannelList *activeChannels) const;

typedef std::vector<struct pollfd> PollFdList;
typedef std::map<int, Channel *> ChannelMap; //fd到Channel*的映射

EventLoop *ownerLoop_;
PollFdList pollfds_;
ChannelMap channels_;
};

Channel提供了fd到Channel*的映射,poll()不会在每次调用前构造pollfd数组,而是把它缓存到follfds_里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Timestamp Poller::poll(int timeoutMs, ChannelList *activeChannels) {
// XXX pollfds_ shouldn't change
int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
Timestamp now(Timestamp::now());
if (numEvents > 0) {
LOG_TRACE << numEvents << " events happended";
fillActiveChannels(numEvents, activeChannels);
} else if (numEvents == 0) {
LOG_TRACE << " nothing happended";
} else {
LOG_SYSERR << "Poller::poll()";
}
return now;
}

poll是Poller的核心功能,调用poll(2)获取当前活动的IO事件,然后放入activeChannels中(fillActiveChannels)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void Poller::fillActiveChannels(int numEvents,
ChannelList *activeChannels) const {
for (PollFdList::const_iterator pfd = pollfds_.begin();
pfd != pollfds_.end() && numEvents > 0; ++pfd) {
if (pfd->revents > 0) { //有事件发生
--numEvents;
ChannelMap::const_iterator ch = channels_.find(pfd->fd);
assert(ch != channels_.end());
Channel *channel = ch->second;
assert(channel->fd() == pfd->fd);
channel->set_revents(pfd->revents);
// pfd->revents = 0;
activeChannels->push_back(channel);
}
}
}

//注pollfd结构
struct pollfd
{
int fd; /* File descriptor to poll. */
short int events; /* Types of events poller cares about. */
short int revents; /* Types of events that actually occurred. */
};

fillActiveChannels() 先遍历了缓存的pollfds_,判断pollfd是否有事件发生,如果有事件,则根据ChannelMap找到该fd对应的Channel,放到activeChannels中。

poll完之后,eventloop会handleEvent。

一个点是,这里不能边遍历pollfds,一边handleEvents,因为handleEvents会添加或删除Channel,从而造成pollfds数组大小改变。另一个原因是Poller的职责是多路复用,并不负责事件分发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void Poller::updateChannel(Channel *channel) {
assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
if (channel->index() < 0) {
// a new one, add to pollfds_
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
pollfds_.push_back(pfd);
int idx = static_cast<int>(pollfds_.size()) - 1;
channel->set_index(idx); //记住自己在fd数组中的下标
channels_[pfd.fd] = channel;
} else {
// update existing one
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
struct pollfd &pfd = pollfds_[idx];
assert(pfd.fd == channel->fd() || pfd.fd == -1);
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
if (channel->isNoneEvent()) {
// ignore this pollfd
pfd.fd = -1;
}
}
}

updateChannel()负责维护和更新pollfds_数组。为什么说插入删除是logN?

定时器

muduo的定时器实现用了三个类,TimerId、Timer、TimerQueue。

TimerQueue

有了Reactor的基础,在EventLoop加上定时器功能。现代Linux中有timerfd,可以用和处理IO事件相同的方式来处理定时。传统reactor通过控制select和poll的等待事件来实现定时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class TimerQueue : boost::noncopyable {
public:
TimerQueue(EventLoop *loop);
~TimerQueue();

///
/// Schedules the callback to be run at given time,
/// repeats if @c interval > 0.0.
///
/// Must be thread safe. Usually be called from other threads.
TimerId addTimer(const TimerCallback &cb,
Timestamp when,
double interval);

// void cancel(TimerId timerId);

private:

// FIXME: use unique_ptr<Timer> instead of raw pointers.
typedef std::pair<Timestamp, Timer *> Entry;
typedef std::set<Entry> TimerList;

// called when timerfd alarms
void handleRead();

// move out all expired timers
std::vector<Entry> getExpired(Timestamp now);

void reset(const std::vector<Entry> &expired, Timestamp now);

bool insert(Timer *timer);

EventLoop *loop_;
const int timerfd_;
Channel timerfdChannel_; //用来观察timerfd_上的readable事件
// Timer list sorted by expiration
TimerList timers_;
};

TimerQueue提供了两个接口addTimer()和cancel(),只能在它所属的IO线程调用,所以不用加锁。

addTimer()提供给EventLoop使用。EventLoop会把addTimer()封装为更好用的runAt()、runAfter()、runEvery()等函数。

TimerQueue需要管理未到期的Timer,能快速根据当前时间找到已经到期的Timer,同时能高效添加和删除Timer。

EventLoop的改动

新增几个调用定时器的接口。

EventLoop::runInLoop()函数

在IO线程内执行某个用户任务回调。

TCP网络库

Acceptor

acceptor用来accept新的TCP连接,并通过回调通知使用者。这个类是TcpServer使用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Acceptor : boost::noncopyable {
public:
typedef boost::function<void(int sockfd,
const InetAddress &)> NewConnectionCallback;

Acceptor(EventLoop *loop, const InetAddress &listenAddr);//构造函数则执行TCP服务端的传统步骤,创建socket,bind

void setNewConnectionCallback(const NewConnectionCallback &cb) { newConnectionCallback_ = cb; }

bool listenning() const { return listenning_; }

void listen(); //然后listen

private:
void handleRead(); //调用accept接受新的连接,并回调用户的callback

EventLoop *loop_;
Socket acceptSocket_;//封装了socket,利用RAII管理生命期。这里是个listen socket
Channel acceptChannel_;
NewConnectionCallback newConnectionCallback_;
bool listenning_;
};

TcpServer

管理accept得到的连接,直接给框架的用户使用。用户只需要设置好callback,然后调用start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class TcpServer : boost::noncopyable {
public:

TcpServer(EventLoop *loop, const InetAddress &listenAddr);

~TcpServer(); // force out-line dtor, for scoped_ptr members.

/// Set the number of threads for handling input.
///
/// Always accepts new connection in loop's thread.
/// Must be called before @c start
/// @param numThreads
/// - 0 means all I/O in loop's thread, no thread will created.
/// this is the default value.
/// - 1 means all I/O in another thread.
/// - N means a thread pool with N threads, new connections
/// are assigned on a round-robin basis.
void setThreadNum(int numThreads);

/// Starts the server if it's not listenning.
///
/// It's harmless to call it multiple times.
/// Thread safe.
void start();

/// Set connection callback.
/// Not thread safe.
void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }

/// Set message callback.
/// Not thread safe.
void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }

/// Set write complete callback.
/// Not thread safe.
void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }

private:
/// Not thread safe, but in loop
void newConnection(int sockfd, const InetAddress &peerAddr);

/// Thread safe.
void removeConnection(const TcpConnectionPtr &conn);

/// Not thread safe, but in loop
void removeConnectionInLoop(const TcpConnectionPtr &conn);

typedef std::map<std::string, TcpConnectionPtr> ConnectionMap;

EventLoop *loop_; // the acceptor loop
const std::string name_;
boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
boost::scoped_ptr<EventLoopThreadPool> threadPool_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
bool started_;
int nextConnId_; // always in loop thread
ConnectionMap connections_;
};

内部使用acceptor管理新连接的fd。持有目前存活的TcpConnection的shared_ptr。

TcpConnetction

包含了这个连接对应的socket,和对应的channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class TcpConnection : boost::noncopyable,
public boost::enable_shared_from_this<TcpConnection> {
public:
/// Constructs a TcpConnection with a connected sockfd
///
/// User should not create this object.
TcpConnection(EventLoop *loop,
const std::string &name,
int sockfd,
const InetAddress &localAddr,
const InetAddress &peerAddr);

~TcpConnection();

EventLoop *getLoop() const { return loop_; }

const std::string &name() const { return name_; }

const InetAddress &localAddress() { return localAddr_; }

const InetAddress &peerAddress() { return peerAddr_; }

bool connected() const { return state_ == kConnected; }

//void send(const void* message, size_t len);
// Thread safe.
void send(const std::string &message);

// Thread safe.
void shutdown();

void setTcpNoDelay(bool on);

void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }

void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }

void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }

/// Internal use only.
void setCloseCallback(const CloseCallback &cb) { closeCallback_ = cb; }

// called when TcpServer accepts a new connection
void connectEstablished(); // should be called only once
// called when TcpServer has removed me from its map
void connectDestroyed(); // should be called only once

private:
enum StateE {
kConnecting, kConnected, kDisconnecting, kDisconnected,
};

void setState(StateE s) { state_ = s; }

void handleRead(Timestamp receiveTime);

void handleWrite();

void handleClose();

void handleError();

void sendInLoop(const std::string &message);

void shutdownInLoop();

EventLoop *loop_;
std::string name_;
StateE state_; // FIXME: use atomic variable
// we don't expose those classes to client.
boost::scoped_ptr<Socket> socket_;
boost::scoped_ptr<Channel> channel_;
InetAddress localAddr_;
InetAddress peerAddr_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
CloseCallback closeCallback_;
Buffer inputBuffer_;
Buffer outputBuffer_;
};

TcpConnection使用Channel来获得socket上的IO事件。

Buffer

Buffer是非阻塞Tcp网络编程比不可少的东西。在TcpConnection中作为输入输出缓冲。

多线程TcpServer

EventLoopThreadPool

多线程TcpServer自己的EventLoop只用来接受新连接,而新连接会用其他EventLoop来执行IO。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class EventLoopThreadPool : boost::noncopyable {
public:
EventLoopThreadPool(EventLoop *baseLoop);

~EventLoopThreadPool();

void setThreadNum(int numThreads) { numThreads_ = numThreads; }

void start();

EventLoop *getNextLoop();

private:
EventLoop *baseLoop_;
bool started_;
int numThreads_;
int next_; // always in loop thread
boost::ptr_vector<EventLoopThread> threads_;
std::vector<EventLoop *> loops_;
};

TcpServer每次新建一个TcpConnection就会调用getNextLoop()来取得一个Event-Loop。

使用io_uring的改写

muduo原生支持epoll和poll两种poller,都是为Poller基类派生出来,现在派生出一种新的Poller,叫UringPoller如下,包含了uring需要的资源,如sqe,cqe, 然添加新的接口:

  • add_accept
  • add_socket
  • add_socket_write
  • add_provide_buf

都是向sqe队列中添加读写事件,等待内核完成后返回给cqe队列。

基于iouring实现新的Poller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class UringPoller : public Poller
{
public:
UringPoller(EventLoop *loop);
~UringPoller() override;

TimeStamp poll(int timeout, ChannelList *active_channels) override;

void update_channel(Channel *channel);
void remove_channel(Channel *channel);

private:
//填写活跃的链接
void fill_active_channels(int events_num, ChannelList *active_channels) ;
//更新channel,调用epoll_ctl
//void update(int operation, Channel *channel);
void add_accept(Channel* channel, struct sockaddr *client_addr, socklen_t *client_len, unsigned flags); //新链接
void add_socket_read(Channel* channel, unsigned gid, size_t message_size, unsigned flags);
void add_socket_write(Channel* channel, unsigned flags, const string &buf);
void add_provide_buf(__u16 bid, unsigned gid);

private:
static const int k_init_eventList_size_ = 16;

private:
conn_info* conns;
char bufs_[BUFFERS_COUNT][MAX_MESSAGE_LEN] = {0};
int group_id_ = 1337;
struct io_uring_params params_;
struct io_uring ring_;
struct io_uring_sqe *sqe_;
unsigned count;
struct io_uring_cqe *cqes_[BACKLOG];
};

使用UringPoller

以add_socket_read为例:
当一个新的连接到来,main reactor会用轮询的方式选择一个sub reactor,然后创建TcpConnection,这个新的连接就由选择出来的sub reactor所属的eventloop负责,然后建立连接的回调函数就会在这个eventloop的poller里添加负责管理这个新fd的channel的事件监听。

1
2
3
4
5
6
7
8
9
10
void TcpConnection::establish_connect()
{
set_state(k_connected);
channel_->tie(shared_from_this());
channel_->enable_reading(); //向poller注册channel的epollin事件
printf("get fd from channel fd = %d\n",channel_->get_fd());
loop_->poller_->add_socket_read(channel_.get(), 1337, 0, 0);
//新连接建立
connection_callback_(shared_from_this());
}

这里自己定义了一个结构体conn_info,把这个结构体放在sqe的user_data域里,等内核处理完成后,返回的cqe队列也会有这个conn_info,这样就能知道那个channel发生了(完成了)事件。

1
2
3
4
5
6
typedef struct conn_info {
__u32 fd;
__u16 event;
__u16 bid;
Channel* channel;
} conn_info;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void UringPoller::add_socket_read(Channel* channel, unsigned gid, size_t message_size, unsigned flags) {
int fd = channel->get_fd();
conn_info *conn_i = &conns[fd];
printf("add_socket_read:fd = %d\n",fd);
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_);
io_uring_prep_recv(sqe, fd, NULL, 0, 0); //读0长度,只关注事件是否发生。
io_uring_sqe_set_flags(sqe, flags);
sqe->buf_group = gid;

conn_i->fd = fd;
conn_i->event = READ;
conn_i->channel = channel;
io_uring_sqe_set_data(sqe, conn_i);
}

UringPoller的poll实现

对于UringPoller,实现poll成员函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
TimeStamp UringPoller::poll(int timeout, ChannelList *active_channels)
{
count++;
int ret = io_uring_submit_and_wait(&ring_, 0); //提交sq的entry
if (ret < 0) {
printf("Returned from io is %d\n", errno);
perror("Error io_uring_submit_and_wait\n");
LOG_ERROR("%s", "io_uring failure");
exit(1);
}
//将准备好的队列填充到cqes中,并返回已准备好的数目,收割cqe
int cqe_count = io_uring_peek_batch_cqe(&ring_, cqes_, sizeof(cqes_) / sizeof(cqes_[0]));
TimeStamp now(TimeStamp::now());
if (cqe_count > 0)
{
LOG_INFO("%d events happened \n", cqe_count);
fill_active_channels(cqe_count, active_channels);
}
//返回发生事件时间点
return now;
}

ml4sys笔记

  • A Learning-based Approach Towards Automated Tuning of SSD Configurations

提出了LearnedSSD框架。对于新的工作负载,LearnedSSD将提取其特征,并使用相似性比较网络将其与ConfDB(实现中使用leveldb)中的记录进行比较。如果LearnedSSD在其ConfDB中识别出类似的工作负载,它将直接推荐相应的SSD配置,这样我们就可以利用之前学到的经验。否则,LearnedSSD将为工作负载学习新的SSD配置,并将它们添加到其ConfDB中以供将来参考。

开发了一种基于块I/O跟踪的基于学习的聚类方法。选择块I/O跟踪来了解存储工作负载的特征。跟踪特征包括I/O时间戳、I/O大小、设备号、块地址和操作类型。使用主成分分析将数据点转换为二维。之后使用k-means对这些数据点进行聚类。 在对一个新工作负载的所有数据点进行聚类后,我们将计算被检查数据点的中心与现有集群中心之间的距离。如果距离低于一个阈值,属于现有的工作负载集群。如果LearnedSSD无法识别类似群集,LearnedSSD将为新工作负载创建一个新群集。

然后这里就又一个问题,如何生成新配置?

LearnedSSD一次调整一个或两个相关参数,然后保留满足约束的参数。对于其他参数,LearnedSSD将在搜索空间中来回调整其值。一旦我们确定了一种配置的参数,LearnedSSD将使用GPR模型来确定具有最佳预测性能等级(4)的配置。如果其性能等级优于搜索根,LearnedSSD将此配置设置为新的搜索根,并继续下一次搜索迭代。SGD程序(3)的主要挑战是平衡学习精度和开发开销。由于无法保证初始配置集将覆盖整个搜索空间,LearnedSSD必须逐步扩展其搜索空间,以确保能够识别最佳配置。然而,这可能会导致搜索空间爆炸。为了解决这个问题,我们引入了一个启发式利用因子,即正在利用的配置与现有学习配置之间的最小曼哈顿距离[77]。我们还为配置探索中的搜索迭代次数设置了一个阈值(LearnedSSD中默认为20次迭代)。

预测已探索配置的等级。LearnedSSD使用GPR[59]预测新配置的等级(4)。这主要有三个原因。首先,GPR可以提供与深度神经网络几乎相同的性能,尤其是在搜索最优配置和提出建议的建模方面。其次,它在探索新知识和学习知识之间提供了极好的权衡[44,67]。第三,默认情况下,GPR以较低的计算开销提供置信区间[9]。在LearnedSSD中,我们通过指定其均值函数和协方差函数来建立一个新的GPR模型。由于在LearnedSSD中学习之前,性能指标的平均值未知,因此平均值函数配置为可训练。我们使用协方差函数来表示模型的两个相邻点之间相关性 ,采用径向基函数(RBF)核[75]和有理二次核[76]作为回归协方差。我们还添加了一个用于随机噪声模拟的白核[47]

  • Lynx: A Learning Linux Prefetching Mechanism For SSD Performance Model

为了执行顺序预取,需要回答两个主要问题:何时预取,以及预取多少数据。传统的Linux预读机制通过将PoM和PoH关联起来来回答第一个问题。在缺页的情况下启动同步预读操作,在命中的情况下启动异步预读操作。对于第二个问题,预读会在32个连续页面的窗口大小内从存储设备预取数据。当特定文件的未命中率增加时,预读窗口会缩小,因为预取被评估为低效。然后,要预取的数据量是动态的。

Efficient IO with io_uring

本文旨在介绍最新的Linux IO接口,io_uring,并将其与现有的进行比较。我们将讨论它存在的原因、内部工作原理以及用户接口。本文不会详细介绍具体的命令,这些在man手册中。相反,本文将介绍io_uring以及其工作原理。

内核开发

基本特点

Linux是单内核的(也叫宏内核),较少了消息传递的开销(函数调用形式),性能会更好,但是可扩展性就会比较差。为了改善单内核的可维护性,Linux提出了内核模块机制,用户可以在不对内核重新编译的情况下,动态向内核装入和移除模块。

微内核和宏内核的根本区别是,微内核是进程间通信,宏内核走函数调用。

模块的本质是一种目标对象文件,不能独立运行,但是其代码可以在运行时连接到系统中作为内核的一部分运行,从而动态扩展内核的功能。

对比应用程序和内核模块

C语言应用程序 内核模块程序
使用函数 Libc库 内核函数
运行空间 用户空间 内核空间
运行权限 普通用户 超级用户
入口函数 main() module_init()
出口函数 exit() module_exit()
编译 Gcc –c Makefile
连接 Gcc insmod
运行 直接运行 insmod
调试 Gdb kdbug, kdb,kgdb等
  • 内核开发中没有libc,标准头文件。考虑大小,内核页禁止患处,常驻内存,不能太大。

  • 应使用GNU C,不完全符合ANSI C标准

  • 关注的语法特性

    • inline c99标准引用
    • 内联汇编:asm volatile(……)
    • 分支优化预测,likely、unlikely给编译器提示,这是kernel代码中定义的宏,c++20才作为关键字
  • 没有内存保护机制

  • 不要轻易在内核中使用浮点数

    • 内核态进程使用浮点操作时,内核会完成从整数模式到浮点操作模式转换
      • 通常时通过捕获陷阱进行转换
  • 内核本身不能陷入,需要人工保存、恢复浮点寄存器。

  • 函数调用栈很小,所以别用局部大数组

    • 默认情况下,64位栈大小为8kb

    • 不要使用局部数组、不要使用递归调用

      • 非要递归,写尾递归

      • 尾递归

      • f(int n) {
            if(n==1) return 1;
            return n*f(n-1);
        }
        
        改写
        f(int res, int n) {
            if(n==0) return res;
            return f(res*n,n-1);
        }//编译器会自动展开成循环
        <!--code0-->
        
        
        
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
(gdb) disass func1
Dump of assembler code for function func1:
0x00000000000001e5 <+0>: callq 0x1ea <func1+5>
0x00000000000001ea <+5>: push %rbp
0x00000000000001eb <+6>: mov %rsp,%rbp
0x00000000000001ee <+9>: cmp $0x8,%edi //8和edi比较
0x00000000000001f1 <+12>: jle 0x206 <func1+33> //小于等于(小概率)发生跳转
0x00000000000001f3 <+14>: mov $0xa,%esi
0x00000000000001f8 <+19>: mov $0x0,%rdi
0x00000000000001ff <+26>: callq 0x204 <func1+31>
0x0000000000000204 <+31>: jmp 0x217 <func1+50>
0x0000000000000206 <+33>: mov $0x2,%esi
0x000000000000020b <+38>: mov $0x0,%rdi
0x0000000000000212 <+45>: callq 0x217 <func1+50>
0x0000000000000217 <+50>: pop %rbp
0x0000000000000218 <+51>: retq
End of assembler dump.
(gdb) disass func2
Dump of assembler code for function func2:
0x0000000000000219 <+0>: callq 0x21e <func2+5>
0x000000000000021e <+5>: push %rbp
0x000000000000021f <+6>: mov %rsp,%rbp
0x0000000000000222 <+9>: cmp $0x8,%edi //8和edi比较
0x0000000000000225 <+12>: jle 0x23a <func2+33> //编译器问题没起作用,以前的编译器是jg,所以现在编译器大概不支持了。一般默认if里面直接跟的是小概率事件
0x0000000000000227 <+14>: mov $0xa,%esi
0x000000000000022c <+19>: mov $0x0,%rdi
0x0000000000000233 <+26>: callq 0x238 <func2+31>
0x0000000000000238 <+31>: jmp 0x24b <func2+50>
0x000000000000023a <+33>: mov $0x2,%esi
0x000000000000023f <+38>: mov $0x0,%rdi
0x0000000000000246 <+45>: callq 0x24b <func2+50>
0x000000000000024b <+50>: pop %rbp
0x000000000000024c <+51>: retq
End of assembler dump.

内核模块机制:

首先需要了解内核符号表的概念,内核符号表存放了所有模块可以访问的符号及相应的地址,模块声明的任何全局符号都成为内核符号表的一部分。

内核符号表处于内核代码段的_ksymtab部分,其开始地址和结束地址是由C编译器所产生的两个符号来指定:__start_ksymtab和_stop_ksymtab。

内核模块没有main函数,通过回调方式运行

回调:向内核注册函数,然后应用程序触发函数的执行,比如驱动程序在初始化时,向内核注册处理某个设备写操作的函数,当应用程序使用write系统调用写该设备时,内核就会调用注册的回调函数。

内核模块makefile编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
ifneq ($(KERNELRELEASE),)
#检查KERNELRELEASE是否已经被定义
obj-m := PrintModule.o
#说明有一个模块需要从PrintModule.o中构造,而该模块名为PrintModule.ko
PrintModule-objs := DriverMain.o DriverFileOperations.o
#说明PrintModule由多个目标文件构成,一个.o文件就是一个编译单元,一个.c生成一个.o
EXTRA_CFLAGS := -DTEST_DEBUG -ggdb -O0
#DTEST_DEBUG 是代码中定义的debug宏
#-ggdb 加入调试信息
#-O0 优化级别,没有优化
#-O1 基本优化级别
#-o2 主要优化时间效率,不考虑生成的目标文件大小
#-O3 最高优化级别,一般不用
#-Os 优化生成的目标文件大小,并且激活-O2中不增加代码大小的优化选项
#-Og gcc4.8中引入的优化级别。编译快,同时合理提供运行效率。
else
KERNELDIR ?= /lib/modules/$(shell uname -r)/build
#?=表示如果KERNELDIR还没赋值,则赋值
#$(shell uname -r) 获取当前内核版本号
#/lib/modules/$(shell uname -r)存放编译好的内核模块符号信息
#build是一个符号连接,指向了/usr/src/linux-headers-xxxxx-generic,里面包含了内核头文件,用于编译内核模块的各个Makefile
PWD := $(shell pwd)
# 保存当前路径
default: #default都会执行
$(MAKE) -C $(KERNELDIR) M=$(PWD) modules
#MAKE:就是执行make
#-C:切换目录到$(KERNELDIR),因为这里有顶层makefile文件,编译内核模块之前需要对这个顶层的内核模块进行处理
#在顶层makefile文件中,就定义了KERNELRELEASE
#KERNELRELEASE = $(shell cat include/config/kernel.release 2> /dev/null)
#M表示在构造内核模块之前,回到目录PWD,再次执行当前目录下的Makefile
rm *.order *.symvers *.mod.c *.o .*.o.cmd .*.cmd .tmp_versions -rf
endif

优化和调试级别

优化

  • -O0 优化级别,没有优化
  • -O1 基本优化级别
  • -O2 主要优化时间效率,不考虑生成的目标文件大小
  • -O3 最高优化级别,一般不用
  • -Os 优化生成的目标文件大小,并且激活-O2中不增加代码大小的优化选项
  • -Og gcc4.8中引入的优化级别。编译快,同时合理提供运行效率。

调试级别

  • -g 利用操作系统native format生成调试信息,调试器可以直接使用,默认是-g2
    • -g2 包含扩展的符号表,行号,局部或外部变量信息
    • -g3包含2中的所有调试信息,外加源码中定义的宏
  • -ggdb 是gcc为gdb专门生成的调试信息,只能用gdb调,默认是-ggdb2
    • -ggdbx,x跟在-gx的解释一样
  • -g0其实是不包含调试信息,等于不使用-g
  • -g1不包含局部变量和行号有关的调试信息,因此只能用于回溯跟踪(函数调用历史)和堆栈转储

内核源码各目录功能

du -sh 源码大概800mb

arch是体系结构相关的代码

arm/boot是启动相关的代码

mach-xxx开头是不同公司针对硬件平台增加的代码,比如三星的飞思卡尔的,不同的硬件平台配置不同。

/Documentaion 是内核说明

/firmware 固件芯片相关

/init 内核初始化代码,汇编代码会调用start_kernel函数,do_mount挂载文件系统

/usr是测试代码,不用看了

/block 块设备相关代码

/drivers 是驱动代码,占据了内核代码的一半以上。

/fs文件系统代码

/ipc 进程通信相关代码

/kernel 内核核心通用代码,比如进程

/arch/arm/kernel 是体系结构相关的内核代码,kernel里的代码会调用这里面的

/net 网络子系统相关代码

/crypto 加密相关

/sound 声卡相关

/include 内核头文件相关

/lib 通用库,给内核各个模块使用

/mm 内存管理,页表页表管理等等

/scripts 编译内核的脚本

编译内核

顶根目录下有一个makefile文件,各个子目录下也有makefile文件。

顶层makefile通过include子目录下的makefile文件。

顶层makefile首先会include体系结构相关的makefile文件

1
2
3
SRCARCH 	:= $(ARCH)
...
include arch/$(SRCARCH)/Makefile

子目录,顶层makefile会调用这些子目录的makefie:

1
2
3
4
5
6
init-y		:= init/
drivers-y := drivers/ sound/ firmware/
net-y := net/
libs-y := lib/
core-y := usr/
virt-y := virt/

比如drivers/tty里的,Kconfig文件是make menuconfigkey看见的配置文件,makefile里面就是那些文件应该编译成ko或者哪些不需要编译。

里面都是obj打头的变量。-y表示编译到内核里,-m表示编译成驱动的形式

1
2
3
4
5
6
7
8
9
10
11
12
```

Kconfig支持编译到内核里还是编译成内核模块还是不编译。-y表示编译到内核里,-m表示编译成驱动的形式

obj-y += 目录 意思是到下级目录继续编译

build-in.o是这个目录下所有.o文件的链接文件

然后顶层makefile再把所有build-in.o链接成内核镜像

定义lds一个链接文件,位于arch/$(SRCARCH)/kernel下。

export KBUILD_LDS := arch/$(SRCARCH)/kernel/vmlinux.lds

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

```makefile
vmlinux-dirs := $(patsubst %/,%,$(filter %/, $(init-y) $(init-m) \
$(core-y) $(core-m) $(drivers-y) $(drivers-m) \
$(net-y) $(net-m) $(libs-y) $(libs-m) $(virt-y)))

vmlinux-alldirs := $(sort $(vmlinux-dirs) $(patsubst %/,%,$(filter %/, \
$(init-) $(core-) $(drivers-) $(net-) $(libs-) $(virt-))))

init-y := $(patsubst %/, %/built-in.a, $(init-y)) #内置函数
core-y := $(patsubst %/, %/built-in.a, $(core-y))
drivers-y := $(patsubst %/, %/built-in.a, $(drivers-y))
net-y := $(patsubst %/, %/built-in.a, $(net-y))
libs-y1 := $(patsubst %/, %/lib.a, $(libs-y))
libs-y2 := $(patsubst %/, %/built-in.a, $(filter-out %.a, $(libs-y)))
virt-y := $(patsubst %/, %/built-in.a, $(virt-y))

KBUILD_CFLAGS += -I/root/kernel-ml/include这样可以添加头文件

为了方便,下载与当前系统相同的内核版本进行修改,这样编译不容易出错。然后用当前系统的config文件基础上进行。编译内核的时候为了方便调试尽量不要优化。-02改成-O1。-O0会编不过

1
2
3
4
5
6
7
8
cp /boot/config-xxxx-generic /pwd/.config
make oldconfig
make-kpkg clean
make-kpkg --initrd kernel-headers kernel_image
#记得安装apt install kernel-package, libncurses6-dev等
#然后会生成headers.deb和image.deb,在内核源码的上层目录下。
dpkg -i *.deb
reboot

编译成功后的效果:

  • 安装了linux-headers文件

    • ls /usr/src/
      #里面会有对应版本的headers文件夹
      #里面存放了用来编译内核模块的内核makefile,包括顶层makefile,用来编译内核模块
      #include里面就是头文件
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25

      * `/lib/modules`新增了对应版本的文件夹,里边的build、source指向了刚刚编译内核的目录

      * `/boot`目录下增加了config-xxxx,inintrd.img,符号表信息System.map-xxx,内核映像vmlinuz-xx(理解成内核的可执行文件)

      * 启动项里也有增加,`vi /boot/grub/grub.cfg`

      什么是initrd

      系统启动的过程中,一个重要的任务就是mount根文件系统,里面存放了大部分系统程序

      而要mount根文件系统,必须有磁盘的驱动程序和文件系统驱动程序

      由于硬件和兼容性的限制。内核影响的大小不能太大

      Linux需要尽可能的支持多的硬件设备,但是由于内核映像大小的的限制,不能随便把硬件设备驱动程序放入内核映像中。

      于是将各种硬件设备、文件系统的驱动程序模块化。发行商提供内核映像、系统安装程序,系统安装过程中,会跟根据当前硬件设备情况,选出系统启动需要的驱动程序,并据此制作成initrd

      initrd相当于一个临时的根文件系统,其中存放了系统启动必须的各种驱动程序

      ### 修改grub启动项

      ```shell
      grep menuentry /boot/grub/grub.cfg

image-20220303193927327

1
vim /etc/default/grub

image-20220303194017988

修改GRUB_DEFAULTsubmenu之间用>连接

kgdb

原理

image-20220227151447175

  • 一台目标机,target、服务器,运行kgdb,需要调试时。目标机启动kgdb,控制权就移交给kgdb。等待连接gdb连接
  • 一台开发机,host,客户端,运行gdb,调试命令发送给目标机。使用gdb连接目标机的kgdb。发起连接

调试器基本原理

断点如何实现

设置断点,调试器会将断点处的内存修改为0xcc,也就是int3

运行到断点,相当于执行int3的处理函数,就是调试器的主要工作环境

然后调试器再把断点处的0xcc修改为原值

环境搭建

  • step1:编译内核

    • 尽量不要优化编译,去优化。makefile文件里的-O2改成-O1。
    • 不要设置优化大小CONFIG-CC_OPTIMIZE_FOR_SIZE
    • 设置CONFIG_DEBUG_SECTION_MISMATCH,相当于-fno-inline-functions-called-once,避免inline优化
  • step2:利用VMWare clone虚拟机

  • step3:为两个系统配置串口

    • image-20220302144440893
    • image-20220302144510864
    • 目标机:cat /dev/ttyS1
    • 开发机:echo "test" > /dev/ttyS1
  • step4:配置grub.cfg,禁止内核地址随机化(nokaslr),不要直接改.cfg

    • 修改/etc/default/grub文件,增加:GRUB_CMDLINE_LINUX=”nokaslr rootdelay=90quiet splash text kgdboc=ttyS1,115200”
      • 坑:增加后实验2.4会死机?,不知道为什么
    • 执行#update-grub

内核源码调试

目标机:echo g > /proc/sysrq-trigger

开发机:

1
2
3
4
5
6
7
8
9
10
gdb ./vmlinux
set serial baud 115200
target remote /dev/ttyS1
b do_init_module
#获取模块段地址
p mod->sect_attrs->nsections
#找到内核模块各个段的名字
p mod->sect_attrs->attrs[2]->battr->attr->name
#找到模块段的地址
p mod->sect_attrs->attrs[2]->address

找出代码段、数据段、bss段地址:

image-20220303185414477

然后设置各段地址:

add-symbol-file /mnt/hgfs/lilin-linux/homework/1/driver/AddModule.ko 0xffffffffc0680000 -s .data 0xffffffffc0682000 -s .bss 0xffffffffc06824c0

然后就可以对模块进行打断点调试等等。

进程的用户栈和内核栈

用户栈:

基于进程的虚拟地址空间的管理机制实,以VMA的形式实现

内核栈:

每个进程都有自己的内核栈,一般是4k,一个page。作为task_struct的一部分。每个进程可能通过系统调用进入内核,内核会代表进程执行一些代码,会保存一些私有的数据,这时候就要用内核站。

image-20220414220406178

image-20220414220520207

task_struct描述了linux进程的通用部分

里面的一个结构体thread_info 描述了特定体系结构的汇编代码段需要访问的那部分进程的数据。定义在arch/arm/include/asm/thread_info.h

是不同体系下进程的描述。

image-20220414220954751

内核同步场景

源码相关

GDT表管理代码

管理GDTR必须要专门的指令,lgdt和sgdt

内核一定会使用lgdt指令把GDT表的及地址写入GDTR寄存器

所以在源码文件中搜索出现了lgdt指令的地方

方法是利用源码搜索网站结合观察源码一层层找

最后找到内核用gdt_page管理GDT表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
struct pv_cpu_ops {
...
void (*load_gdt)(const struct desc_ptr *);
...
}//间接层,类似于vfs的接口

static inline void load_gdt(const struct desc_ptr *dtr)
{
PVOP_VCALL1(cpu.load_gdt, dtr);
}//x86平台的load_gdt实现

void load_direct_gdt(int cpu)
{
struct desc_ptr gdt_descr;

gdt_descr.address = (long)get_cpu_gdt_rw(cpu);//返回了gdt表的地址
gdt_descr.size = GDT_SIZE - 1;
load_gdt(&gdt_descr);
}//调用了load_gdt的地方
EXPORT_SYMBOL_GPL(load_direct_gdt);


static inline struct desc_struct *get_cpu_gdt_rw(unsigned int cpu)
{
return per_cpu(gdt_page, cpu).gdt;
}//get_cpu_gdt_rw做了什么


struct gdt_page{
struct desc_struct gdt[GDT_ENTRIES];
}//真正的GDT表

struct desc_struct {
u16 limit0;
u16 base0;
u16 base1:8, type: 4, s: 1, dpl: , p:1;
...
}//这里就是Intel手册中的段描述符,就是GDT表的表项。

GDT全局描述符表位于内存中,每个CPU对应一个GDT

__thread和PerCPU

PerCPU

假设有一个per cpu变量int x, x存在于内核映像文件.data..percpu段内。当系统初始化时,内核会为每个cpu都分配per cpu内存空间,并向其中复制一份.data..percpu段内的所有内容。

pvops

pvops接口来源于Xen项目,初衷是建立一个类虚拟化(para-virtualized)内核来适应于不同的hypervisor(虚拟层),当然也包括适应于非虚拟化平台。

pvops将类虚拟化操作分成一系列结构:pv_time_ops,pv_cpu_ops,pv_mmu_ops,pv_lock_opspv_irq_ops

举个例子,x86系统中利用MOV CR3指令来加载页表。pvops将其替换为一个间接跳转到pv_mmu_ops -> write_cr3函数。 每种虚拟化平台,对这些函数都有自己的实现。上面是load_gdt在x86的实现。

https://diting0x.github.io/20170101/pvops/

问题:gdt表是一个cpu一个吗?

page fault两个例子:mmap,fork的写时复制

堆栈地址范围

内核编了,kgdb调

kpti

Linux内核之块IO层

image-20220221113129721

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×