第5章: パイプとプロセス間通信の理論

5.1 パイプの歴史と理論

Douglas McIlroyとUNIXパイプ

1964年、Bell研究所のDouglas McIlroyは、プログラムを接続するメカニズムについてのアイデアを提案した。彼のメモには以下のように記されている:

> "We should have some ways of connecting programs like garden hose — screw in another segment when it becomes when it becomes necessary to massage data in another way." > (プログラムを庭のホースのように接続する方法が必要だ — データを別の方法で処理する必要があるときに、別のセグメントをねじ込めばよい)

このアイデアは、Ken ThompsonとDennis Ritchieによって1973年にUNIX Version 3で実装された。パイプ記号「|」は、McIlroyの提案に基づいてThompsonが一晩で実装したと言われている。

UNIX哲学とパイプ

McIlroyは後に「UNIX哲学」として知られる原則を定式化した:

  • 単一責任: 各プログラムは一つのことをうまくやる
  • 協調: プログラムは協調して動作するよう設計する
  • テキストストリーム: テキストを普遍的なインターフェースとして使用する

UNIX哲学の体現:
ls | grep "\.c$" | wc -l

ls    → ファイル一覧を出力(単一責任)
grep  → パターンに一致する行を抽出(単一責任)
wc    → 行数をカウント(単一責任)
|     → プログラム間の協調を実現

この設計思想は、ソフトウェア工学におけるモジュール性(Modularity)関心の分離(Separation of Concerns)の先駆けとなった。

プロセス間通信(IPC)の理論

プロセス間通信(Inter-Process Communication, IPC)は、独立したプロセス間でデータを交換するメカニズムの総称である。

IPCの分類(Tanenbaum, "Modern Operating Systems"):

IPC機構の分類:

1. 共有メモリ(Shared Memory)
   - 最も高速
   - 同期が必要
   - 例: POSIX shared memory, mmap

2. メッセージパッシング(Message Passing)
   - プロセス分離を維持
   - カーネルを介する
   - 例: パイプ、メッセージキュー、ソケット

3. シグナル(Signals)
   - 非同期通知
   - 限定的な情報量
   - 例: SIGINT, SIGTERM

パイプはメッセージパッシングの一形態であり、単方向のバイトストリームを提供する。

生産者-消費者問題

パイプは、Dijkstraが1965年に定式化した生産者-消費者問題(Producer-Consumer Problem)の解決策の一つである。

問題の定義:

  • 生産者(Producer): データを生成するプロセス
  • 消費者(Consumer): データを消費するプロセス
  • バッファ: 両者間の有限容量の記憶領域

生産者-消費者モデル:

[Producer] → [Buffer] → [Consumer]
     ↓          ↓           ↓
   write()   カーネル     read()
             バッファ

制約:
1. バッファが満杯なら、生産者は待機
2. バッファが空なら、消費者は待機
3. バッファへのアクセスは相互排他的

パイプのカーネル実装は、これらの制約を自動的に処理する。

5.2 パイプのシステムコール

pipe()システムコール

#include <unistd.h>

int pipe(int pipefd[2]);

pipe()は、単方向のデータチャネルを作成する。

引数:

  • pipefd[0]: 読み取り端(read end)のファイルディスクリプタ
  • pipefd[1]: 書き込み端(write end)のファイルディスクリプタ

戻り値:

  • 成功: 0
  • 失敗: -1(errnoにエラーコード)

パイプの内部構造

カーネル内では、パイプは循環バッファ(Circular Buffer)として実装されている:

パイプバッファの構造(Linux):

struct pipe_inode_info {
    wait_queue_head_t wait;     /* 待機キュー */
    unsigned int nrbufs;         /* 使用中のバッファ数 */
    unsigned int curbuf;         /* 現在のバッファインデックス */
    struct pipe_buffer *bufs;    /* バッファ配列 */
    unsigned int readers;        /* 読み取りプロセス数 */
    unsigned int writers;        /* 書き込みプロセス数 */
    unsigned int files;          /* 参照カウント */
    unsigned int waiting_writers;/* 待機中の書き込み数 */
    ...
};

バッファサイズ:

  • Linux: 65536バイト(16ページ、カーネル2.6.11以降)
  • macOS: 16384バイト
  • POSIX最小値: 512バイト(PIPE_BUF)
  • パイプの基本動作

    #include <unistd.h>
    #include <stdio.h>
    #include <string.h>
    
    void demonstrate_pipe(void)
    {
        int     pipefd[2];
        char    buffer[128];
        ssize_t bytes_read;
    
        /* パイプを作成 */
        if (pipe(pipefd) == -1)
        {
            perror("pipe");
            return;
        }
    
        /* 書き込み端にデータを書く */
        const char *message = "Hello through pipe";
        write(pipefd[1], message, strlen(message));
    
        /* 書き込み端を閉じる(重要!) */
        close(pipefd[1]);
    
        /* 読み取り端からデータを読む */
        bytes_read = read(pipefd[0], buffer, sizeof(buffer) - 1);
        buffer[bytes_read] = '\0';
    
        printf("Received: %s\n", buffer);
    
        /* 読み取り端を閉じる */
        close(pipefd[0]);
    }
    

    5.3 パイプとfork()の連携

    親子プロセス間のパイプ

    パイプはfork()と組み合わせることで、親子プロセス間の通信を実現する:

    void parent_child_pipe(void)
    {
        int     pipefd[2];
        pid_t   pid;
        char    buffer[128];
    
        if (pipe(pipefd) == -1)
        {
            perror("pipe");
            return;
        }
    
        pid = fork();
        if (pid == -1)
        {
            perror("fork");
            return;
        }
        else if (pid == 0)
        {
            /* 子プロセス: 書き込み */
            close(pipefd[0]);  /* 読み取り端を閉じる */
    
            const char *msg = "Message from child";
            write(pipefd[1], msg, strlen(msg));
    
            close(pipefd[1]);
            exit(0);
        }
        else
        {
            /* 親プロセス: 読み取り */
            close(pipefd[1]);  /* 書き込み端を閉じる */
    
            ssize_t n = read(pipefd[0], buffer, sizeof(buffer) - 1);
            buffer[n] = '\0';
    
            printf("Parent received: %s\n", buffer);
    
            close(pipefd[0]);
            wait(NULL);
        }
    }
    

    ファイルディスクリプタの継承

    fork()後、子プロセスは親のファイルディスクリプタテーブルのコピーを持つ:

    fork()前:
    親プロセス
    ┌─────────────────────────────────┐
    │ fd 0 → stdin                   │
    │ fd 1 → stdout                  │
    │ fd 2 → stderr                  │
    │ fd 3 → pipefd[0] (読み取り端)   │
    │ fd 4 → pipefd[1] (書き込み端)   │
    └─────────────────────────────────┘
    
    fork()後:
    親プロセス                         子プロセス
    ┌─────────────────────────────────┐   ┌─────────────────────────────────┐
    │ fd 3 → pipefd[0] ─────────────────→│ fd 3 → pipefd[0]               │
    │ fd 4 → pipefd[1] ─────────────────→│ fd 4 → pipefd[1]               │
    └─────────────────────────────────┘   └─────────────────────────────────┘
                        │                                   │
                        └─────────────┬─────────────────────┘
                                      ↓
                             カーネルパイプバッファ
    

    不要なファイルディスクリプタの閉鎖

    パイプの正常な動作のために、不要なファイルディスクリプタを閉じることが重要:

    /*
     * なぜ閉じる必要があるのか:
     *
     * 1. read()がEOFを検出するため
     *    - 全ての書き込み端が閉じられるとread()は0を返す
     *    - 書き込み端が開いたままだとread()はブロックし続ける
     *
     * 2. write()がSIGPIPEを受け取るため
     *    - 全ての読み取り端が閉じられるとSIGPIPEが送られる
     *    - これにより「壊れたパイプ」エラーを検出できる
     *
     * 3. リソースの解放
     *    - ファイルディスクリプタは有限リソース
     *    - 閉じないとリークする
     */
    

    5.4 標準入出力のリダイレクション

    dup2()によるリダイレクション

    dup2()は、ファイルディスクリプタを複製し、特定の番号に割り当てる:

    #include <unistd.h>
    
    int dup2(int oldfd, int newfd);
    

    パイプと標準入出力を接続:

    void redirect_stdout_to_pipe(int pipefd[2])
    {
        /* 標準出力をパイプの書き込み端に接続 */
        close(pipefd[0]);               /* 読み取り端は不要 */
        dup2(pipefd[1], STDOUT_FILENO); /* stdoutをパイプに */
        close(pipefd[1]);               /* 元のfdは不要 */
    }
    
    void redirect_stdin_from_pipe(int pipefd[2])
    {
        /* 標準入力をパイプの読み取り端に接続 */
        close(pipefd[1]);              /* 書き込み端は不要 */
        dup2(pipefd[0], STDIN_FILENO); /* stdinをパイプに */
        close(pipefd[0]);              /* 元のfdは不要 */
    }
    

    パイプラインコマンドの実装

    ls | grep test の実装例:

    int execute_simple_pipeline(char **cmd1, char **cmd2, t_shell *shell)
    {
        int     pipefd[2];
        pid_t   pid1;
        pid_t   pid2;
        int     status;
    
        if (pipe(pipefd) == -1)
            return (1);
    
        /* 最初のコマンド(ls) */
        pid1 = fork();
        if (pid1 == 0)
        {
            close(pipefd[0]);
            dup2(pipefd[1], STDOUT_FILENO);
            close(pipefd[1]);
    
            execve_command(cmd1, shell);
            exit(127);
        }
    
        /* 2番目のコマンド(grep) */
        pid2 = fork();
        if (pid2 == 0)
        {
            close(pipefd[1]);
            dup2(pipefd[0], STDIN_FILENO);
            close(pipefd[0]);
    
            execve_command(cmd2, shell);
            exit(127);
        }
    
        /* 親プロセス: 両端を閉じて待機 */
        close(pipefd[0]);
        close(pipefd[1]);
    
        waitpid(pid1, NULL, 0);
        waitpid(pid2, &status, 0);
    
        return (WEXITSTATUS(status));
    }
    

    5.5 複数パイプの実装

    パイプライン理論

    n個のコマンドからなるパイプラインは、n-1個のパイプを必要とする:

    cmd1 | cmd2 | cmd3 | cmd4
    
        pipe[0]    pipe[1]    pipe[2]
    [cmd1] → [cmd2] → [cmd3] → [cmd4]
       ↓        ↓        ↓        ↓
     fork     fork     fork     fork
    

    データ構造

    typedef struct s_pipeline
    {
        int     cmd_count;      /* コマンド数 */
        int     **pipes;        /* パイプ配列 [cmd_count-1][2] */
        pid_t   *pids;          /* プロセスID配列 [cmd_count] */
        t_cmd   **commands;     /* コマンド配列 */
    }   t_pipeline;
    

    パイプの作成

    int **create_pipes(int count)
    {
        int **pipes;
        int i;
    
        pipes = malloc(sizeof(int *) * count);
        if (!pipes)
            return (NULL);
    
        i = 0;
        while (i < count)
        {
            pipes[i] = malloc(sizeof(int) * 2);
            if (!pipes[i] || pipe(pipes[i]) == -1)
            {
                /* エラー処理: 作成済みパイプを閉じる */
                while (--i >= 0)
                {
                    close(pipes[i][0]);
                    close(pipes[i][1]);
                    free(pipes[i]);
                }
                free(pipes);
                return (NULL);
            }
            i++;
        }
    
        return (pipes);
    }
    

    子プロセスでのパイプ設定

    void setup_child_pipes(int **pipes, int pipe_count, int index)
    {
        int i;
    
        /* 前のコマンドからの入力 */
        if (index > 0)
            dup2(pipes[index - 1][0], STDIN_FILENO);
    
        /* 次のコマンドへの出力 */
        if (index < pipe_count)
            dup2(pipes[index][1], STDOUT_FILENO);
    
        /* 全てのパイプを閉じる */
        i = 0;
        while (i < pipe_count)
        {
            close(pipes[i][0]);
            close(pipes[i][1]);
            i++;
        }
    }
    

    パイプラインの実行

    int execute_pipeline(t_cmd *pipeline, t_shell *shell)
    {
        t_list  *cmd_list;
        int     cmd_count;
        int     **pipes;
        pid_t   *pids;
        int     i;
        int     status;
    
        /* ASTをコマンドリストに変換 */
        cmd_list = flatten_pipeline(pipeline);
        cmd_count = ft_lstsize(cmd_list);
    
        /* パイプを作成 */
        pipes = create_pipes(cmd_count - 1);
        if (!pipes)
            return (1);
    
        /* PID配列を確保 */
        pids = malloc(sizeof(pid_t) * cmd_count);
        if (!pids)
        {
            free_pipes(pipes, cmd_count - 1);
            return (1);
        }
    
        /* 各コマンドをfork */
        i = 0;
        while (i < cmd_count)
        {
            pids[i] = fork();
            if (pids[i] == -1)
            {
                cleanup_pipeline(pipes, pids, cmd_count, i);
                return (1);
            }
            else if (pids[i] == 0)
            {
                /* 子プロセス */
                setup_child_pipes(pipes, cmd_count - 1, i);
                execute_child(get_cmd_at(cmd_list, i), shell);
                exit(1);
            }
            i++;
        }
    
        /* 親プロセス: 全パイプを閉じる */
        close_all_pipes(pipes, cmd_count - 1);
    
        /* 全子プロセスを待機 */
        i = 0;
        while (i < cmd_count)
        {
            waitpid(pids[i], &status, 0);
            i++;
        }
    
        /* クリーンアップ */
        free_pipes(pipes, cmd_count - 1);
        free(pids);
        ft_lstclear(&cmd_list, NULL);
    
        /* 最後のコマンドの終了ステータスを返す */
        return (WEXITSTATUS(status));
    }
    

    5.6 パイプとリダイレクションの組み合わせ

    リダイレクションの優先順位

    POSIX規格では、リダイレクションはパイプの後に処理される:

    # この場合:
    cat < input.txt | grep pattern > output.txt
    
    # 処理順序:
    # 1. パイプを設定(catの出力 → grepの入力)
    # 2. catに入力リダイレクション(input.txt)
    # 3. grepに出力リダイレクション(output.txt)
    

    実装

    void execute_child(t_cmd *cmd, t_shell *shell)
    {
        /* シグナルをデフォルトに戻す */
        signal(SIGINT, SIG_DFL);
        signal(SIGQUIT, SIG_DFL);
    
        /* リダイレクションを設定(パイプの後) */
        if (setup_redirections(cmd->redirects) != 0)
            exit(1);
    
        /* ビルトインか外部コマンドを実行 */
        if (is_builtin(cmd->argv[0]))
            exit(execute_builtin(cmd, shell));
    
        /* 外部コマンドを実行 */
        char *path = find_command(cmd->argv[0], shell->env);
        if (!path)
        {
            print_error("command not found", cmd->argv[0]);
            exit(127);
        }
    
        char **envp = env_to_array(shell->env);
        execve(path, cmd->argv, envp);
    
        /* execveが失敗した場合 */
        perror("execve");
        exit(126);
    }
    

    5.7 ヒアドキュメントとパイプ

    ヒアドキュメントの実装

    ヒアドキュメント(<<)は、パイプを使って実装される:

    int setup_heredoc(char *delimiter, int expand, t_shell *shell)
    {
        int     pipefd[2];
        char    *line;
        char    *expanded;
    
        if (pipe(pipefd) == -1)
            return (-1);
    
        while (1)
        {
            line = readline("> ");
    
            /* EOF (Ctrl+D) */
            if (!line)
            {
                print_heredoc_warning(delimiter);
                break;
            }
    
            /* デリミタと一致 */
            if (ft_strcmp(line, delimiter) == 0)
            {
                free(line);
                break;
            }
    
            /* 変数展開(デリミタがクォートされていない場合) */
            if (expand)
                expanded = expand_variables(line, shell);
            else
                expanded = ft_strdup(line);
    
            /* パイプに書き込み */
            write(pipefd[1], expanded, ft_strlen(expanded));
            write(pipefd[1], "\n", 1);
    
            free(line);
            free(expanded);
        }
    
        close(pipefd[1]);  /* 書き込み端を閉じる */
    
        return (pipefd[0]); /* 読み取り端を返す */
    }
    

    ヒアドキュメントの使用

    int apply_heredoc_redirect(t_redirect *redir, t_shell *shell)
    {
        int     heredoc_fd;
        int     expand;
    
        /* デリミタがクォートされているか確認 */
        expand = !is_quoted(redir->delimiter);
    
        /* ヒアドキュメントを作成 */
        heredoc_fd = setup_heredoc(redir->delimiter, expand, shell);
        if (heredoc_fd == -1)
            return (-1);
    
        /* 標準入力にリダイレクト */
        if (dup2(heredoc_fd, STDIN_FILENO) == -1)
        {
            close(heredoc_fd);
            return (-1);
        }
    
        close(heredoc_fd);
        return (0);
    }
    

    5.8 パイプラインの終了ステータス

    POSIX規定

    POSIXでは、パイプラインの終了ステータスは最後のコマンドのものとなる:

    false | true
    echo $?  # 0(trueの終了ステータス)
    
    true | false
    echo $?  # 1(falseの終了ステータス)
    

    pipefailオプション(bash拡張)

    bashにはpipefailオプションがあり、パイプライン内のいずれかのコマンドが失敗した場合に非ゼロを返す:

    set -o pipefail
    false | true
    echo $?  # 1(falseが失敗したため)
    

    minishellではPOSIXに準拠し、最後のコマンドの終了ステータスを返す。

    実装

    int wait_for_pipeline(pid_t *pids, int count)
    {
        int i;
        int status;
        int last_status;
    
        last_status = 0;
    
        i = 0;
        while (i < count)
        {
            if (waitpid(pids[i], &status, 0) == -1)
            {
                perror("waitpid");
            }
    
            /* 最後のコマンドのステータスを保存 */
            if (i == count - 1)
            {
                if (WIFEXITED(status))
                    last_status = WEXITSTATUS(status);
                else if (WIFSIGNALED(status))
                    last_status = 128 + WTERMSIG(status);
            }
    
            i++;
        }
    
        return (last_status);
    }
    

    5.9 パイプのエッジケース

    壊れたパイプ(Broken Pipe)

    読み取り端が閉じられた状態で書き込むと、SIGPIPEシグナルが送られる:

    void handle_sigpipe(int sig)
    {
        (void)sig;
        /* 通常は何もしない、またはログを出力 */
    }
    
    void setup_sigpipe_handler(void)
    {
        struct sigaction sa;
    
        sigemptyset(&sa.sa_mask);
        sa.sa_flags = 0;
        sa.sa_handler = SIG_IGN;  /* または handle_sigpipe */
        sigaction(SIGPIPE, &sa, NULL);
    }
    

    パイプバッファの満杯

    書き込みがバッファサイズを超えると、write()はブロックする:

    /*
     * 解決策:
     * 1. 子プロセスで読み取りを確実に行う
     * 2. 適切なタイミングでパイプを閉じる
     * 3. 非ブロッキングI/Oを使用(複雑)
     */
    

    パイプ内のビルトインコマンド

    ビルトインコマンドがパイプ内にある場合、子プロセスで実行する必要がある:

    int execute_in_pipeline(t_cmd *cmd, t_shell *shell)
    {
        pid_t   pid;
        int     status;
    
        pid = fork();
        if (pid == 0)
        {
            /* 子プロセス */
            if (is_builtin(cmd->argv[0]))
                exit(execute_builtin(cmd, shell));
    
            execute_external(cmd, shell);
            exit(1);
        }
    
        waitpid(pid, &status, 0);
        return (WEXITSTATUS(status));
    }
    

    5.10 デバッグとトレース

    パイプラインのデバッグ

    #ifdef DEBUG
    void trace_pipeline_execution(t_cmd *cmd, int index, int pipe_in, int pipe_out)
    {
        fprintf(stderr, "[DEBUG] Command %d: ", index);
    
        int i = 0;
        while (cmd->argv && cmd->argv[i])
        {
            fprintf(stderr, "%s ", cmd->argv[i]);
            i++;
        }
    
        fprintf(stderr, "\n");
        fprintf(stderr, "  stdin  fd: %d\n", pipe_in);
        fprintf(stderr, "  stdout fd: %d\n", pipe_out);
    }
    
    void trace_fd_table(void)
    {
        int fd;
        struct stat st;
    
        fprintf(stderr, "[DEBUG] File descriptor table:\n");
        for (fd = 0; fd < 20; fd++)
        {
            if (fstat(fd, &st) == 0)
                fprintf(stderr, "  fd %d: open\n", fd);
        }
    }
    #endif
    

    ファイルディスクリプタリークの検出

    void check_fd_leaks(void)
    {
        int fd;
        int count = 0;
    
        /* fd 3以降をチェック(0-2は標準入出力) */
        for (fd = 3; fd < 1024; fd++)
        {
            if (fcntl(fd, F_GETFD) != -1)
                count++;
        }
    
        if (count > 0)
            fprintf(stderr, "Warning: %d file descriptors leaked\n", count);
    }
    

    5.11 まとめ

    本章では、パイプとプロセス間通信について学んだ:

  • 歴史: McIlroyのパイプ概念とUNIX哲学
  • IPC理論: プロセス間通信のメカニズム
  • pipe(): パイプの作成と基本操作
  • dup2(): 標準入出力のリダイレクション
  • 複数パイプ: パイプライン実行の実装
  • ヒアドキュメント: パイプを使った実装
  • 終了ステータス: POSIX規定の動作
  • エッジケース: 壊れたパイプ、バッファ満杯
  • 次章では、ビルトインコマンドとシグナル処理について学ぶ。シェルの内部コマンドの必要性と実装方法を解説する。

    ---

    参考文献

  • McIlroy, M. D. (1964). "Internal Bell Labs memo on pipe concept"
  • Ritchie, D. M., & Thompson, K. (1974). "The UNIX Time-Sharing System", Communications of the ACM, 17(7)
  • Dijkstra, E. W. (1965). "Cooperating Sequential Processes", Technical Report EWD-123
  • IEEE (2017). "POSIX.1-2017: Shell & Utilities", IEEE Std 1003.1-2017
  • Tanenbaum, A. S. (2014). "Modern Operating Systems", 4th Edition, Pearson
  • Stevens, W. R., & Rago, S. A. (2013). "Advanced Programming in the UNIX Environment", 3rd Edition, Addison-Wesley
  • Kernighan, B. W., & Pike, R. (1984). "The UNIX Programming Environment", Prentice Hall