/* $Id$ */ /* * PSNC DRMAA for LL * Copyright (C) 2010 Poznan Supercomputing and Networking Center * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include static char *lldrmaa_session_run_job(fsd_drmaa_session_t *self, const fsd_template_t *jt); static fsd_iter_t *lldrmaa_session_run_bulk(fsd_drmaa_session_t *self, const fsd_template_t *jt, int start, int end, int incr ); static fsd_job_t *lldrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id ); static void lldrmaa_session_apply_configuration( fsd_drmaa_session_t *self ); static void lldrmaa_session_destroy_nowait(fsd_drmaa_session_t *self); static void *lldrmaa_session_wait_thread( fsd_drmaa_session_t *self ); static void lldrmaa_create_unix_domain_socket(lldrmaa_session_t *self); fsd_drmaa_session_t * lldrmaa_session_new( const char *contact ) { lldrmaa_session_t *volatile self = NULL; TRY { self = (lldrmaa_session_t*)fsd_drmaa_session_new(contact); fsd_realloc( self, 1, lldrmaa_session_t ); self->super.run_job = lldrmaa_session_run_job; self->super.run_bulk = lldrmaa_session_run_bulk; self->super.new_job = lldrmaa_session_new_job; self->super.wait_thread = lldrmaa_session_wait_thread; self->super_destroy_nowait = self->super.destroy_nowait; self->super.destroy_nowait = lldrmaa_session_destroy_nowait; self->super_apply_configuration = self->super.apply_configuration; self->super.apply_configuration = lldrmaa_session_apply_configuration; self->terminate_job_on_vacated = true; self->unix_socket_name = NULL; self->socket_fd = -1; lldrmaa_create_unix_domain_socket(self); /* This must be called after creating socket as wait thread tries to accept new connections on it*/ self->super.load_configuration( &self->super, "ll_drmaa" ); } EXCEPT_DEFAULT { fsd_free( self ); fsd_exc_reraise(); } END_TRY return (fsd_drmaa_session_t*)self; } char * lldrmaa_session_run_job( fsd_drmaa_session_t *self, const fsd_template_t *jt ) { char *job_id = NULL; fsd_iter_t *volatile job_ids = NULL; TRY { job_ids = self->run_bulk( self, jt, 0, 0, 0 ); /* single job run as bulk job specialization */ job_id = fsd_strdup( job_ids->next( job_ids ) ); } FINALLY { if( job_ids ) job_ids->destroy( job_ids ); } END_TRY return job_id; } fsd_iter_t * lldrmaa_session_run_bulk( fsd_drmaa_session_t *self, const fsd_template_t *jt, int start, int end, int incr ) { fsd_job_t *volatile job = NULL; char **volatile job_ids = NULL; unsigned n_jobs = 0; volatile bool connection_lock = false; lldrmaa_session_t *llself = (lldrmaa_session_t*) self; fsd_environ_t *volatile env = NULL; char *volatile cmd_path = NULL; int status; LL_job job_info; char *monitor_program = LL_DRMAA_BIN_DIR"/lldrmaa_monitor"; TRY { if( start != end ) n_jobs = (end - start) / incr + 1; else n_jobs = 1; fsd_log_debug(("Create cmd file")); cmd_path = (char *volatile) lldrmaa_job_create_req( self, jt, (fsd_environ_t**)&env , n_jobs, start, incr); fsd_log_debug(("llsubmit: %s",cmd_path)); connection_lock = fsd_mutex_lock( &self->drm_connection_mutex ); if (self->wait_thread_run_flag) { fsd_log_info(("llsubmit(%s, %s, %s, %p, %d)",cmd_path, monitor_program, llself->unix_socket_name, (void*)&job_info, LL_JOB_VERSION)); status = llsubmit(cmd_path, monitor_program, llself->unix_socket_name, &job_info, LL_JOB_VERSION); } else { fsd_log_info(("llsubmit(%s, NULL, NULL, %p, %d)",cmd_path, (void*)&job_info, LL_JOB_VERSION)); status = llsubmit(cmd_path, NULL, NULL, &job_info, LL_JOB_VERSION); } connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex ); if(getenv("LLDRMAA_KEEP_CMD") == NULL && remove(cmd_path) == -1) fsd_log_warning(("Can't delete cmd file: %s", cmd_path)); fsd_free(cmd_path); /* check if this affects llusbmit */ if(status) /* -1 Error, error messages written to stderr.*/ { fsd_log_error(("llsubmit: returned non-zero")); fsd_exc_raise_fmt(lldrmaa_map_submit(status), "llsubmit returned non-zero. %s",lldrmaa_err_submit(status)); } else /* 0 */ { fsd_log_debug(("llsubmit: %s",lldrmaa_err_submit(status))); } if( start != end ) { unsigned idx, i; fsd_calloc( job_ids, n_jobs+1, char* ); for( idx = start, i = 0; idx <= (unsigned)end; idx += incr, i++ ) { job_ids[i] = fsd_asprintf("%s.%d.%d", job_info.step_list[i]->id.from_host, job_info.step_list[i]->id.cluster, job_info.step_list[i]->id.proc); fsd_log_info((" new array job id: %s", job_ids[i])); job = lldrmaa_job_new( fsd_strdup(job_ids[i]) ); job->session = self; job->submit_time = time(NULL); self->jobs->add( self->jobs, job ); job->release( job ); job = NULL; } fsd_assert( i == n_jobs ); } else /* ! bulk */ { fsd_calloc( job_ids, n_jobs+1, char* ); job_ids[0] = fsd_asprintf( "%s.%d.0", job_info.step_list[0]->id.from_host, job_info.step_list[0]->id.cluster); fsd_log_info((" new job id: %s", job_ids[0])); job = lldrmaa_job_new( fsd_strdup(job_ids[0]) ); job->session = self; job->submit_time = time(NULL); self->jobs->add( self->jobs, job ); job->release( job ); job = NULL; } } ELSE { connection_lock = fsd_mutex_lock( &self->drm_connection_mutex ); llfree_job_info(&job_info,LL_JOB_VERSION); connection_lock = fsd_mutex_unlock( &self->drm_connection_mutex ); fsd_log_debug(("llfree_job_info run")); } FINALLY { if( connection_lock ) fsd_mutex_unlock( &self->drm_connection_mutex ); if( job ) job->release( job ); if( fsd_exc_get() != NULL ) fsd_free_vector( job_ids ); } END_TRY return fsd_iter_new( job_ids, n_jobs ); } fsd_job_t * lldrmaa_session_new_job( fsd_drmaa_session_t *self, const char *job_id ) { fsd_job_t *job; job = lldrmaa_job_new( fsd_strdup(job_id) ); job->session = self; return job; } void lldrmaa_session_apply_configuration( fsd_drmaa_session_t *self ) { lldrmaa_session_t *llself = (lldrmaa_session_t*)self; fsd_conf_option_t *terminate_job_on_vacated; llself->super_apply_configuration(self); /* call method from the superclass */ terminate_job_on_vacated = fsd_conf_dict_get(self->configuration, "terminate_job_on_vacated" ); if( terminate_job_on_vacated ) { if( terminate_job_on_vacated->type == FSD_CONF_INTEGER && (terminate_job_on_vacated->val.integer == 0 || terminate_job_on_vacated->val.integer == 1)) { llself->terminate_job_on_vacated = (terminate_job_on_vacated->val.integer != 0 ); fsd_log_debug(("terminate_job_on_vacated: %d",llself->terminate_job_on_vacated)); } else fsd_exc_raise_msg( FSD_ERRNO_INTERNAL_ERROR, "configuration: 'terminate_job_on_vacated' should be 0 or 1" ); } } void lldrmaa_session_destroy_nowait(fsd_drmaa_session_t *self) { lldrmaa_session_t *llself = (lldrmaa_session_t*) self; fsd_log_enter(( "" )); /*close socket */ if (close(llself->socket_fd) != 0) { fsd_log_error(("Failed to close UNIX socket")); } else { llself->socket_fd = -1; } /*delete socket file*/ if (unlink(llself->unix_socket_name) != 0) { fsd_log_error(("Failed to delete UNIX socket: %s", llself->unix_socket_name)); } fsd_free(llself->unix_socket_name); /* call the destroy_nowait method from the super class */ llself->super_destroy_nowait(self); fsd_log_return(( "" )); } void * lldrmaa_session_wait_thread( fsd_drmaa_session_t *self ) { lldrmaa_session_t *llself = (lldrmaa_session_t*) self; fsd_job_t *volatile job = NULL; lldrmaa_job_t *lljob = NULL; char state[256] = ""; char job_id[256] = ""; int status = -1; char buffer[256] = ""; struct sockaddr_un saddress; size_t address_length = 0; int connection_fd = -1; fsd_log_enter(( "" )); fsd_mutex_lock( &self->mutex ); TRY { while( self->wait_thread_run_flag ) TRY { int bytes_read = -1; int rc = -1; fd_set fds; struct timeval tv; /*beginning of blocking part - release mutex */ fsd_mutex_unlock( &self->mutex ); fsd_log_debug(("WT - next iteration")); FD_ZERO(&fds); FD_SET(llself->socket_fd,&fds); tv.tv_sec = 1; tv.tv_usec = 0; rc = select(llself->socket_fd+1, &fds, NULL, NULL, &tv); if (rc < 0) { fsd_log_error(("WT - select() failed (errno=%d)", errno)); fsd_mutex_lock( &self->mutex ); continue; } else if (rc > 0 && FD_ISSET(llself->socket_fd,&fds)) { fsd_log_debug(("WT - server socket ready for accept... ")); } else /*rc == 0 (timeout) || FD_ISSET == false */ { fsd_log_debug(("WT - select() timeout")); fsd_mutex_lock( &self->mutex ); continue; } if ((connection_fd = accept(llself->socket_fd, (struct sockaddr *) &saddress, &address_length)) == -1) { fsd_log_error(("WT - failed to accept new connection (errno = %d)", errno)); fsd_mutex_lock( &self->mutex ); continue; } fsd_log_debug(("WT - connection accepted: fd = %d ", connection_fd)); if ((bytes_read = read(connection_fd, buffer, 255)) < 0) { fsd_log_error(("WT - failed to read from socket (errno = %d)", errno)); close(connection_fd); fsd_mutex_lock( &self->mutex ); continue; } else { buffer[bytes_read] = '\0'; /* make sure that message is NULL terminated */ } close(connection_fd); /*end of blocking part - acquire mutex */ fsd_mutex_lock( &self->mutex ); if (sscanf(buffer,"%s %s %d",state,job_id,&status) != 3) { fsd_log_error(("WT - failed to decode message: %s", buffer)); continue; } fsd_log_info(("WT - monitor program: %s %s %d", state, job_id, status)); /*update job status */ TRY { job = self->get_job( self, job_id ); lljob = (lldrmaa_job_t*) job; if( job ) { if( strcmp(state,"JOB_STARTED")==0 && job->state >= DRMAA_PS_DONE) { fsd_log_debug(("WT - received JOB_STARTED when job has already finished")); } else if ( strcmp(state, "JOB_COMPLETED")==0) { lljob->read_job_info_mon(job, state, status); } else { lljob->read_job_info_mon(job, state, -1); } } else { fsd_log_error(("WT - Unknown job: %s", job_id)); } } FINALLY { if( job ) job->release( job ); } END_TRY fsd_cond_broadcast( &self->wait_condition ); } EXCEPT_DEFAULT { const fsd_exc_t *e = fsd_exc_get(); fsd_log_error(( "wait thread: <%d:%s>", e->code(e), e->message(e) )); fsd_exc_reraise(); } END_TRY } FINALLY { fsd_log_debug(("WT - Terminated.")); fsd_mutex_unlock( &self->mutex ); } END_TRY fsd_log_return(( " =NULL" )); return NULL; } void lldrmaa_create_unix_domain_socket(lldrmaa_session_t *self) { char socket_name_template[] = "/tmp/drmaa_socket_XXXXXX"; struct sockaddr_un address; size_t address_length; int rc; if (mktemp(socket_name_template) == NULL) fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Can't generate socket tmp name: %s",socket_name_template); fsd_log_info(("UNIX DOMAIN SOCKET name: %s",socket_name_template)); if ((self->socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Unable to create UNIX socket"); address.sun_family = AF_UNIX; address_length = sizeof(address.sun_family) + sprintf(address.sun_path, socket_name_template) + 1; if(bind(self->socket_fd, (struct sockaddr *) &address, address_length) != 0) fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Bind failed (socket name: %s)", socket_name_template); if((rc = chmod(socket_name_template, S_IRUSR|S_IWUSR)) == -1) { char errbuf[256] = "InternalError"; (void)strerror_r(errno, errbuf, 256); /*on error the default message would be returned */ fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "Chmod() returned -1 errmsg: %s", errbuf); } else if (rc != 0) { fsd_assert(false); } else { fsd_log_debug(("Socket permissions changed with chmod()")); } if(listen(self->socket_fd, 128) != 0) fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "listen() failed"); self->unix_socket_name = fsd_strdup(socket_name_template); }