41
41
int de265_thread_create (de265_thread* t, void *(*start_routine) (void *), void *arg) { return pthread_create (t,NULL ,start_routine,arg); }
42
42
void de265_thread_join (de265_thread t) { pthread_join (t,NULL ); }
43
43
void de265_thread_destroy (de265_thread* t) { }
44
- void de265_mutex_init (de265_mutex* m) { pthread_mutex_init (m,NULL ); }
45
- void de265_mutex_destroy (de265_mutex* m) { pthread_mutex_destroy (m); }
46
- void de265_mutex_lock (de265_mutex* m) { pthread_mutex_lock (m); }
47
- void de265_mutex_unlock (de265_mutex* m) { pthread_mutex_unlock (m); }
48
- void de265_cond_init (de265_cond* c) { pthread_cond_init (c,NULL ); }
49
- void de265_cond_destroy (de265_cond* c) { pthread_cond_destroy (c); }
50
- void de265_cond_broadcast (de265_cond* c,de265_mutex* m) { pthread_cond_broadcast (c); }
51
- void de265_cond_wait (de265_cond* c,de265_mutex* m) { pthread_cond_wait (c,m); }
52
- void de265_cond_signal (de265_cond* c) { pthread_cond_signal (c); }
53
44
#else // _WIN32
54
45
55
46
#define THREAD_RESULT_TYPE DWORD
@@ -66,37 +57,16 @@ int de265_thread_create(de265_thread* t, LPTHREAD_START_ROUTINE start_routine,
66
57
}
67
58
void de265_thread_join (de265_thread t) { WaitForSingleObject (t, INFINITE); }
68
59
void de265_thread_destroy (de265_thread* t) { CloseHandle (*t); *t = NULL ; }
69
- void de265_mutex_init (de265_mutex* m) { *m = CreateMutex (NULL , FALSE , NULL ); }
70
- void de265_mutex_destroy (de265_mutex* m) { CloseHandle (*m); }
71
- void de265_mutex_lock (de265_mutex* m) { WaitForSingleObject (*m, INFINITE); }
72
- void de265_mutex_unlock (de265_mutex* m) { ReleaseMutex (*m); }
73
- void de265_cond_init (de265_cond* c) { win32_cond_init (c); }
74
- void de265_cond_destroy (de265_cond* c) { win32_cond_destroy (c); }
75
- void de265_cond_broadcast (de265_cond* c,de265_mutex* m)
76
- {
77
- de265_mutex_lock (m);
78
- win32_cond_broadcast (c);
79
- de265_mutex_unlock (m);
80
- }
81
- void de265_cond_wait (de265_cond* c,de265_mutex* m) { win32_cond_wait (c,m); }
82
- void de265_cond_signal (de265_cond* c) { win32_cond_signal (c); }
83
60
#endif // _WIN32
84
61
85
62
86
-
87
-
88
63
de265_progress_lock::de265_progress_lock ()
89
64
{
90
65
mProgress = 0 ;
91
-
92
- de265_mutex_init (&mutex);
93
- de265_cond_init (&cond);
94
66
}
95
67
96
68
de265_progress_lock::~de265_progress_lock ()
97
69
{
98
- de265_mutex_destroy (&mutex);
99
- de265_cond_destroy (&cond);
100
70
}
101
71
102
72
void de265_progress_lock::wait_for_progress (int progress)
@@ -105,34 +75,29 @@ void de265_progress_lock::wait_for_progress(int progress)
105
75
return ;
106
76
}
107
77
108
- de265_mutex_lock (& mutex);
78
+ std::unique_lock<std::mutex> lock ( mutex);
109
79
while (mProgress < progress) {
110
- de265_cond_wait (& cond, &mutex );
80
+ cond. wait (lock );
111
81
}
112
- de265_mutex_unlock (&mutex);
113
82
}
114
83
115
84
void de265_progress_lock::set_progress (int progress)
116
85
{
117
- de265_mutex_lock (& mutex);
86
+ std::unique_lock<std::mutex> lock ( mutex);
118
87
119
88
if (progress>mProgress ) {
120
89
mProgress = progress;
121
90
122
- de265_cond_broadcast (& cond, &mutex );
91
+ cond. notify_all ( );
123
92
}
124
-
125
- de265_mutex_unlock (&mutex);
126
93
}
127
94
128
95
void de265_progress_lock::increase_progress (int progress)
129
96
{
130
- de265_mutex_lock (& mutex);
97
+ std::unique_lock<std::mutex> lock ( mutex);
131
98
132
99
mProgress += progress;
133
- de265_cond_broadcast (&cond, &mutex);
134
-
135
- de265_mutex_unlock (&mutex);
100
+ cond.notify_all ();
136
101
}
137
102
138
103
int de265_progress_lock::get_progress () const
@@ -191,54 +156,54 @@ static THREAD_RESULT_TYPE THREAD_CALLING_CONVENTION worker_thread(THREAD_PARAM_T
191
156
thread_pool* pool = (thread_pool*)pool_ptr;
192
157
193
158
194
- de265_mutex_lock (&pool->mutex );
195
-
196
159
while (true ) {
197
160
198
- // wait until we can pick a task or until the pool has been stopped
161
+ thread_task* task = nullptr ;
199
162
200
- for (;;) {
201
- // end waiting if thread- pool has been stopped or we have a task to execute
163
+ {
164
+ std::unique_lock<std::mutex> lock ( pool-> mutex );
202
165
203
- if (pool->stopped || pool->tasks .size ()>0 ) {
204
- break ;
205
- }
166
+ // wait until we can pick a task or until the pool has been stopped
206
167
207
- // printf("going idle\n");
208
- de265_cond_wait (&pool->cond_var , &pool->mutex );
209
- }
168
+ for (;;) {
169
+ // end waiting if thread-pool has been stopped or we have a task to execute
210
170
211
- // if the pool was shut down, end the execution
171
+ if (pool->stopped || pool->tasks .size ()>0 ) {
172
+ break ;
173
+ }
212
174
213
- if (pool->stopped ) {
214
- de265_mutex_unlock (&pool->mutex );
215
- return (THREAD_RESULT_TYPE)0 ;
216
- }
175
+ // printf("going idle\n");
176
+ pool->cond_var .wait (lock);
177
+ }
217
178
179
+ // if the pool was shut down, end the execution
218
180
219
- // get a task
181
+ if (pool->stopped ) {
182
+ return (THREAD_RESULT_TYPE)0 ;
183
+ }
220
184
221
- thread_task* task = pool->tasks .front ();
222
- pool->tasks .pop_front ();
223
185
224
- pool-> num_threads_working ++;
186
+ // get a task
225
187
226
- // printblks(pool);
188
+ task = pool->tasks .front ();
189
+ pool->tasks .pop_front ();
227
190
228
- de265_mutex_unlock (& pool->mutex ) ;
191
+ pool->num_threads_working ++ ;
229
192
193
+ // printblks(pool);
194
+ }
230
195
231
196
// execute the task
232
197
233
198
task->work ();
234
199
235
200
// end processing and check if this was the last task to be processed
236
201
237
- de265_mutex_lock (&pool->mutex );
202
+ // TODO: the num_threads_working can probably be an atomic integer
203
+ std::unique_lock<std::mutex> lock (pool->mutex );
238
204
239
205
pool->num_threads_working --;
240
206
}
241
- de265_mutex_unlock (&pool->mutex );
242
207
243
208
return (THREAD_RESULT_TYPE)0 ;
244
209
}
@@ -257,13 +222,12 @@ de265_error start_thread_pool(thread_pool* pool, int num_threads)
257
222
258
223
pool->num_threads = 0 ; // will be increased below
259
224
260
- de265_mutex_init (&pool-> mutex );
261
- de265_cond_init (& pool->cond_var );
225
+ {
226
+ std::unique_lock<std::mutex> lock ( pool->mutex );
262
227
263
- de265_mutex_lock (&pool->mutex );
264
- pool->num_threads_working = 0 ;
265
- pool->stopped = false ;
266
- de265_mutex_unlock (&pool->mutex );
228
+ pool->num_threads_working = 0 ;
229
+ pool->stopped = false ;
230
+ }
267
231
268
232
// start worker threads
269
233
@@ -283,32 +247,30 @@ de265_error start_thread_pool(thread_pool* pool, int num_threads)
283
247
284
248
void stop_thread_pool (thread_pool* pool)
285
249
{
286
- de265_mutex_lock (&pool->mutex );
287
- pool->stopped = true ;
288
- de265_mutex_unlock (&pool->mutex );
250
+ {
251
+ std::unique_lock<std::mutex> lock (pool->mutex );
252
+ pool->stopped = true ;
253
+ }
289
254
290
- de265_cond_broadcast (& pool->cond_var , &pool-> mutex );
255
+ pool->cond_var . notify_all ( );
291
256
292
257
for (int i=0 ;i<pool->num_threads ;i++) {
293
258
de265_thread_join (pool->thread [i]);
294
259
de265_thread_destroy (&pool->thread [i]);
295
260
}
296
-
297
- de265_mutex_destroy (&pool->mutex );
298
- de265_cond_destroy (&pool->cond_var );
299
261
}
300
262
301
263
302
264
void add_task (thread_pool* pool, thread_task* task)
303
265
{
304
- de265_mutex_lock (&pool->mutex );
266
+ std::unique_lock<std::mutex> lock (pool->mutex );
267
+
305
268
if (!pool->stopped ) {
306
269
307
270
pool->tasks .push_back (task);
308
271
309
272
// wake up one thread
310
273
311
- de265_cond_signal (& pool->cond_var );
274
+ pool->cond_var . notify_one ( );
312
275
}
313
- de265_mutex_unlock (&pool->mutex );
314
276
}
0 commit comments