PULSAR  1.0.0
Parallel Ultra Light Systolic Array Runtime
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
prt_vsa.c
Go to the documentation of this file.
1 
11 #include "prt_vsa.h"
12 
13 extern int prt_tuple_equal(void *tuple_a, void *tuple_b);
14 extern unsigned int prt_tuple_hash(void *tuple);
15 
17 
29  int num_threads, void *global_store, int (*vdp_to_core)(int*, void*, int))
30 {
31  // Allocate the VSA.
32  prt_vsa_t *vsa = (prt_vsa_t*)malloc(sizeof(prt_vsa_t));
33  prt_assert(vsa != NULL, "malloc failed");
34 
35  // Init the VSA.
36  MPI_Comm_rank(MPI_COMM_WORLD, &vsa->node_rank);
37  MPI_Comm_size(MPI_COMM_WORLD, &vsa->num_nodes);
38  vsa->num_threads = num_threads;
39  vsa->num_cores = vsa->num_nodes*vsa->num_threads;
40  vsa->global_store = global_store;
41  vsa->vdp_to_core = vdp_to_core;
42 
43  // Init config & proxy.
44  vsa->config = prt_config_new();
45  vsa->proxy = prt_proxy_new(num_threads);
46  vsa->proxy->vsa = vsa;
47 
48  // Init pthreads.
49  pthread_attr_init(&vsa->thread_attr);
50  pthread_attr_setscope(&vsa->thread_attr, PTHREAD_SCOPE_SYSTEM);
51  pthread_setconcurrency(vsa->num_threads+1);
52 
53  int i;
54  // Initialize the threads.
55  vsa->thread = (prt_thread_t**)malloc(vsa->num_threads*sizeof(prt_thread_t*));
56  prt_assert(vsa->thread != NULL, "malloc failed");
57  for (i = 0; i < vsa->num_threads; i++) {
58  vsa->thread[i] = prt_thread_new(i, vsa->node_rank*vsa->num_threads+i);
59  vsa->thread[i]->vsa = vsa;
60  }
61 
62  // Initialize the VDPs hash.
63  int nbuckets = PRT_VSA_MAX_VDPS_PER_NODE;
65 
66  // Allocate the array of channel lists.
67  vsa->channel_lists = (icl_list_t**)calloc(vsa->num_nodes, sizeof(icl_list_t*));
68  prt_assert(vsa->channel_lists != NULL, "malloc failed");
69 
70  // Return the VSA.
71  return vsa;
72 }
73 
75 
82 {
83  // Destroy the VDPs hash.
84  icl_hash_destroy(vsa->vdps_hash, NULL, (void(*)(void*))prt_vdp_delete);
85 
86  // Delete config & proxy.
88  prt_proxy_delete(vsa->proxy);
89 
90  int i;
91  // Pthreads cleanup.
92  pthread_attr_destroy(&vsa->thread_attr);
93  for (i = 0; i < vsa->num_threads; i++)
94  prt_thread_delete(vsa->thread[i]);
95  free(vsa->thread);
96 
97  // Free the VSA.
98  free(vsa);
99 }
100 
102 
114 {
115  int i;
116  // Check arguments.
117  prt_assert(vsa != NULL, "NULL VSA");
118  prt_assert(vdp != NULL, "NULL VDP");
119 
120  // Compute global core number and process rank.
121  int core = vsa->vdp_to_core(vdp->tuple, vsa->global_store, vsa->num_cores);
122  int node_rank = core / vsa->num_threads;
123  int thread_rank = core % vsa->num_threads;
124 
125  // IF VDP not in this node.
126  if (node_rank != vsa->node_rank) {
127  // Destroy along with all channels and return.
128  prt_vdp_annihilate(vdp);
129  return;
130  }
131  // Insert in the VSA's VDP hash.
132  icl_entry_t *entry = icl_hash_insert(
133  vsa->vdps_hash, (void*)vdp->tuple, (void*)vdp);
134  prt_assert(entry != NULL, "icl_hash_insert failed");
135 
136  // Insert in the thread's VDPs list.
137  icl_list_t *node = icl_list_append(vsa->thread[thread_rank]->vdps, vdp);
138  prt_assert(node != NULL, "icl_list_append failed");
139  vdp->thread = vsa->thread[thread_rank];
140 
141  // Merge intra-node channels.
142  prt_vsa_vdp_merge_channels(vsa, vdp, node_rank);
143  // Track channel tags for inter-node communication.
144  prt_vsa_vdp_track_tags(vsa, vdp, core, node_rank);
145 }
146 
148 
/**
 * Connects corresponding input and output channels of intra-node VDPs.
 *
 * For every non-NULL input channel of vdp, the source VDP is looked up in
 * vsa->vdps_hash by channel->src_tuple; for every non-NULL output channel,
 * the destination VDP is looked up by channel->dst_tuple. When the peer VDP
 * is found, vdp's slot is repointed at the peer's existing channel object
 * and vdp's own channel object is deleted. When the peer is not found, the
 * slot is left untouched.
 *
 * prt_proxy_max_packet_size() is called for every non-NULL channel before
 * the lookup, whether or not the channel ends up being merged.
 *
 * @param vsa        VSA supplying vdps_hash (peer lookup) and proxy.
 * @param vdp        VDP whose input[] / output[] slots are examined and
 *                   possibly replaced.
 * @param node_rank  Not referenced anywhere in this function body.
 */
154 void prt_vsa_vdp_merge_channels(prt_vsa_t *vsa, prt_vdp_t *vdp, int node_rank)
155 {
156  int i;
157  // FOR each input channel.
158  for (i = 0; i < vdp->num_inputs; i++) {
159  prt_channel_t *channel = vdp->input[i];
160  if (channel != NULL) {
161  // Look for maximum channel size.
162  prt_proxy_max_packet_size(vsa->proxy, channel);
163  // Look up the source VDP.
164  prt_vdp_t *src_vdp =
165  icl_hash_find(vsa->vdps_hash, (void*)channel->src_tuple);
166  // IF source VDP found.
167  if (src_vdp != NULL) {
168  // Check for channel tuple mismatch.
 // NOTE(review): src_vdp->output[channel->src_slot] is dereferenced
 // without a NULL or range check - confirm callers guarantee it.
169  int *src_vdp_dst_tuple =
170  src_vdp->output[channel->src_slot]->dst_tuple;
171  prt_assert(prt_tuple_equal(src_vdp_dst_tuple, vdp->tuple),
172  "VDP channel tuple mismatch");
173  // Swap this channel to the existing channel.
 // The slot is repointed first; the local `channel` still holds the
 // old object so it can be deleted afterwards.
174  vdp->input[i] = src_vdp->output[channel->src_slot];
175  prt_channel_delete(channel);
176  }
177  }
178  }
179  // FOR each output channel.
180  for (i = 0; i < vdp->num_outputs; i++) {
181  prt_channel_t *channel = vdp->output[i];
182  if (channel != NULL) {
183  // Look for maximum channel size.
184  prt_proxy_max_packet_size(vsa->proxy, channel);
185  // Look up the destination VDP.
186  prt_vdp_t *dst_vdp =
187  icl_hash_find(vsa->vdps_hash, (void*)channel->dst_tuple);
188  // IF destination VDP found.
189  if (dst_vdp != NULL) {
190  // Check for channel tuple mismatch.
 // NOTE(review): dst_vdp->input[channel->dst_slot] is dereferenced
 // without a NULL or range check - confirm callers guarantee it.
191  int *dst_vdp_src_tuple =
192  dst_vdp->input[channel->dst_slot]->src_tuple;
193  prt_assert(prt_tuple_equal(dst_vdp_src_tuple, vdp->tuple),
194  "VDP channel tuple mismatch");
195  // Swap this channel for the existing channel.
 // Mirror of the input case: repoint the slot, then delete the old
 // object through the local `channel`.
196  vdp->output[i] = dst_vdp->input[channel->dst_slot];
197  prt_channel_delete(channel);
198  }
199  }
200  }
201 }
202 
204 
213  prt_vsa_t *vsa, prt_vdp_t *vdp, int core, int node_rank)
214 {
215  int i;
216  // FOR each input channel.
217  for (i = 0; i < vdp->num_inputs; i++) {
218  prt_channel_t *channel = vdp->input[i];
219  if (channel != NULL) {
220  channel->dst_node = node_rank;
221  int src_core = vsa->vdp_to_core(
222  channel->src_tuple, vsa->global_store, vsa->num_cores);
223  int src_node = src_core / vsa->num_threads;
224  channel->src_node = src_node;
225  // IF another node is the source.
226  if (src_node != vsa->node_rank) {
227  // Create the list if empty.
228  if (vsa->channel_lists[src_node] == NULL) {
229  vsa->channel_lists[src_node] = icl_list_new();
230  prt_assert(vsa->channel_lists[src_node] != NULL,
231  "icl_list_new failed");
232  }
233  // Add the channel to the list.
234  icl_list_t *node = icl_list_isort(
235  vsa->channel_lists[src_node], channel, prt_channel_compare);
236  prt_assert(node != NULL, "icl_list_isort failed");
237  }
238  }
239  }
240  // FOR each output channel.
241  for (i = 0; i < vdp->num_outputs; i++) {
242  prt_channel_t *channel = vdp->output[i];
243  if (channel != NULL) {
244  channel->src_node = node_rank;
245  int dst_core = vsa->vdp_to_core(
246  channel->dst_tuple, vsa->global_store, vsa->num_cores);
247  int dst_node = dst_core / vsa->num_threads;
248  channel->dst_node = dst_node;
249  // IF another node is the destination.
250  if (dst_node != vsa->node_rank) {
251  // Create the list if empty.
252  if (vsa->channel_lists[dst_node] == NULL) {
253  vsa->channel_lists[dst_node] = icl_list_new();
254  prt_assert(vsa->channel_lists[dst_node] != NULL,
255  "icl_list_new failed");
256  }
257  // Add the channel to the list.
258  icl_list_t *node = icl_list_isort(
259  vsa->channel_lists[dst_node], channel, prt_channel_compare);
260  prt_assert(node != NULL, "icl_list_isort failed");
261  }
262  }
263  }
264 }
265 
267 
275 {
276  int i;
277  for (i = 0; i < vsa->num_nodes; i++)
278  if (vsa->channel_lists[i] != NULL) {
279  int tag = 0;
280  icl_list_t *node;
281  // Assign consecutive tags to the elements.
282  icl_list_foreach(vsa->channel_lists[i], node) {
283  prt_channel_t *channel = (prt_channel_t*)node->data;
284  channel->tag = tag++;
285 
286  int *node_tag;
287  if (channel->dst_node == vsa->node_rank)
288  node_tag = prt_tuple_new2(channel->src_node, channel->tag);
289  else
290  node_tag = prt_tuple_new2(channel->dst_node, channel->tag);
291 
292  icl_entry_t *entry = icl_hash_insert(
293  vsa->proxy->tags_hash, (void*)node_tag, (void*)channel);
294  prt_assert(entry != NULL, "icl_hash_insert failed");
295  }
296  // Destroy the list.
297  int status = icl_list_destroy(vsa->channel_lists[i], NULL);
298  prt_assert(status == 0, "icl_list_destroy failed");
299  }
300  // Free the array of lists.
301  free(vsa->channel_lists);
302 }
303 
305 
315 {
316  // Assign channel tags.
318 
319  // Init tracing.
321 
322  int i;
323  // Launch threads.
324  for (i = 0; i < vsa->num_threads; i++) {
325  int status =
326  pthread_create(
327  &vsa->thread[i]->id, &vsa->thread_attr, prt_thread_run, vsa->thread[i]);
328  prt_assert(status == 0, "pthread_create failed");
329  }
330  // Barrier for trace alignment.
331  svg_trace_start(0);
332  MPI_Barrier(MPI_COMM_WORLD);
334 
335  // Service the communication proxy.
336  prt_proxy_run(vsa->proxy);
337 
338  // Join threads.
339  for (i = 0; i < vsa->num_threads; i++) {
340  int status = pthread_join(vsa->thread[i]->id, NULL);
341  prt_assert(status == 0, "pthread_join failed");
342  }
343  // Finish tracing.
346 }
347 
349 
359 {
360  switch (param) {
361  case PRT_VDP_SCHEDULING:
362  switch (value) {
365  vsa->config->vdp_scheduling = value;
366  break;
367  default:
368  prt_error("invalid value");
369  break;
370  }
371  break;
372  case PRT_SVG_TRACING:
373  switch (value) {
374  case PRT_SVG_TRACING_ON:
375  case PRT_SVG_TRACING_OFF:
376  vsa->config->svg_tracing = value;
377  break;
378  default:
379  prt_error("invalid value");
380  break;
381  }
382  break;
383  default:
384  prt_error("invalid param");
385  break;
386  }
387 }
enum prt_config_param_e prt_config_param_t
PRT configuration parameters.
int icl_hash_destroy(icl_hash_t *ht, void(*free_key)(void *), void(*free_data)(void *))
Free hash table structures. Key and data are freed using functions.
Definition: icl_hash.c:279
icl_list_t * icl_list_new()
Create new linked list.
Definition: icl_list.c:23
struct prt_channel_s ** input
Definition: prt_vdp.h:42
void * data
Definition: icl_list.h:20
void prt_vsa_config_set(prt_vsa_t *vsa, prt_config_param_t param, prt_config_value_t value)
Set VSA configuration parameter.
Definition: prt_vsa.c:357
int icl_list_destroy(icl_list_t *head, void(*free_function)(void *))
Frees the resources associated with this linked list.
Definition: icl_list.c:173
prt_vdp_map_func_t vdp_to_core
Definition: prt_vsa.h:53
struct prt_config_s * config
Definition: prt_vsa.h:55
struct prt_thread_s ** thread
Definition: prt_vsa.h:51
icl_list_t * icl_list_append(icl_list_t *head, void *data)
Insert a node at the end of this list.
Definition: icl_list.c:326
void svg_trace_init(int num_cores)
Initialize tracing.
Definition: svg_trace.c:38
icl_entry_t * icl_hash_insert(icl_hash_t *ht, void *key, void *data)
Insert an item into the hash table.
Definition: icl_hash.c:134
void prt_proxy_delete(prt_proxy_t *proxy)
communication proxy destructor Checking if all the lists are empty at the time of destruction...
Definition: prt_proxy.c:66
struct prt_vsa_s * vsa
Definition: prt_thread.h:28
#define prt_error(msg)
Definition: prt_assert.h:24
Virtual Systolic Array (VSA) VSA contains global information about the system, a local communication...
Definition: prt_vsa.h:45
int num_nodes
Definition: prt_vsa.h:47
void prt_vsa_vdp_insert(prt_vsa_t *vsa, prt_vdp_t *vdp)
Inserts a new VDP into a VSA. Destroys VDPs that do not belong to this node. Puts the VDP in the list...
Definition: prt_vsa.c:113
#define icl_list_foreach(list, ptr)
Definition: icl_list.h:41
unsigned int prt_tuple_hash(void *tuple)
tuple hash Required by the VSA's tuples hash table. Computes the length in characters and calls a str...
Definition: prt_tuple.c:188
int * dst_tuple
Definition: prt_channel.h:35
prt_proxy_t * prt_proxy_new(int num_threads)
communication proxy constructor
Definition: prt_proxy.c:21
int svg_tracing
Definition: prt_config.h:43
void prt_vsa_vdp_merge_channels(prt_vsa_t *vsa, prt_vdp_t *vdp, int node_rank)
Connects corresponding input and output channels of intra-node VDPs.
Definition: prt_vsa.c:154
void * prt_thread_run(void *thrd)
thread's production cycle Cycle through VDPs. Fire the ones that are ready. Remove the ones which bur...
Definition: prt_thread.c:70
int num_cores
Definition: prt_vsa.h:49
void prt_config_delete(prt_config_t *config)
config object destructor
Definition: prt_config.c:39
void prt_proxy_max_packet_size(prt_proxy_t *proxy, prt_channel_t *channel)
Look for maximum channel/packet size.
Definition: prt_proxy.c:105
void prt_thread_delete(prt_thread_t *thread)
thread object destructor
Definition: prt_thread.c:48
void svg_trace_start(int thread_rank)
Start tracing an event.
Definition: svg_trace.c:50
int * src_tuple
Definition: prt_channel.h:33
void prt_vsa_channel_tags(prt_vsa_t *vsa)
Assign channel tags. Build the node-tag lookup. Destroy channel lists.
Definition: prt_vsa.c:274
void prt_vsa_run(prt_vsa_t *vsa)
VSA's production cycle Launches worker threads. Sends the master thread in the communication proxy pr...
Definition: prt_vsa.c:314
#define prt_tuple_new2(a, b)
Definition: prt_tuple.h:35
void prt_channel_delete(prt_channel_t *channel)
channel destructor
Definition: prt_channel.c:64
icl_list_t * vdps
Definition: prt_thread.h:32
VDP's data channel Implements a data link between a pair of VDPs. Identifies the source and destinati...
Definition: prt_channel.h:29
prt_vsa_t * prt_vsa_new(int num_threads, void *global_store, int(*vdp_to_core)(int *, void *, int))
VSA constructor.
Definition: prt_vsa.c:28
icl_list_t * icl_list_isort(icl_list_t *head, void *data, int(*compare)(void *, void *))
Insert data into a sorted list. Does not support direct comparison of pointers.
Definition: icl_list.c:145
void prt_vdp_delete(prt_vdp_t *vdp)
VDP destructor Used for destruction of local VDPs. Destroy all input channels. Destroy all dangling o...
Definition: prt_vdp.c:80
pthread_attr_t thread_attr
Definition: prt_vsa.h:50
prt_thread_t * prt_thread_new(int rank, int core)
thread object constructor
Definition: prt_thread.c:22
void * global_store
Definition: prt_vsa.h:52
int prt_channel_compare(void *channel1, void *channel2)
Compare two channels.
Definition: prt_channel.c:174
int num_inputs
Definition: prt_vdp.h:41
struct prt_vsa_s * vsa
Definition: prt_proxy.h:40
int num_outputs
Definition: prt_vdp.h:43
int node_rank
Definition: prt_vsa.h:46
#define prt_assert(cond, msg)
Definition: prt_assert.h:30
Virtual Data Processor (VDP) Is uniquely identified by a tuple. Fires for a predefined number of cycl...
Definition: prt_vdp.h:37
void svg_trace_finish()
Finish tracing. Collect traces from all nodes. Write the combined trace to an SVG file...
Definition: svg_trace.c:78
void prt_vdp_annihilate(prt_vdp_t *vdp)
VDP annihilator Used for complete annihilation of VDPs that don't belong. Destroy all input channels...
Definition: prt_vdp.c:128
VSA's worker thread Owns a number of VDPs. Knows the communication proxy.
Definition: prt_thread.h:27
void * icl_hash_find(icl_hash_t *ht, void *key)
Search for an entry in a hash table.
Definition: icl_hash.c:108
icl_list_t ** channel_lists
Definition: prt_vsa.h:57
enum prt_config_value_e prt_config_value_t
values of PRT configuration parameters
void svg_trace_stop(int thread_rank, int color)
Stop tracing an event.
Definition: svg_trace.c:63
icl_hash_t * vdps_hash
Definition: prt_vsa.h:54
int prt_tuple_equal(void *tuple_a, void *tuple_b)
tuple equality check Check if tuples are identical in length and content.
Definition: prt_tuple.c:161
void prt_proxy_run(prt_proxy_t *proxy)
communication proxy production cycle Serves communication requests of local worker threads until shut...
Definition: prt_proxy.c:171
icl_hash_t * icl_hash_create(int nbuckets, unsigned int(*hash_function)(void *), int(*hash_key_compare)(void *, void *))
Create a new hash table.
Definition: icl_hash.c:70
#define PRT_VSA_MAX_VDPS_PER_NODE
maximum VDPs per node The size of the VSA's hash table of VDPs. Should be a prime number ...
Definition: prt_vsa.h:32
struct prt_thread_s * thread
Definition: prt_vdp.h:38
prt_config_t * prt_config_new()
config object constructor
Definition: prt_config.c:19
void prt_vsa_delete(prt_vsa_t *vsa)
VSA destructor.
Definition: prt_vsa.c:81
Definition: icl_hash.h:19
struct prt_proxy_s * proxy
Definition: prt_vsa.h:56
struct prt_channel_s ** output
Definition: prt_vdp.h:44
void prt_vsa_vdp_track_tags(prt_vsa_t *vsa, prt_vdp_t *vdp, int core, int node_rank)
Builds the list of channel connections to other nodes.
Definition: prt_vsa.c:212
int * tuple
Definition: prt_vdp.h:39
pthread_t id
Definition: prt_thread.h:31
Virtual Systolic Array (VSA)
int num_threads
Definition: prt_vsa.h:48
icl_hash_t * tags_hash
Definition: prt_proxy.h:42
int vdp_scheduling
Definition: prt_config.h:42