std.parallelism: How to wait all tasks finished?

Cooler kulkin at hotbox.ru
Thu Feb 6 10:33:51 PST 2014


On Thursday, 6 February 2014 at 16:19:38 UTC, Andrea Fontana
wrote:
> On Thursday, 6 February 2014 at 16:07:51 UTC, Andrea Fontana 
> wrote:
>> On Thursday, 6 February 2014 at 14:52:36 UTC, Cooler wrote:
>>> On Thursday, 6 February 2014 at 14:42:57 UTC, Cooler wrote:
>>>> On Thursday, 6 February 2014 at 11:30:17 UTC, Andrea Fontana 
>>>> wrote:
>>>>> On Wednesday, 5 February 2014 at 15:38:14 UTC, Cooler wrote:
>>>>>> On Tuesday, 4 February 2014 at 03:26:04 UTC, Dan Killebrew 
>>>>>> wrote:
>>>>>>>>> It seems to me that worker threads will continue as 
>>>>>>>>> long as the queue isn't empty. So if a task adds 
>>>>>>>>> another task to the pool, some worker will process the 
>>>>>>>>> newly enqueued task.
>>>>>>>>
>>>>>>>> No. After taskPool.finish() no way to add new tasks to 
>>>>>>>> the queue. taskPool.put will not add new tasks.
>>>>>>>
>>>>>>> Then perhaps you need to create a new TaskPool (and make 
>>>>>>> sure that workers add their tasks to the correct task 
>>>>>>> pool), so that you can wait on the first task pool, then 
>>>>>>> wait on the second task pool, etc.
>>>>>>>
>>>>>>> auto phase1 = new TaskPool();
>>>>>>> //make sure all new tasks are added to phase1
>>>>>>> phase1.finish(true);
>>>>>>>
>>>>>>> auto phase2 = new TaskPool();
>>>>>>> //make sure all new tasks are added to phase2
>>>>>>> phase2.finish(true);
>>>>>>
>>>>>> Will not help. I don't know beforehand what tasks will be
>>>>>> created. procData is recursive and it decides create new 
>>>>>> task or
>>>>>> not.
>>>>>
>>>>>
>>>>> Something like this? (not tested...)
>>>>>
>>>>> shared bool more = true;
>>>>> ...
>>>>> ...
>>>>> ...
>>>>>
>>>>> void procData(){
>>>>> if(...)
>>>>> {
>>>>> taskPool.put(task(&procData));
>>>>> more = true;
>>>>> }
>>>>> }
>>>>>
>>>>> while(true)
>>>>> {
>>>>> taskPool.finish(true);
>>>>> if (!more) break;
>>>>> else more = false;
>>>>> }
>>>>
>>>> It is closer, but after taskPool.finish() all tries to 
>>>> taskPool.put() will be rejected. Let's me clear example.
>>>>
>>>> import std.stdio, std.parallelism, core.thread;
>>>>
>>>> shared int i;
>>>>
>>>> void procData(){
>>>> synchronized ++i;
>>>> if(i >= 100)
>>>>  return;
>>>> foreach(i; 0 .. 100)
>>>>  taskPool.put(task(&procData)); // New tasks will be 
>>>> rejected after
>>>>                                 // taskPool.finish()
>>>> }
>>>>
>>>> void main(){
>>>> taskPool.put(task(&procData));
>>>> Thread.sleep(1.msecs); // The final output of "i" depends on 
>>>> duration here
>>>> taskPool.finish(true);
>>>> writefln("i = %s", i);
>>>> }
>>>>
>>>> In the example above the total number of tasks executed 
>>>> depends on sleep duration.
>>>
>>> Forgot to say - I know how to solve the topic problem. My
>>> question is "What is the BEST way?".
>>> One of my idea - may be introduce new function, named for 
>>> example
>>> "wait", that will block until there are working tasks?
>>
>> What about sync ++taskCount when you put() something and 
>> --taskCount when task is done? And on main while(i > 0) 
>> Thread.yield(); ?
>
> Something like this:
>
> import std.stdio, std.parallelism, core.thread;
> import std.random;
>
> shared size_t taskCount;
> shared size_t i;
>
> void procData()
> in  { synchronized ++i; }
> out { synchronized --taskCount; }
> body
> {
>     if (i > 100)
>         return;
>
>     foreach(i; 0 .. 100)
>     {
>         taskPool.put(task(&procData));
>         synchronized ++taskCount;
>     }
>
> }
>
> void main(){
>
>     taskCount = 2;
>     taskPool.put(task(&procData));
>     taskPool.put(task(&procData));
>
>     while(taskCount > 0)
>         Thread.yield();
> }

Now I do almost the same in my program, but instead of while(...)
Thread.yield() I wait on semaphore to be notified. And in threads
when --taskCount reaches 0, i do Semaphore.notify(). But all of
this don't look beautiful enough. As I mention above - may be
introduce new function, named for example "wait", that will block
TaskPool until there are working tasks? If such situation
encounters frequently, may be it is worth to add Phobos some
functionality?


More information about the Digitalmars-d-learn mailing list