分类: 2013-05-24 17:33 2422人阅读 (2)目录
最近在做websocket porting的工作中,需要实现最底层socket读和写,基于同步读,libevent, libuv和android Looper都写了一套,从中体会不少。
1)同步阻塞读写
最开始采用同步阻塞读写,主要是为了快速实现来验证上层websocket协议的完备性。优点仅仅是实现起来简单,缺点就是效率不高,不能很好利用线程的资源,建立连接这一块方法都是类似的,主要的区别是在如何读写数据,先看几种方法共用的一块:
- int n = 0;
- struct sockaddr_in serv_addr;
- event_init();
- if((mSockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
- //TODO error
- return;
- }
- memset(&serv_addr, '0', sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_port = htons(url.port());
- if(inet_pton(AF_INET, url.host().utf8().data(), &serv_addr.sin_addr)<=0){
- return;
- }
- if( connect(mSockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0){
- return;
- }
int n = 0; struct sockaddr_in serv_addr; event_init(); if((mSockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ //TODO error return; } memset(&serv_addr, '0', sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(url.port()); if(inet_pton(AF_INET, url.host().utf8().data(), &serv_addr.sin_addr)<=0){ return; } if( connect(mSockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0){ return; }这里由于是client,所以比较简单,当然缺失了DNS解析这一块。然后,就是要监视读数据,由于是同步阻塞读,所以需要在循环里不断地去read/recv:
- while (1) {
- ssize_t result = recv(fd, buf, sizeof(buf), 0);
- if (result == 0) {
- break;
- } else if (result < 0) {
- perror("recv");
- close(fd);
- return 1;
- }
- fwrite(buf, 1, result, stdout);
- }
while (1) { ssize_t result = recv(fd, buf, sizeof(buf), 0); if (result == 0) { break; } else if (result < 0) { perror("recv"); close(fd); return 1; } fwrite(buf, 1, result, stdout); }缺点就显而易见,此线程需要不断轮询。当然,这里是个例子程序,正式代码中不会处理这么草率。
2)libevent
对上面的改进方法就是基于异步非阻塞的方式来处理读数据,在linux上一般是通过epoll来做异步事件侦听,而libevent是一个封装了epoll或其他平台上异步事件的c库,所以基于libevent来做异步非阻塞读写会更简单,也能跨平台。重构的第一个步是设置socketFD为非阻塞:
- static int setnonblock(int fd)
- {
- int flags;
- flags = fcntl(fd, F_GETFL);
- if (flags < 0){
- return flags;
- }
- flags |= O_NONBLOCK;
- if (fcntl(fd, F_SETFL, flags) < 0){
- return -1;
- }
- return 0;
- }
static int setnonblock(int fd){ int flags; flags = fcntl(fd, F_GETFL); if (flags < 0){ return flags; } flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0){ return -1; } return 0;}然后需要在单独的线程中维护event loop,并添加read事件侦听:
- static void* loopListen(void *arg)
- {
- SocketStreamHandle *handle = (SocketStreamHandle *)arg;
- struct event_base* base = event_base_new();
- struct event ev_read;
- handle->setReadEvent(&ev_read);
- setnonblock(handle->getSocketFD());
- event_set(&ev_read, handle->getSocketFD(), EV_READ|EV_PERSIST, onRead, handle);
- event_base_set(base, &ev_read);
- event_add(&ev_read, NULL);
- event_base_dispatch(base);
- }
static void* loopListen(void *arg){ SocketStreamHandle *handle = (SocketStreamHandle *)arg; struct event_base* base = event_base_new(); struct event ev_read; handle->setReadEvent(&ev_read); setnonblock(handle->getSocketFD()); event_set(&ev_read, handle->getSocketFD(), EV_READ|EV_PERSIST, onRead, handle); event_base_set(base, &ev_read); event_add(&ev_read, NULL); event_base_dispatch(base);}
- pthread_t pid;
- pthread_create(&pid, 0, loopListen, this);
pthread_t pid; pthread_create(&pid, 0, loopListen, this);然后在onRead方法中处理数据读取:
- static void onRead(int fd, short ev, void *arg)
- {
- while(true){
- char *buf = new char[1024];
- memset(buf, 0, 1024);
- int len = read(fd, buf, 1024);
- SocketStreamHandle *handle = (SocketStreamHandle *)arg;
- if(len > 0){
- SocketContext *context = new SocketContext;
- context->buf = buf;
- context->readLen = len;
- context->handle = handle;
- WTF::callOnMainThread(onReadMainThread, context);
- if(len == 1024){
- continue;
- }else{
- break;
- }
- }else{
- if(errno == EAGAIN || errno == EWOULDBLOCK){
- return;
- }else if(errno == EINTR){
- continue;
- }
- __android_log_print(ANDROID_LOG_INFO, LOG_TAG, "onCloseMainThread, len:%d, errno:%d", len, errno);
- WTF::callOnMainThread(onCloseMainThread, handle);
- event_del(handle->getReadEvent());
- }
- }
- }
static void onRead(int fd, short ev, void *arg){ while(true){ char *buf = new char[1024]; memset(buf, 0, 1024); int len = read(fd, buf, 1024); SocketStreamHandle *handle = (SocketStreamHandle *)arg; if(len > 0){ SocketContext *context = new SocketContext; context->buf = buf; context->readLen = len; context->handle = handle; WTF::callOnMainThread(onReadMainThread, context); if(len == 1024){ continue; }else{ break; } }else{ if(errno == EAGAIN || errno == EWOULDBLOCK){ return; }else if(errno == EINTR){ continue; } __android_log_print(ANDROID_LOG_INFO, LOG_TAG, "onCloseMainThread, len:%d, errno:%d", len, errno); WTF::callOnMainThread(onCloseMainThread, handle); event_del(handle->getReadEvent()); } }}这里比较有讲究的是:
1)当一次buf读不完,需要在循环里再次读一次
2)当read到0时,表示socket被关闭,这时需要删除事件侦听,不然会导致cpu 100%
3)当read到-1时,不完全是错误情况,比如errno == EAGAIN || errno == EWOULDBLOCK表示暂时不可读,歇一会后面再读。errno == EINTR表示被系统中断,应重读一遍
4)onRead是被libevent中专门做事件侦听的线程调用的,所以有的时候需要回到主线程,比如: WTF::callOnMainThread(onReadMainThread, context);这里就需要注意多线程间的同步问题。
3)libuv
libuv在libevent更进一步,它不但有event loop,并且把socket的各种操作也覆盖了,所以代码会更简洁,比如最开始的创建连接和创建loop:
- uv_loop_t *loop = uv_default_loop();
- uv_tcp_t client;
- uv_tcp_init(loop, &client);
- struct sockaddr_in req_addr = uv_ip4_addr(url.host().utf8().data(), url.port());
- uv_connect_t *connect_req;
- connect_req->data = this;
- uv_tcp_connect(connect_req, &client, req_addr, on_connect);
- uv_run(loop);
uv_loop_t *loop = uv_default_loop(); uv_tcp_t client; uv_tcp_init(loop, &client); struct sockaddr_in req_addr = uv_ip4_addr(url.host().utf8().data(), url.port()); uv_connect_t *connect_req; connect_req->data = this; uv_tcp_connect(connect_req, &client, req_addr, on_connect); uv_run(loop);在on_connect中创建对read的监听:
- static void* on_connect(uv_connect_t *req, int status)
- {
- SocketStreamHandle *handle = (SocketStreamHandle *)arg;
- uv_read_start(req->handle, alloc_buffer, on_read);
- }
static void* on_connect(uv_connect_t *req, int status){ SocketStreamHandle *handle = (SocketStreamHandle *)arg; uv_read_start(req->handle, alloc_buffer, on_read);}on_read就和前面类似了。所以libuv是最强大的,极大的省略了socket相关的开发。
4)Android Looper
Android提供一套event loop的机制,并且可以对FD进行监听,所以如果基于Android Looper,就可以省去对第三方lib的依赖。并且Android也是对epoll的封装,既然如此,值得试一试用Android原生的looper来做这块的event looper。socket连接这块和最开始是一样的,关键是在创建looper的地方:
- static void* loopListen(void *arg)
- {
- SocketStreamHandle *handle = (SocketStreamHandle *)arg;
- setnonblock(handle->getSocketFD());
- Looper *looper = new Looper(true);
- looper->addFd(handle->getSocketFD(), 0, ALOOPER_EVENT_INPUT, onRead, handle);
- while(true){
- if(looper->pollOnce(100) == ALOOPER_POLL_ERROR){
- __android_log_print(ANDROID_LOG_INFO, LOG_TAG, "ALOOPER_POLL_ERROR");
- break;
- }
- }
- }
static void* loopListen(void *arg){ SocketStreamHandle *handle = (SocketStreamHandle *)arg; setnonblock(handle->getSocketFD()); Looper *looper = new Looper(true); looper->addFd(handle->getSocketFD(), 0, ALOOPER_EVENT_INPUT, onRead, handle); while(true){ if(looper->pollOnce(100) == ALOOPER_POLL_ERROR){ __android_log_print(ANDROID_LOG_INFO, LOG_TAG, "ALOOPER_POLL_ERROR"); break; } }}代码比较简单就不多说,详细使用方法可以查看<utils/Looper.h>的API。
综上所述,如果是在Android上做,可以直接基于原生的Looper,如果需要跨平台可以基于libuv。总之,要避免同步阻塞,因为这样会导致线程设计上的复杂和低效。
在Java里也有类似的概念,可以参见以前的博文: