1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
| #include <cstddef> #include <cstdint> #include <cstring> #include <ctime> #include <iostream> #include <pthread.h> #include <sys/types.h> #include <unistd.h> #include <fcntl.h>
using namespace std;
#define BLOCK_SIZE 1024 #define READER_COUNT 20 #define WRITER_COUNT 10
int pfd[2];
int infileFd; int outfileFd;
class Buffer { public: off_t offset; ssize_t dataLength; char data[BLOCK_SIZE]; };
class ThreadArgs { public: uint32_t id; };
void *readerFunc(void *args) { if (args == NULL) { cout << "ERROR: args is NULL" << endl; return NULL; } ThreadArgs *tArgs = (ThreadArgs *)args;
Buffer *buffer = NULL;
int epoch = 0; while (1) { buffer = new Buffer; memset(buffer, 0, sizeof(*buffer)); ssize_t blockSize = sizeof(buffer->data); buffer->offset = (tArgs->id + epoch * READER_COUNT) * blockSize; buffer->dataLength = pread(infileFd, buffer->data, blockSize, buffer->offset); if (buffer->dataLength == 0) { delete buffer; printf("INFO: thread %d finish to read the infile\n", tArgs->id); break; } else if (buffer->dataLength < 0) { printf("WARNING: thread %d failed to read infile\n", tArgs->id); } else { write(pfd[1], &buffer, sizeof(Buffer *)); ++epoch; } } pthread_exit(NULL); }
void *writerFunc(void *args) { if (args == NULL) { cout << "ERROR: tArgs is NULL" << endl; return NULL; } ThreadArgs *tArgs = (ThreadArgs *)args;
Buffer *buffer = NULL;
while (1) { int ret = read(pfd[0], &buffer, sizeof(Buffer *)); if (ret < 0) { printf("ERROR: thread %d failed to read pipe error.\n", tArgs->id); } else if (ret == 0) { printf("INFO: thread %d pipe is empty, quiting\n", tArgs->id ); break; } else { pwrite(outfileFd, buffer->data, buffer->dataLength, buffer->offset); delete buffer; } } pthread_exit(NULL); }
int main(int argc, char **args) { clock_t startTime = clock(); clock_t finishTime;
if (argc != 3) { cout << "ERROR: please enter the infile and outfile." << endl; return 0; }
if (pipe(pfd) < 0) { cout << "ERROR: failed to create the pipe." << endl; }
if ((infileFd = open(args[1], O_RDONLY)) < 0) { cout << "failed to open infile" << endl; return -1; } if ((outfileFd = open(args[2], O_RDWR)) < 0) { cout << "failed to open outfile" << endl; return -1; } pthread_t readerHandles[READER_COUNT]; ThreadArgs readerArgs[READER_COUNT]; pthread_t writerHandles[WRITER_COUNT]; ThreadArgs writerArgs[WRITER_COUNT];
for (int i = 0; i < READER_COUNT; ++i) { readerArgs[i].id = i; if (pthread_create(readerHandles + i, NULL, readerFunc, readerArgs + i) != 0) { cout << "ERROR: failed to create reader thread " << i << endl; } }
for (int i = 0; i < WRITER_COUNT; ++i) { writerArgs[i].id = i; if (pthread_create(writerHandles + i, NULL, writerFunc, writerArgs + i) != 0) { cout << "ERROR: failed to create writer thread " << i << endl; } }
for (int i = 0; i < READER_COUNT; ++i) { if (pthread_join(readerHandles[i], NULL) != 0) { cout << "ERROR: failed to join reader thread " << i << endl; } } close(pfd[1]);
for (int i = 0; i < WRITER_COUNT; ++i) { if (pthread_join(writerHandles[i], NULL) != 0) { cout << "ERROR: failed to join reader thread " << i << endl; } }
close(pfd[0]);
close(infileFd); close(outfileFd);
finishTime = clock(); printf("spend time %f seconds\n", (double)(finishTime - startTime) / CLOCKS_PER_SEC);
return 0; }
|