Monday, April 25, 2011

ThreadPool Best Practices, Correctness

Hey everyone. I've been working on some code for a while that spawned threads as needed, but I decided a simpler and more effective solution would be a thread pool. It's implemented with a queue I wrote that does conditional waits on enqueue and dequeue. The only reason I'm posting is that I'm getting weird, random errors throughout my code that never happened before the switch to the thread pool, and they go away when I add debug print statements. If my code starts working because of print statements, that sounds like memory or stack corruption, possibly caused by bad threading code.

I figured the first place to check for correctness and thread safety is the thread pool itself. Here are the 3 main functions: threadstart is the function each thread sits in, waiting on the dequeue; threadpool_init spawns the threads; and threadpool_q_workitem queues up a work item. The q_enq function signals the condition variable that wakes a waiting thread so it can dequeue.

void *
threadstart(void *arg)
{
    threadpool_t * tp = (threadpool_t*)arg;

    while (1)
    {
     workitem_t *work = q_dq(tp->workqueue);

     if (work == NULL)
      break;

     (*work->action)(work->arg);
     free(work);
    }

    pthread_exit(NULL);
}

threadpool_t *
threadpool_init(int max_threads, int max_workload)
{
    threadpool_t *tp;
    pthread_attr_t attr;
    register int i=0;
    int rc =0;
    ASSERT(max_threads > 0 && max_workload > 0);

    tp = malloc(sizeof(threadpool_t));
    tp->max_threads = max_threads;
    tp->threads = calloc(max_threads, sizeof(pthread_t));
    tp->workqueue = q_init(max_workload);

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    pthread_attr_setschedpolicy(&attr, SCHED_RR);

    for (; i < max_threads; i++)
    {
     rc = pthread_create(&tp->threads[i], &attr, threadstart, tp);

     /* worry about errors creating threads later :( */
     if (rc) printf("Error creating threadpool thread %d [%d]\r\n", i, rc);
    }
    pthread_attr_destroy(&attr);

    return tp;
}

void
threadpool_q_workitem(threadpool_t *tp, action_f action, void *arg)
{
    workitem_t *item;
    ASSERT(tp != NULL);

    item = malloc(sizeof(workitem_t));
    item->action = action;
    item->arg = arg;
    q_enq(tp->workqueue, (void*)item);
}

EDIT: Queue Functions

void q_enq(struct queue *q, void *data) {
    struct timeval now;
    struct timespec timeout;

    pthread_mutex_lock(q->mut);

    while (q->full) {
     gettimeofday(&now, (struct timezone *)0);
     timeout.tv_sec = now.tv_sec + Q_TIMEOUT;
     timeout.tv_nsec = now.tv_usec * 1000;

     pthread_cond_timedwait(q->notfull, q->mut, &timeout);

    }
    q->buffer[q->tail++] = data;
    if (q->tail == q->num) {
     q->tail = 0;
    }
    if (q->head == q->tail) {
     q->full = 1;
    }
    q->empty = 0;

    pthread_mutex_unlock(q->mut);
    pthread_cond_signal(q->notempty);
}

void *q_dq(struct queue *q) {
    void *data;
    int rc;
    struct timeval now;
    struct timespec timeout;

    pthread_mutex_lock(q->mut);

    while (q->empty) {
     gettimeofday(&now, NULL);
     timeout.tv_sec = now.tv_sec + Q_TIMEOUT;
     timeout.tv_nsec = now.tv_usec * 1000;
     if (q->finished) {
      pthread_mutex_unlock(q->mut);
      return NULL;
     }

     rc = pthread_cond_timedwait(q->notempty, q->mut, &timeout);
     if (q->finished) {
      pthread_mutex_unlock(q->mut);
      return NULL;
     }
    }
    data = q->buffer[q->head++];
    if (q->head == q->num) {
     q->head = 0;
    }
    if (q->head == q->tail) {
     q->empty = 1;
    }
    q->full = 0;
    pthread_mutex_unlock(q->mut);
    pthread_cond_signal(q->notfull);

    return data;
}
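
The post never shows `struct queue` or `q_init`. Here is a minimal sketch of what they might look like, inferred only from the fields that q_enq and q_dq touch. The layout, field types, and error handling are all assumptions, not the original code.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical layout, inferred from the fields q_enq and q_dq use. */
struct queue {
    void **buffer;             /* ring buffer of work item pointers */
    int num;                   /* capacity of the ring buffer */
    int head;                  /* next slot to dequeue from */
    int tail;                  /* next slot to enqueue into */
    int full;                  /* nonzero when no slot is free */
    int empty;                 /* nonzero when no item is queued */
    int finished;              /* nonzero once shutdown was requested */
    pthread_mutex_t *mut;      /* guards every field above */
    pthread_cond_t *notfull;   /* signalled after a dequeue */
    pthread_cond_t *notempty;  /* signalled after an enqueue */
};

/* Allocates an empty queue with room for num items; NULL on failure. */
struct queue *q_init(int num)
{
    struct queue *q;

    assert(num > 0);

    q = calloc(1, sizeof *q);
    if (q == NULL)
        return NULL;

    q->buffer = calloc((size_t)num, sizeof *q->buffer);
    q->mut = malloc(sizeof *q->mut);
    q->notfull = malloc(sizeof *q->notfull);
    q->notempty = malloc(sizeof *q->notempty);
    if (!q->buffer || !q->mut || !q->notfull || !q->notempty) {
        /* free(NULL) is a no-op, so partial allocations are handled */
        free(q->buffer);
        free(q->mut);
        free(q->notfull);
        free(q->notempty);
        free(q);
        return NULL;
    }

    pthread_mutex_init(q->mut, NULL);
    pthread_cond_init(q->notfull, NULL);
    pthread_cond_init(q->notempty, NULL);

    q->num = num;
    q->empty = 1;   /* head, tail, full and finished are zeroed by calloc */
    return q;
}
```

Whatever the real definition is, the flags `full`, `empty`, and `finished` should only be read or written while `mut` is held, including by whatever code sets `finished` at shutdown.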
From stackoverflow
  • I think you should do something like this:

    void q_enq(struct queue *q, void *data) {
     int next;
    
     // wait until there's room
     do{
      pthread_mutex_lock( q->mut );
    
      next = q->tail + 1;
      if( next == q->num) {
       next = 0;
      }
    
      // still room to add
      if( q->head != next )
       break;
    
      pthread_mutex_unlock(q->mut);
      sched_yield();
     } while( 1 );
    
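     // store at the current tail, then advance it, so q_dq
     // (which reads buffer[head]) sees the item it expects
     q->buffer[ q->tail ] = data;
     q->tail = next;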
    
     // signal consumer and unlock mutex
     pthread_cond_signal(q->notempty);
     pthread_mutex_unlock(q->mut);
    }
    
    void *q_dq(struct queue *q) {
     void *data;
    
     pthread_mutex_lock(q->mut);
    
     // while empty wait
     while( q->tail == q->head ){
      pthread_cond_wait(q->notempty, q->mut );
     }
    
     // get next and wrap
     data = q->buffer[q->head++];
     if (q->head == q->num) {
      q->head = 0;
     }
    
     pthread_mutex_unlock(q->mut);
     return data;
    }
    
    Nicholas Mancuso : I'll try that out later today. I'm running some stuff and don't have time for a rebuild...
    sfossen : did you get a chance to try it?
    Nicholas Mancuso : not yet. We have some deadlines on some other stuff. I'll definitely let you know though.
    Nicholas Mancuso : I just realized I don't have the pthread_yield() subroutine!
    Nicholas Mancuso : I guess I could get away with sleep(0), but that feels kind of like a hack to me. Why couldn't I just use another conditional wait?
    sfossen : what OS are you using?
    sfossen : if it's macOSX, try sched_yield() from .
    Nicholas Mancuso : i'm using windows server 08
    sfossen : For Windows it's the same: [sched_yield](http://sourceware.org/pthreads-win32/manual/sched_yield.html).
    sfossen : you could also use [pthread_delay_np](http://sourceware.org/pthreads-win32/manual/pthread_delay_np.html)
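
On the question in the comments about using another conditional wait instead of yielding: that is possible, and it avoids the busy polling loop. The sketch below shows the idea on a small self-contained queue. The type `bq_t`, the `BQ_CAP` capacity, and the `count` field are hypothetical names chosen for the illustration and are not part of the code above.

```c
#include <assert.h>
#include <pthread.h>

#define BQ_CAP 4   /* hypothetical fixed capacity, for illustration only */

typedef struct {
    void *buffer[BQ_CAP];
    int head;                 /* next slot to dequeue from */
    int tail;                 /* next slot to enqueue into */
    int count;                /* number of queued items */
    pthread_mutex_t mut;
    pthread_cond_t notfull;
    pthread_cond_t notempty;
} bq_t;

void bq_init(bq_t *q)
{
    assert(q != NULL);
    q->head = q->tail = q->count = 0;
    pthread_mutex_init(&q->mut, NULL);
    pthread_cond_init(&q->notfull, NULL);
    pthread_cond_init(&q->notempty, NULL);
}

void bq_enq(bq_t *q, void *data)
{
    pthread_mutex_lock(&q->mut);

    /* Sleep until a consumer frees a slot. The loop re-checks the
     * predicate because condition waits can wake spuriously. */
    while (q->count == BQ_CAP)
        pthread_cond_wait(&q->notfull, &q->mut);

    q->buffer[q->tail] = data;
    q->tail = (q->tail + 1) % BQ_CAP;
    q->count++;

    pthread_cond_signal(&q->notempty);
    pthread_mutex_unlock(&q->mut);
}

void *bq_dq(bq_t *q)
{
    void *data;

    pthread_mutex_lock(&q->mut);

    /* Sleep until a producer adds an item. */
    while (q->count == 0)
        pthread_cond_wait(&q->notempty, &q->mut);

    data = q->buffer[q->head];
    q->head = (q->head + 1) % BQ_CAP;
    q->count--;

    pthread_cond_signal(&q->notfull);
    pthread_mutex_unlock(&q->mut);
    return data;
}
```

Tracking `count` separately lets all `BQ_CAP` slots be used, whereas a head/tail comparison alone has to keep one slot free to tell a full queue from an empty one.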
  • you're calling pthread_attr_destroy too early; don't call pthread_attr_destroy until it's time to destroy the pool (i.e., keep attr around for the lifetime of the thread(s))

    Nicholas Mancuso : really? most examples I've read show it being destroyed early. After its value has been passed along...
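
On the timing question raised in that comment: as far as I know, the attributes object is only read by pthread_create, and the Linux man page for pthread_attr_destroy states that destroying it has no effect on threads already created with it. Here is a small sketch to check this on a given platform. The `worker` and `run_with_early_attr_destroy` names are hypothetical and exist only for this demonstration.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical worker: increments the int it is handed. */
static void *worker(void *arg)
{
    int *value = arg;
    *value += 1;
    return NULL;
}

/* Creates one joinable thread, destroys attr right after pthread_create,
 * then joins the thread. Returns 0 on success or a pthread error code. */
int run_with_early_attr_destroy(int *value)
{
    pthread_attr_t attr;
    pthread_t tid;
    int rc;

    assert(value != NULL);

    rc = pthread_attr_init(&attr);
    if (rc != 0)
        return rc;
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    rc = pthread_create(&tid, &attr, worker, value);

    /* attr is no longer needed once pthread_create has returned */
    pthread_attr_destroy(&attr);
    if (rc != 0)
        return rc;

    return pthread_join(tid, NULL);
}
```

If this runs cleanly, the early pthread_attr_destroy in threadpool_init is unlikely to be the source of the random errors.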
