I have a generic problem I am looking to solve, in which chunks of binary data are sent from standard input or a regular file stream to an application, which in turn converts that binary data into text. Using threads, I want to process the text before piping it over to the next application, which modifies that text even further, and so on.
As a simple test case, I want to extract compressed data via gunzip. Specifically, I am looking at using gunzip -c - to extract chunks of binary data sent to it via its (reassigned) stdin file descriptor, and then pulling out chunks of text from its (reassigned) stdout file descriptor. I can then print these chunks of text to the real stdout or stderr (or do other stuff, later on).
(I realize that I can do gzip-based compression and extraction on the command line. My goal here is to use this test case to learn how to correctly pass around generic chunks of binary and text data between threads that either run that data through binaries, or process it further.)
In the case of my test program, I have set up three pthread_t threads:
produce_gzip_chunk_thread
consume_gzip_chunk_thread
consume_gunzip_chunk_thread
I pass each of these threads a shared data instance called thread_data, which contains a thread lock, two conditions, and some buffers and counter variables. I also include a set of file descriptors for a gunzip process opened with popen3():
typedef struct pthread_data pthread_data_t;
typedef struct popen3_desc popen3_desc_t;
struct pthread_data {
pthread_mutex_t in_lock;
pthread_cond_t in_cond;
pthread_cond_t out_cond;
unsigned char in_buf[BUF_LENGTH_VALUE];
size_t n_in_bytes;
size_t n_in_bytes_written_to_gunzip;
size_t n_out_bytes_read_from_gunzip;
FILE *in_file_ptr;
boolean in_eof;
char in_line[LINE_LENGTH_VALUE];
popen3_desc_t *gunzip_ptr;
};
struct popen3_desc {
int in;
int out;
int err;
};
The produce_gzip_chunk_thread reads in a 1024-byte chunk of gzip-compressed bytes from a regular file called foo.gz.
These bytes are written to an unsigned char buffer called in_buf, which is part of the shared data struct I am passing to each thread:
void * produce_gzip_chunk(void *t_data)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
#endif
pthread_data_t *d = (pthread_data_t *)t_data;
unsigned char in_buf[BUF_LENGTH_VALUE];
size_t n_in_bytes = 0;
d->in_eof = kFalse;
pthread_mutex_lock(&d->in_lock);
while(kTrue) {
n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
if (n_in_bytes > 0) {
while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
pthread_cond_wait(&d->in_cond, &d->in_lock);
memcpy(d->in_buf, in_buf, n_in_bytes);
d->n_in_bytes = n_in_bytes;
#ifdef DEBUG
fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
#endif
pthread_cond_signal(&d->in_cond);
}
else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
break;
}
d->in_eof = kTrue;
pthread_mutex_unlock(&d->in_lock);
pthread_cond_signal(&d->in_cond);
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> produce_gzip_chunk()\n");
#endif
return NULL;
}
Once there is a positive number of bytes stored in n_in_bytes (that is, we have pulled data from our input gzip archive that needs to be processed with gunzip), this triggers a condition that permits the second thread, consume_gzip_chunk_thread, to operate:
void * consume_gzip_chunk(void *t_data)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
#endif
pthread_data_t *d = (pthread_data_t *)t_data;
long n_in_bytes_written_to_gunzip;
pthread_mutex_lock(&d->in_lock);
while(kTrue) {
while (d->n_in_bytes == 0 && !d->in_eof)
pthread_cond_wait(&d->in_cond, &d->in_lock);
if (d->n_in_bytes) {
#ifdef DEBUG
fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
#endif
if (!d->gunzip_ptr) {
#ifdef DEBUG
fprintf(stderr, "Debug: * setting up gunzip ptr\n");
#endif
d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
if (!d->gunzip_ptr) {
fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
exit(EXIT_FAILURE);
}
popen3("gunzip -c -",
&(d->gunzip_ptr->in),
&(d->gunzip_ptr->out),
&(d->gunzip_ptr->err),
kTrue,
kTrue);
memset(d->in_line, 0, LINE_LENGTH_VALUE);
}
n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
#ifdef DEBUG
fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
#endif
if (n_in_bytes_written_to_gunzip > 0)
d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;
d->n_in_bytes = 0;
pthread_cond_signal(&d->out_cond);
}
if (d->in_eof)
break;
}
pthread_mutex_unlock(&d->in_lock);
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> consume_gzip_chunk()\n");
#endif
return NULL;
}
When consuming the gzip data chunk, we use the write function to send n_in_bytes bytes of in_buf to the gunzip process's input file descriptor. At the end, we send another thread signal, but this time to out_cond, so as to help reawaken consume_gunzip_chunk_thread, which reads from gunzip's output to do more work:
void * consume_gunzip_chunk(void *t_data)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
#endif
pthread_data_t *d = (pthread_data_t *)t_data;
long n_out_bytes_read_from_gunzip;
pthread_mutex_lock(&d->in_lock);
while(kTrue) {
while (d->n_in_bytes_written_to_gunzip == 0) {
pthread_cond_wait(&d->out_cond, &d->in_lock);
}
if (d->n_in_bytes_written_to_gunzip) {
sleep(1);
n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
#ifdef DEBUG
fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
#endif
memset(d->in_line, 0, strlen(d->in_line));
if (n_out_bytes_read_from_gunzip > 0)
d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
d->n_in_bytes_written_to_gunzip = 0;
pthread_cond_signal(&d->in_cond);
}
if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
break;
}
pthread_mutex_unlock(&d->in_lock);
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> consume_gunzip_chunk()\n");
#endif
return NULL;
}
This attempts to read any available bytes from the gunzip process's output file descriptor. For debugging purposes, I just want to print them to stderr for now.
The problem I am facing is that I need to add a sleep(1) statement in consume_gunzip_chunk, before doing the read, in order to get things working properly.
Without this sleep(1) statement, my test program will usually output nothing, except once every 8-10 attempts, when the compressed data are extracted correctly.
Question - What am I doing wrong about my arrangement of conditions, such that the sleep(1) call is required to make the gzip-extraction work properly? In a production scenario, working with much larger input files, forcibly waiting a second every 1kB seems like a bad idea.
For reproducibility with the full source code, here are the two relevant files. Here is the header:
/*
* convert.h
*/
#ifndef CONVERT_H
#define CONVERT_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <getopt.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#define CB_VERSION "1.0"
#define LINE_LENGTH_VALUE 65536
#define BUF_LENGTH_VALUE 1024
#define POPEN3_READ 0
#define POPEN3_WRITE 1
typedef int boolean;
extern const boolean kTrue;
extern const boolean kFalse;
const boolean kTrue = 1;
const boolean kFalse = 0;
typedef enum {
kGzip,
kUnknown
} format_t;
typedef struct pthread_data pthread_data_t;
typedef struct popen3_desc popen3_desc_t;
struct pthread_data {
pthread_mutex_t in_lock;
pthread_cond_t in_cond;
pthread_cond_t out_cond;
unsigned char in_buf[BUF_LENGTH_VALUE];
size_t n_in_bytes;
size_t n_in_bytes_written_to_gunzip;
size_t n_out_bytes_read_from_gunzip;
boolean in_eof;
FILE *in_file_ptr;
popen3_desc_t *gunzip_ptr;
char in_line[LINE_LENGTH_VALUE];
};
struct popen3_desc {
int in;
int out;
int err;
};
static const char *name = "convert";
static const char *version = CB_VERSION;
static const char *authors = "Alex Reynolds";
static const char *usage = "\n" \
"Usage: convert --input-format=str <input-file>\n" \
" Process Flags:\n\n" \
" --input-format=str | -f str Input format (str = [ gzip ]; required)\n" \
" --help | -h Show this usage message\n";
static struct convert_globals_t {
char *input_format_str;
format_t input_format;
char **filenames;
int num_filenames;
} convert_globals;
static struct option convert_client_long_options[] = {
{ "input-format", required_argument, NULL, 'f' },
{ "help", no_argument, NULL, 'h' },
{ NULL, no_argument, NULL, 0 }
};
static const char *convert_client_opt_string = "f:h?";
void * consume_gunzip_chunk (void *t_data);
void * consume_gzip_chunk (void *t_data);
void * produce_gzip_chunk (void *t_data);
FILE * new_file_ptr (const char *in_fn);
void delete_file_ptr (FILE **file_ptr);
pid_t popen3 (const char *command,
int *in_desc,
int *out_desc,
int *err_desc,
boolean nonblock_in,
boolean nonblock_outerr);
off_t fsize (const char *fn);
void initialize_globals ();
void parse_command_line_options (int argc,
char **argv);
void print_usage (FILE *stream);
#endif
Here is the implementation:
/*
* convert.c
*/
#include "convert.h"
int main(int argc, char **argv)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> main()\n");
#endif
pthread_t produce_gzip_chunk_thread = NULL;
pthread_t consume_gzip_chunk_thread = NULL;
pthread_t consume_gunzip_chunk_thread = NULL;
pthread_data_t *thread_data = NULL;
parse_command_line_options(argc, argv);
/* initialize thread data */
thread_data = malloc(sizeof(pthread_data_t));
thread_data->n_in_bytes = 0;
thread_data->n_in_bytes_written_to_gunzip = 0;
thread_data->n_out_bytes_read_from_gunzip = 0;
thread_data->in_eof = kFalse;
thread_data->in_file_ptr = new_file_ptr(convert_globals.filenames[0]);
pthread_mutex_init(&(thread_data->in_lock), NULL);
pthread_cond_init(&(thread_data->in_cond), NULL);
pthread_cond_init(&(thread_data->out_cond), NULL);
/* parse input */
if (convert_globals.input_format == kGzip)
{
if (pthread_create(&produce_gzip_chunk_thread, NULL, produce_gzip_chunk, (void *) thread_data) != 0) {
fprintf(stderr, "Error: Could not create gzip chunk production thread\n");
return EXIT_FAILURE;
}
if (pthread_create(&consume_gzip_chunk_thread, NULL, consume_gzip_chunk, (void *) thread_data) != 0) {
fprintf(stderr, "Error: Could not create gzip chunk consumption thread\n");
return EXIT_FAILURE;
}
if (pthread_create(&consume_gunzip_chunk_thread, NULL, consume_gunzip_chunk, (void *) thread_data) != 0) {
fprintf(stderr, "Error: Could not create gunzip chunk consumption thread\n");
return EXIT_FAILURE;
}
if (pthread_join(produce_gzip_chunk_thread, NULL) != 0) {
fprintf(stderr, "Error: Could not join gzip chunk production thread\n");
return EXIT_FAILURE;
}
if (pthread_join(consume_gzip_chunk_thread, NULL) != 0) {
fprintf(stderr, "Error: Could not join gzip chunk consumption thread\n");
return EXIT_FAILURE;
}
if (pthread_join(consume_gunzip_chunk_thread, NULL) != 0) {
fprintf(stderr, "Error: Could not join gunzip chunk consumption thread\n");
return EXIT_FAILURE;
}
}
else
{
/*
handle text formats
*/
}
/* cleanup */
delete_file_ptr(&thread_data->in_file_ptr);
pthread_mutex_destroy(&(thread_data->in_lock));
pthread_cond_destroy(&(thread_data->in_cond));
pthread_cond_destroy(&(thread_data->out_cond));
free(thread_data);
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> main()\n");
#endif
return EXIT_SUCCESS;
}
void * consume_gunzip_chunk(void *t_data)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
#endif
pthread_data_t *d = (pthread_data_t *)t_data;
long n_out_bytes_read_from_gunzip;
pthread_mutex_lock(&d->in_lock);
while(kTrue) {
while (d->n_in_bytes_written_to_gunzip == 0) {
pthread_cond_wait(&d->out_cond, &d->in_lock);
}
if (d->n_in_bytes_written_to_gunzip) {
sleep(1);
n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
#ifdef DEBUG
fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
#endif
memset(d->in_line, 0, strlen(d->in_line));
if (n_out_bytes_read_from_gunzip > 0)
d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
d->n_in_bytes_written_to_gunzip = 0;
pthread_cond_signal(&d->in_cond);
}
if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
break;
}
pthread_mutex_unlock(&d->in_lock);
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> consume_gunzip_chunk()\n");
#endif
return NULL;
}
void * consume_gzip_chunk(void *t_data)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
#endif
pthread_data_t *d = (pthread_data_t *)t_data;
long n_in_bytes_written_to_gunzip;
pthread_mutex_lock(&d->in_lock);
while(kTrue) {
while (d->n_in_bytes == 0 && !d->in_eof)
pthread_cond_wait(&d->in_cond, &d->in_lock);
if (d->n_in_bytes) {
#ifdef DEBUG
fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
#endif
if (!d->gunzip_ptr) {
#ifdef DEBUG
fprintf(stderr, "Debug: * setting up gunzip ptr\n");
#endif
d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
if (!d->gunzip_ptr) {
fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
exit(EXIT_FAILURE);
}
popen3("gunzip -c -",
&(d->gunzip_ptr->in),
&(d->gunzip_ptr->out),
&(d->gunzip_ptr->err),
kTrue,
kTrue);
memset(d->in_line, 0, LINE_LENGTH_VALUE);
}
n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
#ifdef DEBUG
fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
#endif
if (n_in_bytes_written_to_gunzip > 0)
d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;
d->n_in_bytes = 0;
/* pthread_cond_signal(&d->in_cond); */
pthread_cond_signal(&d->out_cond);
}
if (d->in_eof)
break;
}
pthread_mutex_unlock(&d->in_lock);
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> consume_gzip_chunk()\n");
#endif
return NULL;
}
void * produce_gzip_chunk(void *t_data)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
#endif
pthread_data_t *d = (pthread_data_t *)t_data;
unsigned char in_buf[BUF_LENGTH_VALUE];
size_t n_in_bytes = 0;
d->in_eof = kFalse;
pthread_mutex_lock(&d->in_lock);
while(kTrue) {
n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
if (n_in_bytes > 0) {
while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
pthread_cond_wait(&d->in_cond, &d->in_lock);
memcpy(d->in_buf, in_buf, n_in_bytes);
d->n_in_bytes = n_in_bytes;
#ifdef DEBUG
fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
#endif
pthread_cond_signal(&d->in_cond);
}
else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
break;
}
d->in_eof = kTrue;
pthread_mutex_unlock(&d->in_lock);
pthread_cond_signal(&d->in_cond);
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> produce_gzip_chunk()\n");
#endif
return NULL;
}
FILE * new_file_ptr(const char *in_fn)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> new_file_ptr()\n");
#endif
FILE *file_ptr = NULL;
boolean not_stdin = kTrue;
not_stdin = strcmp(in_fn, "-");
file_ptr = (not_stdin) ? fopen(in_fn, "r") : stdin;
if (!file_ptr) {
fprintf(stderr, "Error: Could not open input stream\n");
exit(EXIT_FAILURE);
}
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> new_file_ptr()\n");
#endif
return file_ptr;
}
void delete_file_ptr(FILE **file_ptr)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> delete_file_ptr()\n");
#endif
fclose(*file_ptr);
*file_ptr = NULL;
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> delete_file_ptr()\n");
#endif
}
pid_t popen3(const char *command, int *in_desc, int *out_desc, int *err_desc, boolean nonblock_in, boolean nonblock_outerr)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> popen3()\n");
#endif
int p_stdin[2], p_stdout[2], p_stderr[2];
pid_t pid;
if (pipe(p_stdin) != 0 || pipe(p_stdout) != 0 || pipe(p_stderr) != 0)
return -1;
if (nonblock_in) {
fcntl(p_stdin[POPEN3_WRITE], F_SETFL, fcntl(p_stdin[POPEN3_WRITE], F_GETFL) | O_NONBLOCK);
}
if (nonblock_outerr) {
fcntl(p_stdout[POPEN3_READ], F_SETFL, fcntl(p_stdout[POPEN3_READ], F_GETFL) | O_NONBLOCK);
fcntl(p_stderr[POPEN3_READ], F_SETFL, fcntl(p_stderr[POPEN3_READ], F_GETFL) | O_NONBLOCK);
}
pid = fork();
if (pid < 0)
return pid; /* error */
if (pid == 0) {
close(p_stdin[POPEN3_WRITE]);
close(p_stdout[POPEN3_READ]);
close(p_stderr[POPEN3_READ]);
dup2(p_stdin[POPEN3_READ], fileno(stdin));
dup2(p_stdout[POPEN3_WRITE], fileno(stderr));
dup2(p_stdout[POPEN3_WRITE], fileno(stdout));
execl("/bin/sh", "sh", "-c", command, NULL);
fprintf(stderr, "Error: Could not execl [%s]\n", command);
exit(EXIT_FAILURE);
}
if (in_desc == NULL)
close(p_stdin[POPEN3_WRITE]);
else
*in_desc = p_stdin[POPEN3_WRITE];
if (out_desc == NULL)
close(p_stdout[POPEN3_READ]);
else
*out_desc = p_stdout[POPEN3_READ];
if (err_desc == NULL)
close(p_stderr[POPEN3_READ]);
else
*err_desc = p_stderr[POPEN3_READ];
#ifdef DEBUG
fprintf(stderr, "Debug: New *in_desc = %d\n", *in_desc);
fprintf(stderr, "Debug: New *out_desc = %d\n", *out_desc);
fprintf(stderr, "Debug: New *err_desc = %d\n", *err_desc);
#endif
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> popen3()\n");
#endif
return pid;
}
off_t fsize(const char *fn)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> fsize()\n");
#endif
struct stat st;
if (stat(fn, &st) == 0)
return st.st_size;
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> fsize()\n");
#endif
return EXIT_FAILURE;
}
void initialize_globals()
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> initialize_globals()\n");
#endif
convert_globals.input_format = kUnknown;
convert_globals.filenames = NULL;
convert_globals.num_filenames = 0;
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> initialize_globals()\n");
#endif
}
void parse_command_line_options(int argc, char **argv)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> parse_command_line_options()\n");
#endif
int client_long_index;
int client_opt = getopt_long(argc,
argv,
convert_client_opt_string,
convert_client_long_options,
&client_long_index);
char *in_format_str = NULL;
opterr = 0; /* disable error reporting by GNU getopt */
initialize_globals();
while (client_opt != -1)
{
switch (client_opt)
{
case 'f':
in_format_str = optarg;
break;
case 'h':
print_usage(stdout);
exit(EXIT_SUCCESS);
case '?':
print_usage(stdout);
exit(EXIT_SUCCESS);
default:
break;
}
client_opt = getopt_long(argc,
argv,
convert_client_opt_string,
convert_client_long_options,
&client_long_index);
}
convert_globals.filenames = argv + optind;
convert_globals.num_filenames = argc - optind;
if (!in_format_str) {
fprintf(stderr, "Error: Specified input format was omitted; please specify one of required input formats\n");
print_usage(stderr);
exit(EXIT_FAILURE);
}
else if (convert_globals.num_filenames != 1) {
fprintf(stderr, "Error: Please specify an input file (either a regular file or '-' for stdin\n");
print_usage(stderr);
exit(EXIT_FAILURE);
}
/* map format string to setting */
if (strcmp(in_format_str, "gzip") == 0)
convert_globals.input_format = kGzip;
else {
fprintf(stderr, "Error: Specified input format is unknown; please specify one of required input formats\n");
print_usage(stderr);
exit(EXIT_FAILURE);
}
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> parse_command_line_options()\n");
#endif
}
void print_usage(FILE *stream)
{
#ifdef DEBUG
fprintf(stderr, "Debug: Entering --> print_usage()\n");
#endif
fprintf(stream,
"%s\n" \
" version: %s\n" \
" author: %s\n" \
"%s\n",
name,
version,
authors,
usage);
#ifdef DEBUG
fprintf(stderr, "Debug: Leaving --> print_usage()\n");
#endif
}
Here is the build process:
$ mkdir -p objects
$ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline -c convert.c -o objects/convert.o -iquote${PWD}
$ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline objects/convert.o -o convert -lpthread
I have been able to build this test code on OS X and Linux hosts with reasonably modern compile environments.
Thanks in advance for any useful advice!
I will start by saying that I feel pthreads conditions and mutexes were not really necessary here, nor was non-blocking I/O the best reaction to the problems you describe.
In my opinion, the problems you describe with your condition- and mutex-less version are symptoms of forgetting to close() assiduously the ends of your pipes, with the result that a copy of the writing-end file descriptor of the pipe feeding the child process's stdin leaked (into that child or others) and stayed open.
Then, given that a writing end corresponding to stdin's reading end still existed, the system did not give EOF but instead blocked indefinitely.
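A minimal sketch of that behaviour, assuming a POSIX system where /bin/sh can run cat. The name run_filter and its parameters are hypothetical, not part of your program. The parent writes a small buffer to the child's stdin, closes its write end so that the child sees EOF, and then reads the child's stdout until read() returns 0. It writes everything before reading anything, so it is only safe for inputs smaller than the pipe buffer; larger inputs need a separate reader thread.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: feed `in` to `cmd` and collect its stdout in `out`.
 * Returns the number of bytes collected, or -1 on error.
 * Assumption: `in_len` is small enough to fit in the pipe buffer. */
ssize_t run_filter(const char *cmd, const char *in, size_t in_len,
                   char *out, size_t out_cap)
{
    int to_child[2], from_child[2];
    if (pipe(to_child) != 0)
        return -1;
    if (pipe(from_child) != 0) {
        close(to_child[0]); close(to_child[1]);
        return -1;
    }

    pid_t pid = fork();
    if (pid < 0) {
        close(to_child[0]);   close(to_child[1]);
        close(from_child[0]); close(from_child[1]);
        return -1;
    }
    if (pid == 0) {
        /* Child: wire the pipes to stdin/stdout, then drop every extra copy. */
        dup2(to_child[0], STDIN_FILENO);
        dup2(from_child[1], STDOUT_FILENO);
        close(to_child[0]);   close(to_child[1]);
        close(from_child[0]); close(from_child[1]);
        execl("/bin/sh", "sh", "-c", cmd, (char *)NULL);
        _exit(127);
    }

    /* Parent: close the ends that belong to the child. */
    close(to_child[0]);
    close(from_child[1]);

    /* Write the input, then close the write end: this is what delivers EOF. */
    size_t off = 0;
    while (off < in_len) {
        ssize_t n = write(to_child[1], in + off, in_len - off);
        if (n <= 0)
            break;
        off += (size_t)n;
    }
    close(to_child[1]);

    /* Read until EOF, which arrives once the child has closed its stdout. */
    size_t total = 0;
    ssize_t n;
    while (total < out_cap &&
           (n = read(from_child[0], out + total, out_cap - total)) > 0)
        total += (size_t)n;

    close(from_child[0]);
    waitpid(pid, NULL, 0);
    return (ssize_t)total;
}
```

If the close(to_child[1]) call is removed, the read loop in the parent never sees EOF, because cat keeps waiting for more input.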
In your case, you did prevent the pipe-end file descriptors from leaking to the spawned child (with the correct close() calls on the child side of the fork() within your popen3(), although you forgot to close() the wrong-end pipe ends on the parent side). However, you did not prevent this leakage to all other children! If you call popen3() twice, the leakage of the set of three descriptors into the first child is prevented, but as the parent still owns them, when the next call to popen3() happens, after the fork() there are now 6 file descriptors to close (the old set of three and the new set of three you just created).
In your case, therefore, you should set the close-on-exec flag on those pipe ends, thusly:
fcntl(fdIn [PIPEWR], F_SETFD, fcntl(fdIn [PIPEWR], F_GETFD) | FD_CLOEXEC);
fcntl(fdOut[PIPERD], F_SETFD, fcntl(fdOut[PIPERD], F_GETFD) | FD_CLOEXEC);
fcntl(fdErr[PIPERD], F_SETFD, fcntl(fdErr[PIPERD], F_GETFD) | FD_CLOEXEC);
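As a sketch, the same idea can be wrapped in a helper that also checks for errors. The name make_cloexec_pipe is hypothetical. On systems that provide pipe2(), passing O_CLOEXEC there sets the flag atomically, which avoids a window in which another thread could fork() between pipe() and fcntl(); the portable two-step version is shown here.

```c
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical helper: create a pipe whose two ends are both close-on-exec.
 * Returns 0 on success, -1 on failure (with neither descriptor left open). */
int make_cloexec_pipe(int fd[2])
{
    if (pipe(fd) != 0)
        return -1;

    for (int i = 0; i < 2; i++) {
        /* Read the current descriptor flags and add FD_CLOEXEC to them. */
        int flags = fcntl(fd[i], F_GETFD);
        if (flags == -1 || fcntl(fd[i], F_SETFD, flags | FD_CLOEXEC) == -1) {
            close(fd[0]);
            close(fd[1]);
            return -1;
        }
    }
    return 0;
}
```

The child still gets the ends it needs, because a descriptor created by dup2() starts with the close-on-exec flag cleared.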
Here is code that spawns 6 threads and 3 processes, and passes its input unmodified to the output, after internally compressing then decompressing it. It effectively implements gzip -c - | XOR 0x55 | XOR 0x55 | gunzip -c - | cat, where:
Standard input is fed to gzip by thread srcThrd.
gzip's output is read by thread a2xor0Thrd and fed to thread xor0Thrd.
xor0Thrd XORs its input with 0x55 before passing it on to thread xor1Thrd.
xor1Thrd XORs its input with 0x55 before passing it on to thread xor22BThrd.
xor22BThrd feeds its input to process gunzip.
gunzip feeds its output directly (without going through a thread) to cat.
cat's output is read by thread dstThrd and printed to standard output.
Compression is done by inter-process pipe communication, while XORing is done by intra-process pipe communication. No mutexes or condition variables are used. main() is extremely easy to understand. This code should be easy to extend to your situation.
/* Includes */
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
/* Defines */
#define PIPERD 0
#define PIPEWR 1
/* Data structures */
typedef struct PIPESET{
int Ain[2];
int Aout[2];
int Aerr[2];
int xor0[2];
int xor1[2];
int xor2[2];
int Bin[2];
int BoutCin[2];
int Berr[2];
int Cout[2];
int Cerr[2];
} PIPESET;
/* Function Implementations */
/**
* Source thread main method.
*
* Slurps from standard input and feeds process A.
*/
void* srcThrdMain(void* arg){
PIPESET* pipeset = (PIPESET*)arg;
char c;
while(read(0, &c, 1) > 0){
write(pipeset->Ain[PIPEWR], &c, 1);
}
close(pipeset->Ain[PIPEWR]);
pthread_exit(NULL);
}
/**
* A to XOR0 thread main method.
*
* Manually pipes from standard output of process A to input of thread XOR0.
*/
void* a2xor0ThrdMain(void* arg){
PIPESET* pipeset = (PIPESET*)arg;
char buf[65536];
ssize_t bytesRead;
while((bytesRead = read(pipeset->Aout[PIPERD], buf, 65536)) > 0){
write(pipeset->xor0[PIPEWR], buf, bytesRead);
}
close(pipeset->xor0[PIPEWR]);
pthread_exit(NULL);
}
/**
* XOR0 thread main method.
*
* XORs input with 0x55 and outputs to input of XOR1.
*/
void* xor0ThrdMain(void* arg){
PIPESET* pipeset = (PIPESET*)arg;
char c;
while(read(pipeset->xor0[PIPERD], &c, 1) > 0){
c ^= 0x55;
write(pipeset->xor1[PIPEWR], &c, 1);
}
close(pipeset->xor1[PIPEWR]);
pthread_exit(NULL);
}
/**
* XOR1 thread main method.
*
* XORs input with 0x55 and outputs to input of process B.
*/
void* xor1ThrdMain(void* arg){
PIPESET* pipeset = (PIPESET*)arg;
char c;
while(read(pipeset->xor1[PIPERD], &c, 1) > 0){
c ^= 0x55;
write(pipeset->xor2[PIPEWR], &c, 1);
}
close(pipeset->xor2[PIPEWR]);
pthread_exit(NULL);
}
/**
* XOR2 to B thread main method.
*
* Manually pipes from input (output of XOR1) to input of process B.
*/
void* xor22BThrdMain(void* arg){
PIPESET* pipeset = (PIPESET*)arg;
char buf[65536];
ssize_t bytesRead;
while((bytesRead = read(pipeset->xor2[PIPERD], buf, 65536)) > 0){
write(pipeset->Bin[PIPEWR], buf, bytesRead);
}
close(pipeset->Bin[PIPEWR]);
pthread_exit(NULL);
}
/**
* Destination thread main method.
*
* Manually copies the standard output of process C to the standard output.
*/
void* dstThrdMain(void* arg){
PIPESET* pipeset = (PIPESET*)arg;
char c;
while(read(pipeset->Cout[PIPERD], &c, 1) > 0){
write(1, &c, 1);
}
pthread_exit(NULL);
}
/**
* Set close on exec flag on given descriptor.
*/
void setCloExecFlag(int fd){
fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
}
/**
* Clear the close on exec flag on given descriptor.
*/
void unsetCloExecFlag(int fd){
fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) & ~FD_CLOEXEC);
}
/**
* Pipe4.
*
* Create a pipe with some ends possibly marked close-on-exec.
*/
#define PIPE4_FLAG_NONE (0U)
#define PIPE4_FLAG_RD_CLOEXEC (1U << 0)
#define PIPE4_FLAG_WR_CLOEXEC (1U << 1)
int pipe4(int fd[2], int flags){
int ret = pipe(fd);
if(flags&PIPE4_FLAG_RD_CLOEXEC){setCloExecFlag(fd[PIPERD]);}
if(flags&PIPE4_FLAG_WR_CLOEXEC){setCloExecFlag(fd[PIPEWR]);}
return ret;
}
/**
* Pipe4 explicit derivatives.
*/
#define pipe4_cloexec(fd) pipe4((fd), PIPE4_FLAG_RD_CLOEXEC|PIPE4_FLAG_WR_CLOEXEC)
/**
* Popen4.
*
* General-case for spawning a process and tethering it with cloexec pipes on stdin,
* stdout and stderr.
*
* @param [in] cmd The command to execute.
* @param [in/out] pin The pointer to the cloexec pipe for stdin.
* @param [in/out] pout The pointer to the cloexec pipe for stdout.
* @param [in/out] perr The pointer to the cloexec pipe for stderr.
* @param [in] flags A bitwise OR of flags to this function. Available
* flags are:
*
* POPEN4_FLAG_NONE:
* Explicitly specify no flags.
* POPEN4_FLAG_NOCLOSE_PARENT_STDIN,
* POPEN4_FLAG_NOCLOSE_PARENT_STDOUT,
* POPEN4_FLAG_NOCLOSE_PARENT_STDERR:
* Don't close pin[PIPERD], pout[PIPEWR] and perr[PIPEWR] in the parent,
* respectively.
* POPEN4_FLAG_CLOSE_CHILD_STDIN,
* POPEN4_FLAG_CLOSE_CHILD_STDOUT,
* POPEN4_FLAG_CLOSE_CHILD_STDERR:
* Close the respective streams in the child. Ignores pin, pout and perr
* entirely. Overrides a NOCLOSE_PARENT flag for the same stream.
*/
#define POPEN4_FLAG_NONE (0U)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDIN (1U << 0)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDOUT (1U << 1)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDERR (1U << 2)
#define POPEN4_FLAG_CLOSE_CHILD_STDIN (1U << 3)
#define POPEN4_FLAG_CLOSE_CHILD_STDOUT (1U << 4)
#define POPEN4_FLAG_CLOSE_CHILD_STDERR (1U << 5)
pid_t popen4(const char* cmd, int pin[2], int pout[2], int perr[2], int flags){
/********************
** FORK PROCESS **
********************/
pid_t ret = fork();
if(ret < 0){
/**
* Error in fork(), still in parent.
*/
fprintf(stderr, "fork() failed!\n");
return ret;
}else if(ret == 0){
/**
* Child-side of fork
*/
if(flags & POPEN4_FLAG_CLOSE_CHILD_STDIN){
close(0);
}else{
unsetCloExecFlag(pin [PIPERD]);
dup2(pin [PIPERD], 0);
}
if(flags & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
close(1);
}else{
unsetCloExecFlag(pout[PIPEWR]);
dup2(pout[PIPEWR], 1);
}
if(flags & POPEN4_FLAG_CLOSE_CHILD_STDERR){
close(2);
}else{
unsetCloExecFlag(perr[PIPEWR]);
dup2(perr[PIPEWR], 2);
}
execl("/bin/sh", "sh", "-c", cmd, NULL);
fprintf(stderr, "exec() failed!\n");
exit(-1);
}else{
/**
* Parent-side of fork
*/
if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDIN &&
~flags & POPEN4_FLAG_CLOSE_CHILD_STDIN){
close(pin [PIPERD]);
}
if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDOUT &&
~flags & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
close(pout[PIPEWR]);
}
if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDERR &&
~flags & POPEN4_FLAG_CLOSE_CHILD_STDERR){
close(perr[PIPEWR]);
}
return ret;
}
/* Unreachable */
return ret;
}
/**
* Main Function.
*
* Sets up the whole piping scheme.
*/
int main(int argc, char* argv[]){
pthread_t srcThrd, a2xor0Thrd, xor0Thrd, xor1Thrd, xor22BThrd, dstThrd;
pid_t gzip, gunzip, cat;
PIPESET pipeset;
pipe4_cloexec(pipeset.Ain);
pipe4_cloexec(pipeset.Aout);
pipe4_cloexec(pipeset.Aerr);
pipe4_cloexec(pipeset.Bin);
pipe4_cloexec(pipeset.BoutCin);
pipe4_cloexec(pipeset.Berr);
pipe4_cloexec(pipeset.Cout);
pipe4_cloexec(pipeset.Cerr);
pipe4_cloexec(pipeset.xor0);
pipe4_cloexec(pipeset.xor1);
pipe4_cloexec(pipeset.xor2);
/* Spawn processes */
gzip = popen4("gzip -c -", pipeset.Ain, pipeset.Aout, pipeset.Aerr, POPEN4_FLAG_NONE);
gunzip = popen4("gunzip -c -", pipeset.Bin, pipeset.BoutCin, pipeset.Berr, POPEN4_FLAG_NONE);
cat = popen4("cat", pipeset.BoutCin, pipeset.Cout, pipeset.Cerr, POPEN4_FLAG_NONE);
/* Spawn threads */
pthread_create(&srcThrd, NULL, srcThrdMain, &pipeset);
pthread_create(&a2xor0Thrd, NULL, a2xor0ThrdMain, &pipeset);
pthread_create(&xor0Thrd, NULL, xor0ThrdMain, &pipeset);
pthread_create(&xor1Thrd, NULL, xor1ThrdMain, &pipeset);
pthread_create(&xor22BThrd, NULL, xor22BThrdMain, &pipeset);
pthread_create(&dstThrd, NULL, dstThrdMain, &pipeset);
pthread_join(srcThrd, (void**)NULL);
pthread_join(a2xor0Thrd, (void**)NULL);
pthread_join(xor0Thrd, (void**)NULL);
pthread_join(xor1Thrd, (void**)NULL);
pthread_join(xor22BThrd, (void**)NULL);
pthread_join(dstThrd, (void**)NULL);
return 0;
}
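One caveat about the listing above: it ignores the return value of write(), which may transfer fewer bytes than requested or fail with EINTR. A small wrapper such as the following sketch can replace the bare write() calls. The name write_all is hypothetical, and the sketch assumes a blocking descriptor.

```c
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical helper: write exactly `len` bytes to a blocking descriptor.
 * Returns 0 on success, -1 on error (errno is left as set by write()). */
int write_all(int fd, const void *buf, size_t len)
{
    const unsigned char *p = buf;

    while (len > 0) {
        ssize_t n = write(fd, p, len);
        if (n < 0) {
            if (errno == EINTR)
                continue;          /* interrupted before any byte was written */
            return -1;
        }
        p   += n;                  /* short write: advance and try the rest */
        len -= (size_t)n;
    }
    return 0;
}
```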
There are many issues with your code, most of which have nothing to do with threading.
You never close() the file descriptor d->gunzip_ptr->in. This means that gunzip can never know that no more input is forthcoming on its stdin, so it will never exit.
Because gunzip doesn't ever exit, it will never close() its stdout, and thus a blocking read() at the other end will never unblock. A non-blocking read will instead always give -1, with errno == EAGAIN.
Your popen3() doesn't close() p_stdin[POPEN3_READ], p_stdout[POPEN3_WRITE] or p_stderr[POPEN3_WRITE] on the parent side of the fork(). Only the child should have those descriptors. Failing to close these means that when the parent itself tries to read the stdout and stderr of the child, it will never see EOF, again for the same reasons as above: because it itself still owns a write end of the pipe, into which it could write, making new data appear at the read end.
Your code implicitly relies on gunzip writing out at least one byte for every 1024 you write in. There is no guarantee that this will be the case, since gunzip may, at its leisure, buffer internally.
You read chunks of up to BUF_LENGTH_VALUE bytes into d->in_buf. You then assign the number of bytes you read through fread() to d->n_in_bytes. This same d->n_in_bytes is used in your write() call to write to gunzip's stdin. You then signal for consume_gunzip_chunk() to wake up, then pthread_cond_wait() for the next gzip-compressed chunk. But this gzip-compressed chunk may never come, since there is no guarantee that gunzip will be able to unpack useful output from just the first 1024 bytes of input, nor even a guarantee that it will write() it out instead of buffering it until it has, say, 4096 bytes (a full page) of output. Therefore, the read() call in consume_gunzip_chunk() may never succeed (or even return, if read() is blocking). And if read() never returns, then consume_gunzip_chunk() doesn't signal d->in_cond, and so all three threads get stuck. And even if read() is non-blocking, the last block of output from gunzip may never come, since gunzip's input is never closed, so it doesn't flush out its buffers by write()'ing them out, so read() on the other end will never get useful data and no amount of pleading will elicit it without a close().
POSSIBLE (LIKELY?) CAUSE OF BUG: d->n_out_bytes_read_from_gunzip, once it becomes non-0, will never become 0 again. This means that the extremely baffling
while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
pthread_cond_wait(&d->in_cond, &d->in_lock);
within produce_gzip_chunk() will, once entered with d->n_out_bytes_read_from_gunzip != 0, forever remain stuck. By calling sleep(1) within consume_gunzip_chunk(), which sets d->n_out_bytes_read_from_gunzip, you may have defused the problem by reading all input before consume_gunzip_chunk() could lock up the system by setting d->n_out_bytes_read_from_gunzip to a non-zero value.
There are two threads that call pthread_cond_wait(&d->in_cond, &d->in_lock);, these being produce_gzip_chunk() and consume_gzip_chunk(). There is absolutely no guarantee that when consume_gunzip_chunk() calls pthread_cond_signal(&d->in_cond);, the "correct" thread (whichever it is in your design) will receive the signal. To ensure that all of them will, use pthread_cond_broadcast(), but then you expose yourself to the thundering herd problem. Needing to use pthread_cond_broadcast() in this situation is, again, a symptom of a bad design in my opinion.
Relatedly, you call pthread_cond_signal(&d->in_cond) within a thread (indeed, a function) in which you call pthread_cond_wait(&d->in_cond, &d->in_lock). What purpose does that serve?
You use d->in_lock for too many disparate purposes, exposing yourself to the possibility of deadlock, or low performance due to excessive protection. In particular you use it as the protection for both d->in_cond and d->out_cond. This is too strong a protection: the output of gunzip into d->in_line should be able to happen simultaneously with the input of gunzip being written into and out of d->in_buf.
Within consume_gunzip_chunk(), you have
while (d->n_in_bytes_written_to_gunzip == 0) {
pthread_cond_wait(&d->out_cond, &d->in_lock);
}
if (d->n_in_bytes_written_to_gunzip) {
...
This if can never fail! Is there a case you may have in mind?
Consider making the entire struct pthread_data volatile (or at least those integer elements which are used by multiple threads), since the compiler might decide to optimize out loads and stores that should, in fact, remain.
So as to not sound too negative, I would like to say that in general your problems were not due to misuse of the pthreads API but due to erroneous consumer-producer logic and lack of close()s. Additionally, you appear to understand that pthread_cond_wait() may wake up spuriously, and so you have wrapped it up in a loop that checks the invariant.
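For comparison, a single-slot handoff with one clear predicate per condition variable could look like the following sketch. The names (slot_t, slot_put, slot_close, slot_get, run_handoff) are hypothetical and the payload is a plain int rather than a byte buffer. The point is that each side waits on its own condition, and the flag it tests is always reset by the other side.

```c
#include <pthread.h>
#include <stdbool.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  not_full;   /* signalled when the slot becomes empty */
    pthread_cond_t  not_empty;  /* signalled when the slot is filled or closed */
    int  value;
    bool full;
    bool done;                  /* set once the producer has finished */
} slot_t;

static void slot_put(slot_t *s, int v)
{
    pthread_mutex_lock(&s->lock);
    while (s->full)                         /* wait for the consumer to drain */
        pthread_cond_wait(&s->not_full, &s->lock);
    s->value = v;
    s->full = true;
    pthread_cond_signal(&s->not_empty);
    pthread_mutex_unlock(&s->lock);
}

static void slot_close(slot_t *s)
{
    pthread_mutex_lock(&s->lock);
    s->done = true;
    pthread_cond_signal(&s->not_empty);
    pthread_mutex_unlock(&s->lock);
}

/* Returns true and stores a value, or false once the producer is done. */
static bool slot_get(slot_t *s, int *v)
{
    bool ok = false;
    pthread_mutex_lock(&s->lock);
    while (!s->full && !s->done)            /* wait for data or for the end */
        pthread_cond_wait(&s->not_empty, &s->lock);
    if (s->full) {
        *v = s->value;
        s->full = false;
        ok = true;
        pthread_cond_signal(&s->not_full);
    }
    pthread_mutex_unlock(&s->lock);
    return ok;
}

typedef struct { slot_t *slot; int count; } producer_arg_t;

static void *producer(void *arg)
{
    producer_arg_t *p = arg;
    for (int i = 0; i < p->count; i++)
        slot_put(p->slot, i);
    slot_close(p->slot);
    return NULL;
}

/* Runs one producer thread against a consumer in the calling thread.
 * Returns the sum of the values received, or -1 if the thread cannot start. */
long run_handoff(int count)
{
    slot_t s = { .value = 0, .full = false, .done = false };
    producer_arg_t arg = { &s, count };
    pthread_t t;
    long sum = 0;
    int v;

    pthread_mutex_init(&s.lock, NULL);
    pthread_cond_init(&s.not_full, NULL);
    pthread_cond_init(&s.not_empty, NULL);

    if (pthread_create(&t, NULL, producer, &arg) != 0)
        return -1;
    while (slot_get(&s, &v))
        sum += v;
    pthread_join(t, NULL);

    pthread_cond_destroy(&s.not_empty);
    pthread_cond_destroy(&s.not_full);
    pthread_mutex_destroy(&s.lock);
    return sum;
}
```

Each condition variable here has exactly one waiter, so pthread_cond_signal() cannot wake the wrong thread.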
I would use pipes, even between threads. This absolves you from needing to implement your own consumer-producer scheme; the kernel has solved it for you already, and provides you with the pipe(), read() and write() primitives, which are all you need to take advantage of this ready-made solution. It also makes the code cleaner and void of mutexes and condition variables. One must simply be diligent in closing the ends, and one must be supremely careful around pipes in the presence of fork(). The rules are simple:
If at least one write-end of a pipe exists, a read() on an open read-end will not give EOF but will block or fail with EAGAIN.
If all write-ends of a pipe have been closed, a read() on an open read-end will give EOF.
If all read-ends of a pipe have been closed, a write() to any of its write-ends will cause SIGPIPE.
fork() duplicates the entire process, including all descriptors (modulo maybe crazy stuff in pthread_atfork())!
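The first two rules about pipe ends can be observed directly with a sketch like this one. The function name check_pipe_eof_rules is hypothetical. It uses dup() to stand in for a leaked write end and makes the read end non-blocking so that an empty pipe reports EAGAIN instead of hanging.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical demo: returns 0 if the pipe behaves as the first two rules say,
 * or a positive step number identifying the check that failed. */
int check_pipe_eof_rules(void)
{
    int fd[2], extra_writer, fl, result = 0;
    char c;

    if (pipe(fd) != 0)
        return 1;

    extra_writer = dup(fd[1]);      /* a second, "leaked" copy of the write end */
    fl = fcntl(fd[0], F_GETFL);
    if (extra_writer < 0 || fl == -1 ||
        fcntl(fd[0], F_SETFL, fl | O_NONBLOCK) == -1)
        result = 2;

    /* One writer closed, one still open: the reader must not see EOF yet. */
    close(fd[1]);
    if (result == 0 &&
        (read(fd[0], &c, 1) != -1 ||
         (errno != EAGAIN && errno != EWOULDBLOCK)))
        result = 3;

    /* Last writer closed: read() now returns 0, meaning EOF. */
    if (extra_writer >= 0)
        close(extra_writer);
    if (result == 0 && read(fd[0], &c, 1) != 0)
        result = 4;

    close(fd[0]);
    return result;
}
```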