PULSAR  1.0.0
Parallel Ultra Light Systolic Array Runtime
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
prt_proxy.c
Go to the documentation of this file.
1 
11 #include "prt_proxy.h"
12 
14 
21 prt_proxy_t* prt_proxy_new(int num_threads)
22 {
23  // Allocate the proxy.
24  prt_proxy_t *proxy = (prt_proxy_t*)malloc(sizeof(prt_proxy_t));
25  prt_assert(proxy != NULL, "malloc failed");
26 
27  // Init the proxy.
28  proxy->num_threads = num_threads;
29  proxy->max_packet_size = 0;
30 
31  // Create the hash table of tags.
32  int nbuckets = PRT_PROXY_MAX_TAGS_PER_NODE;
34  prt_assert(proxy->tags_hash != NULL, "icl_hash_create failed");
35 
36  // Allocate arrays of list.
37  proxy->sends_requested = (icl_deque_t**)malloc(num_threads*sizeof(icl_deque_t*));
38  proxy->sends_posted = (icl_list_t**)malloc(num_threads*sizeof(icl_list_t*));
39  prt_assert(proxy->sends_requested != NULL, "malloc failed");
40  prt_assert(proxy->sends_posted != NULL, "malloc failed");
41 
42  int i;
43  // Create send lists.
44  for (i = 0; i < num_threads; i++) {
45  proxy->sends_requested[i] = icl_deque_new();
46  proxy->sends_posted[i] = icl_list_new();
47  prt_assert(proxy->sends_requested[i] != NULL, "icl_deque_new failed");
48  prt_assert(proxy->sends_posted[i] != NULL, "icl_list_new failed");
49  }
50  // Create list of receives.
51  proxy->recvs_posted = icl_list_new();
52  prt_assert(proxy->recvs_posted != NULL, "icl_list_new failed");
53 
54  // Return the proxy.
55  return proxy;
56 }
57 
59 
67 {
68  int status;
69  // Destroy the hash.
70  // Free the keys (tuples).
71  // Leave the channel references alone.
72  status = icl_hash_destroy(proxy->tags_hash, free, NULL);
73  prt_assert(status == 0, "icl_hash_destroy failed");
74 
75  int i;
76  int size;
77  // Destroy send lists.
78  // Check if empty at destruction.
79  for (i = 0; i < proxy->num_threads; i++) {
80  size = icl_deque_size(proxy->sends_requested[i]);
81  prt_assert(size == 0, "destroying non-empty deque");
82  status = icl_deque_destroy(proxy->sends_requested[i], NULL);
83  prt_assert(status == 0, "icl_deque_destroy failed");
84 
85  size = icl_list_size(proxy->sends_posted[i]);
86  prt_assert(size == 0, "destroying non-empty list");
87  status = icl_list_destroy(proxy->sends_posted[i], NULL);
88  prt_assert(status == 0, "icl_list_destroy failed");
89  }
90  // Free arrays.
91  free(proxy->sends_requested);
92  free(proxy->sends_posted);
93 
94  // Free the proxy.
95  free(proxy);
96 }
97 
99 
106 {
107  int type_size;
108  MPI_Type_size(channel->datatype, &type_size);
109  size_t packet_size = (size_t)channel->count*type_size;
110  if (packet_size > proxy->max_packet_size)
111  proxy->max_packet_size = packet_size;
112 }
113 
115 
122 void prt_proxy_send(prt_proxy_t *proxy, int thread_rank, prt_channel_t *channel)
123 {
124  // Pop the packet from the channel.
125  prt_packet_t *packet = prt_channel_pop(channel);
126  // Create a request.
127  prt_request_t *request =
129  packet,
130  channel->count, channel->datatype,
131  channel->dst_node, channel->tag);
132  // Queue for send.
133  icl_deque_append(proxy->sends_requested[thread_rank], request);
134 }
135 
137 
144 {
145  int count;
146  int retval;
147  // Find the message size and realloc the packet data.
148  retval = MPI_Get_count(&request->status, MPI_BYTE, &count);
149  prt_assert(retval == MPI_SUCCESS, "MPI_Get_count failed");
150  request->packet->data = realloc(request->packet->data, count);
151 
152  // Find the channel and push.
153  int source = request->status.MPI_SOURCE;
154  int tag = request->status.MPI_TAG;
155  int *source_tag = prt_tuple_new2(source, tag);
156  prt_channel_t *channel = icl_hash_find(proxy->tags_hash, source_tag);
157  free(source_tag);
158 
159  // Reset num_refs and push.
160  request->packet->num_refs = 0;
161  prt_channel_push(channel, request->packet);
162 }
163 
165 
172 {
173  int sends_posted;
174  int sends_requested;
175  int threads_finished;
176  do {
177  int i;
178  // Post another send.
179  for (i = 0; i < proxy->num_threads; i++) {
180  if (icl_list_size(proxy->sends_posted[i]) <
182  icl_node_t *node = icl_deque_first(proxy->sends_requested[i]);
183  if (node != NULL) {
184  prt_request_t *request = (prt_request_t*)node->data;
185  // Post the send request.
186  prt_request_send(request);
187  // Move from the list of requested to the list of posted.
188  // Only moving the request, not destroying the request.
189  icl_deque_delete(proxy->sends_requested[i], node, NULL);
190  icl_list_append(proxy->sends_posted[i], request);
191  }
192  }
193  }
194  // Complete another send.
195  for (i = 0; i < proxy->num_threads; i++) {
196  icl_node_t *node = icl_list_first(proxy->sends_posted[i]);
197  if (node != NULL) {
198  prt_request_t *request = (prt_request_t*)node->data;
199  // If send request completed.
200  if (prt_request_test(request)) {
201  // Release the packet.
202  prt_packet_release(request->packet);
203  // Remove from the list & destroy the request.
204  icl_list_delete(proxy->sends_posted[i], node,
205  (void(*)(void*))prt_request_destroy);
206  }
207  }
208  }
209  // Post another receive.
210  if (icl_list_size(proxy->recvs_posted) <
212  prt_packet_t *packet = prt_packet_new(proxy->max_packet_size);
213  prt_request_t *request =
215  packet, proxy->max_packet_size,
216  MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG);
217  prt_request_recv(request);
218  icl_list_append(proxy->recvs_posted, request);
219  }
220  // Complete another receive.
221  icl_node_t *node = icl_list_first(proxy->recvs_posted);
222  if (node != NULL) {
223  prt_request_t *request = (prt_request_t*)node->data;
224  // IF recv request completed.
225  if (prt_request_test(request)) {
226  // Put the packet in a channel.
227  prt_proxy_recv(proxy, request);
228  // Remove from the list & destroy the request.
229  icl_list_delete(proxy->recvs_posted, node,
230  (void(*)(void*))prt_request_destroy);
231  }
232  }
233  threads_finished = 1;
234  // Check if threads finished.
235  for (i = 0; i < proxy->num_threads; i++)
236  if (!proxy->vsa->thread[i]->finished)
237  threads_finished = 0;
238 
239  sends_requested = 0;
240  // Check if sends pending.
241  for (i = 0; i < proxy->num_threads; i++)
242  if (icl_deque_size(proxy->sends_requested[i]) > 0)
243  sends_requested = 1;
244 
245  sends_posted = 0;
246  // Check if sends pending.
247  for (i = 0; i < proxy->num_threads; i++)
248  if (icl_list_size(proxy->sends_posted[i]) > 0)
249  sends_posted = 1;
250  }
251  while (!threads_finished || sends_requested || sends_posted);
252  // WHILE threads not finished OR sends pending.
253 
254  // Destroy the list of pending receives.
255  // Cancel and free all requests. Release all packets.
256  icl_list_destroy(proxy->recvs_posted, (void(*))(void*)prt_request_cancel);
257 
258  // Barrier before any other MPI call
259  // picks up one of those pending recvs.
260  MPI_Barrier(MPI_COMM_WORLD);
261 }
icl_node_t * icl_deque_append(icl_deque_t *deque, void *data)
Insert the node at the end of the deque.
Definition: icl_deque.c:118
void prt_packet_release(prt_packet_t *packet)
Release a packet. Decrements the number of active references. Destroys the packet when the last refer...
Definition: prt_packet.c:44
icl_list_t * recvs_posted
Definition: prt_proxy.h:45
icl_list_t ** sends_posted
Definition: prt_proxy.h:44
int icl_hash_destroy(icl_hash_t *ht, void(*free_key)(void *), void(*free_data)(void *))
Free hash table structures. Key and data are freed using functions.
Definition: icl_hash.c:279
MPI_Status status
Definition: prt_request.h:34
icl_list_t * icl_list_new()
Create new linked list.
Definition: icl_list.c:23
int icl_list_size(icl_list_t *head)
Get the number of items in this linked list.
Definition: icl_list.c:200
void * data
Definition: icl_list.h:20
void prt_request_send(prt_request_t *request)
Post a send request. Detects a possible overflow of the request size.
Definition: prt_request.c:61
icl_deque_t * icl_deque_new()
deque constructor
Definition: icl_deque.c:23
MPI_Datatype datatype
Definition: prt_channel.h:32
int icl_deque_size(icl_deque_t *deque)
Return the deque size.
Definition: icl_deque.c:189
int icl_list_destroy(icl_list_t *head, void(*free_function)(void *))
Frees the resources associated with this linked list.
Definition: icl_list.c:173
icl_node_t * icl_deque_first(icl_deque_t *deque)
Get the first node in the deque.
Definition: icl_deque.c:76
struct prt_thread_s ** thread
Definition: prt_vsa.h:51
icl_list_t * icl_list_append(icl_list_t *head, void *data)
Insert a node at the end of this list.
Definition: icl_list.c:326
void prt_proxy_delete(prt_proxy_t *proxy)
Communication proxy destructor. Checks if all the lists are empty at the time of destruction...
Definition: prt_proxy.c:66
icl_list_t * icl_list_first(icl_list_t *head)
Get the first item in this linked list.
Definition: icl_list.c:221
void prt_request_cancel(prt_request_t *request)
Cancel a request. Cancel the request, release the packet, free the request object.
Definition: prt_request.c:151
VDP's data packet. A packet of data transferred through VDP's channels.
Definition: prt_packet.h:24
unsigned int prt_tuple_hash(void *tuple)
Tuple hash. Required by the VSA's tuples hash table. Computes the length in characters and calls a str...
Definition: prt_tuple.c:188
void prt_channel_push(prt_channel_t *channel, prt_packet_t *packet)
Sends a packet down a channel. Increments the packet's number of active references.
Definition: prt_channel.c:98
prt_request_t * prt_request_new(prt_packet_t *packet, int count, MPI_Datatype datatype, int peer, int tag)
request constructor
Definition: prt_request.c:25
VSA's communication proxy. Serves communication requests from the worker threads. Contains a list of r...
Definition: prt_proxy.h:39
prt_proxy_t * prt_proxy_new(int num_threads)
communication proxy constructor
Definition: prt_proxy.c:21
void * data
Definition: prt_packet.h:25
void prt_proxy_max_packet_size(prt_proxy_t *proxy, prt_channel_t *channel)
Look for maximum channel/packet size.
Definition: prt_proxy.c:105
#define PRT_PROXY_MAX_SENDS_PER_THREAD
Definition: prt_proxy.h:30
int prt_request_test(prt_request_t *request)
Test a request. Trace only completed requests.
Definition: prt_request.c:128
#define prt_tuple_new2(a, b)
Definition: prt_tuple.h:35
int icl_deque_destroy(icl_deque_t *deque, void(*free_func)(void *))
deque destructor
Definition: icl_deque.c:53
VDP's data channel. Implements a data link between a pair of VDPs. Identifies the source and destinati...
Definition: prt_channel.h:29
struct prt_packet_s * packet
Definition: prt_request.h:28
prt_packet_t * prt_channel_pop(prt_channel_t *channel)
Fetches a packet from a channel. Does not decrement the number of active references. The packet leaves the channel, but enters the VDP.
Definition: prt_channel.c:128
icl_deque_t ** sends_requested
Definition: prt_proxy.h:43
int icl_deque_delete(icl_deque_t *deque, icl_node_t *node, void(*free_func)(void *))
Delete the node from the deque.
Definition: icl_deque.c:164
struct prt_vsa_s * vsa
Definition: prt_proxy.h:40
#define prt_assert(cond, msg)
Definition: prt_assert.h:30
prt_packet_t * prt_packet_new(size_t data_size)
packet constructor Sets the number of references to one.
Definition: prt_packet.c:23
#define PRT_PROXY_MAX_RECVS_PER_THREAD
Definition: prt_proxy.h:31
void prt_request_destroy(prt_request_t *request)
request destructor Request is only an envelope for a packet. Request destruction does not affect the ...
Definition: prt_request.c:48
int icl_list_delete(icl_list_t *head, icl_list_t *pos, void(*free_function)(void *))
Delete the specified node.
Definition: icl_list.c:83
void * icl_hash_find(icl_hash_t *ht, void *key)
Search for an entry in a hash table.
Definition: icl_hash.c:108
int prt_tuple_equal(void *tuple_a, void *tuple_b)
tuple equality check Check if tuples are identical in length and content.
Definition: prt_tuple.c:161
void prt_proxy_run(prt_proxy_t *proxy)
communication proxy production cycle Serves communication requests of local worker threads until shut...
Definition: prt_proxy.c:171
icl_hash_t * icl_hash_create(int nbuckets, unsigned int(*hash_function)(void *), int(*hash_key_compare)(void *, void *))
Create a new hash table.
Definition: icl_hash.c:70
void prt_request_recv(prt_request_t *request)
Post a receive request. Detects a possible overflow of the request size.
Definition: prt_request.c:93
void prt_proxy_recv(prt_proxy_t *proxy, prt_request_t *request)
recv to a channel
Definition: prt_proxy.c:143
VSA proxy's communication request. Contains basic information about the communication request...
Definition: prt_request.h:27
#define PRT_PROXY_MAX_TAGS_PER_NODE
Maximum tags per node. Size of the proxy's hash table of tags. Should be a prime number.
Definition: prt_proxy.h:29
size_t max_packet_size
Definition: prt_proxy.h:46
int num_threads
Definition: prt_proxy.h:41
PRT communication proxy.
icl_hash_t * tags_hash
Definition: prt_proxy.h:42
void prt_proxy_send(prt_proxy_t *proxy, int thread_rank, prt_channel_t *channel)
send from a channel
Definition: prt_proxy.c:122