Swoole AIO线程池实现协程化的思路

Swoole在实现一些不好Hook的函数的时候,采用了AIO线程池来完成协程化的工作。

它的基本工作思路如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐                                               
│ main thread │
│ │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ │ │ │ │ │ │◀─────────────────────────┐
│ │ co 1 │ │ co 2 │ │ co n │ │ │
│ │ │ │ │ │ │ │ │
│ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │ │ receive event and resume the coroutine
└───────────┼────────────────────────────────────────────────┼─────────────────────────────────────────────────┼───────────┘ │
│ │ │ │
dispatch task to pool::queue, dispatch task to pool::queue, dispatch task to pool::queue, │
then yield then yield then yield │
│ │ │ │
│ │ │ │
└────────────────────────────────────────────────┼─────────────────────────────────────────────────┘ │
│ │
│ │
│ ┌────────────────────┐
│ │ │
│ │ unix socket │
│ │ │
▼ │ │
┌────────────────────────────────────────┐ └────────────────────┘
│ │ ▲
│ ThreadPool::queue │ │
│ │ │
└────────────────────────────────────────┘ │
│ │
│ │
│ │
│ │
│ │
┌───────────────────────────────┬────────────────┴──────────────┬────────────────────────────────┐ │
│ │ │ │ │
│ │ │ │ │
pop task from pool::queue pop task from pool::queue pop task from pool::queue pop task from pool::queue │
│ │ │ │ │
│ │ │ │ │
▼ ▼ ▼ ▼ │
┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐ │
│ │ │ │ │ │ │ │ │
│ AIO thread 1 │ │ AIO thread 2 │ │ AIO thread 3 │ │ AIO thread n │ │
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │
└───────────────────────┘ └───────────────────────┘ └───────────────────────┘ └───────────────────────┘ │
│ │ │ │ │
send event send event send event send event │
└───────────────────────────────┴───────────────────────────────┴────────────────────────────────┴────────────────────────────────────────┘

大概讲一讲这个流程:

1.当一个协程执行一个不好协程化的任务的时候,就会创建一个任务,投递到线程池的queue里面,对应代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
AsyncEvent *dispatch(const AsyncEvent *request) {
if (SwooleTG.aio_schedule) {
schedule();
}
auto _event_copy = new AsyncEvent(*request);
_event_copy->task_id = current_task_id++;
_event_copy->timestamp = swoole_microtime();
_event_copy->pipe_socket = SwooleTG.aio_write_socket;
event_mutex.lock();
_queue.push(_event_copy);
_cv.notify_one();
event_mutex.unlock();
swDebug("push and notify one: %f", swoole_microtime());
return _event_copy;
}

2.投递完任务之后,挂起当前协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bool async(const std::function<void(void)> &fn, double timeout) {
TimerNode *timer = nullptr;
AsyncEvent event{};
AsyncLambdaTask task{Coroutine::get_current_safe(), fn};

event.object = &task;
event.handler = async_lambda_handler;
event.callback = async_lambda_callback;

AsyncEvent *_ev = async::dispatch(&event);
if (_ev == nullptr) {
return false;
}
if (timeout > 0) {
timer = swoole_timer_add((long) (timeout * 1000), false, async_task_timeout, _ev);
}
task.co->yield();
// 省略其他代码
}

其他,async_lambda_handler会被AIO线程使用,async_lambda_callback被主线程的调度器使用。

3.当AIO线程抢到一个任务的时候,会调用async_lambda_handler,而async_lambda_handler就会去执行协程投递任务时设置的那个不好协程化的函数。

4.AIO线程执行完任务之后,通过unix socket通知主线程。此时,主线程就会执行async_lambda_callback,这个函数会resume这个任务对应的协程。然后,该协程继续往下运行。

这种方式很好用,但是,我们使用这种方式协程化的时候,需要注意一个问题,不要在任务里面去调用PHP的函数,因为这样就会让AIO线程操作ZendVM。因为主线程和AIO线程同时在修改同一个ZendVM上的数据,会导致一些内存错误。