协程

libco分析,以32位为例

协程

协程结构体

作用类似进程对应的PCB,这里看作协程控制块,包含了协程的上下文数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
struct stCoRoutine_t
{
stCoRoutineEnv_t *env;
pfn_co_routine_t pfn; //实际待执行的协程函数
void *arg; //参数
coctx_t ctx;//协程上下文,就是那些寄存器

//一些状态和标志变量
char cStart;
char cEnd;
char cIsMain;
char cEnableSysHook;
char cIsShareStack;

void *pvEnv;//用于保存系统环境变量的指针

//char sRunStack[ 1024 * 128 ];
stStackMem_t* stack_mem; //协程栈


//save satck buffer while confilct on same stack_buffer;
char* stack_sp;
unsigned int save_size;
char* save_buffer;

stCoSpec_t aSpec[1024];

};

协程栈

libco提供了两种实现方式:独立栈和共享栈

独立栈:为每一个协程分配一个单独的、固定大小的栈。一个协程默认128K.

共享栈:仅为当前正在运行的协程分配栈内存。协程切换出去后,会把他的栈内存拷贝到一个独立的缓冲区,再次调度的时候再拷贝回来。一般来说,一个协程实际的栈空间要小于128KB,这样的方案占用内存会小很多,但是拷贝内存也有时间开销,各有利弊。

协程环境结构体

这个结构体是跟线程绑定的,运行再一个线程上的各个协程共享该环境结构体。

当前运行的协程、上次挂起的协程、嵌套调用的协程栈,一个epoll的封装结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ]; //递归调用的协程栈,最深128
int iCallStackSize; //当前递归深度
stCoEpoll_t *pEpoll;

//for copy stack log lastco and nextco
stCoRoutine_t* pending_co;//共享栈时候用这两个指针
stCoRoutine_t* occupy_co;
};

stCoRoutine_t *GetCurrCo( stCoRoutineEnv_t *env )//获取当前协程
{
return env->pCallStack[ env->iCallStackSize - 1 ];
}

这里的调用协程栈要跟前面的协程栈区分开,是调用栈。

先要知道,libco实现的协程是非对称协程。非对称的意思就是协程间存在明确的调用关系,被调协程只能返回调用者协程。(go实现的是对称协程)。因此这里的调用关系必须用链保存下来,也就是这里的pCallStack。

每次启动(resume)一个协程,就将他的协程控制块的指针保存到栈顶。然后切换上下文到带启动协程。当协程要让出CPU时,也即是yield,就要弹栈,返回切换上下文到栈顶的协程。

主协程,也就是libc的第一个协程,执行main函数的协程,不会yield。

上下文

上下文结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct coctx_t
{
#if defined(__i386__)
void *regs[ 8 ];
#else
void *regs[ 14 ];
#endif
size_t ss_size;//栈大小
char *ss_sp;//栈顶

};
// | regs[0]: ret |
// | regs[1]: ebx |
// | regs[2]: ecx |
// | regs[3]: edx |
// | regs[4]: edi |
// | regs[5]: esi |
// | regs[6]: ebp |
// | regs[7]: eax | = esp

上下文创建

初始化上下文的栈和跳转点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int coctx_make(coctx_t* ctx, coctx_pfn_t pfn, const void* s, const void* s1) {
// make room for coctx_param
char* sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t);
sp = (char*)((unsigned long)sp & -16L);

coctx_param_t* param = (coctx_param_t*)sp;
void** ret_addr = (void**)(sp - sizeof(void*) * 2);
*ret_addr = (void*)pfn;
param->s1 = s;
param->s2 = s1;

memset(ctx->regs, 0, sizeof(ctx->regs));//通用寄存器初始化

ctx->regs[kESP] = (char*)(sp) - sizeof(void*) * 2;
return 0;
}

struct coctx_param_t
{
const void *s1;
const void *s2;
};

上下文切换

coctx_swap.S

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
   movl 4(%esp), %eax
movl %esp, 28(%eax)
movl %ebp, 24(%eax)
movl %esi, 20(%eax)
movl %edi, 16(%eax)
movl %edx, 12(%eax)
movl %ecx, 8(%eax)
movl %ebx, 4(%eax)


movl 8(%esp), %eax
movl 4(%eax), %ebx
movl 8(%eax), %ecx
movl 12(%eax), %edx
movl 16(%eax), %edi
movl 20(%eax), %esi
movl 24(%eax), %ebp
movl 28(%eax), %esp

ret

协程生命周期

协程创建

1
2
3
4
5
6
7
8
9
10
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() )
{
co_init_curr_thread_env();
}
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
return 0;
}

协程创建后,这时协程还没启动

协程切换

非共享栈情况下,切换上下文即可。

为了节约内存,采用共享栈,每个协程栈无需占据128K而是根据size动态分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
stCoRoutineEnv_t* env = co_get_curr_thread_env();

//get curr stack sp
char c;
curr->stack_sp= &c;

if (!pending_co->cIsShareStack)//非共享栈情况下,切换上下文即可。
{
env->pending_co = NULL;
env->occupy_co = NULL;
}
else
{
env->pending_co = pending_co;
//get last occupy co on the same stack mem
stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
//set pending co to occupy thest stack mem;
pending_co->stack_mem->occupy_co = pending_co;

env->occupy_co = occupy_co;
if (occupy_co && occupy_co != pending_co)
{
save_stack_buffer(occupy_co);
}
}

//swap context
coctx_swap(&(curr->ctx),&(pending_co->ctx) );

//stack buffer may be overwrite, so get again;
stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
stCoRoutine_t* update_occupy_co = curr_env->occupy_co;
stCoRoutine_t* update_pending_co = curr_env->pending_co;

if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
{
//resume stack buffer
if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
{
memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
}
}
}

参考资料

https://zhuanlan.zhihu.com/p/338767781

https://zhuanlan.zhihu.com/p/178525588

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×