package schedframe.scheduling.policy.global; import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.exolab.castor.types.Duration; import org.joda.time.DateTime; import org.joda.time.DateTimeUtilsExt; import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus; import qcg.shared.constants.BrokerConstants; import schedframe.events.scheduling.EventReason; import schedframe.events.scheduling.SchedulingEvent; import schedframe.events.scheduling.TaskArrivedEvent; import schedframe.events.scheduling.TaskCanceledEvent; import schedframe.events.scheduling.TimerEvent; import schedframe.scheduling.Scheduler; import schedframe.scheduling.WorkloadUnitHandler; import schedframe.scheduling.WorkloadUnitListImpl; import schedframe.scheduling.plan.AllocationInterface; import schedframe.scheduling.plan.ScheduledTaskInterface; import schedframe.scheduling.plan.SchedulingPlanInterface; import schedframe.scheduling.plugin.SchedulingPlugin; import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin; import schedframe.scheduling.policy.AbstractManagementSystem; import schedframe.scheduling.queue.TaskQueueList; import schedframe.scheduling.tasks.AbstractProcesses; import schedframe.scheduling.tasks.Job; import schedframe.scheduling.tasks.JobInterface; import schedframe.scheduling.tasks.SubmittedTask; import schedframe.scheduling.tasks.Task; import schedframe.scheduling.tasks.TaskInterface; import schedframe.scheduling.tasks.WorkloadUnit; import eduni.simjava.Sim_event; import gridsim.GridSim; import gridsim.GridSimTags; import gridsim.Gridlet; import gridsim.IO_data; import gridsim.gssim.WormsTags; import gssim.schedframe.scheduling.ExecTask; import gssim.schedframe.scheduling.Executable; public class GlobalManagementSystem extends AbstractManagementSystem { private static Log log = LogFactory.getLog(GlobalManagementSystem.class); public GlobalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin, ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues) throws Exception { super(providerId, entityName, execTimeEstimationPlugin, queues); /*schedulingPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance( schedulingPluginClassName, GlobalSchedulingPlugin.class);*/ if(schedPlugin == null){ throw new Exception("Can not create global scheduling plugin instance"); } this.schedulingPlugin = schedPlugin; } public void processEvent(Sim_event ev) { int tag = ev.get_tag(); switch (tag) { case WormsTags.TIMER: if (pluginSupportsEvent(WormsTags.TIMER)) { TimerEvent event = new TimerEvent(); SchedulingPlanInterface decision = schedulingPlugin.schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList); executeSchedulingPlan(decision); } sendTimerEvent(); break; } } public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) { if (!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)) { log.error("Plugin " + schedulingPlugin.getClass() + " does not provide support for TASK_ARRIVED event.\n" + "Check plugin configuration or use default one."); return; } registerWorkloadUnit(wu); /*Job job = (Job) wu; jobRegistry.addJob(job); if (log.isInfoEnabled()) log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); List> jobsList = new ArrayList>(); jobsList.add(job); WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList(); readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList)); schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList); schedule(new TaskArrivedEvent());*/ } private void registerWorkloadUnit(WorkloadUnit wu){ if(!wu.isRegistered()){ wu.register(jobRegistry); } wu.accept(getWorkloadUnitHandler()); } protected void scheduleReadyTasks(Job job){ List> jobsList = new ArrayList>(); jobsList.add(job); WorkloadUnitListImpl readyWorkloadUnits = new WorkloadUnitListImpl(); readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList)); schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList); schedule(new TaskArrivedEvent()); } protected void schedule(SchedulingEvent schedulingEvent) { try { SchedulingPlanInterface decision = schedulingPlugin.schedule( schedulingEvent, queues, getJobRegistry(), getResourceManager(), moduleList); if (decision == null) return; executeSchedulingPlan(decision); } catch (Exception e) { e.printStackTrace(); } } public void notifyReturnedWorkloadUnit(WorkloadUnit wu) { Executable exec = (Executable) wu; long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue(); log.debug("Executable " + exec.getJobId() + "_" + exec.getId() + "\nstart time: " + new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000) + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000) + "\nduration: " + new Duration(duration * 1000)); try { Job job = jobRegistry.get(exec.getJobId()); /*Task task = job.getTask(exec.getTaskId()); if(exec.getProcessesId() == null){ try { task.setStatus(exec.getStatus()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { List processesList = task.getProcesses(); for(int i = 0; i < processesList.size(); i++){ AbstractProcesses processes = processesList.get(i); if(processes.getId().equals(exec.getProcessesId())){ processes.setStatus(exec.getStatus()); break; } } }*/ if(job.isFinished()){ scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); } else { scheduleReadyTasks(job); /*List> jobs = new ArrayList>(); jobs.add(jobRegistry.getJobInfo(job.getId())); WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList(); readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobs)); schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList); schedule(new TaskArrivedEvent());*/ } } catch (Exception e) { e.printStackTrace(); } } public void notifyCanceledWorkloadUnit(WorkloadUnit wu){; Executable task = (Executable) wu; String jobID = task.getJobId(); String taskID = task.getId(); if(log.isDebugEnabled()) log.debug("Received canceled job" + jobID + "_" + taskID); TaskInterface ti = jobRegistry.getTaskInfo(jobID, taskID) ; try { ti.setStatus((int)BrokerConstants.JOB_STATUS_CANCELED); TaskCanceledEvent event = new TaskCanceledEvent(jobID, taskID); event.setReason(EventReason.RESERVATION_EXCEEDED); schedule(event); } catch (Exception e) { log.error("Exception during scheduling. " + e.getMessage()); e.printStackTrace(); } } protected void executeSchedulingPlan(SchedulingPlanInterface decision) { ArrayList taskSchedulingDecisions = decision.getTasks(); for (int i = 0; i < taskSchedulingDecisions.size(); i++) { try { ScheduledTaskInterface taskDecision = taskSchedulingDecisions.get(i); //log.info(decision.getDocument()); String jobID = taskDecision.getJobId(); String taskID = taskDecision.getTaskId(); // Task allocations that were rejected because of lack of resources or which were canceled and // not scheduled again are returned to the user. if(taskDecision.getStatus() == AllocationStatus.REJECTED){ Job job = jobRegistry.get(jobID); scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); continue; } ArrayList allocations = taskDecision.getAllocations(); Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); for (int j = 0; j < allocations.size(); j++) { AllocationInterface allocation = allocations.get(j); Executable exec = jobRegistry.createExecutable(task, allocation); submitWorkloadUnit(exec, allocation); task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); } }catch (Exception e){ e.printStackTrace(); } } } protected void submitWorkloadUnit(WorkloadUnit job, AllocationInterface allocation) { String providerName = allocation.getProviderName(); if (providerName == null) { return; } TaskInterface task = (TaskInterface) job; removeFromQueue(task); int resID = GridSim.getEntityId(providerName); IO_data data = new IO_data(job, 0, resID); scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job); if(log.isDebugEnabled()) log.debug("Submitted job " + job.getId() + " to " + providerName); } class GlobalWorkloadUnitHandler implements WorkloadUnitHandler{ public void handleJob(Job job){ jobRegistry.addJob(job); if (log.isInfoEnabled()) log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); scheduleReadyTasks(job); } public void handleTask(TaskInterface task) { throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler."); } public void handleExecutable(ExecTask task) { throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler."); } public void handleSubmittedTask(SubmittedTask task) { throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler."); } } public WorkloadUnitHandler getWorkloadUnitHandler() { return new GlobalWorkloadUnitHandler(); } }