I'd like to log a program's output 'on demand', e.g. the output is logged to the terminal, but another process can hook into the current output at any time.
The classic way would be:
myprogram 2>&1 | tee /tmp/mylog
and on demand
tail /tmp/mylog
However, this would create an ever-growing log file even if not used until the drive runs out of space. So my attempt was:
mkfifo /tmp/mylog
myprogram 2>&1 | tee /tmp/mylog
and on demand
cat /tmp/mylog
Now I can read /tmp/mylog at any time. However, any output blocks the program until /tmp/mylog is read. I'd like the fifo to discard any incoming data that is not read. How can I do that?
Inspired by your question I've written a simple program that will let you do this:
$ myprogram 2>&1 | ftee /tmp/mylog
It behaves similarly to tee, but clones stdin to stdout and to a named pipe (a requirement for now) without blocking. This means that if you log this way you may lose log data, but I guess that's acceptable in your scenario. The trick is to ignore the SIGPIPE signal and to ignore errors when writing to a broken fifo. The fifo is opened for reading first, because a non-blocking open for writing fails while no reader is attached. This sample may be optimized in various ways of course, but so far it does the job, I guess.
/* ftee - clone stdin to stdout and to a named pipe
   (c) racic@stackoverflow
   WTFPL Licence */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

int main(int argc, char *argv[])
{
    int readfd, writefd;
    struct stat status;
    char *fifonam;
    char buffer[BUFSIZ];
    ssize_t bytes;

    /* A write to a fifo without a reader must return EPIPE instead of killing us */
    signal(SIGPIPE, SIG_IGN);

    if (2 != argc) {
        printf("Usage:\n someprog 2>&1 | %s FIFO\n FIFO - path to a"
               " named pipe, required argument\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    fifonam = argv[1];

    /* Open the read end first: a non-blocking open for writing fails
       while the fifo has no reader */
    readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
    if (-1 == readfd) {
        perror("ftee: readfd: open()");
        exit(EXIT_FAILURE);
    }

    if (-1 == fstat(readfd, &status)) {
        perror("ftee: fstat");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    if (!S_ISFIFO(status.st_mode)) {
        printf("ftee: %s is not a fifo!\n", fifonam);
        close(readfd);
        exit(EXIT_FAILURE);
    }

    writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
    if (-1 == writefd) {
        perror("ftee: writefd: open()");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    close(readfd);

    while (1) {
        bytes = read(STDIN_FILENO, buffer, sizeof(buffer));
        if (bytes < 0 && errno == EINTR)
            continue;
        if (bytes <= 0)
            break;

        /* Keep the byte count from read(): the result of the stdout write
           must not be reused as the length of the fifo write */
        if (-1 == write(STDOUT_FILENO, buffer, bytes))
            perror("ftee: writing to stdout");
        if (-1 == write(writefd, buffer, bytes)) {
            /* Ignoring the errors: data nobody reads is dropped */
        }
    }
    close(writefd);
    return 0;
}
You can compile it with this standard command:
$ gcc ftee.c -o ftee
You can quickly verify it by running e.g.:
$ ping www.google.com | ftee /tmp/mylog
$ cat /tmp/mylog
Also note that this is no multiplexer: only one process at a time can do cat /tmp/mylog.
This is a (very) old thread, but I've run into a similar problem of late. In fact, what I needed was a clone of stdin to stdout with a non-blocking copy to a pipe. The ftee proposed in the first answer really helped there, but was (for my use case) too volatile: I lost data I could have processed if I had gotten to it in time.
The scenario I was faced with is that I have a process (some_process) that aggregates some data and writes its results every three seconds to stdout. The (simplified) setup looked like this (in the real setup I am using a named pipe):
some_process | ftee >(onlineAnalysis.pl > results) | gzip > raw_data.gz
Now, raw_data.gz has to be compressed and has to be complete. ftee does this job very well. But the pipe I am using in the middle was too slow to grab the data flushed out - but it was fast enough to process everything if it could get to it, which was tested with a normal tee. However, a normal tee blocks if anything happens to the unnamed pipe, and as I want to be able to hook in on demand, tee is not an option. Back to the topic: It got better when I put a buffer in between, resulting in:
some_process | ftee >(mbuffer -m 32M| onlineAnalysis.pl > results) | gzip > raw_data.gz
But that was still losing data I could have processed. So I went ahead and extended the ftee proposed before to a buffered version (bftee). It still has all the same properties, but uses an (inefficient ?) internal buffer in case a write fails. It still loses data if the buffer runs full, but it works beautifully for my case. As always there is a lot of room for improvement, but as I copied the code off of here I'd like to share it back to people that might have a use for it.
/* bftee - clone stdin to stdout and to a buffered, non-blocking pipe
   (c) racic@stackoverflow
   (c) fabraxias@stackoverflow
   WTFPL Licence */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

// the number of sBuffers that are being held at a maximum
#define BUFFER_SIZE 4096
#define BLOCK_SIZE 2048

typedef struct {
    char data[BLOCK_SIZE];
    int bytes;
} sBuffer;

typedef struct {
    sBuffer *data;   // array of buffers
    int bufferSize;  // number of buffers in data
    int start;       // index of the current start buffer
    int end;         // index of the current end buffer
    int active;      // number of active buffers (currently in use)
    int maxUse;      // maximum number of buffers ever used
    int drops;       // number of buffers discarded due to overflow
    int sWrites;     // number of buffers written to stdout
    int pWrites;     // number of buffers written to pipe
} sQueue;

void InitQueue(sQueue*, int);              // initializes the Queue
void PushToQueue(sQueue*, sBuffer*, int);  // pushes a buffer into the Queue at the end
sBuffer *RetrieveFromQueue(sQueue*);       // returns the first entry of the buffer and removes it, or NULL if the buffer is empty
sBuffer *PeakAtQueue(sQueue*);             // returns the first entry of the buffer but does not remove it. Returns NULL on an empty buffer
void ShrinkInQueue(sQueue *queue, int);    // shrinks the first entry of the buffer by n bytes. Buffer is removed if it is empty
void DelFromQueue(sQueue *queue);          // removes the first entry of the queue
static void sigUSR1(int);                  // signal handler for SIGUSR1 - used for stats output to stderr
static void sigINT(int);                   // signal handler for SIGINT/SIGTERM - allows for a graceful stop

sQueue queue;                // buffer storing the overflow
volatile sig_atomic_t quit;  // for quitting the main loop

int main(int argc, char *argv[])
{
    int readfd, writefd;
    struct stat status;
    char *fifonam;
    sBuffer buffer;
    ssize_t bytes;
    int bufferSize = BUFFER_SIZE;

    signal(SIGPIPE, SIG_IGN);
    signal(SIGUSR1, sigUSR1);
    signal(SIGTERM, sigINT);
    signal(SIGINT, sigINT);

    /** Handle commandline args and open the pipe for non blocking writing **/

    if (argc < 2 || argc > 3) {
        printf("Usage:\n someprog 2>&1 | %s FIFO [BufferSize]\n"
               "FIFO - path to a named pipe, required argument\n"
               "BufferSize - temporary Internal buffer size in case write to FIFO fails\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    fifonam = argv[1];
    if (argc == 3) {
        bufferSize = atoi(argv[2]);
        // fall back to the default on unparsable or negative input
        if (bufferSize <= 0) bufferSize = BUFFER_SIZE;
    }

    readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
    if (-1 == readfd) {
        perror("bftee: readfd: open()");
        exit(EXIT_FAILURE);
    }

    if (-1 == fstat(readfd, &status)) {
        perror("bftee: fstat");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    if (!S_ISFIFO(status.st_mode)) {
        printf("bftee: %s is not a fifo!\n", fifonam);
        close(readfd);
        exit(EXIT_FAILURE);
    }

    writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
    if (-1 == writefd) {
        perror("bftee: writefd: open()");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    close(readfd);

    InitQueue(&queue, bufferSize);
    quit = 0;

    while (!quit) {
        // read from STDIN
        bytes = read(STDIN_FILENO, buffer.data, sizeof(buffer.data));

        // if read failed due to interrupt, then retry, otherwise STDIN has closed and we should stop reading
        if (bytes < 0 && errno == EINTR) continue;
        if (bytes <= 0) break;

        // save the number of read bytes in the current buffer to be processed
        buffer.bytes = bytes;

        // this is a blocking write. As long as buffer is smaller than 4096 Bytes, the write is atomic to a pipe in Linux
        // thus, this cannot be interrupted. however, to be safe this should handle the error cases of partial or interrupted write none the less.
        bytes = write(STDOUT_FILENO, buffer.data, buffer.bytes);
        queue.sWrites++;

        if (-1 == bytes) {
            perror("bftee: writing to stdout");
            break;
        }

        sBuffer *tmpBuffer = NULL;

        // if the queue is empty (tmpBuffer gets set to NULL) then this does nothing - otherwise it tries to write
        // the buffered data to the pipe. This continues until the Buffer is empty or the write fails.
        // NOTE: bytes cannot be -1 (that would have failed just before) when the loop is entered.
        while ((bytes != -1) && (tmpBuffer = PeakAtQueue(&queue)) != NULL) {
            // write the oldest buffer to the pipe
            bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);

            // the written bytes are equal to the buffer size, the write is successful - remove the buffer and continue
            if (bytes == tmpBuffer->bytes) {
                DelFromQueue(&queue);
                queue.pWrites++;
            } else if (bytes > 0) {
                // on a positive bytes value there was a partial write. we shrink the current buffer
                // and handle this as a write failure
                ShrinkInQueue(&queue, bytes);
                bytes = -1;
            }
        }

        // There are several cases here:
        // 1.) The Queue is empty -> bytes is still set from the write to STDOUT. in this case, we try to write the read data directly to the pipe
        // 2.) The Queue was not empty but is now -> bytes is set from the last write (which was successful) and is bigger than 0. also try to write the data
        // 3.) The Queue was not empty and still is not -> there was a write error before (even partial), and bytes is -1. Thus this line is skipped.
        if (bytes != -1) bytes = write(writefd, buffer.data, buffer.bytes);

        // again, there are several cases what can happen here
        // 1.) the write before was successful -> in this case bytes is equal to buffer.bytes and nothing happens
        // 2.) the write just before is partial or failed all together - bytes is either -1 or smaller than buffer.bytes -> add the remaining data to the queue
        // 3.) the write before did not happen as the buffer flush already had an error. In this case bytes is -1 -> add the remaining data to the queue
        if (bytes != buffer.bytes)
            PushToQueue(&queue, &buffer, bytes);
        else
            queue.pWrites++;
    }

    // once we are done with STDIN, try to flush the buffer to the named pipe
    if (queue.active > 0) {
        // set output buffer to block - here we wait until we can write everything to the named pipe
        // --> this does not seem to work - just in case there is a busy loop that waits for buffer flush as well.
        int saved_flags = fcntl(writefd, F_GETFL);
        if (saved_flags != -1) fcntl(writefd, F_SETFL, saved_flags & ~O_NONBLOCK);

        sBuffer *tmpBuffer = NULL;
        while ((tmpBuffer = PeakAtQueue(&queue)) != NULL) {
            int written = write(writefd, tmpBuffer->data, tmpBuffer->bytes);
            // a full write removes the entry, a partial write only shrinks it
            if (written > 0) ShrinkInQueue(&queue, written);
        }
    }

    close(writefd);
    return 0;
}

/** init a given Queue **/
void InitQueue(sQueue *queue, int bufferSize)
{
    queue->data = calloc(bufferSize, sizeof(sBuffer));
    if (queue->data == NULL) {
        perror("bftee: calloc");
        exit(EXIT_FAILURE);
    }
    queue->bufferSize = bufferSize;
    queue->start = 0;
    queue->end = 0;
    queue->active = 0;
    queue->maxUse = 0;
    queue->drops = 0;
    queue->sWrites = 0;
    queue->pWrites = 0;
}

/** push a buffer into the Queue **/
void PushToQueue(sQueue *queue, sBuffer *p, int offset)
{
    if (offset < 0) offset = 0;      // offset cannot be smaller than 0 - if that is the case, we were given an error code. Set it to 0 instead
    if (offset == p->bytes) return;  // in this case there are 0 bytes to add to the queue. Nothing to write

    // this should never happen - offset cannot be bigger than the buffer itself. Panic action
    if (offset > p->bytes) {
        fprintf(stderr, "got more bytes to buffer than we read\n");
        exit(EXIT_FAILURE);
    }

    // debug output on a partial write. TODO: remove this line
    // if (offset > 0 ) fprintf(stderr, "partial write to buffer\n");

    // copy the data from the buffer into the queue and remember its size
    memcpy(queue->data[queue->end].data, p->data + offset, p->bytes - offset);
    queue->data[queue->end].bytes = p->bytes - offset;

    // move the buffer forward
    queue->end = (queue->end + 1) % queue->bufferSize;

    // there is still space in the buffer
    if (queue->active < queue->bufferSize) {
        queue->active++;
        if (queue->active > queue->maxUse) queue->maxUse = queue->active;
    } else {
        // Overwriting the oldest. Move start to next-oldest
        queue->start = (queue->start + 1) % queue->bufferSize;
        queue->drops++;
    }
}

/** return the oldest entry in the Queue and remove it or return NULL in case the Queue is empty **/
sBuffer *RetrieveFromQueue(sQueue *queue)
{
    if (!queue->active) { return NULL; }

    // remember the oldest entry before advancing start past it
    sBuffer *oldest = &(queue->data[queue->start]);
    queue->start = (queue->start + 1) % queue->bufferSize;
    queue->active--;
    return oldest;
}

/** return the oldest entry in the Queue or NULL if the Queue is empty. Does not remove the entry **/
sBuffer *PeakAtQueue(sQueue *queue)
{
    if (!queue->active) { return NULL; }
    return &(queue->data[queue->start]);
}

/** Shrinks the oldest entry in the Queue by bytes. Removes the entry if the buffer of the oldest entry runs empty **/
void ShrinkInQueue(sQueue *queue, int bytes)
{
    // cannot remove negative amount of bytes - this is an error case. Ignore it
    if (bytes <= 0) return;

    // remove the entry if the offset is equal to the buffer size
    if (queue->data[queue->start].bytes == bytes) {
        DelFromQueue(queue);
        return;
    }

    // this is a partial delete
    if (queue->data[queue->start].bytes > bytes) {
        // shift the memory by the offset
        memmove(queue->data[queue->start].data, queue->data[queue->start].data + bytes, queue->data[queue->start].bytes - bytes);
        queue->data[queue->start].bytes = queue->data[queue->start].bytes - bytes;
        return;
    }

    // panic if we are to remove more than we have in the buffer
    fprintf(stderr, "we wrote more than we had - this should never happen\n");
    exit(EXIT_FAILURE);
}

/** delete the oldest entry from the queue. Do nothing if the Queue is empty **/
void DelFromQueue(sQueue *queue)
{
    if (queue->active > 0) {
        queue->start = (queue->start + 1) % queue->bufferSize;
        queue->active--;
    }
}

/** Stats output on SIGUSR1 **/
// NOTE: fprintf is not async-signal-safe; this is tolerated here as a debugging aid only
static void sigUSR1(int signo)
{
    (void)signo;
    fprintf(stderr, "Buffer use: %i (%i/%i), STDOUT: %i PIPE: %i:%i\n", queue.active, queue.maxUse, queue.bufferSize, queue.sWrites, queue.pWrites, queue.drops);
}

/** handle signal for terminating **/
static void sigINT(int signo)
{
    (void)signo;
    quit++;
    if (quit > 1) exit(EXIT_FAILURE);
}
This version takes one more (optional) argument which specifies the number of blocks to be buffered for the pipe. My sample call now looks like this:
some_process | bftee >(onlineAnalysis.pl > results) 16384 | gzip > raw_data.gz
This results in 16384 blocks being buffered before discards happen. It uses about 32 MB more memory, but... who cares?
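As a rough check of that memory figure, the arithmetic can be done directly in the shell. This is only a sketch: 2048 is BLOCK_SIZE from the bftee source, 16384 is the block count from the sample call, and the small per-block `int bytes` field is ignored. The variable names are made up for illustration.

```shell
blocks=16384      # block count passed to bftee in the sample call
block_size=2048   # BLOCK_SIZE in the bftee source

# payload held by a completely full queue, in MiB
mib=$(( blocks * block_size / 1024 / 1024 ))
echo "queue payload: ${mib} MiB"
```

This prints a payload of 32 MiB, which matches the figure quoted above.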
Of course, in the real environment I am using a named pipe so that I can attach and detach as needed. There it looks like this:
mkfifo named_pipe
some_process | bftee named_pipe 16384 | gzip > raw_data.gz &
cat named_pipe | onlineAnalysis.pl > results
Also, the process reacts to signals as follows:
SIGUSR1 -> prints the counters to STDERR
SIGTERM, SIGINT -> the first one exits the main loop and flushes the buffer to the pipe; the second one terminates the program immediately.
Maybe this helps someone in the future... Enjoy
However, this would create an ever-growing log file even if not used until the drive runs out of space.
Why not periodically rotate the logs? There's even a program to do it for you: logrotate.
There's also a system for generating log messages and doing different things with them according to type. It's called syslog.
You could even combine the two. Have your program generate syslog messages, configure syslog to place them in a file and use logrotate to ensure they don't fill the disk.
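For the plain tee-to-file variant from the question, a logrotate rule might look roughly like the fragment below. It is a sketch under assumptions: the file name /etc/logrotate.d/mylog, the 10M threshold and the count of three old copies are all made up for illustration; only /tmp/mylog comes from the question.

```
# /etc/logrotate.d/mylog (hypothetical file name)
/tmp/mylog {
    # rotate once the file has grown beyond 10 MB
    size 10M
    # keep at most three rotated copies
    rotate 3
    # copy and truncate in place, because tee keeps the file open
    copytruncate
    # do not complain if the log does not exist yet
    missingok
}
```

With copytruncate the writer should append, so tee would be started as tee -a /tmp/mylog; otherwise it keeps writing at its old offset after the file has been truncated.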
If it turned out that you were writing for a small embedded system and the program's output is heavy there are a variety of techniques you might consider.
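It seems that bash's <> redirection operator (see 3.6.10 Opening File Descriptors for Reading and Writing in the Bash manual) lets writes to a fifo opened with it proceed without a reader attached. The shell holds the fifo open for both reading and writing, so the writes land in the kernel pipe buffer; they block again only once that buffer is full (64 KiB by default on Linux).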
This should work:
$ mkfifo /tmp/mylog
$ exec 4<>/tmp/mylog
$ myprogram 2>&1 | tee >&4
$ cat /tmp/mylog # on demand
Solution given by gniourf_gniourf on #bash IRC channel.
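The effect of <> can be checked in isolation, without myprogram or tee. The following is a minimal sketch, assuming a throwaway fifo path from mktemp -u (not /tmp/mylog) and fd 4 as in the example above; the variable names are made up for illustration.

```shell
# Assumption: a temporary fifo, so nothing in /tmp is overwritten
fifo=$(mktemp -u)
mkfifo "$fifo"

# Open the fifo read/write on fd 4. The open returns immediately, and this
# shell now holds both ends, so a writer does not wait for a reader.
exec 4<>"$fifo"

# No cat is attached, yet this write returns at once: the line is kept
# in the kernel pipe buffer.
echo "first line" >&4

# A reader that shows up later still finds the buffered line.
IFS= read -r line <&4
echo "read back: $line"

# Clean up: close fd 4 and remove the fifo
exec 4>&-
rm -f "$fifo"
```

The write succeeds with no reader attached, and the line is still there when something finally reads from the fifo.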