creaImageIO_lib
creaImageIOMultiThreadImageReader.cpp
Go to the documentation of this file.
1 /*
2 # ---------------------------------------------------------------------
3 #
4 # Copyright (c) CREATIS (Centre de Recherche en Acquisition et Traitement de l'Image
5 # pour la Santé)
6 # Authors : Eduardo Davila, Frederic Cervenansky, Claire Mouton
7 # Previous Authors : Laurent Guigues, Jean-Pierre Roux
8 # CreaTools website : www.creatis.insa-lyon.fr/site/fr/creatools_accueil
9 #
10 # This software is governed by the CeCILL-B license under French law and
11 # abiding by the rules of distribution of free software. You can use,
12 # modify and/ or redistribute the software under the terms of the CeCILL-B
13 # license as circulated by CEA, CNRS and INRIA at the following URL
14 # http://www.cecill.info/licences/Licence_CeCILL-B_V1-en.html
15 # or in the file LICENSE.txt.
16 #
17 # As a counterpart to the access to the source code and rights to copy,
18 # modify and redistribute granted by the license, users are provided only
19 # with a limited warranty and the software's author, the holder of the
20 # economic rights, and the successive licensors have only limited
21 # liability.
22 #
23 # The fact that you are presently reading this means that you have had
24 # knowledge of the CeCILL-B license and that you accept its terms.
25 # ------------------------------------------------------------------------
26 */
27 
28 
30 #include <creaImageIOImageReader.h>
31 #include <wx/utils.h>
32 #include <creaImageIOSystem.h>
33 
34 #include <creaImageIOGimmick.h>
35 #ifdef _DEBUG
36 #define new DEBUG_NEW
37 #endif
38 namespace creaImageIO
39 {
40 
41  //=====================================================================
43  ( const std::string& filename,
44  EventType type,
45  vtkImageData* image)
46  {
47  wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
48 
49  this->OnMultiThreadImageReaderEvent(filename,type,image);
50  }
51  //=====================================================================
52 
53  //=====================================================================
54  class ThreadedImageReader: public wxThread
55  {
56  public:
59  {}
60 
61  void* Entry();
62  void OnExit();
63 
64  vtkImageData* Read(const std::string& filename);
65 
66  struct deleter
67  {
69  {
70  p->Delete();
71  }
72  };
73  friend struct deleter;
74 
75 
76  private:
79 
80  };
81 
82  //=====================================================================
83 
84 
85  //=====================================================================
87  : //mDoNotSignal(false),
88  mReader(0),
89  mTotalMem(0),
90  mTotalMemMax(1000000)
91  {
92  // std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
93  // << " #threads= " << number_of_threads <<" )"<<std::endl;
94 
95  mDone = false;
96  // Create the threads
97  for (int i=0; i<number_of_threads; i++)
98  {
99  //ThreadedImageReader* t = new ThreadedImageReader(this);
100  boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
101  mThreadedImageReaderList.push_back(t);
102  std::cout << " ===> Thread "<<i
103  <<" successfully added"<< std::endl;
104  }
106  // Init the queue
107  mQueue.set(mComparator);
108  mQueue.set(mIndexer);
109  //
110  // no thread : alloc self reader
111 // if (number_of_threads==0)
112 // {
113  mReader = new ImageReader();
114 // }
115  }
116  //=====================================================================
117 
118 
119  //=====================================================================
121  {
122 
123  // std::cout << "#### MultiThreadImageReader::Start()"
124  // <<std::endl;
125  if (mNumberOfThreadedReadersRunning > 0) return true;
126 
127  ThreadedImageReaderListType::iterator i;
128  for (i =mThreadedImageReaderList.begin();
129  i!=mThreadedImageReaderList.end();
130  i++)
131  {
132  (*i)->Create();
133  if ( (*i)->Run() != wxTHREAD_NO_ERROR )
134  {
135  std::cout << "ERROR starting a thread"<< std::endl;
136  return false;
137  }
138  else
139  {
140  std::cout << " ===> Thread "<<(*i)->GetCurrentId()
141  <<" successfully created"<< std::endl;
142 
143  }
144  }
145  wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
146  // std::cout << "EO Start : #Threads running = "
147  // << mNumberOfThreadedReadersRunning<<std::endl;
148 
149  return true;
150  }
151  //=====================================================================
152 
153  //=====================================================================
155  {
156 // std::cout << "#### MultiThreadImageReader::Stop()"
157 // <<std::endl;
158  // std::cout << "Sending stop order to the threads..."<<std::endl;
159  if (mDone) return;
160 
161  ThreadedImageReaderListType::iterator i;
162  for (i =mThreadedImageReaderList.begin();
163  i!=mThreadedImageReaderList.end();
164  i++)
165  { std::cout << " ===> Thread "<<(*i)->GetCurrentId()
166  <<" successfully stopped"<< std::endl;
167  if((*i)->IsAlive())
168  {(*i)->Pause();
169  (*i).reset();
170  // (*i)->Delete();
171  }
172  }
173  mThreadedImageReaderList.clear();
174  // Wait a little to be sure that all threads have stopped
175  // A better way to do this ?
176  // wxMilliSleep(1000);
177  // New method : the threads generate a stop event when they have finished
178  // We wait until all threads have stopped
179 // std::cout << "Waiting for stop signals..."<<std::endl;
180  do
181  {
182  // Sleep a little
183  wxMilliSleep(10);
184  // Lock
185  {
186  wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
187 // std::cout << "#Threads running = "
188 // << mNumberOfThreadedReadersRunning<<std::endl;
189  // Break if all readers have stopped
191  {
192  break;
193  }
194  }
195  }
196  while (true);
197 // std::cout << "All threads stopped : OK "<<std::endl;
198 
199  ImageMapType::iterator j;
200  for (j =mImages.begin();
201  j!=mImages.end();
202  ++j)
203 
204  {
205  delete j->first;
206  }
207  mImages.clear();
208  mDone = true;
209  }
210  //=====================================================================
211 
212  //=====================================================================
214  {
215  // std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
216  // <<std::endl;
217  Stop();
218  if (mReader) delete mReader;
219  mThreadedImageReaderList.clear();
220  }
221  //=====================================================================
222 
223  //=====================================================================
225  int priority)
226  {
227  // not in unload queue : ciao
228  if (p->UnloadIndex()<0) return;
229  int old_prio = p->GetPriority();
230  if (priority > old_prio)
231  {
232  p->SetPriority(priority);
233  mUnloadQueue.downsort(p->UnloadIndex());
234  }
235  else if ( old_prio > priority )
236  {
237  p->SetPriority(priority);
238  mUnloadQueue.upsort(p->UnloadIndex());
239  }
240  }
241  //=====================================================================
242  // function to read attributes for a file
243  void MultiThreadImageReader::getAttributes(const std::string filename,
244  std::map <std::string , std::string> &infos,std::vector<std::string> i_attr)
245  {
246  mReader->getAttributes(filename, infos, i_attr);
247  }
248 
249  //=====================================================================
251  const std::string& filename,
252  int priority )
253  {
254  wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
255 
257 // if (mThreadedImageReaderList.size()==0)
258  {
259  // no detached reader : use self reader
260  ImageToLoad itl(user,filename);
261  ImageMapType::iterator i = mImages.find(&itl);
262  if (i!=mImages.end())
263  {
264  ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
265  // Already inserted
266  if (pitl->GetImage() != 0)
267  {
268  // Already read
269  pitl->SetUser(user);
270  UpdateUnloadPriority(pitl,priority);
271  SignalImageRead(pitl,false);
272  return; // pitl->GetImage();
273  }
274  }
275  ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
276  mImages[pitl] = 0;
277  pitl->SetImage(mReader->ReadImage(filename));
278  UpdateUnloadPriority(pitl,priority);
279  SignalImageRead(pitl,true);
280  // return pitl->GetImage();
281  return;
282  }
283 
284  ImageToLoad itl(user,filename);
285  ImageMapType::iterator i = mImages.find(&itl);
286  if (i!=mImages.end())
287  {
288  // Already inserted
289  if (i->first->GetImage() != 0)
290  {
291  // Already read : ok :signal the user
292  UpdateUnloadPriority(i->first,priority);
293  SignalImageRead(i->first,false);
294  return;
295  }
297  ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
298  pitl->SetPriority(priority);
299  // Already in queue
300  if (pitl->Index()>=0)
301  {
302  // Re-sort the queue
303  mQueue.upsort(pitl->Index());
304  }
305  // Not read but not in queue = being read = ok
306  else
307  {
308 
309  }
310  }
311  else
312  {
313  // Never requested before or unloaded
314  ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
315  mImages[pitl] = 0;
316  mQueue.insert(pitl);
317  }
318  }
319  //=====================================================================
320 
321  //=====================================================================
323  (const std::string& filename,
325  vtkImageData* image)
326  {
328  (filename == mRequestedFilename))
329  {
330  mRequestedImage = image;
331  }
333  {
334  mNumberOfThreadedReadersRunning++;
335  // std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
336  }
338  {
339 
340  mNumberOfThreadedReadersRunning--;
341  // std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
342  }
343  }
344  //=====================================================================
345 
346  //=====================================================================
347  vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
348  {
349  // Start();
350  // std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
351  // <<std::endl;
352 
353  do
354  {
355  // wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
356 
357  // std::cout << "** MultiThreadImageReader::GetImage('"<<filename
358  // <<"') lock ok"
359  // <<std::endl;
360 
361  // if (mNumberOfThreadedReadersRunning==0)
362  // if (mThreadedImageReaderList.size()==0)
363  if (true)
364  {
365  ImageToLoad itl(this,filename);
366  ImageMapType::iterator i = mImages.find(&itl);
367  if (i!=mImages.end())
368  {
369  ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
370  // Already inserted
371  if (pitl->GetImage() != 0)
372  {
373  // Already read
376  return pitl->GetImage();
377  }
378  }
379  ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
380  mImages[pitl] = 0;
381  pitl->SetImage(mReader->ReadImage(filename));
384  return pitl->GetImage();
385  }
386 
387  /*
388  mRequestedFilename = filename;
389  mRequestedImage = 0;
390  ImageToLoad itl(this,filename);
391  ImageMapType::iterator i = mImages.find(&itl);
392  if (i!=mImages.end())
393  {
394  // Already inserted in queue
395  if (i->first->GetImage() != 0)
396  {
397  // Already read : ok : return it
398  return i->first->GetImage();
399  }
401  ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
402  pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
403  pitl->SetUser( this );
404  // Already in queue
405  if (pitl->Index()>=0)
406  {
407  // Re-sort the queue
408  mQueue.upsort(pitl->Index());
409  }
410  // Not read but not in queue = being read = ok
411  else
412  {
413  pitl->SetUser( this );
414  }
415  }
416  else
417  {
418 
419  // Never requested before or unloaded
420  ImageToLoadPtr pitl =
421  new ImageToLoad(this,filename,
422  GetMaximalPriorityWithoutLocking() + 1);
423  mImages[pitl] = 0;
424  mQueue.insert(pitl);
425  }
426  */
427  }
428  while (0);
429 
430  // std::cout << "Waiting..."<<std::endl;
431 
432  /*
433  // Waiting that it is read
434  int n = 0;
435  do
436  {
437  // std::cout << n++ << std::endl;
438  wxMilliSleep(10);
439  do
440  {
441  // wxMutexLocker lock(mMutex);
442  wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
443  if (mRequestedImage!=0)
444  {
445  return mRequestedImage;
446  }
447  }
448  while (0);
449  }
450  while (true);
451  //
452  */
453  }
454  //=====================================================================
455 
456  //=====================================================================
458  bool purge)
459  {
460 
461 // std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
462  // std::cout << "this="<<this <<std::endl;
463  // std::cout << "user="<<p->GetUser() <<std::endl;
464 
465  if ( p->GetUser() == this )
467 
469  (p->GetFilename(),
471  p->GetImage());
472 
473  /*
474  AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
475  BUGGY : TO FIX
476  */
477  if (!purge) return;
478  GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
479 
480  // wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
481 
482  mUnloadQueue.insert(p);
483  p->GetImage()->UpdateInformation();
484  p->GetImage()->PropagateUpdateExtent();
485  long ImMem = p->GetImage()->GetEstimatedMemorySize();
486  mTotalMem += ImMem;
487 
488  GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
489  GimmickMessage(5,"==> Total mem = "<<mTotalMem<<" Ko"<<std::endl);
490 
491  // return;
492 
493  while (mTotalMem > mTotalMemMax)
494  {
495  GimmickMessage(5,
496  " ! Exceeded max of "
497  << mTotalMemMax << " Ko : unloading oldest image ... "
498  << std::endl);
499  if ( mUnloadQueue.size() <= 1 )
500  {
501  GimmickMessage(5,
502  " Only one image : cannot load AND unload it !!"
503  <<std::endl);
504  break;
505 
506  }
507  ImageToLoadPtr unload = mUnloadQueue.remove_top();
508  MultiThreadImageReaderUser* user = unload->GetUser();
509 
510  /*
511  if ((user!=0)&&(user!=this))
512  {
513  user->GetMultiThreadImageReaderUserMutex().Lock();
514  }
515  */
516 
517  std::string filename = unload->GetFilename();
518 
519  GimmickMessage(5,"'" << filename << "'" << std::endl);
520  mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
521 
522  GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
523 
524  if (user!=0)
525  {
526  // std::cout << "unlock..."<<std::endl;
527  // user->GetMultiThreadImageReaderUserMutex().Unlock();
528  // std::cout << "event"<<std::endl;
530  (filename,
532  0);
533  // std::cout << "event ok"<<std::endl;
534  }
535 
536  if (unload->Index()>=0)
537  {
538  // GimmickMessage(5,"still in queue"<<std::endl);
539  }
540  unload->Index() = -1;
541 
542 
543  ImageMapType::iterator it = mImages.find(unload);
544  if (it!=mImages.end())
545  {
546  mImages.erase(it);
547  }
548  // std::cout << "delete..."<<std::endl;
549  delete unload;
550  // std::cout << "delete ok."<<std::endl;
551 
552  }
553  }
554  //=====================================================================
555 
556  //=====================================================================
558  {
559  wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
561  }
562  //=====================================================================
563 
564 
565  //=====================================================================
567  {
568  long max = 0;
569  if (mQueue.size()>0)
570  {
571  max = mQueue.top()->GetPriority();
572  }
573  if (mUnloadQueue.size()>0)
574  {
575  int max2 = mUnloadQueue.top()->GetPriority();
576  if (max2>max) max=max2;
577  }
578  return max;
579  }
580  //=====================================================================
581 
582 
583  //=====================================================================
584  //=====================================================================
585  //=====================================================================
586  //=====================================================================
587 
588  //=====================================================================
590  {
591  // std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
592  // << std::endl;
593 
594  mMultiThreadImageReader->MultiThreadImageReaderSendEvent
595  ("",
597  0);
598 
599  // While was not deleted
600  while (!TestDestroy())
601  {
602  //std::cout << "### Thread "<<GetCurrentId()<<" still alive" << std::endl;
603 
604  // Lock the mutex
606  //mMutex.Lock();
607  // If image in queue
608  if (mMultiThreadImageReader->mQueue.size()>0)
609  {
611  mMultiThreadImageReader->mQueue.remove_top();
612 
614  //mMutex.Unlock();
615 
616 
617  // std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
618  // << i->GetFilename() << "'" << std::endl;
619 
620  // Do the job
621  vtkImageData* im = Read(i->GetFilename());
622 
623  // Store it in the map
625  //mMutex.Lock();
627  MultiThreadImageReader::ImageMapType::iterator it =
628  mMultiThreadImageReader->mImages.find(&itl);
630  pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
631  (it->first);
632  pitl->SetImage(im);
633  mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
635 
636  // std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
637  // << i->GetFilename() << "' : DONE" << std::endl;
638 
639  }
640  else
641  {
643  //mMutex.Unlock();
644  // Wait a little to avoid blocking
645  Sleep(10);
646  }
647  };
648  // std::cout << "### Thread "<<GetCurrentId()<<" stopping"
649  // << std::endl;
650 
651  return 0;
652  }
653  //=====================================================================
654 
655  //=====================================================================
657  {
658  mMultiThreadImageReader->MultiThreadImageReaderSendEvent
659  ("",
661  0);
662  }
663  //=====================================================================
664 
665  //=====================================================================
666  vtkImageData* ThreadedImageReader::Read(const std::string& filename)
667  {
668  return mReader.ReadImage(filename);
669  }
670  //=====================================================================
671 
672 } // namespace creaImageIO