第4章:I/O多重化

はじめに

複数のクライアントを同時に処理するには、I/O多重化(I/O Multiplexing)が必要です。本章では、select、poll、epoll、kqueueを学びます。

---

1. なぜI/O多重化が必要か

1.1 ブロッキングI/Oの問題

ブロッキングモデル:
Client 1: ─────────────────────────────────→
          [recv] ← ブロック中

Client 2: ───────×───────────────────────→
                 ↑
                 待機(処理されない)

Client 3: ───────────×───────────────────→
                     ↑
                     待機(処理されない)

問題: 1クライアントの処理中、他を処理できない

1.2 解決策の比較

解決策1: マルチプロセス
+ シンプル
- リソース消費大
- プロセス間通信が複雑

解決策2: マルチスレッド
+ 共有メモリ
- 同期が難しい
- デッドロックのリスク

解決策3: I/O多重化 ← webservで使用
+ 単一スレッド
+ 効率的
+ シンプルな状態管理
- コードが複雑になりやすい

1.3 C10K問題

1999年、Dan Kegel「The C10K Problem」:
- 10,000同時接続をどう処理するか
- select/pollの限界
- epoll/kqueueの誕生のきっかけ

現代:
- C10M問題(1,000万接続)
- io_uring (Linux 5.1+)
- DPDK(カーネルバイパス)

---

2. select()

2.1 概要

#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

// nfds: 監視するfdの最大値 + 1
// readfds: 読み込み可能を監視
// writefds: 書き込み可能を監視
// exceptfds: 例外を監視
// timeout: タイムアウト(NULLで無限)

// 戻り値
// > 0: 準備完了のfd数
// = 0: タイムアウト
// < 0: エラー

2.2 fd_setの操作

fd_set fds;

FD_ZERO(&fds);      // 初期化(すべてクリア)
FD_SET(fd, &fds);   // fdを追加
FD_CLR(fd, &fds);   // fdを削除
FD_ISSET(fd, &fds); // fdが含まれているか

2.3 selectを使ったサーバー

#include <sys/select.h>
#include <vector>
#include <algorithm>

class SelectServer {
private:
    int _serverFd;
    std::vector<int> _clients;
    fd_set _masterSet;
    int _maxFd;

public:
    void run() {
        FD_ZERO(&_masterSet);
        FD_SET(_serverFd, &_masterSet);
        _maxFd = _serverFd;

        while (true) {
            fd_set readSet = _masterSet;  // コピー(selectが変更するため)

            // タイムアウト設定(60秒)
            struct timeval timeout;
            timeout.tv_sec = 60;
            timeout.tv_usec = 0;

            int ready = select(_maxFd + 1, &readSet, NULL, NULL, &timeout);

            if (ready < 0) {
                if (errno == EINTR) continue;  // シグナルで中断
                perror("select");
                break;
            }

            if (ready == 0) {
                std::cout << "Timeout, no activity" << std::endl;
                continue;
            }

            // 新規接続をチェック
            if (FD_ISSET(_serverFd, &readSet)) {
                acceptNewClient();
            }

            // 既存クライアントをチェック
            for (int fd : _clients) {
                if (FD_ISSET(fd, &readSet)) {
                    handleClient(fd);
                }
            }
        }
    }

    void acceptNewClient() {
        int clientFd = accept(_serverFd, NULL, NULL);
        if (clientFd < 0) {
            perror("accept");
            return;
        }

        // 非ブロッキングに設定
        int flags = fcntl(clientFd, F_GETFL, 0);
        fcntl(clientFd, F_SETFL, flags | O_NONBLOCK);

        FD_SET(clientFd, &_masterSet);
        _clients.push_back(clientFd);
        _maxFd = std::max(_maxFd, clientFd);

        std::cout << "New client: " << clientFd << std::endl;
    }

    void removeClient(int fd) {
        close(fd);
        FD_CLR(fd, &_masterSet);
        _clients.erase(std::remove(_clients.begin(), _clients.end(), fd),
                       _clients.end());

        // maxFdの再計算
        _maxFd = _serverFd;
        for (int client : _clients) {
            _maxFd = std::max(_maxFd, client);
        }
    }
};

2.4 selectの限界

selectの問題:
1. FD_SETSIZE制限(通常1024)
2. 毎回fd_setをコピーする必要
3. O(n)でfdをスキャン
4. fd_setはビットマスク(非効率)

適用範囲:
- 少数の接続(〜数百)
- 移植性が重要な場合
- macOS/BSDで使用可能

---

3. poll()

3.1 概要

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
    int   fd;       // ファイルディスクリプタ
    short events;   // 監視するイベント
    short revents;  // 発生したイベント
};

// events/reventsのフラグ
POLLIN   // 読み込み可能
POLLOUT  // 書き込み可能
POLLERR  // エラー
POLLHUP  // 接続切断
POLLNVAL // 無効なfd

3.2 pollを使ったサーバー

#include <poll.h>
#include <vector>

class PollServer {
private:
    int _serverFd;
    std::vector<struct pollfd> _pollfds;

public:
    void run() {
        // サーバーソケットを追加
        struct pollfd serverPfd;
        serverPfd.fd = _serverFd;
        serverPfd.events = POLLIN;
        serverPfd.revents = 0;
        _pollfds.push_back(serverPfd);

        while (true) {
            int ready = poll(_pollfds.data(), _pollfds.size(), 60000);

            if (ready < 0) {
                if (errno == EINTR) continue;
                perror("poll");
                break;
            }

            if (ready == 0) {
                std::cout << "Timeout" << std::endl;
                continue;
            }

            // 発生したイベントを処理
            for (size_t i = 0; i < _pollfds.size(); ++i) {
                if (_pollfds[i].revents == 0) continue;

                if (_pollfds[i].fd == _serverFd) {
                    // 新規接続
                    if (_pollfds[i].revents & POLLIN) {
                        acceptNewClient();
                    }
                } else {
                    // クライアント処理
                    handleClientEvents(i);
                }
            }
        }
    }

    void acceptNewClient() {
        int clientFd = accept(_serverFd, NULL, NULL);
        if (clientFd < 0) {
            perror("accept");
            return;
        }

        // 非ブロッキングに設定
        int flags = fcntl(clientFd, F_GETFL, 0);
        fcntl(clientFd, F_SETFL, flags | O_NONBLOCK);

        struct pollfd pfd;
        pfd.fd = clientFd;
        pfd.events = POLLIN;
        pfd.revents = 0;
        _pollfds.push_back(pfd);
    }

    void handleClientEvents(size_t index) {
        struct pollfd& pfd = _pollfds[index];

        if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
            // エラーまたは切断
            removeClient(index);
            return;
        }

        if (pfd.revents & POLLIN) {
            // 読み込み可能
            if (!handleRead(pfd.fd)) {
                removeClient(index);
                return;
            }
        }

        if (pfd.revents & POLLOUT) {
            // 書き込み可能
            if (!handleWrite(pfd.fd)) {
                removeClient(index);
                return;
            }
        }
    }

    void removeClient(size_t index) {
        close(_pollfds[index].fd);
        _pollfds.erase(_pollfds.begin() + index);
    }

    // 書き込み待ちを設定
    void setWriteInterest(int fd) {
        for (auto& pfd : _pollfds) {
            if (pfd.fd == fd) {
                pfd.events |= POLLOUT;
                break;
            }
        }
    }

    // 書き込み待ちを解除
    void clearWriteInterest(int fd) {
        for (auto& pfd : _pollfds) {
            if (pfd.fd == fd) {
                pfd.events &= ~POLLOUT;
                break;
            }
        }
    }
};

3.3 pollの特徴

selectより改善:
+ FD_SETSIZE制限なし
+ イベントと結果が分離
+ より明確なAPI

依然として残る問題:
- 毎回全fdをカーネルにコピー
- O(n)でスキャン
- 大量接続では遅い

---

4. epoll(Linux専用)

4.1 概要

#include <sys/epoll.h>

// epollインスタンス作成
int epoll_create1(int flags);

// イベント登録
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// op: EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL

// イベント待機
int epoll_wait(int epfd, struct epoll_event *events,
               int maxevents, int timeout);

struct epoll_event {
    uint32_t events;  // EPOLLIN, EPOLLOUT, etc.
    epoll_data_t data;
};

union epoll_data_t {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
};

4.2 epollを使ったサーバー

#include <sys/epoll.h>

class EpollServer {
private:
    int _serverFd;
    int _epollFd;
    static const int MAX_EVENTS = 64;

public:
    void init() {
        _epollFd = epoll_create1(0);
        if (_epollFd < 0) {
            throw std::runtime_error("epoll_create1 failed");
        }

        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.fd = _serverFd;
        epoll_ctl(_epollFd, EPOLL_CTL_ADD, _serverFd, &ev);
    }

    void run() {
        struct epoll_event events[MAX_EVENTS];

        while (true) {
            int nready = epoll_wait(_epollFd, events, MAX_EVENTS, 60000);

            if (nready < 0) {
                if (errno == EINTR) continue;
                perror("epoll_wait");
                break;
            }

            if (nready == 0) {
                std::cout << "Timeout" << std::endl;
                continue;
            }

            // 準備完了のfdだけを処理(O(ready))
            for (int i = 0; i < nready; ++i) {
                if (events[i].data.fd == _serverFd) {
                    acceptNewClient();
                } else {
                    handleClientEvent(events[i]);
                }
            }
        }
    }

    void acceptNewClient() {
        int clientFd = accept(_serverFd, NULL, NULL);
        if (clientFd < 0) {
            perror("accept");
            return;
        }

        // 非ブロッキングに設定
        int flags = fcntl(clientFd, F_GETFL, 0);
        fcntl(clientFd, F_SETFL, flags | O_NONBLOCK);

        // epollに登録
        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLET;  // エッジトリガー
        ev.data.fd = clientFd;
        epoll_ctl(_epollFd, EPOLL_CTL_ADD, clientFd, &ev);
    }

    void handleClientEvent(struct epoll_event& event) {
        int fd = event.data.fd;

        if (event.events & (EPOLLERR | EPOLLHUP)) {
            removeClient(fd);
            return;
        }

        if (event.events & EPOLLIN) {
            handleRead(fd);
        }

        if (event.events & EPOLLOUT) {
            handleWrite(fd);
        }
    }

    void removeClient(int fd) {
        epoll_ctl(_epollFd, EPOLL_CTL_DEL, fd, NULL);
        close(fd);
    }

    void setWriteInterest(int fd) {
        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
        ev.data.fd = fd;
        epoll_ctl(_epollFd, EPOLL_CTL_MOD, fd, &ev);
    }
};

4.3 レベルトリガー vs エッジトリガー

レベルトリガー(LT): デフォルト
- データがある限り通知し続ける
- 安全だが効率が落ちる可能性

エッジトリガー(ET): EPOLLET
- 状態が変化した時だけ通知
- 効率的だが、一度にすべて読む必要
- 読み残すと次の通知が来ない

// エッジトリガーでの読み込み
void handleReadET(int fd) {
    while (true) {
        char buffer[4096];
        ssize_t bytes = recv(fd, buffer, sizeof(buffer), 0);

        if (bytes > 0) {
            // データ処理
            processData(buffer, bytes);
        } else if (bytes == 0) {
            // 切断
            removeClient(fd);
            break;
        } else {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // すべて読み終わった
                break;
            }
            // 本当のエラー
            removeClient(fd);
            break;
        }
    }
}

---

5. kqueue(BSD/macOS)

5.1 概要

#include <sys/event.h>

// kqueueインスタンス作成
int kqueue(void);

// イベント登録・取得
int kevent(int kq, const struct kevent *changelist, int nchanges,
           struct kevent *eventlist, int nevents,
           const struct timespec *timeout);

struct kevent {
    uintptr_t ident;   // 識別子(通常はfd)
    int16_t   filter;  // フィルタータイプ
    uint16_t  flags;   // アクションフラグ
    uint32_t  fflags;  // フィルター固有フラグ
    intptr_t  data;    // フィルター固有データ
    void      *udata;  // ユーザーデータ
};

// フィルター
EVFILT_READ   // 読み込み可能
EVFILT_WRITE  // 書き込み可能
EVFILT_TIMER  // タイマー

// フラグ
EV_ADD      // イベント追加
EV_DELETE   // イベント削除
EV_ENABLE   // 有効化
EV_DISABLE  // 無効化
EV_ONESHOT  // 一度だけ通知

5.2 kqueueを使ったサーバー

#include <sys/event.h>

class KqueueServer {
private:
    int _serverFd;
    int _kq;
    static const int MAX_EVENTS = 64;

public:
    void init() {
        _kq = kqueue();
        if (_kq < 0) {
            throw std::runtime_error("kqueue failed");
        }

        struct kevent ev;
        EV_SET(&ev, _serverFd, EVFILT_READ, EV_ADD, 0, 0, NULL);
        kevent(_kq, &ev, 1, NULL, 0, NULL);
    }

    void run() {
        struct kevent events[MAX_EVENTS];

        while (true) {
            struct timespec timeout;
            timeout.tv_sec = 60;
            timeout.tv_nsec = 0;

            int nready = kevent(_kq, NULL, 0, events, MAX_EVENTS, &timeout);

            if (nready < 0) {
                if (errno == EINTR) continue;
                perror("kevent");
                break;
            }

            if (nready == 0) {
                std::cout << "Timeout" << std::endl;
                continue;
            }

            for (int i = 0; i < nready; ++i) {
                int fd = events[i].ident;

                if (fd == _serverFd) {
                    acceptNewClient();
                } else {
                    handleClientEvent(events[i]);
                }
            }
        }
    }

    void acceptNewClient() {
        int clientFd = accept(_serverFd, NULL, NULL);
        if (clientFd < 0) {
            perror("accept");
            return;
        }

        // 非ブロッキングに設定
        int flags = fcntl(clientFd, F_GETFL, 0);
        fcntl(clientFd, F_SETFL, flags | O_NONBLOCK);

        // kqueueに登録
        struct kevent ev;
        EV_SET(&ev, clientFd, EVFILT_READ, EV_ADD, 0, 0, NULL);
        kevent(_kq, &ev, 1, NULL, 0, NULL);
    }

    void handleClientEvent(struct kevent& event) {
        int fd = event.ident;

        if (event.flags & EV_EOF) {
            removeClient(fd);
            return;
        }

        if (event.filter == EVFILT_READ) {
            handleRead(fd);
        } else if (event.filter == EVFILT_WRITE) {
            handleWrite(fd);
        }
    }

    void removeClient(int fd) {
        // kqueueはclose時に自動削除
        close(fd);
    }

    void setWriteInterest(int fd) {
        struct kevent ev;
        EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL);
        kevent(_kq, &ev, 1, NULL, 0, NULL);
    }
};

5.3 kqueueの特徴

kqueueの利点:
+ 様々なイベントを統一的に扱える
+ タイマーもkqueueで管理可能
+ ファイル変更監視も可能(EVFILT_VNODE)
+ 効率的(O(1)でイベント登録)

epollとの比較:
- APIが異なる
- EV_ONESHOTの挙動が便利
- udataでユーザーデータを直接格納可能

---

6. クロスプラットフォーム対応

6.1 抽象化レイヤー

// I/O多重化の抽象インターフェース
class IMultiplexer {
public:
    virtual ~IMultiplexer() {}
    virtual void addFd(int fd, int events) = 0;
    virtual void modFd(int fd, int events) = 0;
    virtual void delFd(int fd) = 0;
    virtual int wait(std::vector<Event>& events, int timeout) = 0;
};

struct Event {
    int fd;
    bool readable;
    bool writable;
    bool error;
};

// プラットフォーム別実装
#ifdef __linux__
class EpollMultiplexer : public IMultiplexer { /* ... */ };
typedef EpollMultiplexer Multiplexer;
#elif defined(__APPLE__) || defined(__FreeBSD__)
class KqueueMultiplexer : public IMultiplexer { /* ... */ };
typedef KqueueMultiplexer Multiplexer;
#else
class PollMultiplexer : public IMultiplexer { /* ... */ };
typedef PollMultiplexer Multiplexer;
#endif

6.2 42課題での選択

webservでの推奨:
1. macOS: kqueue または poll
2. Linux: epoll または poll
3. 移植性重視: poll

選択基準:
- 評価環境を確認
- pollは両方で動作
- 性能要件を考慮

---

7. タイムアウト処理

7.1 接続タイムアウト

#include <ctime>

struct ClientState {
    int fd;
    time_t lastActivity;
    std::string buffer;
    // ...
};

class TimeoutHandler {
private:
    std::map<int, ClientState> _clients;
    static const int TIMEOUT_SECONDS = 60;

public:
    void updateActivity(int fd) {
        _clients[fd].lastActivity = time(NULL);
    }

    void checkTimeouts() {
        time_t now = time(NULL);

        for (auto it = _clients.begin(); it != _clients.end(); ) {
            if (now - it->second.lastActivity > TIMEOUT_SECONDS) {
                std::cout << "Client " << it->first << " timed out" << std::endl;
                close(it->first);
                it = _clients.erase(it);
            } else {
                ++it;
            }
        }
    }
};

7.2 Keep-Aliveタイムアウト

// HTTP/1.1のKeep-Alive
struct KeepAliveConfig {
    int timeout;     // 秒
    int maxRequests; // 最大リクエスト数
};

class Connection {
    int _requestCount;
    time_t _lastRequest;
    KeepAliveConfig _config;

public:
    bool shouldClose() const {
        if (_requestCount >= _config.maxRequests) {
            return true;
        }

        if (time(NULL) - _lastRequest > _config.timeout) {
            return true;
        }

        return false;
    }
};

---

8. イベントループの設計

8.1 基本的なイベントループ

class EventLoop {
private:
    Multiplexer _multiplexer;
    std::map<int, ClientState> _clients;
    bool _running;

public:
    void run() {
        _running = true;

        while (_running) {
            std::vector<Event> events;
            int timeout = calculateNextTimeout();

            int ready = _multiplexer.wait(events, timeout);

            if (ready < 0 && errno != EINTR) {
                break;
            }

            // イベント処理
            for (const Event& ev : events) {
                handleEvent(ev);
            }

            // タイムアウト処理
            checkTimeouts();

            // 保留中の書き込み処理
            processPendingWrites();
        }
    }

    void stop() {
        _running = false;
    }
};

8.2 Reactorパターン

Reactorパターン:
+------------------+
|   Event Loop     |
+------------------+
         |
    +---------+
    |  Wait   | ← 多重化API
    +---------+
         |
    +---------+
    | Dispatch|
    +---------+
     /   |   \
Handler Handler Handler
(Accept)(Read) (Write)

// ハンドラーインターフェース
class IEventHandler {
public:
    virtual ~IEventHandler() {}
    virtual void handleRead() = 0;
    virtual void handleWrite() = 0;
    virtual int getFd() const = 0;
};

// アクセプターハンドラー
class AcceptHandler : public IEventHandler {
    int _serverFd;
    std::function<void(int)> _onAccept;

public:
    void handleRead() override {
        int clientFd = accept(_serverFd, NULL, NULL);
        if (clientFd >= 0) {
            _onAccept(clientFd);
        }
    }
};

// クライアントハンドラー
class ClientHandler : public IEventHandler {
    int _clientFd;
    std::string _readBuffer;
    std::string _writeBuffer;

public:
    void handleRead() override {
        char buf[4096];
        ssize_t n = recv(_clientFd, buf, sizeof(buf), 0);
        if (n > 0) {
            _readBuffer.append(buf, n);
            processRequest();
        }
    }

    void handleWrite() override {
        if (_writeBuffer.empty()) return;

        ssize_t n = send(_clientFd, _writeBuffer.data(),
                         _writeBuffer.size(), 0);
        if (n > 0) {
            _writeBuffer.erase(0, n);
        }
    }
};

---

まとめ

本章で学んだこと:

  • I/O多重化の必要性: ブロッキングI/Oの問題
  • select: 古典的だが移植性が高い
  • poll: selectの改良版
  • epoll: Linux専用、高性能
  • kqueue: BSD/macOS専用、高性能
  • クロスプラットフォーム: 抽象化レイヤー
  • タイムアウト: 接続管理
  • Reactorパターン: イベント駆動設計

次章では、レスポンス生成とCGIを学びます。