package simulator; import eduni.simjava.Sim_event; import gridsim.GridSimTags; import gridsim.IO_data; import gridsim.gssim.GssimConstants; import gridsim.net.Link; import grms.shared.constants.BrokerConstants; import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus; import gssim.schedframe.scheduling.AbstractExecutable; import gssim.schedframe.scheduling.Executable; import gssim.schedframe.scheduling.plugin.grid.GridReservationManager; import gssim.schedframe.scheduling.plugin.grid.GssimJobRegistry; import gssim.schedframe.scheduling.queues.JobQueue; import gssim.schedframe.scheduling.queues.TaskQueue; import gssim.schedframe.scheduling.utils.JobDescription; import gssim.schedframe.scheduling.utils.TaskDescription; import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.exolab.castor.types.Duration; import org.joda.time.DateTime; import schedframe.scheduling.AbstractProcesses; import schedframe.scheduling.Job; import schedframe.scheduling.Task; import schedframe.scheduling.TaskInterface; import schedframe.scheduling.events.SchedulingEventReason; import schedframe.scheduling.events.TaskArrivedEvent; import schedframe.scheduling.events.TaskCanceledEvent; import schedframe.scheduling.events.TimerEvent; import schedframe.scheduling.plan.AllocationInterface; import schedframe.scheduling.plan.ComputingResourceTypeInterface; import schedframe.scheduling.plan.ComputingResourceTypeItemInterface; import schedframe.scheduling.plan.HostInterface; import schedframe.scheduling.plan.ParameterPropertyInterface; import schedframe.scheduling.plan.ScheduledTaskInterface; import schedframe.scheduling.plan.SchedulingPlanInterface; import schedframe.scheduling.plan.impl.Host; import schedframe.scheduling.plugin.grid.ModuleList; import schedframe.scheduling.utils.ResourceParameterName; import simulator.lists.ExecutablesList; import simulator.utils.XsltTransformations; /** * This class implements the functionality of a grid scheduler interface for the * task and host descriptions used in PSNC (that is: QcgJobDescriptionSchema.xsd and HostParamSchema.xsd) * * This class implements also the functionality of the reservation manager * used across multiple calls to {@link #scheduleJob(JobGridletList, String, String, String, ResourceDiscovery, String, String, String, String, String)} * @author Stanislaw Szczepanowski * */ public class GridBroker extends AbstractGridBroker { // private static Log log = LogFactory.getLog(GridBroker.class); /** The persistent job registry object */ protected GssimJobRegistry jobRegistry; /** Unfinished jobs */ protected JobQueue unfinishedJobs; protected TaskQueue unfinishedTasks; protected XsltTransformations xsltTransformer; protected ExecutablesList executables; /** * Creates an interface for a given plug-in. * * @param name the name of this entity * @param options the options object of the simulator * @param expectedJobNumber the number of expected jobs, that will be sent to the scheduler (for optimization purposes) * @param expectedTaskNumber the number of expected tasks, that will be sent to the scheduler (for optimization purposes) * @throws Exception if any error occurs */ public GridBroker(String name, ConfigurationOptions options, int expectedJobNumber, int expectedTaskNumber) throws Exception { super(name, options.gridSchedulingPluginName); set_stat(GssimConstants.getBrokersStatisticsObject()); // this.allTasks = new HashMap>(expectedJobNumber); // this.jobsMap = new HashMap(expectedJobNumber); this.jobRegistry = new GssimJobRegistry(); this.unfinishedJobs = new JobQueue(); this.unfinishedTasks = new TaskQueue(); this.moduleList.add(new GridReservationManager(this)); this.xsltTransformer = new XsltTransformations(); this.executables = new ExecutablesList(); } /** * Creates an interface for a given plug-in. * * @param name the name of this entity * @param link the connection between entity and router * @param options the options object of the simulator * @param expectedJobNumber the number of expected jobs, that will be sent to the scheduler (for optimization purposes) * @param expectedTaskNumber the number of expected tasks, that will be sent to the scheduler (for optimization purposes) * @throws Exception if any error occurs */ public GridBroker(String name, Link link, ConfigurationOptions options, int expectedJobNumber, int expectedTaskNumber) throws Exception { super(name, link, options.gridSchedulingPluginName); set_stat(GssimConstants.getBrokersStatisticsObject()); // this.allTasks = new HashMap>(expectedJobNumber); // this.jobsMap = new HashMap(expectedJobNumber); this.jobRegistry = new GssimJobRegistry(); this.unfinishedJobs = new JobQueue(); this.unfinishedTasks = new TaskQueue(); this.moduleList.add(new GridReservationManager(this)); this.xsltTransformer = new XsltTransformations(); this.executables = new ExecutablesList(); } public void scheduleCyclic(){ SchedulingPlanInterface decision = null; try { decision = gridSchedulerPlugin.schedule(new TimerEvent(), unfinishedJobs, unfinishedTasks, jobRegistry, moduleList, null); if (decision == null) return; log.info(decision.getDocument()); execute(decision); } catch (Exception e) { log.error("Exception during scheduling. " + e.getMessage()); e.printStackTrace(); } } /** * @see GridSchedulerInterface#scheduleJob */ public void scheduleJob(List jobsList, String userPreferences, String reservationRequest, ModuleList moduleList, String reservationOffers, String predictedTimes, String jobSet, String jobRegistryString, String brokerConfiguration) { //List jobs = (List) jobsList; TaskQueue waitingTasks = new TaskQueue(); for(int i = 0; i < jobsList.size(); i++){ Job newJob = (Job)jobsList.get(i); //jobsMap.put(newJob.getId(), newJob); this.jobRegistry.addJob(newJob); unfinishedJobs.add(newJob); List tasks = newJob.getTask(); //unfinishedTasks.addAll(tasks); waitingTasks.addAll(tasks); } if(jobRegistryString != null){ waitingTasks.addAll(jobRegistry.get(jobRegistryString).getTask()); } unfinishedTasks.addAll(waitingTasks.getReadyTasks(jobRegistry)); try { SchedulingPlanInterface decision = gridSchedulerPlugin.schedule( new TaskArrivedEvent(), unfinishedJobs, unfinishedTasks, jobRegistry, moduleList, null); if (decision == null) return; log.info(decision.getDocument()); execute(decision); } catch (Exception e) { log.error("Exception during scheduling. " + e.getMessage()); e.printStackTrace(); } } protected void execute(SchedulingPlanInterface decision){ ScheduledTaskInterface taskSchedulingDecisions[] = decision.getTask(); for (int i = 0; i < taskSchedulingDecisions.length; i++) { try { ScheduledTaskInterface taskDecision = taskSchedulingDecisions[i]; String jobID = taskDecision.getJobId(); String taskID = taskDecision.getTaskId(); // Task allocations that were rejected because of lack of resources or which were canceled and // not scheduled again are returned to the user. if(taskDecision.getStatus() == AllocationStatus.REJECTED){ Job jobGridlet = this.jobRegistry.get(jobID); send(output, 0, GridSimTags.GRIDLET_RETURN, new IO_data(jobGridlet, 0, jobGridlet.getSenderId())); continue; } AllocationInterface allocations[] = taskDecision.getAllocation(); //indicates, whether the scheduling decision concerns a new task or a previously scheduled task boolean schedulingNewTask = true; /* Map tasksMap = allTasks.get(jobID); if (tasksMap == null) { tasksMap = new HashMap(); allTasks.put(jobID, tasksMap); } Task taskGridlet = tasksMap.get(taskID); if (taskGridlet == null) { Job jg = jobsMap.get(jobID); taskGridlet = jg.getTask(taskID); tasksMap.put(taskID, taskGridlet); } else { schedulingNewTask = false; } */ if (schedulingNewTask) { // create new sub tasks //sendSubTasks(taskGridlet, processDecisions); Task taskGridlet = (Task) this.jobRegistry.getTaskInfo(jobID, taskID); sendSubTasks(taskGridlet, allocations); taskGridlet.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); } else { // move task // SubTaskGridlet[] simpleGridlets = taskGridlet.getGridlets(); // moveSubTasks(simpleGridlets, allocations); } }catch (Exception e){ e.printStackTrace(); } } } /** * Sends (spreads) the given task gridlet to resources * @param taskGridlet the task gridlet to be sent * @param resourceIDs the IDs of resources to which the gridlet is to be sent * @param reservationID the IDs of reservations * @param procCounts the number of processor to be used on each resource * @param peRatings the processing elements rating to be used on each resource * @pre resourceIDs.length == reservationID.length == procCounts.length == peRatings.length == memRequests.length */ protected void sendSubTasks(Task task, AllocationInterface allocations[] ) { for (int i = 0; i < allocations.length; i++) { AllocationInterface allocation = allocations[i]; String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); if(refersTo == null) refersTo = task.getId(); Executable exec = null; if(refersTo.equals(task.getId())){ exec = new Executable(task); } else { List processes = task.getProcesses(); if(processes == null) { try { log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); } catch (Exception e) { e.printStackTrace(); } } boolean found = false; for(int j = 0; j < processes.size() && !found; j++){ AbstractProcesses procesesSet = processes.get(j); if(refersTo.equals(procesesSet.getId())){ exec = new Executable(task, procesesSet); found = true; } } if(!found){ log.error("Allocation refers to unknown proceses set."); } } exec.setUserID(this.get_id()); exec.setLength(task.getLength()); exec.setReservationId(allocation.getReservationId()); executables.add(exec); HostInterface host = allocation.getHost(); ComputingResourceTypeInterface crt = host.getMachineParameters(); if(crt != null){ ComputingResourceTypeItemInterface crti = crt.getComputingResourceTypeItem(0); if(crti != null){ ParameterPropertyInterface properties[] = crti.getHostParameter().getProperty(); for(int p = 0; p < properties.length; p++){ ParameterPropertyInterface property = properties[p]; if("chosenCPUs".equals(property.getName())){ Object cpuNames = property.getValue(); exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); } } } } String resourceID = allocation.getHost().getHostname(); int resID = getEntityId(resourceID); IO_data data = new IO_data(exec, 0, resID); send(output, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); if(log.isDebugEnabled()) log.debug("Submitted task " + exec.getJobId() + "_" + exec.getId() + " to resource " + resourceID); } } protected List prepareJobDescription(Sim_event ev){ List jobsList = (List) ev.get_data(); int senderId = ev.get_src(); ArrayList ret = new ArrayList(jobsList.size()); DateTime submitionTime = new DateTime(); for (JobDescription jobDescription : jobsList) { try { // transform job description to resource requirements if(log.isInfoEnabled()) log.info("Received job " + jobDescription.getJobId() + " at " + submitionTime); Job newJob = new Job(jobDescription.getJobId()); newJob.setSenderId(senderId); for (TaskDescription taskDescription : jobDescription) { String xmlResReq = this.xsltTransformer.taskToResourceRequirements( taskDescription.getDocument(), jobDescription.getJobId(), taskDescription.getUserDn(), submitionTime); Task newTask = new Task(xmlResReq); newTask.setSenderId(senderId); newTask.setStatus((int)BrokerConstants.TASK_STATUS_UNSUBMITTED); newTask.setLength(taskDescription.getTaskLength()); newTask.setWorkloadLogWaitTime(taskDescription.getWorkloadLogWaitTime()); // newTask.setSubmissionTime(taskDescription.getSubmissionTime()); if(log.isInfoEnabled()) log.info("Received task " + newTask.getId() + " at " + newTask.getSubmissionTimeToBroker()); newJob.add(newTask); } ret.add(newJob); jobDescription.discardUnused(); } catch (Exception e){ log.error(e.getMessage()); e.printStackTrace(); } } return ret; } public void notifyCanceledGridlet(AbstractExecutable taskGridlet){ if(log.isDebugEnabled()) log.debug("przyszedl scancelowany gridlet."); Executable task = (Executable) taskGridlet; String jobID = task.getJobId(); SchedulingPlanInterface decision = null; TaskInterface tii = jobRegistry.getTaskInfo(jobID, task.getId()); try { tii.setStatus((int)BrokerConstants.JOB_STATUS_CANCELED); TaskCanceledEvent event = new TaskCanceledEvent(task.getJobId(), task.getTaskId()); event.setReason(SchedulingEventReason.RESERVATION_EXCEEDED); decision = gridSchedulerPlugin.schedule(event, unfinishedJobs, unfinishedTasks, jobRegistry, moduleList, null); if(decision == null) return; execute(decision); } catch (Exception e) { log.error("Exception during scheduling. " + e.getMessage()); e.printStackTrace(); } } public void notifyReturnedGridlet(AbstractExecutable simpleGridlet) { Executable exec = (Executable) simpleGridlet; long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue(); log.debug("Executable " + exec.getId() + "\nstart time: " + new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000) + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000) + "\nduration: " + new Duration(duration * 1000)); try { Job job = this.jobRegistry.get(exec.getJobId()); Task task = job.getTask(exec.getTaskId()); if(exec.getProcessesId() == null){ task.setStatus(exec.getStatus()); } else { List processesList = task.getProcesses(); for(int i = 0; i < processesList.size(); i++){ AbstractProcesses processes = processesList.get(i); if(processes.getId().equals(exec.getProcessesId())){ processes.setStatus(exec.getStatus()); break; } } } if(job.isFinished()){ send(output, 0, GridSimTags.GRIDLET_RETURN, new IO_data(job, 0, job.getSenderId())); } else { scheduleJob(new ArrayList(), null, null, moduleList, null, null, null, job.getId(), null); } } catch (NoSuchFieldException e) { e.printStackTrace(); } } public ExecutablesList getExecutables(){ return this.executables; } @Override public void processRequest(Sim_event ev) { // TODO Auto-generated method stub } @Override public void send(int entityID, double delay, int gridSimTag, Object data) { // TODO Auto-generated method stub } }