creaImageIOMultiThreadImageReader.cpp

Go to the documentation of this file.
00001 #include <creaImageIOMultiThreadImageReader.h>
00002 #include <creaImageIOImageReader.h>
00003 #include <wx/utils.h>
00004 #include <creaImageIOSystem.h>
00005 
00006 #include <creaImageIOGimmick.h>
00007 #ifdef _DEBUG
00008 #define new DEBUG_NEW
00009 #endif
00010 namespace creaImageIO
00011 {
00012 
00013   //=====================================================================
00014   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
00015   ( const std::string& filename,
00016     EventType type,
00017     vtkImageData* image)
00018   {
00019     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
00020 
00021     this->OnMultiThreadImageReaderEvent(filename,type,image);
00022   }
00023   //=====================================================================
00024 
00025   //=====================================================================
00026   class ThreadedImageReader: public wxThread
00027   {
00028   public:
00029     ThreadedImageReader(MultiThreadImageReader* tir) :
00030       mMultiThreadImageReader(tir)
00031     {}
00032 
00033     void* Entry();
00034     void  OnExit();
00035 
00036     vtkImageData* Read(const std::string& filename);
00037     
00038         struct deleter
00039         {
00040                 void operator()(ThreadedImageReader* p)
00041                 {
00042                         p->Delete();
00043                 }
00044         };
00045         friend struct deleter;
00046 
00047 
00048   private:
00049     ImageReader mReader;
00050     MultiThreadImageReader* mMultiThreadImageReader;
00051         
00052   };
00053 
00054   //=====================================================================
00055 
00056   
00057   //=====================================================================
00058   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
00059     : //mDoNotSignal(false),
00060       mReader(0),
00061       mTotalMem(0),
00062       mTotalMemMax(1000000)
00063   {
00064     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
00065     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
00066 
00067           mDone = false;
00068     // Create the threads
00069     for (int i=0; i<number_of_threads; i++) 
00070       {
00071                   //ThreadedImageReader* t = new ThreadedImageReader(this);
00072                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
00073         mThreadedImageReaderList.push_back(t);
00074          std::cout << "  ===> Thread "<<i
00075                       <<" successfully added"<< std::endl;
00076       }
00077     mNumberOfThreadedReadersRunning = 0;
00078     // Init the queue
00079     mQueue.set(mComparator);
00080     mQueue.set(mIndexer);
00081     // 
00082     // no thread : alloc self reader
00083 //    if (number_of_threads==0)
00084 //      {
00085         mReader = new ImageReader();
00086 //      }
00087   }
00088   //=====================================================================
00089 
00090 
00091   //=====================================================================
00092   bool MultiThreadImageReader::Start()
00093   {
00094 
00095     //    std::cout << "#### MultiThreadImageReader::Start()"
00096     //                <<std::endl;
00097           if (mNumberOfThreadedReadersRunning > 0) return true;
00098           
00099     ThreadedImageReaderListType::iterator i;
00100     for (i =mThreadedImageReaderList.begin();
00101          i!=mThreadedImageReaderList.end();
00102          i++)
00103       {
00104         (*i)->Create();
00105         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
00106           {
00107             std::cout << "ERROR starting a thread"<< std::endl;
00108             return false;
00109           }
00110         else 
00111           {
00112                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
00113                               <<" successfully created"<< std::endl;
00114             
00115           }
00116       }
00117     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
00118     //    std::cout << "EO Start : #Threads running = "
00119     //                << mNumberOfThreadedReadersRunning<<std::endl;
00120 
00121     return true;
00122   }
00123   //=====================================================================
00124 
00125   //=====================================================================
00126   void MultiThreadImageReader::Stop()
00127   { 
00128 //                  std::cout << "#### MultiThreadImageReader::Stop()"
00129 //            <<std::endl;
00130   //  std::cout << "Sending stop order to the threads..."<<std::endl;
00131           if (mDone) return;
00132 
00133     ThreadedImageReaderListType::iterator i;
00134     for (i =mThreadedImageReaderList.begin();
00135          i!=mThreadedImageReaderList.end();
00136          i++)
00137       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
00138                               <<" successfully stopped"<< std::endl;
00139                   if((*i)->IsAlive())
00140                   {(*i)->Pause();
00141                           (*i).reset();
00142                          //                       (*i)->Delete();
00143                   }
00144       }
00145    mThreadedImageReaderList.clear();
00146     // Wait a little to be sure that all threads have stopped
00147     // A better way to do this ?
00148     //    wxMilliSleep(1000);
00149     // New method : the threads generate a stop event when they have finished
00150     // We wait until all threads have stopped
00151 //        std::cout << "Waiting for stop signals..."<<std::endl;
00152     do 
00153       {
00154         // Sleep a little
00155                 wxMilliSleep(10);
00156         // Lock
00157         {
00158           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
00159 //                std::cout << "#Threads running = "
00160 //                          << mNumberOfThreadedReadersRunning<<std::endl;
00161           // Break if all readers have stopped
00162           if (mNumberOfThreadedReadersRunning <= 0) 
00163             {
00164               break;
00165             }
00166         }
00167       } 
00168     while (true);
00169 //        std::cout << "All threads stopped : OK "<<std::endl;
00170 
00171     ImageMapType::iterator j;
00172     for (j =mImages.begin();
00173          j!=mImages.end();
00174          ++j)
00175 
00176       {
00177         delete j->first;
00178       }
00179     mImages.clear();
00180         mDone = true;
00181   }
00182   //=====================================================================
00183 
00184   //=====================================================================
00185   MultiThreadImageReader::~MultiThreadImageReader()
00186   {
00187     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
00188     //        <<std::endl;
00189     Stop();
00190     if (mReader) delete mReader;
00191         mThreadedImageReaderList.clear();
00192   }
00193   //=====================================================================
00194 
00195   //=====================================================================
00196   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
00197                                                     int priority)
00198   {
00199     // not in unload queue : ciao
00200     if (p->UnloadIndex()<0) return;
00201     int old_prio = p->GetPriority();
00202     if (priority > old_prio) 
00203       {
00204         p->SetPriority(priority);
00205         mUnloadQueue.downsort(p->UnloadIndex());
00206       }
00207     else if ( old_prio > priority )
00208       {
00209         p->SetPriority(priority);
00210         mUnloadQueue.upsort(p->UnloadIndex());
00211      }
00212   }
00213   //=====================================================================
00214   // function to read attributes for a file
00215   void MultiThreadImageReader::getAttributes(const std::string filename, 
00216           std::map <std::string , std::string> &infos,std::vector<std::string> i_attr)
00217   {
00218           mReader->getAttributes(filename, infos, i_attr);
00219   }
00220 
00221   //=====================================================================
00222   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
00223                                         const std::string& filename, 
00224                                         int priority )
00225   {
00226         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
00227 
00228           if (mNumberOfThreadedReadersRunning==0)
00229 //    if (mThreadedImageReaderList.size()==0) 
00230       {
00231         // no detached reader : use self reader
00232         ImageToLoad itl(user,filename);
00233         ImageMapType::iterator i = mImages.find(&itl);
00234         if (i!=mImages.end())
00235           {
00236             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
00237             // Already inserted
00238             if (pitl->GetImage() != 0)
00239               {
00240                 // Already read
00241                 pitl->SetUser(user);
00242                 UpdateUnloadPriority(pitl,priority);
00243                 SignalImageRead(pitl,false);
00244                 return; // pitl->GetImage();
00245               }
00246           }
00247         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
00248         mImages[pitl] = 0;
00249         pitl->SetImage(mReader->ReadImage(filename));
00250         UpdateUnloadPriority(pitl,priority);
00251         SignalImageRead(pitl,true);
00252         //      return pitl->GetImage();
00253         return;
00254       }
00255 
00256     ImageToLoad itl(user,filename);
00257     ImageMapType::iterator i = mImages.find(&itl);
00258     if (i!=mImages.end())
00259       {
00260         // Already inserted
00261         if (i->first->GetImage() != 0)
00262           {
00263             // Already read : ok :signal the user
00264             UpdateUnloadPriority(i->first,priority);
00265             SignalImageRead(i->first,false);
00266             return;
00267           }
00269         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
00270         pitl->SetPriority(priority);
00271         // Already in queue
00272         if (pitl->Index()>=0) 
00273           {
00274             // Re-sort the queue
00275             mQueue.upsort(pitl->Index());
00276           }
00277         // Not read but not in queue = being read = ok
00278         else 
00279           {
00280             
00281           }
00282       }
00283     else 
00284       {
00285         // Never requested before or unloaded 
00286         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
00287         mImages[pitl] = 0;
00288         mQueue.insert(pitl);
00289       }
00290   }
00291   //=====================================================================
00292 
00293   //=====================================================================
00294   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
00295   (const std::string& filename,
00296    MultiThreadImageReaderUser::EventType e,
00297    vtkImageData* image)
00298   {
00299     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
00300         (filename == mRequestedFilename))
00301       {
00302         mRequestedImage = image;
00303       }
00304     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
00305       {
00306         mNumberOfThreadedReadersRunning++;
00307         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
00308       }
00309     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
00310       {
00311         
00312                  mNumberOfThreadedReadersRunning--;
00313         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
00314       }
00315   }
00316   //=====================================================================
00317 
00318   //=====================================================================
00319   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
00320   {
00321          // Start();
00322     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
00323     //           <<std::endl;
00324     
00325     do 
00326       {
00327         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
00328                 
00329         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
00330         //             <<"') lock ok"
00331         //               <<std::endl;
00332     
00333         //                if (mNumberOfThreadedReadersRunning==0)
00334         //      if (mThreadedImageReaderList.size()==0)
00335         if (true)
00336           {
00337             ImageToLoad itl(this,filename);
00338             ImageMapType::iterator i = mImages.find(&itl);
00339             if (i!=mImages.end())
00340               {
00341                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
00342                 // Already inserted
00343                 if (pitl->GetImage() != 0)
00344                   {
00345                     // Already read
00346                     UpdateUnloadPriority(pitl,
00347                                          GetMaximalPriorityWithoutLocking()+1);
00348                     return pitl->GetImage();
00349                   }
00350               }
00351             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
00352             mImages[pitl] = 0;
00353             pitl->SetImage(mReader->ReadImage(filename));
00354             UpdateUnloadPriority(pitl,
00355                                  GetMaximalPriorityWithoutLocking()+1);
00356             return pitl->GetImage();
00357           }
00358 
00359         /*      
00360         mRequestedFilename = filename;
00361         mRequestedImage = 0;
00362         ImageToLoad itl(this,filename);
00363         ImageMapType::iterator i = mImages.find(&itl);
00364         if (i!=mImages.end())
00365           {
00366             // Already inserted in queue
00367             if (i->first->GetImage() != 0)
00368               {
00369                 // Already read : ok : return it 
00370                 return i->first->GetImage();
00371               }
00373               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
00374               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
00375               pitl->SetUser( this );
00376               // Already in queue
00377               if (pitl->Index()>=0) 
00378                 {
00379                   // Re-sort the queue
00380                   mQueue.upsort(pitl->Index());
00381                 }
00382               // Not read but not in queue = being read = ok
00383               else 
00384                 {
00385                   pitl->SetUser( this );
00386                 }
00387           }
00388         else 
00389           {
00390             
00391             // Never requested before or unloaded 
00392             ImageToLoadPtr pitl = 
00393               new ImageToLoad(this,filename,
00394                               GetMaximalPriorityWithoutLocking() + 1);
00395             mImages[pitl] = 0;
00396             mQueue.insert(pitl);
00397           }
00398         */
00399       }
00400     while (0);
00401 
00402     //    std::cout << "Waiting..."<<std::endl;
00403 
00404     /*
00405     // Waiting that it is read
00406     int n = 0;
00407     do 
00408       {
00409         //      std::cout << n++ << std::endl;
00410         wxMilliSleep(10);
00411         do 
00412           {
00413             //      wxMutexLocker lock(mMutex);
00414             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
00415             if (mRequestedImage!=0) 
00416               {
00417                 return mRequestedImage;
00418               } 
00419           }
00420         while (0);
00421       }
00422     while (true);
00423     // 
00424     */
00425   }
00426   //=====================================================================
00427   
00428   //=====================================================================
00429   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
00430                                                bool purge)
00431   {
00432     
00433 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
00434     //    std::cout << "this="<<this <<std::endl;
00435     //    std::cout << "user="<<p->GetUser() <<std::endl;
00436 
00437     if ( p->GetUser() == this ) 
00438       GetMultiThreadImageReaderUserMutex().Unlock();
00439 
00440     p->GetUser()->MultiThreadImageReaderSendEvent
00441       (p->GetFilename(),
00442        MultiThreadImageReaderUser::ImageLoaded,
00443        p->GetImage());
00444 
00445     /*
00446       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
00447       BUGGY : TO FIX 
00448     */
00449     if (!purge)  return;
00450     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
00451 
00452     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
00453            
00454     mUnloadQueue.insert(p);
00455     p->GetImage()->UpdateInformation();
00456     p->GetImage()->PropagateUpdateExtent();
00457     long ImMem = p->GetImage()->GetEstimatedMemorySize();
00458     mTotalMem += ImMem;
00459 
00460     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
00461     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
00462 
00463     //  return;
00464 
00465     while (mTotalMem > mTotalMemMax)
00466       {
00467         GimmickMessage(5,
00468                        "   ! Exceeded max of "
00469                        << mTotalMemMax << " Ko : unloading oldest image ... "
00470                        << std::endl);
00471         if ( mUnloadQueue.size() <= 1 ) 
00472           {
00473              GimmickMessage(5,
00474                             "   Only one image : cannot load AND unload it !!"
00475                             <<std::endl);
00476             break; 
00477             
00478           }
00479         ImageToLoadPtr unload = mUnloadQueue.remove_top();
00480         MultiThreadImageReaderUser* user = unload->GetUser();
00481 
00482         /*
00483         if ((user!=0)&&(user!=this)) 
00484           {
00485             user->GetMultiThreadImageReaderUserMutex().Lock();
00486           }
00487         */
00488 
00489         std::string filename = unload->GetFilename();
00490 
00491         GimmickMessage(5,"'" << filename << "'" << std::endl);
00492         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
00493 
00494         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
00495 
00496         if (user!=0) 
00497           {
00498             //      std::cout << "unlock..."<<std::endl;
00499             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
00500             //      std::cout << "event"<<std::endl;
00501             user->MultiThreadImageReaderSendEvent
00502               (filename,
00503                MultiThreadImageReaderUser::ImageUnloaded,
00504                0);
00505             //      std::cout << "event ok"<<std::endl;
00506           }     
00507 
00508         if (unload->Index()>=0)
00509           {
00510             // GimmickMessage(5,"still in queue"<<std::endl);
00511           }
00512         unload->Index() = -1;
00513 
00514 
00515         ImageMapType::iterator it = mImages.find(unload);
00516         if (it!=mImages.end())
00517           {
00518             mImages.erase(it);
00519           }
00520         //          std::cout << "delete..."<<std::endl;
00521         delete unload;
00522         //          std::cout << "delete ok."<<std::endl;
00523 
00524       }
00525   }
00526   //=====================================================================
00527 
00528   //=====================================================================
00529   int MultiThreadImageReader::GetMaximalPriority()
00530   { 
00531     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
00532     return GetMaximalPriorityWithoutLocking();
00533   }
00534   //=====================================================================
00535 
00536 
00537   //=====================================================================
00538   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
00539   { 
00540     long max = 0;
00541     if (mQueue.size()>0) 
00542       {
00543         max = mQueue.top()->GetPriority();
00544       }
00545     if (mUnloadQueue.size()>0)
00546       {
00547         int max2 = mUnloadQueue.top()->GetPriority();
00548         if (max2>max) max=max2;
00549       }
00550     return max;
00551   }
00552   //=====================================================================
00553 
00554 
00555   //=====================================================================
00556   //=====================================================================
00557   //=====================================================================
00558   //=====================================================================
00559 
00560   //=====================================================================
00561   void*  ThreadedImageReader::Entry()
00562   {
00563     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
00564     //                << std::endl;
00565 
00566     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
00567       ("",
00568        MultiThreadImageReaderUser::ThreadedReaderStarted,
00569        0);
00570 
00571     // While was not deleted 
00572     while (!TestDestroy())
00573       {
00574                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
00575           
00576         // Lock the mutex
00577         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
00578         //mMutex.Lock();
00579         // If image in queue
00580         if (mMultiThreadImageReader->mQueue.size()>0)
00581           {
00582             MultiThreadImageReader::ImageToLoadPtr i = 
00583               mMultiThreadImageReader->mQueue.remove_top();
00584 
00585             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
00586             //mMutex.Unlock();
00587 
00588             
00589             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
00590             //                << i->GetFilename() << "'" << std::endl;
00591             
00592             // Do the job
00593             vtkImageData* im = Read(i->GetFilename());
00594 
00595             // Store it in the map
00596             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
00597             //mMutex.Lock();
00598             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
00599             MultiThreadImageReader::ImageMapType::iterator it = 
00600               mMultiThreadImageReader->mImages.find(&itl);
00601             MultiThreadImageReader::ImageToLoadPtr 
00602               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
00603               (it->first);
00604             pitl->SetImage(im);
00605             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
00606             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
00607             
00608             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
00609             //                << i->GetFilename() << "' : DONE" << std::endl;
00610             
00611           }
00612         else 
00613           {
00614             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
00615             //mMutex.Unlock();
00616             // Wait a little to avoid blocking 
00617             Sleep(10);
00618           }
00619       };
00620     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
00621     //                << std::endl;
00622        
00623     return 0;
00624   }
00625   //=====================================================================
00626 
00627   //=====================================================================
00628   void ThreadedImageReader::OnExit()
00629   {
00630     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
00631       ("",
00632        MultiThreadImageReaderUser::ThreadedReaderStopped,
00633        0);
00634   }
00635   //=====================================================================
00636 
00637   //=====================================================================
00638   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
00639   {
00640     return mReader.ReadImage(filename);
00641   }
00642   //=====================================================================
00643 
00644 } // namespace creaImageIO