/* **************************************************************************** */
/**
 * @file quark.c
 *
 * QUARK (QUeuing And Runtime for Kernels) version 0.9.0:
 * dynamic task scheduling and runtime environment.
 */
/* Define a group for Doxygen documentation */
/* Define a group for Doxygen documentation */
/* **************************************************************************** */
/*
  Summary of environment flags:

  Change the task window size (default is 50 tasks per thread; see QUARK_Setup)
  export QUARK_UNROLL_TASKS_PER_THREAD=num

  Enable WAR avoidance (false dependency handling) (default=0 off)
  export QUARK_WAR_DEPENDENCIES_ENABLE=1

  Enable DAG generation (default=0 off)
  export QUARK_DOT_DAG_ENABLE=1
*/
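/* Illustrative sketch (not part of the original file): the flags above are
 * read once, when the scheduler is created (via quark_getenv_int in
 * QUARK_Setup), so they must be set before QUARK_New is called, e.g.
 *
 *   QUARK_UNROLL_TASKS_PER_THREAD=100 QUARK_DOT_DAG_ENABLE=1 ./my_app
 *
 * or from the program itself, before creating the scheduler:
 *
 *   setenv("QUARK_WAR_DEPENDENCIES_ENABLE", "1", 1);
 *   Quark *quark = QUARK_New(8);
 */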
/* **************************************************************************** */

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <limits.h>
#include <errno.h>

#ifndef inline
#define inline __inline
#endif
#if defined( _WIN32 ) || defined( _WIN64 )
# define fopen(ppfile, name, mode) fopen_s(ppfile, name, mode)
# define strdup _strdup
# include "quarkwinthread.h"
#else
# define fopen(ppfile, name, mode) *ppfile = fopen(name, mode)
# include <pthread.h>
#endif

#ifdef TRUE
#undef TRUE
#endif

#ifdef FALSE
#undef FALSE
#endif
#include "icl_list.h"
#include "icl_hash.h"
#include "bsd_queue.h"
#include "bsd_tree.h"
#include "quark.h"
#include "quark_unpack_args.h"
#include "quark_trace.h"

#ifdef DBGQUARK
#include <time.h>
#include <sys/time.h>
#endif /* DBGQUARK */

#ifndef ULLONG_MAX
# define ULLONG_MAX 18446744073709551615ULL
#endif
typedef enum { FALSE, TRUE } bool;
/* Task states; the enum values are reconstructed here from their uses later in this file */
typedef enum { ALLOCATED_ONLY, NOTREADY, QUEUED, RUNNING, DONE, CANCELLED } task_status;

struct quark_s {
    pthread_mutex_t quark_mutex;             /* scheduler lock */
    int num_threads;                         /* number of threads */
    struct worker_s **worker;                /* array of workers [num_threads] */
    int *coresbind;                          /* array of indices where to bind workers [num_threads] */
    /* volatile */ int list_robin;           /* round-robin list insertion index */
    volatile bool start;                     /* start flag */
    volatile bool all_tasks_queued;          /* flag */
    volatile long long num_tasks;            /* number of tasks queued */
    icl_hash_t *task_set;                    /* hash table of tasks, indexed by taskid */
    pthread_mutex_t task_set_mutex;          /* task set access mutex */
    icl_hash_t *address_set;                 /* hash table of addresses */
    pthread_mutex_t address_set_mutex;       /* hash table access mutex */
    pthread_attr_t thread_attr;              /* threads' attributes */
    int low_water_mark;                      /* task insertion resumes when num_tasks drops below this */
    int high_water_mark;                     /* master blocks and works when num_tasks reaches this */
    volatile int num_queued_tasks;
    pthread_mutex_t num_queued_tasks_mutex;
    pthread_cond_t num_queued_tasks_cond;
    int war_dependencies_enable;             /* enable WAR (false dependency) avoidance */
    int queue_before_computing;              /* queue all tasks before computing */
    int dot_dag_enable;                      /* enable DOT DAG generation */
    int dot_dag_was_setup;
    pthread_mutex_t dot_dag_mutex;           /* protects the DOT DAG output file */
#define tasklevel_width_max_level 5000
    volatile int tasklevel_width[tasklevel_width_max_level]; /* number of tasks at each DAG level */
    struct completed_tasks_head_s *completed_tasks;
    pthread_mutex_t completed_tasks_mutex;
    volatile int completed_tasks_size;
};

struct Quark_sequence_s {
    volatile int status;                      /* status of the sequence (e.g. QUARK_ERR on error/cancellation) */
    pthread_mutex_t sequence_mutex;           /* protects the sequence */
    struct ll_list_head_s *tasks_in_sequence; /* list of tasks in the sequence */
};

typedef struct worker_s {
    pthread_t thread_id;
    pthread_mutex_t worker_mutex;
    int rank;
    struct task_priority_tree_head_s *ready_list;
    volatile int ready_list_size;
    struct quark_task_s *current_task_ptr;  /* convenience pointer to the worker's current task */
    struct quark_s *quark_ptr;
    volatile int finalize;                  /* termination flag */
    volatile int executing_task;
    volatile bool set_to_manual_scheduling; /* if TRUE, worker only runs tasks explicitly assigned to it */
    pthread_cond_t worker_must_awake_cond;
    int status;
} Worker;

typedef struct quark_task_s {
    pthread_mutex_t task_mutex;
    void (*function) (Quark *);              /* task function pointer */
    volatile task_status status;             /* status of the task: NOTREADY, QUEUED, RUNNING, DONE, ... */
    volatile int num_dependencies_remaining; /* number of dependencies remaining to be fulfilled */
    icl_list_t *args_list;                   /* list of arguments (copies of scalar values and pointers) */
    icl_list_t *dependency_list;             /* list of dependencies */
    icl_list_t *scratch_list;                /* list of scratch space information and sizes */
    volatile struct dependency_s *locality_preserving_dep; /* try to run the task on the core that preserves the locality of this dependency */
    unsigned long long taskid;               /* an identifier, used only for generating DAGs */
    unsigned long long tasklevel;            /* an identifier, used only for generating DAGs */
    int lock_to_thread;                      /* if >= 0, run this task only on the given thread */
    unsigned char *lock_to_thread_mask;
    char *task_label;                        /* label for this task, used in DOT DAG generation */
    char *task_color;                        /* color for this task, used in DOT DAG generation */
    int priority;                            /* is this a high priority task */
    struct Quark_sequence_s *sequence;       /* sequence this task belongs to, if any */
    struct ll_list_node_s *ptr_to_task_in_sequence; /* convenience pointer to this task in the sequence */
    int task_thread_count;                   /* number of threads required by the task */
    int task_thread_count_outstanding;       /* number of those required threads still outstanding */
    int thread_set_to_manual_scheduling;     /* enable or disable work stealing in the thread that runs this task */
    volatile int threadid;                   /* next rank handed out by QUARK_Get_RankInTask in parallel tasks */
    int executed_on_threadid;                /* track which thread executes this task */
} Task;

typedef struct dependency_s {
    struct quark_task_s *task;     /* pointer to the parent task containing this dependency */
    void *address;                 /* address of the data */
    int size;                      /* size of the dependency data */
    quark_direction_t direction;   /* direction of this dependency: INPUT, INOUT, OUTPUT */
    bool locality;                 /* priority of this dependency; more like data locality */
    bool accumulator;              /* tasks depending on this may be reordered; they accumulate results */
    int data_region;               /* different regions may be specified for dependencies; uses a bitmask of 8 bits */
    bool gatherv;                  /* tasks depending on this may be run in parallel, as assured by the programmer */
    struct address_set_node_s *address_set_node_ptr; /* convenience pointer to the address_set_node */
    icl_list_t *address_set_waiting_deps_node_ptr;   /* convenience pointer to the address_set_node waiting_deps node */
    icl_list_t *task_args_list_node_ptr;             /* convenience pointer to the task->args_list node to use for WAR address updates */
    icl_list_t *task_dependency_list_node_ptr;       /* convenience pointer to the task->dependency_list node */
    /* volatile */ bool ready;     /* data dependency is ready */
} Dependency;

typedef struct scratch_s {
    void *ptr;                           /* address of the scratch space */
    int size;                            /* size of the scratch data */
    icl_list_t *task_args_list_node_ptr; /* convenience pointer to the task->args_list node */
} Scratch;

typedef struct address_set_node_s {
    void *address;                  /* copy of the key to the address_set - pointer to the data */
    int size;                       /* data object size */
    /* volatile */ int last_thread; /* last thread to use this data - for scheduling/locality */
    icl_list_t *waiting_deps;       /* list of dependencies waiting for this data */
    /* volatile */ bool delete_data_at_address_when_node_is_deleted; /* used when data is copied in order to handle false dependencies */
    unsigned long long last_writer_taskid;              /* used for generating DOT DAGs */
    unsigned long long last_writer_tasklevel;           /* used for tracking critical depth */
    unsigned long long last_reader_or_writer_taskid;    /* used for generating DOT DAGs */
    unsigned long long last_reader_or_writer_tasklevel; /* used for tracking critical depth */
    pthread_mutex_t asn_mutex;      /* protects this node */
} Address_Set_Node;

/* Data structure for a list containing long long int values. Used to
 * track task ids in sequences of tasks, so that the tasks in a
 * sequence can be controlled */
typedef struct ll_list_node_s {
    long long int val;
    LIST_ENTRY( ll_list_node_s ) ll_entries;
} ll_list_node_t;
LIST_HEAD(ll_list_head_s, ll_list_node_s);
typedef struct ll_list_head_s ll_list_head_t;

typedef struct completed_tasks_node_s {
    struct quark_task_s *task;     /* the completed task */
    int workerid;
    TAILQ_ENTRY( completed_tasks_node_s ) ctn_entries;
} completed_tasks_node_t;
TAILQ_HEAD( completed_tasks_head_s, completed_tasks_node_s );
typedef struct completed_tasks_head_s completed_tasks_head_t;

/* Tree (red-black) structure for keeping a priority list of
 * executable tasks */
typedef struct task_priority_tree_node_s {
    int priority;
    struct quark_task_s *task;
    RB_ENTRY( task_priority_tree_node_s ) n_entry;
} task_priority_tree_node_t;
RB_HEAD( task_priority_tree_head_s, task_priority_tree_node_s );
typedef struct task_priority_tree_head_s task_priority_tree_head_t;
static int compare_task_priority_tree_nodes( task_priority_tree_node_t *n1, task_priority_tree_node_t *n2 )
{
    int diff = n2->priority - n1->priority;
    return diff;
}
/* Generate red-black tree functions */
RB_GENERATE( task_priority_tree_head_s, task_priority_tree_node_s, n_entry, compare_task_priority_tree_nodes );


/* **************************************************************************** */
/* Forward declarations of internal functions */
static Task *quark_task_new();
static void *quark_task_delete( Quark *quark, Task *task);
static Worker *quark_worker_new(Quark *quark, int rank);
static void quark_worker_delete(Worker *worker);
static inline int quark_worker_find_next_assignable( Quark *quark );
static void quark_insert_task_dependencies(Quark * quark, Task * task);
static void quark_check_and_queue_ready_task( Quark *quark, Task *task, int worker_rank );
static void quark_work_set_affinity_and_call_main_loop(Worker *worker);
static long long quark_work_main_loop(Worker *worker);
static Scratch *quark_scratch_new( void *arg_ptr, int arg_size, icl_list_t *task_args_list_node_ptr);
static void quark_scratch_allocate( Task *task );
static void quark_scratch_deallocate( Task *task );
static void quark_worker_remove_completed_task_enqueue_for_later_processing(Quark *quark, Task *task, int worker_rank);
static void quark_remove_completed_task_and_check_for_ready(Quark *quark, Task *task, int worker_rank);
static void quark_process_completed_tasks(Quark *quark);
static void quark_address_set_node_free( void* data );
static void quark_fatal_error(const char *func_name, char* msg_text);

/* External functions, mostly implemented in quarkos.c */
int quark_setaffinity(int rank);
void quark_topology_init();
int quark_get_numthreads();
int *quark_get_affthreads();
int quark_yield();
int quark_getenv_int(char* name, int defval);

/* **************************************************************************** */
/* Mutex and condition-variable wrappers that report fatal errors on failure;
 * the wrapper names record which lock is being taken, to aid debugging */
inline static int pthread_mutex_lock_address_set(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_lock( mtx ))!=0) { quark_fatal_error("pthread_mutex_lock_address_set", strerror(rv)); } return rv; }
inline static int pthread_mutex_trylock_address_set(pthread_mutex_t *mtx) { int rv; rv=pthread_mutex_trylock( mtx ); return rv; }
inline static int pthread_mutex_unlock_address_set(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_unlock( mtx ))!=0) { quark_fatal_error("pthread_mutex_unlock_address_set", strerror(rv)); } return rv; }

inline static int pthread_mutex_lock_ready_list(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_lock( mtx ))!=0) { quark_fatal_error("pthread_mutex_lock_ready_list", strerror(rv)); } return rv; }
inline static int pthread_mutex_trylock_ready_list(pthread_mutex_t *mtx) { int rv; rv=pthread_mutex_trylock( mtx ); return rv; }
inline static int pthread_mutex_unlock_ready_list(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_unlock( mtx ))!=0) { quark_fatal_error("pthread_mutex_unlock_ready_list", strerror(rv)); } return rv; }

inline static int pthread_mutex_lock_task(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_lock( mtx ))!=0) { quark_fatal_error("pthread_mutex_lock_task", strerror(rv)); } return rv; }
inline static int pthread_mutex_unlock_task(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_unlock( mtx ))!=0) { quark_fatal_error("pthread_mutex_unlock_task", strerror(rv)); } return rv; }

inline static int pthread_mutex_lock_atomic_add(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_lock( mtx ))!=0) { quark_fatal_error("pthread_mutex_lock_atomic_add", strerror(rv)); } return rv; }
inline static int pthread_mutex_lock_atomic_set(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_lock( mtx ))!=0) { quark_fatal_error("pthread_mutex_lock_atomic_set", strerror(rv)); } return rv; }
inline static int pthread_mutex_lock_atomic_get(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_lock( mtx ))!=0) { quark_fatal_error("pthread_mutex_lock_atomic_get", strerror(rv)); } return rv; }
inline static int pthread_mutex_unlock_atomic(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_unlock( mtx ))!=0) { quark_fatal_error("pthread_mutex_unlock_atomic", strerror(rv)); } return rv; }

inline static int pthread_mutex_lock_wrap(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_lock( mtx ))!=0) { quark_fatal_error("pthread_mutex_lock_wrap", strerror(rv)); } return rv; }
inline static int pthread_mutex_unlock_wrap(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_unlock( mtx ))!=0) { quark_fatal_error("pthread_mutex_unlock_wrap", strerror(rv)); } return rv; }

inline static int pthread_mutex_lock_completed_tasks(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_lock( mtx ))!=0) { quark_fatal_error("pthread_mutex_lock_completed_tasks", strerror(rv)); } return rv; }
inline static int pthread_mutex_trylock_completed_tasks(pthread_mutex_t *mtx) { int rv; rv=pthread_mutex_trylock( mtx ); return rv; }
inline static int pthread_mutex_unlock_completed_tasks(pthread_mutex_t *mtx) { int rv; if ((rv=pthread_mutex_unlock( mtx ))!=0) { quark_fatal_error("pthread_mutex_unlock_completed_tasks", strerror(rv)); } return rv; }
inline static int pthread_cond_wait_ready_list( pthread_cond_t *cond, pthread_mutex_t *mtx ) { int rv; if ((rv=pthread_cond_wait( cond, mtx))!=0) { quark_fatal_error("pthread_cond_wait_ready_list", strerror(rv)); } return rv; }
inline static int pthread_cond_wait_wrap( pthread_cond_t *cond, pthread_mutex_t *mtx ) { int rv; if ((rv=pthread_cond_wait( cond, mtx))!=0) { quark_fatal_error("pthread_cond_wait_wrap", strerror(rv)); } return rv; }

/* **************************************************************************** */

/* If DAGs are to be generated, set up the file name and pointer and
 * various macros. This assumes that the fprintf function is thread
 * safe. */
static char *quark_task_default_label = " ";
static char *quark_task_default_color = "white";
#define DEPCOLOR "black"
#define DEPCOLOR_R_FIRST "black"
#define DEPCOLOR_W_FIRST "black"
#define DEPCOLOR_RAR "black"
#define DEPCOLOR_WAW "black"
#define DEPCOLOR_RAW "black"
#define DEPCOLOR_WAR "red"
#define DEPCOLOR_GATHERV "green"
#define DOT_DAG_FILENAME "dot_dag_file.dot"
FILE *dot_dag_file = NULL;
#define dot_dag_print_edge( quark, parentid, parent_level, childid, child_level, color) \
    if ( quark->dot_dag_enable ) {                                      \
        pthread_mutex_lock_wrap( &quark->dot_dag_mutex );               \
        if ( parentid>0 ) fprintf(dot_dag_file, "t%lld->t%lld [color=\"%s\"];\n", parentid, childid, color); \
        fflush(dot_dag_file);                                           \
        child_level = (parent_level+1 <= child_level ? child_level : parent_level+1 ); \
        pthread_mutex_unlock_wrap( &quark->dot_dag_mutex );             \
    }
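/* Illustrative sketch (not part of the original file): with
 * QUARK_DOT_DAG_ENABLE=1 the macro above emits one edge per satisfied
 * dependency, so the generated file contains lines such as
 *
 *   t1->t2 [color="black"];   // e.g. a read-after-write dependency
 *   t2->t3 [color="red"];     // a write-after-read dependency
 *
 * Wrapped in a digraph, this renders with e.g. "dot -Tpdf dot_dag_file.dot".
 */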

/* **************************************************************************** */
/* Atomically add addvalue to pval, protected by pmutex */
#define quark_atomic_add( pval, addvalue, pmutex ) {                    \
        pthread_mutex_lock_atomic_add(pmutex); pval += addvalue; pthread_mutex_unlock_atomic(pmutex); \
        /* pval += addvalue; */                                         \
    }


/* **************************************************************************** */
/* Atomically set pval to setvalue, protected by pmutex */
#define quark_atomic_set( pval, setvalue, pmutex ) {                    \
        pthread_mutex_lock_atomic_set(pmutex); pval = setvalue; pthread_mutex_unlock_atomic(pmutex); \
        /* pval = setvalue; */                                          \
    }

/* **************************************************************************** */
/* Read pval into retval; currently an unlocked read (the locked version is
 * commented out) */
#define quark_atomic_get( retval, pval, pmutex ) {                      \
        /* pthread_mutex_lock_atomic_get(pmutex); retval = pval; pthread_mutex_unlock_atomic(pmutex); */ \
        retval = pval;                                                  \
    }
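/* Illustrative usage sketch (not part of the original file): these macros
 * wrap a single read or update under the given mutex, e.g.
 *
 *   quark_atomic_set( worker->finalize, TRUE, &worker->worker_mutex );
 *   quark_atomic_get( num_queued, quark->num_queued_tasks, &quark->num_queued_tasks_mutex );
 *
 * Since quark_atomic_get is currently an unlocked read, callers may observe
 * a slightly stale value; the code below tolerates that. */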


/***************************************************************************/
/* Print a fatal error message and abort the application */
static void quark_fatal_error(const char *func_name, char* msg_text)
{
    fprintf(stderr, "QUARK_FATAL_ERROR: %s(): %s\n", func_name, msg_text);
    abort();
}

/***************************************************************************/
/* Print a warning message and continue */
void quark_warning(const char *func_name, char* msg_text)
{
    fprintf(stderr, "QUARK_WARNING: %s(): %s\n", func_name, msg_text);
}

/* **************************************************************************** */
/* malloc wrapper that reports a fatal error on allocation failure */
static inline void *quark_malloc(size_t size)
{
    void *mem = malloc(size);
    if ( mem == NULL ) quark_fatal_error( "malloc", "memory allocation failed" );
    return mem;
}

/* **************************************************************************** */
/* Allocate and initialize a new task structure */
static Task *quark_task_new()
{
    static unsigned long long taskid = 1;
    Task *task = (Task *)quark_malloc(sizeof(Task));
    task->function = NULL;
    task->num_dependencies_remaining = 0;
    task->args_list = icl_list_new();
    if ( task->args_list == NULL ) quark_fatal_error( "quark_task_new", "Allocating arg list" );
    task->dependency_list = icl_list_new();
    if ( task->dependency_list == NULL ) quark_fatal_error( "quark_task_new", "Allocating dependency list" );
    task->locality_preserving_dep = NULL;
    task->scratch_list = icl_list_new();
    if ( task->scratch_list == NULL ) quark_fatal_error( "quark_task_new", "Allocating scratch list" );
    if ( taskid >= ULLONG_MAX ) quark_fatal_error( "quark_task_new", "Task id > ULLONG_MAX, too many tasks" );
    task->taskid = taskid++;
    task->tasklevel = 0;
    pthread_mutex_init( &task->task_mutex, NULL );
    task->ptr_to_task_in_sequence = NULL;
    task->sequence = NULL;
    task->priority = 0;
    task->task_label = quark_task_default_label;
    task->task_color = quark_task_default_color;
    task->lock_to_thread = -1;
    task->lock_to_thread_mask = NULL;
    task->task_thread_count = 1;
    task->thread_set_to_manual_scheduling = -1;
    task->threadid = 0;
    task->status = ALLOCATED_ONLY;
    task->executed_on_threadid = -1;
    return task;
}


/* **************************************************************************** */
/* Free a task structure; if the task was inserted (not just allocated),
 * also remove it from the task set and from its sequence */
static void *quark_task_delete(Quark *quark, Task *task)
{
    /* task is not just allocated, it has been inserted and may have other references to it */
    if ( task->status!=ALLOCATED_ONLY ) {
        pthread_mutex_lock_wrap( &quark->task_set_mutex );
        icl_hash_delete( quark->task_set, &task->taskid, NULL, NULL );
        quark->num_tasks--;
        pthread_mutex_lock_task( &task->task_mutex );
        pthread_mutex_unlock_wrap( &quark->task_set_mutex );
    }
    if ( task->task_color!=NULL && task->task_color!=quark_task_default_color ) { free(task->task_color); task->task_color = NULL; }
    if ( task->task_label!=NULL && task->task_label!=quark_task_default_label ) { free(task->task_label); task->task_label = NULL; }
    icl_list_destroy(task->args_list, free);
    icl_list_destroy(task->dependency_list, free);
    icl_list_destroy(task->scratch_list, free);
    if ( task->status!=ALLOCATED_ONLY ) {
        if ( task->ptr_to_task_in_sequence != NULL ) {
            pthread_mutex_lock_wrap( &task->sequence->sequence_mutex );
            LIST_REMOVE( task->ptr_to_task_in_sequence, ll_entries );
            pthread_mutex_unlock_wrap( &task->sequence->sequence_mutex );
            free( task->ptr_to_task_in_sequence );
        }
        pthread_mutex_unlock_task( &task->task_mutex );
    }
    pthread_mutex_destroy( &task->task_mutex );
    free( task );
    task = NULL;
    return task;
}

/* **************************************************************************** */
/**
 * Return the rank of the calling thread within the QUARK scheduler,
 * or -1 if the thread is not one of the scheduler's threads.
 */
int QUARK_Thread_Rank(Quark *quark)
{
    pthread_t self_id = pthread_self();
    int i;
    for (i=0; i<quark->num_threads; i++)
        if (pthread_equal(quark->worker[i]->thread_id, self_id))
            return i;
    return -1;
}

/* **************************************************************************** */
/**
 * Return the argument list of the current task of the calling thread.
 */
void *QUARK_Args_List(Quark *quark)
{
    Task *curr_task = quark->worker[QUARK_Thread_Rank(quark)]->current_task_ptr;
    return (void *)curr_task->args_list;
}

/* **************************************************************************** */
/* FIXME This is working but could be more efficient. Depends on the
 * function being called only once by each instance in a
 * multi-threaded task */
int QUARK_Get_RankInTask(Quark *quark)
{
    int local_rank = 0;
    int global_rank = QUARK_Thread_Rank(quark);
    Task *curr_task = quark->worker[global_rank]->current_task_ptr;

    pthread_mutex_lock_wrap( &curr_task->task_mutex );
    local_rank = curr_task->threadid;
    curr_task->threadid++;
    pthread_mutex_unlock_wrap( &curr_task->task_mutex );

    return local_rank;
}

/* **************************************************************************** */
/**
 * Walk an argument list: return the next argument, or the first one if
 * *last_arg is NULL; *last_arg is updated to track the position.
 */
void *QUARK_Args_Pop( void *args_list, void **last_arg)
{
    icl_list_t *args = (icl_list_t *)args_list;
    icl_list_t *node = (icl_list_t *)*last_arg;
    void *arg = NULL;
    if ( node == NULL ) {
        node = icl_list_first( args );
        if (node!=NULL) arg = node->data;
    } else {
        node = icl_list_next( args, node );
        if (node!=NULL) arg = node->data;
    }
    *last_arg = node;
    return arg;
}
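/* Illustrative usage sketch (not part of the original file): a task body
 * can walk its packed arguments with QUARK_Args_List()/QUARK_Args_Pop():
 *
 *   void my_task(Quark *quark)   // my_task is a placeholder name
 *   {
 *       void *args = QUARK_Args_List( quark );
 *       void *last = NULL;
 *       int n     = *(int *)QUARK_Args_Pop( args, &last );     // VALUE arg: copy of the value
 *       double *A = *(double **)QUARK_Args_Pop( args, &last ); // INPUT/INOUT arg: copy of the pointer
 *       // ... compute on A[0..n-1] ...
 *   }
 *
 * The quark_unpack_args_N macros (quark_unpack_args.h) wrap this pattern.
 */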

/* **************************************************************************** */
/* 32-bit FNV-1 hash of a byte buffer */
static inline unsigned int fnv_hash_function( void *key, int len )
{
    unsigned char *p = key;
    unsigned int h = 2166136261u;
    int i;
    for ( i = 0; i < len; i++ )
        h = ( h * 16777619 ) ^ p[i];
    return h;
}

/* **************************************************************************** */
/* Hash an address (hashes the pointer value itself) */
static inline unsigned int address_hash_function(void *address)
{
    int len = sizeof(void *);
    unsigned int hashval = fnv_hash_function( &address, len );
    return hashval;
}

/* **************************************************************************** */
/* Key comparison for the address hash table */
static inline int address_key_compare(void *addr1, void *addr2)
{
    return (addr1 == addr2);
}

/* **************************************************************************** */
/* Hash an unsigned long long (e.g. a taskid) */
static inline unsigned int ullong_hash_function( void *key )
{
    int len = sizeof(unsigned long long);
    unsigned int hashval = fnv_hash_function( key, len );
    return hashval;
}
/* **************************************************************************** */
/* Key comparison for the taskid hash table */
static inline int ullong_key_compare( void *key1, void *key2 )
{
    return ( *(unsigned long long*)key1 == *(unsigned long long*)key2 );
}

/* **************************************************************************** */
/* Return the next worker in the round-robin assignment order */
static inline int quark_worker_find_next_assignable( Quark *quark )
{
    int id = quark->list_robin;
    quark->list_robin = ((quark->list_robin + 1) % quark->num_threads);
    return id;
}

/* **************************************************************************** */
/* Duplicate a buffer of the given size into freshly allocated memory */
static inline char *arg_dup(char *arg, int size)
{
    char *argbuf = (char *) quark_malloc(size);
    memcpy(argbuf, arg, size);
    return argbuf;
}

/* **************************************************************************** */
/* Allocate and initialize a dependency structure */
static inline Dependency *dependency_new(void *addr, long long size, quark_direction_t dir, bool loc, Task *task, bool accumulator, bool gatherv, int data_region, icl_list_t *task_args_list_node_ptr)
{
    Dependency *dep = (Dependency *) quark_malloc(sizeof(Dependency));
    dep->task = task;
    dep->address = addr;
    dep->size = size;
    dep->direction = dir;
    dep->locality = loc;
    dep->accumulator = accumulator;
    dep->data_region = data_region;
    dep->gatherv = gatherv;
    dep->address_set_node_ptr = NULL;                    /* convenience ptr, filled later */
    dep->address_set_waiting_deps_node_ptr = NULL;       /* convenience ptr, filled later */
    dep->task_args_list_node_ptr = task_args_list_node_ptr; /* convenience ptr for WAR address updating */
    dep->task_dependency_list_node_ptr = NULL;           /* convenience ptr */
    dep->ready = FALSE;
    /* For the task, track the dependency to be used to do locality
     * preservation; by default, use the first output dependency. */
    if ( dep->locality )
        task->locality_preserving_dep = dep;
    else if ( (task->locality_preserving_dep == NULL) && ( dep->direction==OUTPUT || dep->direction==INOUT) )
        task->locality_preserving_dep = dep;
    return dep;
}

/* **************************************************************************** */
/* Allocate and initialize a worker structure */
static Worker *quark_worker_new(Quark *quark, int rank)
{
    Worker *worker = (Worker *) quark_malloc(sizeof(Worker));
    worker->thread_id = pthread_self();
    pthread_mutex_init( &worker->worker_mutex, NULL );
    worker->rank = rank;
    worker->ready_list = quark_malloc(sizeof(task_priority_tree_head_t));
    RB_INIT( worker->ready_list );
    worker->ready_list_size = 0;
    /* convenience pointer to the real args for the task */
    worker->current_task_ptr = NULL;
    worker->quark_ptr = quark;
    worker->finalize = FALSE;
    worker->executing_task = FALSE;
    worker->set_to_manual_scheduling = FALSE;
    pthread_cond_init( &worker->worker_must_awake_cond, NULL );
    worker->status = WORKER_NOT_SLEEPING;
    return worker;
}

/* **************************************************************************** */
/* Free a worker structure, emptying its ready list if necessary */
static void quark_worker_delete(Worker * worker)
{
    task_priority_tree_node_t *node, *nxt;
    /* Destroy the worker's priority queue, if there is still anything there */
    for ( node = RB_MIN( task_priority_tree_head_s, worker->ready_list ); node != NULL; node = nxt) {
        nxt = RB_NEXT( task_priority_tree_head_s, worker->ready_list, node );
        RB_REMOVE( task_priority_tree_head_s, worker->ready_list, node );
        free(node);
    }
    free( worker->ready_list );
    pthread_mutex_destroy( &worker->worker_mutex );
    free(worker);
}

/* **************************************************************************** */
/* Allocate and initialize a scratch-space descriptor */
static Scratch *quark_scratch_new( void *arg_ptr, int arg_size, icl_list_t *task_args_list_node_ptr )
{
    Scratch *scratch = (Scratch *)quark_malloc(sizeof(Scratch));
    scratch->ptr = arg_ptr;
    scratch->size = arg_size;
    scratch->task_args_list_node_ptr = task_args_list_node_ptr;
    return(scratch);
}

/* **************************************************************************** */
/* Allocate any scratch space the task requested but did not provide */
static void quark_scratch_allocate( Task *task )
{
    icl_list_t *scr_node;
    for (scr_node = icl_list_first( task->scratch_list );
         scr_node != NULL && scr_node->data != NULL;
         scr_node = icl_list_next(task->scratch_list, scr_node)) {
        Scratch *scratch = (Scratch *)scr_node->data;
        if ( scratch->ptr == NULL ) {
            /* Since ptr is null, space is to be allocated and attached */
            if ( scratch->size <= 0 ) quark_fatal_error( "quark_scratch_allocate", "scratch->size <= 0 " );
            void *scratchspace = quark_malloc( scratch->size );
            *(void **)scratch->task_args_list_node_ptr->data = scratchspace;
        }
    }
}

/* **************************************************************************** */
/* Free any scratch space that was allocated for the task */
static void quark_scratch_deallocate( Task *task )
{
    icl_list_t *scr_node;
    for (scr_node = icl_list_first( task->scratch_list );
         scr_node != NULL && scr_node->data!=NULL;
         scr_node = icl_list_next(task->scratch_list, scr_node)) {
        Scratch *scratch = (Scratch *)scr_node->data;
        if ( scratch->ptr == NULL ) {
            /* If scratch had to be allocated, free it */
            free(*(void **)scratch->task_args_list_node_ptr->data);
        }
    }
}

/* **************************************************************************** */
/**
 * Create the scheduler data structures for the master and num_threads-1
 * workers, but do not launch the worker threads; QUARK_New does that.
 */
Quark *QUARK_Setup(int num_threads)
{
    int i = 0;
    Quark *quark = (Quark *) quark_malloc(sizeof(Quark));
    /* Used to tell master when to act as worker */
    int quark_unroll_tasks_per_thread = quark_getenv_int("QUARK_UNROLL_TASKS_PER_THREAD", 50);
    int quark_unroll_tasks = quark_getenv_int("QUARK_UNROLL_TASKS", quark_unroll_tasks_per_thread * num_threads);
    quark->war_dependencies_enable = quark_getenv_int("QUARK_WAR_DEPENDENCIES_ENABLE", 0);
    quark->queue_before_computing = quark_getenv_int("QUARK_QUEUE_BEFORE_COMPUTING", 0);
    quark->dot_dag_enable = quark_getenv_int("QUARK_DOT_DAG_ENABLE", 0);
    //if ( quark->dot_dag_enable ) quark->queue_before_computing = 1;
    if ( quark->queue_before_computing==1 || quark_unroll_tasks==0 ) {
        quark->high_water_mark = (int)(INT_MAX - 1);
        quark->low_water_mark = (int)(quark->high_water_mark);
    } else {
        quark->low_water_mark = (int)(quark_unroll_tasks);
        quark->high_water_mark = (int)(quark->low_water_mark + quark->low_water_mark*0.25);
    }
    quark->num_queued_tasks = 0;
    pthread_mutex_init( &quark->num_queued_tasks_mutex, NULL );
    pthread_cond_init( &quark->num_queued_tasks_cond, NULL );
    quark->num_threads = num_threads;
    quark->list_robin = 0;
    pthread_mutex_init( &quark->quark_mutex, NULL );
    quark->start = FALSE;
    quark->all_tasks_queued = FALSE;
    quark->num_tasks = 0;
    quark->task_set = icl_hash_create( 0x1<<12, ullong_hash_function, ullong_key_compare );
    pthread_mutex_init( &quark->task_set_mutex, NULL );
    /* Create hash table to hold addresses */
    quark->address_set = icl_hash_create( 0x01<<12, address_hash_function, address_key_compare);
    pthread_mutex_init( &quark->address_set_mutex, NULL );
    /* To handle completed tasks */
    quark->completed_tasks = quark_malloc(sizeof(completed_tasks_head_t));
    TAILQ_INIT( quark->completed_tasks );
    pthread_mutex_init( &quark->completed_tasks_mutex, NULL );
    quark->completed_tasks_size = 0;
    /* Setup workers */
    quark->worker = (Worker **) quark_malloc(num_threads * sizeof(Worker *));
    /* The structure for the 0th worker will be used by the master */
    quark->worker[0] = quark_worker_new(quark, 0);
    quark->worker[0]->thread_id = pthread_self();
    quark->dot_dag_was_setup = 0;
    if ( quark->dot_dag_enable ) QUARK_DOT_DAG_Enable( quark, 1 );
    /* Launch workers; first create the structures */
    for(i = 1; i < num_threads; i++)
        quark->worker[i] = quark_worker_new(quark, i);
    /* Threads can start as soon as they want */
    quark->start = TRUE;
    return quark;
}

/* **************************************************************************** */
/**
 * Create the scheduler, launch num_threads-1 worker threads, and bind
 * the threads to cores. If num_threads < 1, the thread count is taken
 * from the environment/topology (quark_get_numthreads).
 */
Quark *QUARK_New(int num_threads)
{
    int i, nthrd;
    /* Init number of cores and topology */
    quark_topology_init();
    /* Get number of threads */
    if ( num_threads < 1 ) {
        nthrd = quark_get_numthreads();
        if ( nthrd == -1 ) nthrd = 1;
    } else {
        nthrd = num_threads;
    }
    /* Create scheduler data structures for master and workers */
    Quark *quark = QUARK_Setup(nthrd);
    /* Get binding information */
    quark->coresbind = quark_get_affthreads();
    /* Setup thread attributes */
    pthread_attr_init( &quark->thread_attr );
    /* pthread_setconcurrency(quark->num_threads); */
    pthread_attr_setscope( &quark->thread_attr, PTHREAD_SCOPE_SYSTEM );
    /* Then start the threads, so that workers can scan the structures easily */
    for(i = 1; i < nthrd; i++) {
        int rc = pthread_create(&quark->worker[i]->thread_id, &quark->thread_attr, (void *(*)(void *))quark_work_set_affinity_and_call_main_loop, quark->worker[i]);
        if ( rc != 0 ) quark_fatal_error( "QUARK_New", "Could not create threads properly" );
    }
    quark_setaffinity( quark->coresbind[0] );
    return quark;
}
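/* Illustrative usage sketch (not part of the original file): typical
 * application lifecycle of the scheduler (my_task and n are placeholders;
 * see QUARK_Insert_Task below for the argument-triplet format):
 *
 *   Quark *quark = QUARK_New(4);   // master + 3 spawned workers
 *   QUARK_Insert_Task( quark, my_task, NULL,
 *                      sizeof(int), &n, VALUE,
 *                      0 );                  // 0 terminates the arg list
 *   QUARK_Barrier( quark );        // wait for tasks, keep workers alive
 *   QUARK_Delete( quark );         // waitall, join workers, free structures
 */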

/* **************************************************************************** */
/**
 * Wait for all queued tasks to complete; the master helps with the
 * work. The workers are left alive so that more tasks may be inserted
 * afterwards.
 */
void QUARK_Barrier(Quark * quark)
{
    long long num_tasks = 1;
    /* Force queue_before_computing to be OFF!! */
    quark->queue_before_computing = FALSE;
    quark->all_tasks_queued = TRUE;
    do {
        quark_process_completed_tasks(quark);
        num_tasks = quark_work_main_loop( quark->worker[0] );
#ifdef QUARK_WITH_VALGRIND
        /* Asim: maybe you can have a signal right here ? */
        pthread_yield();
#endif
    } while ( num_tasks > 0 );
    /* FIXME Since address_set_nodes are not cleaned as the code progresses, they are freed here */
    if ( quark->dot_dag_enable ) {
        /* If DAG generation is enabled, reset level counters */
        unsigned long long tasklevel = 0;
        for ( tasklevel=1; tasklevel<tasklevel_width_max_level; tasklevel++ )
            if ( quark->tasklevel_width[tasklevel] == 0 )
                break;
        tasklevel = tasklevel - 1;
        int tmpint; icl_entry_t* tmpent; void *kp, *dp;
        icl_hash_foreach(quark->address_set, tmpint, tmpent, kp, dp) {
            Address_Set_Node *address_set_node = (Address_Set_Node *)dp;
            address_set_node->last_writer_tasklevel = tasklevel;
            address_set_node->last_reader_or_writer_tasklevel = tasklevel;
        }
        fprintf(dot_dag_file, "// QUARK_Barrier reached: level=%llu \n", tasklevel );
    } else {
        /* If NO DAG generation, clean up memory */
        icl_hash_destroy( quark->address_set, NULL, quark_address_set_node_free );
        quark->address_set = icl_hash_create( 0x01<<12, address_hash_function, address_key_compare);
    }
}

/* **************************************************************************** */
/**
 * Wait for all tasks to complete, then tell the workers to exit their
 * work loops and wake any sleeping workers.
 */
void QUARK_Waitall(Quark * quark)
{
    int i;
    Worker *worker;
    QUARK_Barrier( quark );
    /* Tell each worker to exit the work_loop; the master handles itself */
    for (i=1; i<quark->num_threads; i++) {
        worker = quark->worker[i];
        DBGPRINTF("Wkr %d [ %d ] setting finalize\n", worker->rank, worker->ready_list_size );
        quark_atomic_set( worker->finalize, TRUE, &worker->worker_mutex );
    }
    pthread_mutex_lock_wrap( &quark->num_queued_tasks_mutex );
    for (i=0; i<quark->num_threads; i++)
        pthread_cond_signal( &quark->worker[i]->worker_must_awake_cond );
    pthread_mutex_unlock_wrap( &quark->num_queued_tasks_mutex );
}

/* **************************************************************************** */
/**
 * Free the scheduler data structures after waiting for all tasks to
 * complete. Does not join the worker threads; counterpart to
 * QUARK_Setup (QUARK_Delete also joins the threads).
 */
void QUARK_Free(Quark * quark)
{
    int i;
    QUARK_Waitall(quark);
    /* Write the level matching/forcing information */
    QUARK_DOT_DAG_Enable( quark, 0 );
    /* Destroy hash tables, workers and other data structures */
    for (i = 1; i < quark->num_threads; i++)
        quark_worker_delete( quark->worker[i] );
    quark_worker_delete( quark->worker[0] );
    if (quark->worker) free(quark->worker);
    if (quark->completed_tasks) free(quark->completed_tasks);
    icl_hash_destroy( quark->address_set, NULL, quark_address_set_node_free );
    icl_hash_destroy( quark->task_set, NULL, NULL );
    pthread_mutex_destroy( &quark->address_set_mutex );
    pthread_mutex_destroy( &quark->task_set_mutex );
    free(quark);
}

/* **************************************************************************** */
/**
 * Wait for all tasks to complete, join the worker threads, and free all
 * scheduler data structures. Counterpart to QUARK_New.
 */
void QUARK_Delete(Quark * quark)
{
    void *exitcodep = NULL;
    int i;
    /* Wait for all tasks to complete */
    QUARK_Waitall( quark );
    /* Wait for workers to quit and join threads */
    for (i = 1; i < quark->num_threads; i++)
        pthread_join(quark->worker[i]->thread_id, &exitcodep);
    pthread_attr_destroy( &quark->thread_attr );
    /* Destroy specific structures */
    if (quark->coresbind) free(quark->coresbind);
    /* Destroy hash tables, workers and other data structures */
    QUARK_Free( quark );
}

/* **************************************************************************** */
/* Copy the settings from a Quark_Task_Flags structure into a task */
Task *quark_set_task_flags_in_task_structure( Quark *quark, Task *task, Quark_Task_Flags *task_flags )
{
    if ( task_flags ) {
        if ( task_flags->task_priority ) task->priority = task_flags->task_priority;
        if ( task_flags->task_lock_to_thread >= 0 ) task->lock_to_thread = task_flags->task_lock_to_thread;
        if ( task_flags->task_lock_to_thread_mask ) task->lock_to_thread_mask = task_flags->task_lock_to_thread_mask;
        if ( task_flags->task_color && quark->dot_dag_enable ) task->task_color = strdup(task_flags->task_color);
        if ( task_flags->task_label && quark->dot_dag_enable ) task->task_label = strdup(task_flags->task_label);
        if ( task_flags->task_sequence ) task->sequence = task_flags->task_sequence;
        if ( task_flags->task_thread_count > 1 ) task->task_thread_count = task_flags->task_thread_count;
        if ( task_flags->thread_set_to_manual_scheduling >= 0 ) task->thread_set_to_manual_scheduling = task_flags->thread_set_to_manual_scheduling;
    }
    return task;
}

/* **************************************************************************** */
/**
 * Allocate a new task and set its function and flags, but do not queue
 * it; arguments are added with QUARK_Task_Pack_Arg and the task is
 * queued with QUARK_Insert_Task_Packed.
 */
Task *QUARK_Task_Init(Quark * quark, void (*function) (Quark *), Quark_Task_Flags *task_flags )
{
    Task *task = quark_task_new();
    task->function = function;
    quark_set_task_flags_in_task_structure( quark, task, task_flags );
    return task;
}

/* **************************************************************************** */
/**
 * Add an argument to a task: VALUE arguments are copied into the task
 * (or interpreted as special task settings via the value flags);
 * NODEP/SCRATCH arguments carry no dependency; INPUT/OUTPUT/INOUT
 * arguments create dependencies on the data address.
 */
void QUARK_Task_Pack_Arg( Quark *quark, Task *task, int arg_size, void *arg_ptr, int arg_flags )
{
    int value_mask;
    bool arg_locality, accumulator, gatherv;
    int data_region;
    icl_list_t *task_args_list_node_ptr;
    Scratch *scratcharg;
    // extract information from the flags
    quark_direction_t arg_direction = (quark_direction_t) (arg_flags & QUARK_DIRECTION_BITMASK);
    switch ( arg_direction ) {
    case VALUE:
        /* If the argument is a value, copy its contents into the argument buffer */
        value_mask = ( arg_flags & QUARK_VALUE_FLAGS_BITMASK );
        if ( value_mask==0 ) {
            icl_list_append(task->args_list, arg_dup(arg_ptr, arg_size));
        } else if ( (arg_flags & TASK_PRIORITY) != 0 ) {
            task->priority = *((int *)arg_ptr);
        } else if ( (arg_flags & TASK_LOCK_TO_THREAD) != 0 ) {
            task->lock_to_thread = *((int *)arg_ptr);
        } else if ( (arg_flags & TASK_THREAD_COUNT) != 0 ) {
            task->task_thread_count = *((int *)arg_ptr);
        } else if ( (arg_flags & TASK_SEQUENCE) != 0 ) {
            task->sequence = *((Quark_Sequence **)arg_ptr);
        } else if ( (arg_flags & THREAD_SET_TO_MANUAL_SCHEDULING) != 0 ) {
            task->thread_set_to_manual_scheduling = *((int *)arg_ptr);
        } else if ( (arg_flags & TASK_COLOR) != 0 ) {
            if ( quark->dot_dag_enable ) {
                task->task_color = arg_dup(arg_ptr, arg_size);
            }
        } else if ( (arg_flags & TASK_LABEL) != 0 ) {
            if ( quark->dot_dag_enable ) {
                task->task_label = arg_dup(arg_ptr, arg_size);
            }
        }
        break;
    case NODEP:
        icl_list_append(task->args_list, arg_dup((char *) &arg_ptr, sizeof(char *)));
        break;
    case SCRATCH:
        task_args_list_node_ptr = icl_list_append(task->args_list, arg_dup((char *) &arg_ptr, sizeof(char *)));
        scratcharg = quark_scratch_new( arg_ptr, arg_size, task_args_list_node_ptr);
        icl_list_append( task->scratch_list, scratcharg );
        break;
    case INPUT:
    case OUTPUT:
    case INOUT:
    default:
        task_args_list_node_ptr = icl_list_append(task->args_list, arg_dup((char *) &arg_ptr, sizeof(char *)));
        arg_locality = (bool) ((arg_flags & LOCALITY) != 0 );
        accumulator = (bool) ((arg_flags & ACCUMULATOR) != 0 );
        gatherv = (bool) ((arg_flags & GATHERV) != 0 );
        if ( (arg_flags & QUARK_REGION_BITMASK) != 0 )
            data_region = (arg_flags & QUARK_REGION_BITMASK);
        else
            data_region = QUARK_REGION_ALL;
        // DBGPRINTF("Adding dependency arg_flags %x arg_direction %d data_region %x\n", arg_flags, arg_direction, data_region);
        Dependency *dep = dependency_new(arg_ptr, arg_size, arg_direction, arg_locality, task, accumulator, gatherv, data_region, task_args_list_node_ptr);
        /* Insert the dependency in order of address; uses simple resource ordering to avoid deadlock situations */
        icl_list_t *ptr = NULL;
        icl_list_t *task_dependency_list_node_ptr = NULL;
        for (ptr = icl_list_last(task->dependency_list); ptr != NULL; ptr = icl_list_prev(task->dependency_list, ptr)) {
            Dependency *ptrdep = (Dependency *)ptr->data;
            if (ptrdep->address > dep->address ) {
                task_dependency_list_node_ptr = icl_list_insert( task->dependency_list, ptr, dep );
                break;
            }
        }
        if ( ptr==NULL) task_dependency_list_node_ptr = icl_list_append( task->dependency_list, dep );
        dep->task_dependency_list_node_ptr = task_dependency_list_node_ptr;
        task->num_dependencies_remaining++;
        break;
    }
}

/* **************************************************************************** */
/**
 * Insert a fully packed task into the scheduler: record it in its
 * sequence and in the task set, insert its dependencies into the
 * address hash, and queue it if it is already ready. If the number of
 * queued tasks is above the high water mark, the master blocks here and
 * works. Returns the taskid, or QUARK_ERR if the task's sequence was
 * cancelled or had an error.
 */
unsigned long long QUARK_Insert_Task_Packed(Quark * quark, Task *task )
{
    long long num_tasks = -1;
    unsigned long long taskid = task->taskid;
    Quark_Sequence *sequence;

    /* Track sequence information if it is provided */
    if ( task->sequence ) {
        sequence = task->sequence;
        pthread_mutex_lock_wrap( &sequence->sequence_mutex );
        if ( task->sequence->status == QUARK_ERR ) {
            /* If the sequence is cancelled or has an error, return at once */
            task->function = NULL;
            pthread_mutex_unlock_wrap( &sequence->sequence_mutex );
            quark_task_delete( quark, task );
            return QUARK_ERR;
        } else {
            /* Otherwise insert this task into the sequence */
            ll_list_node_t *entry = quark_malloc(sizeof(ll_list_node_t));
            entry->val = task->taskid;
            ll_list_head_t *headp = task->sequence->tasks_in_sequence;
            LIST_INSERT_HEAD( headp, entry, ll_entries );
            pthread_mutex_unlock_wrap( &task->sequence->sequence_mutex );
            /* Keep a pointer to the task in the sequence so it can be deleted when the task completes */
            task->ptr_to_task_in_sequence = entry;
        }
    }
    task->status = NOTREADY;
    /* Save the task in task_set, indexed by its taskid */
    pthread_mutex_lock_wrap( &quark->task_set_mutex );
    icl_hash_insert( quark->task_set, &task->taskid, task );
    quark->all_tasks_queued = FALSE;
    num_tasks = quark->num_tasks++;
    pthread_mutex_unlock_wrap( &quark->task_set_mutex );
    DBGPRINTF("Wkr %d [ %d ] Inserted %lld [ %d %d ] into task set [ %lld ]\n", QUARK_Thread_Rank(quark), quark->worker[0]->ready_list_size, task->taskid, task->priority, task->task_thread_count, quark->num_tasks );
    /* Insert the task in the address hash, locking access to the address set hash */
    quark_insert_task_dependencies( quark, task );
    /* Check and see if the task is ready for execution */
    pthread_mutex_lock_task( &task->task_mutex );
    quark_check_and_queue_ready_task( quark, task, -1 );
    pthread_mutex_unlock_task( &task->task_mutex );

    /* If conditions are right, task insertion blocks and the master
     * works; this will return when num_tasks becomes less than
     * low_water_mark */
    quark_process_completed_tasks(quark);
    while ( (quark->high_water_mark>0) && (num_tasks>=quark->high_water_mark) ) {
        num_tasks = quark_work_main_loop(quark->worker[0]);
        quark_process_completed_tasks(quark);
    }
    return taskid;
}
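/* Illustrative usage sketch (not part of the original file): the packed
 * interface separates task construction from insertion, which is useful
 * when the argument list is built up in a loop (my_task, n, ntiles, A and
 * tilesize are placeholders):
 *
 *   Task *task = QUARK_Task_Init( quark, my_task, &flags );
 *   QUARK_Task_Pack_Arg( quark, task, sizeof(int), &n, VALUE );
 *   for (i = 0; i < ntiles; i++)
 *       QUARK_Task_Pack_Arg( quark, task, tilesize, A[i], INOUT );
 *   unsigned long long tid = QUARK_Insert_Task_Packed( quark, task );
 */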

/* **************************************************************************** */
/**
 * Create a task from a varargs list of (size, pointer, flags) argument
 * triplets terminated by a 0 size, and insert it into the scheduler.
 * Returns the taskid.
 */
unsigned long long QUARK_Insert_Task(Quark * quark, void (*function) (Quark *), Quark_Task_Flags *task_flags, ...)
{
    va_list varg_list;
    int arg_size;
    unsigned long long taskid;

    Task *task = QUARK_Task_Init(quark, function, task_flags);

    /* For each argument */
    va_start(varg_list, task_flags);
    while( (arg_size = va_arg(varg_list, int)) != 0) {
        void *arg_ptr = va_arg(varg_list, void *);
        int arg_flags = va_arg(varg_list, int);
        QUARK_Task_Pack_Arg( quark, task, arg_size, arg_ptr, arg_flags );
    }
    va_end(varg_list);

    taskid = QUARK_Insert_Task_Packed( quark, task );

    return taskid;
}
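/* Illustrative usage sketch (not part of the original file): arguments are
 * given as (size, pointer, flags) triplets and the list ends with a 0 size;
 * the names below (nb, A, B, C, tile_gemm_task) are placeholders:
 *
 *   QUARK_Insert_Task( quark, tile_gemm_task, &flags,
 *       sizeof(int),          &nb, VALUE,
 *       sizeof(double)*nb*nb, A,   INPUT,
 *       sizeof(double)*nb*nb, B,   INPUT,
 *       sizeof(double)*nb*nb, C,   INOUT | LOCALITY,
 *       0 );
 */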

/* **************************************************************************** */
/**
 * Pack a task as in QUARK_Insert_Task, but execute it immediately in
 * the calling thread, bypassing the scheduler; dependencies are not
 * tracked and the task structure is freed afterwards. Returns 0 since
 * there is no taskid.
 */
unsigned long long QUARK_Execute_Task(Quark * quark, void (*function) (Quark *), Quark_Task_Flags *task_flags, ...)
{
    va_list varg_list;
    int arg_size;

    Task *task = QUARK_Task_Init(quark, function, task_flags);

    va_start(varg_list, task_flags);
    // For each argument
    while( (arg_size = va_arg(varg_list, int)) != 0) {
        void *arg_ptr = va_arg(varg_list, void *);
        int arg_flags = va_arg(varg_list, int);
        QUARK_Task_Pack_Arg( quark, task, arg_size, arg_ptr, arg_flags );
    }
    va_end(varg_list);

    int thread_rank = QUARK_Thread_Rank(quark);
    Worker *worker = quark->worker[thread_rank];
    if ( task->function == NULL ) {
        /* This can occur if the task is cancelled */
        task->status = CANCELLED;
    } else {
        /* Call the task */
        task->status = RUNNING;
        worker->current_task_ptr = task;
        quark_scratch_allocate( task );
        task->function( quark );
        quark_scratch_deallocate( task );
        worker->current_task_ptr = NULL;
        task->status = DONE;
    }

    /* Delete the task data structures */
    icl_list_destroy(task->args_list, free);
    icl_list_destroy(task->dependency_list, free);
    icl_list_destroy(task->scratch_list, free);
    pthread_mutex_destroy( &task->task_mutex );
    free(task);

    /* There is no real taskid to be returned, since the task has been deleted */
    return( 0 );
}

/* **************************************************************************** */
/**
 * Cancel a task, if it has not yet started running. Returns 1 on
 * success, -1 if the task could not be found, and -2 if the task is
 * already running, done or cancelled.
 */
int QUARK_Cancel_Task(Quark *quark, unsigned long long taskid)
{
    pthread_mutex_lock_wrap( &quark->task_set_mutex );
    Task *task = icl_hash_find( quark->task_set, &taskid );
    if ( task == NULL ) {
        pthread_mutex_unlock_wrap( &quark->task_set_mutex );
        return -1;
    }
    pthread_mutex_lock_task( &task->task_mutex );
    if ( task->status==RUNNING || task->status==DONE || task->status==CANCELLED ) {
        pthread_mutex_unlock_task( &task->task_mutex );
        pthread_mutex_unlock_wrap( &quark->task_set_mutex );
        return -2;
    }
    task->function = NULL;
    pthread_mutex_unlock_task( &task->task_mutex );
    pthread_mutex_unlock_wrap( &quark->task_set_mutex );
    return 1;
}

/* **************************************************************************** */
/* Allocate and initialize an address set node for a data address */
static Address_Set_Node *quark_address_set_node_new( void* address, int size )
{
    Address_Set_Node *address_set_node = (Address_Set_Node *)quark_malloc(sizeof(Address_Set_Node));
    address_set_node->address = address;
    address_set_node->size = size;
    address_set_node->last_thread = -1;
    address_set_node->waiting_deps = icl_list_new();
    if ( address_set_node->waiting_deps == NULL )
        quark_fatal_error( "quark_address_set_node_new", "Problem creating icl_list_new" );
    address_set_node->delete_data_at_address_when_node_is_deleted = FALSE;
    address_set_node->last_writer_taskid = 0;
    address_set_node->last_writer_tasklevel = 0;
    address_set_node->last_reader_or_writer_taskid = 0;
    address_set_node->last_reader_or_writer_tasklevel = 0;
    pthread_mutex_init( &address_set_node->asn_mutex, NULL );
    return address_set_node;
}

/* **************************************************************************** */
/* Free an address set node and its list of waiting dependencies */
static void quark_address_set_node_free( void* data )
{
    Address_Set_Node *address_set_node = (Address_Set_Node *)data;
    icl_list_destroy( address_set_node->waiting_deps, free );
    pthread_mutex_destroy( &address_set_node->asn_mutex );
    free( address_set_node );
}

/* **************************************************************************** */
/**
 * If a task has no remaining dependencies, assign it to one (or, for
 * multi-threaded tasks, several) worker ready lists and wake the
 * workers. Locked tasks go to their designated thread; locality tasks
 * go to the thread that last touched the data; all others are assigned
 * round-robin. Expects task->task_mutex to be held by the caller.
 */
static void quark_check_and_queue_ready_task( Quark *quark, Task *task, int worker_rank )
{
    int worker_thread_id = -1;
    Worker *worker = NULL;
    int assigned_thread_count = 0;
    int first_worker_thread_id_repeated = -1;
    int i = 0;
    int wtid = 0;

    /* Quick return */
    if ( task->num_dependencies_remaining > 0 || task->status == QUEUED || task->status == RUNNING || task->status == DONE ) {
        return;
    }
    task->status = QUEUED;
    /* Assign task to thread. Locked tasks get sent to the appropriate
     * thread. Locality tasks should have been correctly placed. Tasks
     * without either get the original round-robin thread
     * assignment */
    if ( task->lock_to_thread >= 0 ) {
        worker_thread_id = task->lock_to_thread % quark->num_threads;
    }
    if ( worker_thread_id<0 && task->locality_preserving_dep != NULL ) {
        int test_thread_id = -1;
        if ( pthread_mutex_lock_address_set( &quark->address_set_mutex ) == 0 ) {
            Address_Set_Node *address_set_node = (Address_Set_Node *)icl_hash_find( quark->address_set, task->locality_preserving_dep->address );
            if ( address_set_node != NULL )
                /* The asn_mutex may already be locked, so it should not be locked here. However it should not matter if we get an older version of the thread_id variable */
                test_thread_id = address_set_node->last_thread % quark->num_threads;
            pthread_mutex_unlock_address_set( &quark->address_set_mutex );
        }
        if (( test_thread_id >= 0 )
            /* test_thread_id is not set to manual scheduling */
            && ( quark->worker[test_thread_id]->set_to_manual_scheduling==FALSE )
            /* task is not locked to mask; OR it is stealable by test_thread_id */
            && ( task->lock_to_thread_mask==NULL || QUARK_Bit_Get( task->lock_to_thread_mask,test_thread_id)==1 ))
            worker_thread_id = test_thread_id;
    }
    /* Any unassigned tasks use round-robin on assignable workers to choose a worker thread */
    if ( worker_thread_id < 0 ) {
        for ( i=0; i<quark->num_threads; i++ ) {
            int test_thread_id = quark_worker_find_next_assignable( quark );
            if (( test_thread_id >= 0 )
                /* test_thread_id is not set to manual scheduling */
                && ( quark->worker[test_thread_id]->set_to_manual_scheduling==FALSE )
                /* task is not locked to mask; OR it is stealable by test_thread_id */
                && ( task->lock_to_thread_mask==NULL || QUARK_Bit_Get( task->lock_to_thread_mask,test_thread_id)==1 )) {
                worker_thread_id = test_thread_id;
                break;
            }
        }
    }

    /* Throw an error if for some reason no worker_thread could be found */
    if ( worker_thread_id < 0 )
        quark_fatal_error( "quark_check_and_queue_ready_task", "Task could not be assigned to any thread" );

    /* Parallel tasks using less than the total number of threads are not using thread 0 */
    if ( task->task_thread_count > quark->num_threads )
        quark_fatal_error( "quark_check_and_queue_ready_task", "Task requests more threads than available" );
    if ( ( task->task_thread_count > 1 )
         && ( quark->num_threads > task->task_thread_count )
         && ( worker_thread_id == 0 ))
        worker_thread_id++;

    first_worker_thread_id_repeated = worker_thread_id;
    while ( assigned_thread_count < task->task_thread_count) {
        worker = quark->worker[worker_thread_id];
        /* Create a new entry for the ready list */
        task_priority_tree_node_t *new_task_tree_node = quark_malloc(sizeof(task_priority_tree_node_t));
        new_task_tree_node->priority = task->priority;
        new_task_tree_node->task = task;
        /* Insert new entry into the ready list */
        if ( pthread_mutex_lock_ready_list( &worker->worker_mutex )==0 ) {
            RB_INSERT( task_priority_tree_head_s, worker->ready_list, new_task_tree_node );
            worker->ready_list_size++;
            pthread_mutex_unlock_ready_list(&worker->worker_mutex );
            quark_trace_addtask2worker(quark->worker[worker_thread_id]->thread_id);
        }
        assigned_thread_count++;
        /* DBGPRINTF("Wkr %d [ %d ] was_assigned tid %lld [ %d %d/%d ] assigned by task %d\n", worker_thread_id, worker->ready_list_size, task->taskid, task->priority, assigned_thread_count, task->task_thread_count, QUARK_Thread_Rank(quark) ); */
        /* Set the worker to manual scheduling at the time the task is
         * assigned to the worker. Only the master should be changing
         * this variable, so it does not require locking. */
        if ( task->thread_set_to_manual_scheduling == 0 ) worker->set_to_manual_scheduling = FALSE;
        else if ( task->thread_set_to_manual_scheduling == 1 ) worker->set_to_manual_scheduling = TRUE;
        /* Wake up the worker, or if it is already awake, some other sleeping worker */
        if ( pthread_mutex_lock_wrap( &quark->num_queued_tasks_mutex ) == 0 ) {
            quark->num_queued_tasks++;
            for ( wtid=worker_thread_id; ; ) {
                if ( quark->worker[wtid]->status == WORKER_SLEEPING ) {
                    pthread_cond_signal( &quark->worker[wtid]->worker_must_awake_cond );
                    break;
                }
                wtid = ( wtid + 1 ) % quark->num_threads;
                if ( wtid==worker_thread_id ) break;
            }
            pthread_mutex_unlock_wrap( &quark->num_queued_tasks_mutex );
        }
        if ( assigned_thread_count < task->task_thread_count ) {
            /* NOTE This is a special case, multi-threaded tasks do scheduling strangely */
            worker_thread_id = (worker_thread_id+1) % quark->num_threads;
            while ( worker_thread_id != first_worker_thread_id_repeated &&
                    quark->worker[worker_thread_id]->set_to_manual_scheduling )
                worker_thread_id = (worker_thread_id+1) % quark->num_threads;
            /* If it's a parallel task, we try to avoid waiting for thread 0 if possible */
            if ( ( worker_thread_id == 0 ) && (quark->num_threads > task->task_thread_count) )
                worker_thread_id++;
            if ( worker_thread_id == first_worker_thread_id_repeated )
                quark_fatal_error("quark_check_and_queue_ready_task", "Not enough workers for task" );
        }
    }
}
1551 
1552 /* **************************************************************************** */
1564 /* FIXME This entire routine needs to be redone!! It does not work properly */
1565 void quark_avoid_war_dependencies( Quark *quark, Address_Set_Node *asn_old, Task *parent_task )
1566 {
1567  /* Quick return if this is not enabled */
1568  if ( !quark->war_dependencies_enable ) return;
1569 
1570  /* Figure out if there are enough input dependencies to make this worthwhile */
1571  int count_initial_input_deps = 0;
1572  bool output_dep_reached = FALSE;
1573  int quark_num_queued_tasks;
1574  quark_atomic_get( quark_num_queued_tasks, quark->num_queued_tasks, &quark->num_queued_tasks_mutex );
1575  double avg_queued_tasks_per_thread = (double)quark_num_queued_tasks/(double)quark->num_threads;
1576  double avg_tasks_per_thread = (double)quark->num_tasks/(double)quark->num_threads;
1577  int min_input_deps;
1578  icl_list_t *dep_node_old;
1579 
1580  /* This stuff is still under development.... */
1581  if ( avg_queued_tasks_per_thread < 0.4 ) min_input_deps = 1;
1582  else if ( avg_queued_tasks_per_thread < 0.75 ) min_input_deps = 6;
1583  else if ( avg_queued_tasks_per_thread < 0.90 ) min_input_deps = 7;
1584  else if ( avg_queued_tasks_per_thread < 1.20 ) min_input_deps = 10;
1585  else if ( avg_queued_tasks_per_thread > 1.80 ) min_input_deps = 2000;
1586  else if ( avg_tasks_per_thread < (double)quark->low_water_mark/(double)quark->num_threads/2 ) min_input_deps = 2000;
1587  else min_input_deps = (int)(7 + 27 * avg_queued_tasks_per_thread);
1588 
1589  /* Override computed value using environment variable */
1590  min_input_deps = quark_getenv_int( "QUARK_AVOID_WAR_WHEN_NUM_WAITING_READS", min_input_deps );
1591 
1592  /* Scan thru initial deps, make sure they are inputs and that there
1593  * are enough of them to make data copying worthwhile */
1594  for (dep_node_old=icl_list_first(asn_old->waiting_deps);
1595  dep_node_old!=NULL;
1596  dep_node_old=icl_list_next(asn_old->waiting_deps, dep_node_old)) {
1597  Dependency *dep = (Dependency *)dep_node_old->data;
1598  Task *task = dep->task;
1599  if ( dep->direction==INPUT && task->status==NOTREADY ) {
1600  count_initial_input_deps++;
1601  } else if ( (dep->direction==OUTPUT || dep->direction==INOUT) && task->status!=DONE ) {
1602  output_dep_reached = TRUE;
1603  break;
1604  }
1605  }
1606 
1607  /* if ( count_initial_input_deps>=quark->min_input_deps_to_avoid_war_dependencies && output_dep_reached ) { */
1608  if ( count_initial_input_deps>=min_input_deps && output_dep_reached ) {
1609  icl_list_t *dep_node_asn_old;
1610  Address_Set_Node *asn_new;
1611  /* Allocate and copy data */
1612  void *datacopy = quark_malloc( asn_old->size );
1613  /* Still need to track the allocated memory in datacopies TODO */
1614  /* quark->mem_allocated_to_war_dependency_data += asn_old->size; */
1615  memcpy( datacopy, asn_old->address, asn_old->size );
1616  /* Create address set node, attach to hash, and set it to clean up when done */
1617  asn_new = quark_address_set_node_new( datacopy, asn_old->size );
1619 
1620  /* Update task dependences to point to this new data */
1621  /* Grab input deps from the old list, copy to new list, delete, then repeat */
1622  for ( dep_node_asn_old=icl_list_first(asn_old->waiting_deps);
1623  dep_node_asn_old!=NULL; ) {
1624  icl_list_t *dep_node_asn_old_to_be_deleted = NULL;
1625  Dependency *dep = (Dependency *)dep_node_asn_old->data;
1626  Task *task = dep->task;
1627  if ( dep->direction==INPUT && task->status==NOTREADY ) {
1628  dep_node_asn_old_to_be_deleted = dep_node_asn_old;
1629  icl_list_t *dep_node_new = icl_list_append( asn_new->waiting_deps, dep );
1630  /* In the args list, set the arg pointer to the new datacopy address */
1631  *(void **)dep->task_args_list_node_ptr->data = datacopy;
1632  dep->address = asn_new->address;
1633  dep->address_set_node_ptr = asn_new;
1634  dep->address_set_waiting_deps_node_ptr = dep_node_new;
1635  if (dep->ready == FALSE) { /* dep->ready will always be FALSE */
1636  dep->ready = TRUE;
1637  dot_dag_print_edge( quark, parent_task->taskid, parent_task->tasklevel, task->taskid, task->tasklevel, DEPCOLOR );
1638  pthread_mutex_lock_task( &task->task_mutex );
1639  task->num_dependencies_remaining--;
1640  quark_check_and_queue_ready_task( quark, task, -1 );
1641  pthread_mutex_unlock_task( &task->task_mutex );
1642  }
1643  } else if ( (dep->direction==OUTPUT || dep->direction==INOUT) && task->status!=DONE ) {
1644  /* Once we return from this routine, this dependency will be processed */
1645  break;
1646  }
1647  dep_node_asn_old = icl_list_next(asn_old->waiting_deps, dep_node_asn_old);
1648  if (dep_node_asn_old_to_be_deleted!=NULL) {
1649  icl_list_delete(asn_old->waiting_deps, dep_node_asn_old_to_be_deleted, NULL);
1650  }
1651  }
1652  /* Insert the constructed asn_new into the address_set */
1653  pthread_mutex_lock_wrap( &quark->address_set_mutex );
1654  icl_hash_insert( quark->address_set, asn_new->address, asn_new );
1655  pthread_mutex_unlock_wrap( &quark->address_set_mutex );
1656  }
1657 }
1658 
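 /* Usage sketch (illustrative, not part of the original quark.c): the WAR
  * pattern this routine targets.  Assumes read_A and write_A are user task
  * wrappers of type void f(Quark *); QUARK_Insert_Task arguments are
  * (size, pointer, flags) triplets terminated by 0.
  *
  *     // export QUARK_WAR_DEPENDENCIES_ENABLE=1
  *     for (i = 0; i < 20; i++)        // many readers of A queue up...
  *         QUARK_Insert_Task( quark, read_A, NULL,
  *                            sizeof(double)*N, A, INPUT, 0 );
  *     QUARK_Insert_Task( quark, write_A, NULL,   // ...then one writer;
  *                        sizeof(double)*N, A, OUTPUT, 0 );
  *     // the runtime may copy A so the readers and the writer can overlap
  */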
1659 /* **************************************************************************** */
1665 static void quark_address_set_node_initial_gatherv_check_and_launch(Quark *quark, Address_Set_Node *address_set_node, Dependency *completed_dep, int worker_rank)
1666 {
1667  icl_list_t *next_dep_node;
1668  Task *completed_task = completed_dep->task;
1669  for ( next_dep_node=icl_list_first(address_set_node->waiting_deps);
1670  next_dep_node!=NULL && next_dep_node->data != NULL;
1671  next_dep_node=icl_list_next(address_set_node->waiting_deps, next_dep_node) ) {
1672  Dependency *next_dep = (Dependency *)next_dep_node->data;
1673  /* Break when we run out of GATHERV output dependencies */
1674  if ( next_dep->gatherv==FALSE ) break;
1675  if ( next_dep->direction!=OUTPUT && next_dep->direction!=INOUT ) break;
1676  if ( next_dep->data_region != completed_dep->data_region ) break;
1677  Task *next_task = next_dep->task;
1678  /* Update next_dep ready status */
1679  if ( next_dep->ready == FALSE ) {
1680  /* Record the locality information with the task data structure */
1681  // if ( next_dep->locality ) next_task->locality_preserving_dep = worker_rank;
1682  /* Mark the next dependency as ready since we have GATHERV flag */
1683  next_dep->ready = TRUE;
1684  dot_dag_print_edge( quark, completed_task->taskid, completed_task->tasklevel, next_task->taskid, next_task->tasklevel, DEPCOLOR_GATHERV );
1685  pthread_mutex_lock_task( &next_task->task_mutex );
1686  next_task->num_dependencies_remaining--;
1687  /* If the dep became ready, check the related task and put it onto the ready queues */
1688  quark_check_and_queue_ready_task( quark, next_task, worker_rank );
1689  pthread_mutex_unlock_task( &next_task->task_mutex );
1690  }
1691 
1692  }
1693 }
1694 
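 /* Usage sketch (illustrative, not part of the original quark.c): tasks
  * whose OUTPUT/INOUT deps carry the GATHERV flag may be launched together,
  * since each is declared to update a disjoint part of the data.  Assumes
  * update_part is a user task wrapper:
  *
  *     QUARK_Insert_Task( quark, update_part, NULL,
  *                        sizeof(double)*N, A, OUTPUT | GATHERV, 0 );
  */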
1695 /* **************************************************************************** */
1704 static void quark_address_set_node_accumulator_find_prepend(Quark *quark, Address_Set_Node *address_set_node)
1705 {
1706  icl_list_t *dep_node = NULL;
1707  Dependency *first_dep = NULL;
1708  icl_list_t *first_ready_dep_node = NULL;
1709  icl_list_t *last_ready_dep_node = NULL;
1710  icl_list_t *swap_node = NULL;
1711  int acc_dep_count = 0;
1712 
1713  /* FOR each ACCUMULATOR task waiting at the beginning of address_set_node */
1714  for (dep_node = icl_list_first(address_set_node->waiting_deps);
1715  dep_node != NULL;
1716  dep_node = icl_list_next( address_set_node->waiting_deps, dep_node )) {
1717  Dependency *dependency = (Dependency *)dep_node->data;
1718  /* IF not an ACCUMULATOR dependency - break */
1719  if (dependency->accumulator == FALSE) break;
1720  Task *task = dependency->task;
1721  /* Scan through list keeping first, first_ready, last_ready, last */
1722  if ( first_dep==NULL ) first_dep = (Dependency *)dep_node->data;
1723  if ( task->num_dependencies_remaining==1 ) {
1724  if (first_ready_dep_node==NULL) first_ready_dep_node = dep_node;
1725  last_ready_dep_node = dep_node;
1726  }
1727  /* If the data_region changes, break */
1728  if ( dependency->data_region != first_dep->data_region ) break;
1729  acc_dep_count++;
1730  }
1731 
1732  /* Choose and move chosen ready node to the front of the list */
1733  /* Heuristic: Flip-flop between first-ready and last-ready.
1734  * Tested (always first, always last, flip-flop first/last) but
1735  * there was always a bad scenario. If perfect loop orders are
1736  * provided (e.g. Cholesky inversion test) then this will not make
1737  * performance worse. If bad loops are provided, this will
1738  * improve performance, though not to the point of perfect
1739  * loops. */
1740  if (acc_dep_count % 2 == 0 ) {
1741  if ( last_ready_dep_node!=NULL ) swap_node = last_ready_dep_node;
1742  } else {
1743  if ( first_ready_dep_node != NULL ) swap_node = first_ready_dep_node;
1744  }
1745  if ( swap_node != NULL ) {
1746  Dependency *dependency = (Dependency *)swap_node->data;
1747  /* Move to front of the address_set_node waiting_deps list (if not already there) */
1748  if ( swap_node!=icl_list_first(address_set_node->waiting_deps) ) {
1749  icl_list_t *tmp_swap_node = icl_list_prepend( address_set_node->waiting_deps, dependency );
1750  dependency->address_set_waiting_deps_node_ptr = tmp_swap_node;
1751  icl_list_delete( address_set_node->waiting_deps, swap_node, NULL );
1752  }
1753  /* Lock the dependency in place by clearing its accumulator flag now */
1754  dependency->accumulator = FALSE;
1755  }
1756 }
1757 
1758 
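 /* Usage sketch (illustrative, not part of the original quark.c): marking
  * commutative updates with the ACCUMULATOR flag lets the code above reorder
  * them, moving whichever one is ready to the front of the waiting list.
  * Assumes add_update is a user task wrapper:
  *
  *     QUARK_Insert_Task( quark, add_update, NULL,
  *                        sizeof(double)*N, C, INOUT | ACCUMULATOR, 0 );
  */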
1759 /* **************************************************************************** */
1762 #if 0
1763 static void quark_address_set_node_delete( Quark *quark, Address_Set_Node *address_set_node )
1764 {
1765  return;
1766  /* FIXME; Currently does not free as soon as possible */
1767  if ( quark->dot_dag_enable == 0 ) {
1768  if ( icl_list_first( address_set_node->waiting_deps )==NULL ) {
1769  pthread_mutex_lock_address_set( &quark->address_set_mutex );
1770  icl_hash_delete( quark->address_set, address_set_node->address, NULL, NULL );
1771  /* Free data if it was allocated as a WAR data copy */
1772  if ( address_set_node->delete_data_at_address_when_node_is_deleted == TRUE )
1773  free( address_set_node->address );
1774  icl_list_destroy( address_set_node->waiting_deps, free );
1775  pthread_mutex_unlock_wrap( &address_set_node->asn_mutex );
1776  pthread_mutex_destroy( &address_set_node->asn_mutex );
1777  free( address_set_node );
1778  pthread_mutex_unlock_address_set( &quark->address_set_mutex );
1779  } else {
1780  pthread_mutex_unlock_wrap( &address_set_node->asn_mutex );
1781  }
1782  }
1783 }
1784 #endif
1785 
1786 /* **************************************************************************** */
1791 static void quark_insert_task_dependencies(Quark * quark, Task * task)
1792 {
1793  icl_list_t *task_dep_p = NULL; /* task dependency list pointer */
1794 
1795  /* For each task dependency list pointer */
1796  for (task_dep_p = icl_list_first(task->dependency_list);
1797  task_dep_p != NULL;
1798  task_dep_p = icl_list_next(task->dependency_list, task_dep_p)) {
1799  Dependency *dep = (Dependency *) task_dep_p->data;
1800  /* Lookup address in address_set hash, add it if it does not exist */
1801  pthread_mutex_lock_address_set( &quark->address_set_mutex );
1802  Address_Set_Node *address_set_node = (Address_Set_Node *)icl_hash_find( quark->address_set, dep->address );
1803  /* If not found, create a new address set node and add it to the hash */
1804  if ( address_set_node == NULL ) {
1805  address_set_node = quark_address_set_node_new( dep->address, dep->size );
1806  icl_hash_insert( quark->address_set, address_set_node->address, address_set_node );
1807  }
1808  /* Convenience shortcut pointer so that we don't have to hash again */
1809  dep->address_set_node_ptr = address_set_node;
1810  pthread_mutex_unlock_address_set( &quark->address_set_mutex );
1811 
1812  /* Lock the address_set_node and manipulate it */
1813  if ( pthread_mutex_lock_wrap( &address_set_node->asn_mutex ) == 0 ) {
1814 
1815  /* Add the dependency to the list of waiting dependencies on this address set node */
1816  icl_list_t *curr_dep_node = icl_list_append( address_set_node->waiting_deps, dep );
1817  /* Convenience shortcut pointer so we don't have to scan the waiting dependencies */
1818  dep->address_set_waiting_deps_node_ptr = curr_dep_node;
1819  /* Handle the case where a single task makes multiple dependencies on the same data address */
1820  /* e.g. func( A11:IN, A11:INOUT, A11:OUT, A11:IN, A22:OUT ) */
1821  icl_list_t *prev_dep_node = icl_list_prev( address_set_node->waiting_deps, curr_dep_node);
1822  if ( prev_dep_node != NULL ) {
1823  Dependency *prev_dep = (Dependency *)prev_dep_node->data;
1824  Task *prev_task = prev_dep->task;
1825  if ( prev_task->taskid == task->taskid ) {
1826  pthread_mutex_lock_task( &task->task_mutex );
1827  DBGPRINTF( "task t%lld [label=\"%s %lld\" multiple dependencies on address %p];\n", task->taskid, task->task_label, task->taskid, dep->address );
1828  /* The curr dependency will be updated using the ordering INPUT < OUTPUT < INOUT */
1829  /* When the scheduler checks the front of the dependency list, it will find the correct dep setting */
1830  dep->direction = (dep->direction > prev_dep->direction ? dep->direction : prev_dep->direction );
1831  dep->data_region = (dep->data_region | prev_dep->data_region );
1832  if ( prev_dep->ready == FALSE ) {
1833  prev_dep->ready = TRUE;
1834  task->num_dependencies_remaining--;
1835  }
1836  /* Remove the redundant dependency from waiting deps and from the task */
1837  icl_list_delete( address_set_node->waiting_deps, prev_dep_node, NULL );
1838  icl_list_delete( task->dependency_list, prev_dep->task_dependency_list_node_ptr, NULL );
1839  /* Update the prev_dep_node ptr since it has changed */
1840  prev_dep_node = icl_list_prev( address_set_node->waiting_deps, curr_dep_node);
1841  pthread_mutex_unlock_task( &task->task_mutex );
1842  }
1843  }
1844 
1845  /* This will avoid WAR dependencies if possible: if enabled, and
1846  * the current dependency is a write, and there were only reads
1847  * earlier (input>1, output+inout=1) */
1848  if ( dep->direction==OUTPUT || dep->direction==INOUT ) {
1849  quark_avoid_war_dependencies( quark, address_set_node, task );
1850  }
1851 
1852  /* The following code decides whether the dep is ready or not */
1853  if ( dep->direction==INOUT || dep->direction==OUTPUT ) {
1854  /* If output, and previous dep exists, then ready=false */
1855  if ( prev_dep_node != NULL ) {
1856  dep->ready = FALSE;
1857  } else {
1858  dep->ready = TRUE;
1859  dot_dag_print_edge( quark, address_set_node->last_writer_taskid, address_set_node->last_writer_tasklevel, task->taskid, task->tasklevel, DEPCOLOR_W_FIRST );
1861  }
1862  } else if ( dep->direction == INPUT ) {
1863  if ( prev_dep_node != NULL ) {
1864  /* If input, and previous dep is a read that is ready, then ready=true */
1865  Dependency *prev_dep = (Dependency *)prev_dep_node->data;
1866  if ( prev_dep->direction==INPUT && prev_dep->ready==TRUE ) {
1867  dep->ready = TRUE;
1868  dot_dag_print_edge( quark, address_set_node->last_writer_taskid, address_set_node->last_writer_tasklevel, task->taskid, task->tasklevel, DEPCOLOR_RAR );
1870  } else {
1871  dep->ready = FALSE;
1872  }
1873  } else {
1874  /* Input, but no previous node (is first), so ready */
1875  dep->ready = TRUE;
1876  dot_dag_print_edge( quark, address_set_node->last_writer_taskid, address_set_node->last_writer_tasklevel, task->taskid, task->tasklevel, DEPCOLOR_R_FIRST );
1878  }
1879  }
1880  pthread_mutex_unlock_wrap( &address_set_node->asn_mutex );
1881  }
1882  }
1883 }
1884 
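 /* Worked example of the readiness rules above (illustrative note, not from
  * the original source): for tasks inserted in the order T1(A:OUTPUT),
  * T2(A:INPUT), T3(A:INPUT), T4(A:INOUT): T1 has no earlier dep on A, so it
  * is ready; T2 follows a write, so it is not ready; T3 follows a not-ready
  * read, so it is not ready; T4 is a write with earlier deps, so it is not
  * ready.  T2 and T3 launch when T1 completes (RAW), and T4 launches when
  * both readers complete (WAR). */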
1885 /* **************************************************************************** */
1898 void QUARK_Worker_Loop(Quark *quark, int thread_rank)
1899 {
1900  quark->worker[thread_rank]->thread_id = pthread_self();
1901  quark_work_main_loop( quark->worker[thread_rank] );
1902 }
1903 
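 /* Usage sketch (illustrative, not part of the original quark.c): when the
  * caller manages its own threads (as PLASMA does), each non-master thread
  * enters the scheduler via QUARK_Worker_Loop with its rank.  Assumes a
  * pthread entry function and a quark handle created elsewhere:
  *
  *     void *worker_main( void *arg ) {
  *         int rank = (int)(intptr_t)arg;   // 1 .. num_threads-1
  *         QUARK_Worker_Loop( quark, rank );
  *         return NULL;
  *     }
  */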
1904 
1905 /* **************************************************************************** */
1914 static void quark_work_set_affinity_and_call_main_loop(Worker *worker)
1915 {
1916  Quark *quark = worker->quark_ptr;
1917  int thread_rank = QUARK_Thread_Rank(quark);
1918  quark_setaffinity( quark->coresbind[thread_rank] ) ;
1919  quark_work_main_loop( quark->worker[thread_rank] );
1920  return;
1921 }
1922 
1923 
1924 /* **************************************************************************** */
1925 static Task *quark_work_main_loop_check_for_task( Quark *quark, Worker *worker, int worker_rank )
1926 {
1927  Worker *worker_victim;
1928  task_priority_tree_node_t *task_priority_tree_node;
1929  Task *task = NULL;
1930  int ready_list_victim = worker_rank;
1931  int worker_finalize = FALSE;
1932  int completed_tasks_size;
1933  int quark_num_queued_tasks = 0;
1934 
1935  /* Loop while looking for tasks */
1936  quark_atomic_get( worker_finalize, worker->finalize, &worker->worker_mutex );
1937  /* worker_finalize = worker->finalize; */
1938  while ( task==NULL && !worker->finalize ) {
1939 
1940  quark_atomic_get( completed_tasks_size, quark->completed_tasks_size, &quark->completed_tasks_mutex );
1941  /* FIXME Tuning these statements is important for performance at small tile sizes */
1942  if ( worker_rank==0 ) quark_process_completed_tasks(quark);
1943  // else if ( worker->ready_list_size==0 ) quark_process_completed_tasks(quark);
1944  // else if ( completed_tasks_size>1 && worker_rank%5==1 ) quark_process_completed_tasks(quark);
1945  else if ( completed_tasks_size>1 ) quark_process_completed_tasks(quark);
1946  // else quark_process_completed_tasks(quark);
1947 
1948  worker_victim = quark->worker[ready_list_victim];
1949 
1950  /* DBGPRINTF("Wkr %d [ %d ] looking at queue %d [ %d ]\n", worker->rank, worker->ready_list_size, ready_list_victim, worker_victim->ready_list_size ); */
1951  if ( worker_rank==ready_list_victim ) {
1952  if ( pthread_mutex_lock_ready_list( &worker_victim->worker_mutex ) == 0 ) {
1953  task_priority_tree_node = RB_MIN( task_priority_tree_head_s, worker_victim->ready_list );
1954  if ( task_priority_tree_node != NULL ) {
1955  task = task_priority_tree_node->task;
1956  RB_REMOVE( task_priority_tree_head_s, worker_victim->ready_list, task_priority_tree_node );
1957  free( task_priority_tree_node );
1958  worker_victim->ready_list_size--;
1959  }
1960  pthread_mutex_unlock_ready_list( &worker_victim->worker_mutex );
1961  }
1962  } else if ( worker_rank!=ready_list_victim ) {
1963  if ( pthread_mutex_trylock_ready_list( &worker_victim->worker_mutex ) == 0) {
1964  if ( worker_victim->executing_task==TRUE && worker_victim->ready_list_size>0 ) { /* victim is busy and has tasks available */
1965  /* DBGPRINTF("Wkr %d [ %d ] Got lock for queue %d [ %d ]\n", worker->rank, worker->ready_list_size, ready_list_victim, worker_victim->ready_list_size ); */
1966  task_priority_tree_node = RB_MAX( task_priority_tree_head_s, worker_victim->ready_list );
1967  if ( task_priority_tree_node != NULL ) {
1968  Task *task_to_steal = task_priority_tree_node->task;
1969  if ( task_to_steal->lock_to_thread == -1 /* task not locked, so steal OK */
1970  && ( task_to_steal->task_thread_count == 1 ) /* we don't steal multi-threaded (parallel) tasks */
1971  /* && !( ( worker_rank == 0 ) && /\* worker 0 is allowed to steal // task *\/ */
1972  /* ( task_priority_tree_node->task->task_thread_count > 1 ) && /\* only if there is just enough thread *\/ */
1973  /* ( quark->num_threads > task_priority_tree_node->task->task_thread_count ) ) ) */
1974  && ( task_to_steal->lock_to_thread_mask==NULL /* task is not locked to mask, so steal OK */
1975  || QUARK_Bit_Get(task_to_steal->lock_to_thread_mask,worker_rank)==1 )) /* OR task is locked to mask, but steal by worker_rank is OK */
1976  {
1977  task = task_to_steal;
1978  DBGPRINTF("Wkr %d [ %d ] Stealing tid %lld %p [ %d %d ] from thread %d [ %d ]\n", worker->rank, worker->ready_list_size, task->taskid, task->function, task->priority, task->task_thread_count, ready_list_victim, worker_victim->ready_list_size );
1979  RB_REMOVE( task_priority_tree_head_s, worker_victim->ready_list, task_priority_tree_node );
1980  free( task_priority_tree_node );
1981  worker_victim->ready_list_size--;
1982  quark_trace_deltask2worker(quark->worker[ready_list_victim]->thread_id);
1983  quark_trace_addtask2worker(quark->worker[worker_rank]->thread_id);
1984  }
1985  }
1986  }
1987  pthread_mutex_unlock_ready_list( &worker_victim->worker_mutex );
1988  }
1989  }
1990  /* If no task found */
1991  if ( task == NULL ) {
1992  /* If there are no tasks, wait for a task to be introduced, then check own queue first */
1993  /* If this worker is allowed to do work stealing, then move and check the next victim queue */
1994  if ( worker->set_to_manual_scheduling == FALSE )
1995  ready_list_victim = (ready_list_victim + 1) % quark->num_threads;
1996  /* Grab some high level counters */
1997  quark_atomic_get( quark_num_queued_tasks, quark->num_queued_tasks, &quark->num_queued_tasks_mutex );
1998  quark_atomic_get( worker_finalize, worker->finalize, &worker->worker_mutex );
1999  /* Break for master when a scan of all queues is finished and no tasks were found */
2000  if ( worker_rank==0 && ready_list_victim==0 ) return NULL;
2001  /* Break for master if there is no work */
2002  if ( worker_rank==0 && quark_num_queued_tasks==0 ) return NULL;
2003  /* Wait for work */
2004  if ( quark_num_queued_tasks==0 && worker_rank!=0 ) {
2005  pthread_mutex_lock_wrap( &quark->num_queued_tasks_mutex );
2006  quark_num_queued_tasks = quark->num_queued_tasks;
2007  worker_finalize = worker->finalize;
2008  DBGPRINTF("Wkr %d [ %d ] Goes to sleep\n", worker->rank, worker->ready_list_size );
2009  while ( quark_num_queued_tasks==0 && !worker_finalize ) {
2010  worker->status = WORKER_SLEEPING;
2011  pthread_cond_wait_wrap( &quark->worker[worker_rank]->worker_must_awake_cond, &quark->num_queued_tasks_mutex );
2012  quark_num_queued_tasks = quark->num_queued_tasks;
2013  worker_finalize = worker->finalize;
2014  }
2015  worker->status = WORKER_NOT_SLEEPING;
2016  DBGPRINTF("Wkr %d [ %d ] Wakes up\n", worker->rank, worker->ready_list_size );
2017  pthread_mutex_unlock_wrap( &quark->num_queued_tasks_mutex );
2018  DBGPRINTF("Wkr %d [ %d ] Unlock quark->num_queued_tasks_mutex\n", worker->rank, worker->ready_list_size );
2019  }
2020  }
2021  quark_atomic_get( worker_finalize, worker->finalize, &worker->worker_mutex );
2022  }
2023  DBGPRINTF("Wkr %d [ %d ] found a task and is returning with it or got a finalize\n", worker->rank, worker->ready_list_size );
2024  return task;
2025 }
2026 
2027 
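 /* Note on the policy above (illustrative commentary, not from the original
  * source): a worker serves its own queue from one end of the priority tree
  * (RB_MIN) while a thief steals from the opposite end (RB_MAX), so owner
  * and thief rarely contend for the same task and the victim keeps the
  * tasks it would have run next. */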
2028 /* **************************************************************************** */
2033 static long long quark_work_main_loop(Worker *worker)
2034 {
2035  Quark *quark = worker->quark_ptr;
2036  Task *task = NULL;
2037  long long num_tasks = -1;
2038  /*int worker_finalize = FALSE;*/
2039 
2040  /* Busy wait while not ready */
2041  do {} while ( !quark->start );
2042  int worker_rank = QUARK_Thread_Rank(quark);
2043 
2044  /* Queue all tasks before running; this can be enabled via environment */
2045  if ( quark->queue_before_computing && worker_rank==0 && !quark->all_tasks_queued ) return quark->num_tasks;
2046  while ( quark->queue_before_computing && worker_rank!=0 && !quark->all_tasks_queued ) { /* busy loop */ }
2047 
2048  /* Master never does work; the following commented-out line is for debugging use */
2049  /* if (worker_rank == 0) return; */
2050  /* DBGPRINTF("Wkr %d [ %d ] Starting main loop\n", worker->rank, worker->ready_list_size); */
2051  /* quark_atomic_get( worker_finalize, worker->finalize, &worker->worker_mutex ); */
2052  while ( worker->finalize == FALSE ) {
2053  /* Repeatedly try to find a task, first trying my own ready list,
2054  * then trying to steal from someone else */
2055  task = quark_work_main_loop_check_for_task( quark, worker, worker_rank );
2056 
2057  /* EXECUTE THE TASK IF FOUND */
2058  if ( task!=NULL ) {
2059  DBGPRINTF("Wkr %d [ %d ] Found a task\n", worker->rank, worker->ready_list_size);
2060  int sequence_status = 0;
2061  if ( task->sequence!=NULL ) {
2062  pthread_mutex_lock_wrap( &task->sequence->sequence_mutex );
2063  sequence_status = task->sequence->status;
2064  pthread_mutex_unlock_wrap( &task->sequence->sequence_mutex );
2065  }
2066  pthread_mutex_lock_task( &task->task_mutex );
2067  if ( (sequence_status==QUARK_ERR) || (task->function==NULL) ) { /* cancelled */
2068  DBGPRINTF("Wkr %d [ %d ] Task was cancelled %lld\n", worker->rank, worker->ready_list_size, task->taskid);
2069  task->status = CANCELLED;
2070  pthread_mutex_unlock_task( &task->task_mutex );
2071  } else { /* Call the task */
2072  quark_atomic_set( worker->executing_task, TRUE, &worker->worker_mutex );
2073  task->status = RUNNING;
2074  quark_scratch_allocate( task );
2075  pthread_mutex_unlock_task( &task->task_mutex );
2076  worker->current_task_ptr = task;
2077  quark_trace_deltask2worker(quark->worker[worker_rank]->thread_id);
2078 #ifdef DBGQUARK
2079  struct timeval tstart; gettimeofday( &tstart, NULL );
2080 #endif /* DBGQUARK */
2081  /* THIS IS THE ACTUAL CALL TO EXECUTE THE FUNCTION */
2082  task->function( quark );
2083 #ifdef DBGQUARK
2084  struct timeval tend; gettimeofday( &tend, NULL );
2085  struct timeval tresult; timersub( &tend, &tstart, &tresult );
2086  DBGPRINTF("Wkr %d [ %d ] Did tid %lld %p [ %d %d ] %f\n", worker->rank, worker->ready_list_size, task->taskid, task->function, task->priority, task->task_thread_count, (double)tresult.tv_sec + (double)tresult.tv_usec/1000000.0 );
2087 #endif /* DBGQUARK */
2088  pthread_mutex_lock_task( &task->task_mutex );
2089  quark_scratch_deallocate( task );
2090  task->executed_on_threadid = worker_rank;
2091  task->status = DONE;
2092  pthread_mutex_unlock_task( &task->task_mutex );
2093  quark_atomic_set( worker->executing_task, FALSE, &worker->worker_mutex );
2094  }
2095  /* Put the task into a queue for later processing */
2096  quark_worker_remove_completed_task_enqueue_for_later_processing(quark, task, worker_rank);
2097  }
2098  /* Break if master */
2099  if ( worker_rank==0 ) break;
2100  /* quark_atomic_get( worker_finalize, worker->finalize, &worker->worker_mutex ); */
2101  }
2102  /* DBGPRINTF("Wkr %d [ %d ] Leaving main loop with finalize %d\n", worker->rank, worker->ready_list_size, worker->finalize); */
2103  /* Worker has exited loop; ready for next time this worker is activated */
2104  quark_atomic_set( worker->finalize, FALSE, &worker->worker_mutex );
2105  /* Get the num_tasks in the system and return it */
2106  quark_atomic_get( num_tasks, quark->num_tasks, &quark->task_set_mutex );
2107  DBGPRINTF("Wkr %d [ %d ] Exiting main work loop with num_tasks %lld\n", worker->rank, worker->ready_list_size, num_tasks );
2108  return num_tasks;
2109 }
2110 
2111 
2112 /* **************************************************************************** */
2124 Quark_Sequence *QUARK_Sequence_Create( Quark *quark )
2125 {
2126  Quark_Sequence *sequence = quark_malloc(sizeof(Quark_Sequence));
2127  DBGPRINTF("Wkr %d [ %d ] In seq create\n", QUARK_Thread_Rank(quark), quark->worker[0]->ready_list_size );
2128  sequence->status = QUARK_SUCCESS;
2129  pthread_mutex_init( &sequence->sequence_mutex, NULL );
2130  ll_list_head_t *head = quark_malloc(sizeof(ll_list_head_t));
2131  LIST_INIT(head);
2132  sequence->tasks_in_sequence = head;
2133  return sequence;
2134 }
2135 
2136 /* **************************************************************************** */
2150 int QUARK_Sequence_Cancel( Quark *quark, Quark_Sequence *sequence )
2151 {
2152  int retval;
2153  if ( quark==NULL || sequence==NULL ) return QUARK_ERR;
2154  pthread_mutex_lock_wrap( &sequence->sequence_mutex );
2155  if ( sequence->status != QUARK_SUCCESS ) {
2156  /* sequence already cancelled */
2157  retval = QUARK_SUCCESS;
2158  } else {
2159  sequence->status = QUARK_ERR;
2160  ll_list_node_t *np;
2161  LIST_FOREACH( np, sequence->tasks_in_sequence, ll_entries ) {
2162  long long int taskid = np->val;
2163  /* Find taskid, make function NULL */
2164  QUARK_Cancel_Task( quark, taskid );
2165  /* Task node is removed from sequence when it finishes and is
2166  * deleted; or when sequence is destroyed */
2167  }
2168  retval = QUARK_SUCCESS;
2169  }
2170  pthread_mutex_unlock_wrap( &sequence->sequence_mutex );
2171  return retval;
2172 }
2173 
2174 /* **************************************************************************** */
2187 Quark_Sequence *QUARK_Sequence_Destroy( Quark *quark, Quark_Sequence *sequence )
2188 {
2189  DBGPRINTF("Wkr %d [ %d ] In seq destroy \n", QUARK_Thread_Rank(quark), quark->worker[0]->ready_list_size );
2190  if ( quark==NULL || sequence==NULL ) return NULL;
2191  if ( !LIST_EMPTY( sequence->tasks_in_sequence )) {
2192  if ( QUARK_Sequence_Cancel( quark, sequence ) != QUARK_SUCCESS ) return NULL;
2193  if ( QUARK_Sequence_Wait( quark, sequence ) != QUARK_SUCCESS ) return NULL;
2194  }
2195  /* No need to remove tasks in the sequence; they should have been removed by sequence_wait */
2196  free( sequence->tasks_in_sequence );
2197  sequence->tasks_in_sequence = NULL;
2198  pthread_mutex_destroy( &sequence->sequence_mutex );
2199  free( sequence );
2200  return NULL;
2201 }
2202 
2203 /* **************************************************************************** */
2216 int QUARK_Sequence_Wait( Quark *quark, Quark_Sequence *sequence )
2217 {
2218  if ( quark==NULL || sequence==NULL) return QUARK_ERR;
2219  int myrank = QUARK_Thread_Rank( quark );
2220  while ( !LIST_EMPTY( sequence->tasks_in_sequence ) ) {
2221  quark_process_completed_tasks( quark );
2222  quark_work_main_loop( quark->worker[myrank] );
2223  }
2224  return QUARK_SUCCESS;
2225 }
2226 
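 /* Usage sketch (illustrative, not part of the original quark.c), combining
  * the sequence calls above.  Assumes task_func is a user task wrapper and
  * that Quark_Task_Flags_Initializer is the initializer macro from quark.h:
  *
  *     Quark_Task_Flags tf = Quark_Task_Flags_Initializer;
  *     Quark_Sequence *seq = QUARK_Sequence_Create( quark );
  *     QUARK_Task_Flag_Set( &tf, TASK_SEQUENCE, (intptr_t)seq );
  *     QUARK_Insert_Task( quark, task_func, &tf,
  *                        sizeof(double)*N, A, INOUT, 0 );
  *     QUARK_Sequence_Wait( quark, seq );    // or QUARK_Sequence_Cancel()
  *     QUARK_Sequence_Destroy( quark, seq );
  */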
2227 /* **************************************************************************** */
2238 Quark_Sequence *QUARK_Get_Sequence( Quark *quark )
2239 {
2240  Task *curr_task = quark->worker[QUARK_Thread_Rank(quark)]->current_task_ptr;
2241  return (Quark_Sequence *)curr_task->sequence;
2242 }
2243 
2244 /* **************************************************************************** */
2255 int QUARK_Get_Priority( Quark *quark )
2256 {
2257  Task *curr_task = quark->worker[QUARK_Thread_Rank(quark)]->current_task_ptr;
2258  return curr_task->priority;
2259 }
2260 
2261 /* **************************************************************************** */
2273 char *QUARK_Get_Task_Label( Quark *quark )
2274 {
2275  Task *curr_task = quark->worker[QUARK_Thread_Rank(quark)]->current_task_ptr;
2276  return (char *)curr_task->task_label;
2277 }
2278 
2279 
2280 /* **************************************************************************** */
2285 static void quark_worker_remove_completed_task_enqueue_for_later_processing(Quark *quark, Task *task, int worker_rank)
2286 {
2287  int threads_remaining_for_this_task = -1;
2288  pthread_mutex_lock_task( &task->task_mutex );
2289  threads_remaining_for_this_task = --task->task_thread_count_outstanding;
2290  pthread_mutex_unlock_task( &task->task_mutex );
2291  if ( threads_remaining_for_this_task == 0 ) {
2292  completed_tasks_node_t *node = quark_malloc(sizeof(completed_tasks_node_t));
2293  node->task = task;
2294  node->workerid = worker_rank;
2295  pthread_mutex_lock_completed_tasks( &quark->completed_tasks_mutex );
2296  TAILQ_INSERT_TAIL( quark->completed_tasks, node, ctn_entries );
2297  quark->completed_tasks_size++;
2298  pthread_mutex_unlock_completed_tasks( &quark->completed_tasks_mutex );
2299  }
2300 }
2301 
2302 /* **************************************************************************** */
2306 static void quark_process_completed_tasks( Quark *quark )
2307 {
2308  int completed_tasks_size;
2309  Task *task;
2310  int workerid = -1;
2311  quark_atomic_get( completed_tasks_size, quark->completed_tasks_size, &quark->completed_tasks_mutex );
2312  if ( completed_tasks_size==0 ) return;
2313  do {
2314  task = NULL;
2315  if ( pthread_mutex_trylock_completed_tasks( &quark->completed_tasks_mutex ) == 0 ) {
2316  completed_tasks_node_t *completed_task_node = TAILQ_FIRST( quark->completed_tasks );
2317  if ( completed_task_node!= NULL ) {
2318  TAILQ_REMOVE( quark->completed_tasks, completed_task_node, ctn_entries );
2319  quark->completed_tasks_size--;
2320  task = completed_task_node->task;
2321  workerid = completed_task_node->workerid;
2322  free( completed_task_node );
2323  }
2324  pthread_mutex_unlock_completed_tasks( &quark->completed_tasks_mutex );
2325  }
2326  if ( task != NULL )
2327  quark_remove_completed_task_and_check_for_ready( quark, task, workerid );
2328  } while ( task!=NULL );
2329 }
2330 
2331 /* **************************************************************************** */
2339 static void quark_address_set_node_initial_check_and_launch( Quark *quark, Address_Set_Node *address_set_node, Dependency *completed_dep, int worker_rank )
2340 {
2341  int read_data_region = 0;
2342  int write_data_region = 0;
2343  icl_list_t *dep_node = NULL;
2344  int keep_processing_more_nodes = 1;
2345 
2346  for ( dep_node=icl_list_first( address_set_node->waiting_deps );
2347  dep_node!=NULL && keep_processing_more_nodes==1;
2348  dep_node=icl_list_next( address_set_node->waiting_deps, dep_node )) {
2349  Dependency *dep = (Dependency *)dep_node->data;
2350  Task *task = dep->task;
2351  /* NOTE Skip CANCELLED and DONE tasks */
2352  if ( task->status==CANCELLED || task->status==DONE ) continue;
2353  switch ( dep->direction ) {
2354  case INPUT:
2355  if ( (dep->data_region & write_data_region) == 0 ) {
2356  if ( dep->ready==FALSE ) {
2357  dep->ready = TRUE;
2358  pthread_mutex_lock_task( &task->task_mutex );
2359  task->num_dependencies_remaining--;
2360  quark_check_and_queue_ready_task( quark, task, worker_rank );
2361  pthread_mutex_unlock_task( &task->task_mutex );
2362  dot_dag_print_edge( quark, completed_dep->task->taskid, completed_dep->task->tasklevel, task->taskid, task->tasklevel, DEPCOLOR_RAW );
2363  }
2364  }
2365  read_data_region = read_data_region | dep->data_region;
2366  break;
2367  case OUTPUT:
2368  case INOUT:
2369  if ( ((dep->data_region & write_data_region)==0) && ((dep->data_region & read_data_region)==0) ) {
2370  if ( dep->ready==FALSE ) {
2371  dep->ready = TRUE;
2372  pthread_mutex_lock_task( &task->task_mutex );
2373  task->num_dependencies_remaining--;
2374  quark_check_and_queue_ready_task( quark, task, worker_rank );
2375  pthread_mutex_unlock_task( &task->task_mutex );
2376  if ( quark->dot_dag_enable ) {
2377  if ( completed_dep->direction==INPUT ) {
2378  dot_dag_print_edge( quark, completed_dep->task->taskid, completed_dep->task->tasklevel, task->taskid, task->tasklevel, DEPCOLOR_WAR );
2379  } else {
2380  dot_dag_print_edge( quark, completed_dep->task->taskid, completed_dep->task->tasklevel, task->taskid, task->tasklevel, DEPCOLOR_WAW );
2381  }
2382  }
2383  }
2384  } /* else keep_processing_more_nodes = 0; */
2385  write_data_region = write_data_region | dep->data_region;
2386  if ( write_data_region==QUARK_REGION_ALL )
2387  keep_processing_more_nodes = 0;
2388  break;
2389  case VALUE:
2390  case NODEP:
2391  case SCRATCH:
2392  default:
2393  DBGPRINTF("Unexpected dependency direction (not INPUT, OUTPUT, INOUT)\n");
2394  break;
2395  }
2396  }
2397 }
2398 /* **************************************************************************** */
2404 static void quark_remove_completed_task_and_check_for_ready(Quark *quark, Task *task, int worker_rank)
2405 {
2406  if ( quark->dot_dag_enable ) {
2407  pthread_mutex_lock_wrap( &quark->dot_dag_mutex );
2408  //if (task->tasklevel < 1) task->tasklevel=1;
2409  fprintf(dot_dag_file, "t%lld [fillcolor=\"%s\",label=\"%s\",style=filled]; // %lld %d %p %d %lld \n",
2410  task->taskid, task->task_color, task->task_label, task->taskid, task->priority, task->sequence, task->task_thread_count, task->tasklevel);
2411  /* Track the width of each task level */
2412  quark->tasklevel_width[task->tasklevel]++;
2413  /* fprintf(dot_dag_file, "// critical-path depth %ld \n", task->tasklevel ); */
2414  fprintf(dot_dag_file, "{rank=same;%lld;t%lld};\n", task->tasklevel, task->taskid );
2415  pthread_mutex_unlock_wrap( &quark->dot_dag_mutex );
2416  }
2417 
2418  /* For each dependency in the task that was completed */
2419  icl_list_t *dep_node;
2420  for (dep_node = icl_list_first(task->dependency_list);
2421  dep_node != NULL && dep_node->data!=NULL;
2422  dep_node = icl_list_next(task->dependency_list, dep_node)) {
2423  Dependency *dep = (Dependency *)dep_node->data;
2424  Address_Set_Node *address_set_node = dep->address_set_node_ptr;
2425 
2426  if ( pthread_mutex_lock_wrap( &address_set_node->asn_mutex )==0 ) {
2427  /* Mark the address/data as having been written by worker_rank */
2428  if ( dep->direction==OUTPUT || dep->direction==INOUT )
2429  address_set_node->last_thread = worker_rank;
2430  /* Update dag generation information */
2431  if ( quark->dot_dag_enable ) {
2432  if ( dep->direction==OUTPUT || dep->direction==INOUT ) {
2433  /* Track last writer and level, needed when this structure becomes empty */
2434  address_set_node->last_writer_taskid = task->taskid;
2435  address_set_node->last_writer_tasklevel = task->tasklevel;
2436  }
2437  address_set_node->last_reader_or_writer_taskid = task->taskid;
2438  address_set_node->last_reader_or_writer_tasklevel = task->tasklevel;
2439  }
2440  /* Check the address set node to avoid WAR dependencies */
2441  if ( (quark->war_dependencies_enable) &&
2442  (dep->direction==OUTPUT || dep->direction==INOUT) )
2443  quark_avoid_war_dependencies( quark, address_set_node, task );
2444  /* Remove completed dependencies from the address_set_node waiting_deps list */
2445  icl_list_delete( address_set_node->waiting_deps, dep->address_set_waiting_deps_node_ptr, NULL );
2446  /* If dependencies are waiting ... */
2447  if ( icl_list_first(address_set_node->waiting_deps) != NULL ) {
2448  /* Handle any initial GATHERV dependencies */
2449  quark_address_set_node_initial_gatherv_check_and_launch(quark, address_set_node, dep, worker_rank);
2450  /* Prepend any initial accumulator dependency that is ready to go */
2451  quark_address_set_node_accumulator_find_prepend( quark, address_set_node );
2452  /* Launch any initial input and/or output dependencies */
2453  quark_address_set_node_initial_check_and_launch( quark, address_set_node, dep, worker_rank );
2454  pthread_mutex_unlock_wrap( &address_set_node->asn_mutex );
2455  } else { /* if ( icl_list_first(address_set_node->waiting_deps) == NULL ) { */
2456  pthread_mutex_unlock_wrap( &address_set_node->asn_mutex );
2457  /* FIXME the address set node is not actually deleted */
2458  // quark_address_set_node_delete( quark, address_set_node );
2459  }
2460  }
2461  }
2462  DBGPRINTF("Wkr %d [ %d ] deleting task %lld\n", worker_rank, quark->worker[worker_rank]->ready_list_size, task->taskid );
2463  task = quark_task_delete(quark, task);
2465 }
2466 
2467 /* **************************************************************************** */
2491 Quark_Task_Flags *QUARK_Task_Flag_Set( Quark_Task_Flags *task_flags, int flag, intptr_t val )
2492 {
2493  switch (flag) {
2494  case TASK_PRIORITY:
2495  task_flags->task_priority = (int)val;
2496  break;
2497  case TASK_LOCK_TO_THREAD:
2498  task_flags->task_lock_to_thread = (int)val;
2499  break;
2500  case TASK_LOCK_TO_THREAD_MASK:
2501  task_flags->task_lock_to_thread_mask = (unsigned char *)val;
2502  break;
2503  case TASK_LABEL:
2504  task_flags->task_label = (char *)val;
2505  break;
2506  case TASK_COLOR:
2507  task_flags->task_color = (char *)val;
2508  break;
2509  case TASK_SEQUENCE:
2510  task_flags->task_sequence = (Quark_Sequence *)val;
2511  break;
2512  case TASK_THREAD_COUNT:
2513  task_flags->task_thread_count = (int)val;
2514  break;
2515  case THREAD_SET_TO_MANUAL_SCHEDULING:
2516  task_flags->thread_set_to_manual_scheduling = (int)val;
2517  break;
2518  }
2519  return task_flags;
2520 }
2521 
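 /* Usage sketch (illustrative, not part of the original quark.c): since
  * QUARK_Task_Flag_Set returns its task_flags argument, calls can be
  * chained before a task is inserted:
  *
  *     Quark_Task_Flags tf = Quark_Task_Flags_Initializer;  // from quark.h
  *     QUARK_Task_Flag_Set( QUARK_Task_Flag_Set( &tf, TASK_PRIORITY, 100 ),
  *                          TASK_LABEL, (intptr_t)"getrf" );
  */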
2522 /* **************************************************************************** */
2541 intptr_t QUARK_Task_Flag_Get( Quark* quark, int flag )
2542 {
2543  Task *task = quark->worker[QUARK_Thread_Rank(quark)]->current_task_ptr;
2544  switch (flag) {
2545  case TASK_PRIORITY:
2546  return (intptr_t)task->priority;
2547  break;
2548  case TASK_LOCK_TO_THREAD:
2549  return (intptr_t)task->lock_to_thread;
2550  break;
2551  case TASK_LOCK_TO_THREAD_MASK:
2552  return (intptr_t)task->lock_to_thread_mask;
2553  break;
2554  case TASK_LABEL:
2555  return (intptr_t)task->task_label;
2556  break;
2557  case TASK_COLOR:
2558  return (intptr_t)task->task_color;
2559  break;
2560  case TASK_SEQUENCE:
2561  return (intptr_t)task->sequence;
2562  break;
2563  case TASK_THREAD_COUNT:
2564  return (intptr_t)task->task_thread_count;
2565  break;
2566  case THREAD_SET_TO_MANUAL_SCHEDULING:
2567  return (intptr_t)task->thread_set_to_manual_scheduling;
2568  break;
2569  default:
2570  return -9;
2571  break;
2572  }
2573 }
2574 
2575 /* **************************************************************************** */
2589 void QUARK_DOT_DAG_Enable( Quark *quark, int enable )
2590 {
2591  int i;
2592  if ( enable==1 ) {
2593  if ( !quark->dot_dag_was_setup ) {
2594  quark->high_water_mark = (int)(INT_MAX - 1);
2595  quark->low_water_mark = (int)(quark->high_water_mark);
2596  /* global FILE variable */
2597  if ( dot_dag_file == NULL ) fopen( &dot_dag_file, DOT_DAG_FILENAME, "w" );
2598  else fopen( &dot_dag_file, DOT_DAG_FILENAME, "a" );
2599  fprintf(dot_dag_file, "digraph G { size=\"10,7.5\"; center=1; orientation=portrait; \n");
2600  pthread_mutex_init( &quark->dot_dag_mutex, NULL );
2601  fprintf(dot_dag_file, "%d [style=\"invis\"]\n", 0);
2602  /* Reset tasklevel information */
2603  for (i=0; i<tasklevel_width_max_level; i++ )
2604  quark->tasklevel_width[i] = 0;
2605  /* Reset the address set nodes information */
2606  int tmpint;
2607  icl_entry_t* tmpent;
2608  void *kp, *dp;
2609  icl_hash_foreach(quark->address_set, tmpint, tmpent, kp, dp) {
2610  Address_Set_Node *address_set_node = (Address_Set_Node *)dp;
2611  address_set_node->last_writer_taskid = 0;
2612  address_set_node->last_writer_tasklevel = 0;
2613  address_set_node->last_reader_or_writer_taskid = 0;
2614  address_set_node->last_reader_or_writer_tasklevel = 0;
2615  }
2616  /* quark->dot_dag_was_setup is used to indicate that the
2617  * dot_dag_file needs to be finalized */
2618  quark->dot_dag_was_setup = 1;
2619  quark->dot_dag_enable = 1;
2620  }
2621  } else {
2622  if ( quark->dot_dag_was_setup ) {
2623  for (i=1; i<tasklevel_width_max_level && quark->tasklevel_width[i]!=0; i++ ) {
2624  fprintf(dot_dag_file, "%d [label=\"%d:%d\"]\n", i, i, quark->tasklevel_width[i] );
2625  fprintf(dot_dag_file, "%d->%d [style=\"invis\"];\n", i-1, i );
2626  }
2627  fprintf(dot_dag_file, "} // close graph\n");
2628  fprintf(dot_dag_file, "// ---------------------- \n");
2629  fprintf(dot_dag_file, "\n\n");
2630  fclose( dot_dag_file );
2632  quark->dot_dag_was_setup = 0;
2633  }
2634  quark->dot_dag_enable = 0;
2635  }
2636 }
2637 
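 /* Usage sketch (illustrative, not part of the original quark.c): bracket a
  * region of task insertions to dump its DAG; disabling finalizes the file
  * (DOT_DAG_FILENAME) with the per-level width labels written above:
  *
  *     QUARK_DOT_DAG_Enable( quark, 1 );
  *     // ... insert and run tasks ...
  *     QUARK_DOT_DAG_Enable( quark, 0 );   // closes the digraph
  */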
2638 /* **************************************************************************** */