Swoole Server 消息回调函数4倍性能提升

前言

Swoole4.5版本中(目前还未发布),我们的Server有一个性能需要优化的地方,就是worker进程在收到master进程发来的包的时候,需要进行两次的拷贝,才可以把数据从PHP扩展层传递到PHP上层(也就是我们事件回调函数需要拿到的data)。

优化前

我们先来分析一下为什么会有性能的问题。首先,我们需要一份会有性能问题的代码。我们git cloneswoole-src代码,然后git checkout8235c82fea2130534a16fd20771dcab3408a763e这个commit位置:

1
git checkout 8235c82fea2130534a16fd20771dcab3408a763e

我们来分析一下代码,首先看master进程是如何封装数据然后发送给worker进程的。在函数process_send_packet里面,我们看核心的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data)
{
const char* data = resp->data;
uint32_t send_n = resp->info.len;
off_t offset = 0;

uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

if (send_n <= max_length)
{
buf->info.flags = 0;
buf->info.len = send_n;
memcpy(buf->data, data, send_n);

int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
return retval;
}

buf->info.flags = SW_EVENT_DATA_CHUNK;

while (send_n > 0)
{
if (send_n > max_length)
{
buf->info.len = max_length;
}
else
{
buf->info.flags |= SW_EVENT_DATA_END;
buf->info.len = send_n;
}

memcpy(buf->data, data + offset, buf->info.len);

if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0)
{
return SW_ERR;
}

send_n -= buf->info.len;
offset += buf->info.len;
}

return SW_OK;
}

首先,我们来说一下process_send_packet这个函数的参数:

其中,

swServer *serv就是我们创建的那个Server

swPipeBuffer *buf指向的内存里面的数据需要发送给worker进程。

swSendData *resp里面存放了master进程收到的客户端数据以及一个swDataHead info头部。

_send是一个回调函数,这里面的逻辑就是master进程把swPipeBuffer *buf里面的数据发送给worker进程。

void* private_data这里是一个swWorker *worker类型的指针转换过来的。指定了master进程需要发送的那个worker进程。

说明一点,这里我们是以Server设置了eof选项为例子讲解的(假设设置了"\r\n")。因为TCP是面向字节流的,即使客户端发送了一个很大的包过来,服务器一次read出来的数据也不见得非常大。如果不设置eof的话,是不会导致我们这篇文章所说的性能问题。

介绍完了process_send_packet函数的参数之后,我们来看看代码是如何实现的:

1
const char* data = resp->data;

首先,让data指向resp->data,也就是客户端发来的实际数据。例如,客户端发来了字符串hello world\r\n,那么data里面存放的就是hello world\r\n

1
uint32_t send_n = resp->info.len;

标志着resp->data数据的长度。例如,客户端往服务器发送了1M的数据,那么resp->info.len就是1048576

1
off_t offset = 0;

用来标志哪些数据master进程已经发送给了worker进程。

1
uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

max_length表示master进程一次往worker进程发送的包最大长度。

注意:master进程和worker进程是通过udg方式进行通信的。所以,master进程发送多少,worker进程就直接收多少

1
2
3
4
5
6
7
8
9
if (send_n <= max_length)
{
buf->info.flags = 0;
buf->info.len = send_n;
memcpy(buf->data, data, send_n);

int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
return retval;
}

如果master进程要发给worker进程的数据小于max_length,那么就直接调用_send函数,直接把数据发给worker进程。

1
buf->info.flags = SW_EVENT_DATA_CHUNK;

send_n大于max_length的时候,设置buf->info.flagsCHUNK,也就意味着需要把客户端发来的数据先拆分成一小段一小段的数据,然后再发送给worker进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
while (send_n > 0)
{
if (send_n > max_length)
{
buf->info.len = max_length;
}
else
{
buf->info.flags |= SW_EVENT_DATA_END;
buf->info.len = send_n;
}

memcpy(buf->data, data + offset, buf->info.len);

if (_send(serv, buf, sizeof(buf->info) + buf->info.len, private_data) < 0)
{
return SW_ERR;
}

send_n -= buf->info.len;
offset += buf->info.len;
}

逻辑比较简单,就是一个分段发送的过程。这里需要注意的两点:

1
2
1、buf->info.len的长度需要更新为小段的chunk的长度,而不是大数据包的长度
2、最后一个chunk的info.flags需要变成SW_EVENT_DATA_END,意味着一个完整的包已经发完了

OK,分析完了master进程发包的过程,我们来分析一下worker进程收包的过程。

我们先看一下函数swWorker_onPipeReceive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event)
{
swServer *serv = (swServer *) reactor->ptr;
swFactory *factory = &serv->factory;
swPipeBuffer *buffer = serv->pipe_buffers[0];
int ret;

_read_from_pipe:

if (read(event->fd, buffer, serv->ipc_max_size) > 0)
{
ret = swWorker_onTask(factory, (swEventData *) buffer);
if (buffer->info.flags & SW_EVENT_DATA_CHUNK)
{
//no data
if (ret < 0 && errno == EAGAIN)
{
return SW_OK;
}
else if (ret > 0)
{
goto _read_from_pipe;
}
}
return ret;
}

return SW_ERR;
}

这个就是worker进程接收master进程发来的数据的代码。

我们看的,worker进程会直接把数据先读取到buffer内存里面,然后调用swWorker_onTask。我们再来看看swWorker_onTask函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
int swWorker_onTask(swFactory *factory, swEventData *task)
{
swServer *serv = (swServer *) factory->ptr;
swWorker *worker = SwooleWG.worker;

//worker busy
worker->status = SW_WORKER_BUSY;
//packet chunk
if (task->info.flags & SW_EVENT_DATA_CHUNK)
{
if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0)
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA,
"cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost", task->info.fd, task->info.len);
return SW_OK;
}
//wait more data
if (!(task->info.flags & SW_EVENT_DATA_END))
{
return SW_OK;
}
}

switch (task->info.type)
{
case SW_SERVER_EVENT_SEND_DATA:
//discard data
if (swWorker_discard_data(serv, task) == SW_TRUE)
{
break;
}
swWorker_do_task(serv, worker, task, serv->onReceive);
break;
// 省略其他的case
default:
swWarn("[Worker] error event[type=%d]", (int )task->info.type);
break;
}

//worker idle
worker->status = SW_WORKER_IDLE;

//maximum number of requests, process will exit.
if (!SwooleWG.run_always && worker->request_count >= SwooleWG.max_request)
{
swWorker_stop(worker);
}
return SW_OK;
}

我们重点看看性能问题代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (task->info.flags & SW_EVENT_DATA_CHUNK)
{
if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0)
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA,
"cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost", task->info.fd, task->info.len);
return SW_OK;
}
//wait more data
if (!(task->info.flags & SW_EVENT_DATA_END))
{
return SW_OK;
}
}

这里,worker进程会先判断master发来的数据是否是CHUNK数据,如果是,那么会进行merge_chunk的操作。我们看看merge_chunk对应的函数:

1
2
3
4
5
6
static int swServer_worker_merge_chunk(swServer *serv, int key, const char *data, size_t len)
{
swString *package = swServer_worker_get_input_buffer(serv, key);
//merge data to package buffer
return swString_append_ptr(package, data, len);
}

我们会先根据key的值(实际上是reactor线程的id),获取一块全局的内存,然后把接收到的chunk数据,追加到这个全局的内存上面,而swString_append_ptr执行的就是memcpy的操作。

所以,这就是一个性能问题了。worker进程接收到的所有数据都会被完整的拷贝一遍。如果客户端发来的数据很大,这个拷贝的开销也是很大的。

我们再看看Swoole内核是如何把data提供给PHP应用层的,主要函数是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void php_swoole_get_recv_data(swServer *serv, zval *zdata, swEventData *req, char *header, uint32_t header_length)
{
char *data = NULL;

size_t length = serv->get_packet(serv, req, &data);
if (header_length >= length)
{
ZVAL_EMPTY_STRING(zdata);
}
else
{
ZVAL_STRINGL(zdata, data + header_length, length - header_length);
}
if (header_length > 0)
{
memcpy(header, data, header_length);
}
}

程序会进入以下代码:

1
ZVAL_STRINGL(zdata, data + header_length, length - header_length);

这个地方是通过ZVAL_STRINGL来创建zend_string的,也就意味着PHP底层会把Swoole内核中存储的data完整的拷贝一份到zend_string里面。然后再让zdata(也就是PHP应用层会用到的data)的zend_value指针指向这个zend_string。这里,又多了一次完整的内存拷贝。

上述过程我们可以通过下面这幅图总结:

优化前

因此,我们对这部分合并的代码进行了一个优化。我们让worker进程在接收master进程的数据之前,就准备好一块足够大的内存,然后直接用这块内存把master进程发来的数据接收即可。

优化一

我们先更新一下swoole-src的源码:

1
git checkout 529ad44d578930b3607abedcfc278364df34bc73

我们依旧先看看process_send_packet函数的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void* private_data)
{
const char* data = resp->data;
uint32_t send_n = resp->info.len;
off_t offset = 0;
uint32_t copy_n;

uint32_t max_length = serv->ipc_max_size - sizeof(buf->info);

if (send_n <= max_length)
{
buf->info.flags = 0;
buf->info.len = send_n;
memcpy(buf->data, data, send_n);

int retval = _send(serv, buf, sizeof(buf->info) + send_n, private_data);
return retval;
}

buf->info.flags = SW_EVENT_DATA_CHUNK;
buf->info.len = send_n;

while (send_n > 0)
{
if (send_n > max_length)
{
copy_n = max_length;
}
else
{
buf->info.flags |= SW_EVENT_DATA_END;
copy_n = send_n;
}

memcpy(buf->data, data + offset, copy_n);

swTrace("finish, type=%d|len=%d", buf->info.type, copy_n);

if (_send(serv, buf, sizeof(buf->info) + copy_n, private_data) < 0)
{
return SW_ERR;
}

send_n -= copy_n;
offset += copy_n;
}

return SW_OK;
}

我们聚焦修改的地方,主要是对CHUNK的处理:

1
2
buf->info.flags = SW_EVENT_DATA_CHUNK;
buf->info.len = send_n;

我们发现,buf->info.len的长度不是每个小段chunk的长度了,而是整个大包的长度了。为什么可以这样做呢?因为master进程与worker进程是通过udg进行通信的,所以,worker进程在调用recv的时候,返回值实际上就是chunk的长度了,所以buf->info.len里面存储chunk的长度没有必要。

其他地方的逻辑和之前的代码没有区别。

我们再来看看worker进程是如何接收master进程发来的数据的。在函数swWorker_onPipeReceive里面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event)
{
int ret;
ssize_t recv_n = 0;
swServer *serv = (swServer *) reactor->ptr;
swFactory *factory = &serv->factory;
swPipeBuffer *pipe_buffer = serv->pipe_buffers[0];
void *buffer;
struct iovec buffers[2];

// peek
recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK);
if (recv_n < 0 && errno == EAGAIN)
{
return SW_OK;
}
else if (recv_n < 0)
{
return SW_ERR;
}

if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
{
buffer = serv->get_buffer(serv, &pipe_buffer->info);
_read_from_pipe:

buffers[0].iov_base = &pipe_buffer->info;
buffers[0].iov_len = sizeof(pipe_buffer->info);
buffers[1].iov_base = buffer;
buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

recv_n = readv(event->fd, buffers, 2);
if (recv_n < 0 && errno == EAGAIN)
{
return SW_OK;
}
if (recv_n > 0)
{
serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info));
}

if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
{
//wait more chunk data
if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END))
{
goto _read_from_pipe;
}
else
{
pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR;
/**
* Because we don't want to split the swEventData parameters into swDataHead and data,
* we store the value of the worker_buffer pointer in swEventData.data.
* The value of this pointer will be fetched in the swServer_worker_get_packet function.
*/
serv->copy_buffer_addr(serv, pipe_buffer);
}
}
}
else
{
recv_n = read(event->fd, pipe_buffer, serv->ipc_max_size);
}

if (recv_n > 0)
{
ret = swWorker_onTask(factory, (swEventData *) pipe_buffer, recv_n - sizeof(pipe_buffer->info));
return ret;
}

return SW_ERR;
}

其中,

1
2
3
4
5
6
7
8
9
recv_n = recv(event->fd, &pipe_buffer->info, sizeof(pipe_buffer->info), MSG_PEEK);
if (recv_n < 0 && errno == EAGAIN)
{
return SW_OK;
}
else if (recv_n < 0)
{
return SW_ERR;
}

我们先对内核缓冲区里面的数据进行一次peek操作,来获取到head部分。这样我们就知道数据是否是以CHUNK方式发来的了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
{
buffer = serv->get_buffer(serv, &pipe_buffer->info);
_read_from_pipe:

buffers[0].iov_base = &pipe_buffer->info;
buffers[0].iov_len = sizeof(pipe_buffer->info);
buffers[1].iov_base = buffer;
buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

recv_n = readv(event->fd, buffers, 2);
if (recv_n < 0 && errno == EAGAIN)
{
return SW_OK;
}
if (recv_n > 0)
{
serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof(pipe_buffer->info));
}

if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK)
{
//wait more chunk data
if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END))
{
goto _read_from_pipe;
}
else
{
pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR;
/**
* Because we don't want to split the swEventData parameters into swDataHead and data,
* we store the value of the worker_buffer pointer in swEventData.data.
* The value of this pointer will be fetched in the swServer_worker_get_packet function.
*/
serv->copy_buffer_addr(serv, pipe_buffer);
}
}
}

如果是CHUNK方式发来的数据,那么我们执行如下的操作:

1
buffer = serv->get_buffer(serv, &pipe_buffer->info);

get_buffer是一个回调函数,对应:

1
2
3
4
5
6
7
8
9
10
11
static void* swServer_worker_get_buffer(swServer *serv, swDataHead *info)
{
swString *worker_buffer = swServer_worker_get_input_buffer(serv, info->reactor_id);

if (worker_buffer->size < info->len)
{
swString_extend(worker_buffer, info->len);
}

return worker_buffer->str + worker_buffer->length;
}

这里,我们会先判断这块全局的buffer是否足够的大,可以接收完整个大包,如果不够大,我们扩容到足够的大。

1
2
3
4
5
6
7
8
_read_from_pipe:

buffers[0].iov_base = &pipe_buffer->info;
buffers[0].iov_len = sizeof(pipe_buffer->info);
buffers[1].iov_base = buffer;
buffers[1].iov_len = serv->ipc_max_size - sizeof(pipe_buffer->info);

recv_n = readv(event->fd, buffers, 2);

然后,我们调用readv,把head和实际的数据分别存在了两个地方。这么做是避免为了把head和实际的数据做拆分而导致的内存拷贝。

通过以上方式,Swoole Server减少了一次内存拷贝。

上述过程我们可以通过下面这幅图总结:

第一次优化

从图中我们可以看出,步骤2到步骤3这里还是会有一次完整的拷贝,我们也把它给优化掉了。我们来看优化后的代码。

优化二

我们先更新一下swoole-src的源码:

1
git checkout 5278bb30c9b6b84753fa1950cef3226f1cfb515c

master进程发送数据到worker进程的代码没有变化,主要是worker进程这边对接收buffer处理的变化。我们会发现函数swWorker_onPipeReceive没有任何改动,改动的是处理buffer的几个回调函数。我们一一来看下。

首先是函数指针swServer::get_buffer对应了函数php_swoole_server_worker_get_buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
static void* php_swoole_server_worker_get_buffer(swServer *serv, swDataHead *info)
{
zend_string *worker_buffer = php_swoole_server_worker_get_input_buffer(serv, info->reactor_id);

if (worker_buffer == NULL)
{
worker_buffer = zend_string_alloc(info->len, 0);
worker_buffer->len = 0;
php_swoole_server_worker_set_buffer(serv, info, worker_buffer);
}

return worker_buffer->val + worker_buffer->len;
}

这里先调用函数php_swoole_server_worker_get_input_buffer来获取接收master进程发来数据的buffer。如果说没有获取到,那么说明我们之前的worker_buffer没有创建或者接收完所有的数据之后被销毁了,此时我们需要通过函数zend_string_alloc分配一块内存。这里需要注意的一个地方就是,info->lenmaster进程发送给worker进程的总长度,也就意味着我们需要把zend_stringlen手动初始化为0。毕竟zend_string没有offest这个成员,所以这里我们只能够把len当作offset来用了。

获取到zend_string这块worker buffer之后,我们就可以通过readv来读取master进程发送给worker进程的数据了。

获取完数据之后,我们调用swServer::add_buffer_len函数指针对应的php_swoole_server_worker_add_buffer_len这个函数来增加偏移量:

1
2
3
4
5
static void php_swoole_server_worker_add_buffer_len(swServer *serv, swDataHead *info, size_t len)
{
zend_string *worker_buffer = php_swoole_server_worker_get_input_buffer(serv, info->reactor_id);
worker_buffer->len += len;
}

当我们接收完master进程发送过来的所有数据之后,我们调用swServer::copy_buffer_addr函数指针对应的php_swoole_server_worker_copy_buffer_addr来把zend_string的地址拷贝到swPipeBuffer::data里面。

这样,我们通过函数指针的简单替换,实现了C层面的bufferPHP层面的buffer切换。

最后,我们需要把接收到的数据,也就是zend_string里面的数据提供给PHP应用层。我们来看看swoole_websocket_onMessage这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
int swoole_websocket_onMessage(swServer *serv, swEventData *req)
{
int fd = req->info.fd;
uchar flags = 0;
zend_long opcode = 0;

zval zdata;
char frame_header[2];
memcpy(frame_header, &req->info.ext_flags, sizeof(frame_header));

php_swoole_get_recv_data(serv, &zdata, req);

// frame info has already decoded in swWebSocket_dispatch_frame
flags = frame_header[0];
opcode = frame_header[1];

if (opcode == WEBSOCKET_OPCODE_CLOSE && !serv->listen_list->open_websocket_close_frame)
{
zval_ptr_dtor(&zdata);
return SW_OK;
}

#ifdef SW_HAVE_ZLIB
/**
* RFC 7692
*/
if (serv->websocket_compression && (flags & SW_WEBSOCKET_FLAG_RSV1))
{
swString_clear(swoole_zlib_buffer);
if (!websocket_message_uncompress(swoole_zlib_buffer, Z_STRVAL(zdata), Z_STRLEN(zdata)))
{
zval_ptr_dtor(&zdata);
return SW_OK;
}
zval_ptr_dtor(&zdata);
ZVAL_STRINGL(&zdata, swoole_zlib_buffer->str, swoole_zlib_buffer->length);
flags ^= (SW_WEBSOCKET_FLAG_RSV1 | SW_WEBSOCKET_FLAG_COMPRESS);
}
#endif

zend_fcall_info_cache *fci_cache = php_swoole_server_get_fci_cache(serv, req->info.server_fd, SW_SERVER_CB_onMessage);
zval args[2];

args[0] = *(zval *) serv->ptr2;
php_swoole_websocket_construct_frame(&args[1], opcode, Z_STRVAL(zdata), Z_STRLEN(zdata), flags);
zend_update_property_long(swoole_websocket_frame_ce, &args[1], ZEND_STRL("fd"), fd);

if (UNEXPECTED(!zend::function::call(fci_cache, 2, args, NULL, SwooleG.enable_coroutine)))
{
php_swoole_error(E_WARNING, "%s->onMessage handler error", ZSTR_VAL(swoole_websocket_server_ce->name));
serv->close(serv, fd, 0);
}

zval_ptr_dtor(&zdata);
zval_ptr_dtor(&args[1]);

return SW_OK;
}

其中,php_swoole_get_recv_data这个函数就是用来获取zend_string数据的,我们分析下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void php_swoole_get_recv_data(swServer *serv, zval *zdata, swEventData *req)
{
char *data = NULL;
zend_string *worker_buffer;

size_t length = serv->get_packet(serv, req, &data);
if (length == 0)
{
ZVAL_EMPTY_STRING(zdata);
}
else
{
if (req->info.flags & SW_EVENT_DATA_OBJ_PTR)
{
worker_buffer = (zend_string *) (data - XtOffsetOf(zend_string, val));
ZVAL_STR(zdata, worker_buffer);
}
else
{
ZVAL_STRINGL(zdata, data, length);
}
}
}

因为在swWorker_onPipeReceive函数里面把req->info.flags设置为了SW_EVENT_DATA_OBJ_PTR,所以函数会执行以下代码:

1
2
3
4
5
if (req->info.flags & SW_EVENT_DATA_OBJ_PTR)
{
worker_buffer = (zend_string *) (data - XtOffsetOf(zend_string, val));
ZVAL_STR(zdata, worker_buffer);
}

其中,zdata就是PHP应用层会使用到的data,而ZVAL_STR是让zdata里面的zend_value直接指向了worker_buffer,没有任何的内存拷贝。

通过以上方式,Swoole Server再次减少了一次内存拷贝。

上述过程我们可以通过下面这幅图总结:

第二次优化

最终,我们把4次内存拷贝降低到了1次,因此onMessage回调函数性能提升了4倍。

性能对比

我们的压测程序如下,Server代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php

use Swoole\WebSocket\Server;

$start = microtime(true);

$server = new Server("0.0.0.0", 9501);

$server->set([
"worker_num" => 1,
'package_max_length' => 1024 * 1024 * 4,
]);

$server->on('open', function (Server $server, $request) {
});

$server->on('message', function (Server $server, $frame) use ($start) {
if ($frame->data === "end") {
$end = microtime(true);
var_dump(($end - $start) * 1000);
}
});

$server->on('close', function ($ser, $fd) {
echo "client {$fd} closed\n";
});

$server->start();

Client代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;

use function Co\run;

run(function () {
$cli = new Client("127.0.0.1", 9501);
$ret = $cli->upgrade("/websocket");

if (!$ret) {
echo "ERROR\n";
return;
}

for ($i = 0; $i < 2000; $i++) {
$cli->push(str_repeat('a', 0.5 * 1024 * 1024));
}

$cli->push("end");
sleep(100000);
});

内存拷贝:

内存拷贝

CPU使用率:

CPU使用率

总结

  1. 本次优化主要是通过减少内存拷贝来提升Server的性能,这也是服务器优化的重点。

  2. 本次优化我们通过设计Buffer的接口,来达到切换Swoole内核层BufferPHPBuffer的目的。首先,通过调用get_buffer来获取到接收数据的zend_string(接收第一个Chunk前,都会分配一个新的zend_string,因为PHP底层会通过内存池来管理,所以分配zend_string的性能开销不用太担心);当接收完数据的时候,调用add_buffer_len来更新zend_stringlen,也就是我们的offset;当我们接收完所有的Chunk之后,调用copy_buffer_addr来保存zend_string的地址。最后,我们通过ZVAL_STR来设置PHP应用层要用的data

  3. 本次优化我们使用了readv代替read。这么做的目的是,Swoole进程间通信的时候,使用了一个header来保存data的信息,例如数据是哪个reactor线程发来的,发送的数据大小是多少。但是,这个header对于PHP应用层来说是无需关心的,所以,我们就需要把本来连续的header:data内存单独分开了,把所有的data拼接起来。但是,这样就避免不了对data的拷贝了。所以我们需要通过readv来在接收数据的时候就分离headerdata。但是,因为readv是需要指定每一段buffer的长度的,所以我们在接收第一个Chunk之前,需要调用readpeek方法来获取到header的长度,但是这个系统调用的开销是非常小的。