暴力生成支配树

在编译器进行后端优化的时候,会使用支配树来构造SSA。支配树的生成算法有好几种,这里我们介绍一下最暴力的方法。(Opcache则是使用其他算法来实现,我们可以搜索论文A Simple, Fast Dominance Algorithm找到)

基本定义

支配

在一个图里面,有两个点uw,如果从图的源顶点出发,必须经过u才能到达w,那么我们称u支配w

如下图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
                        ┌────────────┐                        
│ 1 │
│ │
└────────────┘





┌────────────┐
│ 2 │
│ │
└────────────┘

┌───────────────────────┴───────────────────────┐
│ │
▼ ▼
┌────────────┐ ┌────────────┐
│ 3 │ │ 4 │
│ │ │ │
└────────────┘ └────────────┘
│ │
│ │
└────────────────────────┬───────────────────────┘


┌────────────┐
│ 5 │
│ │
└────────────┘

那么有如下支配关系:

1
2
1支配2,1支配3,1支配4,1支配5
2支配3,2支配4,2支配5

因为34都可以到达5,所以34不支配5

直接支配

如果u支配w,而w的其他支配者支配u,则节点u被认为是节点w的直接支配者,表示为idom (w)

如下图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
                        ┌────────────┐                        
│ 1 │
│ │
└────────────┘





┌────────────┐
│ 2 │
│ │
└────────────┘

┌───────────────────────┴───────────────────────┐
│ │
▼ ▼
┌────────────┐ ┌────────────┐
│ 3 │ │ 4 │
│ │ │ │
└────────────┘ └────────────┘
│ │
│ │
└────────────────────────┬───────────────────────┘


┌────────────┐
│ 5 │
│ │
└────────────┘

那么有如下直接支配关系:

1
2
1直接支配2
2直接支配3,2直接支配4,2直接支配5

我们发现,12都支配着345。但是,因为1支配了2,所以,按照直接支配的定义,2才是345的直接支配。

定理

1.除了图的源点外,其他点至少有一个点支配着它。

我们从上面的直接支配点可以看出,2345都被支配着。

2.除了图的源点外,其他点只有一个点直接支配着它。(我们可以结合上面的例子来理解)

支配树

我们可以通过edges {(idom(w),w)}来得到支配树。其中,有向图的源点就是支配树的根。

生成支配树的算法

DFS树

可以通过DFS来遍历有向图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public function domDFS(Vertex $vertex)
{
if (!$this->isVisited($vertex)) {
$this->visitedVertexs[$vertex->name] = true;
foreach ($vertex->nexts as $next) {
if (!$this->isVisited($next)) $this->domDFS($next);
}
}
}

public function computeDominatorTree()
{
foreach ($this->predOrder as $parent) {
$this->visitedVertexs = [];
$this->visitedVertexs[$parent->name] = true;
$this->domDFS($this->predOrder[0]);
foreach ($this->predOrder as $child) {
if (!$this->isVisited($child)) {
$child->dominator = $parent;
}
}
}
}

实际上,这个算法很好理解,非常的直观暴力,但是我们还是来解释下。

前提,对图进行深度优先遍历,得到一组序列。

从第一个序列开始,每次取出一个序列,我们记作s。然后重新对图进行深度优先遍历,但是,如果遇到了当前这个点s,就停止往s这个点后面的点深度遍历了,开始回退,深度遍历其他的点。我们把每一个遍历到的点保存下来,比如放在一个visitedMap里面。最后,我们和所有的点进行对比,不在visitedMap里面的点,就是被当前s这个点支配的。一直重复下去,可以得到每一个点的直接支配点。最终,得到支配树。

在演算的过程中我们发现,对于这个算法,如果有两个点(记作v1v2)可以到达第三个点(记作v3)。那么,当选取v1或者v2进行深度遍历的时候,visitedMap会保存所有的顶点。也就意味着,v1v2不支配任何点。

Swoole AIO线程池实现协程化的思路

Swoole在实现一些不好Hook的函数的时候,采用了AIO线程池来完成协程化的工作。

它的基本工作思路如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐                                               
│ main thread │
│ │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ │ │ │ │ │ │◀─────────────────────────┐
│ │ co 1 │ │ co 2 │ │ co n │ │ │
│ │ │ │ │ │ │ │ │
│ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │ │ receive event and resume the coroutine
└───────────┼────────────────────────────────────────────────┼─────────────────────────────────────────────────┼───────────┘ │
│ │ │ │
dispatch task to pool::queue, dispatch task to pool::queue, dispatch task to pool::queue, │
then yield then yield then yield │
│ │ │ │
│ │ │ │
└────────────────────────────────────────────────┼─────────────────────────────────────────────────┘ │
│ │
│ │
│ ┌────────────────────┐
│ │ │
│ │ unix socket │
│ │ │
▼ │ │
┌────────────────────────────────────────┐ └────────────────────┘
│ │ ▲
│ ThreadPool::queue │ │
│ │ │
└────────────────────────────────────────┘ │
│ │
│ │
│ │
│ │
│ │
┌───────────────────────────────┬────────────────┴──────────────┬────────────────────────────────┐ │
│ │ │ │ │
│ │ │ │ │
pop task from pool::queue pop task from pool::queue pop task from pool::queue pop task from pool::queue │
│ │ │ │ │
│ │ │ │ │
▼ ▼ ▼ ▼ │
┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐ │
│ │ │ │ │ │ │ │ │
│ AIO thread 1 │ │ AIO thread 2 │ │ AIO thread 3 │ │ AIO thread n │ │
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │
└───────────────────────┘ └───────────────────────┘ └───────────────────────┘ └───────────────────────┘ │
│ │ │ │ │
send event send event send event send event │
└───────────────────────────────┴───────────────────────────────┴────────────────────────────────┴────────────────────────────────────────┘

大概讲一讲这个流程:

1.当一个协程执行一个不好协程化的任务的时候,就会创建一个任务,投递到线程池的queue里面,对应代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
AsyncEvent *dispatch(const AsyncEvent *request) {
if (SwooleTG.aio_schedule) {
schedule();
}
auto _event_copy = new AsyncEvent(*request);
_event_copy->task_id = current_task_id++;
_event_copy->timestamp = swoole_microtime();
_event_copy->pipe_socket = SwooleTG.aio_write_socket;
event_mutex.lock();
_queue.push(_event_copy);
_cv.notify_one();
event_mutex.unlock();
swDebug("push and notify one: %f", swoole_microtime());
return _event_copy;
}

2.投递完任务之后,挂起当前协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bool async(const std::function<void(void)> &fn, double timeout) {
TimerNode *timer = nullptr;
AsyncEvent event{};
AsyncLambdaTask task{Coroutine::get_current_safe(), fn};

event.object = &task;
event.handler = async_lambda_handler;
event.callback = async_lambda_callback;

AsyncEvent *_ev = async::dispatch(&event);
if (_ev == nullptr) {
return false;
}
if (timeout > 0) {
timer = swoole_timer_add((long) (timeout * 1000), false, async_task_timeout, _ev);
}
task.co->yield();
// 省略其他代码
}

其他,async_lambda_handler会被AIO线程使用,async_lambda_callback被主线程的调度器使用。

3.当AIO线程抢到一个任务的时候,会调用async_lambda_handler,而async_lambda_handler就会去执行协程投递任务时设置的那个不好协程化的函数。

4.AIO线程执行完任务之后,通过unix socket通知主线程。此时,主线程就会执行async_lambda_callback,这个函数会resume这个任务对应的协程。然后,该协程继续往下运行。

这种方式很好用,但是,我们使用这种方式协程化的时候,需要注意一个问题,不要在任务里面去调用PHP的函数,因为这样就会让AIO线程操作ZendVM。因为主线程和AIO线程同时在修改同一个ZendVM上的数据,会导致一些内存错误。

编译器后端优化流程

用一个图来总结一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
                                                   ┌───────────────────────────────┐                                                   
│ │
│ create IRGenerator │
┌────────▶│ │
│ │ │
│ └───────────────────────────────┘
│ │
│ │
│ ▼
│ ┌───────────────────────────────┐
│ │ │
│ │create cfg -- ControlFlowGraph │
│ │ │
┌───────────────────────────────┐ │ │ │
│ │ │ └───────────────────────────────┘
│ 1. init IRGenerator │ │ │
│ │────────┤ │
│ │ │ ▼
└───────────────────────────────┘ │ ┌───────────────────────────────┐
│ │ │ create entry basic block │
│ │ │ │
│ │ │ maybe we should set the block │
│ │ │ index and block name │
│ │ └───────────────────────────────┘
│ │ │
│ │ │
│ │ │
│ │ ▼
│ │ ┌───────────────────────────────┐
│ │ │ │
│ │ │ add the basic block to cfg │
│ └────────▶│ │
│ │ │
│ └───────────────────────────────┘



┌───────────────────────────────┐ ┌───────────────────────────────┐
│ │ │ │
│ 2. generate IR │ │ expressions │
│ │─────────────────▶│ │◀─┐
│ │ │ │ │
└───────────────────────────────┘ └───────────────────────────────┘ │
│ │ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────┐ │
│ │ │ │
▼ │ get the current expression │ │
┌───────────────────────────────┐ │ │ │
│ │ │ │ │
│ 3. build inpred order and │ └───────────────────────────────┘ │
│ inpost order graph │ │ │
│ │ │ │
└───────────────────────────────┘ ▼ │
│ ┌───────────────────────────────┐ │ ┌───────────────────────────────┐
│ │ │ │ │ (maybe we should set up a │
│ │ change the expression to IR │ │ │ mapping between the variable │
▼ │ │──┼────▶│ and the block) │
┌───────────────────────────────┐ │ │ │ │ │
│ │ └───────────────────────────────┘ │ └───────────────────────────────┘
│ 4. build dominator tree │ │ │
│ │ │ │
│ │ │ │
└───────────────────────────────┘ ▼ │ ┌─────────────────────────────────────────┐
│ ┌───────────────────────────────┐ │ │ │
│ │ │ │ │ we should create new basic block for if
▼ │ add IR to basic block │ │ │ statement, loop condition, loop body, │
┌───────────────────────────────┐ │ │──┼────▶│then add basic block link and add to cfg.│
│ │ │ │ │ │(so we should create a branch statement) │
│ 5. insert PhiNodes │ └───────────────────────────────┘ │ │ │
│ │ │ │ │ │
│ │ │ │ └─────────────────────────────────────────┘
└───────────────────────────────┘ └──────────────────┘



┌───────────────────────────────┐
│ │
│ 6. build SSA │
│ │
│ │
└───────────────────────────────┘

什么是好的中间表示

摘抄自虎书

  1. 它必须便于语义分析阶段生成它
  2. 对于希望支持的所有目标机,它必须便于转变成真实的机器语言
  3. 便于对中间表示进行重写,因为后面可能会对中间表示进行优化

抽象语法中的复杂部分并不总是能正好与机器可以执行的复杂指令相对应。因此,中间表示中的个体成分应该只描述特别简单的事情:如单个取、存、加法或指令跳转等操作。这样,抽象语法中的任何复杂部分都可以用一组恰当的抽象机器指令来表示,而这些抽象机器指令则能形成“真正的”机器指令。

MacOS下pwrite无法O_APPEND的问题

这个问题来自Swoole的一个issue

有如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <sys/file.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

int main(int argc, char const *argv[]) {
int flags = 0;
int fd;

flags = O_CREAT | O_WRONLY;
fd = open("test.txt", flags, 0644);
pwrite(fd, "first line\n", strlen("first line\n"), 0);

flags = O_APPEND | O_CREAT | O_WRONLY;
fd = open("test.txt", flags, 0644);
pwrite(fd, "second line\n", strlen("second line\n"), 0);

flags = O_APPEND | O_CREAT | O_WRONLY;
fd = open("test.txt", flags, 0644);
pwrite(fd, "third line\n", strlen("third line\n"), 0);

return 0;
}

此时,test.txt文件里面的内容是:

1
2
3
third line


我们发现,这实际上没有追加,而是覆盖了之前写入的内容。也就意味着pwriteoffsetO_APPEND没有一起起到作用。

我们换成write来测试追加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <sys/file.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

int main(int argc, char const *argv[]) {
int flags = 0;
int fd;

flags = O_CREAT | O_WRONLY;
fd = open("test.txt", flags, 0644);
write(fd, "first line\n", strlen("first line\n"));

flags = O_APPEND | O_CREAT | O_WRONLY;
fd = open("test.txt", flags, 0644);
write(fd, "second line\n", strlen("second line\n"));

flags = O_APPEND | O_CREAT | O_WRONLY;
fd = open("test.txt", flags, 0644);
write(fd, "third line\n", strlen("third line\n"));

return 0;
}

此时,test.txt文件里面的内容是:

1
2
3
4
first line
second line
third line

追加成功了。

Swoole Table内部结构

Swoole Table内部结构还是比较复杂的,这里做一个记录:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
┌───────────────────┐                                                                                                                                                                
│ GlobalMemory │──────────────────▶┌────────────────────────────────────────────┐ ┌───▶───────────────────────────────┐
└───────────────────┘ │ │ │ │ TableColumn │
│ │ │ │ │
│ │ │ ├───────────────────────────────┤
┌───────────────────┐ │ │ │ │enum Type type - int, string...│
│ MemoryBlock │──────────────────▶├────────────────────────────────────────────┤ │ ├───────────────────────────────┤
├───────────────────┤ │ │ │ │ uint32_t size │
│ uint32_t size │─────────────────▶ │ size of memory │ │ ├───────────────────────────────┤
├───────────────────┤ │ │ │ │ std::string name │
│ char memory[0] │─────────┬────────▶├────────────────────────────────────────────┤ │ ├───────────────────────────────┤
└───────────────────┘ │ │ swoole::Table │ │ │ size_t index │
│ │ │ │ └───────────────────────────────┘
│ ├────────────────────────────────────────────┤ │
│ │ std::unordered_map *column_map │────┤ ┌───────────────────────────────┐
│ ├────────────────────────────────────────────┤ │ │ │
│ │ std::vector *column_list │────┘ │ memory_size = size * │
│ ├────────────────────────────────────────────┤ │ sizeof(TableRow *) │
│ │ size_t size - construct size │ │ │
│ ├────────────────────────────────────────────┤ │ memory_size += row_num * │
│ │ size_t memory_size │───────▶│(sizeof(TableRow) + item_size) │
│ ├────────────────────────────────────────────┤ │ │
│ │ size_t item_size - all column size of row │ │ │
│ ├────────────────────────────────────────────┤ │ │
│ │ sw_atomic_t row_num │ │ │
│ ├────────────────────────────────────────────┤ └───────────────────────────────┘
│ │ TableIterator *iterator │
│ ├────────────────────────────────────────────┤
│ │ Mutex *mutex │
│ ├────────────────────────────────────────────┤
│ │ void *memory - save the all rows │─────────▶───────────────────────────────┐
│ ├────────────────────────────────────────────┤ │ size * sizeof(TableRow *) │
│ │ TableRow **rows ├───┐ │ │
│ ├────────────────────────────────────────────┤ │ │ to foreach table │
│ │ │ │ ├───────────────────────────────┤
└─────────▶────────────────────────────────────────────┤ └────▶│ TableRow1 * │──┐ ┌─────────────────────────────────────────┐
│ │ ├───────────────────────────────┤ │ │ TableRow │
│ │ │ TableRow2 * │ │ │ │
│ │ ├───────────────────────────────┤ │ │ sw_atomic_t lock_ │
│ │ │ .... │ │ │ │
│ │ ├───────────────────────────────┤ │ │ uint8_t active - whether the row init │
│ │ │ row_num * (sizeof(TableRow) + │ │ │ │
│ │ │ item_size) │ │ │ uint8_t key_len │
│ │ │ │ │ │ │
│ │ ├───────────────────────────────┤ │ │ TableRow *next │
│ │ │ TableRow 1 │◀─┘ │ │
│ │ ├───────────────────────────────┤ │ char key[SW_TABLE_KEY_SIZE] │
│ │ │ TableRow 2 │ │ │
│ │ ├───────────────────────────────┤ │ char data[0] │
│ │ │ TableRow 3 │ │ │
│ │ ├───────────────────────────────┤ └─────────────────────────────────────────┘
│ │ │ TableRow ... │
│ │ └───────────────────────────────┘
│ │
└────────────────────────────────────────────┘

PHP内核编译if语句

我们有如下脚本:

1
2
3
4
5
6
7
8
9
<?php

$i = 4;

if ($i < 5) {
echo 1;
} else {
echo 2;
}

对应的opcode为:

1
2
3
4
5
6
7
L3    #0     ASSIGN                  $i                   4
L5 #1 IS_SMALLER $i 5 ~1
L5 #2 JMPZ ~1 J5
L6 #3 ECHO 1
L6 #4 JMP J6
L8 #5 ECHO 2
L10 #6 RETURN<-1> 1

首先,我们把if语句的组成部分说一下(当然,这是一个没有包含递归的语法,简化版):

1
T_IF '(' cond_expr ')' true_statement T_ELSE false_statement

对应:

1
2
3
4
5
T_IF => if
cond_exprs => $i < 5
true_statement => echo 1;
T_ELSE => else
false_statement => echo 2;

OK,我们来从编译出来的opcode总结出编译for语句的一般规律:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
   ┌─────────────────────────────────┐   
│ │
│ │
│ cond_exprs │
│ │
│ │
└─────────────────────────────────┘




┌─────────────────────────────────┐
│ │
│ │
│ JMPZ │──┐
│ │ │
│ │ │
└─────────────────────────────────┘ │
│ │
│ │
│ │
▼ │
┌─────────────────────────────────┐ │
│ │ │
│ │ │
│ true_statement │ │
│ │ │
│ │ │
└─────────────────────────────────┘ │
│ │
│ │
│ │
▼ │
┌─────────────────────────────────┐ │
│ │ │
│ │ │
┌──│ JMP │ │
│ │ │ │
│ │ │ │
│ └─────────────────────────────────┘ │
│ │ │
│ │ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ │ │
│ │ │ │
│ │ false_statement │◀─┘
│ │ │
│ │ │
│ └─────────────────────────────────┘
│ │
│ │
│ │
│ │
│ ▼
│ ┌─────────────────────────────────┐
│ │ │
│ │ │
└─▶│ out_if_stmt │
│ │
│ │
└─────────────────────────────────┘

Swoole的writeVector和readVector功能介绍

Swoole v4.5.7版本中,我们新增了两个API,分别是writeVectorreadVector。这篇文章,我们来介绍下这两个方法。

writeVector

该方法在\Swoole\Coroutine\Socket类里面

这个方法用来把分散的字符串一块发送给对端,例如:

1
2
3
4
5
$conn = new \Swoole\Coroutine\Socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
$conn->connect('127.0.0.1', 9501);
$data1 = 'hello';
$data2 = 'world';
$ret = $conn->writeVector([$data1, $data2]);

那么,writeVector就会helloworld一块发送给对端。那在没有writeVector之前,我们是如何发送这两个字符串的呢?

我们有以下两种方式。

方式一:

1
2
3
4
5
6
7
$conn = new \Swoole\Coroutine\Socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
$conn->connect('127.0.0.1', 9501);
$data1 = 'hello';
$data2 = 'world';

$data3 = $data1 . $data2;
$ret = $conn->send($data3);

方式二:

1
2
3
4
5
6
7
$conn = new \Swoole\Coroutine\Socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
$conn->connect('127.0.0.1', 9501);
$data1 = 'hello';
$data2 = 'world';

$ret = $conn->send($data1);
$ret = $conn->send($data2);

方式一和方式二都会有它特有的性能问题。

其中,方式一的思路是,只想调用一次send方法,所以,就先拼接两个字符串。但是,这样会产生内存拷贝。拷贝的字节数是strlen($data1) + strlen(data2)。具体怎么拷贝的,我们可以去查看PHP内核ZEND_CONCAT对应的handler

方式二的思路是,调用两次send方法来发送$data1$data2

因为,send方法只是把字符串从我们的应用空间拷贝到内核空间,不会立马发送字符串给对端(意味着两次send实际上只有一次网络发包的时间),所以,方式一和方式二实际上是在系统调用时间和拷贝字节数之间做权衡。

所以,我们需要writeVector这么一个方法,直接把分散在多个地方的字符串,一块发送出去。这样,应用层不存在字符串拼接,也只需要一次系统调用就行了。

readVector

该方法在\Swoole\Coroutine\Socket类里面

readVectorwriteVector的优化目的是一样的。

使用方法如下:

1
2
3
$conn = new \Swoole\Coroutine\Socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
$conn->connect('127.0.0.1', 9501);
$ret = $conn->readVector([5, 5]);

如果对端发来了helloworld,那么,$ret就会得到对应的数组['hello', 'world']

那么,在没有readVector之前,我们也有两种方式来读取并分成两个字符串。

方式一:

1
2
3
4
5
6
$conn = new \Swoole\Coroutine\Socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
$conn->connect('127.0.0.1', 9501);
$data = $conn->recv(5 + 5);

$data1 = substr($data, 0, 5);
$data2 = substr($data, 5);

此时,我们通过一次系统调用来把字符串读取出来。但是,如果我们要分开来拿到这两个字符串,就需要调用substr来进行字符串截取了。此时,也会发生内存拷贝,拷贝的总字节数是strlen($data1) + strlen($data2)

方式二:

1
2
3
4
$conn = new \Swoole\Coroutine\Socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
$conn->connect('127.0.0.1', 9501);
$data1 = $conn->recv(5);
$data2 = $conn->recv(5);

此时,我们通过两次recv系统调用来读取出两个字符串。

PHP内核编译for语句

我们有如下脚本:

1
2
3
4
5
<?php

for ($i = 0; $i < 10; $i = $i + 1) {
echo $i;
}

对应的opcode为:

1
2
3
4
5
6
7
8
L3    #0     ASSIGN                  $i                   0
L3 #1 JMP J5
L4 #2 ECHO $i
L3 #3 ADD $i 1 ~1
L3 #4 ASSIGN $i ~1
L3 #5 IS_SMALLER $i 10 ~3
L3 #6 JMPNZ ~3 J2
L6 #7 RETURN<-1> 1

首先,我们把for语句的组成部分说一下:

1
T_FOR '(' init_exprs ';' cond_exprs ';' loop_exprs ')' for_statement

对应:

1
2
3
4
5
T_FOR => for
init_exprs => $i = 0
cond_exprs => $i < 10
loop_exprs => $i = $i + 1
for_statement => echo $i;

因为,这些opcode是没有经过任何优化的,所以,我们看到的opcode顺序就是PHP解释器去编译的顺序。

OK,我们来从编译出来的opcode总结出编译for语句的一般规律。

首先是:

1
L3    #0     ASSIGN                  $i                   0

这对应着我们的init_exprs

接着是:

1
L3    #1     JMP                     J5

没有对应的代码。

接着是:

1
L4    #2     ECHO                    $i

对应着我们的for_statement

接着是:

1
2
L3    #3     ADD                     $i                   1                    ~1
L3 #4 ASSIGN $i ~1

对应着我们的loop_exprs

接着是:

1
L3    #5     IS_SMALLER              $i                   10                   ~3

对应着我们的cond_exprs

最后是:

1
L3    #6     JMPNZ                   ~3                   J2

没有对应的代码。

我们可以得到如下图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
   ┌──────────────────────────────┐   
│ │
│ init_exprs │
│ │
└──────────────────────────────┘



┌──────────────────────────────┐
│ │
┌──│ JMP │◀─┐
│ │ │ │
│ └──────────────────────────────┘ │
│ │ │
│ │ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ │ │
│ │ for_statement │ │
│ │ │ │
│ └──────────────────────────────┘ │
│ │ │
│ │ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ │ │
│ │ loop_exprs │ │
│ │ │ │
│ └──────────────────────────────┘ │
│ │ │
│ │ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ │ │
└─▶│ cond_exprs │ │
│ │ │
└──────────────────────────────┘ │
│ │
│ │
│ │
▼ │
┌──────────────────────────────┐ │
│ │ │
│ JMPNZ │──┘
│ │
└──────────────────────────────┘

这实际上,就是通过ifgoto来实现for循环。

我们在写编译器的时候,就可以借鉴这个模型。

epoll处理连接关闭的一个细节问题

最近在写SwooleCoroutine::Socket::readv功能的时候,遇到了一个处理连接关闭的细节问题。

一般情况下,当本端主动关闭连接之后,会触发EPOLLINEPOLLHUP事件,然后当本端调用recv的时候,返回值会是0,那么,本端就会进行一些清理连接和内存的操作。但是,不是所有的情况都是如此,如果仅仅依赖于这个recv的返回值来判断连接是否断了,那么可能会有一些问题。在alpine上面,收到EPOLLHUP事件之后,调用recv得到的一个EAGAIN错误。那么,如果我们仅仅是判断recv返回了EAGAIN,那么,我们接下来就会直接回到事件循环。并且,这个EAGAIN是持续的,这就导致了死循环了。

所以,我们需要对EPOLLHUP事件进行一次判断,来确定连接有没有关闭,如果得到了EPOLLHUP事件,那么我们就需要进行连接的清理工作了。