MAGMA  magma-1.4.0
Matrix Algebra on GPU and Multicore Architectures
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
quark.c
Go to the documentation of this file.
1 /* **************************************************************************** */
33 /* ****************************************************************************
34 
35 Summary of environment flags:
36 
37 Change the window size, i.e. the number of unrolled tasks per thread (default 20, see QUARK_Setup)
38 export QUARK_UNROLL_TASKS_PER_THREAD=num
39 
40 Enable WAR avoidance (false dependency handling) (default=0 off)
41 export QUARK_WAR_DEPENDENCIES_ENABLE=1
42 
43 Enable DAG generation (default=0 off)
44 export QUARK_DOT_DAG_ENABLE=1
45 
46 **************************************************************************** */
47 
48 #include <stdio.h>
49 #include <stdlib.h>
50 #include <assert.h>
51 #include <stdarg.h>
52 #include <string.h>
53 #include <limits.h>
54 #include <errno.h>
55 
56 #ifndef inline
57 #define inline __inline
58 #endif
59 
60 #if defined( _WIN32 ) || defined( _WIN64 )
61 # define fopen(ppfile, name, mode) fopen_s(ppfile, name, mode)
62 # define strdup _strdup
63 # include "quarkwinthread.h"
64 #else
65 # define fopen(ppfile, name, mode) *ppfile = fopen(name, mode)
66 # include <pthread.h>
67 #endif
68 
69 #ifdef TRUE
70 #undef TRUE
71 #endif
72 
73 #ifdef FALSE
74 #undef FALSE
75 #endif
76 
77 #include "icl_list.h"
78 #include "icl_hash.h"
79 #include "bsd_queue.h"
80 #include "bsd_tree.h"
81 #include "quark.h"
82 #include "quark_unpack_args.h"
83 
84 #ifndef ULLONG_MAX
85 # define ULLONG_MAX 18446744073709551615ULL
86 #endif
87 
88 /* External functions */
89 int quark_getenv_int(char* name, int defval);
90 
91 #define DIRECTION_MASK 0x07
93 typedef enum { DGETRF, DTSTRF, DGESSM, DSSSM } task_num;
94 typedef enum { FALSE, TRUE } bool;
95 
96 struct quark_s {
99  int num_threads; /* number of threads */
100  struct worker_s **worker; /* array of workers [num_threads] */
101  int *coresbind; /* array of indices where to bind workers [num_threads] */
102  volatile int list_robin; /* round-robin list insertion index */
103  volatile bool start; /* start flag */
104  volatile bool all_tasks_queued; /* flag */
105  volatile long long num_tasks; /* number of tasks queued */
108  icl_hash_t *address_set; /* hash table of addresses */
109  pthread_mutex_t address_set_mutex; /* hash table access mutex */
110  pthread_attr_t thread_attr; /* threads' attributes */
111  int (*rank)();
112  volatile int num_queued_tasks;
115 #define tasklevel_width_max_level 5000
121  struct completed_tasks_head_s *completed_tasks;
122 };
123 
125  volatile int status;
127  struct ll_list_head_s *tasks_in_sequence;
128 };
129 
130 typedef struct worker_s {
133  struct task_priority_tree_head_s *ready_list;
134  volatile int ready_list_size;
137  volatile bool finalize; /* termination flag */
138  volatile bool executing_task;
139 } Worker;
140 
141 typedef struct quark_task_s {
143  void (*function) (Quark *); /* task function pointer */
144  volatile task_status status; /* Status of task; NOTREADY, READY; QUEUED; DONE */
145  volatile int num_dependencies; /* number of dependencies */
146  volatile int num_dependencies_remaining; /* number of dependencies remaining to be fulfilled */
147  icl_list_t *args_list; /* list of arguments (copies of scalar values and pointers) */
148  icl_list_t *dependency_list; /* list of dependencies */
149  icl_list_t *scratch_list; /* List of scratch space information and their sizes */
150  volatile struct dependency_s *locality_preserving_dep; /* Try to run task on core that preserves the locality of this dependency */
151  unsigned long long taskid; /* An identifier, used only for generating DAGs */
152  unsigned long long tasklevel; /* An identifier, used only for generating DAGs */
154  char *task_label; /* Label for this task, used in dot_dag generation */
155  char *task_color; /* Color for this task, used in dot_dag generation */
156  int priority; /* Is this a high priority task */
158  struct ll_list_node_s *ptr_to_task_in_sequence; /* convenience pointer to this task in the sequence */
159  int task_thread_count; /* Num of threads required by task */
160 } Task;
161 
162 typedef struct dependency_s {
163  struct quark_task_s *task; /* pointer to parent task containing this dependency */
164  void *address; /* address of data */
165  int size; /* Size of dependency data */
166  quark_direction_t direction; /* direction of this dependency, INPUT, INOUT, OUTPUT */
167  bool locality; /* Priority of this dependency; more like data locality */
168  volatile bool accumulator; /* Tasks depending on this may be reordered, they accumulate results */
169  bool gatherv; /* Tasks depending on this may be run in parallel, assured by the programmer */
170  struct address_set_node_s *address_set_node_ptr; /* convenience pointer to address_set_node */
171  icl_list_t *address_set_waiting_deps_node_ptr; /* convenience pointer to address_set_node waiting_deps node */
172  icl_list_t *task_args_list_node_ptr; /* convenience ptr to the task->args_list [node] to use for WAR address updates */
173  icl_list_t *task_dependency_list_node_ptr; /* convenience ptr to the task->dependency_list [node] */
174  volatile bool ready; /* Data dependency is ready */
175 } Dependency;
176 
177 typedef struct scratch_s {
178  void *ptr; /* address of scratch space */
179  int size; /* Size of scratch data */
180  icl_list_t *task_args_list_node_ptr; /* convenience ptr to the task->args_list [node] */
181 } Scratch;
182 
183 typedef struct address_set_node_s {
184  void *address; /* copy of key to the address_set - pointer to the data */
185  int size; /* data object size */
186  volatile int last_thread; /* last thread to use this data - for scheduling/locality */
187  icl_list_t *waiting_deps; /* list of dependencies waiting for this data */
188  volatile int num_waiting_input; /* count of input dependencies for this data */
189  volatile int num_waiting_output; /* count of output dependencies for this data */
190  volatile int num_waiting_inout; /* count of inout dependencies for this data */
191  volatile bool delete_data_at_address_when_node_is_deleted; /* used when data is copied in order to handle false dependencies */
192  unsigned long long last_writer_taskid; /* used for generating DOT DAGs */
193  unsigned long long last_writer_tasklevel; /* used for tracking critical depth */
194  unsigned long long last_reader_or_writer_taskid; /* used for generating DOT DAGs */
195  unsigned long long last_reader_or_writer_tasklevel; /* used for tracking critical depth */
197 
198 /* Data structure for a list containing long long int values. Used to
199  * track task ids in sequences of tasks, so that the tasks in a
200  * sequence can be controlled */
201 typedef struct ll_list_node_s {
202  long long int val;
203  LIST_ENTRY( ll_list_node_s ) entries;
205 LIST_HEAD(ll_list_head_s, ll_list_node_s);
206 typedef struct ll_list_head_s ll_list_head_t;
207 
208 typedef struct completed_tasks_node_s {
210  int workerid;
211  TAILQ_ENTRY( completed_tasks_node_s ) entries;
213 TAILQ_HEAD( completed_tasks_head_s, completed_tasks_node_s );
214 typedef struct completed_tasks_head_s completed_tasks_head_t;
215 
216 /* Tree (red-black) structure for keeping a priority list of
217  * executable tasks */
219  int priority;
221  RB_ENTRY( task_priority_tree_node_s ) n_entry;
223 RB_HEAD( task_priority_tree_head_s, task_priority_tree_node_s );
224 typedef struct task_priority_tree_head_s task_priority_tree_head_t;
226 {
227  return n2->priority - n1->priority;
228 }
229 /* Generate red-black tree functions */
230 RB_GENERATE( task_priority_tree_head_s, task_priority_tree_node_s, n_entry, compare_task_priority_tree_nodes );
231 
232 
233 /* **************************************************************************** */
238 static Task *quark_task_new();
239 static void task_delete( Quark *quark, Task *task);
240 static Worker *worker_new(Quark *quark, int rank);
241 static void worker_delete(Worker *worker);
242 static inline int quark_revolve_robin(Quark * quark);
243 static void quark_insert_task_dependencies(Quark * quark, Task * task);
244 static void quark_check_and_queue_ready_task( Quark *quark, Task *task );
245 static void work_set_affinity_and_call_main_loop(Worker *worker);
246 static void work_main_loop(Worker *worker);
247 static Scratch *scratch_new( void *arg_ptr, int arg_size, icl_list_t *task_args_list_node_ptr);
248 static void scratch_allocate( Task *task );
249 static void scratch_deallocate( Task *task );
250 static void address_set_node_delete( Quark *quark, Address_Set_Node *address_set_node );
251 
252 /* static void worker_remove_completed_task_and_check_for_ready(Quark *quark, Task *task, int exe_thread_idx); */
253 static void worker_remove_completed_task_enqueue_for_later_processing(Quark *quark, Task *task, int worker_rank);
254 static void remove_completed_task_and_check_for_ready(Quark *quark, Task *task, int worker_rank);
255 static void process_completed_tasks(Quark *quark);
256 
257 int quark_setaffinity(int rank);
258 void quark_topology_init();
261 int *quark_get_affthreads();
262 int quark_yield();
263 
264 /* **************************************************************************** */
/* Thin, separately named wrappers around the pthread primitives.  Each
 * one simply forwards to the underlying pthread call and returns its
 * result; the distinct names identify which lock is being used. */

/* Address-set mutex */
inline static int pthread_mutex_lock_asn(pthread_mutex_t *mtx)
{
    return pthread_mutex_lock( mtx );
}

inline static int pthread_mutex_trylock_asn(pthread_mutex_t *mtx)
{
    return pthread_mutex_trylock( mtx );
}

inline static int pthread_mutex_unlock_asn(pthread_mutex_t *mtx)
{
    return pthread_mutex_unlock( mtx );
}

/* Per-worker ready-list mutex */
inline static int pthread_mutex_lock_ready_list(pthread_mutex_t *mtx)
{
    return pthread_mutex_lock( mtx );
}

inline static int pthread_mutex_unlock_ready_list(pthread_mutex_t *mtx)
{
    return pthread_mutex_unlock( mtx );
}

/* Generic wrappers */
inline static int pthread_mutex_lock_wrap(pthread_mutex_t *mtx)
{
    return pthread_mutex_lock( mtx );
}

inline static int pthread_mutex_unlock_wrap(pthread_mutex_t *mtx)
{
    return pthread_mutex_unlock( mtx );
}

/* Condition wait paired with a ready-list mutex */
inline static int pthread_cond_wait_ready_list( pthread_cond_t *cond, pthread_mutex_t *mtx )
{
    return pthread_cond_wait( cond, mtx );
}
285 
286 /* **************************************************************************** */
287 
288 /* If dags are to be generated, setup file name and pointer and
289  * various macros. This assumes that the fprintf function is thread
290  * safe. */
291 static char *quark_task_default_label = " ";
292 static char *quark_task_default_color = "white";
293 #define DEPCOLOR "black"
294 #define ANTIDEPCOLOR "red"
295 #define GATHERVDEPCOLOR "green"
296 #define DOT_DAG_FILENAME "dot_dag_file.dot"
/* dot_dag_level_update: when quark->dot_dag_enable is set, raise
 * child_level to at least parent_level+1 while holding
 * quark->dot_dag_mutex.  Arguments are evaluated more than once and are
 * not parenthesised, and the expansion is a bare if-block (not
 * do { } while (0)), so take care when using it inside an un-braced
 * if/else. */
298 #define dot_dag_level_update( parent_level, child_level, quark ) \
299  if ( quark->dot_dag_enable ) { \
300  pthread_mutex_lock_wrap( &quark->dot_dag_mutex ); \
301  child_level = (parent_level+1 < child_level ? child_level : parent_level+1 ); \
302  pthread_mutex_unlock_wrap( &quark->dot_dag_mutex ); }
/* dot_dag_print_edge: when DAG output is enabled and parentid is
 * non-zero, write one "parent->child" edge line with the given color to
 * dot_dag_file under quark->dot_dag_mutex.  Note that `quark` and
 * `dot_dag_file` are not macro parameters; they are taken from the scope
 * where the macro is expanded. */
303 #define dot_dag_print_edge( parentid, childid, color) \
304  if ( quark->dot_dag_enable && parentid!=0 ) { \
305  pthread_mutex_lock_wrap( &quark->dot_dag_mutex ); \
306  fprintf(dot_dag_file, "t%lld->t%lld [color=\"%s\"];\n", parentid, childid, color); \
307  pthread_mutex_unlock_wrap( &quark->dot_dag_mutex ); \
308  }
309 
310 /* **************************************************************************** */
315 {
316  static unsigned long long taskid = 1;
317  Task *task = (Task *)malloc(sizeof(Task));
318  assert(task != NULL);
319  task->function = NULL;
320  task->num_dependencies = 0;
321  task->num_dependencies_remaining = 0;
322  task->args_list = icl_list_new();
323  assert(task->args_list != NULL);
324  task->dependency_list = icl_list_new();
325  assert(task->dependency_list != NULL);
326  task->locality_preserving_dep = NULL;
327  task->status = NOTREADY;
328  task->scratch_list = icl_list_new();
329  assert( task->scratch_list != NULL);
330  assert( taskid < ULLONG_MAX );
331  task->taskid = taskid++;
332  task->tasklevel = 0;
333  pthread_mutex_init(&task->task_mutex, NULL);
334  task->ptr_to_task_in_sequence = NULL;
335  task->sequence = NULL;
339  task->lock_to_thread = -1;
340  task->task_thread_count = 1;
341  return task;
342 }
343 
344 /* **************************************************************************** */
/* Release everything owned by a task: remove it from quark->task_set,
 * destroy its argument, dependency and scratch lists, unlink it from its
 * sequence list (if it was registered in one), free any non-default
 * color/label strings, free the task itself and finally decrement
 * quark->num_tasks. */
348 static void task_delete(Quark *quark, Task *task)
349 {
 /* Drop the entry keyed by this task's taskid from the task hash */
351  icl_hash_delete( quark->task_set, &task->taskid, NULL, NULL );
 /* free is handed to icl_list_destroy as the per-element destructor */
354  icl_list_destroy(task->args_list, free);
355  icl_list_destroy(task->dependency_list, free);
356  icl_list_destroy(task->scratch_list, free);
 /* If the task was recorded in a sequence, unlink and free that record */
357  if ( task->ptr_to_task_in_sequence != NULL ) {
359  LIST_REMOVE( task->ptr_to_task_in_sequence, entries );
361  free( task->ptr_to_task_in_sequence );
362  }
 /* The default color/label are shared static strings and must not be freed */
363  if (task->task_color!=NULL && task->task_color!=quark_task_default_color) free(task->task_color);
364  if (task->task_label!=NULL && task->task_label!=quark_task_default_label) free(task->task_label);
367  free( task );
 /* NOTE(review): the counter update below is not protected by a lock;
  * the commented-out TODO lines show where locking was considered. */
368  // TODO pthread_mutex_lock_asn( &quark->address_set_mutex );
369  quark->num_tasks--;
370  // TODO pthread_mutex_unlock_asn( &quark->address_set_mutex );
371 }
372 
373 /* **************************************************************************** */
378 {
379  pthread_t self_id = pthread_self();
380  int i;
381  for (i=0; i<quark->num_threads; i++)
382  if (pthread_equal(quark->worker[i]->thread_id, self_id))
383  return i;
384  return -1;
385 }
386 
387 /* **************************************************************************** */
398 void *QUARK_Args_List(Quark *quark)
399 {
400  Task *curr_task = quark->worker[QUARK_Thread_Rank(quark)]->current_task_ptr;
401  assert( curr_task != NULL );
402  return (void *)curr_task->args_list;
403 }
404 
405 /* **************************************************************************** */
420 void *QUARK_Args_Pop( void *args_list, void **last_arg)
421 {
422  icl_list_t *args = (icl_list_t *)args_list;
423  icl_list_t *node = (icl_list_t *)*last_arg;
424  void *arg = NULL;
425  if ( node == NULL ) {
426  node = icl_list_first( args );
427  if (node!=NULL) arg = node->data;
428  } else {
429  node = icl_list_next( args, node );
430  if (node!=NULL) arg = node->data;
431  }
432  *last_arg = node;
433  return arg;
434 }
435 
436 /* **************************************************************************** */
/* FNV-1 style hash over a byte buffer: starting from the offset basis,
 * each step multiplies the running hash by the FNV prime and then XORs
 * in the next byte.
 *
 * key  start of the buffer to hash (only read, never modified).
 * len  number of bytes to hash; a value <= 0 hashes nothing.
 *
 * Returns the hash value; for an empty input this is the offset basis. */
static inline unsigned int fnv_hash_function( const void *key, int len )
{
    const unsigned int fnv_offset_basis = 2166136261u;
    const unsigned int fnv_prime = 16777619u;
    const unsigned char *bytes = key;
    unsigned int hash = fnv_offset_basis;
    int i;

    for ( i = 0; i < len; i++ )
        hash = ( hash * fnv_prime ) ^ bytes[i];
    return hash;
}
449 
450 /* **************************************************************************** */
/* Hash an address used as a key: the bytes of the pointer value itself
 * (not the memory it points to) are fed to fnv_hash_function. */
static inline unsigned int address_hash_function(void *address)
{
    return fnv_hash_function( &address, (int)sizeof(void *) );
}
462 
463 /* **************************************************************************** */
/* Key comparison for address keys: returns 1 when both pointers are the
 * same address and 0 otherwise. */
static inline int address_key_compare(void *addr1, void *addr2)
{
    if ( addr1 == addr2 )
        return 1;
    return 0;
}
470 
471 /* **************************************************************************** */
/* Hash an unsigned long long key: key points at the value, and its
 * sizeof(unsigned long long) bytes are fed to fnv_hash_function. */
static inline unsigned int ullong_hash_function( void *key )
{
    return fnv_hash_function( key, (int)sizeof(unsigned long long) );
}
481 /* **************************************************************************** */
/* Key comparison for unsigned long long keys: both arguments point at
 * unsigned long long values; returns 1 when the values are equal and 0
 * otherwise. */
static inline int ullong_key_compare( void *key1, void *key2 )
{
    unsigned long long lhs = *(unsigned long long *)key1;
    unsigned long long rhs = *(unsigned long long *)key2;

    return ( lhs == rhs );
}
489 
490 /* **************************************************************************** */
495 static inline int quark_revolve_robin(Quark * quark)
496 {
497  quark->list_robin++;
498  if (quark->list_robin == quark->num_threads)
499  quark->list_robin = 0;
500  if (quark->list_robin==0 && quark->num_threads>1)
501  quark->list_robin = 1;
502  return quark->list_robin;
503 }
504 
505 /* **************************************************************************** */
/* Duplicate `size` bytes starting at `arg` into freshly allocated memory.
 *
 * arg   source bytes; not dereferenced when size is 0.
 * size  number of bytes to copy; must not be negative.
 *
 * Returns a heap buffer owned by the caller (release with free()).  The
 * result is never NULL: at least one byte is requested because malloc(0)
 * is allowed to return NULL, and an allocation failure aborts the program
 * explicitly so the check is still present when assert() is compiled out
 * with NDEBUG. */
static inline char *arg_dup(char *arg, int size)
{
    size_t nbytes;
    char *argbuf;

    assert( size >= 0 );
    nbytes = (size_t) size;

    argbuf = malloc( nbytes != 0 ? nbytes : 1 );
    if ( argbuf == NULL )
        abort();

    if ( nbytes != 0 )
        memcpy( argbuf, arg, nbytes );
    return argbuf;
}
516 
517 /* **************************************************************************** */
521 static inline Dependency *dependency_new(void *addr, long long size, quark_direction_t dir, bool loc, Task *task, bool accumulator, bool gatherv, icl_list_t *task_args_list_node_ptr)
522 {
523  Dependency *dep = (Dependency *) malloc(sizeof(Dependency));
524  assert(dep != NULL);
525  dep->task = task;
526  dep->address = addr;
527  dep->size = size;
528  dep->direction = dir;
529  dep->locality = loc;
530  dep->accumulator = accumulator;
531  dep->gatherv = gatherv;
532  dep->address_set_node_ptr = NULL; /* convenience ptr, filled later */
533  dep->address_set_waiting_deps_node_ptr = NULL; /* convenience ptr, filled later */
534  dep->task_args_list_node_ptr = task_args_list_node_ptr; /* convenience ptr for WAR address updating */
535  dep->task_dependency_list_node_ptr = NULL; /* convenience ptr */
536  dep->ready = FALSE;
537  /* For the task, track the dependency to be use to do locality
538  * preservation; by default, use first output dependency. */
539  if ( dep->locality )
540  task->locality_preserving_dep = dep;
541  else if ( (task->locality_preserving_dep == NULL) && ( dep->direction==OUTPUT || dep->direction==INOUT) )
542  task->locality_preserving_dep = dep;
543  return dep;
544 }
545 
546 /* **************************************************************************** */
550 static Worker *worker_new(Quark *quark, int rank)
551 {
552  Worker *worker = (Worker *) malloc(sizeof(Worker));
553  assert(worker != NULL);
554  worker->thread_id = pthread_self();
555  worker->ready_list = malloc(sizeof(task_priority_tree_head_t));
556  assert(worker->ready_list != NULL);
557  RB_INIT( worker->ready_list );
558  worker->ready_list_size = 0;
559  pthread_mutex_init(&worker->ready_list_mutex, NULL);
560  /* convenience pointer to the real args for the task */
561  worker->current_task_ptr = NULL;
562  worker->quark_ptr = quark;
563  worker->finalize = FALSE;
564  worker->executing_task = FALSE;
565  return worker;
566 }
567 
568 /* **************************************************************************** */
/* Free a Worker: remove and free every node still in its ready-list
 * tree, then free the tree head and the worker structure itself. */
572 static void worker_delete(Worker * worker)
573 {
574  task_priority_tree_node_t *node, *nxt;
575  /* Destroy the worker's priority queue, if there is still anything there */
 /* The successor is fetched before the current node is removed and freed */
576  for ( node = RB_MIN( task_priority_tree_head_s, worker->ready_list ); node != NULL; node = nxt) {
577  nxt = RB_NEXT( task_priority_tree_head_s, worker->ready_list, node );
578  RB_REMOVE( task_priority_tree_head_s, worker->ready_list, node );
579  free(node);
580  }
581  free( worker->ready_list );
583  free(worker);
584 }
585 
586 /* **************************************************************************** */
591 static Scratch *scratch_new( void *arg_ptr, int arg_size, icl_list_t *task_args_list_node_ptr )
592 {
593  Scratch *scratch = (Scratch *)malloc(sizeof(Scratch));
594  assert(scratch != NULL);
595  scratch->ptr = arg_ptr;
596  scratch->size = arg_size;
597  scratch->task_args_list_node_ptr = task_args_list_node_ptr;
598  return(scratch);
599 }
600 
601 /* **************************************************************************** */
605 static void scratch_allocate( Task *task )
606 {
607  icl_list_t *scr_node;
608  for (scr_node = icl_list_first( task->scratch_list );
609  scr_node != NULL && scr_node->data != NULL;
610  scr_node = icl_list_next(task->scratch_list, scr_node)) {
611  Scratch *scratch = (Scratch *)scr_node->data;
612  if ( scratch->ptr == NULL ) {
613  /* Since ptr is null, space is to be allocted and attached */
614  assert( scratch->size > 0 );
615  void *scratchspace = malloc( scratch->size );
616  assert( scratchspace != NULL );
617  *(void **)scratch->task_args_list_node_ptr->data = scratchspace;
618  }
619  }
620 }
621 
622 /* **************************************************************************** */
626 static void scratch_deallocate( Task *task )
627 {
628  icl_list_t *scr_node;
629  for (scr_node = icl_list_first( task->scratch_list );
630  scr_node != NULL && scr_node->data!=NULL;
631  scr_node = icl_list_next(task->scratch_list, scr_node)) {
632  Scratch *scratch = (Scratch *)scr_node->data;
633  if ( scratch->ptr == NULL ) {
634  /* If scratch had to be allocated, free it */
635  free(*(void **)scratch->task_args_list_node_ptr->data);
636  }
637  }
638 }
639 
640 /* **************************************************************************** */
/* Allocate and initialise a Quark scheduler instance for num_threads
 * threads.  Environment settings are read through quark_getenv_int, the
 * worker structures are created (worker 0 is used by the master, i.e.
 * the calling thread), and, if DAG output is enabled, the dot file is
 * opened and its header written.  No threads are created here.
 * Returns the new Quark; allocation failures are only checked with
 * assert. */
653 Quark *QUARK_Setup(int num_threads)
654 {
655  int i = 0;
656  Quark *quark = (Quark *) malloc(sizeof(Quark));
657  assert(quark != NULL);
658  /* Used to tell master when to act as worker */
 /* Defaults: 20 tasks per thread; total defaults to that times num_threads */
659  int quark_unroll_tasks_per_thread = quark_getenv_int("QUARK_UNROLL_TASKS_PER_THREAD", 20);
660  int quark_unroll_tasks = quark_getenv_int("QUARK_UNROLL_TASKS", quark_unroll_tasks_per_thread * num_threads);
661  quark->war_dependencies_enable = quark_getenv_int("QUARK_WAR_DEPENDENCIES_ENABLE", 0);
662  quark->queue_before_computing = quark_getenv_int("QUARK_QUEUE_BEFORE_COMPUTING", 0);
663  quark->dot_dag_enable = quark_getenv_int("QUARK_DOT_DAG_ENABLE", 0);
 /* DAG generation forces queue_before_computing on */
664  if ( quark->dot_dag_enable ) quark->queue_before_computing = 1;
 /* Both water marks become INT_MAX-1 when queueing before computing or
  * when the unroll count is 0; otherwise the low mark is the unroll
  * count and the high mark is 25% above it. */
665  if ( quark->queue_before_computing==1 || quark_unroll_tasks==0 ) {
666  quark->high_water_mark = (int)(INT_MAX - 1);
667  quark->low_water_mark = (int)(quark->high_water_mark);
668  } else {
669  quark->low_water_mark = (int)(quark_unroll_tasks);
670  quark->high_water_mark = (int)(quark->low_water_mark + quark->low_water_mark*0.25);
671  }
672  quark->num_queued_tasks = 0;
673  pthread_cond_init( &quark->num_queued_tasks_cond, NULL );
674  quark->num_threads = num_threads;
675  quark->list_robin = 0;
676  quark->start = FALSE;
677  quark->all_tasks_queued = FALSE;
678  quark->num_tasks = 0;
680  pthread_mutex_init( &quark->task_set_mutex, NULL );
681  /* Define some function pointers that match a C++ interface */
682  quark->rank = QUARK_Thread_Rank;
683  /* Create hash table to hold addresses */
685  pthread_mutex_init(&quark->address_set_mutex, NULL);
686  /* To handle completed tasks */
687  quark->completed_tasks = malloc(sizeof(completed_tasks_head_t));
688  assert ( quark->completed_tasks != NULL );
689  TAILQ_INIT( quark->completed_tasks );
691  /* Setup workers */
692  quark->worker = (Worker **) malloc(num_threads * sizeof(Worker *));
693  assert(quark->worker != NULL);
694  /* The structure for the 0th worker will be used by the master */
695  quark->worker[0] = worker_new(quark, 0);
696  quark->worker[0]->thread_id = pthread_self();
697  if ( quark->dot_dag_enable ) {
 /* fopen here is the three-argument macro defined near the top of this
  * file.  NOTE(review): dot_dag_file is used below without a NULL check. */
698  fopen(&dot_dag_file, DOT_DAG_FILENAME, "w"); /* global FILE variable */
699  fprintf(dot_dag_file, "digraph G { size=\"10,7.5\"; center=1; orientation=portrait; \n");
700  for (i=0; i<tasklevel_width_max_level; i++ )
701  quark->tasklevel_width[i] = 0;
702  pthread_mutex_init(&quark->dot_dag_mutex, NULL);
703  /* fprintf(dot_dag_file, "%d [label=\"%d %d\",style=\"invis\"]\n", 0, 0, quark->tasklevel_width[i] ); */
704  fprintf(dot_dag_file, "%d [style=\"invis\"]\n", 0);
705  }
706  /* Launch workers; first create the structures */
707  for(i = 1; i < num_threads; i++)
708  quark->worker[i] = worker_new(quark, i);
709  /* Threads can start as soon as they want */
710  quark->start = TRUE;
711  return quark;
712 }
713 
714 /* **************************************************************************** */
/* Create a scheduler and start its worker threads.
 *
 * num_threads  requested thread count; a value below 1 means "ask
 *              quark_get_numthreads()", falling back to 1 if that
 *              returns -1.
 *
 * The scheduler structures are built by QUARK_Setup, one thread is
 * created for each worker 1..nthrd-1, and the calling thread is bound
 * using coresbind[0].  Returns the new Quark. */
728 Quark *QUARK_New(int num_threads)
729 {
730  int i, nthrd;
731 
732  /* Init number of cores and topology */
734  /* Get number of threads */
735  if ( num_threads < 1 ) {
736  nthrd = quark_get_numthreads();
737  if ( nthrd == -1 ) {
738  nthrd = 1;
739  }
740  }
741  else
742  nthrd = num_threads;
743 
744  /* Create scheduler data structures for master and workers */
745  Quark *quark = QUARK_Setup(nthrd);
746  /* Get binding information */
747  quark->coresbind = quark_get_affthreads();
748  /* Setup thread attributes */
750  /* pthread_setconcurrency(quark->num_threads); */
752  /* Then start the threads, so that workers can scan the structures easily */
753  for(i = 1; i < nthrd; i++) {
 /* NOTE(review): the pthread_create result is only checked by assert */
754  int rc = pthread_create(&quark->worker[i]->thread_id, &quark->thread_attr, (void *(*)(void *))work_set_affinity_and_call_main_loop, quark->worker[i]);
755  assert(rc == 0);
756  }
 /* Bind the calling (master) thread using the first binding entry */
757  quark_setaffinity( quark->coresbind[0] );
758  return quark;
759 }
760 
761 /* **************************************************************************** */
/* Mark all tasks as queued and have the calling thread run the work
 * loop as worker 0 until quark->num_tasks drops to zero. */
771 void QUARK_Barrier(Quark * quark)
772 {
773  quark->all_tasks_queued = TRUE;
 /* Keep working through worker 0's main loop while tasks remain */
774  while ( quark->num_tasks > 0 ) {
776  work_main_loop( quark->worker[0] );
777  }
778 }
779 
780 /* **************************************************************************** */
790 void QUARK_Waitall(Quark * quark)
791 {
792  int i;
793  QUARK_Barrier( quark );
794  /* Tell each worker to exit the work_loop; master handles himself */
795  for (i=1; i<quark->num_threads; i++)
796  quark->worker[i]->finalize = TRUE;
797 }
798 
799 /* **************************************************************************** */
/* Finish all outstanding work (QUARK_Waitall), write the trailing level
 * information to the dot file if DAG output is enabled, then destroy the
 * workers, the completed-task queue head, both hash tables and the Quark
 * structure itself.  This function does not join threads and does not
 * free quark->coresbind; QUARK_Delete does both before calling it. */
809 void QUARK_Free(Quark * quark)
810 {
811  int i;
812  QUARK_Waitall(quark);
813  /* Write the level matching/forcing information */
814  if ( quark->dot_dag_enable ) {
 /* Emit one node per populated level, chained by invisible edges;
  * the loop stops at the first level whose width is 0 */
815  for (i=1; i<tasklevel_width_max_level && quark->tasklevel_width[i]!=0; i++ ) {
816  fprintf(dot_dag_file, "%d [label=\"%d:%d\"]\n", i, i, quark->tasklevel_width[i] );
817  fprintf(dot_dag_file, "%d->%d [style=\"invis\"];\n", i-1, i );
818  }
819  fprintf(dot_dag_file, "} \n");
820  }
821  /* Destroy hash tables, workers and other data structures */
 /* NOTE(review): quark->worker is indexed here before the NULL test below */
822  for (i = 1; i < quark->num_threads; i++)
823  worker_delete( quark->worker[i] );
824  worker_delete( quark->worker[0] );
825  if (quark->worker) free(quark->worker);
826  if (quark->completed_tasks) free(quark->completed_tasks);
827  icl_hash_destroy(quark->address_set, NULL, NULL);
828  icl_hash_destroy(quark->task_set, NULL, NULL);
829  if ( quark->dot_dag_enable ) {
831  fclose(dot_dag_file);
832  }
834  free(quark);
835 }
836 
837 /* **************************************************************************** */
/* Shut the scheduler down completely: wait for all tasks and flag the
 * workers to finish (QUARK_Waitall), join worker threads 1..num_threads-1,
 * free the core-binding array, and release everything else through
 * QUARK_Free (which calls QUARK_Waitall once more). */
846 void QUARK_Delete(Quark * quark)
847 {
848  void *exitcodep = NULL;
849  int i;
850 
851  QUARK_Waitall( quark );
852  /* Wait for workers to quit and join threads */
 /* The exit value stored in exitcodep is not examined */
853  for (i = 1; i < quark->num_threads; i++)
854  pthread_join(quark->worker[i]->thread_id, &exitcodep);
856  /* Destroy specific structures */
857  if (quark->coresbind) free(quark->coresbind);
859  /* Destroy hash tables, workers and other data structures */
860  QUARK_Free( quark );
861 }
862 
863 /* **************************************************************************** */
869 {
870  if ( task_flags ) {
871  if ( task_flags->task_priority ) task->priority = task_flags->task_priority;
872  if ( task_flags->task_lock_to_thread >= 0 ) task->lock_to_thread = task_flags->task_lock_to_thread;
873  if ( task_flags->task_color && quark->dot_dag_enable ) task->task_color = strdup(task_flags->task_color);
874  if ( task_flags->task_label && quark->dot_dag_enable ) task->task_label = strdup(task_flags->task_label);
875  if ( task_flags->task_sequence ) task->sequence = task_flags->task_sequence;
876  if ( task_flags->task_thread_count > 1 ) task->task_thread_count = task_flags->task_thread_count;
877  }
878  return task;
879 }
880 
881 /* **************************************************************************** */
895 Task *QUARK_Task_Init(Quark * quark, void (*function) (Quark *), Quark_Task_Flags *task_flags )
896 {
897  Task *task = quark_task_new();
898  task->function = function;
899  quark_set_task_flags_in_task_structure( quark, task, task_flags );
900  return task;
901 }
902 
903 /* **************************************************************************** */
/* Add one argument to a task that is being built.
 *
 * arg_size   size in bytes of the argument data.
 * arg_ptr    pointer to the argument (value source or data address).
 * arg_flags  direction in the low bits (DIRECTION_MASK), optionally
 *            OR-ed with LOCALITY, ACCUMULATOR, GATHERV or one of the
 *            TASK_* setting flags.
 *
 * VALUE arguments are either consumed as task settings (priority, lock
 * to thread, thread count, sequence, color, label) or copied by value
 * onto the task's args list.  For every other direction a copy of the
 * pointer itself is appended.  Non-NULL INPUT/INOUT/OUTPUT arguments
 * also get a Dependency record; SCRATCH arguments get a Scratch record. */
925 void QUARK_Task_Pack_Arg( Quark *quark, Task *task, int arg_size, void *arg_ptr, int arg_flags )
926 {
927  icl_list_t *task_args_list_node_ptr=NULL;
928  // extract information from the flags
929  bool arg_locality = (bool) ((arg_flags & LOCALITY) != 0 );
930  bool accumulator = (bool) ((arg_flags & ACCUMULATOR) != 0 );
931  bool gatherv = (bool) ((arg_flags & GATHERV) != 0 );
932  bool task_priority = (bool) ((arg_flags & TASK_PRIORITY) != 0 );
933  bool task_lock_to_thread = (bool) ((arg_flags & TASK_LOCK_TO_THREAD) != 0 );
934  bool task_thread_count = (bool) ((arg_flags & TASK_THREAD_COUNT) != 0 );
935  bool task_color = (bool) ((arg_flags & TASK_COLOR) != 0 );
936  bool task_label = (bool) ((arg_flags & TASK_LABEL) != 0 );
937  bool task_sequence = (bool) ((arg_flags & TASK_SEQUENCE) != 0 );
938  quark_direction_t arg_direction = (quark_direction_t) (arg_flags & DIRECTION_MASK);
939  if (arg_direction == VALUE) {
940  /* If argument is a value; Copy the contents to the argument buffer */
 /* Setting flags are checked first; a matching one consumes the value
  * and nothing is appended to the args list for it */
941  if ( task_priority ) task->priority = *((int *)arg_ptr);
942  else if ( task_lock_to_thread ) task->lock_to_thread = *((int *)arg_ptr);
943  else if ( task_thread_count ) task->task_thread_count = *((int *)arg_ptr);
944  else if ( task_sequence ) task->sequence = *((Quark_Sequence **)arg_ptr);
 /* Color/label are only stored when DAG output is enabled; a previous
  * non-default string is freed first.  NOTE(review): arg_dup copies
  * exactly arg_size bytes, so the copy is NUL-terminated only if
  * arg_size covers the terminator. */
945  else if ( task_color && quark->dot_dag_enable ) {
946  if ( task->task_color && task->task_color!=quark_task_default_color) free(task->task_color);
947  task->task_color = arg_dup(arg_ptr, arg_size);
948  }
949  else if ( task_label && quark->dot_dag_enable ) {
950  if ( task->task_label && task->task_label!=quark_task_default_label) free(task->task_label);
951  task->task_label = arg_dup(arg_ptr, arg_size) ;
952  }
953  else task_args_list_node_ptr = icl_list_append(task->args_list, arg_dup(arg_ptr, arg_size));
954  } else {
955  /* Else - argument is a pointer; Copy the pointer to the argument buffer - pass by reference */
956  task_args_list_node_ptr = icl_list_append(task->args_list, arg_dup((char *) &arg_ptr, sizeof(char *)));
957  }
958  if ((arg_ptr != NULL) && ( arg_direction==INPUT || arg_direction==INOUT || arg_direction==OUTPUT )) {
959  /* If argument is a dependency/slice, add dependency to task dependency list */
960  Dependency *dep = dependency_new(arg_ptr, arg_size, arg_direction, arg_locality, task, accumulator, gatherv, task_args_list_node_ptr);
961  icl_list_t *task_dependency_list_node_ptr = icl_list_append( task->dependency_list, dep );
 /* Let the dependency remember its own node in the dependency list */
962  dep->task_dependency_list_node_ptr = task_dependency_list_node_ptr;
963  task->num_dependencies++;
965  }
966  else if( arg_direction==SCRATCH ) {
 /* Record the scratch request together with the args-list node whose
  * stored pointer is filled in by scratch_allocate */
967  Scratch *scratch = scratch_new( arg_ptr, arg_size, task_args_list_node_ptr);
968  icl_list_append( task->scratch_list, scratch );
969  }
970 }
971 
972 /* **************************************************************************** */
/* Insert a fully packed task into the scheduler.
 *
 * If the task belongs to a sequence, its id is recorded at the head of
 * the sequence's task list.  The task's dependencies are then inserted,
 * the task is counted and stored in quark->task_set under its taskid,
 * and it is queued if it is ready.  Before returning, the calling
 * thread processes completed tasks and runs worker 0's work loop for as
 * long as num_tasks is at or above high_water_mark.
 *
 * Returns the taskid, captured before the task is handed over. */
986 unsigned long long QUARK_Insert_Task_Packed(Quark * quark, Task *task )
987 {
988  unsigned long long taskid = task->taskid;
989  /* Track sequence information if it is provided */
990  if ( task->sequence ) {
991 /* if ( task->sequence->status == QUARK_ERR ) { */
992 /* task_delete( quark, task ); */
993 /* return QUARK_ERR; */
994 /* } else { */
995 /* ll_list_node_t *entry = malloc(sizeof(ll_list_node_t)); */
996 /* entry->val = task->taskid; */
997 /* ll_list_head_t *headp = task->sequence->tasks_in_sequence; */
998 /* pthread_mutex_lock_wrap( &task->sequence->sequence_mutex ); */
999 /* LIST_INSERT_HEAD( headp, entry, entries ); */
1000 /* pthread_mutex_unlock_wrap( &task->sequence->sequence_mutex ); */
1001 /* /\* Keep pointer to task in sequence so it can be deleted when task completes *\/ */
1002 /* task->ptr_to_task_in_sequence = entry; */
1003 /* printf("sequence %p task %ld addto \n", task->sequence, task->taskid ); */
1004 /* } */
1005  /* TODO FIXME */
 /* A task in a sequence whose status is QUARK_ERR keeps flowing through
  * the scheduler, but with its function pointer cleared */
1006  if ( task->sequence->status == QUARK_ERR )
1007  task->function = NULL;
 /* NOTE(review): the malloc result is dereferenced without a NULL check */
1008  ll_list_node_t *entry = malloc(sizeof(ll_list_node_t));
1009  entry->val = task->taskid;
1010  ll_list_head_t *headp = task->sequence->tasks_in_sequence;
1012  LIST_INSERT_HEAD( headp, entry, entries );
1014  /* Keep pointer to task in sequence so it can be deleted when task completes */
1015  task->ptr_to_task_in_sequence = entry;
1016  //printf("sequence %p task %ld addto \n", task->sequence, task->taskid );
1017 
1018  }
1019  /* Insert the task in the address hash, locking access to the address set hash */
1021  /* For repeated usage of the scheduler, if tasks are being added repeatedly
1022  * then quark->finalize and quark->all_tasks_queued must be set false */
1023  quark->all_tasks_queued = FALSE;
1024  quark_insert_task_dependencies( quark, task );
1025  /* FIXME does this need to be protected */
1026  quark->num_tasks++;
1027  /* Save the task, indexed by its taskid */
1029  icl_hash_insert( quark->task_set, &task->taskid, task );
1031  // Check if the task is ready
1032  quark_check_and_queue_ready_task( quark, task );
1034  /* If conditions are right, master works; this will return when
1035  * num_tasks becomes less than low_water_mark */
 /* NOTE(review): the loop visible here tests high_water_mark; any
  * low_water_mark handling is not visible in this function */
1036  process_completed_tasks(quark);
1037  while (quark->num_tasks >= quark->high_water_mark) {
1038  work_main_loop(quark->worker[0]);
1039  process_completed_tasks(quark);
1040  }
1041  return taskid ;
1042 }
1043 
1044 /* **************************************************************************** */
1073 unsigned long long QUARK_Insert_Task(Quark * quark, void (*function) (Quark *), Quark_Task_Flags *task_flags, ...)
1074 {
1075  va_list varg_list;
1076  int arg_size;
1077  unsigned long long taskid;
1078 
1079  Task *task = QUARK_Task_Init(quark, function, task_flags);
1080 
1081  va_start(varg_list, task_flags);
1082  // For each argument
1083  while( (arg_size = va_arg(varg_list, int)) != 0) {
1084  void *arg_ptr = va_arg(varg_list, void *);
1085  int arg_flags = va_arg(varg_list, int);
1086  QUARK_Task_Pack_Arg( quark, task, arg_size, arg_ptr, arg_flags );
1087  }
1088  va_end(varg_list);
1089 
1090  taskid = QUARK_Insert_Task_Packed( quark, task );
1091 
1092  return taskid ;
1093 }
1094 
1095 /* **************************************************************************** */
1122 unsigned long long QUARK_Execute_Task(Quark * quark, void (*function) (Quark *), Quark_Task_Flags *task_flags, ...)
1123 {
1124  va_list varg_list;
1125  int arg_size;
1126 
1127  Task *task = QUARK_Task_Init(quark, function, task_flags);
1128 
1129  va_start(varg_list, task_flags);
1130  // For each argument
1131  while( (arg_size = va_arg(varg_list, int)) != 0) {
1132  void *arg_ptr = va_arg(varg_list, void *);
1133  int arg_flags = va_arg(varg_list, int);
1134  QUARK_Task_Pack_Arg( quark, task, arg_size, arg_ptr, arg_flags );
1135  }
1136  va_end(varg_list);
1137 
1138  int thread_rank = QUARK_Thread_Rank(quark);
1139  Worker *worker = quark->worker[thread_rank];
1140  if ( task->function == NULL ) {
1141  /* This can occur if the task is cancelled */
1142  task->status = CANCELLED;
1143  } else {
1144  /* Call the task */
1145  task->status = RUNNING;
1146  worker->current_task_ptr = task;
1147  scratch_allocate( task );
1148  task->function( quark );
1149  scratch_deallocate( task );
1150  worker->current_task_ptr = NULL;
1151  task->status = DONE;
1152  }
1153 
1154  /* Delete the task data structures */
1155  icl_list_destroy(task->args_list, free);
1156  icl_list_destroy(task->dependency_list, free);
1157  icl_list_destroy(task->scratch_list, free);
1159  free(task);
1160 
1161  /* There is no real taskid to be returned, since the task has been deleted */
1162  return( -1 );
1163 }
1164 
1165 /* **************************************************************************** */
1182 int QUARK_Cancel_Task(Quark *quark, unsigned long long taskid)
1183 {
1185  Task *task = icl_hash_find( quark->task_set, &taskid );
1186  if ( task == NULL ) {
1188  return -1;
1189  }
1192  if ( task->status==RUNNING || task->status==DONE || task->status==CANCELLED ) {
1194  return -2;
1195  }
1196  task->function = NULL;
1198  return 1;
1199 }
1200 
1201 /* **************************************************************************** */
1206 static Address_Set_Node *address_set_node_new( void* address, int size )
1207 {
1208  Address_Set_Node *address_set_node = (Address_Set_Node *)malloc(sizeof(Address_Set_Node));
1209  assert( address_set_node != NULL );
1210  address_set_node->address = address;
1211  address_set_node->size = size;
1212  address_set_node->last_thread = -1;
1213  address_set_node->waiting_deps = icl_list_new();
1214  assert( address_set_node->waiting_deps != NULL );
1215  address_set_node->num_waiting_input = 0;
1216  address_set_node->num_waiting_output = 0;
1217  address_set_node->num_waiting_inout = 0;
1219  address_set_node->last_writer_taskid = 0;
1220  address_set_node->last_writer_tasklevel = 0;
1221  address_set_node->last_reader_or_writer_taskid = 0;
1222  address_set_node->last_reader_or_writer_tasklevel = 0;
1223  return address_set_node;
1224 }
1225 
1226 /* **************************************************************************** */
1230 static void address_set_node_delete( Quark *quark, Address_Set_Node *address_set_node )
1231 {
1232  /* Free data if it was allocted as a WAR data copy */
1233  if ( address_set_node->delete_data_at_address_when_node_is_deleted == TRUE ) {
1234  free( address_set_node->address );
1235  }
1236  /* Do not free this structure if we are generating DAGs. The
1237  * structure contains information about the last task to write the
1238  * data used to make DAG edges */
1239  if ( quark->dot_dag_enable )
1240  return;
1241 
1242  /* Remove any data structures in the waiting_deps list */
1243  if ( address_set_node->waiting_deps != NULL )
1244  icl_list_destroy( address_set_node->waiting_deps, NULL );
1245  /* Delete and free the hash table entry if this was NOT a WAR create entry */
1246  icl_hash_delete( quark->address_set, address_set_node->address, NULL, NULL );
1247  /* Remove the data structure */
1248  free( address_set_node );
1249 }
1250 
1251 /* **************************************************************************** */
1258 static void quark_check_and_queue_ready_task( Quark *quark, Task *task )
1259 {
1260  int worker_thread_id = -1;
1261  Worker *worker = NULL;
1262  int assigned_thread_count = 0;
1263 
1264  if ( task->num_dependencies_remaining > 0 || task->status == QUEUED || task->status == RUNNING || task->status == DONE) return;
1265  task->status = QUEUED;
1266  /* Assign task to thread. Locked tasks get sent to appropriate
1267  * thread. Locality tasks should have be correctly placed. Tasks
1268  * without either should have the original round robin thread
1269  * assignment */
1270  if ( task->lock_to_thread >= 0) {
1271  worker_thread_id = task->lock_to_thread % quark->num_threads;
1272  } else if ( task->locality_preserving_dep != NULL ) {
1273  int last_thread = task->locality_preserving_dep->address_set_node_ptr->last_thread;
1274  if ( last_thread >= 0 ) worker_thread_id = last_thread;
1275  }
1276  if ( worker_thread_id < 0 ) worker_thread_id = quark_revolve_robin(quark);
1277 
1278  /* Handle tasks that need multiple threads */
1279  while ( assigned_thread_count < task->task_thread_count) {
1280 
1281  worker = quark->worker[worker_thread_id];
1282  /* Create a new entry for the ready list */
1283  task_priority_tree_node_t *new_task_tree_node = malloc(sizeof(task_priority_tree_node_t));
1284  assert( new_task_tree_node != NULL );
1285  new_task_tree_node->priority = task->priority;
1286  new_task_tree_node->task = task;
1287  /* Insert new entry into the ready list */
1289  RB_INSERT( task_priority_tree_head_s, worker->ready_list, new_task_tree_node );
1290  worker->ready_list_size++;
1293  quark->num_queued_tasks++;
1294 
1295  assigned_thread_count++;
1296  /* TODO Abort when too many threads requested */
1297  if ( assigned_thread_count < task->task_thread_count )
1298  worker_thread_id = (worker_thread_id+1) % quark->num_threads;
1299  }
1300 }
1301 
1302 /* **************************************************************************** */
/* Try to remove write-after-read (false) dependencies on one address.
 *
 * When the waiting list of asn_old starts with at least min_input_deps
 * INPUT dependencies whose tasks are still NOTREADY, followed by a pending
 * OUTPUT/INOUT dependency, the data is copied into a fresh buffer, a new
 * address set node is created for the copy, and those leading readers are
 * moved to the new node, pointed at the copy and marked ready.
 *
 * quark        scheduler instance (feature is skipped unless
 *              quark->war_dependencies_enable is set)
 * asn_old      address set node whose leading readers may be redirected
 * parent_task  task used as the source of the DAG edge/level updates
 */
1314 void quark_avoid_war_dependencies( Quark *quark, Address_Set_Node *asn_old, Task *parent_task )
1315 {
1316  /* Figure out if there are enough input dependencies to make this worthwhile */
1317  int count_initial_input_deps = 0;
1318  bool output_dep_reached = FALSE;
1319  double avg_queued_tasks_per_thread = (double)quark->num_queued_tasks/(double)quark->num_threads;
1320  double avg_tasks_per_thread = (double)quark->num_tasks/(double)quark->num_threads;
1321  int min_input_deps;
1322  icl_list_t *dep_node_old;
1323 
1324  /* Quick return if this is not enabled */
1325  if ( !quark->war_dependencies_enable ) return;
1326 
1327  /* TODO This stuff is still under development.... */
  /* Threshold heuristic: the fewer queued tasks per thread, the fewer
   * waiting readers are required before copying is considered. */
1328  if ( avg_queued_tasks_per_thread < 0.4 ) min_input_deps = 1;
1329  else if ( avg_queued_tasks_per_thread < 0.75 ) min_input_deps = 6;
1330  else if ( avg_queued_tasks_per_thread < 0.90 ) min_input_deps = 7;
1331  else if ( avg_queued_tasks_per_thread < 1.20 ) min_input_deps = 10;
1332  else if ( avg_queued_tasks_per_thread > 1.80 ) min_input_deps = 2000;
1333  else if ( avg_tasks_per_thread < (double)quark->low_water_mark/(double)quark->num_threads/2 ) min_input_deps = 2000;
1334  else min_input_deps = (int)(7 + 27 * avg_queued_tasks_per_thread);
1335 
1336  /* Override computed value using environment variable */
1337  min_input_deps = quark_getenv_int( "QUARK_AVOID_WAR_WHEN_NUM_WAITING_READS", min_input_deps );
1338 
1339  /* Shortcut return if there are not enough input tasks */
1340  if ( asn_old->num_waiting_input < min_input_deps ) return;
1341 
1342  /* Scan through initial deps, make sure they are inputs and that there
1343  * are enough of them to make data copying worthwhile */
1344  for (dep_node_old=icl_list_first(asn_old->waiting_deps);
1345  dep_node_old!=NULL;
1346  dep_node_old=icl_list_next(asn_old->waiting_deps, dep_node_old)) {
1347  Dependency *dep = (Dependency *)dep_node_old->data;
1348  Task *task = dep->task;
1349  if ( dep->direction==INPUT && task->status==NOTREADY ) {
1350  count_initial_input_deps++;
1351  } else if ( (dep->direction==OUTPUT || dep->direction==INOUT) && task->status!=DONE ) {
1352  output_dep_reached = TRUE;
1353  break;
1354  }
1355  }
1356 
1357  /* if ( count_initial_input_deps>=quark->min_input_deps_to_avoid_war_dependencies && output_dep_reached ) { */
1358  if ( count_initial_input_deps>=min_input_deps && output_dep_reached ) {
1359  icl_list_t *dep_node_asn_old;
1360  Address_Set_Node *asn_new;
1361  /* Allocate and copy data */
1362  void *datacopy = malloc( asn_old->size );
1363  assert(datacopy!=NULL);
1364  /* TODO track the allocated memory in datacopies */
1365  /* quark->mem_allocated_to_war_dependency_data += asn_old->size; */
1366  memcpy( datacopy, asn_old->address, asn_old->size );
1367  /* Create address set node, attach to hash, and set it to clean up when done */
  /* NOTE(review): no statement setting
   * asn_new->delete_data_at_address_when_node_is_deleted is visible here
   * (listing line 1369 is absent); address_set_node_delete() only frees the
   * data when that flag is TRUE - confirm against the upstream file. */
1368  asn_new = address_set_node_new( datacopy, asn_old->size );
1370  icl_hash_insert( quark->address_set, asn_new->address, asn_new );
1371  /* Update task dependences to point to this new data */
1372  /* Grab input deps from the old list, copy to new list, delete, then repeat */
1373  for ( dep_node_asn_old=icl_list_first(asn_old->waiting_deps);
1374  dep_node_asn_old!=NULL; ) {
1375  icl_list_t *dep_node_asn_old_to_be_deleted = NULL;
1376  Dependency *dep = (Dependency *)dep_node_asn_old->data;
1377  Task *task = dep->task;
1378  if ( dep->direction==INPUT && task->status==NOTREADY ) {
1379  dep_node_asn_old_to_be_deleted = dep_node_asn_old;
1380  icl_list_t *dep_node_new = icl_list_append( asn_new->waiting_deps, dep );
1381  asn_new->num_waiting_input++;
1382  /* In the args list, set the arg pointer to the new datacopy address */
1383  *(void **)dep->task_args_list_node_ptr->data = datacopy;
1384  dep->address = asn_new->address;
1385  dep->address_set_node_ptr = asn_new;
1386  dep->address_set_waiting_deps_node_ptr = dep_node_new;
1387  if (dep->ready == FALSE) { /* dep->ready will always be FALSE */
1388  dep->ready = TRUE;
1389  dot_dag_print_edge( parent_task->taskid, task->taskid, DEPCOLOR );
1390  dot_dag_level_update( parent_task->tasklevel, task->tasklevel, quark );
  /* NOTE(review): unlike the *_check_and_launch routines in this file, no
   * decrement of task->num_dependencies_remaining is visible before the
   * queue check (listing line 1391 is absent) - confirm. */
1392  quark_check_and_queue_ready_task( quark, task );
1393  }
1394  } else if ( (dep->direction==OUTPUT || dep->direction==INOUT) && task->status!=DONE ) {
1395  /* Once we return from this routine, this dep dependency will be processed */
1396  break;
1397  }
  /* Advance before deleting so the iterator never points at a removed node */
1398  dep_node_asn_old = icl_list_next(asn_old->waiting_deps, dep_node_asn_old);
1399  if (dep_node_asn_old_to_be_deleted!=NULL) {
1400  icl_list_delete(asn_old->waiting_deps, dep_node_asn_old_to_be_deleted, NULL);
1401  }
1402  }
1403  }
1404 }
1405 
1406 /* **************************************************************************** */
1413 static void address_set_node_initial_gatherv_check_and_launch(Quark *quark, Address_Set_Node *address_set_node, Dependency *completed_dep, int worker_rank)
1414 {
1415  icl_list_t *next_dep_node;
1416  Task *completed_task = completed_dep->task;
1417  for ( next_dep_node=icl_list_first(address_set_node->waiting_deps);
1418  next_dep_node!=NULL && next_dep_node->data != NULL;
1419  next_dep_node=icl_list_next(address_set_node->waiting_deps, next_dep_node) ) {
1420  Dependency *next_dep = (Dependency *)next_dep_node->data;
1421  /* Break when we run out of GATHERV output dependencies */
1422  if ( next_dep->gatherv==FALSE ) break;
1423  if ( next_dep->direction!=OUTPUT && next_dep->direction!=INOUT ) break;
1424  Task *next_task = next_dep->task;
1425  /* Update next_dep ready status */
1426  if ( next_dep->ready == FALSE ) {
1427  /* Record the locality information with the task data structure */
1428  //if ( next_dep->locality ) next_task->locality_preserving_dep = worker_rank;
1429  /* Mark the next dependency as ready since we have GATHERV flag */
1430  next_dep->ready = TRUE;
1431  dot_dag_print_edge( completed_task->taskid, next_task->taskid, GATHERVDEPCOLOR );
1432  dot_dag_level_update( completed_task->tasklevel, next_task->tasklevel, quark );
1433  next_task->num_dependencies_remaining--;
1434  /* If the dep status became true check related task, and put onto ready queues */
1435  quark_check_and_queue_ready_task( quark, next_task );
1436  }
1437 
1438  }
1439 }
1440 
1441 /* **************************************************************************** */
1451 {
  /* NOTE(review): the signature line for this body is absent from this
   * listing; the body reads a variable named address_set_node.
   *
   * Scans the run of ACCUMULATOR dependencies at the front of
   * address_set_node->waiting_deps, picks one whose task has exactly one
   * remaining dependency, moves it to the front of the list, and clears its
   * accumulator flag. */
1452  icl_list_t *dep_node = NULL;
1453  icl_list_t *first_dep_node = NULL;
1454  icl_list_t *first_ready_dep_node = NULL;
1455  icl_list_t *last_ready_dep_node = NULL;
1456  icl_list_t *last_dep_node = NULL;
1457  icl_list_t *swap_node = NULL;
1458  int acc_dep_count = 0;
1459 
1460  /* FOR each ACCUMULATOR task waiting at the beginning of address_set_node */
1461  for (dep_node = icl_list_first(address_set_node->waiting_deps);
1462  dep_node != NULL;
1463  dep_node = icl_list_next( address_set_node->waiting_deps, dep_node )) {
1464  Dependency *dependency = (Dependency *)dep_node->data;
1465  /* IF not an ACCUMULATOR dependency - break */
1466  if (dependency->accumulator == FALSE) break;
1467  Task *task = dependency->task;
1468  /* Scan through list keeping first, first_ready, last_ready, last */
  /* first_dep_node and last_dep_node are recorded here but not consulted
   * when choosing the node below. */
1469  if (first_dep_node==NULL) first_dep_node = dep_node;
  /* "Ready" here means the task has exactly one dependency remaining */
1470  if ( task->num_dependencies_remaining==1 ) {
1471  if (first_ready_dep_node==NULL) first_ready_dep_node = dep_node;
1472  last_ready_dep_node = dep_node;
1473  }
1474  last_dep_node = dep_node; /* TODO */
1475  acc_dep_count++;
1476  }
1477 
1478  /* Choose and move chosen ready node to the front of the list */
1479  /* Heuristic: Flip-flop between first-ready and last-ready.
1480  * Tested (always first, always last, flip-flop first/last) but
1481  * there was always a bad scenario. If perfect loop orders are
1482  * provided (e.g. Cholesky inversion test) then this will not make
1483  * performance worse. If bad loops are provided, this will
1484  * improve performance, though not to the point of perfect
1485  * loops. */
  /* Even number of accumulator deps: take the last ready one; odd: the first */
1486  if (acc_dep_count % 2 == 0 ) {
1487  if ( last_ready_dep_node!=NULL ) swap_node = last_ready_dep_node;
1488  } else {
1489  if ( first_ready_dep_node != NULL ) swap_node = first_ready_dep_node;
1490  }
1491  if ( swap_node != NULL ) {
1492  Dependency *dependency = (Dependency *)swap_node->data;
1493  /* Move to front of the address_set_node waiting_deps list (if not already there) */
1494  if ( swap_node!=icl_list_first(address_set_node->waiting_deps) ) {
  /* Prepend a new list node for the dependency, update its back-pointer,
   * then drop the old list node (the dependency itself is not freed) */
1495  icl_list_t *tmp_swap_node = icl_list_prepend( address_set_node->waiting_deps, dependency );
1496  dependency->address_set_waiting_deps_node_ptr = tmp_swap_node;
1497  icl_list_delete( address_set_node->waiting_deps, swap_node, NULL );
1498  }
1499  /* Lock the dependency in place by setting ACC to false now */
1500  dependency->accumulator = FALSE;
1501  }
1502 }
1503 
1504 
1505 /* **************************************************************************** */
1512 static void address_set_node_initial_input_check_and_launch(Quark *quark, Address_Set_Node *address_set_node, Dependency *completed_dep, int worker_rank)
1513 {
1514  icl_list_t *next_dep_node;
1515  Task *completed_task = completed_dep->task;
1516  for ( next_dep_node=icl_list_first(address_set_node->waiting_deps);
1517  next_dep_node!=NULL && next_dep_node->data != NULL;
1518  next_dep_node=icl_list_next(address_set_node->waiting_deps, next_dep_node) ) {
1519  Dependency *next_dep = (Dependency *)next_dep_node->data;
1520  Task *next_task = next_dep->task;
1521  /* Break when we hit an output dependency */
1522  if ( (next_dep->direction==OUTPUT || next_dep->direction==INOUT) ) {
1523  if ( completed_dep->direction == INPUT ) {
1524  /* Print DAG connections for antidependencies */
1525  dot_dag_print_edge( completed_task->taskid, next_task->taskid, ANTIDEPCOLOR );
1526  dot_dag_level_update( completed_task->tasklevel, next_task->tasklevel, quark );
1527  }
1528  break;
1529  }
1530  /* Update next_dep ready status; this logic assumes the breaks at the bottom */
1531  if ( next_dep->direction==INPUT && next_dep->ready == FALSE ) {
1532  /* Record the locality information with the task data structure */
1533  //if ( next_dep->locality ) next_task->locality_thread_id = worker_rank;
1534  /* If next_dep is INPUT, mark the next dependency as ready */
1535  next_dep->ready = TRUE;
1536  /* Only OUTPUT->INPUT edges get here */
1537  dot_dag_print_edge( completed_task->taskid, next_task->taskid, DEPCOLOR );
1538  dot_dag_level_update( completed_task->tasklevel, next_task->tasklevel, quark );
1539  next_task->num_dependencies_remaining--;
1540  /* If the dep status became true check related task, and put onto ready queues */
1541  quark_check_and_queue_ready_task( quark, next_task );
1542  }
1543 
1544  /* if we are generating the DAG, keep looping till an output
1545  * dependency (in order to print all WAR edges) */
1546  if (! quark->dot_dag_enable ) {
1547  /* If current original dependency (dep) was INPUT, we only need to
1548  * activate next INPUT/OUTPUT/INOUT dep, others should already be
1549  * handled; if original dep was OUTPUT/INOUT, need to keep
1550  * going till next OUTPUT/INOUT */
1551  if ( completed_dep->direction == INPUT ) break;
1552  }
1553  }
1554 }
1555 
1556 
1557 /* **************************************************************************** */
1564 static void address_set_node_initial_output_check_and_launch(Quark *quark, Address_Set_Node *address_set_node, Dependency *completed_dep, int worker_rank)
1565 {
1566  icl_list_t *next_dep_node;
1567  next_dep_node = icl_list_first(address_set_node->waiting_deps);
1568  if ( next_dep_node!=NULL && next_dep_node->data!=NULL ) {
1569  Dependency *next_dep = (Dependency *)next_dep_node->data;
1570  Task *next_task = next_dep->task;
1571  if ( (next_dep->direction==OUTPUT || next_dep->direction==INOUT) ) {
1572  /* Process OUTPUT next_deps, if at beginning of address_set_list waiting_deps starts */
1573  if ( next_dep->ready == FALSE ) {
1574  /* Record the locality information with the task data structure */
1575  //if ( next_dep->locality ) next_task->locality_thread_id = worker_rank;
1576  /* If next_dep is output, mark the next dep as ready only if it is at the front */
1577  next_dep->ready = TRUE;
1578  Task *completed_task = completed_dep->task;
1579  if ( completed_dep->direction==OUTPUT || completed_dep->direction==INOUT )
1580  dot_dag_print_edge( completed_task->taskid, next_task->taskid, DEPCOLOR );
1581  /* else */ /* Handled in initial_input_check_and_launch */
1582  /* dot_dag_print_edge( completed_task->taskid, next_task->taskid, ANTIDEPCOLOR ); */
1583  dot_dag_level_update( completed_task->tasklevel, next_task->tasklevel, quark );
1584  next_task->num_dependencies_remaining--;
1585  quark_check_and_queue_ready_task( quark, next_task );
1586  }
1587  }
1588  }
1589 }
1590 
1591 /* **************************************************************************** */
/* Register every dependency of a task with the address set.
 *
 * For each entry in task->dependency_list: find or create the address set
 * node for the dependency's address, append the dependency to that node's
 * waiting list, merge it with an immediately preceding dependency of the
 * same task on the same address, give WAR avoidance a chance to run for
 * writes, and finally decide whether the dependency is ready.
 *
 * NOTE(review): several listing lines are absent inside this function
 * (1636, 1640, 1663, 1673, 1682); in particular no update of
 * task->num_dependencies_remaining is visible where a dependency is marked
 * ready - confirm against the upstream file.
 */
1597 static void quark_insert_task_dependencies(Quark * quark, Task * task)
1598 {
1599  icl_list_t *task_dep_p = NULL; /* task dependency list pointer */
1600 
1601  /* For each task dependency list pointer */
1602  for (task_dep_p = icl_list_first(task->dependency_list);
1603  task_dep_p != NULL;
1604  task_dep_p = icl_list_next(task->dependency_list, task_dep_p)) {
1605  Dependency *dep = (Dependency *) task_dep_p->data;
1606  /* Lookup address in address_set hash */
1607  Address_Set_Node *address_set_node = (Address_Set_Node *)icl_hash_find( quark->address_set, dep->address );
1608  /* If not found, create a new address set node and add it to the hash */
1609  if ( address_set_node == NULL ) {
1610  address_set_node = address_set_node_new( dep->address, dep->size );
1611  icl_hash_insert( quark->address_set, address_set_node->address, address_set_node );
1612  }
1613  /* Convenience shortcut pointer so that we don't have to hash again */
1614  dep->address_set_node_ptr = address_set_node;
1615  /* Add the dependency to the list of waiting dependencies on this address set node */
1616  icl_list_t *curr_dep_node = icl_list_append( address_set_node->waiting_deps, dep );
1617  /* Convenience shortcut pointer so we don't have to scan the waiting dependencies */
1618  dep->address_set_waiting_deps_node_ptr = curr_dep_node;
1619  /* Track num of waiting input, output and inout to be used to check false dependency resolution */
1620  if (dep->direction == INPUT) address_set_node->num_waiting_input++;
1621  else if (dep->direction == OUTPUT) address_set_node->num_waiting_output++;
1622  else if (dep->direction == INOUT) address_set_node->num_waiting_inout++;
1623 
1624  /* Handle the case that a single task makes multiple dependencies on the same data address */
1625  /* e.g. func( A11:IN, A11:INOUT, A11:OUT, A11:IN, A22:OUT ) */
1626  icl_list_t *prev_dep_node = icl_list_prev( address_set_node->waiting_deps, curr_dep_node);
1627  if ( prev_dep_node != NULL ) {
1628  Dependency *prev_dep = (Dependency *)prev_dep_node->data;
1629  Task *prev_task = prev_dep->task;
1630  if ( prev_task->taskid == task->taskid ) {
1631  /* The curr dependency will be updated using the ordering INPUT < OUTPUT < INOUT */
1632  /* When the scheduler checks the front of the dependency list, it will find the correct dep setting */
  /* As written: a current direction greater than the previous one becomes
   * INOUT; otherwise the previous dependency's direction is adopted. */
1633  dep->direction = (dep->direction > prev_dep->direction ? INOUT : prev_dep->direction );
1634  if ( prev_dep->ready == FALSE ) {
1635  prev_dep->ready = TRUE;
1637  }
1638  /* Remove the redundant dependency from waiting deps and from the task */
  /* NOTE(review): only the removal from waiting_deps is visible here */
1639  icl_list_delete( address_set_node->waiting_deps, prev_dep_node, NULL );
1641  /* Update the prev_dep_node ptr since it has changed */
1642  prev_dep_node = icl_list_prev( address_set_node->waiting_deps, curr_dep_node);
1643  }
1644  }
1645 
1646  /* This will avoid WAR dependencies if possible: if enabled, and
1647  * the current dependency is a write, and there were only reads
1648  * earlier (input>1, output+inout=1) */
1649  if ( ((dep->direction==OUTPUT || dep->direction==INOUT)) &&
1650  ((address_set_node->num_waiting_output + address_set_node->num_waiting_inout) == 1) ) {
1651  quark_avoid_war_dependencies( quark, address_set_node, task );
1652  }
1653 
1654  /* The following code decides whether the dep is ready or not */
  /* NOTE(review): prev_dep_node was computed before the WAR-avoidance call
   * above, which may delete nodes from this waiting list - confirm that it
   * is still valid at this point. */
1655  if ( dep->direction==INOUT || dep->direction==OUTPUT ) {
1656  /* If output, and previous dep exists, then ready=false */
1657  if ( prev_dep_node != NULL ) {
1658  dep->ready = FALSE;
1659  } else {
1660  dep->ready = TRUE;
1661  dot_dag_print_edge( address_set_node->last_reader_or_writer_taskid, task->taskid, DEPCOLOR );
1662  dot_dag_level_update( address_set_node->last_reader_or_writer_tasklevel, task->tasklevel, quark );
1664  }
1665  } else if ( dep->direction == INPUT ) {
1666  if ( prev_dep_node != NULL ) {
1667  /* If input, and previous dep is a read that is ready, then ready=true */
1668  Dependency *prev_dep = (Dependency *)prev_dep_node->data;
1669  if ( prev_dep->direction==INPUT && prev_dep->ready==TRUE ) {
1670  dep->ready = TRUE;
1671  dot_dag_print_edge( address_set_node->last_writer_taskid, task->taskid, DEPCOLOR );
1672  dot_dag_level_update( address_set_node->last_writer_tasklevel, task->tasklevel, quark );
1674  } else {
1675  dep->ready = FALSE;
1676  }
1677  } else {
1678  /* Input, but no previous node (is first), so ready */
1679  dep->ready = TRUE;
1680  dot_dag_print_edge( address_set_node->last_writer_taskid, task->taskid, DEPCOLOR );
1681  dot_dag_level_update( address_set_node->last_writer_tasklevel, task->tasklevel, quark );
1683  }
1684  }
1685  }
1686 }
1687 
1688 
1689 /* **************************************************************************** */
1702 void QUARK_Worker_Loop(Quark *quark, int thread_rank)
1703 {
1704  quark->worker[thread_rank]->thread_id = pthread_self();
1705  work_main_loop( quark->worker[thread_rank] );
1706 }
1707 
1708 
1709 /* **************************************************************************** */
1719 {
  /* NOTE(review): the signature line for this body is absent from this
   * listing; the body reads a variable named worker.
   *
   * Looks up the scheduler through worker->quark_ptr, binds the calling
   * thread using quark->coresbind[thread_rank], then runs work_main_loop()
   * on the worker slot for the calling thread's rank. */
1720  Quark *quark = worker->quark_ptr;
1721  int thread_rank = QUARK_Thread_Rank(quark);
1722  quark_setaffinity( quark->coresbind[thread_rank] ) ;
1723  work_main_loop( quark->worker[thread_rank] );
1724  return;
1725 }
1726 
1727 /* **************************************************************************** */
/* Main scheduling loop executed by a worker (and by the master via worker[0]).
 *
 * Spins until quark->start is set, then repeatedly looks for a task - first
 * in the caller's own ready list, then in other workers' lists - and runs
 * it.  The loop ends when worker->finalize is set; the master (rank 0)
 * additionally leaves after one full scan of all queues, or after one task.
 * On exit worker->finalize is reset to FALSE.
 *
 * NOTE(review): several listing lines are absent inside this function (for
 * example 1789 and 1831).  In particular no unlock matching
 * pthread_mutex_trylock_ready_list() and no post-execution handling of the
 * finished task are visible - confirm against the upstream file.
 */
1732 static void work_main_loop(Worker *worker)
1733 {
1734  Quark *quark = worker->quark_ptr;
1735  Worker *worker_victim = NULL;
1736  task_priority_tree_node_t *task_priority_tree_node = NULL;
1737  Task *task = NULL;
1738  int ready_list_victim = -1;
1739 
1740  /* Busy wait while not ready */
1741  do {} while ( !quark->start );
1742  int worker_rank = QUARK_Thread_Rank(quark);
1743 
1744  /* Queue all tasks before running; this line for debugging use */
1745  /* while ( !quark->all_tasks_queued ) { if (worker_rank==0) return; else {} } */
  /* With queue_before_computing set, rank 0 returns immediately while tasks
   * are still being queued; other ranks spin until all are queued */
1746  if ( quark->queue_before_computing )
1747  while ( !quark->all_tasks_queued ) { if (worker_rank==0) return; else {} }
1748  /* Master never does work; this line for debugging use */
1749  /* if (worker_rank == 0) return; */
1750 
1751  while ( !worker->finalize ) {
1752  /* Repeatedly try to find a task, first trying my own ready list,
1753  * then trying to steal from someone else */
1754  task = NULL;
1755  ready_list_victim = worker_rank;
1756  /* Loop while looking for tasks */
1757  while ( task==NULL && !worker->finalize ) {
1758 
1759  /* Process all completed tasks before doing work */
  /* Only rank 0 and ranks with rank%10==1 do this */
1760  if ( worker_rank==0 || worker_rank%10==1 ) process_completed_tasks(quark);
1761 
1762  worker_victim = quark->worker[ready_list_victim];
1763  task_priority_tree_node = NULL;
1764  assert ( worker_victim->ready_list_size >= 0 );
1765  if ( worker_victim->ready_list_size != 0 ) {
1766  /* Only lock if there is likely to be an item in the ready list */
1767  if ( pthread_mutex_trylock_ready_list( &worker_victim->ready_list_mutex ) == 0) {
1768  /* if (pthread_mutex_lock_ready_list(&worker_victim->ready_list_mutex)==0) { */
1769  /* Check front of my own queue, back of everyone else's queue */
  /* Stealing from another worker is only attempted while that worker is
   * executing a task */
1770  if ( worker_rank == ready_list_victim )
1771  task_priority_tree_node = RB_MIN( task_priority_tree_head_s, worker_victim->ready_list );
1772  else if ( worker_rank!=ready_list_victim && worker_victim->executing_task==TRUE )
1773  task_priority_tree_node = RB_MAX( task_priority_tree_head_s, worker_victim->ready_list );
1774  else
1775  task_priority_tree_node = NULL;
1776  /* Access task, checking to make sure it is not pinned to a thread */
1777  if ( task_priority_tree_node != NULL ) {
1778  task = task_priority_tree_node->task;
1779  /* If task should be locked to a thread, and this is not that thread, set task to NULL and continue */
1780  if ( task->lock_to_thread>=0 && task->lock_to_thread!=worker_rank) {
1781  task = NULL;
1782  } else {
1783  /* If task found, remove it from the ready list */
1784  RB_REMOVE( task_priority_tree_head_s, worker_victim->ready_list, task_priority_tree_node );
1785  free( task_priority_tree_node );
1786  worker_victim->ready_list_size--;
1787  }
1788  }
1790  }
1791  }
1792  /* If no task found */
1793  if (task == NULL) {
1794  /* Choose the next victim queue */
1795  ready_list_victim = (ready_list_victim + 1) % quark->num_threads;
1796  /* Break for master when a scan of all queues is finished and no tasks were found */
1797  if ( worker_rank==0 && ready_list_victim==0 ) break;
1798  /* If there are no tasks, wait for a task to be introduced, then check own queue first */
1799  if ( quark->num_queued_tasks==0 && !worker->finalize && worker_rank!=0 ) {
1800  do { assert( quark->num_queued_tasks >= 0); } while ( quark->num_queued_tasks==0 && !worker->finalize ) ;
1801  ready_list_victim = worker_rank;
1802  }
1803  }
1804  }
1805  /* EXECUTE THE TASK IF FOUND */
1806  if ( task!=NULL ) {
1807  //if ( quark->num_tasks != 1 ) { printf("quark->num_tasks %d %d %d\n", quark->num_tasks, quark->low_water_mark, quark->high_water_mark ); abort(); }
1809  if ( task->function == NULL ) {
1810  /* This can occur if the task is cancelled */
1811  task->status = CANCELLED;
1813  } else {
1814  /* Call the task */
1815  worker->executing_task = TRUE;
1816  task->status = RUNNING;
1818  scratch_allocate( task );
1819  worker->current_task_ptr = task;
1820  task->function( quark );
1821  scratch_deallocate( task );
1822  task->status = DONE;
1823  worker->executing_task = FALSE;
1824  }
1825  /* Remove the task from the address hash */
1826  /* Original solution */
1827  //pthread_mutex_lock_asn(&quark->address_set_mutex);
1828  //worker_remove_completed_task_and_check_for_ready(quark, task, worker_rank);
1829  //pthread_mutex_unlock_asn(&quark->address_set_mutex);
1830  /* New version */
1832  }
1833  /* Break if master */
  /* ready_list_victim==0 holds for rank 0 when the task came from its own
   * list or when the scan wrapped around */
1834  if ( worker_rank==0 && ready_list_victim==0 ) break;
1835  }
1836  /* Worker has exited loop; ready for next time this worker is activated */
1837  worker->finalize = FALSE;
1838 }
1839 
1840 
1841 /* **************************************************************************** */
1854 {
  /* NOTE(review): the signature line for this body is absent from this
   * listing.
   *
   * Allocates a Quark_Sequence, sets its status to QUARK_SUCCESS,
   * initialises its mutex, attaches a newly allocated empty task list and
   * returns the sequence.  Allocation results are checked with assert(). */
1855  Quark_Sequence *sequence = malloc(sizeof(Quark_Sequence));
1856  assert( sequence != NULL );
1857  sequence->status = QUARK_SUCCESS;
1858  pthread_mutex_init( &sequence->sequence_mutex, NULL );
1859  ll_list_head_t *head = malloc(sizeof(ll_list_head_t));
1860  assert ( head != NULL );
1861  LIST_INIT(head);
1862  sequence->tasks_in_sequence = head;
1863  return sequence;
1864 }
1865 
1866 /* **************************************************************************** */
1881 {
  /* NOTE(review): the signature line for this body is absent from this
   * listing; the body reads variables named quark and sequence.
   *
   * Returns QUARK_ERR when either pointer is NULL, otherwise QUARK_SUCCESS.
   * If the sequence status is still QUARK_SUCCESS, it is set to QUARK_ERR
   * and QUARK_Cancel_Task() is called for every task id recorded in the
   * sequence; the results of those calls are not inspected.
   * NOTE(review): no use of sequence_mutex is visible here - confirm. */
1882  int retval;
1883  if ( quark==NULL || sequence==NULL ) return QUARK_ERR;
1885  if ( sequence->status != QUARK_SUCCESS ) {
1886  /* sequence already cancelled */
1887  retval = QUARK_SUCCESS;
1888  } else {
1889  sequence->status = QUARK_ERR;
1890  ll_list_node_t *np, *np_temp;
1891  LIST_FOREACH_SAFE( np, sequence->tasks_in_sequence, entries, np_temp ) {
1892  long long int taskid = np->val;
1893  /* Find taskid, make function NULL */
1894  QUARK_Cancel_Task( quark, taskid );
1895  /* Task node is removed from sequence when it finishes and is
1896  * deleted; or when sequence is destroyed */
1897  }
1898  retval = QUARK_SUCCESS;
1899  }
1901  return retval;
1902 }
1903 
1904 /* **************************************************************************** */
/* QUARK_Sequence_Destroy -- body only; the signature line (listing line 1917
 * according to the symbol index) is absent from this extract.
 * Cancels every task listed in the sequence, waits for the list to drain via
 * QUARK_Sequence_Wait, frees any remaining list nodes and the list head,
 * destroys sequence_mutex and frees the sequence itself.
 * Always returns NULL (either from the argument check or after the free).
 * NOTE(review): listing lines 1921, 1928, 1930 and 1935 are missing here;
 * presumably they lock/unlock sequence_mutex -- confirm against the original
 * quark.c. */
1918 {
1919  if ( quark==NULL || sequence==NULL) return NULL;
1920  //printf("QUARK_Sequence_Destroy %p status %d\n", sequence, sequence->status);
1922  ll_list_node_t *np, *np_temp;
1923  ll_list_head_t *head = sequence->tasks_in_sequence;
/* Pass 1: request cancellation of every task still in the sequence */
1924  LIST_FOREACH_SAFE( np, head, entries, np_temp ) {
1925  long long int taskid = np->val;
1926  QUARK_Cancel_Task( quark, taskid );
1927  }
/* Returns once tasks_in_sequence is empty */
1929  QUARK_Sequence_Wait( quark, sequence );
/* Pass 2: unlink and free whatever nodes are left on the list */
1931  LIST_FOREACH_SAFE( np, head, entries, np_temp ) {
1932  LIST_REMOVE( np, entries );
1933  free( np );
1934  }
1936  free( head );
1937  head = NULL;
1938  pthread_mutex_destroy( &sequence->sequence_mutex );
1939  free( sequence );
/* The local copy is cleared so the function hands NULL back to the caller */
1940  sequence = NULL;
1941  return sequence;
1942 }
1943 
1944 /* **************************************************************************** */
1957 int QUARK_Sequence_Wait( Quark *quark, Quark_Sequence *sequence )
1958 {
1959  if ( quark==NULL || sequence==NULL) return QUARK_ERR;
1960  int myrank = QUARK_Thread_Rank( quark );
1961  while ( !LIST_EMPTY( sequence->tasks_in_sequence ) ) {
1962  process_completed_tasks( quark );
1963  work_main_loop( quark->worker[myrank] );
1964  }
1965  return QUARK_SUCCESS;
1966 }
1967 
1968 
1969 /* **************************************************************************** */
/* QUARK_Get_Sequence -- body only; the signature line (listing line 1980
 * according to the symbol index) is absent from this extract.
 * Looks up the worker selected by QUARK_Thread_Rank(quark) and returns the
 * sequence field of that worker's current task.  quark is not checked for
 * NULL; a missing current task is caught only by assert(). */
1981 {
1982  Task *curr_task = quark->worker[QUARK_Thread_Rank(quark)]->current_task_ptr;
1983  assert( curr_task != NULL);
1984  return (Quark_Sequence *)curr_task->sequence;
1985 }
1986 
1987 /* **************************************************************************** */
/* QUARK_Get_Task_Label -- body only; the signature line (listing line 1999
 * according to the symbol index) is absent from this extract.
 * Looks up the worker selected by QUARK_Thread_Rank(quark) and returns the
 * task_label pointer of that worker's current task (no copy is made).
 * quark is not checked for NULL; a missing current task is caught only by
 * assert(). */
2000 {
2001  Task *curr_task = quark->worker[QUARK_Thread_Rank(quark)]->current_task_ptr;
2002  assert( curr_task != NULL);
2003  return (char *)curr_task->task_label;
2004 }
2005 
2006 
2007 /* **************************************************************************** */
/* Called when a worker is done with a task: decrements the task's
 * task_thread_count and, when it reaches zero, appends a node holding the
 * task and the worker rank to quark->completed_tasks for later processing.
 * NOTE(review): listing lines 2015, 2017, 2022 and 2024 are missing from this
 * extract; presumably they lock/unlock the task and completed-tasks mutexes
 * -- confirm against the original quark.c.
 * NOTE(review): the malloc() result is used without a NULL check. */
2012 static void worker_remove_completed_task_enqueue_for_later_processing(Quark *quark, Task *task, int worker_rank)
2013 {
2014  int threads_remaining_for_this_task = -1;
2016  threads_remaining_for_this_task = --task->task_thread_count;
/* Only the last thread to leave the task queues it as completed */
2018  if ( threads_remaining_for_this_task == 0 ) {
2019  completed_tasks_node_t *node = malloc(sizeof(completed_tasks_node_t));
2020  node->task = task;
2021  node->workerid = worker_rank;
2023  TAILQ_INSERT_TAIL( quark->completed_tasks, node, entries );
2025  }
2026 }
2027 
2028 /* **************************************************************************** */
/* Drains quark->completed_tasks: each pass tries to take address_set_mutex
 * without blocking, pops the first queued node if there is one, and frees it.
 * The loop repeats for as long as a node was obtained.
 * NOTE(review): listing lines 2038, 2041, 2044 and 2047 are missing from this
 * extract, and the braces below do not balance as shown, so at least one of
 * the missing lines opens a block.  Presumably they take/release the
 * completed-tasks mutex, process the dequeued task, and release
 * address_set_mutex -- confirm against the original quark.c. */
2032 static void process_completed_tasks(Quark *quark)
2033 {
2034  completed_tasks_node_t *node = NULL;
2035  do {
/* Reset so a failed trylock ends the loop */
2036  node = NULL;
2037  if ( pthread_mutex_trylock_asn( &quark->address_set_mutex ) == 0 ) {
2039  node = TAILQ_FIRST(quark->completed_tasks);
2040  if ( node!= NULL ) TAILQ_REMOVE( quark->completed_tasks, node, entries );
2042  }
2043  if ( node != NULL ) {
2045  free( node );
2046  }
2048  }
2049  } while ( node != NULL );
2050 }
2051 
2052 /* **************************************************************************** */
/* Retires a finished task.  When DOT output is enabled, writes the task's
 * node and rank lines to dot_dag_file and counts the task in
 * tasklevel_width.  Then, for every dependency of the task, updates the
 * bookkeeping of the associated address set node, removes the dependency
 * from that node's waiting list, runs the check-and-launch helpers for
 * dependencies still waiting on the address, decrements the matching
 * waiting counter and deletes the node once its waiting list is empty.
 * Finally deletes the task and decrements quark->num_queued_tasks.
 * NOTE(review): listing lines 2061 and 2068 are missing from this extract;
 * presumably they lock/unlock dot_dag_mutex -- confirm against the original
 * quark.c. */
2058 static void remove_completed_task_and_check_for_ready(Quark *quark, Task *task, int worker_rank)
2059 {
2060  if ( quark->dot_dag_enable ) {
/* Levels below 1 are clamped to 1 before being written out */
2062  if (task->tasklevel < 1) task->tasklevel=1;
2063  fprintf(dot_dag_file, "t%lld [fillcolor=\"%s\",label=\"%s\",style=filled];\n", task->taskid, task->task_color, task->task_label);
2064  /* Track the width of each task level */
/* NOTE(review): tasklevel is not range-checked against the array size here */
2065  quark->tasklevel_width[task->tasklevel]++;
2066  /* fprintf(dot_dag_file, "// critical-path depth %ld \n", task->tasklevel ); */
2067  fprintf(dot_dag_file, "{rank=same;%lld;t%lld};\n", task->tasklevel, task->taskid );
2069  }
2070 
2071  /* For each dependency in the task that was completed */
2072  icl_list_t *dep_node;
2073  for (dep_node = icl_list_first(task->dependency_list);
2074  dep_node != NULL && dep_node->data!=NULL;
2075  dep_node = icl_list_next(task->dependency_list, dep_node)) {
2076  Dependency *dep = (Dependency *)dep_node->data;
2077  Address_Set_Node *address_set_node = dep->address_set_node_ptr;
2078 
2079  /* Mark the address/data as having been written by worker_rank */
2080  if ( dep->direction==OUTPUT || dep->direction==INOUT )
2081  address_set_node->last_thread = worker_rank;
2082  if ( quark->dot_dag_enable ) {
2083  if ( dep->direction==OUTPUT || dep->direction==INOUT ) {
2084  /* Track last writer and level, needed when this structure becomes empty */
2085  address_set_node->last_writer_taskid = task->taskid;
2086  address_set_node->last_writer_tasklevel = task->tasklevel;
2087  }
2088  address_set_node->last_reader_or_writer_taskid = task->taskid;
2089  address_set_node->last_reader_or_writer_tasklevel = task->tasklevel;
2090  }
2091  /* Check the address set node to avoid WAR dependencies; if
2092  * just completed a write, and at least one more write
2093  * (sum>=2) is pending */
2094  if ( (quark->war_dependencies_enable) &&
2095  (dep->direction==OUTPUT || dep->direction==INOUT) &&
2096  ((address_set_node->num_waiting_output + address_set_node->num_waiting_inout) >= 2) ) {
2097  quark_avoid_war_dependencies( quark, address_set_node, task );
2098  }
2099  /* Remove completed dependency from address_set_node waiting_deps list */
2100  icl_list_delete( address_set_node->waiting_deps, dep->address_set_waiting_deps_node_ptr, NULL );
2101  /* Check initial INPUT next_deps attached to address_set_node */
2102  address_set_node_initial_input_check_and_launch( quark, address_set_node, dep, worker_rank );
2103  /* Handle any initial GATHERV dependencies */
2104  address_set_node_initial_gatherv_check_and_launch(quark, address_set_node, dep, worker_rank);
2105  /* Prepend any initial accumulator dependency that is ready to go */
2106  address_set_node_accumulator_find_prepend( quark, address_set_node );
2107  /* Check initial OUTPUT/INOUT deps waiting on address_set_node */
2108  address_set_node_initial_output_check_and_launch( quark, address_set_node, dep, worker_rank );
2109  /* Keep track of the waiting dependency counts for this address */
2110  if (dep->direction == INPUT) address_set_node->num_waiting_input--;
2111  else if (dep->direction == OUTPUT) address_set_node->num_waiting_output--;
2112  else if (dep->direction == INOUT) address_set_node->num_waiting_inout--;
2113 
2114  /* If this address_set_node has no more waiting_deps, remove it */
2115  if ( icl_list_first(address_set_node->waiting_deps) == NULL )
2116  address_set_node_delete( quark, address_set_node );
2117  }
2118 
/* All dependencies are released; drop the task and the queued-task count */
2119  task_delete(quark, task);
2120  quark->num_queued_tasks--;
2121 }
2122 
2123 /* **************************************************************************** */
2145 Quark_Task_Flags *QUARK_Task_Flag_Set( Quark_Task_Flags *task_flags, int flag, intptr_t val )
2146 {
2147  switch (flag) {
2148  case TASK_PRIORITY:
2149  task_flags->task_priority = (int)val;
2150  break;
2151  case TASK_LOCK_TO_THREAD:
2152  task_flags->task_lock_to_thread = (int)val;
2153  break;
2154  case TASK_LABEL:
2155  task_flags->task_label = (char *)val;
2156  break;
2157  case TASK_COLOR:
2158  task_flags->task_color = (char *)val;
2159  break;
2160  case TASK_SEQUENCE:
2161  task_flags->task_sequence = (Quark_Sequence *)val;
2162  break;
2163  case TASK_THREAD_COUNT:
2164  task_flags->task_thread_count = (int)val;
2165  break;
2166  }
2167  return task_flags;
2168 }
2169 
2170 
int size
Definition: quark.c:165
void * task_sequence
Definition: quark.h:101
int QUARK_Cancel_Task(Quark *quark, unsigned long long taskid)
Definition: quark.c:1182
Definition: quark.c:96
icl_list_t * waiting_deps
Definition: quark.c:187
volatile int num_dependencies_remaining
Definition: quark.c:146
#define RB_NEXT(name, x, y)
Definition: bsd_tree.h:733
Definition: quark.h:52
#define RB_INSERT(name, x, y)
Definition: bsd_tree.h:729
volatile bool executing_task
Definition: quark.c:138
static int ullong_key_compare(void *key1, void *key2)
Definition: quark.c:485
#define dot_dag_print_edge(parentid, childid, color)
Definition: quark.c:303
static unsigned int address_hash_function(void *address)
Definition: quark.c:456
#define QUARK_ERR
Definition: quark.h:43
int tasklevel_width[tasklevel_width_max_level]
Definition: quark.c:118
int task_thread_count
Definition: quark.c:159
pthread_mutex_t ready_list_mutex
Definition: quark.c:132
static int pthread_mutex_lock_completed_tasks(pthread_mutex_t *mtx)
Definition: quark.c:280
int icl_hash_destroy(icl_hash_t *ht, void(*free_key)(void *), void(*free_data)(void *))
Definition: icl_hash.c:262
icl_list_t * icl_list_prepend(icl_list_t *head, void *data)
Definition: icl_list.c:289
#define LIST_FOREACH_SAFE(var, head, field, tvar)
Definition: bsd_queue.h:367
#define TASK_LABEL
Definition: quark.h:69
static int pthread_mutex_trylock_asn(pthread_mutex_t *mtx)
Definition: quark.c:270
pthread_mutex_t task_mutex
Definition: quark.c:142
Quark * QUARK_Setup(int num_threads)
Definition: quark.c:653
Quark * QUARK_New(int num_threads)
Definition: quark.c:728
icl_list_t * icl_list_new()
Definition: icl_list.c:22
#define QUARK_TASK_MIN_PRIORITY
Definition: quark.h:86
MAGMA_DLLPORT int MAGMA_CDECL pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start)(void *), void *arg)
#define RB_REMOVE(name, x, y)
Definition: bsd_tree.h:730
FILE * dot_dag_file
Definition: quark.c:297
struct task_priority_tree_head_s * ready_list
Definition: quark.c:133
void * QUARK_Args_Pop(void *args_list, void **last_arg)
Definition: quark.c:420
#define TAILQ_INIT(head)
Definition: bsd_queue.h:503
#define TASK_PRIORITY
Definition: quark.h:75
#define dot_dag_level_update(parent_level, child_level, quark)
Definition: quark.c:298
void * data
Definition: icl_list.h:18
static void address_set_node_delete(Quark *quark, Address_Set_Node *address_set_node)
Definition: quark.c:1230
pthread_mutex_t address_set_mutex
Definition: quark.c:109
#define RB_MAX(name, x)
Definition: bsd_tree.h:736
static int pthread_cond_wait_ready_list(pthread_cond_t *cond, pthread_mutex_t *mtx)
Definition: quark.c:284
static int pthread_mutex_trylock_completed_tasks(pthread_mutex_t *mtx)
Definition: quark.c:281
MAGMA_DLLPORT int MAGMA_CDECL pthread_cond_broadcast(pthread_cond_t *cond)
struct ll_list_head_s ll_list_head_t
Definition: quark.c:206
unsigned long long last_reader_or_writer_taskid
Definition: quark.c:194
pthread_mutex_t task_set_mutex
Definition: quark.c:107
quark_direction_t
Definition: quark.h:52
int lock_to_thread
Definition: quark.c:153
bool gatherv
Definition: quark.c:169
void QUARK_Waitall(Quark *quark)
Definition: quark.c:790
struct dependency_s Dependency
int * quark_get_affthreads()
Definition: quarkos.c:245
MAGMA_DLLPORT int MAGMA_CDECL pthread_mutex_unlock(pthread_mutex_t *mutex)
LIST_HEAD(ll_list_head_s, ll_list_node_s)
int icl_list_destroy(icl_list_t *head, void(*free_function)(void *))
Definition: icl_list.c:143
static Worker * worker_new(Quark *quark, int rank)
Definition: quark.c:550
unsigned long long last_reader_or_writer_tasklevel
Definition: quark.c:195
struct completed_tasks_head_s completed_tasks_head_t
Definition: quark.c:214
#define TASK_SEQUENCE
Definition: quark.h:79
volatile int num_waiting_output
Definition: quark.c:189
struct address_set_node_s * address_set_node_ptr
Definition: quark.c:170
volatile bool ready
Definition: quark.c:174
static unsigned int fnv_hash_function(void *key, int len)
Definition: quark.c:440
TAILQ_HEAD(completed_tasks_head_s, completed_tasks_node_s)
MAGMA_DLLPORT int MAGMA_CDECL pthread_join(pthread_t thread, void **value_ptr)
struct worker_s Worker
#define LIST_ENTRY(type)
Definition: bsd_queue.h:323
int QUARK_Thread_Rank(Quark *quark)
Definition: quark.c:377
char * task_label
Definition: quark.h:100
char * QUARK_Get_Task_Label(Quark *quark)
Definition: quark.c:1999
void(* function)(Quark *)
Definition: quark.c:143
icl_list_t * icl_list_append(icl_list_t *head, void *data)
Definition: icl_list.c:304
volatile int ready_list_size
Definition: quark.c:134
icl_entry_t * icl_hash_insert(icl_hash_t *ht, void *key, void *data)
Definition: icl_hash.c:132
volatile int num_waiting_inout
Definition: quark.c:190
unsigned long long tasklevel
Definition: quark.c:152
RB_HEAD(task_priority_tree_head_s, task_priority_tree_node_s)
Definition: quark.c:93
static int pthread_mutex_unlock_asn(pthread_mutex_t *mtx)
Definition: quark.c:271
icl_list_t * icl_list_first(icl_list_t *head)
Definition: icl_list.c:192
#define GATHERVDEPCOLOR
Definition: quark.c:295
Definition: quark.c:93
#define TASK_LOCK_TO_THREAD
Definition: quark.h:77
volatile int list_robin
Definition: quark.c:102
static void scratch_deallocate(Task *task)
Definition: quark.c:626
Definition: quark.c:94
icl_list_t * icl_list_prev(icl_list_t *head, icl_list_t *pos)
Definition: icl_list.c:246
static int address_key_compare(void *addr1, void *addr2)
Definition: quark.c:466
static int pthread_mutex_unlock_wrap(pthread_mutex_t *mtx)
Definition: quark.c:278
static void quark_insert_task_dependencies(Quark *quark, Task *task)
Definition: quark.c:1597
#define LIST_INSERT_HEAD(head, elm, field)
Definition: bsd_queue.h:393
Definition: quark.h:52
Quark_Sequence * sequence
Definition: quark.c:157
int quark_get_numthreads()
Definition: quarkos.c:222
volatile int status
Definition: quark.c:125
#define fopen(ppfile, name, mode)
Definition: quark.c:65
Quark_Sequence * QUARK_Get_Sequence(Quark *quark)
Definition: quark.c:1980
#define LIST_EMPTY(head)
Definition: bsd_queue.h:358
struct quark_task_s * task
Definition: quark.c:163
#define LOCALITY
Definition: quark.h:56
Definition: quark.c:93
MAGMA_DLLPORT int MAGMA_CDECL pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: quark.c:92
int quark_setaffinity(int rank)
Definition: quarkos.c:125
struct address_set_node_s Address_Set_Node
#define PTHREAD_SCOPE_SYSTEM
struct ll_list_node_s * ptr_to_task_in_sequence
Definition: quark.c:158
static void address_set_node_initial_input_check_and_launch(Quark *quark, Address_Set_Node *address_set_node, Dependency *completed_dep, int worker_rank)
Definition: quark.c:1512
icl_list_t * task_dependency_list_node_ptr
Definition: quark.c:173
struct worker_s ** worker
Definition: quark.c:100
MAGMA_DLLPORT int MAGMA_CDECL pthread_attr_setscope(pthread_attr_t *attr, int scope)
#define RB_MIN(name, x)
Definition: bsd_tree.h:735
void quark_topology_init()
Definition: quarkos.c:78
#define ACCUMULATOR
Definition: quark.h:60
volatile long long num_tasks
Definition: quark.c:105
static int quark_revolve_robin(Quark *quark)
Definition: quark.c:495
#define DIRECTION_MASK
Definition: quark.c:91
#define TASK_COLOR
Definition: quark.h:72
void QUARK_Free(Quark *quark)
Definition: quark.c:809
void QUARK_Worker_Loop(Quark *quark, int thread_rank)
Definition: quark.c:1702
void * ptr
Definition: quark.c:178
volatile int num_queued_tasks
Definition: quark.c:112
bool locality
Definition: quark.c:167
static void process_completed_tasks(Quark *quark)
Definition: quark.c:2032
int priority
Definition: quark.c:156
volatile bool start
Definition: quark.c:103
static Task * quark_task_new()
Definition: quark.c:314
static void work_set_affinity_and_call_main_loop(Worker *worker)
Definition: quark.c:1718
#define tasklevel_width_max_level
Definition: quark.c:115
int low_water_mark
Definition: quark.c:97
Quark_Sequence * QUARK_Sequence_Destroy(Quark *quark, Quark_Sequence *sequence)
Definition: quark.c:1917
char * task_label
Definition: quark.c:154
icl_hash_t * address_set
Definition: quark.c:108
int dot_dag_enable
Definition: quark.c:116
volatile task_status status
Definition: quark.c:144
Definition: quark.c:92
icl_list_t * args_list
Definition: quark.c:147
static Address_Set_Node * address_set_node_new(void *address, int size)
Definition: quark.c:1206
Definition: quark.c:93
void quark_avoid_war_dependencies(Quark *quark, Address_Set_Node *asn_old, Task *parent_task)
Definition: quark.c:1314
volatile bool delete_data_at_address_when_node_is_deleted
Definition: quark.c:191
volatile int num_waiting_input
Definition: quark.c:188
int queue_before_computing
Definition: quark.c:117
static void work_main_loop(Worker *worker)
Definition: quark.c:1732
static void address_set_node_initial_output_check_and_launch(Quark *quark, Address_Set_Node *address_set_node, Dependency *completed_dep, int worker_rank)
Definition: quark.c:1564
Quark * quark_ptr
Definition: quark.c:136
static int pthread_mutex_lock_ready_list(pthread_mutex_t *mtx)
Definition: quark.c:273
struct ll_list_head_s * tasks_in_sequence
Definition: quark.c:127
static char * quark_task_default_label
Definition: quark.c:291
static int pthread_mutex_unlock_completed_tasks(pthread_mutex_t *mtx)
Definition: quark.c:282
void QUARK_Task_Pack_Arg(Quark *quark, Quark_Task *task, int arg_size, void *arg_ptr, int arg_flags)
Definition: quark.c:925
void * address
Definition: quark.c:164
int size
Definition: quark.c:179
int task_thread_count
Definition: quark.h:102
#define TASK_THREAD_COUNT
Definition: quark.h:81
unsigned long long last_writer_tasklevel
Definition: quark.c:193
#define TAILQ_REMOVE(head, elm, field)
Definition: bsd_queue.h:565
void quark_topology_finalize()
Definition: quarkos.c:114
int num_threads
Definition: quark.c:99
char * task_color
Definition: quark.h:99
quark_direction_t direction
Definition: quark.c:166
MAGMA_DLLPORT int MAGMA_CDECL pthread_attr_destroy(pthread_attr_t *attr)
int war_dependencies_enable
Definition: quark.c:114
static int compare_task_priority_tree_nodes(task_priority_tree_node_t *n1, task_priority_tree_node_t *n2)
Definition: quark.c:225
char * task_color
Definition: quark.c:155
icl_hash_t * task_set
Definition: quark.c:106
#define LIST_REMOVE(elm, field)
Definition: bsd_queue.h:403
volatile bool finalize
Definition: quark.c:137
static int pthread_mutex_lock_asn(pthread_mutex_t *mtx)
Definition: quark.c:269
int quark_yield()
Definition: quarkos.c:187
static int pthread_mutex_lock_wrap(pthread_mutex_t *mtx)
Definition: quark.c:277
icl_list_t * address_set_waiting_deps_node_ptr
Definition: quark.c:171
static void worker_remove_completed_task_enqueue_for_later_processing(Quark *quark, Task *task, int worker_rank)
Definition: quark.c:2012
icl_list_t * scratch_list
Definition: quark.c:149
#define TAILQ_FIRST(head)
Definition: bsd_queue.h:481
#define LIST_INIT(head)
Definition: bsd_queue.h:372
struct scratch_s Scratch
Definition: quark.c:92
int high_water_mark
Definition: quark.c:98
static void remove_completed_task_and_check_for_ready(Quark *quark, Task *task, int worker_rank)
Definition: quark.c:2058
MAGMA_DLLPORT int MAGMA_CDECL pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Quark_Task_Flags * QUARK_Task_Flag_Set(Quark_Task_Flags *flags, int flag, intptr_t val)
Definition: quark.c:2145
void QUARK_Delete(Quark *quark)
Definition: quark.c:846
#define INPUT
Definition: quark.h:53
MAGMA_DLLPORT int MAGMA_CDECL pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
unsigned long long QUARK_Insert_Task(Quark *quark, void(*function)(Quark *), Quark_Task_Flags *task_flags,...)
Definition: quark.c:1073
HANDLE pthread_mutex_t
int task_lock_to_thread
Definition: quark.h:98
int QUARK_Sequence_Cancel(Quark *quark, Quark_Sequence *sequence)
Definition: quark.c:1880
struct quark_task_s Task
Definition: quark.c:94
unsigned long long taskid
Definition: quark.c:151
struct completed_tasks_head_s * completed_tasks
Definition: quark.c:121
int icl_list_delete(icl_list_t *head, icl_list_t *pos, void(*free_function)(void *))
Definition: icl_list.c:84
pthread_t thread_id
Definition: quark.c:131
task_num
Definition: quark.c:93
RB_GENERATE(task_priority_tree_head_s, task_priority_tree_node_s, n_entry, compare_task_priority_tree_nodes)
MAGMA_DLLPORT int MAGMA_CDECL pthread_mutex_trylock(pthread_mutex_t *mutex)
void * icl_hash_find(icl_hash_t *ht, void *key)
Definition: icl_hash.c:105
static void scratch_allocate(Task *task)
Definition: quark.c:605
int(* rank)()
Definition: quark.c:111
Definition: quark.h:52
MAGMA_DLLPORT pthread_t MAGMA_CDECL pthread_self(void)
static void quark_check_and_queue_ready_task(Quark *quark, Task *task)
Definition: quark.c:1258
MAGMA_DLLPORT int MAGMA_CDECL pthread_equal(pthread_t thread1, pthread_t thread2)
static int pthread_mutex_unlock_ready_list(pthread_mutex_t *mtx)
Definition: quark.c:275
volatile int num_dependencies
Definition: quark.c:145
static void task_delete(Quark *quark, Task *task)
Definition: quark.c:348
icl_list_t * task_args_list_node_ptr
Definition: quark.c:180
pthread_mutex_t dot_dag_mutex
Definition: quark.c:119
static void worker_delete(Worker *worker)
Definition: quark.c:572
static Dependency * dependency_new(void *addr, long long size, quark_direction_t dir, bool loc, Task *task, bool accumulator, bool gatherv, icl_list_t *task_args_list_node_ptr)
Definition: quark.c:521
void QUARK_Barrier(Quark *quark)
Definition: quark.c:771
long long int val
Definition: quark.c:202
void * address
Definition: quark.c:184
icl_hash_t * icl_hash_create(int nbuckets, unsigned int(*hash_function)(void *), int(*hash_key_compare)(void *, void *))
Definition: icl_hash.c:70
Quark_Sequence * QUARK_Sequence_Create(Quark *quark)
Definition: quark.c:1853
#define TAILQ_ENTRY(type)
Definition: bsd_queue.h:427
Definition: quark.h:52
volatile int last_thread
Definition: quark.c:186
#define GATHERV
Definition: quark.h:64
#define ULLONG_MAX
Definition: quark.c:85
MAGMA_DLLPORT int MAGMA_CDECL pthread_mutex_destroy(pthread_mutex_t *mutex)
#define DOT_DAG_FILENAME
Definition: quark.c:296
#define TAILQ_INSERT_TAIL(head, elm, field)
Definition: bsd_queue.h:547
task_status
Definition: quark.c:92
int icl_hash_delete(icl_hash_t *ht, void *key, void(*free_key)(void *), void(*free_data)(void *))
Definition: icl_hash.c:224
void * QUARK_Args_List(Quark *quark)
Definition: quark.c:398
static void address_set_node_initial_gatherv_check_and_launch(Quark *quark, Address_Set_Node *address_set_node, Dependency *completed_dep, int worker_rank)
Definition: quark.c:1413
Task * quark_set_task_flags_in_task_structure(Quark *quark, Task *task, Quark_Task_Flags *task_flags)
Definition: quark.c:868
static Scratch * scratch_new(void *arg_ptr, int arg_size, icl_list_t *task_args_list_node_ptr)
Definition: quark.c:591
int * coresbind
Definition: quark.c:101
unsigned long long last_writer_taskid
Definition: quark.c:192
pthread_mutex_t sequence_mutex
Definition: quark.c:126
static void address_set_node_accumulator_find_prepend(Quark *quark, Address_Set_Node *address_set_node)
Definition: quark.c:1450
static unsigned int ullong_hash_function(void *key)
Definition: quark.c:475
int QUARK_Sequence_Wait(Quark *quark, Quark_Sequence *sequence)
Definition: quark.c:1957
Quark_Task * QUARK_Task_Init(Quark *quark, void(*function)(Quark *), Quark_Task_Flags *task_flags)
Definition: quark.c:895
int quark_getenv_int(char *name, int defval)
Definition: quarkos.c:300
struct task_priority_tree_head_s task_priority_tree_head_t
Definition: quark.c:224
MAGMA_DLLPORT int MAGMA_CDECL pthread_mutex_lock(pthread_mutex_t *mutex)
unsigned long long QUARK_Execute_Task(Quark *quark, void(*function)(Quark *), Quark_Task_Flags *task_flags,...)
Definition: quark.c:1122
Quark_Task * current_task_ptr
Definition: quark.c:135
volatile bool accumulator
Definition: quark.c:168
pthread_attr_t thread_attr
Definition: quark.c:110
#define QUARK_SUCCESS
Definition: quark.h:42
static char * arg_dup(char *arg, int size)
Definition: quark.c:509
bool
Definition: quark.c:94
#define ANTIDEPCOLOR
Definition: quark.c:294
pthread_cond_t num_queued_tasks_cond
Definition: quark.c:113
icl_list_t * dependency_list
Definition: quark.c:148
Definition: quark.c:92
#define DEPCOLOR
Definition: quark.c:293
static char * quark_task_default_color
Definition: quark.c:292
unsigned long long QUARK_Insert_Task_Packed(Quark *quark, Quark_Task *task)
Definition: quark.c:986
#define RB_INIT(root)
Definition: bsd_tree.h:303
struct dependency_s * locality_preserving_dep
Definition: quark.c:150
#define RB_ENTRY(type)
Definition: bsd_tree.h:309
pthread_mutex_t completed_tasks_mutex
Definition: quark.c:120
MAGMA_DLLPORT int MAGMA_CDECL pthread_attr_init(pthread_attr_t *attr)
int task_priority
Definition: quark.h:97
int pthread_attr_t
static int pthread_mutex_trylock_ready_list(pthread_mutex_t *mtx)
Definition: quark.c:274
icl_list_t * task_args_list_node_ptr
Definition: quark.c:172
icl_list_t * icl_list_next(icl_list_t *head, icl_list_t *pos)
Definition: icl_list.c:228
volatile bool all_tasks_queued
Definition: quark.c:104