PULSAR  1.0.0
Parallel Ultra Light Systolic Array Runtime
prt_proxy.h File Reference

PRT communication proxy.

#include "prt.h"

Data Structures

struct  prt_proxy_s
 VSA's communication proxy. Serves communication requests from the worker threads. Contains a list of recv requests from each worker thread. Contains a list of send requests from each worker thread.
 

Macros

#define PRT_PROXY_MAX_TAGS_PER_NODE   10003
 Maximum tags per node. Size of the proxy's hash table of tags. Should be a prime number.
 
#define PRT_PROXY_MAX_SENDS_PER_THREAD   1
 
#define PRT_PROXY_MAX_RECVS_PER_THREAD   1
 

Typedefs

typedef struct prt_proxy_s prt_proxy_t
 VSA's communication proxy. Serves communication requests from the worker threads. Contains a list of recv requests from each worker thread. Contains a list of send requests from each worker thread.
 

Functions

prt_proxy_t * prt_proxy_new (int num_threads)
 Communication proxy constructor.

void prt_proxy_delete (prt_proxy_t *proxy)
 Communication proxy destructor. Checks that all the lists are empty at the time of destruction. Does not destroy the list of receives (destroyed at the end of prt_proxy_run).

void prt_proxy_max_packet_size (prt_proxy_t *proxy, struct prt_channel_s *channel)
 Look for maximum channel/packet size.

void prt_proxy_send (prt_proxy_t *proxy, int thread_rank, struct prt_channel_s *channel)
 Send from a channel.

void prt_proxy_recv (prt_proxy_t *proxy, struct prt_request_s *request)
 Receive to a channel.

void prt_proxy_run (prt_proxy_t *proxy)
 Communication proxy production cycle. Serves communication requests of local worker threads until shut down.
 

Detailed Description

PRT communication proxy.

Author
Jakub Kurzak

PULSAR Runtime http://icl.eecs.utk.edu/pulsar/ Copyright (C) 2012-2013 University of Tennessee.

Definition in file prt_proxy.h.

Macro Definition Documentation

#define PRT_PROXY_MAX_RECVS_PER_THREAD   1

Definition at line 31 of file prt_proxy.h.

#define PRT_PROXY_MAX_SENDS_PER_THREAD   1

Definition at line 30 of file prt_proxy.h.

#define PRT_PROXY_MAX_TAGS_PER_NODE   10003

Maximum tags per node. Size of the proxy's hash table of tags. Should be a prime number.

Definition at line 29 of file prt_proxy.h.
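
The note above asks for a prime table size. The following is a minimal sketch of a trial-division check; `is_prime` and `next_prime` are hypothetical helpers for illustration, not part of PULSAR. Trial division shows that 10003 = 7 × 1429, so the value defined here is not itself prime; the smallest prime at or above it is 10007. Whether that matters in practice depends on how well the hash function distributes the tuples.

```c
#include <assert.h>

/* Hypothetical helper, not part of PULSAR: trial-division primality test. */
int is_prime(unsigned int n)
{
    if (n < 2) return 0;
    if (n % 2 == 0) return n == 2;
    /* d <= n / d is the overflow-safe form of d * d <= n. */
    for (unsigned int d = 3; d <= n / d; d += 2)
        if (n % d == 0) return 0;
    return 1;
}

/* Hypothetical helper: smallest prime greater than or equal to n. */
unsigned int next_prime(unsigned int n)
{
    while (!is_prime(n)) n++;
    return n;
}
```

Calling `next_prime(10003)` gives a candidate bucket count that satisfies the "should be a prime number" recommendation.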

Typedef Documentation

typedef struct prt_proxy_s prt_proxy_t

VSA's communication proxy. Serves communication requests from the worker threads. Contains a list of recv requests from each worker thread. Contains a list of send requests from each worker thread.

Function Documentation

void prt_proxy_delete (prt_proxy_t *proxy)

Communication proxy destructor. Checks that all the lists are empty at the time of destruction. Does not destroy the list of receives (destroyed at the end of prt_proxy_run).

Parameters
proxy

Definition at line 66 of file prt_proxy.c.

References icl_deque_destroy(), icl_deque_size(), icl_hash_destroy(), icl_list_destroy(), icl_list_size(), prt_proxy_s::num_threads, prt_assert, prt_proxy_s::sends_posted, prt_proxy_s::sends_requested, and prt_proxy_s::tags_hash.

{
    int status;
    // Destroy the hash.
    // Free the keys (tuples).
    // Leave the channel references alone.
    status = icl_hash_destroy(proxy->tags_hash, free, NULL);
    prt_assert(status == 0, "icl_hash_destroy failed");

    int i;
    int size;
    // Destroy send lists.
    // Check if empty at destruction.
    for (i = 0; i < proxy->num_threads; i++) {
        size = icl_deque_size(proxy->sends_requested[i]);
        prt_assert(size == 0, "destroying non-empty deque");
        status = icl_deque_destroy(proxy->sends_requested[i], NULL);
        prt_assert(status == 0, "icl_deque_destroy failed");

        size = icl_list_size(proxy->sends_posted[i]);
        prt_assert(size == 0, "destroying non-empty list");
        status = icl_list_destroy(proxy->sends_posted[i], NULL);
        prt_assert(status == 0, "icl_list_destroy failed");
    }
    // Free arrays.
    free(proxy->sends_requested);
    free(proxy->sends_posted);

    // Free the proxy.
    free(proxy);
}
icl_list_t ** sends_posted
Definition: prt_proxy.h:44
int icl_hash_destroy(icl_hash_t *ht, void(*free_key)(void *), void(*free_data)(void *))
Free hash table structures. Key and data are freed using functions.
Definition: icl_hash.c:279
int icl_list_size(icl_list_t *head)
Get the number of items in this linked list.
Definition: icl_list.c:200
int icl_deque_size(icl_deque_t *deque)
Return the deque size.
Definition: icl_deque.c:189
int icl_list_destroy(icl_list_t *head, void(*free_function)(void *))
Frees the resources associated with this linked list.
Definition: icl_list.c:173
int icl_deque_destroy(icl_deque_t *deque, void(*free_func)(void *))
deque destructor
Definition: icl_deque.c:53
icl_deque_t ** sends_requested
Definition: prt_proxy.h:43
#define prt_assert(cond, msg)
Definition: prt_assert.h:30
int num_threads
Definition: prt_proxy.h:41
icl_hash_t * tags_hash
Definition: prt_proxy.h:42


void prt_proxy_max_packet_size (prt_proxy_t *proxy, prt_channel_t *channel)

Look for maximum channel/packet size.

Parameters
proxy
channel

Definition at line 105 of file prt_proxy.c.

References prt_channel_s::count, prt_channel_s::datatype, and prt_proxy_s::max_packet_size.

{
    int type_size;
    MPI_Type_size(channel->datatype, &type_size);
    size_t packet_size = (size_t)channel->count*type_size;
    if (packet_size > proxy->max_packet_size)
        proxy->max_packet_size = packet_size;
}
MPI_Datatype datatype
Definition: prt_channel.h:32
size_t max_packet_size
Definition: prt_proxy.h:46
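
The function keeps a running maximum of `count * type_size` over the channels it is shown, which the proxy later uses to size its receive buffers. The following sketch isolates that arithmetic without MPI. `track_max_packet_size` is a hypothetical stand-in that takes the element size as a parameter instead of querying it with `MPI_Type_size`.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for prt_proxy_max_packet_size(): the element size is
 * passed in directly instead of being queried with MPI_Type_size(). */
size_t track_max_packet_size(size_t current_max, int count, int type_size)
{
    assert(count >= 0 && type_size >= 0);
    /* Widen before multiplying so that a large count does not overflow int. */
    size_t packet_size = (size_t)count * (size_t)type_size;
    return packet_size > current_max ? packet_size : current_max;
}
```

Feeding it a 100-element channel of 8-byte items and then a 10-element channel of 4-byte items leaves the maximum at 800 bytes, since the second packet is smaller.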


prt_proxy_t * prt_proxy_new (int num_threads)

Communication proxy constructor.

Parameters
num_threads  number of local worker threads
Returns
new communication proxy

Definition at line 21 of file prt_proxy.c.

References icl_deque_new(), icl_hash_create(), icl_list_new(), prt_proxy_s::max_packet_size, prt_proxy_s::num_threads, prt_assert, PRT_PROXY_MAX_TAGS_PER_NODE, prt_tuple_equal(), prt_tuple_hash(), prt_proxy_s::recvs_posted, prt_proxy_s::sends_posted, prt_proxy_s::sends_requested, and prt_proxy_s::tags_hash.

{
    // Allocate the proxy.
    prt_proxy_t *proxy = (prt_proxy_t*)malloc(sizeof(prt_proxy_t));
    prt_assert(proxy != NULL, "malloc failed");

    // Init the proxy.
    proxy->num_threads = num_threads;
    proxy->max_packet_size = 0;

    // Create the hash table of tags.
    int nbuckets = PRT_PROXY_MAX_TAGS_PER_NODE;
    proxy->tags_hash = icl_hash_create(nbuckets, prt_tuple_hash, prt_tuple_equal);
    prt_assert(proxy->tags_hash != NULL, "icl_hash_create failed");

    // Allocate arrays of lists.
    proxy->sends_requested = (icl_deque_t**)malloc(num_threads*sizeof(icl_deque_t*));
    proxy->sends_posted = (icl_list_t**)malloc(num_threads*sizeof(icl_list_t*));
    prt_assert(proxy->sends_requested != NULL, "malloc failed");
    prt_assert(proxy->sends_posted != NULL, "malloc failed");

    int i;
    // Create send lists.
    for (i = 0; i < num_threads; i++) {
        proxy->sends_requested[i] = icl_deque_new();
        proxy->sends_posted[i] = icl_list_new();
        prt_assert(proxy->sends_requested[i] != NULL, "icl_deque_new failed");
        prt_assert(proxy->sends_posted[i] != NULL, "icl_list_new failed");
    }
    // Create list of receives.
    proxy->recvs_posted = icl_list_new();
    prt_assert(proxy->recvs_posted != NULL, "icl_list_new failed");

    // Return the proxy.
    return proxy;
}
icl_list_t * recvs_posted
Definition: prt_proxy.h:45
icl_list_t ** sends_posted
Definition: prt_proxy.h:44
icl_list_t * icl_list_new()
Create new linked list.
Definition: icl_list.c:23
icl_deque_t * icl_deque_new()
deque constructor
Definition: icl_deque.c:23
unsigned int prt_tuple_hash(void *tuple)
Tuple hash. Required by the VSA's tuples hash table. Computes the length in characters and calls a str...
Definition: prt_tuple.c:188
prt_proxy_t
VSA's communication proxy. Serves communication requests from the worker threads. Contains a list of r...
Definition: prt_proxy.h:39
icl_deque_t ** sends_requested
Definition: prt_proxy.h:43
#define prt_assert(cond, msg)
Definition: prt_assert.h:30
int prt_tuple_equal(void *tuple_a, void *tuple_b)
tuple equality check Check if tuples are identical in length and content.
Definition: prt_tuple.c:161
icl_hash_t * icl_hash_create(int nbuckets, unsigned int(*hash_function)(void *), int(*hash_key_compare)(void *, void *))
Create a new hash table.
Definition: icl_hash.c:70
#define PRT_PROXY_MAX_TAGS_PER_NODE
Maximum tags per node. Size of the proxy's hash table of tags. Should be a prime number.
Definition: prt_proxy.h:29
size_t max_packet_size
Definition: prt_proxy.h:46
int num_threads
Definition: prt_proxy.h:41
icl_hash_t * tags_hash
Definition: prt_proxy.h:42


void prt_proxy_recv (prt_proxy_t *proxy, prt_request_t *request)

Receive to a channel.

Parameters
proxy
request

Definition at line 143 of file prt_proxy.c.

References prt_packet_s::data, icl_hash_find(), prt_packet_s::num_refs, prt_request_s::packet, prt_assert, prt_channel_push(), prt_tuple_new2, prt_request_s::status, and prt_proxy_s::tags_hash.

{
    int count;
    int retval;
    // Find the message size and realloc the packet data.
    retval = MPI_Get_count(&request->status, MPI_BYTE, &count);
    prt_assert(retval == MPI_SUCCESS, "MPI_Get_count failed");
    request->packet->data = realloc(request->packet->data, count);

    // Find the channel and push.
    int source = request->status.MPI_SOURCE;
    int tag = request->status.MPI_TAG;
    int *source_tag = prt_tuple_new2(source, tag);
    prt_channel_t *channel = icl_hash_find(proxy->tags_hash, source_tag);
    free(source_tag);

    // Reset num_refs and push.
    request->packet->num_refs = 0;
    prt_channel_push(channel, request->packet);
}
MPI_Status status
Definition: prt_request.h:34
void prt_channel_push(prt_channel_t *channel, prt_packet_t *packet)
Sends a packet down a channel. Increments the packet's number of active references.
Definition: prt_channel.c:98
void * data
Definition: prt_packet.h:25
#define prt_tuple_new2(a, b)
Definition: prt_tuple.h:35
prt_channel_t
VDP's data channel. Implements a data link between a pair of VDPs. Identifies the source and destinati...
Definition: prt_channel.h:29
struct prt_packet_s * packet
Definition: prt_request.h:28
#define prt_assert(cond, msg)
Definition: prt_assert.h:30
void * icl_hash_find(icl_hash_t *ht, void *key)
Search for an entry in a hash table.
Definition: icl_hash.c:108
icl_hash_t * tags_hash
Definition: prt_proxy.h:42
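
The receive path identifies the destination channel from the (source, tag) pair reported in the MPI status and looks it up in the tags hash table. The following is a minimal sketch of that lookup using a small chained hash table keyed by an integer pair. All names here (`tag_table_t`, `pair_hash`, and so on) are hypothetical; this is not the `icl_hash` API, and the mixing function is an assumption rather than what `prt_tuple_hash` does.

```c
#include <assert.h>
#include <stdlib.h>

#define NBUCKETS 10007  /* assumed prime bucket count for this sketch */

/* Hypothetical entry: maps a (source, tag) pair to an opaque channel pointer. */
typedef struct entry_s {
    int source, tag;
    void *channel;
    struct entry_s *next;
} entry_t;

typedef struct { entry_t *bucket[NBUCKETS]; } tag_table_t;

/* Simple mixing of the two integers; an assumption, not prt_tuple_hash(). */
static unsigned int pair_hash(int source, int tag)
{
    return ((unsigned int)source * 31u + (unsigned int)tag) % NBUCKETS;
}

tag_table_t *tag_table_new(void) { return calloc(1, sizeof(tag_table_t)); }

void tag_table_insert(tag_table_t *t, int source, int tag, void *channel)
{
    entry_t *e = malloc(sizeof *e);
    assert(e != NULL);
    unsigned int h = pair_hash(source, tag);
    e->source = source; e->tag = tag; e->channel = channel;
    e->next = t->bucket[h];          /* push onto the front of the chain */
    t->bucket[h] = e;
}

void *tag_table_find(const tag_table_t *t, int source, int tag)
{
    for (entry_t *e = t->bucket[pair_hash(source, tag)]; e != NULL; e = e->next)
        if (e->source == source && e->tag == tag)
            return e->channel;
    return NULL;  /* no channel registered for this (source, tag) pair */
}

void tag_table_delete(tag_table_t *t)
{
    for (int i = 0; i < NBUCKETS; i++)
        while (t->bucket[i] != NULL) {
            entry_t *e = t->bucket[i];
            t->bucket[i] = e->next;
            free(e);             /* frees the entry only, not the channel */
        }
    free(t);
}
```

Note that `tag_table_find` returns NULL for an unregistered pair. The listing above pushes to the lookup result without such a check, so it appears to rely on every incoming (source, tag) pair having been registered beforehand.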


void prt_proxy_run (prt_proxy_t *proxy)

Communication proxy production cycle. Serves communication requests of local worker threads until shut down.

Parameters
proxy

Definition at line 171 of file prt_proxy.c.

References icl_list_s::data, prt_thread_s::finished, icl_deque_delete(), icl_deque_first(), icl_deque_size(), icl_list_append(), icl_list_delete(), icl_list_destroy(), icl_list_first(), icl_list_size(), prt_proxy_s::max_packet_size, prt_proxy_s::num_threads, prt_request_s::packet, prt_packet_new(), prt_packet_release(), PRT_PROXY_MAX_RECVS_PER_THREAD, PRT_PROXY_MAX_SENDS_PER_THREAD, prt_proxy_recv(), prt_request_cancel(), prt_request_destroy(), prt_request_new(), prt_request_recv(), prt_request_send(), prt_request_test(), prt_proxy_s::recvs_posted, prt_proxy_s::sends_posted, prt_proxy_s::sends_requested, prt_vsa_s::thread, and prt_proxy_s::vsa.

{
    int sends_posted;
    int sends_requested;
    int threads_finished;
    do {
        int i;
        // Post another send.
        for (i = 0; i < proxy->num_threads; i++) {
            if (icl_list_size(proxy->sends_posted[i]) <
                PRT_PROXY_MAX_SENDS_PER_THREAD) {
                icl_node_t *node = icl_deque_first(proxy->sends_requested[i]);
                if (node != NULL) {
                    prt_request_t *request = (prt_request_t*)node->data;
                    // Post the send request.
                    prt_request_send(request);
                    // Move from the list of requested to the list of posted.
                    // Only moving the request, not destroying the request.
                    icl_deque_delete(proxy->sends_requested[i], node, NULL);
                    icl_list_append(proxy->sends_posted[i], request);
                }
            }
        }
        // Complete another send.
        for (i = 0; i < proxy->num_threads; i++) {
            icl_node_t *node = icl_list_first(proxy->sends_posted[i]);
            if (node != NULL) {
                prt_request_t *request = (prt_request_t*)node->data;
                // If send request completed.
                if (prt_request_test(request)) {
                    // Release the packet.
                    prt_packet_release(request->packet);
                    // Remove from the list & destroy the request.
                    icl_list_delete(proxy->sends_posted[i], node,
                                    (void(*)(void*))prt_request_destroy);
                }
            }
        }
        // Post another receive.
        if (icl_list_size(proxy->recvs_posted) <
            PRT_PROXY_MAX_RECVS_PER_THREAD*proxy->num_threads) {
            prt_packet_t *packet = prt_packet_new(proxy->max_packet_size);
            prt_request_t *request =
                prt_request_new(
                    packet, proxy->max_packet_size,
                    MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG);
            prt_request_recv(request);
            icl_list_append(proxy->recvs_posted, request);
        }
        // Complete another receive.
        icl_node_t *node = icl_list_first(proxy->recvs_posted);
        if (node != NULL) {
            prt_request_t *request = (prt_request_t*)node->data;
            // If recv request completed.
            if (prt_request_test(request)) {
                // Put the packet in a channel.
                prt_proxy_recv(proxy, request);
                // Remove from the list & destroy the request.
                icl_list_delete(proxy->recvs_posted, node,
                                (void(*)(void*))prt_request_destroy);
            }
        }
        threads_finished = 1;
        // Check if threads finished.
        for (i = 0; i < proxy->num_threads; i++)
            if (!proxy->vsa->thread[i]->finished)
                threads_finished = 0;

        sends_requested = 0;
        // Check if any sends are still requested.
        for (i = 0; i < proxy->num_threads; i++)
            if (icl_deque_size(proxy->sends_requested[i]) > 0)
                sends_requested = 1;

        sends_posted = 0;
        // Check if any sends are still posted.
        for (i = 0; i < proxy->num_threads; i++)
            if (icl_list_size(proxy->sends_posted[i]) > 0)
                sends_posted = 1;
    }
    while (!threads_finished || sends_requested || sends_posted);
    // WHILE threads not finished OR sends pending.

    // Destroy the list of pending receives.
    // Cancel and free all requests. Release all packets.
    icl_list_destroy(proxy->recvs_posted, (void(*)(void*))prt_request_cancel);

    // Barrier before any other MPI call
    // picks up one of those pending recvs.
    MPI_Barrier(MPI_COMM_WORLD);
}
void prt_packet_release(prt_packet_t *packet)
Release a packet. Decrements the number of active references. Destroys the packet when the last refer...
Definition: prt_packet.c:44
icl_list_t * recvs_posted
Definition: prt_proxy.h:45
icl_list_t ** sends_posted
Definition: prt_proxy.h:44
int icl_list_size(icl_list_t *head)
Get the number of items in this linked list.
Definition: icl_list.c:200
void * data
Definition: icl_list.h:20
void prt_request_send(prt_request_t *request)
Post a send request. Detects a possible overflow of the request size.
Definition: prt_request.c:61
int icl_deque_size(icl_deque_t *deque)
Return the deque size.
Definition: icl_deque.c:189
int icl_list_destroy(icl_list_t *head, void(*free_function)(void *))
Frees the resources associated with this linked list.
Definition: icl_list.c:173
icl_node_t * icl_deque_first(icl_deque_t *deque)
Get the first node in the deque.
Definition: icl_deque.c:76
struct prt_thread_s ** thread
Definition: prt_vsa.h:51
icl_list_t * icl_list_append(icl_list_t *head, void *data)
Insert a node at the end of this list.
Definition: icl_list.c:326
icl_list_t * icl_list_first(icl_list_t *head)
Get the first item in this linked list.
Definition: icl_list.c:221
void prt_request_cancel(prt_request_t *request)
Cancel a request. Cancel the request, release the packet, free the request object.
Definition: prt_request.c:151
prt_packet_t
VDP's data packet. A packet of data transferred through VDP's channels.
Definition: prt_packet.h:24
prt_request_t * prt_request_new(prt_packet_t *packet, int count, MPI_Datatype datatype, int peer, int tag)
request constructor
Definition: prt_request.c:25
#define PRT_PROXY_MAX_SENDS_PER_THREAD
Definition: prt_proxy.h:30
int prt_request_test(prt_request_t *request)
Test a request. Trace only completed requests.
Definition: prt_request.c:128
struct prt_packet_s * packet
Definition: prt_request.h:28
icl_deque_t ** sends_requested
Definition: prt_proxy.h:43
int icl_deque_delete(icl_deque_t *deque, icl_node_t *node, void(*free_func)(void *))
Delete the node from the deque.
Definition: icl_deque.c:164
struct prt_vsa_s * vsa
Definition: prt_proxy.h:40
prt_packet_t * prt_packet_new(size_t data_size)
packet constructor Sets the number of references to one.
Definition: prt_packet.c:23
#define PRT_PROXY_MAX_RECVS_PER_THREAD
Definition: prt_proxy.h:31
void prt_request_destroy(prt_request_t *request)
request destructor Request is only an envelope for a packet. Request destruction does not affect the ...
Definition: prt_request.c:48
int icl_list_delete(icl_list_t *head, icl_list_t *pos, void(*free_function)(void *))
Delete the specified node.
Definition: icl_list.c:83
void prt_request_recv(prt_request_t *request)
Post a receive request. Detects a possible overflow of the request size.
Definition: prt_request.c:93
void prt_proxy_recv(prt_proxy_t *proxy, prt_request_t *request)
recv to a channel
Definition: prt_proxy.c:143
prt_request_t
VSA proxy's communication request. Contains basic information about the communication request...
Definition: prt_request.h:27
size_t max_packet_size
Definition: prt_proxy.h:46
int num_threads
Definition: prt_proxy.h:41
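
The production cycle exits only when every worker thread has finished and no send remains either requested or posted. The following sketch simulates the send side of that loop without MPI. Everything in it is hypothetical: counters stand in for the deque and list, and a posted send is assumed to complete the first time it is tested, which real MPI requests do not guarantee.

```c
#include <assert.h>

#define MAX_SENDS_PER_THREAD 1   /* mirrors PRT_PROXY_MAX_SENDS_PER_THREAD */

/* Hypothetical per-thread state: counters stand in for the deque of requested
 * sends and the list of posted sends. */
typedef struct {
    int requested;  /* sends queued by the worker, not yet posted */
    int posted;     /* sends posted, not yet completed */
    int finished;   /* nonzero once the worker thread is done */
} sim_thread_t;

/* Runs the cycle until every thread is finished and no send is requested or
 * posted. Returns the number of sends completed.
 * Assumption: a posted send completes the first time it is tested. */
int sim_proxy_run(sim_thread_t *thread, int num_threads)
{
    int completed = 0;
    int threads_finished, sends_requested, sends_posted;
    do {
        int i;
        /* Post another send, respecting the per-thread limit. */
        for (i = 0; i < num_threads; i++)
            if (thread[i].posted < MAX_SENDS_PER_THREAD &&
                thread[i].requested > 0) {
                thread[i].requested--;
                thread[i].posted++;
            }
        /* Complete another send. */
        for (i = 0; i < num_threads; i++)
            if (thread[i].posted > 0) {
                thread[i].posted--;
                completed++;
            }
        /* Recompute the three termination flags. */
        threads_finished = 1; sends_requested = 0; sends_posted = 0;
        for (i = 0; i < num_threads; i++) {
            if (!thread[i].finished)     threads_finished = 0;
            if (thread[i].requested > 0) sends_requested = 1;
            if (thread[i].posted > 0)    sends_posted = 1;
        }
    } while (!threads_finished || sends_requested || sends_posted);
    return completed;
}
```

In this single-threaded sketch the workers must be marked finished before the call, because nothing else would set the flag and the loop would spin forever. With one thread holding three queued sends and another holding one, the loop runs three iterations and completes four sends, since each thread posts at most one send per pass.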


void prt_proxy_send (prt_proxy_t *proxy, int thread_rank, prt_channel_t *channel)

Send from a channel.

Parameters
proxy
thread_rank
channel  channel requesting a send

Definition at line 122 of file prt_proxy.c.

References prt_channel_s::count, prt_channel_s::datatype, prt_channel_s::dst_node, icl_deque_append(), prt_channel_pop(), prt_request_new(), prt_proxy_s::sends_requested, and prt_channel_s::tag.

{
    // Pop the packet from the channel.
    prt_packet_t *packet = prt_channel_pop(channel);
    // Create a request.
    prt_request_t *request =
        prt_request_new(
            packet,
            channel->count, channel->datatype,
            channel->dst_node, channel->tag);
    // Queue for send.
    icl_deque_append(proxy->sends_requested[thread_rank], request);
}
icl_node_t * icl_deque_append(icl_deque_t *deque, void *data)
Insert the node at the end of the deque.
Definition: icl_deque.c:118
MPI_Datatype datatype
Definition: prt_channel.h:32
prt_packet_t
VDP's data packet. A packet of data transferred through VDP's channels.
Definition: prt_packet.h:24
prt_request_t * prt_request_new(prt_packet_t *packet, int count, MPI_Datatype datatype, int peer, int tag)
request constructor
Definition: prt_request.c:25
prt_packet_t * prt_channel_pop(prt_channel_t *channel)
Fetches a packet from a channel. Does not decrement the number of active references. The packet leaves the channel, but enters the VDP.
Definition: prt_channel.c:128
icl_deque_t ** sends_requested
Definition: prt_proxy.h:43
prt_request_t
VSA proxy's communication request. Contains basic information about the communication request...
Definition: prt_request.h:27
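
Each send request is appended to the deque that belongs to the calling thread, and the proxy later takes requests from the front of that deque, so requests from one thread are posted in the order they were queued. The following is a minimal sketch of such a per-thread FIFO. The `fifo_*` names are hypothetical, this is not the `icl_deque` API, and the sketch is single-threaded: it does not address whatever synchronization the real deque uses between worker and proxy threads.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical singly linked FIFO; one instance per worker thread. */
typedef struct fifo_node_s {
    void *data;
    struct fifo_node_s *next;
} fifo_node_t;

typedef struct { fifo_node_t *head, *tail; int size; } fifo_t;

void fifo_init(fifo_t *q) { q->head = q->tail = NULL; q->size = 0; }

/* Append at the tail, as the worker side does when it requests a send. */
void fifo_append(fifo_t *q, void *data)
{
    fifo_node_t *n = malloc(sizeof *n);
    assert(n != NULL);
    n->data = data;
    n->next = NULL;
    if (q->tail != NULL) q->tail->next = n; else q->head = n;
    q->tail = n;
    q->size++;
}

/* Remove from the head, as the proxy side does when it posts a send.
 * Returns NULL when the queue is empty. */
void *fifo_pop_first(fifo_t *q)
{
    fifo_node_t *n = q->head;
    if (n == NULL) return NULL;
    void *data = n->data;
    q->head = n->next;
    if (q->head == NULL) q->tail = NULL;
    free(n);
    q->size--;
    return data;
}
```

Indexing an array of such queues by `thread_rank` keeps each thread's requests separate, which is what lets the proxy enforce a per-thread limit on posted sends.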
