第4章: パイプとプロセス間通信理論 - UNIXパイプラインの計算機科学的基盤
4.1 パイプの歴史的起源
Doug McIlroyとパイプの発明
UNIXパイプの概念は、1964年にDoug McIlroy(Bell Labs)が社内メモで最初に提案しました。しかし、実際の実装は1973年のUNIX Version 3まで待つことになります。
McIlroyの1964年メモより(抜粋):
"We should have some ways of coupling programs like garden
hose -- screw in another segment when it becomes necessary
to massage data in another way."
「プログラムを庭のホースのように連結する方法が必要だ。
データを別の方法で処理する必要が生じたら、
別のセグメントをねじ込めばよい。」
この比喩は非常に重要です:
庭のホース(Garden Hose)の比喩:
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ 蛇口 │───│ ホース1 │───│ スプリンクラー │───│ 植物 │
│ (水源) │ │ (輸送) │ │ (変換) │ │ (出力) │
└─────────┘ └─────────┘ └─────────┘ └─────────┘
↓ ↓ ↓ ↓
ファイル cat grep wc
(入力源) (通過) (フィルタ) (集計)
Ken Thompsonの実装(1973)
Ken Thompsonは、McIlroyのアイデアを一晩で実装したと言われています。Thompson自身は後にこう振り返っています:
「McIlroyがパイプのアイデアを何年も推進していたが、
誰も実装しなかった。ある晩、私はシェルとOSに
パイプを追加した。翌朝にはみんなが使っていた。」
- Ken Thompson, UNIX Oral History
パイプの実装は、UNIXの設計哲学を具現化するものでした:
UNIX哲学の3原則とパイプの関係:
1. 一つのことをうまくやる(Do One Thing Well)
├─ 各コマンドは単一機能に特化
└─ 複雑なタスクはパイプで連結して解決
2. テキストストリームを共通インターフェースに
├─ すべてのプログラムがテキストを入出力
└─ パイプで任意のプログラムを接続可能
3. 小さなプログラムを組み合わせる
├─ 巨大な一体型プログラムを避ける
└─ パイプラインとして構築
4.2 プロセス間通信(IPC)の理論的基盤
IPCの分類体系
オペレーティングシステム理論において、プロセス間通信(Inter-Process Communication, IPC)は以下のように分類されます:
IPC機構の分類(Silberschatz, Galvin & Gagne, 2018):
┌────────────────────────────────────────────────────────────┐
│ IPC(プロセス間通信) │
├──────────────────────┬─────────────────────────────────────┤
│ メッセージパッシング │ 共有メモリ │
│ (Message Passing) │ (Shared Memory) │
├──────────────────────┼─────────────────────────────────────┤
│ ・パイプ(Pipes) │ ・mmap │
│ ・名前付きパイプ(FIFO) │ ・shmget/shmat │
│ ・ソケット │ ・POSIX共有メモリ │
│ ・メッセージキュー │ │
└──────────────────────┴─────────────────────────────────────┘
パイプの位置づけ
パイプは「メッセージパッシング」に属し、以下の特性を持ちます:
パイプの理論的特性:
1. 単方向性(Unidirectional)
├─ データは一方向にのみ流れる
└─ 双方向通信には2本のパイプが必要
2. FIFOセマンティクス(First-In, First-Out)
├─ キュー構造
└─ 書き込まれた順序で読み出される
3. バイトストリーム(Byte Stream)
├─ メッセージ境界なし
└─ 連続したバイト列として扱う
4. 有限バッファ(Bounded Buffer)
├─ カーネル内の固定サイズバッファ
└─ 満杯時は書き込みがブロック
生産者・消費者問題(Producer-Consumer Problem)
パイプは、計算機科学の古典的問題「生産者・消費者問題」を解決するメカニズムです。
Dijkstra(1965)による生産者・消費者問題の定式化:
┌────────────────┐ ┌──────────────┐ ┌────────────────┐
│ Producer │───>│ Buffer │───>│ Consumer │
│ (生産者) │ │ (バッファ) │ │ (消費者) │
└────────────────┘ └──────────────┘ └────────────────┘
│ │ │
↓ ↓ ↓
データ生成 一時保存場所 データ消費
制約条件:
1. バッファが空の時、消費者は待機
2. バッファが満杯の時、生産者は待機
3. 同時アクセスの排他制御
パイプはこれをカーネルレベルで実装します:
/* 生産者・消費者としてのパイプ */
// Producer(生産者プロセス)
void producer(int write_fd)
{
char data[1024];
while (generate_data(data))
{
// バッファが満杯なら自動的にブロック
write(write_fd, data, sizeof(data));
}
close(write_fd);
}
// Consumer(消費者プロセス)
void consumer(int read_fd)
{
char data[1024];
ssize_t bytes;
// バッファが空ならブロック、EOFなら0を返す
while ((bytes = read(read_fd, data, sizeof(data))) > 0)
{
process_data(data, bytes);
}
close(read_fd);
}
4.3 パイプバッファの計算機科学
循環バッファ(Circular Buffer)
パイプの内部実装は、循環バッファ(Ring Buffer)として知られるデータ構造を使用します:
循環バッファの構造(Maurice Bach, 1986):
head(読み込み位置)
↓
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ D │ E │ F │ │ │ │ A │ B │ ← バッファ配列
└───┴───┴───┴───┴───┴───┴───┴───┘
↑ ↑
tail 論理的な開始点
(書き込み位置)
データ配置(論理的な順序):
A → B → ... → D → E → F
物理的にはラップアラウンド:
head = (head + 1) % BUFFER_SIZE
tail = (tail + 1) % BUFFER_SIZE
循環バッファの利点:
1. 固定メモリ使用量
├─ 動的メモリ確保不要
└─ メモリリーク防止
2. O(1)の操作
├─ 読み込み: O(1)
└─ 書き込み: O(1)
3. 連続データの効率的処理
├─ メモリコピー最小化
└─ キャッシュ効率
PIPE_BUFと原子性保証
POSIXは、パイプ書き込みの原子性について重要な保証を定義しています:
POSIX.1-2017 原子性保証:
┌────────────────────────────────────────────────────────────┐
│ PIPE_BUF以下のwrite()は原子的(atomic)に実行される │
│ │
│ 具体的には: │
│ - 複数プロセスが同時に書き込んでも │
│ - PIPE_BUF以下のデータはインターリーブしない │
└────────────────────────────────────────────────────────────┘
システム別PIPE_BUF値:
┌────────────┬────────────────┐
│ システム │ PIPE_BUF │
├────────────┼────────────────┤
│ POSIX最小 │ 512 bytes │
│ Linux │ 4,096 bytes │
│ macOS │ 512 bytes │
│ FreeBSD │ 512 bytes │
└────────────┴────────────────┘
パイプバッファ容量(別物):
┌────────────┬────────────────┐
│ Linux │ 65,536 bytes │
│ macOS │ 16,384 bytes │
└────────────┴────────────────┘
原子性の重要性:
/* PIPE_BUFより小さい書き込みは安全 */
#include <limits.h>
void safe_atomic_write(int fd, const char *msg)
{
size_t len = strlen(msg);
if (len <= PIPE_BUF)
{
// 原子的に書き込まれる
// 他のプロセスの書き込みとインターリーブしない
write(fd, msg, len);
}
else
{
// PIPE_BUFより大きい場合
// 他の書き込みとインターリーブする可能性あり
// アプリケーション側でロックが必要
write(fd, msg, len);
}
}
4.4 pipe()システムコールの詳細
システムコールインターフェース
#include <unistd.h>
int pipe(int pipefd[2]);
パラメータ:
pipefd[2]: 作成されたパイプのファイルディスクリプタを格納する配列
戻り値:
- 成功: 0
- 失敗: -1(errnoにエラーコード)
pipefd配列の意味:
pipefd[0]: 読み込み側(Read End)
pipefd[1]: 書き込み側(Write End)
覚え方:
- 0 = stdin(入力)に対応 → 読み込み
- 1 = stdout(出力)に対応 → 書き込み
カーネル内部での動作
pipe()システムコール実行時のカーネル動作:
ユーザ空間 カーネル空間
│
│ pipe(pipefd)
↓
┌─────────────────────────────────────────────────────────┐
│ 1. パイプiノードを作成 │
│ └─ 特殊なファイルシステム(pipefs)内 │
│ │
│ 2. パイプバッファを割り当て │
│ └─ 通常16ページ(64KB、Linux) │
│ │
│ 3. 2つのファイル構造体を作成 │
│ ├─ 読み込み用: O_RDONLY │
│ └─ 書き込み用: O_WRONLY │
│ │
│ 4. ファイルディスクリプタを割り当て │
│ ├─ 最小の未使用FDをpipefd[0]に │
│ └─ 次の最小FDをpipefd[1]に │
└─────────────────────────────────────────────────────────┘
│
↓
プロセスのFDテーブル:
┌────┬─────────────────────┐
│ FD │ 参照先 │
├────┼─────────────────────┤
│ 0 │ stdin │
│ 1 │ stdout │
│ 2 │ stderr │
│ 3 │ pipe read end ←─── pipefd[0]
│ 4 │ pipe write end ←─── pipefd[1]
└────┴─────────────────────┘
基本的な使用例
#include <unistd.h>
#include <stdio.h>
#include <string.h>
int main(void)
{
int pipefd[2];
char write_msg[] = "Hello through pipe!";
char read_msg[100];
ssize_t bytes;
/* パイプを作成 */
if (pipe(pipefd) == -1)
{
perror("pipe");
return (1);
}
printf("パイプ作成成功:\n");
printf(" 読み込み側 FD: %d\n", pipefd[0]);
printf(" 書き込み側 FD: %d\n", pipefd[1]);
/* パイプに書き込む */
write(pipefd[1], write_msg, strlen(write_msg) + 1);
printf("書き込み完了: \"%s\"\n", write_msg);
/* パイプから読み込む */
bytes = read(pipefd[0], read_msg, sizeof(read_msg));
printf("読み込み完了: \"%s\" (%zd bytes)\n", read_msg, bytes);
/* パイプを閉じる */
close(pipefd[0]);
close(pipefd[1]);
return (0);
}
4.5 fork()とパイプの組み合わせ
パイプ継承の原理
fork()時、子プロセスは親プロセスのファイルディスクリプタテーブルのコピーを継承します:
fork()前後のファイルディスクリプタ状態:
【pipe()実行直後、fork()前】
親プロセス
┌────────────────┐
│ FDテーブル │
│ 3 → pipe[read] │─────┐
│ 4 → pipe[write]│─────┼──→ 同じパイプを参照
└────────────────┘ │
【fork()実行後】
親プロセス 子プロセス
┌────────────────┐ ┌────────────────┐
│ 3 → pipe[read] │──┐ ┌───│ 3 → pipe[read] │
│ 4 → pipe[write]│──┼────┼───│ 4 → pipe[write]│
└────────────────┘ │ │ └────────────────┘
↓ ↓
┌───────────────┐
│ Pipe Buffer │
│ (カーネル) │
└───────────────┘
参照カウント:
- pipe[read]: 2 (親と子)
- pipe[write]: 2 (親と子)
不要なファイルディスクリプタを閉じる理由
パイプを使ったIPC成功の鍵は、不要なFDを閉じることです:
EOFの検出メカニズム:
【問題のあるケース:書き込み側を閉じない】
親プロセス 子プロセス
┌────────────────┐ ┌────────────────┐
│ 4 → pipe[write]│───┐ ┌───│ 4 → pipe[write]│(閉じてない!)
└────────────────┘ │ │ └────────────────┘
↓ ↓
┌───────────────┐
│ Pipe Buffer │
│ (空でも...) │
└───────────────┘
↑
┌────────────────┐
│ 3 → pipe[read] │
└────────────────┘
子プロセス
→ 子プロセスのread()は永遠にブロック!
(まだ書き込み側が開いているので、EOFにならない)
【正しいケース:書き込み側を閉じる】
親プロセス 子プロセス
┌────────────────┐ ┌────────────────┐
│ 4 → pipe[write]│───┐ │ pipe[write]閉じた│
└────────────────┘ ↓ └────────────────┘
┌───────────────┐
│ Pipe Buffer │
│ (データなし) │
└───────────────┘
↑
┌────────────────┐
│ 3 → pipe[read] │
└────────────────┘
子プロセス
→ 親が書き込み終了後close()すると、
子のread()はEOF(0)を返す
正しいパイプ使用パターン
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <sys/wait.h>
#include <stdlib.h>
int main(void)
{
int pipefd[2];
pid_t pid;
char write_msg[] = "Message from parent";
char read_msg[100];
/* 重要: パイプはfork()の前に作成 */
if (pipe(pipefd) == -1)
{
perror("pipe");
return (1);
}
pid = fork();
if (pid == -1)
{
perror("fork");
return (1);
}
if (pid == 0)
{
/* 子プロセス: 読み込み側 */
close(pipefd[1]); /* 書き込み側を閉じる(重要!) */
ssize_t bytes = read(pipefd[0], read_msg, sizeof(read_msg));
printf("子プロセス: 受信 \"%s\" (%zd bytes)\n", read_msg, bytes);
close(pipefd[0]);
exit(0);
}
else
{
/* 親プロセス: 書き込み側 */
close(pipefd[0]); /* 読み込み側を閉じる(重要!) */
write(pipefd[1], write_msg, strlen(write_msg) + 1);
printf("親プロセス: 送信 \"%s\"\n", write_msg);
close(pipefd[1]);
wait(NULL);
}
return (0);
}
4.6 パイプのブロッキング動作
read()のブロッキング条件
read()の動作フローチャート:
read(fd, buf, n)
│
↓
┌───────────────┐
│バッファにデータ│
│ あり? │
└───────┬───────┘
No │ Yes
│ │
┌────────────┤ └──→ データを読み込み
↓ │ bytesを返す
┌───────────────┐│
│書き込み側が ││
│全て閉じた? ││
└───────┬───────┘│
Yes│ No │
│ │
↓ ↓
return 0 プロセスを
(EOF) スリープ
データ到着で
ウェイクアップ
write()のブロッキング条件
write()の動作フローチャート:
write(fd, buf, n)
│
↓
┌───────────────┐
│読み込み側が │
│全て閉じた? │
└───────┬───────┘
Yes│ No
│ │
↓ ↓
SIGPIPE ┌───────────────┐
送信 │バッファに空き │
return -1│ あり? │
errno = └───────┬───────┘
EPIPE Yes│ No
│ │
↓ ↓
データを プロセスを
書き込み スリープ
bytesを 空きができたら
返す ウェイクアップ
ブロッキングの実例
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <sys/wait.h>
#include <stdlib.h>
int main(void)
{
int pipefd[2];
pid_t pid;
char data[4096]; /* 4KB単位 */
memset(data, 'A', sizeof(data));
pipe(pipefd);
pid = fork();
if (pid == 0)
{
/* 子プロセス: 大量書き込み */
close(pipefd[0]);
for (int i = 0; i < 100; i++)
{
printf("子: 書き込み試行 %d\n", i);
write(pipefd[1], data, sizeof(data));
printf("子: 書き込み完了 %d\n", i);
/* バッファが満杯になると、ここでブロック */
}
close(pipefd[1]);
exit(0);
}
else
{
/* 親プロセス: ゆっくり読み込み */
close(pipefd[1]);
sleep(3); /* 3秒待機してからゆっくり読む */
for (int i = 0; i < 100; i++)
{
printf("親: 読み込み %d\n", i);
read(pipefd[0], data, sizeof(data));
usleep(100000); /* 0.1秒待機 */
}
close(pipefd[0]);
wait(NULL);
}
return (0);
}
4.7 SIGPIPE(Broken Pipe)シグナル
SIGPIPEの発生条件
SIGPIPEの発生メカニズム:
時刻 T0: 正常な状態
┌─────────┐ ┌─────────┐
│ Writer │───────→│ Reader │
│ (親) │ PIPE │ (子) │
│ fd[1]開 │ │ fd[0]開 │
└─────────┘ └─────────┘
時刻 T1: 読み込み側がclose()
┌─────────┐ ┌─────────┐
│ Writer │───────→│ Reader │
│ (親) │ PIPE │ (子) │
│ fd[1]開 │ │ fd[0]閉 │ ←
└─────────┘ └─────────┘
時刻 T2: 書き込みを試みる
┌─────────┐
│ Writer │
│ write() │───────→ ✕ 誰も読む人がいない
└─────────┘
│
↓
SIGPIPE送信
(デフォルト動作: プロセス終了)
SIGPIPEの処理方法
#include <unistd.h>
#include <stdio.h>
#include <signal.h>
#include <errno.h>
#include <sys/wait.h>
#include <stdlib.h>
/* 方法1: シグナルハンドラ */
void sigpipe_handler(int sig)
{
(void)sig;
/* ログ出力など */
fprintf(stderr, "SIGPIPE caught: broken pipe\n");
}
/* 方法2: シグナルを無視 */
void ignore_sigpipe(void)
{
signal(SIGPIPE, SIG_IGN);
/* 以後、write()はエラーを返すがプロセスは終了しない */
}
int main(void)
{
int pipefd[2];
pid_t pid;
char data[] = "test";
/* SIGPIPEを無視する設定 */
ignore_sigpipe();
pipe(pipefd);
pid = fork();
if (pid == 0)
{
/* 子: 読み込み側を即座に閉じて終了 */
close(pipefd[0]);
close(pipefd[1]);
exit(0);
}
else
{
close(pipefd[0]);
wait(NULL); /* 子の終了を待つ */
/* この時点で読み込み側は全て閉じている */
ssize_t result = write(pipefd[1], data, sizeof(data));
if (result == -1)
{
if (errno == EPIPE)
fprintf(stderr, "Error: Broken pipe (EPIPE)\n");
else
perror("write");
}
close(pipefd[1]);
}
return (0);
}
4.8 Pipexにおけるパイプ実装
基本アーキテクチャ
Pipexは、シェルの< infile cmd1 | cmd2 > outfileと等価な動作を実現します:
./pipex infile "cmd1" "cmd2" outfile
データフロー図:
┌──────────────┐ ┌──────────────┐
│ infile │ │ outfile │
│ (入力) │ │ (出力) │
└──────┬───────┘ └──────▲───────┘
│ │
│ open(O_RDONLY) open(O_WRONLY|O_CREAT|O_TRUNC)
↓ │
┌──────────────┐ pipe() ┌──────────────┐ │
│ 子プロセス1 │───────────→│ 子プロセス2 │────────┘
│ (cmd1実行) │ │ (cmd2実行) │
│ │ │ │
│ stdin=infile │ │ stdin=pipe[0]│
│ stdout=pipe[1] │ stdout=outfile
└──────────────┘ └──────────────┘
パイプ実装のコード
#include "pipex.h"
/* pipex.h */
typedef struct s_pipex
{
int infile;
int outfile;
int pipe_fd[2];
char **envp;
} t_pipex;
/* PATH環境変数からコマンドを検索 */
char *find_command(char *cmd, char **envp)
{
char **paths;
char *path_env;
char *cmd_path;
char *temp;
int i;
/* PATH環境変数を取得 */
path_env = NULL;
i = 0;
while (envp[i])
{
if (ft_strncmp(envp[i], "PATH=", 5) == 0)
{
path_env = envp[i] + 5;
break;
}
i++;
}
if (!path_env)
return (NULL);
/* パスを分割して各ディレクトリを検索 */
paths = ft_split(path_env, ':');
i = 0;
while (paths[i])
{
temp = ft_strjoin(paths[i], "/");
cmd_path = ft_strjoin(temp, cmd);
free(temp);
if (access(cmd_path, X_OK) == 0)
{
free_array(paths);
return (cmd_path);
}
free(cmd_path);
i++;
}
free_array(paths);
return (NULL);
}
/* コマンドを実行 */
void execute_cmd(char *cmd, char **envp)
{
char **cmd_args;
char *cmd_path;
cmd_args = ft_split(cmd, ' ');
if (!cmd_args || !cmd_args[0])
{
ft_putstr_fd("Error: empty command\n", 2);
exit(127);
}
/* 絶対パスまたは相対パスの場合 */
if (ft_strchr(cmd_args[0], '/'))
cmd_path = ft_strdup(cmd_args[0]);
else
cmd_path = find_command(cmd_args[0], envp);
if (!cmd_path)
{
ft_putstr_fd("Command not found: ", 2);
ft_putendl_fd(cmd_args[0], 2);
free_array(cmd_args);
exit(127);
}
if (execve(cmd_path, cmd_args, envp) == -1)
{
perror("execve");
free(cmd_path);
free_array(cmd_args);
exit(126);
}
}
/* 第1子プロセス: infile → pipe */
void first_child(t_pipex *data, char *cmd)
{
/* 入力ファイル → stdin */
if (dup2(data->infile, STDIN_FILENO) == -1)
{
perror("dup2");
exit(1);
}
/* stdout → パイプ書き込み端 */
if (dup2(data->pipe_fd[1], STDOUT_FILENO) == -1)
{
perror("dup2");
exit(1);
}
/* 不要なFDを全て閉じる */
close(data->pipe_fd[0]);
close(data->pipe_fd[1]);
close(data->infile);
close(data->outfile);
/* コマンド実行 */
execute_cmd(cmd, data->envp);
}
/* 第2子プロセス: pipe → outfile */
void second_child(t_pipex *data, char *cmd)
{
/* パイプ読み込み端 → stdin */
if (dup2(data->pipe_fd[0], STDIN_FILENO) == -1)
{
perror("dup2");
exit(1);
}
/* stdout → 出力ファイル */
if (dup2(data->outfile, STDOUT_FILENO) == -1)
{
perror("dup2");
exit(1);
}
/* 不要なFDを全て閉じる */
close(data->pipe_fd[0]);
close(data->pipe_fd[1]);
close(data->infile);
close(data->outfile);
/* コマンド実行 */
execute_cmd(cmd, data->envp);
}
int main(int argc, char **argv, char **envp)
{
t_pipex data;
pid_t pid1;
pid_t pid2;
int status;
if (argc != 5)
{
ft_putstr_fd("Usage: ./pipex infile cmd1 cmd2 outfile\n", 2);
return (1);
}
data.envp = envp;
/* ファイルを開く */
data.infile = open(argv[1], O_RDONLY);
if (data.infile == -1)
{
perror(argv[1]);
return (1);
}
data.outfile = open(argv[4], O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (data.outfile == -1)
{
perror(argv[4]);
close(data.infile);
return (1);
}
/* パイプを作成(fork前に!) */
if (pipe(data.pipe_fd) == -1)
{
perror("pipe");
close(data.infile);
close(data.outfile);
return (1);
}
/* 第1子プロセスを作成 */
pid1 = fork();
if (pid1 == -1)
{
perror("fork");
return (1);
}
if (pid1 == 0)
first_child(&data, argv[2]);
/* 第2子プロセスを作成 */
pid2 = fork();
if (pid2 == -1)
{
perror("fork");
return (1);
}
if (pid2 == 0)
second_child(&data, argv[3]);
/* 親プロセス: 全FDを閉じる */
close(data.pipe_fd[0]);
close(data.pipe_fd[1]);
close(data.infile);
close(data.outfile);
/* 両方の子プロセスを待つ */
waitpid(pid1, &status, 0);
waitpid(pid2, &status, 0);
return (WEXITSTATUS(status));
}
FD状態の推移
【1. 初期状態:ファイルとパイプを開く】
親プロセス
┌────────────────────────────┐
│ FDテーブル │
│ 0 → stdin │
│ 1 → stdout │
│ 2 → stderr │
│ 3 → infile (O_RDONLY) │
│ 4 → outfile (O_WRONLY) │
│ 5 → pipe[0] (read) │
│ 6 → pipe[1] (write) │
└────────────────────────────┘
【2. fork()後:pid1(第1子)を作成】
親プロセス 子プロセス1
┌────────────────────┐ ┌────────────────────┐
│ 0 → stdin │ │ 0 → stdin │
│ 1 → stdout │ │ 1 → stdout │
│ 2 → stderr │ │ 2 → stderr │
│ 3 → infile │ │ 3 → infile │
│ 4 → outfile │ │ 4 → outfile │
│ 5 → pipe[0] │ │ 5 → pipe[0] │
│ 6 → pipe[1] │ │ 6 → pipe[1] │
└────────────────────┘ └────────────────────┘
【3. 子プロセス1のリダイレクト後】
子プロセス1(first_child()実行後)
┌────────────────────┐
│ 0 → infile │ ← dup2(infile, STDIN)
│ 1 → pipe[1] │ ← dup2(pipe[1], STDOUT)
│ 2 → stderr │
│ 3 → (閉じた) │
│ 4 → (閉じた) │
│ 5 → (閉じた) │
│ 6 → (閉じた) │
└────────────────────┘
→ execve()で cmd1 を実行
cmd1の stdin = infile
cmd1の stdout = パイプ
4.9 複数パイプの実装(ボーナス)
N個のコマンドをパイプ接続
3コマンド以上のパイプライン:
./pipex infile "cmd1" "cmd2" "cmd3" outfile
┌────────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌─────────┐
│ infile │──→│ cmd1 │──→│ cmd2 │──→│ cmd3 │──→│ outfile │
└────────┘ └──────┘ └──────┘ └──────┘ └─────────┘
│ │ │
pipe[0] pipe[1] (outfile)
必要なパイプ数: コマンド数 - 1
マルチパイプ実装
typedef struct s_multi_pipe
{
int **pipes; /* パイプの配列 */
int cmd_count; /* コマンド数 */
char **cmds; /* コマンド配列 */
char **envp;
int infile;
int outfile;
} t_multi_pipe;
/* パイプを必要な数だけ作成 */
int create_pipes(t_multi_pipe *data)
{
int i;
/* (コマンド数 - 1)個のパイプが必要 */
data->pipes = malloc(sizeof(int *) * (data->cmd_count - 1));
if (!data->pipes)
return (-1);
for (i = 0; i < data->cmd_count - 1; i++)
{
data->pipes[i] = malloc(sizeof(int) * 2);
if (!data->pipes[i])
return (-1);
if (pipe(data->pipes[i]) == -1)
{
perror("pipe");
return (-1);
}
}
return (0);
}
/* 子プロセスのリダイレクト設定 */
void setup_child_redirection(t_multi_pipe *data, int index)
{
/* 最初のコマンド: infile → cmd → pipe[0] */
if (index == 0)
{
dup2(data->infile, STDIN_FILENO);
dup2(data->pipes[0][1], STDOUT_FILENO);
}
/* 最後のコマンド: pipe[n-2] → cmd → outfile */
else if (index == data->cmd_count - 1)
{
dup2(data->pipes[index - 1][0], STDIN_FILENO);
dup2(data->outfile, STDOUT_FILENO);
}
/* 中間のコマンド: pipe[i-1] → cmd → pipe[i] */
else
{
dup2(data->pipes[index - 1][0], STDIN_FILENO);
dup2(data->pipes[index][1], STDOUT_FILENO);
}
}
/* 全てのパイプを閉じる */
void close_all_pipes(t_multi_pipe *data)
{
int i;
for (i = 0; i < data->cmd_count - 1; i++)
{
close(data->pipes[i][0]);
close(data->pipes[i][1]);
}
}
/* パイプラインを実行 */
void execute_pipeline(t_multi_pipe *data)
{
pid_t *pids;
int i;
int status;
pids = malloc(sizeof(pid_t) * data->cmd_count);
/* 各コマンドに対してプロセスを作成 */
for (i = 0; i < data->cmd_count; i++)
{
pids[i] = fork();
if (pids[i] == 0)
{
/* 子プロセス */
setup_child_redirection(data, i);
close_all_pipes(data);
close(data->infile);
close(data->outfile);
execute_cmd(data->cmds[i], data->envp);
}
}
/* 親プロセス: 全てのパイプとファイルを閉じる */
close_all_pipes(data);
close(data->infile);
close(data->outfile);
/* 全ての子プロセスを待つ */
for (i = 0; i < data->cmd_count; i++)
waitpid(pids[i], &status, 0);
free(pids);
}
マルチパイプのデータフロー
5コマンドの例:
./pipex in "cat" "grep a" "grep b" "sort" "uniq" out
pipe[0] pipe[1] pipe[2] pipe[3]
│ │ │ │
┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌─────┐
│ in │──→│cat │──→│grep│──→│grep│──→│sort│──→│uniq │──→ out
└────┘ └────┘ └────┘ └────┘ └────┘ └─────┘
↑ ↑ ↑ ↑ ↑
cmd[0] cmd[1] cmd[2] cmd[3] cmd[4]
リダイレクション:
cmd[0]: stdin=in, stdout=pipe[0][1]
cmd[1]: stdin=pipe[0][0], stdout=pipe[1][1]
cmd[2]: stdin=pipe[1][0], stdout=pipe[2][1]
cmd[3]: stdin=pipe[2][0], stdout=pipe[3][1]
cmd[4]: stdin=pipe[3][0], stdout=out
4.10 まとめ
学習した理論
- IPC理論
- バッファ理論
実装のポイント
- pipe()はfork()の前に
- 不要なFDは必ず閉じる
- EOFは全書き込み側close()で発生
- SIGPIPEの適切な処理
次章では、here-docとボーナス機能について学びます。