1 /**************************************************************************
3 * Copyright 2013-2014 RAD Game Tools and Valve Software
4 * Copyright 2010-2014 Rich Geldreich and Tenacious Software LLC
7 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * of this software and associated documentation files (the "Software"), to deal
9 * in the Software without restriction, including without limitation the rights
10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 * copies of the Software, and to permit persons to whom the Software is
12 * furnished to do so, subject to the following conditions:
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 **************************************************************************/
27 // File: vogl_threading_pthreads.cpp
28 #include "vogl_core.h"
29 #include "vogl_threading_pthreads.h"
30 #include "vogl_timer.h"
32 #if VOGL_USE_PTHREADS_API
35 #pragma comment(lib, "../ext/libpthread/lib/pthreadVC2.lib")
36 #include "vogl_winhdr.h"
40 #include <sys/sysinfo.h>
49 uint g_number_of_processors = 1;
51 void vogl_threading_init()
54 SYSTEM_INFO g_system_info;
55 GetSystemInfo(&g_system_info);
56 g_number_of_processors = math::maximum<uint>(1U, g_system_info.dwNumberOfProcessors);
57 #elif defined(__GNUC__)
58 g_number_of_processors = math::maximum<int>(1, get_nprocs());
60 g_number_of_processors = 1;
64 vogl_thread_id_t vogl_get_current_thread_id()
66 // FIXME: Not portable
67 return static_cast<vogl_thread_id_t>(pthread_self());
70 void vogl_sleep(unsigned int milliseconds)
73 struct timespec interval;
74 interval.tv_sec = milliseconds / 1000;
75 interval.tv_nsec = (milliseconds % 1000) * 1000000L;
76 pthread_delay_np(&interval);
80 int msecs_to_sleep = VOGL_MIN(milliseconds, 1000);
81 usleep(msecs_to_sleep * 1000);
82 milliseconds -= msecs_to_sleep;
87 mutex::mutex(unsigned int spin_count, bool recursive)
89 VOGL_NOTE_UNUSED(spin_count);
91 pthread_mutexattr_t mta;
93 int status = pthread_mutexattr_init(&mta);
96 dynamic_string msg(cVarArg, "%s: pthread_mutexattr_init() failed with status %i", VOGL_METHOD_NAME, status);
97 vogl_fail(msg.get_ptr(), __FILE__, __LINE__);
100 status = pthread_mutexattr_settype(&mta, recursive ? PTHREAD_MUTEX_RECURSIVE : PTHREAD_MUTEX_NORMAL);
103 dynamic_string msg(cVarArg, "%s: pthread_mutexattr_settype() failed with status %i", VOGL_METHOD_NAME, status);
104 vogl_fail(msg.get_ptr(), __FILE__, __LINE__);
107 status = pthread_mutex_init(&m_mutex, &mta);
110 dynamic_string msg(cVarArg, "%s: pthread_mutex_init() failed with status %i", VOGL_METHOD_NAME, status);
111 vogl_fail(msg.get_ptr(), __FILE__, __LINE__);
114 #ifdef VOGL_BUILD_DEBUG
121 #ifdef VOGL_BUILD_DEBUG
123 vogl_assert("mutex::~mutex: mutex is still locked", __FILE__, __LINE__);
125 if (pthread_mutex_destroy(&m_mutex))
126 vogl_assert("mutex::~mutex: pthread_mutex_destroy() failed", __FILE__, __LINE__);
131 pthread_mutex_lock(&m_mutex);
132 #ifdef VOGL_BUILD_DEBUG
139 #ifdef VOGL_BUILD_DEBUG
141 vogl_assert("mutex::unlock: mutex is not locked", __FILE__, __LINE__);
144 pthread_mutex_unlock(&m_mutex);
147 void mutex::set_spin_count(unsigned int count)
149 VOGL_NOTE_UNUSED(count);
152 semaphore::semaphore(uint32 initialCount, uint32 maximumCount, const char *pName)
154 VOGL_NOTE_UNUSED(maximumCount);
155 VOGL_NOTE_UNUSED(pName);
157 VOGL_ASSERT(maximumCount >= initialCount);
158 VOGL_ASSERT(maximumCount >= 1);
160 if (sem_init(&m_sem, 0, initialCount))
162 VOGL_FAIL("semaphore: sem_init() failed");
166 semaphore::~semaphore()
171 void semaphore::release(uint32 releaseCount)
173 VOGL_ASSERT(releaseCount >= 1);
177 if (1 == releaseCount)
178 status = sem_post(&m_sem);
180 status = sem_post_multiple(&m_sem, releaseCount);
182 while (releaseCount > 0)
184 status = sem_post(&m_sem);
193 VOGL_FAIL("semaphore: sem_post() or sem_post_multiple() failed");
197 void semaphore::try_release(uint32 releaseCount)
199 VOGL_ASSERT(releaseCount >= 1);
202 if (1 == releaseCount)
205 sem_post_multiple(&m_sem, releaseCount);
207 while (releaseCount > 0)
215 bool semaphore::wait(uint32 milliseconds)
218 if (milliseconds == cUINT32_MAX)
220 status = sem_wait(&m_sem);
224 struct timespec interval;
225 interval.tv_sec = milliseconds / 1000;
226 interval.tv_nsec = (milliseconds % 1000) * 1000000L;
227 status = sem_timedwait(&m_sem, &interval);
232 if (errno != ETIMEDOUT)
234 VOGL_FAIL("semaphore: sem_wait() or sem_timedwait() failed");
244 #ifdef VOGL_BUILD_DEBUG
248 if (pthread_spin_init(&m_spinlock, 0))
250 VOGL_FAIL("spinlock: pthread_spin_init() failed");
254 spinlock::~spinlock()
256 #ifdef VOGL_BUILD_DEBUG
257 VOGL_ASSERT(!m_in_lock);
260 pthread_spin_destroy(&m_spinlock);
263 void spinlock::lock()
265 if (pthread_spin_lock(&m_spinlock))
267 VOGL_FAIL("spinlock: pthread_spin_lock() failed");
270 #ifdef VOGL_BUILD_DEBUG
271 VOGL_ASSERT(!m_in_lock);
276 void spinlock::unlock()
278 #ifdef VOGL_BUILD_DEBUG
279 VOGL_ASSERT(m_in_lock);
283 if (pthread_spin_unlock(&m_spinlock))
285 VOGL_FAIL("spinlock: pthread_spin_unlock() failed");
289 task_pool::task_pool()
291 m_tasks_available(0, 32767),
292 m_all_tasks_completed(0, 1),
293 m_total_submitted_tasks(0),
294 m_total_completed_tasks(0),
297 utils::zero_object(m_threads);
300 task_pool::task_pool(uint num_threads)
302 m_tasks_available(0, 32767),
303 m_all_tasks_completed(0, 1),
304 m_total_submitted_tasks(0),
305 m_total_completed_tasks(0),
308 utils::zero_object(m_threads);
310 bool status = init(num_threads);
314 task_pool::~task_pool()
319 bool task_pool::init(uint num_threads)
321 VOGL_ASSERT(num_threads <= cMaxThreads);
322 num_threads = math::minimum<uint>(num_threads, cMaxThreads);
326 bool succeeded = true;
329 while (m_num_threads < num_threads)
331 int status = pthread_create(&m_threads[m_num_threads], NULL, thread_func, this);
350 void task_pool::deinit()
356 atomic_exchange32(&m_exit_flag, true);
358 m_tasks_available.release(m_num_threads);
360 for (uint i = 0; i < m_num_threads; i++)
361 pthread_join(m_threads[i], NULL);
365 atomic_exchange32(&m_exit_flag, false);
368 m_task_stack.clear();
369 m_total_submitted_tasks = 0;
370 m_total_completed_tasks = 0;
373 bool task_pool::queue_task(task_callback_func pFunc, uint64_t data, void *pData_ptr)
378 tsk.m_callback = pFunc;
380 tsk.m_pData_ptr = pData_ptr;
383 atomic_increment32(&m_total_submitted_tasks);
384 if (!m_task_stack.try_push(tsk))
386 atomic_increment32(&m_total_completed_tasks);
390 m_tasks_available.release(1);
395 // It's the object's responsibility to delete pObj within the execute_task() method, if needed!
396 bool task_pool::queue_task(executable_task *pObj, uint64_t data, void *pData_ptr)
403 tsk.m_pData_ptr = pData_ptr;
404 tsk.m_flags = cTaskFlagObject;
406 atomic_increment32(&m_total_submitted_tasks);
407 if (!m_task_stack.try_push(tsk))
409 atomic_increment32(&m_total_completed_tasks);
413 m_tasks_available.release(1);
418 void task_pool::process_task(task &tsk)
420 if (tsk.m_flags & cTaskFlagObject)
421 tsk.m_pObj->execute_task(tsk.m_data, tsk.m_pData_ptr);
423 tsk.m_callback(tsk.m_data, tsk.m_pData_ptr);
425 if (atomic_increment32(&m_total_completed_tasks) == m_total_submitted_tasks)
427 // Try to signal the semaphore (the max count is 1 so this may actually fail).
428 m_all_tasks_completed.try_release();
432 void task_pool::join()
434 // Try to steal any outstanding tasks. This could cause one or more worker threads to wake up and immediately go back to sleep, which is wasteful but should be harmless.
436 while (m_task_stack.pop(tsk))
439 // At this point the task stack is empty.
440 // Now wait for all concurrent tasks to complete. The m_all_tasks_completed semaphore has a max count of 1, so it's possible it could have saturated to 1 as the tasks
441 // where issued and asynchronously completed, so this loop may iterate a few times.
442 const int total_submitted_tasks = static_cast<int>(atomic_add32(&m_total_submitted_tasks, 0));
443 while (m_total_completed_tasks != total_submitted_tasks)
445 // If the previous (m_total_completed_tasks != total_submitted_tasks) check failed the semaphore MUST be eventually signalled once the last task completes.
446 // So I think this can actually be an INFINITE delay, but it shouldn't really matter if it's 1ms.
447 m_all_tasks_completed.wait(1);
451 void *task_pool::thread_func(void *pContext)
453 task_pool *pPool = static_cast<task_pool *>(pContext);
458 if (!pPool->m_tasks_available.wait())
461 if (pPool->m_exit_flag)
464 if (pPool->m_task_stack.pop(tsk))
466 pPool->process_task(tsk);
475 #endif // VOGL_USE_PTHREADS_API