183. 管道与多线程

管道

使用方法

#include <unistd.h>

int pipe(int fildes[2]);

The pipe function creates a communication buffer that the caller can access through the file descriptors fildes[0] and fildes[1]. The data written to fildes[1] can be read from fildes[0] on a first-in-first-out basis.

  • 最简单的UNIX进程间通信机制

  • 管道是一种特殊的文件

  • pipe函数创建一个通信缓冲区,程序可以通过文件描述符 fildes[0](读端)和 fildes[1](写端)来访问这个缓冲区

  • 写入fildes[1]的数据可以按照先进先出的顺序从fildes[0]中读出

  • 如果成功则返回0。否则返回-1,并设置errno

  • 使用 close() 关闭管道

    // Close the read end of the pipe
    close(pfd[0]);
    // Close the write end of the pipe
    close(pfd[1]);

PIPE初始化

  • PIPE 设备文件系统分配 inode

  • 分配读写两个文件表项,分配两个文件描述符,设置相关引用计数

  • 使用直接块作为 BUFFER,缓存写入管道的数据

  • 以循环数组(环形缓冲区)的方式使用 BUFFER

使用

使用 read() 和 write() 对管道进行读写。

读管道

  • 示例

    int pfd[2];
    if (pipe(pfd) < 0) {
        cout << "ERROR: failed to create the pipe." << endl;
        return -1;  // no usable pipe: do not go on to read from an invalid descriptor
    }

    // Read from the read end pfd[0] (never from the write end pfd[1]).
    // n > 0: number of bytes read; n == 0: every write end has been closed and
    // the pipe is drained (EOF); n < 0: error, see errno.
    ssize_t n = read(pfd[0], buffer, bufferSize);
  • 注意

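    | 条件 | 状态 |
    | --- | --- |
    | 管道读写端都未关闭(未调用 close()) 且管道中有数据 | 成功读出数据, 返回值为读出的字节数 |
    | 管道读写端都未关闭(未调用 close()) 且管道中无数据 | 阻塞, 直到有数据写入或所有写端关闭 (若设置了 O_NONBLOCK, 则立即返回 -1, errno 为 EAGAIN) |
    | 管道读端已关闭(close(pfd[0])) 且管道中有数据 | 返回 -1 (errno 为 EBADF: pfd[0] 已是无效的文件描述符) |
    | 管道读端已关闭(close(pfd[0])) 且管道中无数据 | 返回 -1 (errno 为 EBADF) |
    | 管道写端已关闭(close(pfd[1])) 且管道中有数据 | 成功读出数据, 返回值为读出的字节数 |
    | 管道写端已关闭(close(pfd[1])) 且管道中无数据 | 返回 0 (表示 EOF) |

    注: 这里的"写端已关闭"指所有引用该管道写端的文件描述符(包括 fork 出的子进程中的副本)都已关闭; 只要还有一个写端描述符处于打开状态, 读空管道时 read() 就会阻塞, 而不是返回 0。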

写管道

int pfd[2];
if (pipe(pfd) < 0) {
cout << "ERROR: failed to create the pipe." << endl;
}

write(pfd[1], buffer, bufferSize);

管道与多线程的关系

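在Linux和类Unix系统中,通过管道在线程之间传递小块数据时通常不需要在用户态额外加锁。原因并不是"管道只有一个写入端和一个读取端"——同一个管道的读端和写端都可以被多个线程或进程同时持有并操作——而是内核在实现管道时已经对其内部缓冲区做了同步,并且 POSIX 保证:单次写入不超过 PIPE_BUF 字节(POSIX 要求至少 512 字节,Linux 上为 4096 字节)的 write() 是原子的,其数据不会与其他写入者的数据交错。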

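在多进程或多线程环境中,多个进程或线程可以同时读取或写入同一个管道。只要每次 write() 的数据量不超过 PIPE_BUF,每条消息都会被完整、连续地写入管道;超过 PIPE_BUF 的写入则可能被拆分,并与其他写入者的数据交错,这时仍需要应用层自己做同步。对于读取,POSIX 没有给出与写入同等的原子性保证;但在实践中,如果所有写入者每次都写入相同大小(≤ PIPE_BUF)的消息、读取者每次也按这个大小读取,Linux 内核会串行化对同一管道的读写操作,每次 read() 都能拿到一条完整的消息。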

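使用管道在多线程之间传递信息时,由于内核已经对管道做了同步,只要每条消息不超过 PIPE_BUF 字节,就不需要额外加锁。同时,管道为空时 read() 会阻塞、管道写满时 write() 会阻塞,这种阻塞行为天然实现了生产者与消费者之间的同步,因此也可以省去信号量。下面的实例中,每条消息只是一个指针(sizeof(Buffer *) 字节),远小于 PIPE_BUF。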

实例

#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <iostream>

#include <fcntl.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

using namespace std;

// Tuning constants: block size handed through the pipe, and how many threads
// run on each side of it.
enum {
    BLOCK_SIZE = 1024,  // bytes per block read from infile and written to outfile
    READER_COUNT = 20,  // threads that pread() the input file
    WRITER_COUNT = 10   // threads that pwrite() the output file
};

// Pipe shared by every thread: pfd[0] is the read end, pfd[1] the write end.
int pfd[2];

// Descriptors of the input and output files; both are opened in main().
int infileFd, outfileFd;

// One block of the file in flight from a reader thread to a writer thread.
struct Buffer {
    off_t offset;          // byte offset of this block in both infile and outfile
    ssize_t dataLength;    // number of valid bytes in data (the pread() result)
    char data[BLOCK_SIZE]; // the block's bytes
};

// Start argument handed to readerFunc / writerFunc through pthread_create().
struct ThreadArgs {
    uint32_t id;  // index of the thread within its reader or writer group
};

void *readerFunc(void *args) {
if (args == NULL) {
cout << "ERROR: args is NULL" << endl;
return NULL;
}
ThreadArgs *tArgs = (ThreadArgs *)args;

Buffer *buffer = NULL;

int epoch = 0;
while (1) {
buffer = new Buffer;
memset(buffer, 0, sizeof(*buffer));
ssize_t blockSize = sizeof(buffer->data);
buffer->offset = (tArgs->id + epoch * READER_COUNT) * blockSize;
buffer->dataLength = pread(infileFd, buffer->data, blockSize, buffer->offset);
if (buffer->dataLength == 0) {
delete buffer;
printf("INFO: thread %d finish to read the infile\n", tArgs->id);
break;
} else if (buffer->dataLength < 0) {
printf("WARNING: thread %d failed to read infile\n", tArgs->id);
} else {
write(pfd[1], &buffer, sizeof(Buffer *));
++epoch;
}
}
pthread_exit(NULL);
}

/*
 * Writer thread: takes Buffer pointers off the pipe, writes each block to the
 * output file at the same offset it was read from, then deletes the Buffer
 * (ownership was handed over by the reader that sent the pointer).
 *
 * Exits when read() on the pipe returns 0, i.e. once the write end has been
 * closed (main() does this after joining the readers) and the pipe is empty,
 * or after printing a message on an unrecoverable pipe error.
 *
 * args: pointer to this thread's ThreadArgs (must not be NULL).
 */
void *writerFunc(void *args) {
    if (args == NULL) {
        cout << "ERROR: tArgs is NULL" << endl;
        return NULL;
    }
    ThreadArgs *tArgs = (ThreadArgs *)args;

    while (true) {
        Buffer *buffer = NULL;
        // Every message is exactly one pointer, written atomically by a reader.
        ssize_t ret = read(pfd[0], &buffer, sizeof(Buffer *));
        if (ret < 0) {
            if (errno == EINTR) {
                continue;  // interrupted by a signal before any data arrived: retry
            }
            // Any other error (e.g. EBADF) will not go away; retrying would spin forever.
            printf("ERROR: thread %u failed to read pipe error.\n", tArgs->id);
            break;
        }
        if (ret == 0) {
            printf("INFO: thread %u pipe is empty, quiting\n", tArgs->id);
            break;
        }
        if (ret != (ssize_t)sizeof(Buffer *)) {
            // A partial pointer must never be dereferenced.
            printf("ERROR: thread %u read a truncated message from the pipe\n", tArgs->id);
            break;
        }

        // pwrite() may write fewer bytes than requested; keep going until the
        // whole block is written or a real error occurs.
        const char *src = buffer->data;
        ssize_t remaining = buffer->dataLength;
        off_t pos = buffer->offset;
        while (remaining > 0) {
            ssize_t n = pwrite(outfileFd, src, (size_t)remaining, pos);
            if (n < 0 && errno == EINTR) {
                continue;
            }
            if (n <= 0) {
                printf("ERROR: thread %u failed to write outfile\n", tArgs->id);
                break;
            }
            src += n;
            remaining -= n;
            pos += n;
        }
        delete buffer;
    }
    pthread_exit(NULL);
}

int main(int argc, char **args) {

clock_t startTime = clock();
clock_t finishTime;

if (argc != 3) {
cout << "ERROR: please enter the infile and outfile." << endl;
return 0;
}

if (pipe(pfd) < 0) {
cout << "ERROR: failed to create the pipe." << endl;
}

if ((infileFd = open(args[1], O_RDONLY)) < 0) {
cout << "failed to open infile" << endl;
return -1;
}
if ((outfileFd = open(args[2], O_RDWR)) < 0) {
cout << "failed to open outfile" << endl;
return -1;
}

pthread_t readerHandles[READER_COUNT];
ThreadArgs readerArgs[READER_COUNT];
pthread_t writerHandles[WRITER_COUNT];
ThreadArgs writerArgs[WRITER_COUNT];

for (int i = 0; i < READER_COUNT; ++i) {
readerArgs[i].id = i;
if (pthread_create(readerHandles + i, NULL, readerFunc, readerArgs + i) != 0) {
cout << "ERROR: failed to create reader thread " << i << endl;
}
}

for (int i = 0; i < WRITER_COUNT; ++i) {
writerArgs[i].id = i;
if (pthread_create(writerHandles + i, NULL, writerFunc, writerArgs + i) != 0) {
cout << "ERROR: failed to create writer thread " << i << endl;
}
}

for (int i = 0; i < READER_COUNT; ++i) {
if (pthread_join(readerHandles[i], NULL) != 0) {
cout << "ERROR: failed to join reader thread " << i << endl;
}
}

// 读者向管道写入数据
close(pfd[1]);

for (int i = 0; i < WRITER_COUNT; ++i) {
if (pthread_join(writerHandles[i], NULL) != 0) {
cout << "ERROR: failed to join reader thread " << i << endl;
}
}

// 写者从管道读取数据
close(pfd[0]);

// 关闭文件描述符
close(infileFd);
close(outfileFd);

finishTime = clock();
printf("spend time %f seconds\n", (double)(finishTime - startTime) / CLOCKS_PER_SEC);

return 0;
}