std.parallelism: How to wait for all tasks to finish?

Andrea Fontana nospam at example.com
Thu Feb 6 08:19:37 PST 2014


On Thursday, 6 February 2014 at 16:07:51 UTC, Andrea Fontana 
wrote:
> On Thursday, 6 February 2014 at 14:52:36 UTC, Cooler wrote:
>> On Thursday, 6 February 2014 at 14:42:57 UTC, Cooler wrote:
>>> On Thursday, 6 February 2014 at 11:30:17 UTC, Andrea Fontana 
>>> wrote:
>>>> On Wednesday, 5 February 2014 at 15:38:14 UTC, Cooler wrote:
>>>>> On Tuesday, 4 February 2014 at 03:26:04 UTC, Dan Killebrew 
>>>>> wrote:
>>>>>>>> It seems to me that worker threads will continue as long 
>>>>>>>> as the queue isn't empty. So if a task adds another task 
>>>>>>>> to the pool, some worker will process the newly enqueued 
>>>>>>>> task.
>>>>>>>
>>>>>>> No. After taskPool.finish() there is no way to add new tasks
>>>>>>> to the queue; taskPool.put will not add new tasks.
>>>>>>
>>>>>> Then perhaps you need to create a new TaskPool (and make 
>>>>>> sure that workers add their tasks to the correct task 
>>>>>> pool), so that you can wait on the first task pool, then 
>>>>>> wait on the second task pool, etc.
>>>>>>
>>>>>> auto phase1 = new TaskPool();
>>>>>> //make sure all new tasks are added to phase1
>>>>>> phase1.finish(true);
>>>>>>
>>>>>> auto phase2 = new TaskPool();
>>>>>> //make sure all new tasks are added to phase2
>>>>>> phase2.finish(true);
>>>>>
>>>>> That will not help. I don't know beforehand what tasks will be
>>>>> created. procData is recursive and it decides whether to create
>>>>> a new task or not.
>>>>
>>>>
>>>> Something like this? (not tested...)
>>>>
>>>> shared bool more = true;
>>>> ...
>>>> ...
>>>> ...
>>>>
>>>> void procData(){
>>>> if(...)
>>>> {
>>>>     taskPool.put(task(&procData));
>>>>     more = true;
>>>> }
>>>> }
>>>>
>>>> while(true)
>>>> {
>>>>     taskPool.finish(true);
>>>>     if (!more) break;
>>>>     else more = false;
>>>> }
>>>
>>> That is closer, but after taskPool.finish() all attempts to call
>>> taskPool.put() will be rejected. Let me give a clearer example.
>>>
>>> import std.stdio, std.parallelism, core.thread;
>>>
>>> shared int i;
>>>
>>> void procData(){
>>>     synchronized ++i;
>>>     if(i >= 100)
>>>         return;
>>>     foreach(i; 0 .. 100)
>>>         taskPool.put(task(&procData)); // New tasks will be rejected
>>>                                        // after taskPool.finish()
>>> }
>>>
>>> void main(){
>>>     taskPool.put(task(&procData));
>>>     Thread.sleep(1.msecs); // The final output of "i" depends on
>>>                            // the duration here
>>>     taskPool.finish(true);
>>>     writefln("i = %s", i);
>>> }
>>>
>>> In the example above the total number of tasks executed
>>> depends on the sleep duration.
>>
>> I forgot to say - I know how to solve the problem in the topic. My
>> question is "What is the BEST way?".
>> One of my ideas: maybe introduce a new function, named for example
>> "wait", that would block while there are still working tasks?
>
> What about a synchronized ++taskCount when you put() something and
> --taskCount when a task is done? And in main, while(taskCount > 0)
> Thread.yield(); ?

Something like this:

import std.stdio, std.parallelism, core.thread, core.atomic;

shared size_t taskCount;
shared size_t i;

void procData()
{
     // Always count this task as finished, even on the early return.
     // (The counters are kept in the body rather than in in/out
     // contracts, since contracts are stripped in release builds, and
     // atomicOp is used because argument-less synchronized statements
     // each lock their own mutex.)
     scope(exit) atomicOp!"-="(taskCount, 1);

     if (atomicOp!"+="(i, 1) > 100)
         return;

     foreach(_; 0 .. 100)
     {
         // Count the child task before queueing it, so taskCount
         // cannot drop to zero while work is still being created.
         atomicOp!"+="(taskCount, 1);
         taskPool.put(task(&procData));
     }
}

void main()
{
     // Set before any task runs, so no synchronization is needed here.
     taskCount = 2;
     taskPool.put(task(&procData));
     taskPool.put(task(&procData));

     // Busy-wait until every queued task has finished.
     while (atomicLoad(taskCount) > 0)
         Thread.yield();
}
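
If the busy-wait on Thread.yield is a concern, the same counter idea
could be combined with a condition variable from core.sync, so the main
thread sleeps until the last task signals it. A rough, untested sketch
(the names spawn, pending, queueLock and allDone are just placeholders,
not anything from std.parallelism):

import std.parallelism, core.atomic;
import core.sync.mutex, core.sync.condition;

shared size_t pending;       // tasks queued but not yet finished
__gshared Mutex queueLock;
__gshared Condition allDone;

// Queue a task and count it before it can possibly run.
void spawn(void function() fn)
{
     atomicOp!"+="(pending, 1);
     taskPool.put(task(fn));
}

void procData()
{
     scope(exit)
     {
         // The task that brings the counter to zero wakes main().
         if (atomicOp!"-="(pending, 1) == 0)
             synchronized (queueLock) allDone.notify();
     }

     // ... do the work; recursive calls go through spawn(&procData) ...
}

void main()
{
     queueLock = new Mutex;
     allDone   = new Condition(queueLock);

     spawn(&procData);

     // Sleep until the counter reaches zero; the while loop also
     // guards against spurious wakeups.
     synchronized (queueLock)
         while (atomicLoad(pending) > 0)
             allDone.wait();
}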

