gs_dag_scheduler.c

Go to the documentation of this file.
00001 /********************************************************************/
00002 /*                        gs_dag_scheduler.c                        */
00003 /*     the functions for GridSolve DAG analysis and scheduling      */
00004 /*                       Yinan Li, 05/17/2007                       */
00005 /********************************************************************/
00006 
00007 
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>

#include "gs_dag.h"
#include "gs_sequence.h"
00010 
00011 
00019 int analyze_dep_GS_DAG(GS_DAG_t *dag) {
00020     GS_DAG_Node_t *lptr, *rptr;
00021     gs_argument_t *largp, *rargp;
00022     /*void *ldataptr, *rdataptr; */
00023     int mode;
00024     int i, j;
00025 
00026 
00027     if (dag == NULL) return -1;
00028 
00029     if (dag->num_nodes == 1) {
00030         for (largp = dag->head_node->handle->problem_desc->arglist;
00031              largp != NULL; largp = largp->next) {
00032             if (largp->inout != GS_IN) {
00033                 largp->pass_back = 1;
00034                 largp->data_handle = 0;
00035             }
00036         }
00037         
00038         dag->analyzed = 1;
00039         
00040         return 0;
00041     }
00042 
00043             
00044     /* analyze dependencies */
00045     /* compare each node with the subsequent nodes in the list */
00046     for (lptr = dag->head_node; lptr != NULL; lptr = lptr->next) {
00047 
00048         for (rptr = lptr->next; rptr != NULL; rptr = rptr->next) {
00049 
00050             /* compare each pair of arguments of the two nodes */
00051             for (largp = lptr->handle->problem_desc->arglist, i = 0;
00052                  largp != NULL;
00053                  largp = largp->next, i++) {
00054 
00055                 for (rargp = rptr->handle->problem_desc->arglist, j = 0;
00056                      rargp != NULL;
00057                      rargp = rargp->next, j++) {
00058 
00059                     /* set default value to each field in each
00060                        argument that is used in sequencing */
00061                     largp->pass_back = 1;
00062                     rargp->pass_back = 1;
00063                     largp->data_handle = 0;
00064                     rargp->data_handle = 0;
00065                     largp->num_targets = 0;
00066                     rargp->num_targets = 0;
00067                     largp->target_server_list = NULL;
00068                     rargp->target_server_list = NULL;
00069                     largp->source = NULL;
00070                     rargp->source = NULL;
00071                     largp->index = i;
00072                     rargp->index = j;
00073                         
00074                     /* --------- process dependency among scalar ---------
00075                        --------- arguments in conservative mode  --------- */
00076                     mode = grpc_get_sequence_mode();
00077 
00078                     /* if it is in conservative mode */
00079                     if (mode == CONSERVATIVE_MODE) {
00080                         /* if both arguments are scalar and have the
00081                            same object and data types, and they are
00082                            not all input arguments, indentify this
00083                            case as a special dependency as
00084                            CONSERVATIVE_SCALAR_DEPENDENCY */
00085                         if ((largp->objecttype == GS_SCALAR &&
00086                              rargp->objecttype == GS_SCALAR) &&
00087                             (largp->datatype == rargp->datatype) &&
00088                             (largp->inout != GS_IN ||
00089                              rargp->inout != GS_IN)) {
00090                             /* adjust the scheduling level according
00091                                to the dependency relationship */
00092                             if (rptr->sched_level < lptr->sched_level + 1)
00093                                 rptr->sched_level = lptr->sched_level + 1;
00094                             /* the currently maximum scheduling level */
00095                             dag->max_sched_level = rptr->sched_level;
00096                             /* insert the dependency into the DAG */
00097                             insert_dep_GS_DAG(dag, lptr, rptr,
00098                             CONSERVATIVE_SCALAR_DEPENDENCY,
00099                             largp, rargp);
00100                         }
00101                     }
00102 
00103                     /* ---- IN <-> IN is considered to be independent ---- */
00104 
00105                     /* ---------- INPUT-AFTER-OUTPUT dependency ---------- */
00106                     /* ------------------ INOUT -> IN -------------------- */
00107                     /* ------------------ OUT -> IN ---------------------- */
00108                     /* ---------------- INOUT -> INOUT ------------------- */
00109                     /* ----------------- OUT -> INOUT -------------------- */
00110                     if ((largp->inout == GS_INOUT || largp->inout == GS_OUT) &&
00111                         (rargp->inout == GS_IN || rargp->inout == GS_INOUT)) {
00112                         /* if they reference the same object */
00113                         if (verify_object_type(largp) == 1 &&
00114                             verify_object_type(rargp) == 1 &&
00115                             compare_object(largp, rargp) == 0) {
00116                             /* adjust the scheduling level according
00117                                to the dependency relationship */
00118                             if (rptr->sched_level < lptr->sched_level + 1)
00119                                 rptr->sched_level = lptr->sched_level + 1;
00120                             /* the currently maximum scheduling level */
00121                             dag->max_sched_level = rptr->sched_level;
00122                             /* insert the dependency into the DAG */
00123                             insert_dep_GS_DAG(dag, lptr, rptr,
00124                             INPUT_AFTER_OUTPUT_DEPENDENCY,
00125                             largp, rargp);
00126                         }
00127                     }
00128 
00129                     /* ---------- OUTPUT-AFTER-INPUT dependency ---------- */
00130                     /* ------------------ IN -> OUT -----------------------*/
00131                     /* ------------------ IN -> INOUT ---------------------*/
00132                     else if ((largp->inout == GS_IN && rargp->inout == GS_OUT) ||
00133                         (largp->inout == GS_IN && rargp->inout == GS_INOUT)) {
00134                         /* if they reference the same object */
00135                         if (verify_object_type(largp) == 1 &&
00136                             verify_object_type(rargp) == 1 &&
00137                             compare_object(largp, rargp) == 0) {
00138                             /* adjust the scheduling level according
00139                                to the dependency relationship */
00140                             if (rptr->sched_level < lptr->sched_level + 1)
00141                                 rptr->sched_level = lptr->sched_level + 1;
00142                             /* the currently maximum scheduling level */
00143                             dag->max_sched_level = rptr->sched_level;
00144                             /* insert the dependency into the DAG */
00145                             insert_dep_GS_DAG(dag, lptr, rptr,
00146                             OUTPUT_AFTER_INPUT_DEPENDENCY,
00147                             largp, rargp);
00148                         }
00149                     }
00150 
00151                     /* ---------- OUTPUT-AFTER-OUTPUT dependency --------- */
00152                     /* ------------------ INOUT -> OUT --------------------*/
00153                     /* ------------------- OUT -> OUT ---------------------*/
00154                     else if ((largp->inout == GS_OUT && rargp->inout == GS_OUT) ||
00155                         (largp->inout == GS_INOUT && rargp->inout == GS_OUT)) {
00156                         /* if they reference the same object */
00157                         if (verify_object_type(largp) == 1 &&
00158                             verify_object_type(rargp) == 1 &&
00159                             compare_object(largp, rargp) == 0) {
00160                             /* adjust the scheduling level according
00161                                to the dependency relationship */
00162                             if (rptr->sched_level < lptr->sched_level + 1)
00163                                 rptr->sched_level = lptr->sched_level + 1;
00164                             /* the currently maximum scheduling level */
00165                             dag->max_sched_level = rptr->sched_level;
00166                             /* insert the dependency into the DAG */
00167                             insert_dep_GS_DAG(dag, lptr, rptr,
00168                             OUTPUT_AFTER_OUTPUT_DEPENDENCY,
00169                             largp, rargp);
00170                         }
00171                     } /* if & else-if */
00172 
00173                 } /* inner-most for */
00174 
00175             } /* third-level for */
00176 
00177         } /* second-level for */
00178 
00179     } /* outer-most for */
00180 
00181     dag->analyzed = 1;
00182 
00183     return 0;
00184 }
00185 
00186 
00194 int map_to_servers_GS_DAG(GS_DAG_t *dag, int scheduler) {
00195     int num_servers;
00196     int num_reqs_mapped;
00197     int sched_level;
00198     int num_reqs_per_level;
00199     int num_reqs_per_server_level;
00200     int *assignments;
00201     int serv_idx;
00202     int i, j;
00203     GS_DAG_Node_t *node;
00204     
00205 
00206     if (dag == NULL) return -1;
00207 
00208     /* if the DAG has not yet been analyzed,
00209        analyze it first before mapping */
00210     if (!dag->analyzed) {
00211         analyze_dep_GS_DAG(dag);
00212     }
00213     
00214     /* number of servers available; we assume 
00215        that all servers have identical services */
00216     num_servers = dag->head_node->handle->num_servers;
00217     
00218     if (scheduler == ROUND_ROBIN) { 
00219         num_reqs_mapped = 0;
00220         sched_level = 0;
00221         while (num_reqs_mapped < dag->num_nodes) {
00222             num_reqs_per_level = 0;
00223             for (node = dag->head_node; node != NULL; node = node->next) {
00224                 if (node->sched_level == sched_level) 
00225                     num_reqs_per_level++;
00226             }
00227     
00228             serv_idx = 0;
00229             for (node = dag->head_node; node != NULL; node = node->next) {
00230                 if (node->sched_level == sched_level) {
00231                     node->handle->srv_idx = serv_idx % num_servers;
00232                     serv_idx++;
00233                 }
00234             }
00235             
00236             num_reqs_mapped += num_reqs_per_level;
00237             sched_level++;
00238         }
00239     } 
00240     else if (scheduler == AVERAGE) {
00241         assignments = (int *) malloc(sizeof(int) * num_servers);
00242         num_reqs_mapped = 0;
00243         sched_level = 0;
00244         while (num_reqs_mapped < dag->num_nodes) {
00245             num_reqs_per_level = 0;
00246             for (node = dag->head_node; node != NULL; node = node->next) {
00247                 if (node->sched_level == sched_level) 
00248                     num_reqs_per_level++;
00249             }
00250             
00251             num_reqs_per_server_level = num_reqs_per_level / num_servers; 
00252             for (i = 0; i < num_servers; i++) {
00253                 assignments[i] = num_reqs_per_server_level;               
00254             }
00255             
00256             j = 0;
00257             for (i = num_reqs_per_server_level * num_servers; 
00258                  i < num_reqs_per_level; i++) { 
00259                 assignments[j++]++;
00260             }
00261             
00262             i = 0;
00263             for (node = dag->head_node; node != NULL; node = node->next) {
00264                 if (node->sched_level == sched_level) {
00265                     if (assignments[i] > 0) {
00266                         node->handle->srv_idx = i;                      
00267                     } else {
00268                         i++;
00269                         node->handle->srv_idx = i;
00270                     }
00271                     assignments[i]--;
00272                 }
00273             }
00274             
00275             num_reqs_mapped += num_reqs_per_level;
00276             sched_level++;                                 
00277         }  
00278     }
00279     else if (scheduler == OPT_COMM) {
00280     }
00281     else if (scheduler == OPT_COMP) {
00282     }
00283     else if (scheduler == AGENT) {
00284         /* use the default mapping obtained from the agent */
00285     }
00286     
00287     return 0;
00288 }
00289 
00290 
00299 int post_analysis_GS_DAG(GS_DAG_t *dag, va_list arg_list, int n) {
00300     GS_DAG_Dep_t *dep;
00301     int i;
00302 
00303 
00304     if (dag == NULL) return -1;
00305 
00306     /* count the number of depending nodes of each
00307        argument in each RAW dependency in the DAG */
00308     for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
00309         if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY)
00310             dep->largp->num_targets++;
00311     }
00312 
00313     /* process all the intermediate arguments in terms of
00314        whether they should be pass back to the client and
00315        which server they should be pushed to */
00316 
00317     /* create a target server list for each argument
00318        that is depended by some other nodes */
00319     for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
00320 
00321         if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY &&
00322             dep->largp->num_targets > 0 &&
00323             dep->largp->target_server_list == NULL) {
00324 
00325             dep->largp->target_server_list = (gs_server_t **)
00326             malloc(sizeof(gs_server_t *) * dep->largp->num_targets);
00327             if (dep->largp->target_server_list == NULL) {
00328                 perror("malloc");
00329                 exit(1);
00330             }
00331 
00332             /* initialize to NULL */
00333             for (i = 0; i < dep->largp->num_targets; i++) {
00334                 dep->largp->target_server_list[i] = NULL;
00335             }
00336         }
00337     }
00338 
00339     /* find out all the target nodes that are depending
00340        on each depended argument in each dependency */
00341     for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
00342         if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY) {
00343             /* find the first unused position in target
00344                server list in order to insert a new target */
00345             for (i = 0; i < dep->largp->num_targets; i++) {
00346                 if (dep->largp->target_server_list[i] == NULL)
00347                     break;
00348             }
00349 
00350             /* insert the new target */
00351             dep->largp->target_server_list[i] = dep->cnode->server;
00352 
00353             /* mark that the input argument of the depending node
00354                will come from the output argument of the dependend
00355                node; note that only the last depended node will be
00356                the source. This ensures that if one object is involved
00357                in multiple RAW dependencies, ony the last depended node
00358                will be the source. In addition, the way we analyze
00359                dependency will ensure the correct order */
00360             /*dep->largp->source = dep->pnode->server;*/
00361             dep->rargp->source = dep->pnode->server;
00362         }
00363     }
00364 
00365     /* the above steps setup the target server list and source
00366        of each argument involved in each dependency; however,
00367        there may be redundant and unnessary target markup, so
00368        this step remove all the redundancies */
00369     for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
00370         /* the child node is actually not expecting
00371            to receive the intermediate result from
00372            the parent node */
00373         if (dep->rargp->source != dep->pnode->server) {
00374             for (i = 0; i < dep->largp->num_targets; i++) {
00375                 /* remove the redundant target */
00376                 if (dep->largp->target_server_list[i] ==
00377                     dep->rargp->source)
00378                     dep->largp->target_server_list[i] = NULL;
00379             }
00380             
00381             delete_dep_GS_DAG(dag, dep);
00382         }
00383     }
00384 
00385     /* mark the 'pass_back' field of the depended
00386        argument of the parent node with 0, means not
00387        to pass back this intermediate argument */
00388     for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
00389         /* under these two kinds of dependencies,
00390            arguments need not to be passed back */
00391         if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY) {
00392             if (!if_passed_back(arg_list, n, dep->largp)) {
00393                 /* the depended argument should not be passed back */
00394                 /* dep->largp->pass_back = 0; */
00395                 dep->largp->pass_back = 0;
00396                 /* the dependent argument expects to receive data handle */
00397                 dep->rargp->data_handle = 1;
00398             }
00399         }
00400         else if (dep->dep_type == OUTPUT_AFTER_OUTPUT_DEPENDENCY) {
00401             /* the depended argument should not be passed back */
00402             dep->largp->pass_back = 0;
00403         }
00404     }
00405 
00406     return 0;
00407 }
00408 
00409 
00410 int wait_level_finish_GS_DAG(grpc_sessionid_t *req_ids, int num_sched) {
00411     int i;
00412     grpc_error_t status;
00413 
00414 
00415     for (i = 0; i < num_sched; i++) {
00416         /* probe to see if all the jobs in the 
00417            current scheduling level are completed */
00418         status = grpc_probe(req_ids[i]);
00419 
00420         /* mot completed */
00421         if (status == GRPC_NOT_COMPLETED) {
00422             return 0;
00423         }
00424         /* completed */
00425         else if (status == GRPC_NO_ERROR) {
00426             continue;
00427         }
00428         /* error condition */
00429         else {
00430             fprintf(stderr, 
00431             "error probing job with request id (%d)\n", req_ids[i]);
00432             return -1;
00433         }
00434     }
00435     
00436     return 1;
00437 }
00438 
00439 
00443 int execute_GS_DAG(GS_DAG_t *dag) {
00444     grpc_error_t status;
00445     grpc_sessionid_t *req_ids;
00446     GS_DAG_Node_t *node;
00447     int total_num_sched, num_sched, sched_level, finished;
00448     int i;
00449 
00450 
00451     /* if the DAG has not yet been analyzed,
00452        analyze it first before executing */
00453     if (!dag->analyzed) {
00454         analyze_dep_GS_DAG(dag);
00455     }
00456 
00457     req_ids = (grpc_sessionid_t *)
00458     malloc(sizeof(grpc_sessionid_t) * dag->num_nodes);
00459     if (!req_ids) {
00460         perror("malloc");
00461         return -1;
00462     }
00463 
00464     total_num_sched = 0;
00465     num_sched = 0;
00466     sched_level = 0;
00467     /* if there is node that is not executed */
00468     while (total_num_sched < dag->num_nodes) {
00469         printf("\nCurrent scheduling level: %d\n", sched_level);
00470         /* execute the DAG in a level-by-level fasion */
00471         for (node = dag->head_node; node != NULL; node = node->next) {
00472             /* if the node has the same scheduling level as the current */
00473             if (node->sched_level == sched_level) {
00474                 /*node->handle->bind_servers_at_call_time = 1; */
00475                 /* calling */
00476                 if (node->arg_list && !node->arg_stack) {
00477                     /* async variable argument list call */
00478                     status = grpc_call_arg_list_async(
00479                     node->handle, req_ids + num_sched, node->arg_list);
00480                     if (status != GRPC_NO_ERROR) {
00481                         fprintf(stderr,
00482                         "error submitting job with request id (%d)\n",
00483                         req_ids[num_sched]);
00484                         return -1;
00485                     } else {
00486                         printf("Submitting job with request id (%d) onto server (%d)\n",
00487                         req_ids[num_sched], 
00488                         node->handle->srv_idx);
00489                         /*node->handle->server_list[node->handle->srv_idx]->hostname); */
00490                     }
00491                 }
00492                 else if (!node->arg_list && node->arg_stack) {
00493                     /* async argument stack call */
00494                     status = grpc_call_arg_stack_async(
00495                     node->handle, req_ids + num_sched, node->arg_stack);
00496                     if (status != GRPC_NO_ERROR) {
00497                         fprintf(stderr,
00498                         "error submitting job with request id (%d)\n",
00499                         req_ids[num_sched]);
00500                         return -1;
00501                     } else {
00502                         printf("Submitting job with request id (%d) onto server (%d)\n",
00503                         req_ids[num_sched],
00504                         node->handle->srv_idx);
00505                         /*node->handle->server_list[node->handle->srv_idx]->hostname); */
00506                         
00507                     }
00508                 }
00509 
00510                 num_sched++;
00511             }
00512         }
00513         /* finish submitting jobs at the current scheduling level */
00514 
00515         /* wait for the current scheduling level finishes */
00516         while ((finished = wait_level_finish_GS_DAG(req_ids, num_sched)) == 0);
00517 
00518         /* get back the results of the current scheduling level */
00519         for (i = 0; i < num_sched; i++) {
00520             status = grpc_wait(req_ids[i]);
00521             if (status != GRPC_NO_ERROR) {
00522                 fprintf(stderr,
00523                 "error finishing job with request id (%d)\n",
00524                 req_ids[i]);
00525                 return -1;
00526             }
00527         }
00528         
00529         /* the total number of nodes scheduled at this point */
00530         total_num_sched += num_sched;
00531         num_sched = 0;
00532         
00533         /* proceed to the next scheduling level */
00534         sched_level++;
00535     }
00536 
00537     return 0;
00538 }