第4章:I/O多重化
はじめに
複数のクライアントを同時に処理するには、I/O多重化(I/O Multiplexing)が必要です。本章では、select、poll、epoll、kqueueを学びます。
---
1. なぜI/O多重化が必要か
1.1 ブロッキングI/Oの問題
ブロッキングモデル:
Client 1: ─────────────────────────────────→
[recv] ← ブロック中
Client 2: ───────×───────────────────────→
↑
待機(処理されない)
Client 3: ───────────×───────────────────→
↑
待機(処理されない)
問題: 1クライアントの処理中、他を処理できない
1.2 解決策の比較
解決策1: マルチプロセス
+ シンプル
- リソース消費大
- プロセス間通信が複雑
解決策2: マルチスレッド
+ 共有メモリ
- 同期が難しい
- デッドロックのリスク
解決策3: I/O多重化 ← webservで使用
+ 単一スレッド
+ 効率的
+ シンプルな状態管理
- コードが複雑になりやすい
1.3 C10K問題
1999年、Dan Kegel「The C10K Problem」:
- 10,000同時接続をどう処理するか
- select/pollの限界
- epoll/kqueueの誕生のきっかけ
現代:
- C10M問題(1,000万接続)
- io_uring (Linux 5.1+)
- DPDK(カーネルバイパス)
---
2. select()
2.1 概要
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
// nfds: 監視するfdの最大値 + 1
// readfds: 読み込み可能を監視
// writefds: 書き込み可能を監視
// exceptfds: 例外を監視
// timeout: タイムアウト(NULLで無限)
// 戻り値
// > 0: 準備完了のfd数
// = 0: タイムアウト
// < 0: エラー
2.2 fd_setの操作
fd_set fds;
FD_ZERO(&fds); // 初期化(すべてクリア)
FD_SET(fd, &fds); // fdを追加
FD_CLR(fd, &fds); // fdを削除
FD_ISSET(fd, &fds); // fdが含まれているか
2.3 selectを使ったサーバー
#include <sys/select.h>
#include <vector>
#include <algorithm>
class SelectServer {
private:
int _serverFd;
std::vector<int> _clients;
fd_set _masterSet;
int _maxFd;
public:
void run() {
FD_ZERO(&_masterSet);
FD_SET(_serverFd, &_masterSet);
_maxFd = _serverFd;
while (true) {
fd_set readSet = _masterSet; // コピー(selectが変更するため)
// タイムアウト設定(60秒)
struct timeval timeout;
timeout.tv_sec = 60;
timeout.tv_usec = 0;
int ready = select(_maxFd + 1, &readSet, NULL, NULL, &timeout);
if (ready < 0) {
if (errno == EINTR) continue; // シグナルで中断
perror("select");
break;
}
if (ready == 0) {
std::cout << "Timeout, no activity" << std::endl;
continue;
}
// 新規接続をチェック
if (FD_ISSET(_serverFd, &readSet)) {
acceptNewClient();
}
// 既存クライアントをチェック
for (int fd : _clients) {
if (FD_ISSET(fd, &readSet)) {
handleClient(fd);
}
}
}
}
void acceptNewClient() {
int clientFd = accept(_serverFd, NULL, NULL);
if (clientFd < 0) {
perror("accept");
return;
}
// 非ブロッキングに設定
int flags = fcntl(clientFd, F_GETFL, 0);
fcntl(clientFd, F_SETFL, flags | O_NONBLOCK);
FD_SET(clientFd, &_masterSet);
_clients.push_back(clientFd);
_maxFd = std::max(_maxFd, clientFd);
std::cout << "New client: " << clientFd << std::endl;
}
void removeClient(int fd) {
close(fd);
FD_CLR(fd, &_masterSet);
_clients.erase(std::remove(_clients.begin(), _clients.end(), fd),
_clients.end());
// maxFdの再計算
_maxFd = _serverFd;
for (int client : _clients) {
_maxFd = std::max(_maxFd, client);
}
}
};
2.4 selectの限界
selectの問題:
1. FD_SETSIZE制限(通常1024)
2. 毎回fd_setをコピーする必要
3. O(n)でfdをスキャン
4. fd_setはビットマスク(非効率)
適用範囲:
- 少数の接続(〜数百)
- 移植性が重要な場合
- macOS/BSDで使用可能
---
3. poll()
3.1 概要
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd; // ファイルディスクリプタ
short events; // 監視するイベント
short revents; // 発生したイベント
};
// events/reventsのフラグ
POLLIN // 読み込み可能
POLLOUT // 書き込み可能
POLLERR // エラー
POLLHUP // 接続切断
POLLNVAL // 無効なfd
3.2 pollを使ったサーバー
#include <poll.h>
#include <vector>
class PollServer {
private:
int _serverFd;
std::vector<struct pollfd> _pollfds;
public:
void run() {
// サーバーソケットを追加
struct pollfd serverPfd;
serverPfd.fd = _serverFd;
serverPfd.events = POLLIN;
serverPfd.revents = 0;
_pollfds.push_back(serverPfd);
while (true) {
int ready = poll(_pollfds.data(), _pollfds.size(), 60000);
if (ready < 0) {
if (errno == EINTR) continue;
perror("poll");
break;
}
if (ready == 0) {
std::cout << "Timeout" << std::endl;
continue;
}
// 発生したイベントを処理
for (size_t i = 0; i < _pollfds.size(); ++i) {
if (_pollfds[i].revents == 0) continue;
if (_pollfds[i].fd == _serverFd) {
// 新規接続
if (_pollfds[i].revents & POLLIN) {
acceptNewClient();
}
} else {
// クライアント処理
handleClientEvents(i);
}
}
}
}
void acceptNewClient() {
int clientFd = accept(_serverFd, NULL, NULL);
if (clientFd < 0) {
perror("accept");
return;
}
// 非ブロッキングに設定
int flags = fcntl(clientFd, F_GETFL, 0);
fcntl(clientFd, F_SETFL, flags | O_NONBLOCK);
struct pollfd pfd;
pfd.fd = clientFd;
pfd.events = POLLIN;
pfd.revents = 0;
_pollfds.push_back(pfd);
}
void handleClientEvents(size_t index) {
struct pollfd& pfd = _pollfds[index];
if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
// エラーまたは切断
removeClient(index);
return;
}
if (pfd.revents & POLLIN) {
// 読み込み可能
if (!handleRead(pfd.fd)) {
removeClient(index);
return;
}
}
if (pfd.revents & POLLOUT) {
// 書き込み可能
if (!handleWrite(pfd.fd)) {
removeClient(index);
return;
}
}
}
void removeClient(size_t index) {
close(_pollfds[index].fd);
_pollfds.erase(_pollfds.begin() + index);
}
// 書き込み待ちを設定
void setWriteInterest(int fd) {
for (auto& pfd : _pollfds) {
if (pfd.fd == fd) {
pfd.events |= POLLOUT;
break;
}
}
}
// 書き込み待ちを解除
void clearWriteInterest(int fd) {
for (auto& pfd : _pollfds) {
if (pfd.fd == fd) {
pfd.events &= ~POLLOUT;
break;
}
}
}
};
3.3 pollの特徴
selectより改善:
+ FD_SETSIZE制限なし
+ イベントと結果が分離
+ より明確なAPI
依然として残る問題:
- 毎回全fdをカーネルにコピー
- O(n)でスキャン
- 大量接続では遅い
---
4. epoll(Linux専用)
4.1 概要
#include <sys/epoll.h>
// epollインスタンス作成
int epoll_create1(int flags);
// イベント登録
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// op: EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL
// イベント待機
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
struct epoll_event {
uint32_t events; // EPOLLIN, EPOLLOUT, etc.
epoll_data_t data;
};
union epoll_data_t {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
};
4.2 epollを使ったサーバー
#include <sys/epoll.h>
class EpollServer {
private:
int _serverFd;
int _epollFd;
static const int MAX_EVENTS = 64;
public:
void init() {
_epollFd = epoll_create1(0);
if (_epollFd < 0) {
throw std::runtime_error("epoll_create1 failed");
}
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = _serverFd;
epoll_ctl(_epollFd, EPOLL_CTL_ADD, _serverFd, &ev);
}
void run() {
struct epoll_event events[MAX_EVENTS];
while (true) {
int nready = epoll_wait(_epollFd, events, MAX_EVENTS, 60000);
if (nready < 0) {
if (errno == EINTR) continue;
perror("epoll_wait");
break;
}
if (nready == 0) {
std::cout << "Timeout" << std::endl;
continue;
}
// 準備完了のfdだけを処理(O(ready))
for (int i = 0; i < nready; ++i) {
if (events[i].data.fd == _serverFd) {
acceptNewClient();
} else {
handleClientEvent(events[i]);
}
}
}
}
void acceptNewClient() {
int clientFd = accept(_serverFd, NULL, NULL);
if (clientFd < 0) {
perror("accept");
return;
}
// 非ブロッキングに設定
int flags = fcntl(clientFd, F_GETFL, 0);
fcntl(clientFd, F_SETFL, flags | O_NONBLOCK);
// epollに登録
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // エッジトリガー
ev.data.fd = clientFd;
epoll_ctl(_epollFd, EPOLL_CTL_ADD, clientFd, &ev);
}
void handleClientEvent(struct epoll_event& event) {
int fd = event.data.fd;
if (event.events & (EPOLLERR | EPOLLHUP)) {
removeClient(fd);
return;
}
if (event.events & EPOLLIN) {
handleRead(fd);
}
if (event.events & EPOLLOUT) {
handleWrite(fd);
}
}
void removeClient(int fd) {
epoll_ctl(_epollFd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
void setWriteInterest(int fd) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
ev.data.fd = fd;
epoll_ctl(_epollFd, EPOLL_CTL_MOD, fd, &ev);
}
};
4.3 レベルトリガー vs エッジトリガー
レベルトリガー(LT): デフォルト
- データがある限り通知し続ける
- 安全だが効率が落ちる可能性
エッジトリガー(ET): EPOLLET
- 状態が変化した時だけ通知
- 効率的だが、一度にすべて読む必要
- 読み残すと次の通知が来ない
// エッジトリガーでの読み込み
void handleReadET(int fd) {
while (true) {
char buffer[4096];
ssize_t bytes = recv(fd, buffer, sizeof(buffer), 0);
if (bytes > 0) {
// データ処理
processData(buffer, bytes);
} else if (bytes == 0) {
// 切断
removeClient(fd);
break;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// すべて読み終わった
break;
}
// 本当のエラー
removeClient(fd);
break;
}
}
}
---
5. kqueue(BSD/macOS)
5.1 概要
#include <sys/event.h>
// kqueueインスタンス作成
int kqueue(void);
// イベント登録・取得
int kevent(int kq, const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout);
struct kevent {
uintptr_t ident; // 識別子(通常はfd)
int16_t filter; // フィルタータイプ
uint16_t flags; // アクションフラグ
uint32_t fflags; // フィルター固有フラグ
intptr_t data; // フィルター固有データ
void *udata; // ユーザーデータ
};
// フィルター
EVFILT_READ // 読み込み可能
EVFILT_WRITE // 書き込み可能
EVFILT_TIMER // タイマー
// フラグ
EV_ADD // イベント追加
EV_DELETE // イベント削除
EV_ENABLE // 有効化
EV_DISABLE // 無効化
EV_ONESHOT // 一度だけ通知
5.2 kqueueを使ったサーバー
#include <sys/event.h>
class KqueueServer {
private:
int _serverFd;
int _kq;
static const int MAX_EVENTS = 64;
public:
void init() {
_kq = kqueue();
if (_kq < 0) {
throw std::runtime_error("kqueue failed");
}
struct kevent ev;
EV_SET(&ev, _serverFd, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(_kq, &ev, 1, NULL, 0, NULL);
}
void run() {
struct kevent events[MAX_EVENTS];
while (true) {
struct timespec timeout;
timeout.tv_sec = 60;
timeout.tv_nsec = 0;
int nready = kevent(_kq, NULL, 0, events, MAX_EVENTS, &timeout);
if (nready < 0) {
if (errno == EINTR) continue;
perror("kevent");
break;
}
if (nready == 0) {
std::cout << "Timeout" << std::endl;
continue;
}
for (int i = 0; i < nready; ++i) {
int fd = events[i].ident;
if (fd == _serverFd) {
acceptNewClient();
} else {
handleClientEvent(events[i]);
}
}
}
}
void acceptNewClient() {
int clientFd = accept(_serverFd, NULL, NULL);
if (clientFd < 0) {
perror("accept");
return;
}
// 非ブロッキングに設定
int flags = fcntl(clientFd, F_GETFL, 0);
fcntl(clientFd, F_SETFL, flags | O_NONBLOCK);
// kqueueに登録
struct kevent ev;
EV_SET(&ev, clientFd, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(_kq, &ev, 1, NULL, 0, NULL);
}
void handleClientEvent(struct kevent& event) {
int fd = event.ident;
if (event.flags & EV_EOF) {
removeClient(fd);
return;
}
if (event.filter == EVFILT_READ) {
handleRead(fd);
} else if (event.filter == EVFILT_WRITE) {
handleWrite(fd);
}
}
void removeClient(int fd) {
// kqueueはclose時に自動削除
close(fd);
}
void setWriteInterest(int fd) {
struct kevent ev;
EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL);
kevent(_kq, &ev, 1, NULL, 0, NULL);
}
};
5.3 kqueueの特徴
kqueueの利点:
+ 様々なイベントを統一的に扱える
+ タイマーもkqueueで管理可能
+ ファイル変更監視も可能(EVFILT_VNODE)
+ 効率的(O(1)でイベント登録)
epollとの比較:
- APIが異なる
- EV_ONESHOTの挙動が便利
- udataでユーザーデータを直接格納可能
---
6. クロスプラットフォーム対応
6.1 抽象化レイヤー
// I/O多重化の抽象インターフェース
class IMultiplexer {
public:
virtual ~IMultiplexer() {}
virtual void addFd(int fd, int events) = 0;
virtual void modFd(int fd, int events) = 0;
virtual void delFd(int fd) = 0;
virtual int wait(std::vector<Event>& events, int timeout) = 0;
};
struct Event {
int fd;
bool readable;
bool writable;
bool error;
};
// プラットフォーム別実装
#ifdef __linux__
class EpollMultiplexer : public IMultiplexer { /* ... */ };
typedef EpollMultiplexer Multiplexer;
#elif defined(__APPLE__) || defined(__FreeBSD__)
class KqueueMultiplexer : public IMultiplexer { /* ... */ };
typedef KqueueMultiplexer Multiplexer;
#else
class PollMultiplexer : public IMultiplexer { /* ... */ };
typedef PollMultiplexer Multiplexer;
#endif
6.2 42課題での選択
webservでの推奨:
1. macOS: kqueue または poll
2. Linux: epoll または poll
3. 移植性重視: poll
選択基準:
- 評価環境を確認
- pollは両方で動作
- 性能要件を考慮
---
7. タイムアウト処理
7.1 接続タイムアウト
#include <ctime>
struct ClientState {
int fd;
time_t lastActivity;
std::string buffer;
// ...
};
class TimeoutHandler {
private:
std::map<int, ClientState> _clients;
static const int TIMEOUT_SECONDS = 60;
public:
void updateActivity(int fd) {
_clients[fd].lastActivity = time(NULL);
}
void checkTimeouts() {
time_t now = time(NULL);
for (auto it = _clients.begin(); it != _clients.end(); ) {
if (now - it->second.lastActivity > TIMEOUT_SECONDS) {
std::cout << "Client " << it->first << " timed out" << std::endl;
close(it->first);
it = _clients.erase(it);
} else {
++it;
}
}
}
};
7.2 Keep-Aliveタイムアウト
// HTTP/1.1のKeep-Alive
struct KeepAliveConfig {
int timeout; // 秒
int maxRequests; // 最大リクエスト数
};
class Connection {
int _requestCount;
time_t _lastRequest;
KeepAliveConfig _config;
public:
bool shouldClose() const {
if (_requestCount >= _config.maxRequests) {
return true;
}
if (time(NULL) - _lastRequest > _config.timeout) {
return true;
}
return false;
}
};
---
8. イベントループの設計
8.1 基本的なイベントループ
class EventLoop {
private:
Multiplexer _multiplexer;
std::map<int, ClientState> _clients;
bool _running;
public:
void run() {
_running = true;
while (_running) {
std::vector<Event> events;
int timeout = calculateNextTimeout();
int ready = _multiplexer.wait(events, timeout);
if (ready < 0 && errno != EINTR) {
break;
}
// イベント処理
for (const Event& ev : events) {
handleEvent(ev);
}
// タイムアウト処理
checkTimeouts();
// 保留中の書き込み処理
processPendingWrites();
}
}
void stop() {
_running = false;
}
};
8.2 Reactorパターン
Reactorパターン:
+------------------+
| Event Loop |
+------------------+
|
+---------+
| Wait | ← 多重化API
+---------+
|
+---------+
| Dispatch|
+---------+
/ | \
Handler Handler Handler
(Accept)(Read) (Write)
// ハンドラーインターフェース
class IEventHandler {
public:
virtual ~IEventHandler() {}
virtual void handleRead() = 0;
virtual void handleWrite() = 0;
virtual int getFd() const = 0;
};
// アクセプターハンドラー
class AcceptHandler : public IEventHandler {
int _serverFd;
std::function<void(int)> _onAccept;
public:
void handleRead() override {
int clientFd = accept(_serverFd, NULL, NULL);
if (clientFd >= 0) {
_onAccept(clientFd);
}
}
};
// クライアントハンドラー
class ClientHandler : public IEventHandler {
int _clientFd;
std::string _readBuffer;
std::string _writeBuffer;
public:
void handleRead() override {
char buf[4096];
ssize_t n = recv(_clientFd, buf, sizeof(buf), 0);
if (n > 0) {
_readBuffer.append(buf, n);
processRequest();
}
}
void handleWrite() override {
if (_writeBuffer.empty()) return;
ssize_t n = send(_clientFd, _writeBuffer.data(),
_writeBuffer.size(), 0);
if (n > 0) {
_writeBuffer.erase(0, n);
}
}
};
---
まとめ
本章で学んだこと:
- I/O多重化の必要性: ブロッキングI/Oの問題
- select: 古典的だが移植性が高い
- poll: selectの改良版
- epoll: Linux専用、高性能
- kqueue: BSD/macOS専用、高性能
- クロスプラットフォーム: 抽象化レイヤー
- タイムアウト: 接続管理
- Reactorパターン: イベント駆動設計
次章では、レスポンス生成とCGIを学びます。