PULSAR 2.0.0 — Parallel Ultra-Light Systolic Array Runtime
Doxygen documentation listing for prt_proxy.c
(Doxygen navigation: All | Data Structures | Files | Functions | Typedefs | Enumerations | Macros | Groups)
Go to the documentation of this file.
1 
20 #include "prt_proxy.h"
21 
23 
// NOTE(review): this listing was scraped from Doxygen HTML output. The
// leading integer on each line is the Doxygen source line number; gaps in
// those numbers mark source lines lost in extraction, so the code below is
// NOT compilable as shown.
/**
 * Allocates and initializes the communication proxy for this node.
 *
 * Creates the tag hash table, one requested-sends deque and one
 * posted-sends list per agent, the list of posted receives, and the
 * deque of local (host<->device) transfers.
 *
 * @param num_agents  Number of communication agents to service.
 * @return Pointer to the new proxy. Never returns NULL: every allocation
 *         is checked with prt_assert, which aborts on failure.
 */
30 prt_proxy_t* prt_proxy_new(int num_agents)
31 {
32  // Allocate the proxy.
33  prt_proxy_t *proxy = (prt_proxy_t*)malloc(sizeof(prt_proxy_t));
34  prt_assert(proxy != NULL, "malloc failed");
35 
36  // Init the proxy.
37  proxy->num_agents = num_agents;
38  proxy->max_channel_size = 0;
39  proxy->num_callbacks = 0;
40 
41  // Create the hash table of tags.
42  int nbuckets = PRT_PROXY_MAX_TAGS_PER_NODE;
 // NOTE(review): the right-hand side of this assignment (Doxygen line 44)
 // was lost in extraction; per the assert message below it was an
 // icl_hash_create(nbuckets, ...) call — recover from the real source.
43  proxy->tags_hash =
45  prt_assert(proxy->tags_hash != NULL, "icl_hash_create failed");
46 
47  int i;
48  // Create lists of requested sends.
49  proxy->sends_requested =
50  (icl_deque_t**)malloc(num_agents*sizeof(icl_deque_t*));
51  prt_assert(proxy->sends_requested != NULL, "malloc failed");
52  for (i = 0; i < num_agents; i++) {
53  proxy->sends_requested[i] = icl_deque_new();
54  prt_assert(proxy->sends_requested[i] != NULL, "icl_deque_new failed");
55  }
56 
57  // Allocate lists of posted sends.
58  proxy->sends_posted =
59  (icl_list_t**)malloc(num_agents*sizeof(icl_list_t*));
60  prt_assert(proxy->sends_posted != NULL, "malloc failed");
61  for (i = 0; i < num_agents; i++) {
62  proxy->sends_posted[i] = icl_list_new();
63  prt_assert(proxy->sends_posted[i] != NULL, "icl_list_new failed");
64  }
65 
66  // Create list of receives.
67  proxy->recvs_posted = icl_list_new();
68  prt_assert(proxy->recvs_posted != NULL, "icl_list_new failed");
69 
70  // Create deque of local transfers.
71  proxy->transfers = icl_deque_new();
72  prt_assert(proxy->transfers != NULL, "icl_deque_new failed");
73 
74  // Return the proxy.
75  return proxy;
76 }
77 
79 
/**
 * Tears down a proxy created by prt_proxy_new and frees it.
 *
 * NOTE(review): the function signature (Doxygen lines ~79-86) was lost in
 * extraction; presumably `void prt_proxy_delete(prt_proxy_t *proxy)` —
 * confirm against the real source.
 *
 * All containers are asserted empty before destruction, so this must only
 * be called after communication has fully drained.
 *
 * NOTE(review): proxy->recvs_posted is NOT destroyed here — it appears to
 * be destroyed at the end of the proxy run loop (Doxygen line 387), where
 * pending receives are cancelled. Verify no leak on early-exit paths.
 */
87 {
88  int status;
89  // Destroy the hash.
90  // Free the keys (tuples).
91  // Leave the channel references alone.
92  status = icl_hash_destroy(proxy->tags_hash, free, NULL);
93  prt_assert(status == 0, "icl_hash_destroy failed");
94 
95  int i;
96  int size;
97  // Destroy lists of sends requested.
98  for (i = 0; i < proxy->num_agents; i++) {
99  size = icl_deque_size(proxy->sends_requested[i]);
100  prt_assert(size == 0, "destroying non-empty deque");
101  status = icl_deque_destroy(proxy->sends_requested[i], NULL);
102  prt_assert(status == 0, "icl_deque_destroy failed");
103  }
104  free(proxy->sends_requested);
105 
106  // Destroy lists of sends posted.
107  for (i = 0; i < proxy->num_agents; i++) {
108  size = icl_list_size(proxy->sends_posted[i]);
109  prt_assert(size == 0, "destroying non-empty list");
110  status = icl_list_destroy(proxy->sends_posted[i], NULL);
111  prt_assert(status == 0, "icl_list_destroy failed");
112  }
113  free(proxy->sends_posted);
114 
115  // Destroy the deque of local transfers.
116  size = icl_deque_size(proxy->transfers);
117  prt_assert(size == 0, "destroying non-empty deque");
118  status = icl_deque_destroy(proxy->transfers, NULL);
119  prt_assert(status == 0, "icl_deque_destroy failed");
120 
121  // Free the proxy.
122  free(proxy);
123 }
124 
126 
/**
 * Records a channel's packet size with the proxy.
 *
 * NOTE(review): the signature (Doxygen lines ~125-132) was lost in
 * extraction; the body reads `proxy` and `channel` parameters — presumably
 * `(prt_proxy_t *proxy, prt_channel_t *channel)`. Confirm.
 *
 * Tracks the maximum channel size seen; that maximum is later used to size
 * the buffers of wildcard receives posted in the MPI cycle (see
 * prt_packet_new_host(proxy->max_channel_size, ...) at Doxygen line 225).
 * A NULL proxy is tolerated (no-op), e.g. on nodes without a proxy.
 */
133 {
134  // Quit if no proxy.
135  if (proxy == NULL)
136  return;
137 
138  // Store the largest packet size.
139  if (channel->size > proxy->max_channel_size)
140  proxy->max_channel_size = channel->size;
141 }
142 
144 
/**
 * Handles a completed wildcard MPI receive: sizes the packet to the actual
 * message, looks up the destination channel by (source, tag), and delivers
 * the packet either to the channel's host deque or to the device via a
 * host-to-device transfer.
 *
 * NOTE(review): the signature (Doxygen lines ~144-150) was lost in
 * extraction; the call site at Doxygen line 240 is
 * `prt_proxy_recv(proxy, request)`, so presumably
 * `(prt_proxy_t *proxy, prt_request_t *request)`.
 */
151 {
152  int count;
153  int retval;
154  // Find the message size.
155  retval = MPI_Get_count(&request->status, MPI_BYTE, &count);
156  prt_assert(retval == MPI_SUCCESS, "MPI_Get_count failed");
157  // Resize the packet if necessary.
158  if (count != request->packet->size)
159  prt_packet_resize_host(request->packet, count);
160 
161  // Locate the destination channel.
162  int source = request->status.MPI_SOURCE;
163  int tag = request->status.MPI_TAG;
164  int *source_tag = prt_tuple_new2(source, tag);
165  prt_channel_t *channel = icl_hash_find(proxy->tags_hash, source_tag);
166  free(source_tag);
167 
 // NOTE(review): `channel` is dereferenced without a NULL check; a message
 // whose (source, tag) is not in tags_hash would crash here. Presumably the
 // protocol guarantees registration before any send — confirm, or add a
 // prt_assert(channel != NULL, ...) like the other lookups in this file.
168  // IF host VDP.
169  if (channel->dst_vdp->location == PRT_LOCATION_HOST) {
170  // Place packet in the channel's dequeue.
171  icl_deque_append(channel->packets, (void*)request->packet);
172  }
173  // ELSE if device VDP
174  else {
175  // Place a host-to-device transfer in the channel's stream.
176  prt_packet_host_to_device(request->packet, channel);
177  }
178 }
179 
181 
/**
 * One non-blocking progress cycle of the MPI engine: posts at most one new
 * send per agent, retires at most one completed send per agent, keeps a
 * pool of wildcard receives posted, and retires at most one completed
 * receive. Called repeatedly from the proxy run loop.
 *
 * NOTE(review): the signature (Doxygen lines ~181-187) was lost in
 * extraction; the call site at Doxygen line 348 is `prt_proxy_mpi(proxy)`,
 * so presumably `void prt_proxy_mpi(prt_proxy_t *proxy)`.
 */
188 {
189  int i;
190  // Post another send.
191  for (i = 0; i < proxy->num_agents; i++) {
 // NOTE(review): the right-hand side of this comparison (Doxygen line 193)
 // was lost in extraction — presumably a PRT_PROXY_MAX_SENDS_PER_AGENT-style
 // cap, mirroring the PRT_PROXY_MAX_RECVS_PER_AGENT cap below. Recover from
 // the real source.
192  if (icl_list_size(proxy->sends_posted[i]) <
194  icl_node_t *node = icl_deque_first(proxy->sends_requested[i]);
195  if (node != NULL) {
196  prt_request_t *request = (prt_request_t*)node->data;
197  // Post the send request.
198  prt_request_send(request);
199  // Move from the list of requested to the list of posted.
200  // Only moving the request, not destroying the request.
201  icl_deque_delete(proxy->sends_requested[i], node, NULL);
202  icl_list_append(proxy->sends_posted[i], request);
203  }
204  }
205  }
206  // Complete another send.
207  for (i = 0; i < proxy->num_agents; i++) {
208  icl_node_t *node = icl_list_first(proxy->sends_posted[i]);
209  if (node != NULL) {
210  prt_request_t *request = (prt_request_t*)node->data;
211  // If send request completed.
212  if (prt_request_test(request)) {
213  // Release the packet.
214  prt_packet_release_host(request->packet);
215  // Remove from the list & destroy the request.
216  icl_list_delete(proxy->sends_posted[i], node,
217  (void(*)(void*))prt_request_delete);
218  }
219  }
220  }
221  // Post another receive.
 // Receive buffers are sized by max_channel_size, the largest channel size
 // registered with the proxy, so any incoming message fits.
222  if (icl_list_size(proxy->recvs_posted) <
223  PRT_PROXY_MAX_RECVS_PER_AGENT*proxy->num_agents) {
224  prt_packet_t *packet =
225  prt_packet_new_host(proxy->max_channel_size, NULL);
 // NOTE(review): the request-constructor call (Doxygen line 227) was lost
 // in extraction — presumably a prt_request_new(...)-style call taking the
 // arguments on the following lines, matching the construction at Doxygen
 // lines 288-293. Recover from the real source.
226  prt_request_t *request =
228  packet, proxy->max_channel_size,
229  MPI_ANY_SOURCE, MPI_ANY_TAG);
230  prt_request_recv(request);
231  icl_list_append(proxy->recvs_posted, request);
232  }
233  // Complete another receive.
234  icl_node_t *node = icl_list_first(proxy->recvs_posted);
235  if (node != NULL) {
236  prt_request_t *request = (prt_request_t*)node->data;
237  // IF recv request completed.
238  if (prt_request_test(request)) {
239  // Put the packet in a channel.
240  prt_proxy_recv(proxy, request);
241  // Remove from the list & destroy the request.
242  icl_list_delete(proxy->recvs_posted, node,
243  (void(*)(void*))prt_request_delete);
244  }
245  }
246 }
247 
249 
/**
 * One progress cycle of the CUDA engine: cycles every device, then drains
 * the deque of local transfers, dispatching each by direction (host<->
 * device copies, device-to-device copies, and MPI staging to/from hosts).
 *
 * NOTE(review): the signature (Doxygen lines ~249-256) was lost in
 * extraction; the call site at Doxygen line 352 is `prt_proxy_cuda(proxy)`,
 * so presumably `void prt_proxy_cuda(prt_proxy_t *proxy)`.
 */
257 {
258  int dev;
259  // Cycle devices.
260  for (dev = 0; dev < proxy->vsa->num_devices; dev++)
261  prt_device_cycle(proxy->vsa->device[dev]);
262 
263  // Issue all local comms.
264  icl_node_t *node;
265  while ((node = icl_deque_first(proxy->transfers)) != NULL) {
266  prt_transfer_t *transfer =
267  (prt_transfer_t*)node->data;
 // NOTE(review): the function-call line of each case below (Doxygen lines
 // 270, 274, 278, 282, 289, 294) was lost in extraction; only the argument
 // continuation lines survive. Per the case labels these were presumably the
 // matching prt_packet_* / prt_request_new-style dispatch calls (and, for
 // PRT_DEVICE_MPI_FROM_HOST, an icl_deque_append onto sends_requested).
 // Recover them from the real source.
268  switch (transfer->direction) {
269  case PRT_HOST_TO_DEVICE:
271  transfer->packet, transfer->channel);
272  break;
273  case PRT_DEVICE_TO_HOST:
275  transfer->packet, transfer->channel);
276  break;
277  case PRT_DEVICE_TO_DEVICE:
279  transfer->packet, transfer->channel);
280  break;
281  case PRT_DEVICE_MPI_TO_HOST:
283  transfer->packet,
284  transfer->channel,
285  transfer->agent);
286  break;
287  case PRT_DEVICE_MPI_FROM_HOST: {
288  prt_request_t *request =
290  transfer->packet,
291  transfer->packet->size,
292  transfer->channel->dst_node,
293  transfer->channel->tag);
295  proxy->sends_requested[transfer->agent], request);
296  break;
297  }
298  case PRT_DEVICE_PACKET_RELEASE:
299  prt_packet_release_device(transfer->packet);
300  break;
 // NOTE(review): no default case — an unknown transfer->direction is
 // silently dropped (the transfer is still deleted below). Consider a
 // prt_assert in a default label.
301  }
 // Every transfer is consumed exactly once: removed from the deque (the
 // node only; no data destructor) and then destroyed.
302  icl_deque_delete(proxy->transfers, node, NULL);
303  prt_transfer_delete(transfer);
304  }
305 }
306 
308 
/**
 * Main proxy service loop for a run: synchronizes all nodes and threads,
 * then cycles the MPI and CUDA engines until all threads and devices report
 * finished and no sends, transfers, or callbacks remain pending; finally
 * cancels leftover wildcard receives and re-synchronizes.
 *
 * NOTE(review): the signature (Doxygen lines ~308-319) was lost in
 * extraction; the body returns a double elapsed time — presumably
 * `double prt_proxy_run(prt_proxy_t *proxy)` or similar. Confirm.
 *
 * NOTE(review): several lines were lost in extraction (Doxygen lines 322,
 * 327, 336-337, 395, 401) — presumably the svg_trace_start_* calls paired
 * with each surviving svg_trace_stop_* call, and the initial device/DMA
 * event registration inside the per-device loop. Recover from the source.
 *
 * @return Wall-clock seconds between the end of the first MPI barrier and
 *         the end of the second MPI barrier.
 */
320 {
321  // MPI barrier
323  MPI_Barrier(MPI_COMM_WORLD);
324  svg_trace_stop_cpu(0, -Honeydew);
325 
326  // Barrier threads.
328  pthread_barrier_wait(&proxy->vsa->barrier);
329  svg_trace_stop_cpu(0, -Azure);
330  double start = get_time_of_day();
331 
332  int dev;
333  // Register initial device and DMA events.
334  for (dev = 0; dev < proxy->vsa->num_devices; dev++) {
335  cudaSetDevice(dev);
338  svg_trace_stop_gpu(0, Black);
339  svg_trace_stop_dma(0, Black);
340  }
341  int sends_posted;
342  int sends_requested;
343  int threads_finished;
344  int devices_finished;
345  do {
346  // Cycle MPI.
347  if (proxy->vsa->num_nodes > 1)
348  prt_proxy_mpi(proxy);
349 
350  // Cycle CUDA.
351  if (proxy->vsa->num_devices > 0)
352  prt_proxy_cuda(proxy);
353 
354  int i;
355  devices_finished = 1;
356  // Check if devices finished.
357  for (i = 0; i < proxy->vsa->num_devices; i++)
358  if (!proxy->vsa->device[i]->finished)
359  devices_finished = 0;
360 
361  threads_finished = 1;
362  // Check if threads finished.
363  for (i = 0; i < proxy->vsa->num_threads; i++)
364  if (!proxy->vsa->thread[i]->finished)
365  threads_finished = 0;
366 
367  sends_requested = 0;
368  // Check if sends pending.
369  for (i = 0; i < proxy->num_agents; i++)
370  if (icl_deque_size(proxy->sends_requested[i]) > 0)
371  sends_requested = 1;
372 
373  sends_posted = 0;
374  // Check if sends pending.
375  for (i = 0; i < proxy->num_agents; i++)
376  if (icl_list_size(proxy->sends_posted[i]) > 0)
377  sends_posted = 1;
378  }
379  while (
380  !threads_finished || !devices_finished ||
381  sends_requested || sends_posted ||
382  icl_deque_size(proxy->transfers) > 0 || proxy->num_callbacks > 0);
383  // WHILE threads not finished OR sends pending.
384 
385  // Destroy the list of pending receives.
386  // Cancel and free all requests. Release all packets.
 // NOTE(review): the cast `(void(*))(void*)` below is malformed (it is not
 // a function-pointer cast); the sibling call sites at Doxygen lines 217 and
 // 243 use `(void(*)(void*))`. Presumably an extraction artifact — verify
 // against the real source.
387  icl_list_destroy(proxy->recvs_posted, (void(*))(void*)prt_request_cancel);
388 
389  // Synchronize each device.
390  for (dev = 0; dev < proxy->vsa->num_devices; dev++) {
391  cudaSetDevice(dev);
392  cudaDeviceSynchronize();
393  }
394  // thread barrier
396  pthread_barrier_wait(&proxy->vsa->barrier);
397  svg_trace_stop_cpu(0, -Azure);
398 
399  // Barrier before any other MPI call
400  // picks up one of those pending recvs.
402  MPI_Barrier(MPI_COMM_WORLD);
403  svg_trace_stop_cpu(0, -Honeydew);
404 
405  // Return the time between the end of the first MPI barrier
406  // and the end of the second MPI barrier.
407  double stop = get_time_of_day();
408  return (stop - start);
409 }