開源項目SMSS開發指南(二)——基于libevent的線程池

libevent是一套輕量級的網絡庫,基于事件驅動開發。能夠實現多線程的多路復用和注冊事件響應。本文將介紹libevent的基本功能以及如何利用libevent開發一個線程池。

一. 使用指南

監聽服務和注冊連接事件

libevent是一個基于事件驅動的網絡庫,通過在一個事件循環上注冊不同的事件以完成線程多路復用。由于libevent采用c語言開發,為了使用方便我們可以將它的功能通過面向對象的設計模式用c++來封裝。下面是對常用函數的詳細介紹:

(1)event_base_new():創建(初始化)event_base

event_base代表了一個事件循環上下文,所有需要基于這個事件循環的事件都需要注冊在它的上面。如果創建成功,最后需要使用event_base_free()來釋放資源。

(2)evconnlistener_new_bind():綁定一個本地端口并注冊網絡監聽事件

參數說明:

  • struct event_base* base 前文創建好的base,事件將關聯到這個事件循環上
  • evconnlistener_cb cb 事件觸發的回調
  • void *ptr 回調函數的參數,這個參數可以由用戶任意指定,方便在回調函數中使用
  • unsigned flags 事件的附加標識,代表事件操作
  • int backlog 網絡緩存大小
  • const struct sockaddr *sa socket地址
  • int socklen socket地址長度

函數會返回一個新的evconnlistener,如果創建成功,需要使用evconnlistener_free()來釋放資源。

(3)event_base_dispatch():啟動事件循環和事件分發

這個函數會阻塞當前線程,用戶可以在事件回調函數中通過event_base_loopbreak()來中斷。如果不希望當前線程被堵塞也可以使用event_base_loop()函數。注意,千萬不要在回調函數中清理event_base。

代碼示例:

// 創建事件循環
ev_base_ = event_base_new();
if (!ev_base_)
{
    return false;
}
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(port_);
// 創建socket連接回調
ev_listener_ = evconnlistener_new_bind(
    ev_base_,
    SConnListenerCb,
    this,
    LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
    this->backlog_,
    (sockaddr *)&sin,
    sizeof(sin));
if (!ev_listener_)
{
    return false;
}
while (!quit_)
{
    event_base_loop(ev_base_, EVLOOP_NONBLOCK);
    this_thread::sleep_for(chrono::milliseconds(1));
}
evconnlistener_free(ev_listener_);
event_base_free(ev_base_);
static void SConnListenerCb(struct evconnlistener *listen, evutil_socket_t sock, struct sockaddr *addr, int len, void *ctx)
{
    // 解析客戶端ip
    char ip[16] = {0};
    sockaddr_in *addr_in = (sockaddr_in *)addr;
    evutil_inet_ntop(AF_INET, &addr_in->sin_addr, ip, sizeof(ip));
    stringstream ss;
    ss << ip << ":" << addr_in->sin_port << " 連接完成...";
    LOG4CPLUS_INFO(SimpleLogger::Get()->LoggerRef(), ss.str());
    SmsServer *server = (SmsServer *)ctx;
    int s = sock;

    server->ConnListener(s);
}

創建連接和注冊讀、寫、事件監聽

(1)bufferevent_socket_new():創建一個帶socket緩存的事件

bufferevent表示一個事件緩存,每當有數據需要讀取的時候,它會先將數據從內核態取出再通知用戶。順帶提一下,libevent對事件的觸發支持兩種模式:(ET)邊沿觸發和(LT)水平觸發。如果你設置了水平觸發,但是通過bufferevent來讀取消息,無論你是否將消息接收完成,都不會被反復觸發回調。因此,使用bufferevent來接收消息的時候,需要特別關注TCP粘包和長包。

(2)bufferevent_setcb():設置bufferevent的回調函數

參數說明:

  • struct bufferevent* bufev 關聯對象
  • bufferevent_data_cb readcb 讀回調 函數原型void (*bufferevent_data_cb)(struct bufferevent *bev, void* ctx)
  • bufferevent_data_cb writecb 寫回調 函數原型(同上)
  • bufferevent_event_cb eventcb 事件回調 函數原型void (*bufferevent_event_cb)(struct bufferevent *bev, short what, void *ctx)
  • void *cbarg 回調函數的最后一個參數,由用戶指定

讀回調顧名思義就是當有數據的時候會觸發的函數,可是寫回調什么時候觸發?有興趣的朋友可以自己測試一下。特別需要關注事件回調函數。所有可觸發的事件包括:BEV_EVENT_READING(讀事件),BEV_EVENT_WRITING(寫事件),BEV_EVENT_EOF(結束事件),BEV_EVENT_ERROR(錯誤事件),BEV_EVENT_TIMEOUT(超時事件),BEV_EVENT_CONNECTED(連接事件)。如果你是在開發服務端BEV_EVENT_CONNECTED事件不會被觸發,因為連接事件是在bufferevent創建前產生的。BEV_EVENT_READING || BEV_EVENT_TIMEOUT可以用來表示讀數據超時,通過這個事件可以偵測心跳代表距離上次讀數據已經超時。BEV_EVENT_WRITING || BEV_EVENT_TIMEOUT可以表示寫超時,但是這個事件只會在當有數據需要被發送可是超時未發送成功后才會被觸發。

此外,當發生超時事件后,相關的讀寫操作都會被從bufferevent中移除。如果用戶希望繼續之前的操作,需要重新注冊讀/寫。

(3)bufferevent_set_timeouts():設置讀/寫超時

只有在通過這個函數設置了讀/寫超時后,在事件回調函數中BEV_EVENT_TIMEOUT才會生效。

代碼示例:

bufferevent *buff_ev_ = bufferevent_socket_new(ev_base_, socket_, BEV_OPT_CLOSE_ON_FREE);
if (!buff_ev_)
{
    return false;
}
// 指定參數
bufferevent_setcb(buff_ev_, SReadCb, SWriteCb, SEventCb, this);
bufferevent_enable(buff_ev_, EV_READ | EV_WRITE);
timeval tv = {timeout_, 0};
bufferevent_set_timeouts(buff_ev_, &tv, NULL);
return true;

讀寫數據

(1)bufferevent_read():從緩存中接收數據

通常在讀回調中使用,通過返回值判斷緩存中是否還有數據

(2)bufferevent_write():向緩沖寫入數據以通過socket發送

返回值表示有多少數據已經被寫入進內核

創建基于管道的事件

libevent除了可以用在網絡上,還可以和管道(pipe)結合用來生成管道事件。

(1)event_config_new():創建一個事件配置對象

event_config可以用來創建一個非默認的事件循環,通常使用這個函數配合event_base_new_with_config()創建event_base。最后需要使用event_config_free()來釋放資源。

(2)event_new():創建一個讀/寫事件

和bufferevent的創建不同,event_new()只會創建一個配套的事件,如果在事件中用戶沒有對數據進行處理,回調會一直被觸發。

代碼示例:

// 初始化一對管道,只能在linux系統下使用
int pipefd[2];
if (pipe(pipefd))
{
    return false;
}
// pipefd[0]讀取管道 pipefd[1]發送管道
this->pipe_endpoint_ = pipefd[1];
// 創建管道事件
event_config *ev_conf = event_config_new();
event_config_set_flag(ev_conf, EVENT_BASE_FLAG_NOLOCK);
this->ev_base_ = event_base_new_with_config(ev_conf);
event_config_free(ev_conf);
if (!ev_base_)
{
    return false;
}

pipe_ev_ = event_new(this->ev_base_, pipefd[0], EV_READ | EV_PERSIST, SEventCb, this);
event_add(pipe_ev_, 0);

 二、實現線程池

線程池實現原理

libevent可以實現對線程的多路復用,因此我們可以在一個線程中完成服務端對多個客戶端的同時讀寫操作。這樣做雖然能夠最大限度的利用系統資源,可是無法充分發揮cpu多線程的處理能力。開發高可用和適合高負載的服務端我們依然應該考慮啟動多個線程來處理數據。關鍵是我們如何將事件分發到不同的線程中以保持多個線程的負載均衡。

  1. 當服務啟動的時候首先創建N條線程。每一個線程對應一個事件循環event_base。
  2. 主線程負責監聽指定端口并在連接的回調函數中處理新連接套接字的處理。
  3. 當有新的客戶端連接后,主線程會把套接字先保存在一個隊列中。掃描當前所有線程的處理量,選擇負載最小的線程利用管道發送一個信號(‘c’)。對應線程的事件循環在管道的讀事件中從隊列中獲取這個套接字,并建立對應的bufferevent進行處理。當前線程負載量+1。
  4. 客戶端斷開后通知bufferevent所在的線程將負載量減一。

smss源碼閱讀

相關的源碼文件為sms_server,work_group,work_thread和socket_manager

服務初始化,注冊連接監聽事件并初始化線程組

bool SmsServer::Init()
{
    // 創建事件循環
    ev_base_ = event_base_new();
    if (!ev_base_)
    {
        return false;
    }
    sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(port_);
    // 創建socket連接回調
    ev_listener_ = evconnlistener_new_bind(
        ev_base_,
        SConnListenerCb,
        this,
        LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
        this->backlog_,
        (sockaddr *)&sin,
        sizeof(sin));
    if (!ev_listener_)
    {
        return false;
    }
    // 創建線程組管理類
    boss_ = new WorkGroup(thread_num_);
    boss_->Init();
    return true;
}

線程組負責管理所有的線程

bool WorkGroup::Init()
{
    // 直接初始化指定的工作線程
    for (int i = 0; i < num_; i++)
    {
        int id = group_.size() + 1;
        WorkThread *work = new WorkThread(this, id, net_bus_);
        if (!work->Init())
        {
            return false;
        }
        work->Start(); // thread start...
        group_.push_back(work);
        // 將當前初始化完成的工作線程注冊進消息總線
        net_bus_->Regist(work); // regist thread to netbus
    }
    return true;
}

每一個線程在初始化的時候會創建一條管道并在自己的事件循環上注冊對應的讀回調,對外部暴露Notify方法用來激活事件

bool WorkThread::Init()
{
    // 初始化一對管道,只能在linux系統下使用
    int pipefd[2];
    if (pipe(pipefd))
    {
        return false;
    }
    // pipefd[0]讀取管道 pipefd[1]發送管道
    this->pipe_endpoint_ = pipefd[1];
    // 創建管道事件
    event_config *ev_conf = event_config_new();
    event_config_set_flag(ev_conf, EVENT_BASE_FLAG_NOLOCK);
    this->ev_base_ = event_base_new_with_config(ev_conf);
    event_config_free(ev_conf);
    if (!ev_base_)
    {
        return false;
    }

    pipe_ev_ = event_new(this->ev_base_, pipefd[0], EV_READ | EV_PERSIST, SEventCb, this);
    event_add(pipe_ev_, 0);
    return true;
}

void WorkThread::Notify(const char *sign)
{
    // 激活
    int re = write(this->pipe_endpoint_, sign, 1);
    if (re <= 0)
    {
        LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), "管道激活失敗");
    }
}

在讀回調中獲取套接字,創建連接管理對象SocketManager

void WorkThread::Activated(int fd)
{
    char buf[2] = {0};
    int re = read(fd, buf, 1);
    socket_list_mtx_.lock();
    if (strcmp(buf, "c") == 0) // 通知有新的客戶端連接
    {
        // new client connect, create SocketManager
        if (socket_list_.empty())
        {
            socket_list_mtx_.unlock();
            return;
        }
        // 讀取一條套接字
        int client_sock = socket_list_.front();
        socket_list_.pop_front();
        // 創建socketManager
        SocketManager *manager = new SocketManager(this, ev_base_, client_sock, AppContext::Get()->client_timeout());
        manager->Init();
        sock_manager_list_.push_back(manager);
        stringstream ss;
        ss << "SocketManager:" << client_sock << " 創建完成";
        LOG4CPLUS_DEBUG(SimpleLogger::Get()->LoggerRef(), ss.str());
    }

    socket_list_mtx_.unlock();
}

客戶端連接后將創建的套接字交給負載最小的線程處理

void WorkGroup::CreateConnection(int sock)
{
    int min = -1;
    WorkThread *work = nullptr;
    // 遍歷尋找負載最輕的線程
    for (auto it = group_.begin(); it != group_.end(); it++)
    {
        if (min == -1)
        {
            min = (*it)->connect_num();
            work = (*it);
        }
        else if ((*it)->connect_num() < min)
        {
            min = (*it)->connect_num();
            work = (*it);
        }
    }
    // 添加一條socket fd進隊列并通過管道激活
    work->AddSocket(sock);
    work->Notify("c");
}

 

完整源碼已經發布在碼云上。

相關文章:《開源項目SMSS開發指南》

posted @ 2020-01-11 15:13  冷豪  閱讀(...)  評論(...編輯  收藏
ag二分彩