PULSAR  2.0.0
Parallel Ultra-Light Systolic Array Runtime
 All Data Structures Files Functions Typedefs Enumerations Macros Groups
prt_channel.c
Go to the documentation of this file.
1 
11 #include "prt_channel.h"
12 
14 
29  size_t size,
30  int *src_tuple, int src_slot,
31  int *dst_tuple, int dst_slot)
32 {
33  // Check input parameters.
34  prt_assert(size > 0, "channel size equals zero");
35  prt_assert(size <= INT_MAX, "channel size larger than INT_MAX");
36  prt_assert(src_tuple != NULL, "NULL source tuple");
37  prt_assert(src_slot >= 0, "negative source slot");
38  prt_assert(dst_tuple != NULL, "NULL destination tuple");
39  prt_assert(dst_slot >= 0, "negative destination slot");
40 
41  // Allocate the channel.
42  prt_channel_t *channel = (prt_channel_t*)malloc(sizeof(prt_channel_t));
43  prt_assert(channel != NULL, "malloc failed");
44 
45  // Initialize the channel.
46  channel->dst_vdp = NULL;
47  channel->src_vdp = NULL;
48  channel->proxy = NULL;
49  channel->size = size;
50  channel->src_tuple = src_tuple;
51  channel->dst_tuple = dst_tuple;
52  channel->src_slot = src_slot;
53  channel->dst_slot = dst_slot;
54  channel->active = 1;
55 
56  // Create the list of packets.
57  channel->packets = icl_deque_new();
58  prt_assert(channel->packets != NULL, "icl_deque_new() failed");
59 
60  // Return the channel.
61  return channel;
62 }
63 
65 
71 {
72  // Check for a NULL channel.
73  prt_assert(channel != NULL, "NULL channel");
74 
75  // Free the source tuple.
76  prt_assert(channel->src_tuple != NULL, "NULL tuple");
77  free(channel->src_tuple);
78 
79  // Free the destination tuple.
80  prt_assert(channel->dst_tuple != NULL, "NULL tuple");
81  free(channel->dst_tuple);
82 
83  // Destroy the list of packets.
84  // Only an empty list can be destroyed.
85  // Therefore NULL is given as the packet destructor.
86  prt_assert(channel->packets != NULL, "NULL packets list");
87  int size = icl_deque_size(channel->packets);
88 
89  prt_assert(size == 0, "non-epty packet list");
90  icl_deque_destroy(channel->packets, NULL);
91 
92  // Free the channel.
93  free(channel);
94 }
95 
97 
105  prt_vdp_t *vdp, prt_channel_t *channel, prt_packet_t *packet)
106 {
107  // Increment references.
108  // Now the packet is in the VDP and in the channel.
109  // The VDP's reference is active until the VDP releases the packet.
110  __sync_fetch_and_add(&packet->num_refs, 1);
111 
112  // IF going to a remote destination.
113  if (channel->src_node != channel->dst_node) {
114  // Create the request.
115  prt_request_t *request =
117  packet, packet->size, channel->dst_node, channel->tag);
118  // Queue for send.
120  channel->proxy->sends_requested[vdp->thread->agent_rank], request);
121  }
122  // ELSE IF a local destination.
123  else { // IF a host destination.
124  if (channel->dst_vdp->location == PRT_LOCATION_HOST) {
125  // host -> host
126  icl_deque_append(channel->packets, (void*)packet);
127  }
128  // ELSE if a device destination.
129  else { // host -> device
130  prt_transfer_t *transfer =
131  prt_transfer_new(packet, channel, PRT_HOST_TO_DEVICE, -1);
132  icl_deque_append(channel->proxy->transfers, (void*)transfer);
133  }
134  }
135 }
136 
138 
152  prt_vdp_t *vdp, prt_channel_t *channel, prt_packet_t *packet)
153 {
154  // Increment references.
155  // Now the packet is in the channel and in the VDP.
156  // The VDP's reference is active until the VDP releases the packet.
157  __sync_fetch_and_add(&packet->num_refs, 1);
158 
159  // IF remote destination.
160  if (channel->src_node != channel->dst_node) {
161  // Device -> MPI.
162  // Add a callback that queues the transfer request.
163  prt_callback_queue_t *callback =
165  NULL, packet, channel,
166  PRT_DEVICE_MPI_TO_HOST,
167  vdp->device->agent_rank);
168  __sync_fetch_and_add(&channel->proxy->num_callbacks, 1);
169  cudaStreamAddCallback(
170  vdp->stream, prt_callback_queue_handler, (void*)callback, 0);
171  }
172  // ELSE IF local destination.
173  else { // IF host destination.
174  if (channel->dst_vdp->location == PRT_LOCATION_HOST) {
175  // Device -> host.
176  // Add a callback that queues the transfer request.
177  prt_callback_queue_t *callback =
179  NULL, packet, channel, PRT_DEVICE_TO_HOST, -1);
180  __sync_fetch_and_add(&channel->proxy->num_callbacks, 1);
181  cudaStreamAddCallback(
182  vdp->stream, prt_callback_queue_handler, (void*)callback, 0);
183  }
184  // ELSE IF device destination.
185  else { // IF the same device.
186  if (packet->device_rank == channel->dst_vdp->device->rank) {
187  // Device -> the same device.
188  // Add a callback that puts the packet in the channel.
189  prt_callback_finish_t *callback =
190  prt_callback_finish_new(NULL, packet, channel);
191  __sync_fetch_and_add(&channel->proxy->num_callbacks, 1);
192  cudaStreamAddCallback(
193  vdp->stream, prt_callback_finish_handler, (void*)callback, 0);
194  }
195  // ELSE IF a different device.
196  else { // dev -> different dev
197  // Add a callback that queues the transfer request.
198  prt_callback_queue_t *callback =
200  NULL, packet, channel, PRT_DEVICE_TO_DEVICE, -1);
201  __sync_fetch_and_add(&channel->proxy->num_callbacks, 1);
202  cudaStreamAddCallback(
203  vdp->stream, prt_callback_queue_handler, (void*)callback, 0);
204  }
205  }
206  }
207 }
208 
210 
220 {
221  // Defensive error check.
222  prt_assert(channel->packets != NULL, "NULL list of packets");
223 
224  icl_node_t *node = icl_deque_first(channel->packets);
225  prt_assert(node != NULL, "empty list of packets");
226 
227  prt_packet_t *packet = (prt_packet_t*)node->data;
228  prt_assert(packet != NULL, "NULL packet");
229 
230  icl_deque_delete(channel->packets, node, NULL);
231  return packet;
232 }
233 
235 
244 {
245  // Check for NULL input parameters.
246  prt_assert(channel != NULL, "NULL channel");
247  prt_assert(channel->packets != NULL, "NULL list of packets");
248 
249  // Return the status.
250  return (icl_deque_first(channel->packets) == NULL);
251 }
252 
254 
264 int prt_channel_compare(void *channel1, void *channel2)
265 {
266  prt_channel_t *c1 = (prt_channel_t*)channel1;
267  prt_channel_t *c2 = (prt_channel_t*)channel2;
268 
269  if (prt_tuple_compare(c1->src_tuple, c2->src_tuple) != 0)
270  return prt_tuple_compare(c1->src_tuple, c2->src_tuple);
271 
272  if (c1->src_slot < c2->src_slot) return -1;
273  if (c1->src_slot > c2->src_slot) return 1;
274 
275  if (prt_tuple_compare(c1->dst_tuple, c2->dst_tuple) != 0)
276  return prt_tuple_compare(c1->dst_tuple, c2->dst_tuple);
277 
278  if (c1->dst_slot < c2->dst_slot) return -1;
279  if (c1->dst_slot > c2->dst_slot) return 1;
280 
281  return 0;
282 }
283 
285 
293 {
294  // Switch the channel off.
295  channel->active = 0;
296 }
297 
299 
307 {
308  // Switch the channel on.
309  channel->active = 1;
310 }