博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于libevent, libuv和android Looper不断演进socket编程 - 走向架构师之路 - 博客频道 - CSDN.NET...
阅读量:5840 次
发布时间:2019-06-18

本文共 7591 字,大约阅读时间需要 25 分钟。

分类:
2422人阅读
(2)

目录

最近在做websocket  porting的工作中,需要实现最底层socket读和写,基于同步读,libevent, libuv和android Looper都写了一套,从中体会不少。

1)同步阻塞读写

最开始采用同步阻塞读写,主要是为了快速实现来验证上层websocket协议的完备性。优点仅仅是实现起来简单,缺点就是效率不高,不能很好利用线程的资源,建立连接这一块方法都是类似的,主要的区别是在如何读写数据,先看几种方法共用的一块:

 

[cpp]
  1. int n = 0;  
  2. struct sockaddr_in serv_addr;  
  3. event_init();  
  4. if((mSockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){  
  5.     //TODO error  
  6.     return;  
  7. }  
  8. memset(&serv_addr, '0'sizeof(serv_addr));  
  9. serv_addr.sin_family = AF_INET;  
  10. serv_addr.sin_port = htons(url.port());  
  11. if(inet_pton(AF_INET, url.host().utf8().data(), &serv_addr.sin_addr)<=0){  
  12.     return;  
  13. }  
  14. if( connect(mSockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0){  
  15.     return;  
  16. }  
int n = 0;    struct sockaddr_in serv_addr;    event_init();    if((mSockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){        //TODO error        return;    }    memset(&serv_addr, '0', sizeof(serv_addr));    serv_addr.sin_family = AF_INET;    serv_addr.sin_port = htons(url.port());    if(inet_pton(AF_INET, url.host().utf8().data(), &serv_addr.sin_addr)<=0){        return;    }    if( connect(mSockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0){        return;    }
这里由于是client,所以比较简单,当然缺失了DNS解析这一块。然后,就是要监视读数据,由于是同步阻塞读,所以需要在循环里不断地去read/recv:

 

 

[cpp]
  1. while (1) {  
  2.     ssize_t result = recv(fd, buf, sizeof(buf), 0);  
  3.     if (result == 0) {  
  4.         break;  
  5.     } else if (result < 0) {  
  6.         perror("recv");  
  7.         close(fd);  
  8.         return 1;  
  9.     }  
  10.     fwrite(buf, 1, result, stdout);  
  11. }  
while (1) {        ssize_t result = recv(fd, buf, sizeof(buf), 0);        if (result == 0) {            break;        } else if (result < 0) {            perror("recv");            close(fd);            return 1;        }        fwrite(buf, 1, result, stdout);    }
缺点就显而易见,此线程需要不断轮询。当然,这里是个例子程序,正式代码中不会处理这么草率。

 

2)libevent

对上面的改进方法就是基于异步非阻塞的方式来处理读数据,在linux上一般是通过epoll来做异步事件侦听,而libevent是一个封装了epoll或其他平台上异步事件的c库,所以基于libevent来做异步非阻塞读写会更简单,也能跨平台。重构的第一个步是设置socketFD为非阻塞:

 

[cpp]
  1. static int setnonblock(int fd)  
  2. {  
  3.     int flags;  
  4.     flags = fcntl(fd, F_GETFL);  
  5.     if (flags < 0){  
  6.         return flags;  
  7.     }  
  8.     flags |= O_NONBLOCK;  
  9.     if (fcntl(fd, F_SETFL, flags) < 0){  
  10.         return -1;  
  11.     }  
  12.     return 0;  
  13. }  
static int setnonblock(int fd){    int flags;    flags = fcntl(fd, F_GETFL);    if (flags < 0){        return flags;    }    flags |= O_NONBLOCK;    if (fcntl(fd, F_SETFL, flags) < 0){        return -1;    }    return 0;}
然后需要在单独的线程中维护event loop,并添加read事件侦听:

 

 

[cpp]
  1. static void* loopListen(void *arg)  
  2. {  
  3.     SocketStreamHandle *handle = (SocketStreamHandle *)arg;  
  4.     struct event_base* base = event_base_new();  
  5.     struct event ev_read;  
  6.     handle->setReadEvent(&ev_read);  
  7.     setnonblock(handle->getSocketFD());  
  8.     event_set(&ev_read, handle->getSocketFD(), EV_READ|EV_PERSIST, onRead, handle);  
  9.     event_base_set(base, &ev_read);  
  10.     event_add(&ev_read, NULL);  
  11.     event_base_dispatch(base);  
  12. }  
static void* loopListen(void *arg){    SocketStreamHandle *handle = (SocketStreamHandle *)arg;    struct event_base* base = event_base_new();    struct event ev_read;    handle->setReadEvent(&ev_read);    setnonblock(handle->getSocketFD());    event_set(&ev_read, handle->getSocketFD(), EV_READ|EV_PERSIST, onRead, handle);    event_base_set(base, &ev_read);    event_add(&ev_read, NULL);    event_base_dispatch(base);}
[cpp]
  1.     pthread_t pid;  
  2.     pthread_create(&pid, 0, loopListen, this);  
    pthread_t pid;    pthread_create(&pid, 0, loopListen, this);

然后在onRead方法中处理数据读取:

 

[cpp]
  1. static void onRead(int fd, short ev, void *arg)  
  2. {  
  3.     while(true){  
  4.         char *buf = new char[1024];  
  5.         memset(buf, 0, 1024);  
  6.         int len = read(fd, buf, 1024);  
  7.         SocketStreamHandle *handle = (SocketStreamHandle *)arg;  
  8.         if(len > 0){  
  9.             SocketContext *context = new SocketContext;  
  10.             context->buf = buf;  
  11.             context->readLen = len;  
  12.             context->handle = handle;  
  13.             WTF::callOnMainThread(onReadMainThread, context);  
  14.             if(len == 1024){  
  15.                 continue;  
  16.             }else{  
  17.                 break;  
  18.             }  
  19.         }else{  
  20.             if(errno == EAGAIN || errno == EWOULDBLOCK){  
  21.                 return;  
  22.             }else if(errno == EINTR){  
  23.                 continue;  
  24.             }  
  25.             __android_log_print(ANDROID_LOG_INFO, LOG_TAG, "onCloseMainThread, len:%d, errno:%d", len, errno);  
  26.             WTF::callOnMainThread(onCloseMainThread, handle);  
  27.             event_del(handle->getReadEvent());  
  28.         }  
  29.     }  
  30. }  
static void onRead(int fd, short ev, void *arg){    while(true){        char *buf = new char[1024];        memset(buf, 0, 1024);        int len = read(fd, buf, 1024);        SocketStreamHandle *handle = (SocketStreamHandle *)arg;        if(len > 0){            SocketContext *context = new SocketContext;            context->buf = buf;            context->readLen = len;            context->handle = handle;            WTF::callOnMainThread(onReadMainThread, context);            if(len == 1024){                continue;            }else{                break;            }        }else{            if(errno == EAGAIN || errno == EWOULDBLOCK){                return;            }else if(errno == EINTR){                continue;            }            __android_log_print(ANDROID_LOG_INFO, LOG_TAG, "onCloseMainThread, len:%d, errno:%d", len, errno);            WTF::callOnMainThread(onCloseMainThread, handle);            event_del(handle->getReadEvent());        }    }}
这里比较有讲究的是:

 

1)当一次buf读不完,需要在循环里再次读一次

2)当read到0时,表示socket被关闭,这时需要删除事件侦听,不然会导致cpu 100%

3)当read到-1时,不完全是错误情况,比如errno == EAGAIN || errno == EWOULDBLOCK表示暂时不可读,歇一会后面再读。errno == EINTR表示被系统中断,应重读一遍

4)onRead是被libevent中专门做事件侦听的线程调用的,所以有的时候需要回到主线程,比如: WTF::callOnMainThread(onReadMainThread, context);这里就需要注意多线程间的同步问题。

3)libuv

libuv在libevent更进一步,它不但有event loop,并且把socket的各种操作也覆盖了,所以代码会更简洁,比如最开始的创建连接和创建loop:

 

[cpp]
  1. uv_loop_t *loop = uv_default_loop();  
  2. uv_tcp_t client;  
  3. uv_tcp_init(loop, &client);  
  4. struct sockaddr_in req_addr = uv_ip4_addr(url.host().utf8().data(), url.port());  
  5. uv_connect_t *connect_req;  
  6. connect_req->data = this;  
  7. uv_tcp_connect(connect_req, &client, req_addr, on_connect);  
  8. uv_run(loop);  
uv_loop_t *loop = uv_default_loop();    uv_tcp_t client;    uv_tcp_init(loop, &client);    struct sockaddr_in req_addr = uv_ip4_addr(url.host().utf8().data(), url.port());    uv_connect_t *connect_req;    connect_req->data = this;    uv_tcp_connect(connect_req, &client, req_addr, on_connect);    uv_run(loop);
在on_connect中创建对read的监听:

 

 

[cpp]
  1. static void* on_connect(uv_connect_t *req, int status)  
  2. {  
  3.     SocketStreamHandle *handle = (SocketStreamHandle *)arg;  
  4.     uv_read_start(req->handle, alloc_buffer, on_read);  
  5. }  
static void* on_connect(uv_connect_t *req, int status){    SocketStreamHandle *handle = (SocketStreamHandle *)arg;    uv_read_start(req->handle, alloc_buffer, on_read);}
on_read就和前面类似了。所以libuv是最强大的,极大的省略了socket相关的开发。

 

4)Android Looper

Android提供一套event loop的机制,并且可以对FD进行监听,所以如果基于Android Looper,就可以省去对第三方lib的依赖。并且Android也是对epoll的封装,既然如此,值得试一试用Android原生的looper来做这块的event looper。socket连接这块和最开始是一样的,关键是在创建looper的地方:

 

[cpp]
  1. static void* loopListen(void *arg)  
  2. {  
  3.     SocketStreamHandle *handle = (SocketStreamHandle *)arg;  
  4.     setnonblock(handle->getSocketFD());  
  5.     Looper *looper = new Looper(true);  
  6.     looper->addFd(handle->getSocketFD(), 0, ALOOPER_EVENT_INPUT, onRead, handle);  
  7.     while(true){  
  8.         if(looper->pollOnce(100) == ALOOPER_POLL_ERROR){  
  9.             __android_log_print(ANDROID_LOG_INFO, LOG_TAG, "ALOOPER_POLL_ERROR");  
  10.             break;  
  11.         }  
  12.     }  
  13. }  
static void* loopListen(void *arg){    SocketStreamHandle *handle = (SocketStreamHandle *)arg;    setnonblock(handle->getSocketFD());    Looper *looper = new Looper(true);    looper->addFd(handle->getSocketFD(), 0, ALOOPER_EVENT_INPUT, onRead, handle);    while(true){        if(looper->pollOnce(100) == ALOOPER_POLL_ERROR){            __android_log_print(ANDROID_LOG_INFO, LOG_TAG, "ALOOPER_POLL_ERROR");            break;        }    }}
代码比较简单就不多说,详细使用方法可以查看<utils/Looper.h>的API。

 

综上所述,如果是在Android上做,可以直接基于原生的Looper,如果需要跨平台可以基于libuv。总之,要避免同步阻塞,因为这样会导致线程设计上的复杂和低效。

在Java里也有类似的概念,可以参见以前的博文:

 

转载地址:http://cxvcx.baihongyu.com/

你可能感兴趣的文章
Monkey学习(3)如何在Android模拟器中安装apk
查看>>
测试1.书店的增删改查项目.链接数据库
查看>>
java基础( 九)-----深入分析Java的序列化与反序列化
查看>>
磁盘清理-安全转移C盘中软件的缓存文件
查看>>
Java 直线、多段线画板 PaintJFrame (整理)
查看>>
面向对象三大特性之多态
查看>>
GOLDENGATE 配置文档,各类参数--转发
查看>>
UVA 10209
查看>>
使用conlleval.pl对CRF测试结果进行评价的方法
查看>>
用半监督算法做文本分类(自训练)
查看>>
Mysql 数据库优化(一)
查看>>
javascript中split字符串分割函数
查看>>
搭建简单zookeeper+dubbo+项目
查看>>
开在东山小洋楼里的wilber's意大利餐厅
查看>>
基站定位接口说明文档
查看>>
Python-Day3
查看>>
CentOS6.2安装memcache
查看>>
《水晶报表在vs2010t中引用及打包》转自于互联网
查看>>
BZOJ3577:玩手机(最大流,二维ST表)
查看>>
C语言-控制语句(循环)
查看>>