creaImageIOQMultiThreadImageReader.cpp

Go to the documentation of this file.
00001 #include <creaImageIOQMultiThreadImageReader.h>
00002 #include <creaImageIOImageReader.h>
00003 //#include <wx/utils.h>
00004 #include <QThread>
00005 #include <creaImageIOSystem.h>
00006 
00007 #include <creaImageIOGimmick.h>
00008 #ifdef _DEBUG
00009 #define new DEBUG_NEW
00010 #endif
00011 namespace creaImageIO
00012 {
00013 
00014   //=====================================================================
00015   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
00016   ( const std::string& filename,
00017     EventType type,
00018     vtkImageData* image)
00019   {
00020     QMutexLocker lock(&mMultiThreadImageReaderUserMutex);
00021 
00022     this->OnMultiThreadImageReaderEvent(filename,type,image);
00023   }
00024   //=====================================================================
00025 
00026   //=====================================================================
00027   class ThreadedImageReader: public QThread
00028   {
00029   public:
00030     ThreadedImageReader(MultiThreadImageReader* tir) :
00031       mMultiThreadImageReader(tir)
00032     {}
00033 
00034     void* Entry();
00035     void  OnExit();
00036 
00037     vtkImageData* Read(const std::string& filename);
00038     
00039         struct deleter
00040         {
00041                 void operator()(ThreadedImageReader* p)
00042                 {
00043                         delete p;
00044                 }
00045         };
00046         friend struct deleter;
00047 
00048 
00049   private:
00050     ImageReader mReader;
00051     MultiThreadImageReader* mMultiThreadImageReader;
00052         
00053   };
00054 
00055   //=====================================================================
00056 
00057   
00058   //=====================================================================
00059   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
00060     : //mDoNotSignal(false),
00061       mReader(0),
00062       mTotalMem(0),
00063       mTotalMemMax(1000000)
00064   {
00065     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
00066     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
00067 
00068           mDone = false;
00069     // Create the threads
00070     for (int i=0; i<number_of_threads; i++) 
00071       {
00072                   //ThreadedImageReader* t = new ThreadedImageReader(this);
00073                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
00074         mThreadedImageReaderList.push_back(t);
00075          std::cout << "  ===> Thread "<<i
00076                       <<" successfully added"<< std::endl;
00077       }
00078     mNumberOfThreadedReadersRunning = 0;
00079     // Init the queue
00080     mQueue.set(mComparator);
00081     mQueue.set(mIndexer);
00082     // 
00083     // no thread : alloc self reader
00084 //    if (number_of_threads==0)
00085 //      {
00086         mReader = new ImageReader();
00087 //      }
00088   }
00089   //=====================================================================
00090 
00091 
00092   //=====================================================================
00093   bool MultiThreadImageReader::Start()
00094   {
00095 
00096     //    std::cout << "#### MultiThreadImageReader::Start()"
00097     //                <<std::endl;
00098           if (mNumberOfThreadedReadersRunning > 0) return true;
00099           
00100     ThreadedImageReaderListType::iterator i;
00101     for (i =mThreadedImageReaderList.begin();
00102          i!=mThreadedImageReaderList.end();
00103          i++)
00104       {
00105         (*i)->start();
00106         if ( !(*i)->isRunning() )
00107           {
00108             std::cout << "ERROR starting a thread"<< std::endl;
00109             return false;
00110           }
00111         else 
00112           {
00113                     std::cout << "  ===> Thread "<<(*i)->currentThreadId()
00114                               <<" successfully created"<< std::endl;
00115             
00116           }
00117       }
00118     QMutexLocker locker(GetMultiThreadImageReaderUserMutex());
00119     //    std::cout << "EO Start : #Threads running = "
00120     //                << mNumberOfThreadedReadersRunning<<std::endl;
00121 
00122     return true;
00123   }
00124   //=====================================================================
00125 
00126   //=====================================================================
00127   void MultiThreadImageReader::Stop()
00128   { 
00129 //                  std::cout << "#### MultiThreadImageReader::Stop()"
00130 //            <<std::endl;
00131   //  std::cout << "Sending stop order to the threads..."<<std::endl;
00132           if (mDone) return;
00133 
00134     ThreadedImageReaderListType::iterator i;
00135     for (i =mThreadedImageReaderList.begin();
00136          i!=mThreadedImageReaderList.end();
00137          i++)
00138       { std::cout << "  ===> Thread "<<(*i)->currentThreadId()
00139                               <<" successfully stopped"<< std::endl;
00140                   if(!(*i)->isFinished())
00141                   {(*i)->wait();
00142                           (*i).reset();
00143                          //                       (*i)->Delete();
00144                   }
00145       }
00146    mThreadedImageReaderList.clear();
00147     // Wait a little to be sure that all threads have stopped
00148     // A better way to do this ?
00149     //    wxMilliSleep(1000);
00150     // New method : the threads generate a stop event when they have finished
00151     // We wait until all threads have stopped
00152 //        std::cout << "Waiting for stop signals..."<<std::endl;
00153     do 
00154       {
00155         // Sleep a little
00156        //   QThread::msleep(10);
00157         // Lock
00158         {
00159           QMutexLocker locker(GetMultiThreadImageReaderUserMutex());
00160 //                std::cout << "#Threads running = "
00161 //                          << mNumberOfThreadedReadersRunning<<std::endl;
00162           // Break if all readers have stopped
00163           if (mNumberOfThreadedReadersRunning <= 0) 
00164             {
00165               break;
00166             }
00167         }
00168       } 
00169     while (true);
00170 //        std::cout << "All threads stopped : OK "<<std::endl;
00171 
00172     ImageMapType::iterator j;
00173     for (j =mImages.begin();
00174          j!=mImages.end();
00175          ++j)
00176 
00177       {
00178         delete j->first;
00179       }
00180     mImages.clear();
00181         mDone = true;
00182   }
00183   //=====================================================================
00184 
00185   //=====================================================================
00186   MultiThreadImageReader::~MultiThreadImageReader()
00187   {
00188     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
00189     //        <<std::endl;
00190     Stop();
00191     if (mReader) delete mReader;
00192         mThreadedImageReaderList.clear();
00193   }
00194   //=====================================================================
00195 
00196   //=====================================================================
00197   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
00198                                                     int priority)
00199   {
00200     // not in unload queue : ciao
00201     if (p->UnloadIndex()<0) return;
00202     int old_prio = p->GetPriority();
00203     if (priority > old_prio) 
00204       {
00205         p->SetPriority(priority);
00206         mUnloadQueue.downsort(p->UnloadIndex());
00207       }
00208     else if ( old_prio > priority )
00209       {
00210         p->SetPriority(priority);
00211         mUnloadQueue.upsort(p->UnloadIndex());
00212      }
00213   }
00214   //=====================================================================
00215   // function to read attributes for a file
00216   void MultiThreadImageReader::getAttributes(const std::string filename, 
00217           std::map <std::string , std::string> &infos,std::vector<std::string> i_attr)
00218   {
00219           mReader->getAttributes(filename, infos, i_attr);
00220   }
00221 
00222   //=====================================================================
00223   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
00224                                         const std::string& filename, 
00225                                         int priority )
00226   {
00227         QMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
00228 
00229           if (mNumberOfThreadedReadersRunning==0)
00230 //    if (mThreadedImageReaderList.size()==0) 
00231       {
00232         // no detached reader : use self reader
00233         ImageToLoad itl(user,filename);
00234         ImageMapType::iterator i = mImages.find(&itl);
00235         if (i!=mImages.end())
00236           {
00237             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
00238             // Already inserted
00239             if (pitl->GetImage() != 0)
00240               {
00241                 // Already read
00242                 pitl->SetUser(user);
00243                 UpdateUnloadPriority(pitl,priority);
00244                 SignalImageRead(pitl,false);
00245                 return; // pitl->GetImage();
00246               }
00247           }
00248         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
00249         mImages[pitl] = 0;
00250         pitl->SetImage(mReader->ReadImage(filename));
00251         UpdateUnloadPriority(pitl,priority);
00252         SignalImageRead(pitl,true);
00253         //      return pitl->GetImage();
00254         return;
00255       }
00256 
00257     ImageToLoad itl(user,filename);
00258     ImageMapType::iterator i = mImages.find(&itl);
00259     if (i!=mImages.end())
00260       {
00261         // Already inserted
00262         if (i->first->GetImage() != 0)
00263           {
00264             // Already read : ok :signal the user
00265             UpdateUnloadPriority(i->first,priority);
00266             SignalImageRead(i->first,false);
00267             return;
00268           }
00270         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
00271         pitl->SetPriority(priority);
00272         // Already in queue
00273         if (pitl->Index()>=0) 
00274           {
00275             // Re-sort the queue
00276             mQueue.upsort(pitl->Index());
00277           }
00278         // Not read but not in queue = being read = ok
00279         else 
00280           {
00281             
00282           }
00283       }
00284     else 
00285       {
00286         // Never requested before or unloaded 
00287         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
00288         mImages[pitl] = 0;
00289         mQueue.insert(pitl);
00290       }
00291   }
00292   //=====================================================================
00293 
00294   //=====================================================================
00295   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
00296   (const std::string& filename,
00297    MultiThreadImageReaderUser::EventType e,
00298    vtkImageData* image)
00299   {
00300     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
00301         (filename == mRequestedFilename))
00302       {
00303         mRequestedImage = image;
00304       }
00305     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
00306       {
00307         mNumberOfThreadedReadersRunning++;
00308         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
00309       }
00310     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
00311       {
00312         
00313                  mNumberOfThreadedReadersRunning--;
00314         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
00315       }
00316   }
00317   //=====================================================================
00318 
00319   //=====================================================================
00320   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
00321   {
00322          // Start();
00323     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
00324     //           <<std::endl;
00325     
00326     do 
00327       {
00328         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
00329                 
00330         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
00331         //             <<"') lock ok"
00332         //               <<std::endl;
00333     
00334         //                if (mNumberOfThreadedReadersRunning==0)
00335         //      if (mThreadedImageReaderList.size()==0)
00336         if (true)
00337           {
00338             ImageToLoad itl(this,filename);
00339             ImageMapType::iterator i = mImages.find(&itl);
00340             if (i!=mImages.end())
00341               {
00342                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
00343                 // Already inserted
00344                 if (pitl->GetImage() != 0)
00345                   {
00346                     // Already read
00347                     UpdateUnloadPriority(pitl,
00348                                          GetMaximalPriorityWithoutLocking()+1);
00349                     return pitl->GetImage();
00350                   }
00351               }
00352             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
00353             mImages[pitl] = 0;
00354             pitl->SetImage(mReader->ReadImage(filename));
00355             UpdateUnloadPriority(pitl,
00356                                  GetMaximalPriorityWithoutLocking()+1);
00357             return pitl->GetImage();
00358           }
00359 
00360         /*      
00361         mRequestedFilename = filename;
00362         mRequestedImage = 0;
00363         ImageToLoad itl(this,filename);
00364         ImageMapType::iterator i = mImages.find(&itl);
00365         if (i!=mImages.end())
00366           {
00367             // Already inserted in queue
00368             if (i->first->GetImage() != 0)
00369               {
00370                 // Already read : ok : return it 
00371                 return i->first->GetImage();
00372               }
00374               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
00375               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
00376               pitl->SetUser( this );
00377               // Already in queue
00378               if (pitl->Index()>=0) 
00379                 {
00380                   // Re-sort the queue
00381                   mQueue.upsort(pitl->Index());
00382                 }
00383               // Not read but not in queue = being read = ok
00384               else 
00385                 {
00386                   pitl->SetUser( this );
00387                 }
00388           }
00389         else 
00390           {
00391             
00392             // Never requested before or unloaded 
00393             ImageToLoadPtr pitl = 
00394               new ImageToLoad(this,filename,
00395                               GetMaximalPriorityWithoutLocking() + 1);
00396             mImages[pitl] = 0;
00397             mQueue.insert(pitl);
00398           }
00399         */
00400       }
00401     while (0);
00402 
00403     //    std::cout << "Waiting..."<<std::endl;
00404 
00405     /*
00406     // Waiting that it is read
00407     int n = 0;
00408     do 
00409       {
00410         //      std::cout << n++ << std::endl;
00411         wxMilliSleep(10);
00412         do 
00413           {
00414             //      wxMutexLocker lock(mMutex);
00415             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
00416             if (mRequestedImage!=0) 
00417               {
00418                 return mRequestedImage;
00419               } 
00420           }
00421         while (0);
00422       }
00423     while (true);
00424     // 
00425     */
00426   }
00427   //=====================================================================
00428   
00429   //=====================================================================
00430   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
00431                                                bool purge)
00432   {
00433     
00434 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
00435     //    std::cout << "this="<<this <<std::endl;
00436     //    std::cout << "user="<<p->GetUser() <<std::endl;
00437 
00438     if ( p->GetUser() == this ) 
00439       GetMultiThreadImageReaderUserMutex()->unlock();
00440 
00441     p->GetUser()->MultiThreadImageReaderSendEvent
00442       (p->GetFilename(),
00443        MultiThreadImageReaderUser::ImageLoaded,
00444        p->GetImage());
00445 
00446     /*
00447       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
00448       BUGGY : TO FIX 
00449     */
00450     if (!purge)  return;
00451     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
00452 
00453     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
00454            
00455     mUnloadQueue.insert(p);
00456     p->GetImage()->UpdateInformation();
00457     p->GetImage()->PropagateUpdateExtent();
00458     long ImMem = p->GetImage()->GetEstimatedMemorySize();
00459     mTotalMem += ImMem;
00460 
00461     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
00462     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
00463 
00464     //  return;
00465 
00466     while (mTotalMem > mTotalMemMax)
00467       {
00468         GimmickMessage(5,
00469                        "   ! Exceeded max of "
00470                        << mTotalMemMax << " Ko : unloading oldest image ... "
00471                        << std::endl);
00472         if ( mUnloadQueue.size() <= 1 ) 
00473           {
00474              GimmickMessage(5,
00475                             "   Only one image : cannot load AND unload it !!"
00476                             <<std::endl);
00477             break; 
00478             
00479           }
00480         ImageToLoadPtr unload = mUnloadQueue.remove_top();
00481         MultiThreadImageReaderUser* user = unload->GetUser();
00482 
00483         /*
00484         if ((user!=0)&&(user!=this)) 
00485           {
00486             user->GetMultiThreadImageReaderUserMutex().Lock();
00487           }
00488         */
00489 
00490         std::string filename = unload->GetFilename();
00491 
00492         GimmickMessage(5,"'" << filename << "'" << std::endl);
00493         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
00494 
00495         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
00496 
00497         if (user!=0) 
00498           {
00499             //      std::cout << "unlock..."<<std::endl;
00500             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
00501             //      std::cout << "event"<<std::endl;
00502             user->MultiThreadImageReaderSendEvent
00503               (filename,
00504                MultiThreadImageReaderUser::ImageUnloaded,
00505                0);
00506             //      std::cout << "event ok"<<std::endl;
00507           }     
00508 
00509         if (unload->Index()>=0)
00510           {
00511             // GimmickMessage(5,"still in queue"<<std::endl);
00512           }
00513         unload->Index() = -1;
00514 
00515 
00516         ImageMapType::iterator it = mImages.find(unload);
00517         if (it!=mImages.end())
00518           {
00519             mImages.erase(it);
00520           }
00521         //          std::cout << "delete..."<<std::endl;
00522         delete unload;
00523         //          std::cout << "delete ok."<<std::endl;
00524 
00525       }
00526   }
00527   //=====================================================================
00528 
00529   //=====================================================================
00530   int MultiThreadImageReader::GetMaximalPriority()
00531   { 
00532     QMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
00533     return GetMaximalPriorityWithoutLocking();
00534   }
00535   //=====================================================================
00536 
00537 
00538   //=====================================================================
00539   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
00540   { 
00541     long max = 0;
00542     if (mQueue.size()>0) 
00543       {
00544         max = mQueue.top()->GetPriority();
00545       }
00546     if (mUnloadQueue.size()>0)
00547       {
00548         int max2 = mUnloadQueue.top()->GetPriority();
00549         if (max2>max) max=max2;
00550       }
00551     return max;
00552   }
00553   //=====================================================================
00554 
00555 
00556   //=====================================================================
00557   //=====================================================================
00558   //=====================================================================
00559   //=====================================================================
00560 
00561   //=====================================================================
00562   void*  ThreadedImageReader::Entry()
00563   {
00564     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
00565     //                << std::endl;
00566 
00567     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
00568       ("",
00569        MultiThreadImageReaderUser::ThreadedReaderStarted,
00570        0);
00571 
00572     // While was not deleted 
00573     while (!isFinished())
00574       {
00575                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
00576           
00577         // Lock the mutex
00578         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
00579         //mMutex.Lock();
00580         // If image in queue
00581         if (mMultiThreadImageReader->mQueue.size()>0)
00582           {
00583             MultiThreadImageReader::ImageToLoadPtr i = 
00584               mMultiThreadImageReader->mQueue.remove_top();
00585 
00586             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
00587             //mMutex.Unlock();
00588 
00589             
00590             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
00591             //                << i->GetFilename() << "'" << std::endl;
00592             
00593             // Do the job
00594             vtkImageData* im = Read(i->GetFilename());
00595 
00596             // Store it in the map
00597             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
00598             //mMutex.Lock();
00599             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
00600             MultiThreadImageReader::ImageMapType::iterator it = 
00601               mMultiThreadImageReader->mImages.find(&itl);
00602             MultiThreadImageReader::ImageToLoadPtr 
00603               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
00604               (it->first);
00605             pitl->SetImage(im);
00606             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
00607             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
00608             
00609             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
00610             //                << i->GetFilename() << "' : DONE" << std::endl;
00611             
00612           }
00613         else 
00614           {
00615             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
00616             //mMutex.Unlock();
00617             // Wait a little to avoid blocking 
00618             Sleep(10);
00619           }
00620       };
00621     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
00622     //                << std::endl;
00623        
00624     return 0;
00625   }
00626   //=====================================================================
00627 
00628   //=====================================================================
00629   void ThreadedImageReader::OnExit()
00630   {
00631     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
00632       ("",
00633        MultiThreadImageReaderUser::ThreadedReaderStopped,
00634        0);
00635   }
00636   //=====================================================================
00637 
00638   //=====================================================================
00639   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
00640   {
00641     return mReader.ReadImage(filename);
00642   }
00643   //=====================================================================
00644 
00645 } // namespace creaImageIO