I'm attempting to use caffe and Python to do real-time image classification. I'm using OpenCV to stream from my webcam in one process, and in a separate process, using caffe to perform image classification on the frames pulled from the webcam. Then I'm passing the result of the classification back to the main process to caption the webcam stream.
The problem is that even though I have an NVIDIA GPU and am performing the caffe predictions on the GPU, the main process gets slowed down. Normally, without doing any predictions, my webcam stream runs at 30 fps; however, with the predictions, my webcam stream gets at best 15 fps.
I've verified that caffe is indeed using the GPU when performing the predictions, and that neither my GPU nor its memory is maxing out. I've also verified that my CPU cores are not getting maxed out at any point during the program. I'm wondering if I am doing something wrong or if there is no way to keep these 2 processes truly separate. Any advice is appreciated. Here is my code for reference:
import multiprocessing

import caffe
import cv2


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        # other initialization stuff

    def run(self):
        caffe.set_mode_gpu()
        caffe.set_device(0)
        # Load caffe net -- code omitted
        while True:
            image = self.task_queue.get()
            # crop image -- code omitted
            text = net.predict(image)
            self.result_queue.put(text)


tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks, results)
consumer.start()

# Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)

# Try to get the first frame
if vc.isOpened():
    rval, frame = vc.read()
else:
    rval = False

# Separate buffer for the frame handed to the consumer
frame_copy = frame.copy() if rval else None
task_empty = True

while rval:
    if task_empty:
        tasks.put(frame_copy)
        task_empty = False
    if not results.empty():
        text = results.get()
        # Add text to frame (position, font, scale and colour are placeholder values)
        cv2.putText(frame, text, (5, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0))
        task_empty = True

    # Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)

    # Getting next frame from camera
    rval, frame = vc.read()
    if rval:
        frame_copy[:] = frame

    # Getting keyboard input
    key = cv2.waitKey(1)
    # exit on ESC
    if key == 27:
        break

# Stop the consumer process, which otherwise loops forever
consumer.terminate()
I am pretty sure it is the caffe prediction slowing everything down, because when I comment out the prediction and pass dummy text back and forth between the processes, I get 30 fps again.
import multiprocessing

import caffe
import cv2


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        # other initialization stuff

    def run(self):
        caffe.set_mode_gpu()
        caffe.set_device(0)
        # Load caffe net -- code omitted
        while True:
            image = self.task_queue.get()
            # crop image -- code omitted
            # text = net.predict(image)
            text = "dummy text"
            self.result_queue.put(text)


tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks, results)
consumer.start()

# Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)

# Try to get the first frame
if vc.isOpened():
    rval, frame = vc.read()
else:
    rval = False

# Separate buffer for the frame handed to the consumer
frame_copy = frame.copy() if rval else None
task_empty = True

while rval:
    if task_empty:
        tasks.put(frame_copy)
        task_empty = False
    if not results.empty():
        text = results.get()
        # Add text to frame (position, font, scale and colour are placeholder values)
        cv2.putText(frame, text, (5, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0))
        task_empty = True

    # Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)

    # Getting next frame from camera
    rval, frame = vc.read()
    if rval:
        frame_copy[:] = frame

    # Getting keyboard input
    key = cv2.waitKey(1)
    # exit on ESC
    if key == 27:
        break

# Stop the consumer process, which otherwise loops forever
consumer.terminate()
Some Explanations and Some Rethinks:
I ran my code below on a laptop with an Intel Core i5-6300HQ @2.3GHz CPU, 8 GB RAM and an NVIDIA GeForce GTX 960M GPU (2 GB memory), and the result was:
Whether I ran the code with caffe running or not (by commenting out, or not, net_output = this->net_->Forward(net_input) and some necessary stuff in void Consumer::entry()), I could always get around 30 fps in the main thread.
A similar result was obtained on a PC with an Intel Core i5-4440 CPU, 8 GB RAM and an NVIDIA GeForce GT 630 GPU (1 GB memory).
I also ran @user3543300's code from the question on the same laptop, and the result was:
Whether caffe was running (on the GPU) or not, I could also get around 30 fps.
According to @user3543300's feedback, with the 2 versions of code mentioned above, @user3543300 could get only around 15 fps when running caffe (on a laptop with an Nvidia GeForce 940MX GPU and an Intel® Core™ i7-6500U CPU @ 2.50GHz × 4). There is also a slowdown of the webcam frame rate when caffe is running on the GPU as an independent program.
So I still think that the problem most likely lies in hardware I/O limitations such as DMA bandwidth (this thread about DMA may give a hint) or RAM bandwidth. I hope @user3543300 can check this or find the true problem if it is something I haven't thought of.
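One way to narrow this down is to time each stage of the main loop and see which one grows when caffe is running. The following is only a minimal sketch, assuming Python 3; `StageTimer` and the stage names are hypothetical helpers and not part of caffe or OpenCV.

```python
import time
from collections import defaultdict


class StageTimer:
    """Accumulates wall-clock time per named stage of a loop (hypothetical helper)."""

    def __init__(self, clock=time.perf_counter):
        self._clock = clock
        self.totals = defaultdict(float)  # stage name -> total seconds
        self.counts = defaultdict(int)    # stage name -> number of samples
        self._last = None

    def start(self):
        """Mark the beginning of one loop iteration."""
        self._last = self._clock()

    def mark(self, stage):
        """Charge the time since the previous mark (or start) to `stage`."""
        now = self._clock()
        self.totals[stage] += now - self._last
        self.counts[stage] += 1
        self._last = now

    def mean_ms(self, stage):
        """Average milliseconds spent in `stage` per sample."""
        return 1000.0 * self.totals[stage] / self.counts[stage]
```

In the webcam loop one could call `timer.start()` at the top, then `timer.mark("read")` after `vc.read()`, `timer.mark("queue")` after the queue operations and `timer.mark("show")` after `cv2.imshow`, and compare the averages with and without the prediction enabled. If only the "read" stage gets slower, that would be consistent with an I/O bottleneck, though it would not prove it.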
If the problem is indeed what I describe above, then a sensible approach would be to reduce the memory I/O overhead introduced by the CNN. In fact, to solve a similar problem on embedded systems with limited hardware resources, there has been some research on this topic, e.g. quantization, Structurally Sparse Deep Neural Networks, SqueezeNet and Deep Compression. So hopefully, applying such techniques will also help to improve the webcam frame rate in the question.
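To get a feel for the amounts of data involved, here is a rough back-of-the-envelope calculation. The 640x480 frame size and 30 fps are assumptions about the webcam, and the parameter counts (about 61 million for AlexNet, about 1.25 million for SqueezeNet) are approximate figures, so treat the results as orders of magnitude rather than measurements.

```python
def megabytes(n_bytes):
    """Convert a byte count to megabytes (10**6 bytes)."""
    return n_bytes / 1e6


# Assumed webcam frame: 640x480 pixels, 3 channels, 1 byte per channel.
frame_bytes = 640 * 480 * 3
stream_mb_per_s = megabytes(frame_bytes * 30)  # at an assumed 30 fps

# Network input as in the answer's code: 227x227 crop, 3 channels, float32.
input_bytes = 227 * 227 * 3 * 4

# Approximate float32 weight sizes (parameter counts are assumptions).
alexnet_weight_mb = megabytes(61_000_000 * 4)
squeezenet_weight_mb = megabytes(1_250_000 * 4)

print("frame: %d bytes, stream: %.3f MB/s" % (frame_bytes, stream_mb_per_s))
print("net input: %d bytes" % input_bytes)
print("weights: AlexNet ~%.0f MB, SqueezeNet ~%.0f MB"
      % (alexnet_weight_mb, squeezenet_weight_mb))
```

Under these assumptions the per-frame input is small, while the weights that every forward pass has to read differ by roughly a factor of fifty between the two networks, which is the kind of memory traffic the compression work mentioned above tries to reduce.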
Original Answer:
Try this C++ solution. It uses threads for the I/O overhead in your task. I tested it using bvlc_alexnet.caffemodel and deploy.prototxt to do image classification, and didn't see any obvious slowdown of the main thread (webcam stream) when caffe was running (on the GPU):
#include <stdio.h>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include "caffe/caffe.hpp"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/data_transformer.hpp"
#include "opencv2/opencv.hpp"

using namespace cv;

//Queue pair for sharing image/results between webcam and caffe threads
template<typename T>
class QueuePair {
public:
    explicit QueuePair(int size);
    ~QueuePair();

    caffe::BlockingQueue<T*> free_;
    caffe::BlockingQueue<T*> full_;

    DISABLE_COPY_AND_ASSIGN(QueuePair);
};

template<typename T>
QueuePair<T>::QueuePair(int size) {
    // Initialize the free queue
    for (int i = 0; i < size; ++i) {
        free_.push(new T);
    }
}

template<typename T>
QueuePair<T>::~QueuePair(){
    T *data;
    while (free_.try_pop(&data)){
        delete data;
    }
    while (full_.try_pop(&data)){
        delete data;
    }
}

template class QueuePair<Mat>;
template class QueuePair<std::string>;

//Do image classification(caffe predict) using a subthread
class Consumer{
public:
    Consumer(boost::shared_ptr<QueuePair<Mat>> task
        , boost::shared_ptr<QueuePair<std::string>> result);
    ~Consumer();
    void Run();
    void Stop();
    void entry(boost::shared_ptr<QueuePair<Mat>> task
        , boost::shared_ptr<QueuePair<std::string>> result);

private:
    bool must_stop();

    boost::shared_ptr<QueuePair<Mat> > task_q_;
    boost::shared_ptr<QueuePair<std::string> > result_q_;

    //caffe::Blob<float> *net_input_blob_;
    boost::shared_ptr<caffe::DataTransformer<float> > data_transformer_;
    boost::shared_ptr<caffe::Net<float> > net_;
    std::vector<std::string> synset_words_;
    boost::shared_ptr<boost::thread> thread_;
};

Consumer::Consumer(boost::shared_ptr<QueuePair<Mat>> task
    , boost::shared_ptr<QueuePair<std::string>> result) :
    task_q_(task), result_q_(result), thread_(){

    //for data preprocess
    caffe::TransformationParameter trans_para;
    //set mean
    trans_para.set_mean_file("/path/to/imagenet_mean.binaryproto");
    //set crop size, here is cropping 227x227 from 256x256
    trans_para.set_crop_size(227);
    //instantiate a DataTransformer using trans_para for image preprocess
    data_transformer_.reset(new caffe::DataTransformer<float>(trans_para
        , caffe::TEST));

    //initialize a caffe net
    net_.reset(new caffe::Net<float>(std::string("/path/to/deploy.prototxt")
        , caffe::TEST));
    //net parameter
    net_->CopyTrainedLayersFrom(std::string("/path/to/bvlc_alexnet.caffemodel"));

    std::fstream synset_word("path/to/caffe/data/ilsvrc12/synset_words.txt");
    std::string line;
    if (!synset_word.good()){
        std::cerr << "synset words open failed!" << std::endl;
    }
    while (std::getline(synset_word, line)){
        synset_words_.push_back(line.substr(line.find_first_of(' '), line.length()));
    }
    //a container for net input, holds data converted from cv::Mat
    //net_input_blob_ = new caffe::Blob<float>(1, 3, 227, 227);
}

Consumer::~Consumer(){
    Stop();
    //delete net_input_blob_;
}

void Consumer::entry(boost::shared_ptr<QueuePair<Mat>> task
    , boost::shared_ptr<QueuePair<std::string>> result){

    caffe::Caffe::set_mode(caffe::Caffe::GPU);
    caffe::Caffe::SetDevice(0);

    cv::Mat *frame;
    cv::Mat resized_image(256, 256, CV_8UC3);
    cv::Size re_size(resized_image.cols, resized_image.rows);

    //for caffe input and output
    const std::vector<caffe::Blob<float> *> net_input = this->net_->input_blobs();
    std::vector<caffe::Blob<float> *> net_output;

    //net_input.push_back(net_input_blob_);
    std::string *res;

    int pre_num = 1;
    while (!must_stop()){
        std::stringstream result_strm;
        //wait for a frame from the webcam thread
        frame = task->full_.pop();
        cv::resize(*frame, resized_image, re_size, 0, 0, CV_INTER_LINEAR);
        this->data_transformer_->Transform(resized_image, *net_input[0]);
        net_output = this->net_->Forward();
        //hand the frame buffer back for reuse
        task->free_.push(frame);

        res = result->free_.pop();
        //Process results here
        for (int i = 0; i < pre_num; ++i){
            result_strm << synset_words_[net_output[0]->cpu_data()[i]] << " "
                << net_output[0]->cpu_data()[i + pre_num] << "\n";
        }
        *res = result_strm.str();
        result->full_.push(res);
    }
}

void Consumer::Run(){
    if (!thread_){
        try{
            thread_.reset(new boost::thread(&Consumer::entry, this, task_q_, result_q_));
        }
        catch (std::exception& e) {
            std::cerr << "Thread exception: " << e.what() << std::endl;
        }
    }
    else
        std::cout << "Consumer thread may have been running!" << std::endl;
}

void Consumer::Stop(){
    if (thread_ && thread_->joinable()){
        thread_->interrupt();
        try {
            thread_->join();
        }
        catch (boost::thread_interrupted&) {
        }
        catch (std::exception& e) {
            std::cerr << "Thread exception: " << e.what() << std::endl;
        }
    }
}

bool Consumer::must_stop(){
    return thread_ && thread_->interruption_requested();
}

int main(void)
{
    int max_queue_size = 1000;
    boost::shared_ptr<QueuePair<Mat>> tasks(new QueuePair<Mat>(max_queue_size));
    boost::shared_ptr<QueuePair<std::string>> results(new QueuePair<std::string>(max_queue_size));

    char str[100], info_str[100] = " results: ";
    VideoCapture vc(0);
    if (!vc.isOpened())
        return -1;

    Consumer consumer(tasks, results);
    consumer.Run();

    Mat frame, *frame_copy;
    namedWindow("preview");
    double t, fps;

    while (true){
        t = (double)getTickCount();
        vc.read(frame);

        if (waitKey(1) >= 0){
            consumer.Stop();
            break;
        }

        //submit the frame only if a free buffer is available
        if (tasks->free_.try_peek(&frame_copy)){
            frame_copy = tasks->free_.pop();
            *frame_copy = frame.clone();
            tasks->full_.push(frame_copy);
        }
        //pick up a classification result if one is ready
        std::string *res;
        std::string frame_info("");
        if (results->full_.try_peek(&res)){
            res = results->full_.pop();
            frame_info = frame_info + info_str;
            frame_info = frame_info + *res;
            results->free_.push(res);
        }

        t = ((double)getTickCount() - t) / getTickFrequency();
        fps = 1.0 / t;

        sprintf(str, " fps: %.2f", fps);
        frame_info = frame_info + str;

        putText(frame, frame_info, Point(5, 20)
            , FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0));
        imshow("preview", frame);
    }
}
And in src/caffe/util/blocking_queue.cpp, make the small change below and rebuild caffe:
...//Other stuff
template class BlockingQueue<Batch<float>*>;
template class BlockingQueue<Batch<double>*>;
template class BlockingQueue<Datum*>;
template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
template class BlockingQueue<P2PSync<float>*>;
template class BlockingQueue<P2PSync<double>*>;
//add these 2 lines below
template class BlockingQueue<cv::Mat*>;
template class BlockingQueue<std::string*>;
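For readers following the question's Python code, the free/full queue pair used in the C++ code above can be sketched with the standard library. This is only an illustration of the buffer-recycling pattern, assuming Python 3 and threads within one process; `QueuePair`, `produce` and `consume` here are hypothetical names, not caffe APIs.

```python
import queue


class QueuePair:
    """Fixed pool of reusable slots shuttled between a 'free' and a 'full' queue."""

    def __init__(self, size, factory):
        self.free = queue.Queue()
        self.full = queue.Queue()
        for _ in range(size):
            self.free.put(factory())


def produce(pair, value):
    """Fill a free slot with `value`; drop the value if no slot is free."""
    try:
        slot = pair.free.get_nowait()
    except queue.Empty:
        return False
    slot["data"] = value
    pair.full.put(slot)
    return True


def consume(pair):
    """Take one filled slot, read it, and return the slot to the free queue."""
    slot = pair.full.get()
    value = slot["data"]
    pair.free.put(slot)
    return value
```

The point of the design is that the producer never blocks: when every slot is in use it simply skips the frame, so the display loop keeps its own pace regardless of how slow the consumer is.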
It seems that caffe's Python wrapper holds the Global Interpreter Lock (GIL) while it runs. Thus calling any caffe Python command blocks ALL Python threads.
A workaround (at your own risk) would be to release the GIL for specific caffe functions. For instance, if you want to be able to run forward without the lock, you can edit $CAFFE_ROOT/python/caffe/_caffe.cpp. Add this function:
void Net_Forward(Net<Dtype>& net, int start, int end) {
    Py_BEGIN_ALLOW_THREADS;  // <-- release the GIL
    net.ForwardFromTo(start, end);
    Py_END_ALLOW_THREADS;    // <-- reacquire the GIL
}
And replace .def("_forward", &Net<Dtype>::ForwardFromTo)
with:
.def("_forward", &Net_Forward)
Don't forget to make pycaffe after the change.
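Once the forward pass no longer holds the GIL, a plain Python thread may be enough for the consumer. The sketch below assumes Python 3; `predict` is a hypothetical stand-in for whatever call runs the patched net, and nothing here is part of pycaffe itself.

```python
import queue
import threading


def start_consumer(predict, tasks, results):
    """Run `predict` on items from `tasks` in a background thread.

    `predict` stands in for a call into the patched net (hypothetical here);
    putting None on `tasks` tells the worker to exit.
    """
    def worker():
        while True:
            image = tasks.get()
            if image is None:
                break
            results.put(predict(image))

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread


def poll(results):
    """Return the next result without blocking, or None if none is ready."""
    try:
        return results.get_nowait()
    except queue.Empty:
        return None
```

In the display loop, `poll(results)` plays the role of the `results.empty()` check in the question, so the loop never waits on the classifier.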
See this for more details.