gs_seq_dsi.c

Go to the documentation of this file.
00001 /********************************************************************/
00002 /*                            gs_seq_dsi.c                          */
00003 /*      the source code for GridSolve request sequencing DSI API    */
00004 /*                       Yinan Li, 06/17/2008                       */
00005 /********************************************************************/
00006 
00007 
00008 #include "portability.h"
00009 #include "gs_seq_dsi.h"
00010 
00011 
/*
 * Default directories for the temporary data files that hold the data
 * to be transferred between servers; one for the sender and one for
 * the receiver.
 *   sender:   the service that produces the data
 *   receiver: the service that consumes the data
 * Both are NULL at program start and are assigned by
 * gs_seq_set_lfs_dsi_data_storage_path().
 */
char *DATA_SEND_FILE_PATH;
char *DATA_RECV_FILE_PATH;


/*
 * Return a freshly malloc'ed string holding root followed by suffix,
 * or NULL if the allocation fails.
 */
static char *gs_seq_join_path(const char *root, const char *suffix) {
    size_t root_len = strlen(root);
    size_t suffix_len = strlen(suffix);
    char *joined = malloc(root_len + suffix_len + 1);

    if (joined) {
        memcpy(joined, root, root_len);
        /* copying suffix_len + 1 bytes brings the terminator along */
        memcpy(joined + root_len, suffix, suffix_len + 1);
    }

    return joined;
}


/*
 * Build DATA_SEND_FILE_PATH and DATA_RECV_FILE_PATH from the
 * GRIDSOLVE_LDS_ROOT environment variable by appending
 * "/data_storage/send/" and "/data_storage/recv/" respectively.
 *
 * Returns 0 on success.  Returns -1 if the environment variable is
 * not set or if memory cannot be allocated; in that case neither
 * global is modified and nothing is leaked.
 *
 * NOTE(review): values stored by an earlier successful call are not
 * freed here, because other code may still hold those pointers.
 */
int gs_seq_set_lfs_dsi_data_storage_path() {
    const char *ds_root;
    char *send_path;
    char *recv_path;

    ds_root = getenv("GRIDSOLVE_LDS_ROOT");
    if (!ds_root) {
        fprintf(stderr,
        "error getting environment variable GRIDSOLVE_LDS_ROOT\n");
        return -1;
    }

    /* combine the root directory path of GridSolve and
       the relative paths for senders and receivers */
    send_path = gs_seq_join_path(ds_root, "/data_storage/send/");
    recv_path = gs_seq_join_path(ds_root, "/data_storage/recv/");

    if (!send_path || !recv_path) {
        perror("malloc");
        /* one of the two may have succeeded; release it */
        free(send_path);
        free(recv_path);
        return -1;
    }

    /* publish only once both paths are complete */
    DATA_SEND_FILE_PATH = send_path;
    DATA_RECV_FILE_PATH = recv_path;

    return 0;
}
00062 
00063 
00064 /* 
00065  * decode a data handle (string) into 
00066  * a local-files-ystem-based dsi object 
00067  */
00068 LFS_DSI_OBJECT *gs_seq_decode_lfs_dsi_object(char *data_handle) {
00069     LFS_DSI_OBJECT *obj;
00070     char *tmp;
00071     int nchars;
00072 
00073 
00074     if (!data_handle) {
00075         fprintf(stderr, "bad data handle\n");
00076         return NULL;
00077     }
00078 
00079     obj = (LFS_DSI_OBJECT *) malloc(sizeof(LFS_DSI_OBJECT));
00080     if (!obj) {
00081         perror("malloc");
00082         return NULL;
00083     }
00084 
00085     tmp = (char *) malloc(sizeof(char) * (strlen(data_handle) + 1));
00086     if (!tmp) {
00087         perror("malloc");
00088         return NULL;
00089     }
00090 
00091     sscanf(data_handle, "<lfs_dsi_object>\n\t<type> %d </type>\n%n",
00092         &obj->data_type, &nchars);
00093     data_handle += nchars;
00094 
00095     sscanf(data_handle, "\t<major> %d </major>\n%n", &obj->major, &nchars);
00096     data_handle += nchars;
00097 
00098     sscanf(data_handle, "\t<size> %d </size>\n%n", &obj->data_size, &nchars);
00099     data_handle += nchars;
00100 
00101     sscanf(data_handle, "\t<index> %d </index>\n%n", &obj->index, &nchars);
00102     data_handle += nchars;
00103     
00104     sscanf(data_handle, "\t<function> %s </function>\n%n", tmp, &nchars);
00105     obj->func_name = strdup(tmp);
00106     data_handle += nchars;
00107     
00108     sscanf(data_handle, "\t<source> %s </source>\n%n", tmp, &nchars);
00109     obj->source = strdup(tmp);
00110     data_handle += nchars;
00111 
00112     sscanf(data_handle, "\t<path> %s </path>\n%n", tmp, &nchars);
00113     obj->path = strdup(tmp);
00114     data_handle += nchars;
00115 
00116     sscanf(data_handle, "\t<fname> %s </fname>\n</lfs_dsi_object>\n", tmp);
00117     obj->file_name = strdup(tmp);
00118 
00119     return obj;
00120 }
00121 
00122 
00123 /*
00124  * encode an local-filesystem-based 
00125  * dsi object to a data handle (string)
00126  */
00127 char *gs_seq_encode_lfs_dsi_object(LFS_DSI_OBJECT *obj) {
00128     char *msg, *tmp;
00129 
00130 
00131     msg = dstring_sprintf("<lfs_dsi_object>\n");
00132 
00133     tmp = dstring_sprintf("\t<type> %d </type>\n", obj->data_type);
00134     msg = dstring_append_free(msg, tmp);
00135 
00136     tmp = dstring_sprintf("\t<major> %d </major>\n", obj->major);
00137     msg = dstring_append_free(msg, tmp);
00138 
00139     tmp = dstring_sprintf("\t<size> %d </size>\n", obj->data_size);
00140     msg = dstring_append_free(msg, tmp);
00141 
00142     tmp = dstring_sprintf("\t<index> %d </index>\n", obj->index);
00143     msg = dstring_append_free(msg, tmp);
00144     
00145     tmp = dstring_sprintf("\t<function> %s </function>\n", obj->func_name);
00146     msg = dstring_append_free(msg, tmp);
00147     
00148     tmp = dstring_sprintf("\t<source> %s </source>\n", obj->source);
00149     msg = dstring_append_free(msg, tmp);
00150 
00151     tmp = dstring_sprintf("\t<path> %s </path>\n", obj->path);
00152     msg = dstring_append_free(msg, tmp);
00153 
00154     tmp = dstring_sprintf("\t<fname> %s </fname>\n", obj->file_name);
00155     msg = dstring_append_free(msg, tmp);
00156 
00157     tmp = dstring_sprintf("</lfs_dsi_object>\n");
00158     msg = dstring_append_free(msg, tmp);
00159 
00160     return msg;
00161 }
00162 
00163 
00164 /*
00165  * create a data handle
00166  */
00167 LFS_DSI_OBJECT *gs_seq_create_data_handle(
00168 gs_argument_t *argptr, char *path, char *fname, int my_major) {
00169     LFS_DSI_OBJECT *obj;
00170 
00171     if (!argptr || !path || !fname) {
00172         fprintf(stderr, "bad input arguments\n");
00173         return NULL;
00174     }
00175 
00176     
00177     obj = (LFS_DSI_OBJECT *) malloc(sizeof(LFS_DSI_OBJECT));
00178     if (!obj) {
00179         perror("malloc");
00180         return NULL;
00181     }
00182 
00183     obj->data_type = argptr->datatype;
00184     obj->major = my_major;
00185     obj->data_size = argptr->rows * argptr->cols;
00186     obj->index = argptr->index;
00187     obj->func_name = argptr->prob->name;
00188     obj->source = argptr->hostname;
00189     obj->path = path;
00190     obj->file_name = fname;
00191     
00192     return obj;
00193 }
00194 
00195 
00202 char *gs_seq_create_unique_file_name(gs_argument_t *argptr) {
00203     char *file_name;
00204     char *tmp;
00205     struct timeval   tv;
00206 /*  struct timezone  tz; */
00207     double cur_time;
00208       
00209     
00210     if (!argptr) {
00211         fprintf(stderr, "bad argument pointer\n");
00212         return NULL;
00213     }
00214     
00215     /* the file name is like the form */
00216     /* data_hostname_funcname_objecttype_datatype_inout_index.dat */
00217     file_name = dstring_sprintf("data_");
00218     tmp = dstring_sprintf("%s_", argptr->hostname);
00219     file_name = dstring_append_free(file_name, tmp);
00220     tmp = dstring_sprintf("%s_", argptr->prob->name);
00221     file_name = dstring_append_free(file_name, tmp);
00222     tmp = dstring_sprintf("%d_", argptr->objecttype);
00223     file_name = dstring_append_free(file_name, tmp);
00224     tmp = dstring_sprintf("%d_", argptr->datatype);
00225     file_name = dstring_append_free(file_name, tmp);
00226     tmp = dstring_sprintf("%d_", argptr->inout);
00227     file_name = dstring_append_free(file_name, tmp);
00228     tmp = dstring_sprintf("%d_", argptr->index);
00229     file_name = dstring_append_free(file_name, tmp);
00230     gettimeofday(&tv, NULL);
00231     cur_time = tv.tv_sec + tv.tv_usec / 1000000.0;
00232     tmp = dstring_sprintf("%f.dat", cur_time);
00233     file_name = dstring_append_free(file_name, tmp);
00234     return file_name;
00235 }
00236 
00237 
00241 int gs_seq_save_data_into_file(
00242 char *fname, gs_argument_t *argptr, int my_major, int my_dsig) {
00243     int fd, status;
00244 
00245     fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0666);
00246     if(fd < 0) {
00247         fprintf(stderr, "file '%s' could not be opened\n", fname);
00248         return -1;
00249     }
00250 
00251 
00252     if ((status = gs_send_tag(fd, my_major)) < 0) {
00253         ERRPRINTF("error writing major to file\n");
00254         return -1;
00255     }
00256 
00257     if ((status = gs_send_int(fd, my_dsig)) < 0) {
00258         ERRPRINTF("error writing data signature to file\n");
00259         return -1;
00260     }
00261     
00262     if ((status = gs_send_arg(fd, argptr, my_dsig)) < 0) {
00263         ERRPRINTF("error writing argument to file\n");
00264         return -1;
00265     }
00266     
00267     close(fd);
00268     
00269     return 0;
00270 }
00271 
00272 
00276 int gs_seq_restore_data_from_file(
00277 char *fname, gs_argument_t *argptr, int my_dsig) {
00278     int fd, status, sender_dsig, sender_major;
00279     
00280     fd = open(fname, O_RDONLY, 0666);
00281     if (fd < 0) {
00282         ERRPRINTF("file '%s' could not be opened\n", fname);
00283         return -1;
00284     }
00285 
00286     if ((status = gs_recv_tag(fd, &sender_major)) < 0) {
00287         ERRPRINTF("error receiving major from file\n");
00288         return -1;
00289     }
00290 
00291     if ((status = gs_recv_int(fd, &sender_dsig)) < 0) {
00292         ERRPRINTF("error receiving sender data signature from file\n");
00293         return -1;
00294     }
00295     
00296     if((status = gs_recv_arg(fd, argptr, sender_major, sender_dsig, my_dsig,
00297         GS_SERVER_SIDE)) < 0) {
00298         ERRPRINTF("error reading argument from file\n");
00299         return -1;
00300     }
00301     
00302     close(fd);
00303 
00304     return 0;
00305 }