前言 在Swoole4.5
版本中(目前还未发布),我们的Server
有一个性能需要优化的地方,就是worker
进程在收到master
进程发来的包的时候,需要进行两次的拷贝,才可以把数据从PHP
扩展层传递到PHP
上层(也就是我们事件回调函数需要拿到的data
)。
优化前 我们先来分析一下为什么会有性能的问题。首先,我们需要一份会有性能问题的代码。我们git clone
下swoole-src
代码,然后git checkout
到8235c82fea2130534a16fd20771dcab3408a763e
这个commit
位置:
1 git checkout 8235c82fea2130534a16fd20771dcab3408a763e
我们来分析一下代码,首先看master
进程是如何封装数据然后发送给worker
进程的。在函数process_send_packet
里面,我们看核心的地方:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 static int process_send_packet (swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void * private_data) { const char * data = resp->data; uint32_t send_n = resp->info.len; off_t offset = 0 ; uint32_t max_length = serv->ipc_max_size - sizeof (buf->info); if (send_n <= max_length) { buf->info.flags = 0 ; buf->info.len = send_n; memcpy (buf->data, data, send_n); int retval = _send(serv, buf, sizeof (buf->info) + send_n, private_data); return retval; } buf->info.flags = SW_EVENT_DATA_CHUNK; while (send_n > 0 ) { if (send_n > max_length) { buf->info.len = max_length; } else { buf->info.flags |= SW_EVENT_DATA_END; buf->info.len = send_n; } memcpy (buf->data, data + offset, buf->info.len); if (_send(serv, buf, sizeof (buf->info) + buf->info.len, private_data) < 0 ) { return SW_ERR; } send_n -= buf->info.len; offset += buf->info.len; } return SW_OK; }
首先,我们来说一下process_send_packet
这个函数的参数:
其中,
swServer *serv
就是我们创建的那个Server
。
swPipeBuffer *buf
指向的内存里面的数据需要发送给worker
进程。
swSendData *resp
里面存放了master
进程收到的客户端数据以及一个swDataHead info
头部。
_send
是一个回调函数,这里面的逻辑就是master
进程把swPipeBuffer *buf
里面的数据发送给worker
进程。
void* private_data
这里是一个swWorker *worker
类型的指针转换过来的。指定了master
进程需要发送的那个worker
进程。
说明一点,这里我们是以Server
设置了eof
选项为例子讲解的(假设设置了"\r\n"
)。因为TCP
是面向字节流的,即使客户端发送了一个很大的包过来,服务器一次read
出来的数据也不见得非常大。如果不设置eof
的话,是不会导致我们这篇文章所说的性能问题。
介绍完了process_send_packet
函数的参数之后,我们来看看代码是如何实现的:
1 const char * data = resp->data;
首先,让data
指向resp->data
,也就是客户端发来的实际数据。例如,客户端发来了字符串hello world\r\n
,那么data
里面存放的就是hello world\r\n
。
1 uint32_t send_n = resp->info.len;
标志着resp->data
数据的长度。例如,客户端往服务器发送了1M
的数据,那么resp->info.len
就是1048576
。
用来标志哪些数据master
进程已经发送给了worker
进程。
1 uint32_t max_length = serv->ipc_max_size - sizeof (buf->info);
max_length
表示master
进程一次往worker
进程发送的包最大长度。
注意:master
进程和worker
进程是通过udg
方式进行通信的。所以,master
进程发送多少,worker
进程就直接收多少
1 2 3 4 5 6 7 8 9 if (send_n <= max_length){ buf->info.flags = 0 ; buf->info.len = send_n; memcpy (buf->data, data, send_n); int retval = _send(serv, buf, sizeof (buf->info) + send_n, private_data); return retval; }
如果master
进程要发给worker
进程的数据小于max_length
,那么就直接调用_send
函数,直接把数据发给worker
进程。
1 buf->info.flags = SW_EVENT_DATA_CHUNK;
当send_n
大于max_length
的时候,设置buf->info.flags
为CHUNK
,也就意味着需要把客户端发来的数据先拆分成一小段一小段的数据,然后再发送给worker
进程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 while (send_n > 0 ){ if (send_n > max_length) { buf->info.len = max_length; } else { buf->info.flags |= SW_EVENT_DATA_END; buf->info.len = send_n; } memcpy (buf->data, data + offset, buf->info.len); if (_send(serv, buf, sizeof (buf->info) + buf->info.len, private_data) < 0 ) { return SW_ERR; } send_n -= buf->info.len; offset += buf->info.len; }
逻辑比较简单,就是一个分段发送的过程。这里需要注意的两点:
1 2 1、buf->info.len的长度需要更新为小段的chunk的长度,而不是大数据包的长度 2、最后一个chunk的info.flags需要变成SW_EVENT_DATA_END,意味着一个完整的包已经发完了
OK
,分析完了master
进程发包的过程,我们来分析一下worker
进程收包的过程。
我们先看一下函数swWorker_onPipeReceive
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 static int swWorker_onPipeReceive (swReactor *reactor, swEvent *event) { swServer *serv = (swServer *) reactor->ptr; swFactory *factory = &serv->factory; swPipeBuffer *buffer = serv->pipe_buffers[0 ]; int ret; _read_from_pipe: if (read(event->fd, buffer, serv->ipc_max_size) > 0 ) { ret = swWorker_onTask(factory, (swEventData *) buffer); if (buffer->info.flags & SW_EVENT_DATA_CHUNK) { if (ret < 0 && errno == EAGAIN) { return SW_OK; } else if (ret > 0 ) { goto _read_from_pipe; } } return ret; } return SW_ERR; }
这个就是worker
进程接收master
进程发来的数据的代码。
我们看的,worker
进程会直接把数据先读取到buffer
内存里面,然后调用swWorker_onTask
。我们再来看看swWorker_onTask
函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 int swWorker_onTask (swFactory *factory, swEventData *task) { swServer *serv = (swServer *) factory->ptr; swWorker *worker = SwooleWG.worker; worker->status = SW_WORKER_BUSY; if (task->info.flags & SW_EVENT_DATA_CHUNK) { if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0 ) { swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA, "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost" , task->info.fd, task->info.len); return SW_OK; } if (!(task->info.flags & SW_EVENT_DATA_END)) { return SW_OK; } } switch (task->info.type) { case SW_SERVER_EVENT_SEND_DATA: if (swWorker_discard_data(serv, task) == SW_TRUE) { break ; } swWorker_do_task(serv, worker, task, serv->onReceive); break ; default : swWarn("[Worker] error event[type=%d]" , (int )task->info.type); break ; } worker->status = SW_WORKER_IDLE; if (!SwooleWG.run_always && worker->request_count >= SwooleWG.max_request) { swWorker_stop(worker); } return SW_OK; }
我们重点看看性能问题代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 if (task->info.flags & SW_EVENT_DATA_CHUNK){ if (serv->merge_chunk(serv, task->info.reactor_id, task->data, task->info.len) < 0 ) { swoole_error_log(SW_LOG_WARNING, SW_ERROR_SESSION_DISCARD_DATA, "cannot merge chunk to worker buffer, data[fd=%d, size=%d] lost" , task->info.fd, task->info.len); return SW_OK; } if (!(task->info.flags & SW_EVENT_DATA_END)) { return SW_OK; } }
这里,worker
进程会先判断master
发来的数据是否是CHUNK
数据,如果是,那么会进行merge_chunk
的操作。我们看看merge_chunk
对应的函数:
1 2 3 4 5 6 static int swServer_worker_merge_chunk (swServer *serv, int key, const char *data, size_t len) { swString *package = swServer_worker_get_input_buffer(serv, key); return swString_append_ptr(package, data, len); }
我们会先根据key
的值(实际上是reactor
线程的id
),获取一块全局的内存,然后把接收到的chunk
数据,追加到这个全局的内存上面,而swString_append_ptr
执行的就是memcpy
的操作。
所以,这就是一个性能问题了。worker
进程接收到的所有数据都会被完整的拷贝一遍。如果客户端发来的数据很大,这个拷贝的开销也是很大的。
我们再看看Swoole
内核是如何把data
提供给PHP
应用层的,主要函数是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void php_swoole_get_recv_data (swServer *serv, zval *zdata, swEventData *req, char *header, uint32_t header_length) { char *data = NULL ; size_t length = serv->get_packet(serv, req, &data); if (header_length >= length) { ZVAL_EMPTY_STRING(zdata); } else { ZVAL_STRINGL(zdata, data + header_length, length - header_length); } if (header_length > 0 ) { memcpy (header, data, header_length); } }
程序会进入以下代码:
1 ZVAL_STRINGL(zdata, data + header_length, length - header_length);
这个地方是通过ZVAL_STRINGL
来创建zend_string
的,也就意味着PHP
底层会把Swoole
内核中存储的data
完整的拷贝一份到zend_string
里面。然后再让zdata
(也就是PHP
应用层会用到的data
)的zend_value
指针指向这个zend_string
。这里,又多了一次完整的内存拷贝。
上述过程我们可以通过下面这幅图总结:
因此,我们对这部分合并的代码进行了一个优化。我们让worker
进程在接收master
进程的数据之前,就准备好一块足够大的内存,然后直接用这块内存把master
进程发来的数据接收即可。
优化一 我们先更新一下swoole-src
的源码:
1 git checkout 529ad44d578930b3607abedcfc278364df34bc73
我们依旧先看看process_send_packet
函数的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 static int process_send_packet (swServer *serv, swPipeBuffer *buf, swSendData *resp, send_func_t _send, void * private_data) { const char * data = resp->data; uint32_t send_n = resp->info.len; off_t offset = 0 ; uint32_t copy_n; uint32_t max_length = serv->ipc_max_size - sizeof (buf->info); if (send_n <= max_length) { buf->info.flags = 0 ; buf->info.len = send_n; memcpy (buf->data, data, send_n); int retval = _send(serv, buf, sizeof (buf->info) + send_n, private_data); return retval; } buf->info.flags = SW_EVENT_DATA_CHUNK; buf->info.len = send_n; while (send_n > 0 ) { if (send_n > max_length) { copy_n = max_length; } else { buf->info.flags |= SW_EVENT_DATA_END; copy_n = send_n; } memcpy (buf->data, data + offset, copy_n); swTrace("finish, type=%d|len=%d" , buf->info.type, copy_n); if (_send(serv, buf, sizeof (buf->info) + copy_n, private_data) < 0 ) { return SW_ERR; } send_n -= copy_n; offset += copy_n; } return SW_OK; }
我们聚焦修改的地方,主要是对CHUNK
的处理:
1 2 buf->info.flags = SW_EVENT_DATA_CHUNK; buf->info.len = send_n;
我们发现,buf->info.len
的长度不是每个小段chunk
的长度了,而是整个大包的长度了。为什么可以这样做呢?因为master
进程与worker
进程是通过udg
进行通信的,所以,worker
进程在调用recv
的时候,返回值实际上就是chunk
的长度了,所以buf->info.len
里面存储chunk
的长度没有必要。
其他地方的逻辑和之前的代码没有区别。
我们再来看看worker
进程是如何接收master
进程发来的数据的。在函数swWorker_onPipeReceive
里面:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 static int swWorker_onPipeReceive (swReactor *reactor, swEvent *event) { int ret; ssize_t recv_n = 0 ; swServer *serv = (swServer *) reactor->ptr; swFactory *factory = &serv->factory; swPipeBuffer *pipe_buffer = serv->pipe_buffers[0 ]; void *buffer; struct iovec buffers [2]; recv_n = recv(event->fd, &pipe_buffer->info, sizeof (pipe_buffer->info), MSG_PEEK); if (recv_n < 0 && errno == EAGAIN) { return SW_OK; } else if (recv_n < 0 ) { return SW_ERR; } if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK) { buffer = serv->get_buffer(serv, &pipe_buffer->info); _read_from_pipe: buffers[0 ].iov_base = &pipe_buffer->info; buffers[0 ].iov_len = sizeof (pipe_buffer->info); buffers[1 ].iov_base = buffer; buffers[1 ].iov_len = serv->ipc_max_size - sizeof (pipe_buffer->info); recv_n = readv(event->fd, buffers, 2 ); if (recv_n < 0 && errno == EAGAIN) { return SW_OK; } if (recv_n > 0 ) { serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof (pipe_buffer->info)); } if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK) { if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END)) { goto _read_from_pipe; } else { pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR; serv->copy_buffer_addr(serv, pipe_buffer); } } } else { recv_n = read(event->fd, pipe_buffer, serv->ipc_max_size); } if (recv_n > 0 ) { ret = swWorker_onTask(factory, (swEventData *) pipe_buffer, recv_n - sizeof (pipe_buffer->info)); return ret; } return SW_ERR; }
其中,
1 2 3 4 5 6 7 8 9 recv_n = recv(event->fd, &pipe_buffer->info, sizeof (pipe_buffer->info), MSG_PEEK); if (recv_n < 0 && errno == EAGAIN){ return SW_OK; } else if (recv_n < 0 ){ return SW_ERR; }
我们先对内核缓冲区里面的数据进行一次peek
操作,来获取到head
部分。这样我们就知道数据是否是以CHUNK
方式发来的了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK){ buffer = serv->get_buffer(serv, &pipe_buffer->info); _read_from_pipe: buffers[0 ].iov_base = &pipe_buffer->info; buffers[0 ].iov_len = sizeof (pipe_buffer->info); buffers[1 ].iov_base = buffer; buffers[1 ].iov_len = serv->ipc_max_size - sizeof (pipe_buffer->info); recv_n = readv(event->fd, buffers, 2 ); if (recv_n < 0 && errno == EAGAIN) { return SW_OK; } if (recv_n > 0 ) { serv->add_buffer_len(serv, &pipe_buffer->info, recv_n - sizeof (pipe_buffer->info)); } if (pipe_buffer->info.flags & SW_EVENT_DATA_CHUNK) { if (!(pipe_buffer->info.flags & SW_EVENT_DATA_END)) { goto _read_from_pipe; } else { pipe_buffer->info.flags |= SW_EVENT_DATA_OBJ_PTR; serv->copy_buffer_addr(serv, pipe_buffer); } } }
如果是CHUNK
方式发来的数据,那么我们执行如下的操作:
1 buffer = serv->get_buffer(serv, &pipe_buffer->info);
get_buffer
是一个回调函数,对应:
1 2 3 4 5 6 7 8 9 10 11 static void * swServer_worker_get_buffer (swServer *serv, swDataHead *info) { swString *worker_buffer = swServer_worker_get_input_buffer(serv, info->reactor_id); if (worker_buffer->size < info->len) { swString_extend(worker_buffer, info->len); } return worker_buffer->str + worker_buffer->length; }
这里,我们会先判断这块全局的buffer
是否足够的大,可以接收完整个大包,如果不够大,我们扩容到足够的大。
1 2 3 4 5 6 7 8 _read_from_pipe: buffers[0 ].iov_base = &pipe_buffer->info; buffers[0 ].iov_len = sizeof (pipe_buffer->info); buffers[1 ].iov_base = buffer; buffers[1 ].iov_len = serv->ipc_max_size - sizeof (pipe_buffer->info); recv_n = readv(event->fd, buffers, 2 );
然后,我们调用readv
,把head
和实际的数据分别存在了两个地方。这么做是避免为了把head
和实际的数据做拆分而导致的内存拷贝。
通过以上方式,Swoole Server
减少了一次内存拷贝。
上述过程我们可以通过下面这幅图总结:
从图中我们可以看出,步骤2
到步骤3
这里还是会有一次完整的拷贝,我们也把它给优化掉了。我们来看优化后的代码。
优化二 我们先更新一下swoole-src
的源码:
1 git checkout 5278bb30c9b6b84753fa1950cef3226f1cfb515c
master
进程发送数据到worker
进程的代码没有变化,主要是worker
进程这边对接收buffer
处理的变化。我们会发现函数swWorker_onPipeReceive
没有任何改动,改动的是处理buffer
的几个回调函数。我们一一来看下。
首先是函数指针swServer::get_buffer
对应了函数php_swoole_server_worker_get_buffer
:
1 2 3 4 5 6 7 8 9 10 11 12 13 static void * php_swoole_server_worker_get_buffer (swServer *serv, swDataHead *info) { zend_string *worker_buffer = php_swoole_server_worker_get_input_buffer(serv, info->reactor_id); if (worker_buffer == NULL ) { worker_buffer = zend_string_alloc(info->len, 0 ); worker_buffer->len = 0 ; php_swoole_server_worker_set_buffer(serv, info, worker_buffer); } return worker_buffer->val + worker_buffer->len; }
这里先调用函数php_swoole_server_worker_get_input_buffer
来获取接收master
进程发来数据的buffer
。如果说没有获取到,那么说明我们之前的worker_buffer
没有创建或者接收完所有的数据之后被销毁了,此时我们需要通过函数zend_string_alloc
分配一块内存。这里需要注意的一个地方就是,info->len
是master
进程发送给worker
进程的总长度,也就意味着我们需要把zend_string
的len
手动初始化为0
。毕竟zend_string
没有offest
这个成员,所以这里我们只能够把len
当作offset
来用了。
获取到zend_string
这块worker buffer
之后,我们就可以通过readv
来读取master
进程发送给worker
进程的数据了。
获取完数据之后,我们调用swServer::add_buffer_len
函数指针对应的php_swoole_server_worker_add_buffer_len
这个函数来增加偏移量:
1 2 3 4 5 static void php_swoole_server_worker_add_buffer_len (swServer *serv, swDataHead *info, size_t len) { zend_string *worker_buffer = php_swoole_server_worker_get_input_buffer(serv, info->reactor_id); worker_buffer->len += len; }
当我们接收完master
进程发送过来的所有数据之后,我们调用swServer::copy_buffer_addr
函数指针对应的php_swoole_server_worker_copy_buffer_addr
来把zend_string
的地址拷贝到swPipeBuffer::data
里面。
这样,我们通过函数指针的简单替换,实现了C
层面的buffer
到PHP
层面的buffer
切换。
最后,我们需要把接收到的数据,也就是zend_string
里面的数据提供给PHP
应用层。我们来看看swoole_websocket_onMessage
这个函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 int swoole_websocket_onMessage (swServer *serv, swEventData *req) { int fd = req->info.fd; uchar flags = 0 ; zend_long opcode = 0 ; zval zdata; char frame_header[2 ]; memcpy (frame_header, &req->info.ext_flags, sizeof (frame_header)); php_swoole_get_recv_data(serv, &zdata, req); flags = frame_header[0 ]; opcode = frame_header[1 ]; if (opcode == WEBSOCKET_OPCODE_CLOSE && !serv->listen_list->open_websocket_close_frame) { zval_ptr_dtor(&zdata); return SW_OK; } #ifdef SW_HAVE_ZLIB if (serv->websocket_compression && (flags & SW_WEBSOCKET_FLAG_RSV1)) { swString_clear(swoole_zlib_buffer); if (!websocket_message_uncompress(swoole_zlib_buffer, Z_STRVAL(zdata), Z_STRLEN(zdata))) { zval_ptr_dtor(&zdata); return SW_OK; } zval_ptr_dtor(&zdata); ZVAL_STRINGL(&zdata, swoole_zlib_buffer->str, swoole_zlib_buffer->length); flags ^= (SW_WEBSOCKET_FLAG_RSV1 | SW_WEBSOCKET_FLAG_COMPRESS); } #endif zend_fcall_info_cache *fci_cache = php_swoole_server_get_fci_cache(serv, req->info.server_fd, SW_SERVER_CB_onMessage); zval args[2 ]; args[0 ] = *(zval *) serv->ptr2; php_swoole_websocket_construct_frame(&args[1 ], opcode, Z_STRVAL(zdata), Z_STRLEN(zdata), flags); zend_update_property_long(swoole_websocket_frame_ce, &args[1 ], ZEND_STRL("fd" ), fd); if (UNEXPECTED(!zend::function::call(fci_cache, 2 , args, NULL , SwooleG.enable_coroutine))) { php_swoole_error(E_WARNING, "%s->onMessage handler error" , ZSTR_VAL(swoole_websocket_server_ce->name)); serv->close(serv, fd, 0 ); } zval_ptr_dtor(&zdata); zval_ptr_dtor(&args[1 ]); return SW_OK; }
其中,php_swoole_get_recv_data
这个函数就是用来获取zend_string
数据的,我们分析下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void php_swoole_get_recv_data (swServer *serv, zval *zdata, swEventData *req) { char *data = NULL ; zend_string *worker_buffer; size_t length = serv->get_packet(serv, req, &data); if (length == 0 ) { ZVAL_EMPTY_STRING(zdata); } else { if (req->info.flags & SW_EVENT_DATA_OBJ_PTR) { worker_buffer = (zend_string *) (data - XtOffsetOf(zend_string, val)); ZVAL_STR(zdata, worker_buffer); } else { ZVAL_STRINGL(zdata, data, length); } } }
因为在swWorker_onPipeReceive
函数里面把req->info.flags
设置为了SW_EVENT_DATA_OBJ_PTR
,所以函数会执行以下代码:
1 2 3 4 5 if (req->info.flags & SW_EVENT_DATA_OBJ_PTR){ worker_buffer = (zend_string *) (data - XtOffsetOf(zend_string, val)); ZVAL_STR(zdata, worker_buffer); }
其中,zdata
就是PHP
应用层会使用到的data
,而ZVAL_STR
是让zdata
里面的zend_value
直接指向了worker_buffer
,没有任何的内存拷贝。
通过以上方式,Swoole Server
再次减少了一次内存拷贝。
上述过程我们可以通过下面这幅图总结:
最终,我们把4
次内存拷贝降低到了1
次,因此onMessage
回调函数性能提升了4
倍。
性能对比 我们的压测程序如下,Server
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 <?php use Swoole \WebSocket \Server ;$start = microtime(true ); $server = new Server("0.0.0.0" , 9501 ); $server->set([ "worker_num" => 1 , 'package_max_length' => 1024 * 1024 * 4 , ]); $server->on('open' , function (Server $server, $request ) { }); $server->on('message' , function (Server $server, $frame ) use ($start ) { if ($frame->data === "end" ) { $end = microtime(true ); var_dump(($end - $start) * 1000 ); } }); $server->on('close' , function ($ser, $fd ) { echo "client {$fd} closed\n" ; }); $server->start();
Client
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 <?php use Swoole \Coroutine ;use Swoole \Coroutine \Http \Client ;use function Co \run ;run(function ( ) { $cli = new Client("127.0.0.1" , 9501 ); $ret = $cli->upgrade("/websocket" ); if (!$ret) { echo "ERROR\n" ; return ; } for ($i = 0 ; $i < 2000 ; $i++) { $cli->push(str_repeat('a' , 0.5 * 1024 * 1024 )); } $cli->push("end" ); sleep(100000 ); });
内存拷贝:
CPU使用率:
总结
本次优化主要是通过减少内存拷贝来提升Server
的性能,这也是服务器优化的重点。
本次优化我们通过设计Buffer
的接口,来达到切换Swoole
内核层Buffer
和PHP
层Buffer
的目的。首先,通过调用get_buffer
来获取到接收数据的zend_string
(接收第一个Chunk
前,都会分配一个新的zend_string
,因为PHP
底层会通过内存池来管理,所以分配zend_string
的性能开销不用太担心);当接收完数据的时候,调用add_buffer_len
来更新zend_string
的len
,也就是我们的offset
;当我们接收完所有的Chunk
之后,调用copy_buffer_addr
来保存zend_string
的地址。最后,我们通过ZVAL_STR
来设置PHP
应用层要用的data
。
本次优化我们使用了readv
代替read
。这么做的目的是,Swoole
进程间通信的时候,使用了一个header
来保存data
的信息,例如数据是哪个reactor
线程发来的,发送的数据大小是多少。但是,这个header
对于PHP
应用层来说是无需关心的,所以,我们就需要把本来连续的header:data
内存单独分开了,把所有的data
拼接起来。但是,这样就避免不了对data
的拷贝了。所以我们需要通过readv
来在接收数据的时候就分离header
和data
。但是,因为readv
是需要指定每一段buffer
的长度的,所以我们在接收第一个Chunk
之前,需要调用read
的peek
方法来获取到header
的长度,但是这个系统调用的开销是非常小的。