Functions

gs_dag_scheduler.c File Reference

#include "gs_dag.h"
#include "gs_sequence.h"
Include dependency graph for gs_dag_scheduler.c:

Go to the source code of this file.

Functions

int analyze_dep_GS_DAG (GS_DAG_t *dag)
int map_to_servers_GS_DAG (GS_DAG_t *dag, int scheduler)
int post_analysis_GS_DAG (GS_DAG_t *dag, va_list arg_list, int n)
int wait_level_finish_GS_DAG (grpc_sessionid_t *req_ids, int num_sched)
int execute_GS_DAG (GS_DAG_t *dag)

Function Documentation

int analyze_dep_GS_DAG ( GS_DAG_t *  dag  ) 

Analyze data dependencies among DAG nodes

Parameters:
dag -- the DAG to be analyzed
Returns:
0 on success, -1 on failure

Definition at line 19 of file gs_dag_scheduler.c.

                                      {
    /*
     * Analyze data dependencies among the nodes of a DAG.
     *
     * Every node is compared with each node that follows it in the node
     * list.  For every pair of arguments that refer to the same object a
     * dependency is inserted into the DAG:
     *   INOUT/OUT -> IN/INOUT   input-after-output  (RAW)
     *   IN        -> OUT/INOUT  output-after-input  (WAR)
     *   INOUT/OUT -> OUT        output-after-output (WAW)
     * IN -> IN pairs are independent.  In conservative sequencing mode, two
     * scalars of the same data type of which at least one is not input-only
     * additionally get a CONSERVATIVE_SCALAR_DEPENDENCY.
     *
     * Whenever a dependency is found, the later node is moved to a scheduling
     * level strictly greater than the earlier node's.  dag->max_sched_level
     * is only ever raised.  Because nodes are visited in list order, a
     * node's level is final by the time it is used as the left-hand node.
     *
     * The sequencing fields (pass_back, data_handle, num_targets,
     * target_server_list, source, index) of every compared argument are
     * reset to their defaults.
     *
     * Returns 0 on success, -1 if dag is NULL.
     */
    GS_DAG_Node_t *lptr, *rptr;
    gs_argument_t *largp, *rargp;
    int mode;
    int dep_type = 0;
    int has_dep;
    int i, j;


    if (dag == NULL) return -1;

    /* a single node has no dependencies: every argument that is
       not input-only is simply passed back to the client */
    if (dag->num_nodes == 1) {
        for (largp = dag->head_node->handle->problem_desc->arglist;
             largp != NULL; largp = largp->next) {
            if (largp->inout != GS_IN) {
                largp->pass_back = 1;
                largp->data_handle = 0;
            }
        }

        dag->analyzed = 1;

        return 0;
    }

    /* the sequencing mode is read once for the whole analysis
       instead of once per argument pair */
    mode = grpc_get_sequence_mode();

    /* compare each node with the subsequent nodes in the list */
    for (lptr = dag->head_node; lptr != NULL; lptr = lptr->next) {
        for (rptr = lptr->next; rptr != NULL; rptr = rptr->next) {

            /* compare each pair of arguments of the two nodes */
            for (largp = lptr->handle->problem_desc->arglist, i = 0;
                 largp != NULL;
                 largp = largp->next, i++) {
                for (rargp = rptr->handle->problem_desc->arglist, j = 0;
                     rargp != NULL;
                     rargp = rargp->next, j++) {

                    /* set default value to each field in each
                       argument that is used in sequencing */
                    largp->pass_back = 1;
                    rargp->pass_back = 1;
                    largp->data_handle = 0;
                    rargp->data_handle = 0;
                    largp->num_targets = 0;
                    rargp->num_targets = 0;
                    largp->target_server_list = NULL;
                    rargp->target_server_list = NULL;
                    largp->source = NULL;
                    rargp->source = NULL;
                    largp->index = i;
                    rargp->index = j;

                    /* conservative mode: scalars of the same object and
                       data type that are not both input-only are treated
                       as dependent regardless of which object they use */
                    if (mode == CONSERVATIVE_MODE &&
                        largp->objecttype == GS_SCALAR &&
                        rargp->objecttype == GS_SCALAR &&
                        largp->datatype == rargp->datatype &&
                        (largp->inout != GS_IN || rargp->inout != GS_IN)) {
                        if (rptr->sched_level < lptr->sched_level + 1)
                            rptr->sched_level = lptr->sched_level + 1;
                        /* keep the maximum: never lower it */
                        if (dag->max_sched_level < rptr->sched_level)
                            dag->max_sched_level = rptr->sched_level;
                        insert_dep_GS_DAG(dag, lptr, rptr,
                                          CONSERVATIVE_SCALAR_DEPENDENCY,
                                          largp, rargp);
                    }

                    /* classify the pair by access mode;
                       IN <-> IN is considered to be independent */
                    has_dep = 0;
                    if ((largp->inout == GS_INOUT || largp->inout == GS_OUT) &&
                        (rargp->inout == GS_IN || rargp->inout == GS_INOUT)) {
                        /* INOUT -> IN, OUT -> IN, INOUT -> INOUT, OUT -> INOUT */
                        dep_type = INPUT_AFTER_OUTPUT_DEPENDENCY;
                        has_dep = 1;
                    }
                    else if (largp->inout == GS_IN &&
                             (rargp->inout == GS_OUT || rargp->inout == GS_INOUT)) {
                        /* IN -> OUT, IN -> INOUT */
                        dep_type = OUTPUT_AFTER_INPUT_DEPENDENCY;
                        has_dep = 1;
                    }
                    else if ((largp->inout == GS_OUT || largp->inout == GS_INOUT) &&
                             rargp->inout == GS_OUT) {
                        /* OUT -> OUT, INOUT -> OUT */
                        dep_type = OUTPUT_AFTER_OUTPUT_DEPENDENCY;
                        has_dep = 1;
                    }

                    /* a dependency exists only if both arguments
                       reference the same object */
                    if (has_dep &&
                        verify_object_type(largp) == 1 &&
                        verify_object_type(rargp) == 1 &&
                        compare_object(largp, rargp) == 0) {
                        /* the later node must run at a later level */
                        if (rptr->sched_level < lptr->sched_level + 1)
                            rptr->sched_level = lptr->sched_level + 1;
                        /* keep the maximum: never lower it */
                        if (dag->max_sched_level < rptr->sched_level)
                            dag->max_sched_level = rptr->sched_level;
                        insert_dep_GS_DAG(dag, lptr, rptr, dep_type,
                                          largp, rargp);
                    }
                }
            }
        }
    }

    dag->analyzed = 1;

    return 0;
}

Here is the call graph for this function:

Here is the caller graph for this function:

int execute_GS_DAG ( GS_DAG_t *  dag  ) 

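Execute a DAG level by level: submit every node of the current scheduling level asynchronously, wait until all of them have completed, then advance to the next level. The DAG is analyzed first if it has not been analyzed yet.

Parameters:
dag -- the DAG to be executed
Returns:
0 on success, -1 on failure

Definition at line 443 of file gs_dag_scheduler.c.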

                                  {
    /*
     * Execute the DAG level by level.
     *
     * For each level:
     *   1. every node of the current scheduling level is submitted
     *      asynchronously (through its arg_list or its arg_stack);
     *   2. the level's requests are polled until all have completed;
     *   3. their results are collected with grpc_wait();
     *   4. execution moves on to the next level.
     *
     * The DAG is analyzed first if that has not happened yet.  The request
     * id buffer is released on every return path.
     *
     * Returns 0 on success, -1 on failure.
     */
    grpc_error_t status;
    grpc_sessionid_t *req_ids;
    GS_DAG_Node_t *node;
    int total_num_sched, num_sched, sched_level, max_level, finished;
    int i;
    int ret = -1;


    if (dag == NULL) return -1;

    /* if the DAG has not yet been analyzed,
       analyze it first before executing */
    if (!dag->analyzed && analyze_dep_GS_DAG(dag) != 0) return -1;

    /* nothing to execute */
    if (dag->num_nodes <= 0) return 0;

    /* highest level actually present: a node whose level is never reached
       ends the run with an error instead of looping forever */
    max_level = 0;
    for (node = dag->head_node; node != NULL; node = node->next) {
        if (node->sched_level > max_level)
            max_level = node->sched_level;
    }

    /* zero-filled so that request ids printed in error
       messages are never indeterminate */
    req_ids = calloc((size_t) dag->num_nodes, sizeof *req_ids);
    if (req_ids == NULL) {
        perror("calloc");
        return -1;
    }

    total_num_sched = 0;
    for (sched_level = 0; total_num_sched < dag->num_nodes; sched_level++) {
        if (sched_level > max_level) {
            fprintf(stderr,
            "error: %d DAG node(s) have no reachable scheduling level\n",
            dag->num_nodes - total_num_sched);
            goto out;
        }

        printf("\nCurrent scheduling level: %d\n", sched_level);

        /* submit every node of the current scheduling level */
        num_sched = 0;
        for (node = dag->head_node; node != NULL; node = node->next) {
            if (node->sched_level != sched_level)
                continue;

            /* req_ids has room for num_nodes entries only */
            if (num_sched >= dag->num_nodes) {
                fprintf(stderr,
                "error: DAG has more nodes than num_nodes (%d)\n",
                dag->num_nodes);
                goto out;
            }

            if (node->arg_list && !node->arg_stack) {
                /* async variable argument list call */
                status = grpc_call_arg_list_async(
                node->handle, req_ids + num_sched, node->arg_list);
            }
            else if (!node->arg_list && node->arg_stack) {
                /* async argument stack call */
                status = grpc_call_arg_stack_async(
                node->handle, req_ids + num_sched, node->arg_stack);
            }
            else {
                /* nothing would be submitted, so there would be
                   no valid request id to wait on */
                fprintf(stderr,
                "error: DAG node needs exactly one of arg_list/arg_stack\n");
                goto out;
            }

            if (status != GRPC_NO_ERROR) {
                fprintf(stderr,
                "error submitting job with request id (%d)\n",
                req_ids[num_sched]);
                goto out;
            }
            printf("Submitting job with request id (%d) onto server (%d)\n",
            req_ids[num_sched],
            node->handle->srv_idx);

            num_sched++;
        }

        /* poll until every request of the current level has completed;
           a probe error aborts the execution */
        do {
            finished = wait_level_finish_GS_DAG(req_ids, num_sched);
        } while (finished == 0);
        if (finished < 0)
            goto out;

        /* get back the results of the current scheduling level */
        for (i = 0; i < num_sched; i++) {
            status = grpc_wait(req_ids[i]);
            if (status != GRPC_NO_ERROR) {
                fprintf(stderr,
                "error finishing job with request id (%d)\n",
                req_ids[i]);
                goto out;
            }
        }

        /* the total number of nodes scheduled at this point */
        total_num_sched += num_sched;
    }

    ret = 0;

out:
    free(req_ids);
    return ret;
}

Here is the call graph for this function:

Here is the caller graph for this function:

int map_to_servers_GS_DAG ( GS_DAG_t *  dag,
int  scheduler 
)

Map a DAG to a set of servers

Parameters:
dag -- the DAG to be mapped
scheduler -- the mapping policy: ROUND_ROBIN, AVERAGE, OPT_COMM, OPT_COMP, or AGENT
Returns:
0 on success, -1 on failure

Definition at line 194 of file gs_dag_scheduler.c.

                                                        {
    /*
     * Map a DAG onto servers by setting node->handle->srv_idx for every
     * node, one scheduling level at a time.
     *
     *   ROUND_ROBIN        within a level, servers 0, 1, ..., num_servers-1,
     *                      0, ... are assigned in node-list order.
     *   AVERAGE            within a level, every server receives
     *                      level_size / num_servers nodes and the first
     *                      level_size % num_servers servers one extra.  Nodes
     *                      are handed out in contiguous runs in node-list
     *                      order.
     *   OPT_COMM, OPT_COMP not implemented; the existing mapping is kept.
     *   AGENT              keep the mapping obtained from the agent.
     *
     * The server count is read from the head node; all nodes are assumed to
     * have identical services.
     *
     * Returns 0 on success, -1 on failure.
     */
    int num_servers;
    int sched_level;
    int max_level;
    int num_reqs_per_level;
    int num_reqs_per_server_level;
    int *assignments;
    int serv_idx;
    int i, j;
    GS_DAG_Node_t *node;


    if (dag == NULL) return -1;

    /* if the DAG has not yet been analyzed,
       analyze it first before mapping */
    if (!dag->analyzed && analyze_dep_GS_DAG(dag) != 0) return -1;

    /* an empty DAG has nothing to map (and no head node to read from) */
    if (dag->head_node == NULL) return 0;

    /* number of servers available; we assume
       that all servers have identical services */
    num_servers = dag->head_node->handle->num_servers;

    /* both built-in policies divide by num_servers */
    if ((scheduler == ROUND_ROBIN || scheduler == AVERAGE) &&
        num_servers <= 0) {
        fprintf(stderr, "error: no servers available to map the DAG onto\n");
        return -1;
    }

    /* visit every level that actually occurs; this terminates even
       if the levels are inconsistent with dag->num_nodes */
    max_level = 0;
    for (node = dag->head_node; node != NULL; node = node->next) {
        if (node->sched_level > max_level)
            max_level = node->sched_level;
    }

    if (scheduler == ROUND_ROBIN) {
        for (sched_level = 0; sched_level <= max_level; sched_level++) {
            serv_idx = 0;
            for (node = dag->head_node; node != NULL; node = node->next) {
                if (node->sched_level == sched_level) {
                    node->handle->srv_idx = serv_idx % num_servers;
                    serv_idx++;
                }
            }
        }
    }
    else if (scheduler == AVERAGE) {
        assignments = malloc(sizeof *assignments * (size_t) num_servers);
        if (assignments == NULL) {
            perror("malloc");
            return -1;
        }

        for (sched_level = 0; sched_level <= max_level; sched_level++) {
            num_reqs_per_level = 0;
            for (node = dag->head_node; node != NULL; node = node->next) {
                if (node->sched_level == sched_level)
                    num_reqs_per_level++;
            }

            /* equal quota per server, remainder spread over the first ones */
            num_reqs_per_server_level = num_reqs_per_level / num_servers;
            for (i = 0; i < num_servers; i++)
                assignments[i] = num_reqs_per_server_level;
            for (j = 0; j < num_reqs_per_level % num_servers; j++)
                assignments[j]++;

            /* fill server i until its quota is used up, then move on;
               never step past the last server */
            i = 0;
            for (node = dag->head_node; node != NULL; node = node->next) {
                if (node->sched_level != sched_level)
                    continue;
                while (assignments[i] <= 0 && i < num_servers - 1)
                    i++;
                node->handle->srv_idx = i;
                assignments[i]--;
            }
        }

        free(assignments);
    }
    else if (scheduler == OPT_COMM || scheduler == OPT_COMP) {
        /* TODO: communication-/computation-optimal mapping is not
           implemented; the existing mapping is left unchanged */
    }
    else if (scheduler == AGENT) {
        /* use the default mapping obtained from the agent */
    }

    return 0;
}

Here is the call graph for this function:

Here is the caller graph for this function:

int post_analysis_GS_DAG ( GS_DAG_t *  dag,
va_list  arg_list,
int  n 
)

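Post-process the DAG after server mapping. For each argument involved in an input-after-output (RAW) dependency, record which servers it must be forwarded to and which server each dependent argument receives its data from. Drop dependencies whose child does not actually receive data from the parent. Finally, mark intermediate arguments that need not be passed back to the client.

Parameters:
dag -- the DAG to be post-processed
arg_list, n -- forwarded to if_passed_back(); when it returns 0 for a RAW-depended argument, that argument is not passed back to the client
Returns:
0 on success, -1 on failure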

Definition at line 299 of file gs_dag_scheduler.c.

                                                                 {
    /*
     * Post-process the dependencies of a DAG after server mapping.
     *
     * For input-after-output (RAW) dependencies this:
     *   1. counts, per depended argument (largp), how many RAW edges leave it;
     *   2. allocates largp->target_server_list with that many NULL slots;
     *   3. records the child's server as a push target of largp and the
     *      parent's server as the source of the child argument (rargp).
     *      For a given rargp, the edge processed last wins;
     *   4. deletes RAW edges whose child takes its data from a different
     *      source, and removes the child's server from largp's push targets.
     *
     * Finally it clears pass_back:
     *   - on RAW-depended arguments for which
     *     if_passed_back(arg_list, n, largp) returns 0; the dependent
     *     argument is then marked to receive a data handle;
     *   - on arguments that are overwritten by an output-after-output edge.
     *
     * Returns 0 on success, -1 if dag is NULL or an allocation fails.
     */
    GS_DAG_Dep_t *dep, *next_dep;
    gs_argument_t *largp;
    int i;


    if (dag == NULL) return -1;

    /* count the number of depending nodes of each
       argument in each RAW dependency in the DAG */
    for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
        if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY)
            dep->largp->num_targets++;
    }

    /* create a target server list (all slots NULL) for each
       argument that is depended on by some other nodes */
    for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
        largp = dep->largp;
        if (dep->dep_type != INPUT_AFTER_OUTPUT_DEPENDENCY ||
            largp->num_targets <= 0 ||
            largp->target_server_list != NULL)
            continue;

        largp->target_server_list =
            malloc(sizeof *largp->target_server_list *
                   (size_t) largp->num_targets);
        if (largp->target_server_list == NULL) {
            perror("malloc");
            /* report the failure to the caller instead of
               terminating the whole process */
            return -1;
        }

        for (i = 0; i < largp->num_targets; i++)
            largp->target_server_list[i] = NULL;
    }

    /* register the target and the source of every RAW dependency */
    for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
        if (dep->dep_type != INPUT_AFTER_OUTPUT_DEPENDENCY)
            continue;
        largp = dep->largp;

        /* insert the child's server into the first unused slot;
           if the list is already full, do not write past its end */
        for (i = 0; i < largp->num_targets; i++) {
            if (largp->target_server_list[i] == NULL)
                break;
        }
        if (i < largp->num_targets)
            largp->target_server_list[i] = dep->cnode->server;

        /* the input argument of the depending node will come from the
           output argument of the depended node.  Only the last depended
           node remains the source; the dependency analysis order is
           relied on to make that the most recent writer */
        dep->rargp->source = dep->pnode->server;
    }

    /* remove redundant RAW edges: the child takes this argument from a
       different source, so the parent need not push it to the child */
    for (dep = dag->head_dep; dep != NULL; dep = next_dep) {
        /* delete_dep_GS_DAG() may release dep: fetch the successor first */
        next_dep = dep->next;

        /* only RAW edges carry data from parent to child; the other
           kinds must survive for the pass_back step below */
        if (dep->dep_type != INPUT_AFTER_OUTPUT_DEPENDENCY ||
            dep->rargp->source == dep->pnode->server)
            continue;

        /* drop the push target that was registered for this child */
        largp = dep->largp;
        for (i = 0; i < largp->num_targets; i++) {
            if (largp->target_server_list[i] == dep->cnode->server) {
                largp->target_server_list[i] = NULL;
                break;
            }
        }

        delete_dep_GS_DAG(dag, dep);
    }

    /* mark the 'pass_back' field of the depended argument of the
       parent node with 0: this intermediate argument is not passed back */
    for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
        if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY) {
            if (!if_passed_back(arg_list, n, dep->largp)) {
                /* the depended argument should not be passed back */
                dep->largp->pass_back = 0;
                /* the dependent argument expects to receive a data handle */
                dep->rargp->data_handle = 1;
            }
        }
        else if (dep->dep_type == OUTPUT_AFTER_OUTPUT_DEPENDENCY) {
            /* the overwritten output should not be passed back */
            dep->largp->pass_back = 0;
        }
    }

    return 0;
}

Here is the call graph for this function:

Here is the caller graph for this function:

int wait_level_finish_GS_DAG ( grpc_sessionid_t *  req_ids,
int  num_sched 
)

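Check whether all asynchronous requests submitted at the current scheduling level have completed.

Parameters:
req_ids -- request ids of the jobs submitted at the current level
num_sched -- number of entries of req_ids to probe
Returns:
1 if all requests have completed, 0 if at least one is still running, -1 if a probe reports an error

Definition at line 410 of file gs_dag_scheduler.c.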

                                                                       {
    /*
     * Poll the requests submitted at one scheduling level.
     *
     * req_ids[0 .. num_sched-1] are probed in order.  Returns 0 as soon as
     * one of them is still running, -1 (after printing a diagnostic) when a
     * probe reports any status other than "completed" or "not completed",
     * and 1 when every request has completed.
     */
    int k;

    for (k = 0; k < num_sched; k++) {
        grpc_error_t probe_status = grpc_probe(req_ids[k]);

        /* still running: the level is not finished yet */
        if (probe_status == GRPC_NOT_COMPLETED)
            return 0;

        /* neither running nor completed: the probe itself failed */
        if (probe_status != GRPC_NO_ERROR) {
            fprintf(stderr, 
            "error probing job with request id (%d)\n", req_ids[k]);
            return -1;
        }

        /* this request has completed; check the next one */
    }

    return 1;
}

Here is the call graph for this function:

Here is the caller graph for this function: