第4章: パイプとプロセス間通信理論 - UNIXパイプラインの計算機科学的基盤

4.1 パイプの歴史的起源

Doug McIlroyとパイプの発明

UNIXパイプの概念は、1964年にDoug McIlroy(Bell Labs)が社内メモで最初に提案しました。しかし、実際の実装は1973年のUNIX Version 3まで待つことになります。

McIlroyの1964年メモより(抜粋):

"We should have some ways of coupling programs like garden
hose -- screw in another segment when it becomes necessary
to massage data in another way."

「プログラムを庭のホースのように連結する方法が必要だ。
データを別の方法で処理する必要が生じたら、
別のセグメントをねじ込めばよい。」

この比喩は非常に重要です:

庭のホース(Garden Hose)の比喩:

┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐
│ 蛇口     │───│ ホース1  │───│ スプリンクラー │───│ 植物    │
│ (水源)   │   │ (輸送)   │   │ (変換)   │   │ (出力)  │
└─────────┘   └─────────┘   └─────────┘   └─────────┘
     ↓              ↓              ↓              ↓
 ファイル       cat         grep         wc
 (入力源)      (通過)       (フィルタ)     (集計)

Ken Thompsonの実装(1973)

Ken Thompsonは、McIlroyのアイデアを一晩で実装したと言われています。Thompson自身は後にこう振り返っています:

「McIlroyがパイプのアイデアを何年も推進していたが、
誰も実装しなかった。ある晩、私はシェルとOSに
パイプを追加した。翌朝にはみんなが使っていた。」

- Ken Thompson, UNIX Oral History

パイプの実装は、UNIXの設計哲学を具現化するものでした:

UNIX哲学の3原則とパイプの関係:

1. 一つのことをうまくやる(Do One Thing Well)
   ├─ 各コマンドは単一機能に特化
   └─ 複雑なタスクはパイプで連結して解決

2. テキストストリームを共通インターフェースに
   ├─ すべてのプログラムがテキストを入出力
   └─ パイプで任意のプログラムを接続可能

3. 小さなプログラムを組み合わせる
   ├─ 巨大な一体型プログラムを避ける
   └─ パイプラインとして構築

4.2 プロセス間通信(IPC)の理論的基盤

IPCの分類体系

オペレーティングシステム理論において、プロセス間通信(Inter-Process Communication, IPC)は以下のように分類されます:

IPC機構の分類(Silberschatz, Galvin & Gagne, 2018):

┌────────────────────────────────────────────────────────────┐
│                    IPC(プロセス間通信)                     │
├──────────────────────┬─────────────────────────────────────┤
│   メッセージパッシング │          共有メモリ                  │
│  (Message Passing)   │     (Shared Memory)                 │
├──────────────────────┼─────────────────────────────────────┤
│ ・パイプ(Pipes)     │ ・mmap                              │
│ ・名前付きパイプ(FIFO) │ ・shmget/shmat                      │
│ ・ソケット            │ ・POSIX共有メモリ                    │
│ ・メッセージキュー     │                                    │
└──────────────────────┴─────────────────────────────────────┘

パイプの位置づけ

パイプは「メッセージパッシング」に属し、以下の特性を持ちます:

パイプの理論的特性:

1. 単方向性(Unidirectional)
   ├─ データは一方向にのみ流れる
   └─ 双方向通信には2本のパイプが必要

2. FIFOセマンティクス(First-In, First-Out)
   ├─ キュー構造
   └─ 書き込まれた順序で読み出される

3. バイトストリーム(Byte Stream)
   ├─ メッセージ境界なし
   └─ 連続したバイト列として扱う

4. 有限バッファ(Bounded Buffer)
   ├─ カーネル内の固定サイズバッファ
   └─ 満杯時は書き込みがブロック

生産者・消費者問題(Producer-Consumer Problem)

パイプは、計算機科学の古典的問題「生産者・消費者問題」を解決するメカニズムです。

Dijkstra(1965)による生産者・消費者問題の定式化:

┌────────────────┐    ┌──────────────┐    ┌────────────────┐
│   Producer     │───>│   Buffer     │───>│   Consumer     │
│   (生産者)      │    │ (バッファ)    │    │   (消費者)      │
└────────────────┘    └──────────────┘    └────────────────┘
        │                    │                    │
        ↓                    ↓                    ↓
   データ生成          一時保存場所          データ消費

制約条件:
1. バッファが空の時、消費者は待機
2. バッファが満杯の時、生産者は待機
3. 同時アクセスの排他制御

パイプはこれをカーネルレベルで実装します:

/* 生産者・消費者としてのパイプ */

// Producer(生産者プロセス)
void producer(int write_fd)
{
    char data[1024];

    while (generate_data(data))
    {
        // バッファが満杯なら自動的にブロック
        write(write_fd, data, sizeof(data));
    }
    close(write_fd);
}

// Consumer(消費者プロセス)
void consumer(int read_fd)
{
    char data[1024];
    ssize_t bytes;

    // バッファが空ならブロック、EOFなら0を返す
    while ((bytes = read(read_fd, data, sizeof(data))) > 0)
    {
        process_data(data, bytes);
    }
    close(read_fd);
}

4.3 パイプバッファの計算機科学

循環バッファ(Circular Buffer)

パイプの内部実装は、循環バッファ(Ring Buffer)として知られるデータ構造を使用します:

循環バッファの構造(Maurice Bach, 1986):

         head(読み込み位置)
           ↓
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ D │ E │ F │   │   │   │ A │ B │  ← バッファ配列
└───┴───┴───┴───┴───┴───┴───┴───┘
           ↑                   ↑
         tail              論理的な開始点
     (書き込み位置)

データ配置(論理的な順序):
A → B → ... → D → E → F

物理的にはラップアラウンド:
head = (head + 1) % BUFFER_SIZE
tail = (tail + 1) % BUFFER_SIZE

循環バッファの利点:

1. 固定メモリ使用量
   ├─ 動的メモリ確保不要
   └─ メモリリーク防止

2. O(1)の操作
   ├─ 読み込み: O(1)
   └─ 書き込み: O(1)

3. 連続データの効率的処理
   ├─ メモリコピー最小化
   └─ キャッシュ効率

PIPE_BUFと原子性保証

POSIXは、パイプ書き込みの原子性について重要な保証を定義しています:

POSIX.1-2017 原子性保証:

┌────────────────────────────────────────────────────────────┐
│ PIPE_BUF以下のwrite()は原子的(atomic)に実行される        │
│                                                            │
│ 具体的には:                                               │
│ - 複数プロセスが同時に書き込んでも                         │
│ - PIPE_BUF以下のデータはインターリーブしない               │
└────────────────────────────────────────────────────────────┘

システム別PIPE_BUF値:
┌────────────┬────────────────┐
│ システム    │ PIPE_BUF       │
├────────────┼────────────────┤
│ POSIX最小  │ 512 bytes      │
│ Linux      │ 4,096 bytes    │
│ macOS      │ 512 bytes      │
│ FreeBSD    │ 512 bytes      │
└────────────┴────────────────┘

パイプバッファ容量(別物):
┌────────────┬────────────────┐
│ Linux      │ 65,536 bytes   │
│ macOS      │ 16,384 bytes   │
└────────────┴────────────────┘

原子性の重要性:

/* PIPE_BUFより小さい書き込みは安全 */
#include <limits.h>

void safe_atomic_write(int fd, const char *msg)
{
    size_t len = strlen(msg);

    if (len <= PIPE_BUF)
    {
        // 原子的に書き込まれる
        // 他のプロセスの書き込みとインターリーブしない
        write(fd, msg, len);
    }
    else
    {
        // PIPE_BUFより大きい場合
        // 他の書き込みとインターリーブする可能性あり
        // アプリケーション側でロックが必要
        write(fd, msg, len);
    }
}

4.4 pipe()システムコールの詳細

システムコールインターフェース

#include <unistd.h>

int pipe(int pipefd[2]);

パラメータ

  • pipefd[2]: 作成されたパイプのファイルディスクリプタを格納する配列

戻り値

  • 成功: 0
  • 失敗: -1(errnoにエラーコード)
  • pipefd配列の意味

    pipefd[0]: 読み込み側(Read End)
    pipefd[1]: 書き込み側(Write End)
    
    覚え方:
    - 0 = stdin(入力)に対応 → 読み込み
    - 1 = stdout(出力)に対応 → 書き込み
    

    カーネル内部での動作

    pipe()システムコール実行時のカーネル動作:
    
    ユーザ空間        カーネル空間
        │
        │  pipe(pipefd)
        ↓
    ┌─────────────────────────────────────────────────────────┐
    │ 1. パイプiノードを作成                                   │
    │    └─ 特殊なファイルシステム(pipefs)内                 │
    │                                                         │
    │ 2. パイプバッファを割り当て                              │
    │    └─ 通常16ページ(64KB、Linux)                        │
    │                                                         │
    │ 3. 2つのファイル構造体を作成                             │
    │    ├─ 読み込み用: O_RDONLY                              │
    │    └─ 書き込み用: O_WRONLY                              │
    │                                                         │
    │ 4. ファイルディスクリプタを割り当て                      │
    │    ├─ 最小の未使用FDをpipefd[0]に                       │
    │    └─ 次の最小FDをpipefd[1]に                           │
    └─────────────────────────────────────────────────────────┘
        │
        ↓
    プロセスのFDテーブル:
    ┌────┬─────────────────────┐
    │ FD │ 参照先               │
    ├────┼─────────────────────┤
    │ 0  │ stdin               │
    │ 1  │ stdout              │
    │ 2  │ stderr              │
    │ 3  │ pipe read end  ←─── pipefd[0]
    │ 4  │ pipe write end ←─── pipefd[1]
    └────┴─────────────────────┘
    

    基本的な使用例

    #include <unistd.h>
    #include <stdio.h>
    #include <string.h>
    
    int main(void)
    {
        int     pipefd[2];
        char    write_msg[] = "Hello through pipe!";
        char    read_msg[100];
        ssize_t bytes;
    
        /* パイプを作成 */
        if (pipe(pipefd) == -1)
        {
            perror("pipe");
            return (1);
        }
    
        printf("パイプ作成成功:\n");
        printf("  読み込み側 FD: %d\n", pipefd[0]);
        printf("  書き込み側 FD: %d\n", pipefd[1]);
    
        /* パイプに書き込む */
        write(pipefd[1], write_msg, strlen(write_msg) + 1);
        printf("書き込み完了: \"%s\"\n", write_msg);
    
        /* パイプから読み込む */
        bytes = read(pipefd[0], read_msg, sizeof(read_msg));
        printf("読み込み完了: \"%s\" (%zd bytes)\n", read_msg, bytes);
    
        /* パイプを閉じる */
        close(pipefd[0]);
        close(pipefd[1]);
    
        return (0);
    }
    

    4.5 fork()とパイプの組み合わせ

    パイプ継承の原理

    fork()時、子プロセスは親プロセスのファイルディスクリプタテーブルのコピーを継承します:

    fork()前後のファイルディスクリプタ状態:
    
    【pipe()実行直後、fork()前】
    
    親プロセス
    ┌────────────────┐
    │ FDテーブル      │
    │ 3 → pipe[read] │─────┐
    │ 4 → pipe[write]│─────┼──→ 同じパイプを参照
    └────────────────┘     │
    
    【fork()実行後】
    
    親プロセス                      子プロセス
    ┌────────────────┐            ┌────────────────┐
    │ 3 → pipe[read] │──┐    ┌───│ 3 → pipe[read] │
    │ 4 → pipe[write]│──┼────┼───│ 4 → pipe[write]│
    └────────────────┘  │    │   └────────────────┘
                        ↓    ↓
                  ┌───────────────┐
                  │  Pipe Buffer  │
                  │   (カーネル)   │
                  └───────────────┘
    
    参照カウント:
    - pipe[read]:  2 (親と子)
    - pipe[write]: 2 (親と子)
    

    不要なファイルディスクリプタを閉じる理由

    パイプを使ったIPC成功の鍵は、不要なFDを閉じることです:

    EOFの検出メカニズム:
    
    【問題のあるケース:書き込み側を閉じない】
    
    親プロセス                      子プロセス
    ┌────────────────┐            ┌────────────────┐
    │ 4 → pipe[write]│───┐    ┌───│ 4 → pipe[write]│(閉じてない!)
    └────────────────┘   │    │   └────────────────┘
                         ↓    ↓
                   ┌───────────────┐
                   │  Pipe Buffer  │
                   │  (空でも...)   │
                   └───────────────┘
                         ↑
                   ┌────────────────┐
                   │ 3 → pipe[read] │
                   └────────────────┘
                       子プロセス
    
    → 子プロセスのread()は永遠にブロック!
      (まだ書き込み側が開いているので、EOFにならない)
    
    
    【正しいケース:書き込み側を閉じる】
    
    親プロセス                      子プロセス
    ┌────────────────┐            ┌────────────────┐
    │ 4 → pipe[write]│───┐        │ pipe[write]閉じた│
    └────────────────┘   ↓        └────────────────┘
                   ┌───────────────┐
                   │  Pipe Buffer  │
                   │  (データなし)  │
                   └───────────────┘
                         ↑
                   ┌────────────────┐
                   │ 3 → pipe[read] │
                   └────────────────┘
                       子プロセス
    
    → 親が書き込み終了後close()すると、
      子のread()はEOF(0)を返す
    

    正しいパイプ使用パターン

    #include <unistd.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/wait.h>
    #include <stdlib.h>
    
    int main(void)
    {
        int     pipefd[2];
        pid_t   pid;
        char    write_msg[] = "Message from parent";
        char    read_msg[100];
    
        /* 重要: パイプはfork()の前に作成 */
        if (pipe(pipefd) == -1)
        {
            perror("pipe");
            return (1);
        }
    
        pid = fork();
        if (pid == -1)
        {
            perror("fork");
            return (1);
        }
    
        if (pid == 0)
        {
            /* 子プロセス: 読み込み側 */
            close(pipefd[1]);  /* 書き込み側を閉じる(重要!) */
    
            ssize_t bytes = read(pipefd[0], read_msg, sizeof(read_msg));
            printf("子プロセス: 受信 \"%s\" (%zd bytes)\n", read_msg, bytes);
    
            close(pipefd[0]);
            exit(0);
        }
        else
        {
            /* 親プロセス: 書き込み側 */
            close(pipefd[0]);  /* 読み込み側を閉じる(重要!) */
    
            write(pipefd[1], write_msg, strlen(write_msg) + 1);
            printf("親プロセス: 送信 \"%s\"\n", write_msg);
    
            close(pipefd[1]);
            wait(NULL);
        }
    
        return (0);
    }
    

    4.6 パイプのブロッキング動作

    read()のブロッキング条件

    read()の動作フローチャート:
    
               read(fd, buf, n)
                      │
                      ↓
             ┌───────────────┐
             │バッファにデータ│
             │    あり?      │
             └───────┬───────┘
                  No │ Yes
                     │  │
        ┌────────────┤  └──→ データを読み込み
        ↓            │       bytesを返す
    ┌───────────────┐│
    │書き込み側が   ││
    │全て閉じた?   ││
    └───────┬───────┘│
         Yes│     No │
            │        │
            ↓        ↓
        return 0  プロセスを
        (EOF)     スリープ
                  データ到着で
                  ウェイクアップ
    

    write()のブロッキング条件

    write()の動作フローチャート:
    
               write(fd, buf, n)
                      │
                      ↓
             ┌───────────────┐
             │読み込み側が   │
             │全て閉じた?    │
             └───────┬───────┘
                  Yes│     No
                     │      │
                     ↓      ↓
                SIGPIPE  ┌───────────────┐
                送信     │バッファに空き │
                return -1│   あり?       │
                errno =  └───────┬───────┘
                EPIPE         Yes│     No
                                 │      │
                                 ↓      ↓
                            データを   プロセスを
                            書き込み   スリープ
                            bytesを    空きができたら
                            返す       ウェイクアップ
    

    ブロッキングの実例

    #include <unistd.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/wait.h>
    #include <stdlib.h>
    
    int main(void)
    {
        int     pipefd[2];
        pid_t   pid;
        char    data[4096];  /* 4KB単位 */
    
        memset(data, 'A', sizeof(data));
        pipe(pipefd);
        pid = fork();
    
        if (pid == 0)
        {
            /* 子プロセス: 大量書き込み */
            close(pipefd[0]);
    
            for (int i = 0; i < 100; i++)
            {
                printf("子: 書き込み試行 %d\n", i);
                write(pipefd[1], data, sizeof(data));
                printf("子: 書き込み完了 %d\n", i);
                /* バッファが満杯になると、ここでブロック */
            }
    
            close(pipefd[1]);
            exit(0);
        }
        else
        {
            /* 親プロセス: ゆっくり読み込み */
            close(pipefd[1]);
    
            sleep(3);  /* 3秒待機してからゆっくり読む */
    
            for (int i = 0; i < 100; i++)
            {
                printf("親: 読み込み %d\n", i);
                read(pipefd[0], data, sizeof(data));
                usleep(100000);  /* 0.1秒待機 */
            }
    
            close(pipefd[0]);
            wait(NULL);
        }
    
        return (0);
    }
    

    4.7 SIGPIPE(Broken Pipe)シグナル

    SIGPIPEの発生条件

    SIGPIPEの発生メカニズム:
    
    時刻 T0: 正常な状態
    ┌─────────┐         ┌─────────┐
    │ Writer  │───────→│ Reader  │
    │ (親)    │  PIPE  │ (子)    │
    │ fd[1]開 │         │ fd[0]開 │
    └─────────┘         └─────────┘
    
    時刻 T1: 読み込み側がclose()
    ┌─────────┐         ┌─────────┐
    │ Writer  │───────→│ Reader  │
    │ (親)    │  PIPE  │ (子)    │
    │ fd[1]開 │         │ fd[0]閉 │ ←
    └─────────┘         └─────────┘
    
    時刻 T2: 書き込みを試みる
    ┌─────────┐
    │ Writer  │
    │ write() │───────→ ✕ 誰も読む人がいない
    └─────────┘
         │
         ↓
      SIGPIPE送信
      (デフォルト動作: プロセス終了)
    

    SIGPIPEの処理方法

    #include <unistd.h>
    #include <stdio.h>
    #include <signal.h>
    #include <errno.h>
    #include <sys/wait.h>
    #include <stdlib.h>
    
    /* 方法1: シグナルハンドラ */
    void sigpipe_handler(int sig)
    {
        (void)sig;
        /* ログ出力など */
        fprintf(stderr, "SIGPIPE caught: broken pipe\n");
    }
    
    /* 方法2: シグナルを無視 */
    void ignore_sigpipe(void)
    {
        signal(SIGPIPE, SIG_IGN);
        /* 以後、write()はエラーを返すがプロセスは終了しない */
    }
    
    int main(void)
    {
        int     pipefd[2];
        pid_t   pid;
        char    data[] = "test";
    
        /* SIGPIPEを無視する設定 */
        ignore_sigpipe();
    
        pipe(pipefd);
        pid = fork();
    
        if (pid == 0)
        {
            /* 子: 読み込み側を即座に閉じて終了 */
            close(pipefd[0]);
            close(pipefd[1]);
            exit(0);
        }
        else
        {
            close(pipefd[0]);
            wait(NULL);  /* 子の終了を待つ */
    
            /* この時点で読み込み側は全て閉じている */
            ssize_t result = write(pipefd[1], data, sizeof(data));
    
            if (result == -1)
            {
                if (errno == EPIPE)
                    fprintf(stderr, "Error: Broken pipe (EPIPE)\n");
                else
                    perror("write");
            }
    
            close(pipefd[1]);
        }
    
        return (0);
    }
    

    4.8 Pipexにおけるパイプ実装

    基本アーキテクチャ

    Pipexは、シェルの< infile cmd1 | cmd2 > outfileと等価な動作を実現します:

    ./pipex infile "cmd1" "cmd2" outfile
    
    データフロー図:
    
    ┌──────────────┐                              ┌──────────────┐
    │   infile     │                              │   outfile    │
    │  (入力)      │                              │   (出力)     │
    └──────┬───────┘                              └──────▲───────┘
           │                                             │
           │ open(O_RDONLY)                    open(O_WRONLY|O_CREAT|O_TRUNC)
           ↓                                             │
    ┌──────────────┐   pipe()   ┌──────────────┐        │
    │ 子プロセス1  │───────────→│ 子プロセス2  │────────┘
    │ (cmd1実行)   │            │ (cmd2実行)   │
    │              │            │              │
    │ stdin=infile │            │ stdin=pipe[0]│
    │ stdout=pipe[1]            │ stdout=outfile
    └──────────────┘            └──────────────┘
    

    パイプ実装のコード

    #include "pipex.h"
    
    /* pipex.h */
    typedef struct s_pipex
    {
        int     infile;
        int     outfile;
        int     pipe_fd[2];
        char    **envp;
    }   t_pipex;
    
    /* PATH環境変数からコマンドを検索 */
    char    *find_command(char *cmd, char **envp)
    {
        char    **paths;
        char    *path_env;
        char    *cmd_path;
        char    *temp;
        int     i;
    
        /* PATH環境変数を取得 */
        path_env = NULL;
        i = 0;
        while (envp[i])
        {
            if (ft_strncmp(envp[i], "PATH=", 5) == 0)
            {
                path_env = envp[i] + 5;
                break;
            }
            i++;
        }
    
        if (!path_env)
            return (NULL);
    
        /* パスを分割して各ディレクトリを検索 */
        paths = ft_split(path_env, ':');
        i = 0;
        while (paths[i])
        {
            temp = ft_strjoin(paths[i], "/");
            cmd_path = ft_strjoin(temp, cmd);
            free(temp);
    
            if (access(cmd_path, X_OK) == 0)
            {
                free_array(paths);
                return (cmd_path);
            }
    
            free(cmd_path);
            i++;
        }
    
        free_array(paths);
        return (NULL);
    }
    
    /* コマンドを実行 */
    void    execute_cmd(char *cmd, char **envp)
    {
        char    **cmd_args;
        char    *cmd_path;
    
        cmd_args = ft_split(cmd, ' ');
        if (!cmd_args || !cmd_args[0])
        {
            ft_putstr_fd("Error: empty command\n", 2);
            exit(127);
        }
    
        /* 絶対パスまたは相対パスの場合 */
        if (ft_strchr(cmd_args[0], '/'))
            cmd_path = ft_strdup(cmd_args[0]);
        else
            cmd_path = find_command(cmd_args[0], envp);
    
        if (!cmd_path)
        {
            ft_putstr_fd("Command not found: ", 2);
            ft_putendl_fd(cmd_args[0], 2);
            free_array(cmd_args);
            exit(127);
        }
    
        if (execve(cmd_path, cmd_args, envp) == -1)
        {
            perror("execve");
            free(cmd_path);
            free_array(cmd_args);
            exit(126);
        }
    }
    
    /* 第1子プロセス: infile → pipe */
    void    first_child(t_pipex *data, char *cmd)
    {
        /* 入力ファイル → stdin */
        if (dup2(data->infile, STDIN_FILENO) == -1)
        {
            perror("dup2");
            exit(1);
        }
    
        /* stdout → パイプ書き込み端 */
        if (dup2(data->pipe_fd[1], STDOUT_FILENO) == -1)
        {
            perror("dup2");
            exit(1);
        }
    
        /* 不要なFDを全て閉じる */
        close(data->pipe_fd[0]);
        close(data->pipe_fd[1]);
        close(data->infile);
        close(data->outfile);
    
        /* コマンド実行 */
        execute_cmd(cmd, data->envp);
    }
    
    /* 第2子プロセス: pipe → outfile */
    void    second_child(t_pipex *data, char *cmd)
    {
        /* パイプ読み込み端 → stdin */
        if (dup2(data->pipe_fd[0], STDIN_FILENO) == -1)
        {
            perror("dup2");
            exit(1);
        }
    
        /* stdout → 出力ファイル */
        if (dup2(data->outfile, STDOUT_FILENO) == -1)
        {
            perror("dup2");
            exit(1);
        }
    
        /* 不要なFDを全て閉じる */
        close(data->pipe_fd[0]);
        close(data->pipe_fd[1]);
        close(data->infile);
        close(data->outfile);
    
        /* コマンド実行 */
        execute_cmd(cmd, data->envp);
    }
    
    int main(int argc, char **argv, char **envp)
    {
        t_pipex data;
        pid_t   pid1;
        pid_t   pid2;
        int     status;
    
        if (argc != 5)
        {
            ft_putstr_fd("Usage: ./pipex infile cmd1 cmd2 outfile\n", 2);
            return (1);
        }
    
        data.envp = envp;
    
        /* ファイルを開く */
        data.infile = open(argv[1], O_RDONLY);
        if (data.infile == -1)
        {
            perror(argv[1]);
            return (1);
        }
    
        data.outfile = open(argv[4], O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (data.outfile == -1)
        {
            perror(argv[4]);
            close(data.infile);
            return (1);
        }
    
        /* パイプを作成(fork前に!) */
        if (pipe(data.pipe_fd) == -1)
        {
            perror("pipe");
            close(data.infile);
            close(data.outfile);
            return (1);
        }
    
        /* 第1子プロセスを作成 */
        pid1 = fork();
        if (pid1 == -1)
        {
            perror("fork");
            return (1);
        }
        if (pid1 == 0)
            first_child(&data, argv[2]);
    
        /* 第2子プロセスを作成 */
        pid2 = fork();
        if (pid2 == -1)
        {
            perror("fork");
            return (1);
        }
        if (pid2 == 0)
            second_child(&data, argv[3]);
    
        /* 親プロセス: 全FDを閉じる */
        close(data.pipe_fd[0]);
        close(data.pipe_fd[1]);
        close(data.infile);
        close(data.outfile);
    
        /* 両方の子プロセスを待つ */
        waitpid(pid1, &status, 0);
        waitpid(pid2, &status, 0);
    
        return (WEXITSTATUS(status));
    }
    

    FD状態の推移

    【1. 初期状態:ファイルとパイプを開く】
    
    親プロセス
    ┌────────────────────────────┐
    │ FDテーブル                 │
    │ 0 → stdin                 │
    │ 1 → stdout                │
    │ 2 → stderr                │
    │ 3 → infile  (O_RDONLY)    │
    │ 4 → outfile (O_WRONLY)    │
    │ 5 → pipe[0] (read)        │
    │ 6 → pipe[1] (write)       │
    └────────────────────────────┘
    
    【2. fork()後:pid1(第1子)を作成】
    
    親プロセス                     子プロセス1
    ┌────────────────────┐        ┌────────────────────┐
    │ 0 → stdin          │        │ 0 → stdin          │
    │ 1 → stdout         │        │ 1 → stdout         │
    │ 2 → stderr         │        │ 2 → stderr         │
    │ 3 → infile         │        │ 3 → infile         │
    │ 4 → outfile        │        │ 4 → outfile        │
    │ 5 → pipe[0]        │        │ 5 → pipe[0]        │
    │ 6 → pipe[1]        │        │ 6 → pipe[1]        │
    └────────────────────┘        └────────────────────┘
    
    【3. 子プロセス1のリダイレクト後】
    
    子プロセス1(first_child()実行後)
    ┌────────────────────┐
    │ 0 → infile         │  ← dup2(infile, STDIN)
    │ 1 → pipe[1]        │  ← dup2(pipe[1], STDOUT)
    │ 2 → stderr         │
    │ 3 → (閉じた)        │
    │ 4 → (閉じた)        │
    │ 5 → (閉じた)        │
    │ 6 → (閉じた)        │
    └────────────────────┘
    
    → execve()で cmd1 を実行
      cmd1の stdin = infile
      cmd1の stdout = パイプ
    

    4.9 複数パイプの実装(ボーナス)

    N個のコマンドをパイプ接続

    3コマンド以上のパイプライン:
    
    ./pipex infile "cmd1" "cmd2" "cmd3" outfile
    
    ┌────────┐   ┌──────┐   ┌──────┐   ┌──────┐   ┌─────────┐
    │ infile │──→│ cmd1 │──→│ cmd2 │──→│ cmd3 │──→│ outfile │
    └────────┘   └──────┘   └──────┘   └──────┘   └─────────┘
                   │           │           │
               pipe[0]     pipe[1]     (outfile)
    
    必要なパイプ数: コマンド数 - 1
    

    マルチパイプ実装

    typedef struct s_multi_pipe
    {
        int     **pipes;      /* パイプの配列 */
        int     cmd_count;    /* コマンド数 */
        char    **cmds;       /* コマンド配列 */
        char    **envp;
        int     infile;
        int     outfile;
    }   t_multi_pipe;
    
    /* パイプを必要な数だけ作成 */
    int create_pipes(t_multi_pipe *data)
    {
        int i;
    
        /* (コマンド数 - 1)個のパイプが必要 */
        data->pipes = malloc(sizeof(int *) * (data->cmd_count - 1));
        if (!data->pipes)
            return (-1);
    
        for (i = 0; i < data->cmd_count - 1; i++)
        {
            data->pipes[i] = malloc(sizeof(int) * 2);
            if (!data->pipes[i])
                return (-1);
    
            if (pipe(data->pipes[i]) == -1)
            {
                perror("pipe");
                return (-1);
            }
        }
    
        return (0);
    }
    
    /* 子プロセスのリダイレクト設定 */
    void    setup_child_redirection(t_multi_pipe *data, int index)
    {
        /* 最初のコマンド: infile → cmd → pipe[0] */
        if (index == 0)
        {
            dup2(data->infile, STDIN_FILENO);
            dup2(data->pipes[0][1], STDOUT_FILENO);
        }
        /* 最後のコマンド: pipe[n-2] → cmd → outfile */
        else if (index == data->cmd_count - 1)
        {
            dup2(data->pipes[index - 1][0], STDIN_FILENO);
            dup2(data->outfile, STDOUT_FILENO);
        }
        /* 中間のコマンド: pipe[i-1] → cmd → pipe[i] */
        else
        {
            dup2(data->pipes[index - 1][0], STDIN_FILENO);
            dup2(data->pipes[index][1], STDOUT_FILENO);
        }
    }
    
    /* 全てのパイプを閉じる */
    void    close_all_pipes(t_multi_pipe *data)
    {
        int i;
    
        for (i = 0; i < data->cmd_count - 1; i++)
        {
            close(data->pipes[i][0]);
            close(data->pipes[i][1]);
        }
    }
    
    /* パイプラインを実行 */
    void    execute_pipeline(t_multi_pipe *data)
    {
        pid_t   *pids;
        int     i;
        int     status;
    
        pids = malloc(sizeof(pid_t) * data->cmd_count);
    
        /* 各コマンドに対してプロセスを作成 */
        for (i = 0; i < data->cmd_count; i++)
        {
            pids[i] = fork();
    
            if (pids[i] == 0)
            {
                /* 子プロセス */
                setup_child_redirection(data, i);
                close_all_pipes(data);
                close(data->infile);
                close(data->outfile);
    
                execute_cmd(data->cmds[i], data->envp);
            }
        }
    
        /* 親プロセス: 全てのパイプとファイルを閉じる */
        close_all_pipes(data);
        close(data->infile);
        close(data->outfile);
    
        /* 全ての子プロセスを待つ */
        for (i = 0; i < data->cmd_count; i++)
            waitpid(pids[i], &status, 0);
    
        free(pids);
    }
    

    マルチパイプのデータフロー

    5コマンドの例:
    ./pipex in "cat" "grep a" "grep b" "sort" "uniq" out
    
            pipe[0]    pipe[1]    pipe[2]    pipe[3]
               │          │          │          │
    ┌────┐   ┌────┐   ┌────┐   ┌────┐   ┌────┐   ┌─────┐
    │ in │──→│cat │──→│grep│──→│grep│──→│sort│──→│uniq │──→ out
    └────┘   └────┘   └────┘   └────┘   └────┘   └─────┘
               ↑          ↑          ↑          ↑          ↑
             cmd[0]    cmd[1]    cmd[2]    cmd[3]    cmd[4]
    
    リダイレクション:
    cmd[0]: stdin=in,       stdout=pipe[0][1]
    cmd[1]: stdin=pipe[0][0], stdout=pipe[1][1]
    cmd[2]: stdin=pipe[1][0], stdout=pipe[2][1]
    cmd[3]: stdin=pipe[2][0], stdout=pipe[3][1]
    cmd[4]: stdin=pipe[3][0], stdout=out
    

    4.10 まとめ

    学習した理論

  • パイプの歴史
- Doug McIlroyの1964年構想 - Ken Thompsonの1973年実装 - UNIX哲学との関係

  • IPC理論
- プロセス間通信の分類 - 生産者・消費者問題 - パイプの位置づけ

  • バッファ理論
- 循環バッファの仕組み - PIPE_BUFと原子性保証 - ブロッキング動作

実装のポイント

  • pipe()はfork()の前に
  • 不要なFDは必ず閉じる
  • EOFは全書き込み側close()で発生
  • SIGPIPEの適切な処理

次章では、here-docとボーナス機能について学びます。