Condition variables?

Graham St Jack grahams at acres.com.au
Mon Oct 1 17:26:58 PDT 2007


I have tried to get conditions into the language before, without success 
(or even much interest from others). I find Conditions absolutely 
essential in multi-threaded applications, and can't understand why they 
are missing.

The Condition class in Tango isn't quite there yet - it needs to be 
something like the code below, which I am using in my own project (only 
works for Linux so far). Note the use of an Object's monitor's mutex, 
and the reasonably easy syntax for using it. IMO something along these 
lines belongs in both Phobos and Tango.

You use it like this (a sketch; the queue is backed by a plain dynamic
array for illustration):

class ProtectedQueue(T) {
     Condition ready;
     T[] items;
     int count;

     this() {
         ready = new Condition(this);
     }

     synchronized void add(T item) {
         items ~= item;
         ++count;
         ready(count > 0);   // opCall is an alias for value(bool)
     }

     synchronized T remove() {
         ready.wait;         // wait until count is > 0
         T item = items[0];
         items = items[1 .. $];
         --count;
         ready(count > 0);
         return item;
     }
}





//---------------------------------------------------------------
// A condition
// Allows a thread to wait for the condition to be true.
//---------------------------------------------------------------

// stuff to get access to an Object's Monitor's mutex
// Note - the mutex is created on-demand, so we have to defer
// accessing it until the first time we use it.
// Modeled on monitor.c in Phobos and Tango (they are different)

private {
     version(Tango) {
         struct Monitor {
             void * impl;
             posix.pthread_mutex_t mutex;
         }
     }
     else {
         struct Monitor {
             size_t len;
             void * ptr;
             posix.pthread_mutex_t mutex;
         }
     }

     Monitor* getMonitor(Object obj) {
         return cast(Monitor*) (cast(void**) obj)[1];
     }

     posix.pthread_mutex_t * getMutex(Object obj) {
         return &getMonitor(obj).mutex;
     }
}


class Condition {
     alias value opCall;

     Object object;
     bool satisfied;
     posix.pthread_cond_t condition;
     posix.pthread_mutex_t * mutex;

     // construct a Condition which uses object's monitor's mutex
     this(Object object) {
         this.object = object;
         satisfied = false;
         posix.pthread_condattr_t attributes;
         posix.pthread_condattr_init(&attributes);
         int rc = posix.pthread_cond_init(&condition, &attributes);
         assert(!rc);
         posix.pthread_condattr_destroy(&attributes);
     }

     // destroy a condition
     ~this() {
         int rc = posix.pthread_cond_destroy(&condition);
         assert(!rc);
     }

     // wait until the condition is satisfied.
     // Must only be called from a synchronized method
     // in the object specified in the constructor of this Condition.
     void wait() {
         // FIXME - need an assertion that we have locked the mutex
         if (!mutex) {
             mutex = getMutex(object);
         }
         while (!satisfied) {
             int rc = posix.pthread_cond_wait(&condition, mutex);
             assert(!rc);
         }
     }

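     // wait for up to timeout seconds, returning true if the condition
     // was satisfied, false if the wait timed out (or failed).
     bool wait(double timeout) {
         // FIXME - need an assertion that we have locked the mutex
         if (!mutex) {
             mutex = getMutex(object);
         }
         // pthread_cond_timedwait takes an absolute CLOCK_REALTIME
         // deadline, not a relative interval, so add the timeout to the
         // current time. Assumes the posix module exposes clock_gettime.
         posix.timespec t;
         posix.clock_gettime(posix.CLOCK_REALTIME, &t);
         long nsecs = t.tv_nsec + cast(long) (timeout * 1000000000L);
         t.tv_sec  += cast(typeof(t.tv_sec))  (nsecs / 1000000000L);
         t.tv_nsec  = cast(typeof(t.tv_nsec)) (nsecs % 1000000000L);
         while (!satisfied) {
             int rc = posix.pthread_cond_timedwait(
                     &condition, mutex, &t);
             if (rc != 0) {
                 return false;
             }
         }
         return true;
     }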

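     // set the condition to val, notifying all waiting threads if it
     // becomes true.
     // Should only be called from a synchronized method in the object
     // provided in this condition's constructor.
     void value(bool val) {
         // FIXME - need an assertion that we have locked the mutex
         if (val != satisfied) {
             satisfied = val;
             if (satisfied) {
                 // Broadcast rather than signal: we only notify on a
                 // false-to-true transition, so with several waiters a
                 // single signal could leave some of them blocked while
                 // the condition stays true.
                 posix.pthread_cond_broadcast(&condition);
             }
         }
     }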

     // getter for the value of the condition.
     bool value() {
         // FIXME - need an assertion that we have locked the mutex
         return satisfied;
     }
}
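
For comparison, here is a minimal sketch of the same wait/notify pattern
written against the core.sync.mutex and core.sync.condition modules of
later D runtimes. That is an assumption on my part: those modules are not
part of the Phobos or Tango versions discussed in this post. The class name
BlockingQueue is hypothetical and only mirrors ProtectedQueue above. It
uses an explicit Mutex rather than the object's monitor, and it tests the
queue length directly instead of keeping a separate flag.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;

// Hypothetical queue mirroring ProtectedQueue from the post.
class BlockingQueue(T)
{
    private Mutex mutex;
    private Condition ready;
    private T[] items;

    this()
    {
        mutex = new Mutex;
        ready = new Condition(mutex);
    }

    void add(T item)
    {
        mutex.lock();
        scope (exit) mutex.unlock();
        items ~= item;
        ready.notify();   // wake one waiting consumer, if any
    }

    T remove()
    {
        mutex.lock();
        scope (exit) mutex.unlock();
        // Re-check the predicate in a loop: a wakeup does not
        // guarantee that an item is still available.
        while (items.length == 0)
            ready.wait();
        T item = items[0];
        items = items[1 .. $];
        return item;
    }
}

void main()
{
    auto queue = new BlockingQueue!int;
    int sum;

    // The consumer blocks in remove() until items arrive.
    auto consumer = new Thread({
        foreach (i; 0 .. 3)
            sum += queue.remove();
    });
    consumer.start();

    foreach (i; 1 .. 4)
        queue.add(i);

    consumer.join();
    assert(sum == 6);
}
```

Because add() notifies on every insertion rather than only when a flag
changes, one consumer is woken per item. Condition in that module also has
a wait overload taking a Duration, which returns false on timeout.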



David Brown wrote:
> Hopefully I'm missing something obvious here, but D and phobos seem to
> be missing any kind of condition variables.  It's really hard to do
> non-trivial thread programming without this kind of synchronization.
> 
> In fact, I'm not sure how I could even go about implementing
> something, since there doesn't seem to be any way of easily accessing
> the object's monitor, which would be needed to do condition variables
> that work with 'synchronized'.
> 
> I can think of other ways of doing synchronization, but not in a
> terribly efficient way:
> 
>   - Use Thread's pause() and resume().  I would have to implement wait
>     queues and getting synchronization right on this would be
>     challenging.
> 
>   - Use another OS mechanism such as pipes to sleep and wakeup.  This
>     is also not very efficient.
> 
> I'm just kind of wondering why std.thread even exists without
> condition variables, since it really isn't useful for all that much,
> by itself, and doesn't seem to even have the needed hooks to implement
> any other mechanism.
> 
> David Brown



