package test.rewolucja.scheduling.implementation; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.exolab.castor.types.Duration; import org.joda.time.DateTime; import org.joda.time.DateTimeUtilsExt; import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus; import schedframe.scheduling.AbstractProcesses; import schedframe.scheduling.Job; import schedframe.scheduling.JobInterface; import schedframe.scheduling.Task; import schedframe.scheduling.TaskInterface; import schedframe.scheduling.events.SchedulingEventReason; import schedframe.scheduling.events.SchedulingEventType; import schedframe.scheduling.events.TaskArrivedEvent; import schedframe.scheduling.events.TaskCanceledEvent; import schedframe.scheduling.events.TimerEvent; import schedframe.scheduling.plugin.SchedulingPluginConfiguration; import schedframe.scheduling.plugin.estimation.ExecTimeEstimationPlugin; import schedframe.scheduling.plugin.grid.GlobalSchedulingPlugin; import simulator.utils.InstanceFactory; import test.rewolucja.GSSIMJobInterface; import test.rewolucja.resources.description.ExecResourceDescription; import test.rewolucja.scheduling.plan.AllocationInterfaceNew; import test.rewolucja.scheduling.plan.ScheduledTaskInterfaceNew; import test.rewolucja.scheduling.plan.SchedulingPlanInterfaceNew; import test.rewolucja.task.JobList; import eduni.simjava.Sim_event; import gridsim.GridSim; import gridsim.GridSimTags; import gridsim.IO_data; import gridsim.gssim.GssimTags; import grms.shared.constants.BrokerConstants; import gssim.schedframe.scheduling.ExecTaskInterface; import gssim.schedframe.scheduling.Executable; public class GlobalManagementSystem extends ManagementSystem { private static Log log = LogFactory.getLog(GlobalManagementSystem.class); public GlobalManagementSystem(String providerId, String entityName, String schedulingPluginClassName, ExecTimeEstimationPlugin execTimeEstimationPlugin, ExecResourceDescription resourceDescription) throws Exception { super(providerId, entityName, schedulingPluginClassName, execTimeEstimationPlugin, resourceDescription); schedulingPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance( schedulingPluginClassName, GlobalSchedulingPlugin.class); if(schedulingPlugin == null){ throw new Exception("Can not create grid scheduling plugin instance"); } } public void processEvent(Sim_event ev) { int tag = ev.get_tag(); switch (tag) { case GssimTags.TIMER: if (pluginSupportsEvent(GssimTags.TIMER)) { TimerEvent event = new TimerEvent(); SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList); executeSchedulingPlan(decision); } sendTimerEvent(); break; } } public void notifySubmittedJob(GSSIMJobInterface gssimJob, boolean ack) { if (!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)) { log.error("Plugin " + schedulingPlugin.getClass() + " does not provide support for TASK_ARRIVED event.\n" + "Check plugin configuration or use default one."); return; } Job job = (Job) gssimJob; jobRegistry.addJob(job); if (log.isInfoEnabled()) log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); List> jobsList = new ArrayList>(); jobsList.add(job); JobList tasks = new JobList(); tasks.addAll(jobRegistry.getReadyTasks(jobsList)); schedulingPlugin.placeJobsInQueues(tasks, queues, getResourceManager(), moduleList); schedule(new TaskArrivedEvent()); } public void notifyReturnedJob(GSSIMJobInterface gssimJob) { Executable exec = (Executable) gssimJob; long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue(); log.debug("Executable " + exec.getJobId() + "_" + exec.getId() + "\nstart time: " + new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000) + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000) + "\nduration: " + new Duration(duration * 1000)); try { Job job = jobRegistry.get(exec.getJobId()); Task task = job.getTask(exec.getTaskId()); if(exec.getProcessesId() == null){ try { task.setStatus(exec.getStatus()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { List processesList = task.getProcesses(); for(int i = 0; i < processesList.size(); i++){ AbstractProcesses processes = processesList.get(i); if(processes.getId().equals(exec.getProcessesId())){ processes.setStatus(exec.getStatus()); break; } } } if(job.isFinished()){ logicalResource.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); } else { //prepareTaskQueue(new ArrayList(), job.getId()); List> jobs = new ArrayList>(); jobs.add(jobRegistry.getJobInfo(job.getId())); JobList tasks = new JobList(); tasks.addAll(jobRegistry.getReadyTasks(jobs)); schedulingPlugin.placeJobsInQueues(tasks, queues, getResourceManager(), moduleList); schedule(new TaskArrivedEvent()); } } catch (NoSuchFieldException e) { e.printStackTrace(); } } public void notifyCanceledJob(GSSIMJobInterface job){; Executable task = (Executable) job; String jobID = task.getJobId(); String taskID = task.getId(); if(log.isDebugEnabled()) log.debug("Received canceled job" + jobID + "_" + taskID); TaskInterface tii = jobRegistry.getTaskInfo(jobID, taskID) ; try { tii.setStatus((int)BrokerConstants.JOB_STATUS_CANCELED); TaskCanceledEvent event = new TaskCanceledEvent(jobID, taskID); event.setReason(SchedulingEventReason.RESERVATION_EXCEEDED); schedule(event); } catch (Exception e) { log.error("Exception during scheduling. " + e.getMessage()); e.printStackTrace(); } } protected void executeSchedulingPlan(SchedulingPlanInterfaceNew decision) { ArrayList taskSchedulingDecisions = decision.getTasks(); for (int i = 0; i < taskSchedulingDecisions.size(); i++) { try { ScheduledTaskInterfaceNew taskDecision = taskSchedulingDecisions.get(i); //log.info(decision.getDocument()); String jobID = taskDecision.getJobId(); String taskID = taskDecision.getTaskId(); // Task allocations that were rejected because of lack of resources or which were canceled and // not scheduled again are returned to the user. if(taskDecision.getStatus() == AllocationStatus.REJECTED){ Job job = jobRegistry.get(jobID); logicalResource.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); continue; } ArrayList allocations = taskDecision.getAllocations(); Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); for (int j = 0; j < allocations.size(); j++) { AllocationInterfaceNew allocation = allocations.get(j); Executable exec = jobRegistry.createExecutable(task, allocation); submitJob(exec, allocation); } }catch (Exception e){ e.printStackTrace(); } } } protected void submitJob(GSSIMJobInterface job, AllocationInterfaceNew allocation) { String providerName = allocation.getProviderName(); if (providerName == null) { return; } ExecTaskInterface task = (ExecTaskInterface) job; removeFromQueue(task); int resID = GridSim.getEntityId(providerName); IO_data data = new IO_data(job, 0, resID); logicalResource.send(logicalResource.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); //logicalResource.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job); if(log.isDebugEnabled()) try { log.debug("Submitted job " + job.getId() + " to " + providerName); } catch (NoSuchFieldException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public boolean pluginSupportsEvent(int eventType) { SchedulingPluginConfiguration config = (SchedulingPluginConfiguration) schedulingPlugin.getConfiguration(); if (config == null) return false; Map servedEvent = config.getServedEvents(); if (servedEvent == null) return false; switch (eventType) { case GssimTags.TIMER: return servedEvent.containsKey(SchedulingEventType.TIMER); case GssimTags.GRIDLET_SUBMIT: return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED); case GssimTags.GRIDLET_CANCEL: return servedEvent.containsKey(SchedulingEventType.TASK_CANCELED); case GssimTags.GRIDLET_RESUME: return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED); default: return false; } } }