/**
 * @file prt_vsa.c
 *
 * PULSAR 2.0.0 - Parallel Ultra-Light Systolic Array Runtime
 */
#include "prt_vsa.h"

extern int prt_tuple_equal(void *tuple_a, void *tuple_b);
extern unsigned int prt_tuple_hash(void *tuple);

prt_vsa_t *prt_vsa_new(
    int num_threads, int num_devices, void *global_store,
    struct prt_mapping_s (*vdp_mapping)(int*, void*, int, int))
{
    // Check input parameters.
    prt_assert(num_threads >= 0, "negative number of threads");
    prt_assert(num_devices >= 0, "negative number of devices");
    prt_assert(vdp_mapping != NULL, "NULL mapping function");

    // Allocate the VSA.
    prt_vsa_t *vsa = (prt_vsa_t*)malloc(sizeof(prt_vsa_t));
    prt_assert(vsa != NULL, "malloc failed");

    // Check for MPI.
    int initialized;
    int retval = MPI_Initialized(&initialized);
    prt_assert(retval == MPI_SUCCESS, "MPI_Initialized failed");
    if (initialized) {
        MPI_Comm_rank(MPI_COMM_WORLD, &vsa->node_rank);
        MPI_Comm_size(MPI_COMM_WORLD, &vsa->num_nodes);
    }
    else {
        vsa->num_nodes = 1;
        vsa->node_rank = 0;
    }
    // Init the VSA.
    vsa->num_threads = num_threads;
    vsa->num_cores = vsa->num_nodes*vsa->num_threads;
    vsa->thread_warmup_func = NULL;

    vsa->num_devices = num_devices;
    vsa->num_accelerators = vsa->num_nodes*vsa->num_devices;
    vsa->device_warmup_func = NULL;

    vsa->vdp_mapping = vdp_mapping;
    vsa->proxy = NULL;
    vsa->config = prt_config_new();
    vsa->global_store = global_store;

    // Init proxy if required.
    vsa->concurrency = num_threads;
    if (vsa->num_nodes > 1 || vsa->num_devices > 0) {
        vsa->proxy = prt_proxy_new(num_threads+num_devices);
        vsa->proxy->vsa = vsa;
        vsa->concurrency++;
    }
    // Init pthreads.
    pthread_setconcurrency(vsa->concurrency);
    pthread_attr_init(&vsa->thread_attr);
    pthread_attr_setscope(&vsa->thread_attr, PTHREAD_SCOPE_SYSTEM);

    int i;
    // Initialize threads.
    vsa->thread = (prt_thread_t**)malloc(vsa->num_threads*sizeof(prt_thread_t*));
    prt_assert(vsa->thread != NULL, "malloc failed");
    for (i = 0; i < vsa->num_threads; i++) {
        vsa->thread[i] = prt_thread_new(i, vsa->node_rank*vsa->num_threads+i, i);
        vsa->thread[i]->vsa = vsa;
    }

    // Initialize devices.
    vsa->device = (prt_device_t**)malloc(vsa->num_devices*sizeof(prt_device_t*));
    prt_assert(vsa->device != NULL, "malloc failed");
    for (i = 0; i < vsa->num_devices; i++) {
        int agent = vsa->num_threads+i;
        vsa->device[i] =
            prt_device_new(i, vsa->node_rank*vsa->num_devices+i, agent);
        vsa->device[i]->vsa = vsa;
    }
    // Initialize device memory allocators.
    // Allocating 80% of available GPU memory.
    vsa->devmem = (gpu_malloc_t**)malloc(vsa->num_devices*sizeof(gpu_malloc_t*));
    prt_assert(vsa->devmem != NULL, "malloc failed");
    for (i = 0; i < vsa->num_devices; i++) {
        size_t mem_free;
        size_t mem_total;
        cudaError_t error;
        error = cudaSetDevice(i);
        prt_assert(error == cudaSuccess, cudaGetErrorString(error));
        error = cudaMemGetInfo(&mem_free, &mem_total);
        prt_assert(error == cudaSuccess, cudaGetErrorString(error));
        int num_segments = mem_free / PRT_VSA_GPU_ALLOC_UNIT_SIZE;
        num_segments = num_segments * 4 / 5;
        prt_assert(num_segments > 0, "zero segments available");
        vsa->devmem[i] = gpu_malloc_init(
            num_segments, PRT_VSA_GPU_ALLOC_UNIT_SIZE);
        prt_assert(vsa->devmem[i] != NULL, "gpu_malloc_init failed");
    }

    // Initialize the thread barrier.
    pthread_barrier_init(&vsa->barrier, NULL, vsa->concurrency);

    // Initialize the VDPs hash.
    int nbuckets = PRT_VSA_MAX_VDPS_PER_NODE;
    vsa->vdps_hash = icl_hash_create(nbuckets, prt_tuple_hash, prt_tuple_equal);

    // Allocate the array of channel lists.
    vsa->channel_lists =
        (icl_list_t**)calloc(vsa->num_nodes, sizeof(icl_list_t*));
    prt_assert(vsa->channel_lists != NULL, "calloc failed");

    // Return the VSA.
    return vsa;
}
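/*
 * Usage sketch (illustrative only, not part of the original file):
 * constructing a VSA with a user-supplied mapping function. The callback
 * receives a VDP's tuple, the global store, and the core and accelerator
 * counts, and returns a prt_mapping_s holding a location and a rank.
 * The round-robin rule below is a hypothetical policy for the sake of
 * the example.
 *
 *     struct prt_mapping_s my_mapping(
 *         int *tuple, void *global_store,
 *         int num_cores, int num_accelerators)
 *     {
 *         struct prt_mapping_s mapping;
 *         mapping.location = PRT_LOCATION_HOST; // place every VDP on a core
 *         mapping.rank = tuple[0] % num_cores;  // round-robin on tuple[0]
 *         return mapping;
 *     }
 *
 *     // Eight threads per node, no devices, no global store.
 *     prt_vsa_t *vsa = prt_vsa_new(8, 0, NULL, my_mapping);
 */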

void prt_vsa_delete(prt_vsa_t *vsa)
{
    // Check input parameters.
    prt_assert(vsa != NULL, "NULL VSA");

    // Destroy the VDPs hash.
    icl_hash_destroy(vsa->vdps_hash, NULL, (void(*)(void*))prt_vdp_delete);

    // Delete the config.
    prt_config_delete(vsa->config);

    // Delete the proxy.
    if (vsa->proxy != NULL)
        prt_proxy_delete(vsa->proxy);

    // Destroy the thread barrier.
    pthread_barrier_destroy(&vsa->barrier);

    int i;
    // Delete threads.
    for (i = 0; i < vsa->num_threads; i++)
        prt_thread_delete(vsa->thread[i]);
    free(vsa->thread);

    // Delete devices.
    for (i = 0; i < vsa->num_devices; i++)
        prt_device_delete(vsa->device[i]);
    free(vsa->device);

    // Destroy device memory allocators.
    for (i = 0; i < vsa->num_devices; i++) {
        cudaError_t error = cudaSetDevice(i);
        prt_assert(error == cudaSuccess, cudaGetErrorString(error));
        int retval = gpu_malloc_fini(vsa->devmem[i]);
        prt_assert(retval == 0, "gpu_malloc_fini failed");
    }
    free(vsa->devmem);

    // Destroy thread attributes.
    pthread_attr_destroy(&vsa->thread_attr);

    // Free the VSA.
    free(vsa);
}

void prt_vsa_vdp_insert(prt_vsa_t *vsa, prt_vdp_t *vdp)
{
    // Check input parameters.
    prt_assert(vsa != NULL, "NULL VSA");
    prt_assert(vdp != NULL, "NULL VDP");

    // Find the mapping.
    prt_mapping_t mapping =
        vsa->vdp_mapping(
            vdp->tuple, vsa->global_store,
            vsa->num_cores, vsa->num_accelerators);

    int node_rank;
    // IF host VDP.
    if (mapping.location == PRT_LOCATION_HOST) {
        // Compute the node rank and thread rank.
        node_rank = mapping.rank / vsa->num_threads;
        int thread_rank = mapping.rank % vsa->num_threads;

        // IF the VDP is not in this node.
        if (node_rank != vsa->node_rank) {
            // Destroy it along with all its channels and return.
            prt_vdp_annihilate(vdp);
            return;
        }

        // Insert in the thread's list of VDPs.
        icl_list_t *node = icl_list_append(vsa->thread[thread_rank]->vdps, vdp);
        prt_assert(node != NULL, "icl_list_append failed");
        vdp->thread = vsa->thread[thread_rank];
    }
    // ELSE device VDP.
    else {
        // Compute the node rank and device rank.
        node_rank = mapping.rank / vsa->num_devices;
        int device_rank = mapping.rank % vsa->num_devices;

        // IF the VDP is not in this node.
        if (node_rank != vsa->node_rank) {
            // Destroy it along with all its channels and return.
            prt_vdp_annihilate(vdp);
            return;
        }
        // Insert in the device's list of VDPs.
        icl_list_t *node = icl_list_append(vsa->device[device_rank]->vdps, vdp);
        prt_assert(node != NULL, "icl_list_append failed");
        vdp->device = vsa->device[device_rank];

        cudaError_t error;
        // Create the VDP's stream.
        error = cudaSetDevice(device_rank);
        prt_assert(error == cudaSuccess, cudaGetErrorString(error));
        error = cudaStreamCreateWithFlags(&vdp->stream, cudaStreamNonBlocking);
        prt_assert(error == cudaSuccess, cudaGetErrorString(error));
    }
    vdp->vsa = vsa;
    vdp->location = mapping.location;
    vdp->global_store = vsa->global_store;

    int i;
    // Provide the proxy to the input channels.
    for (i = 0; i < vdp->num_inputs; i++)
        if (vdp->input[i] != NULL)
            vdp->input[i]->proxy = vsa->proxy;

    // Provide the proxy to the output channels.
    for (i = 0; i < vdp->num_outputs; i++)
        if (vdp->output[i] != NULL)
            vdp->output[i]->proxy = vsa->proxy;

    // Insert in the VSA's VDP hash.
    icl_entry_t *entry = icl_hash_insert(
        vsa->vdps_hash, (void*)vdp->tuple, (void*)vdp);
    prt_assert(entry != NULL, "icl_hash_insert failed");

    // Merge intra-node channels.
    prt_vsa_vdp_merge_channels(vsa, vdp);
    // Track tags for inter-node communication.
    prt_vsa_vdp_track_tags(vsa, vdp);
}
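/*
 * Worked example of the rank arithmetic above (hypothetical numbers):
 * with 4 nodes and 8 threads per node, num_cores = 32. A host mapping
 * with rank 19 decomposes into node_rank = 19 / 8 = 2 and
 * thread_rank = 19 % 8 = 3, i.e., thread 3 on node 2. Every node runs
 * this same insertion code, so the three nodes whose node_rank is not 2
 * annihilate the VDP and exactly one node keeps it.
 */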

void prt_vsa_vdp_merge_channels(prt_vsa_t *vsa, prt_vdp_t *vdp)
{
    int i;
    // FOR each input channel.
    for (i = 0; i < vdp->num_inputs; i++) {
        prt_channel_t *channel = vdp->input[i];
        if (channel != NULL) {
            // Track the maximum channel size.
            prt_proxy_max_channel_size(vsa->proxy, channel);
            // Look up the source VDP.
            prt_vdp_t *src_vdp =
                icl_hash_find(vsa->vdps_hash, (void*)channel->src_tuple);
            // IF source VDP found.
            if (src_vdp != NULL) {
                // Check for channel-tuple mismatch.
                int *src_vdp_dst_tuple =
                    src_vdp->output[channel->src_slot]->dst_tuple;
                prt_assert(prt_tuple_equal(src_vdp_dst_tuple, vdp->tuple),
                           "VDP channel tuple mismatch");
                // Swap the existing channel for this channel.
                prt_channel_delete(src_vdp->output[channel->src_slot]);
                src_vdp->output[channel->src_slot] = channel;
                // Point to the source VDP in the channel.
                channel->src_vdp = src_vdp;
            }
        }
    }
    // FOR each output channel.
    for (i = 0; i < vdp->num_outputs; i++) {
        prt_channel_t *channel = vdp->output[i];
        if (channel != NULL) {
            // Track the maximum channel size.
            prt_proxy_max_channel_size(vsa->proxy, channel);
            // Look up the destination VDP.
            prt_vdp_t *dst_vdp =
                icl_hash_find(vsa->vdps_hash, (void*)channel->dst_tuple);
            // IF destination VDP found.
            if (dst_vdp != NULL) {
                // Check for channel-tuple mismatch.
                int *dst_vdp_src_tuple =
                    dst_vdp->input[channel->dst_slot]->src_tuple;
                prt_assert(prt_tuple_equal(dst_vdp_src_tuple, vdp->tuple),
                           "VDP channel tuple mismatch");
                // Swap this channel for the existing channel.
                vdp->output[i] = dst_vdp->input[channel->dst_slot];
                // Point to the source VDP in the channel.
                vdp->output[i]->src_vdp = vdp;
                prt_channel_delete(channel);
            }
        }
    }
}
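/*
 * Merge example (hypothetical scenario): VDP A is inserted first with an
 * output channel addressed to tuple (1,0); VDP B, whose tuple is (1,0),
 * is inserted later with the matching input channel. During B's
 * insertion, the hash lookup finds A, A's duplicate output channel is
 * deleted, and A's output slot is pointed at B's input channel, so the
 * pair ends up sharing a single intra-node channel object.
 */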

void prt_vsa_vdp_track_tags(prt_vsa_t *vsa, prt_vdp_t *vdp)
{
    int i;
    // FOR each input channel.
    for (i = 0; i < vdp->num_inputs; i++) {
        prt_channel_t *channel = vdp->input[i];
        if (channel != NULL) {
            // Assign the destination node.
            channel->dst_node = vsa->node_rank;

            int src_node;
            // Find the source node.
            prt_mapping_t src_mapping =
                vsa->vdp_mapping(
                    channel->src_tuple, vsa->global_store,
                    vsa->num_cores, vsa->num_accelerators);
            if (src_mapping.location == PRT_LOCATION_HOST)
                src_node = src_mapping.rank / vsa->num_threads;
            else
                src_node = src_mapping.rank / vsa->num_devices;
            channel->src_node = src_node;

            // IF another node is the source.
            if (src_node != vsa->node_rank) {
                // Create the list if empty.
                if (vsa->channel_lists[src_node] == NULL) {
                    vsa->channel_lists[src_node] = icl_list_new();
                    prt_assert(vsa->channel_lists[src_node] != NULL,
                               "icl_list_new failed");
                }
                // Add the channel to the list.
                icl_list_t *node = icl_list_isort(
                    vsa->channel_lists[src_node], channel, prt_channel_compare);
                prt_assert(node != NULL, "icl_list_isort failed");
            }
        }
    }
    // FOR each output channel.
    for (i = 0; i < vdp->num_outputs; i++) {
        prt_channel_t *channel = vdp->output[i];
        if (channel != NULL) {
            // Assign the source node.
            channel->src_node = vsa->node_rank;

            int dst_node;
            // Find the destination node.
            prt_mapping_t dst_mapping =
                vsa->vdp_mapping(
                    channel->dst_tuple, vsa->global_store,
                    vsa->num_cores, vsa->num_accelerators);
            if (dst_mapping.location == PRT_LOCATION_HOST)
                dst_node = dst_mapping.rank / vsa->num_threads;
            else
                dst_node = dst_mapping.rank / vsa->num_devices;
            channel->dst_node = dst_node;

            // IF another node is the destination.
            if (dst_node != vsa->node_rank) {
                // Create the list if empty.
                if (vsa->channel_lists[dst_node] == NULL) {
                    vsa->channel_lists[dst_node] = icl_list_new();
                    prt_assert(vsa->channel_lists[dst_node] != NULL,
                               "icl_list_new failed");
                }
                // Add the channel to the list.
                icl_list_t *node = icl_list_isort(
                    vsa->channel_lists[dst_node], channel, prt_channel_compare);
                prt_assert(node != NULL, "icl_list_isort failed");
            }
        }
    }
}

void prt_vsa_assign_tags(prt_vsa_t *vsa)
{
    int i;
    for (i = 0; i < vsa->num_nodes; i++) {
        if (vsa->channel_lists[i] != NULL) {
            int tag = 0;
            icl_list_t *node;
            // Assign consecutive tags to the elements.
            icl_list_foreach(vsa->channel_lists[i], node) {
                prt_channel_t *channel = (prt_channel_t*)node->data;
                channel->tag = tag++;

                int *node_tag;
                if (channel->dst_node == vsa->node_rank)
                    node_tag = prt_tuple_new2(channel->src_node, channel->tag);
                else
                    node_tag = prt_tuple_new2(channel->dst_node, channel->tag);

                icl_entry_t *entry = icl_hash_insert(
                    vsa->proxy->tags_hash, (void*)node_tag, (void*)channel);
                prt_assert(entry != NULL, "icl_hash_insert failed");
            }
            // Destroy the list.
            int status = icl_list_destroy(vsa->channel_lists[i], NULL);
            prt_assert(status == 0, "icl_list_destroy failed");
        }
    }
    // Free the array of lists.
    free(vsa->channel_lists);
}
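/*
 * Tag example (hypothetical scenario): if this node holds two channels
 * whose peer is node 2, they receive tags 0 and 1. The peer node builds
 * the same list for this node, sorted by the same prt_channel_compare,
 * so both sides number the channels identically. The proxy's tags_hash
 * is keyed by the tuple (peer_node, tag): the receiving side keys on the
 * source node, the sending side on the destination node, so a matching
 * (peer, tag) pair identifies the same channel on both nodes.
 */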

void prt_vsa_create_streams(prt_vsa_t *vsa)
{
    int j;
    // FOR each device.
    for (j = 0; j < vsa->num_devices; j++) {
        prt_device_t *device = vsa->device[j];
        icl_list_t *vdp_node;
        // FOR each device VDP.
        icl_list_foreach(device->vdps, vdp_node) {
            prt_vdp_t *vdp = (prt_vdp_t*)vdp_node->data;
            int i;
            // FOR each input channel.
            for (i = 0; i < vdp->num_inputs; i++) {
                prt_channel_t *channel = vdp->input[i];
                // IF the channel is not NULL.
                if (channel != NULL) {
                    // IF coming from a different node
                    // OR coming from a host VDP
                    // OR coming from another device
                    // (when the device has to pull).
                    if (channel->src_vdp == NULL ||
                        channel->src_vdp->location == PRT_LOCATION_HOST ||
                        channel->src_vdp->device->rank !=
                        channel->dst_vdp->device->rank) {
                        // Create the in_stream.
                        cudaError_t error;
                        error = cudaSetDevice(device->rank);
                        prt_assert(error == cudaSuccess,
                                   cudaGetErrorString(error));
                        error = cudaStreamCreateWithFlags(
                            &channel->in_stream, cudaStreamNonBlocking);
                        prt_assert(error == cudaSuccess,
                                   cudaGetErrorString(error));
                    }
                }
            }
            // FOR each output channel.
            for (i = 0; i < vdp->num_outputs; i++) {
                prt_channel_t *channel = vdp->output[i];
                // IF the channel is not NULL.
                if (channel != NULL) {
                    // IF going to another node
                    // OR going to a host VDP
                    // OR going to another device
                    // (when the device has to push).
                    if (channel->dst_vdp == NULL ||
                        channel->dst_vdp->location == PRT_LOCATION_HOST ||
                        channel->dst_vdp->device->rank !=
                        channel->src_vdp->device->rank) {
                        // Create the out_stream.
                        cudaError_t error;
                        error = cudaSetDevice(device->rank);
                        prt_assert(error == cudaSuccess,
                                   cudaGetErrorString(error));
                        error = cudaStreamCreateWithFlags(
                            &channel->out_stream, cudaStreamNonBlocking);
                        prt_assert(error == cudaSuccess,
                                   cudaGetErrorString(error));
                    }
                }
            }
        }
    }
}

double prt_vsa_run(prt_vsa_t *vsa)
{
    // Check input parameters.
    prt_assert(vsa != NULL, "NULL VSA");

    // Assign channel tags.
    prt_vsa_assign_tags(vsa);

    // Create channel streams.
    prt_vsa_create_streams(vsa);

    // Initialize SVG tracing.
    svg_trace_init(vsa->concurrency, vsa->num_devices);

    int i;
    int status;
    // Launch threads.
    // IF there is no proxy, skip thread zero;
    // the calling thread serves as thread zero below.
    i = vsa->proxy == NULL;
    for (; i < vsa->num_threads; i++) {
        status = pthread_create(
            &vsa->thread[i]->id, &vsa->thread_attr,
            prt_thread_run, vsa->thread[i]);
        prt_assert(status == 0, "pthread_create failed");
    }
    double time;
    // IF no proxy.
    if (vsa->proxy == NULL) {
        // Serve as thread zero.
        vsa->thread[0]->id = pthread_self();
        prt_thread_run((void*)vsa->thread[0]);
        time = vsa->thread[0]->time;
    }
    else {
        // Call devices warmup function.
        prt_vsa_devices_warmup(vsa);
        // Serve as the proxy.
        time = prt_proxy_run(vsa->proxy);
    }
    // Join threads.
    i = vsa->proxy == NULL;
    for (; i < vsa->num_threads; i++) {
        status = pthread_join(vsa->thread[i]->id, NULL);
        prt_assert(status == 0, "pthread_join failed");
    }
    // Finish tracing.
    if (vsa->config->svg_tracing == PRT_SVG_TRACING_ON)
        svg_trace_finish(vsa->concurrency, vsa->num_devices);

    return time;
}
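/*
 * Lifecycle sketch (illustrative only): a minimal driver built from the
 * functions in this file, assuming my_mapping from the earlier sketch
 * and a hypothetical make_vdp() helper standing in for the VDP
 * construction code, which lives elsewhere.
 *
 *     prt_vsa_t *vsa = prt_vsa_new(8, 0, NULL, my_mapping);
 *     prt_vsa_vdp_insert(vsa, make_vdp());  // once per VDP
 *     double time = prt_vsa_run(vsa);       // returns the execution time
 *     prt_vsa_delete(vsa);                  // also deletes remaining VDPs
 */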

void prt_vsa_config_set(
    prt_vsa_t *vsa, prt_config_param_t param, prt_config_value_t value)
{
    // Check input parameters.
    prt_assert(vsa != NULL, "NULL VSA");

    // Set the value for the parameter.
    switch (param) {
        case PRT_VDP_SCHEDULING:
            switch (value) {
                case PRT_VDP_SCHEDULING_LAZY:
                case PRT_VDP_SCHEDULING_AGGRESSIVE:
                    vsa->config->vdp_scheduling = value;
                    break;
                default:
                    prt_error("invalid value for PRT_VDP_SCHEDULING");
                    break;
            }
            break;
        case PRT_SVG_TRACING:
            switch (value) {
                case PRT_SVG_TRACING_ON:
                case PRT_SVG_TRACING_OFF:
                    vsa->config->svg_tracing = value;
                    break;
                default:
                    prt_error("invalid value for PRT_SVG_TRACING");
                    break;
            }
            break;
        default:
            prt_error("invalid parameter");
            break;
    }
}
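/*
 * Usage sketch: enabling SVG tracing before calling prt_vsa_run, using
 * the PRT_SVG_TRACING parameter and PRT_SVG_TRACING_ON value referenced
 * above.
 *
 *     prt_vsa_config_set(vsa, PRT_SVG_TRACING, PRT_SVG_TRACING_ON);
 */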

void prt_vsa_thread_warmup_func_set(prt_vsa_t *vsa, void (*func)())
{
    // Check input parameters.
    prt_assert(vsa != NULL, "NULL VSA");

    // Set the thread warmup function.
    vsa->thread_warmup_func = func;
}

void prt_vsa_device_warmup_func_set(prt_vsa_t *vsa, void (*func)())
{
    // Check input parameters.
    prt_assert(vsa != NULL, "NULL VSA");

    // Set the device warmup function.
    vsa->device_warmup_func = func;
}
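/*
 * Warmup sketch (hypothetical): the registered function runs once per
 * device, after cudaSetDevice() has selected that device, so a trivial
 * CUDA runtime call is enough to force context creation up front.
 *
 *     void my_device_warmup(void)
 *     {
 *         cudaFree(0); // forces context creation on the current device
 *     }
 *
 *     prt_vsa_device_warmup_func_set(vsa, my_device_warmup);
 */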

void prt_vsa_devices_warmup(prt_vsa_t *vsa)
{
    // Quick return.
    if (vsa->device_warmup_func == NULL)
        return;

    int dev;
    // Call the device warmup function.
    for (dev = 0; dev < vsa->num_devices; dev++) {
        cudaError_t error = cudaSetDevice(dev);
        prt_assert(error == cudaSuccess, cudaGetErrorString(error));
        vsa->device_warmup_func();
    }
    // Synchronize each device.
    for (dev = 0; dev < vsa->num_devices; dev++) {
        cudaError_t error = cudaSetDevice(dev);
        prt_assert(error == cudaSuccess, cudaGetErrorString(error));
        error = cudaDeviceSynchronize();
        prt_assert(error == cudaSuccess, cudaGetErrorString(error));
    }
}