[477] | 1 | package schedframe.scheduling.policy.global; |
---|
| 2 | |
---|
| 3 | import java.util.ArrayList; |
---|
| 4 | import java.util.List; |
---|
| 5 | |
---|
| 6 | import org.apache.commons.logging.Log; |
---|
| 7 | import org.apache.commons.logging.LogFactory; |
---|
| 8 | import org.exolab.castor.types.Duration; |
---|
| 9 | import org.joda.time.DateTime; |
---|
| 10 | import org.joda.time.DateTimeUtilsExt; |
---|
| 11 | import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus; |
---|
| 12 | |
---|
[539] | 13 | import dcworms.schedframe.scheduling.ExecTask; |
---|
| 14 | import dcworms.schedframe.scheduling.Executable; |
---|
| 15 | |
---|
[477] | 16 | import qcg.shared.constants.BrokerConstants; |
---|
| 17 | import schedframe.events.scheduling.TaskArrivedEvent; |
---|
| 18 | import schedframe.events.scheduling.TimerEvent; |
---|
[539] | 19 | import schedframe.scheduling.TaskListImpl; |
---|
[477] | 20 | import schedframe.scheduling.WorkloadUnitHandler; |
---|
| 21 | import schedframe.scheduling.plan.AllocationInterface; |
---|
| 22 | import schedframe.scheduling.plan.ScheduledTaskInterface; |
---|
| 23 | import schedframe.scheduling.plan.SchedulingPlanInterface; |
---|
| 24 | import schedframe.scheduling.plugin.SchedulingPlugin; |
---|
| 25 | import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin; |
---|
| 26 | import schedframe.scheduling.policy.AbstractManagementSystem; |
---|
| 27 | import schedframe.scheduling.queue.TaskQueueList; |
---|
| 28 | import schedframe.scheduling.tasks.AbstractProcesses; |
---|
| 29 | import schedframe.scheduling.tasks.Job; |
---|
| 30 | import schedframe.scheduling.tasks.JobInterface; |
---|
| 31 | import schedframe.scheduling.tasks.Task; |
---|
| 32 | import schedframe.scheduling.tasks.TaskInterface; |
---|
| 33 | import schedframe.scheduling.tasks.WorkloadUnit; |
---|
| 34 | import eduni.simjava.Sim_event; |
---|
| 35 | import gridsim.GridSim; |
---|
| 36 | import gridsim.GridSimTags; |
---|
| 37 | import gridsim.IO_data; |
---|
[539] | 38 | import gridsim.dcworms.DCWormsTags; |
---|
[477] | 39 | |
---|
| 40 | public class GlobalManagementSystem extends AbstractManagementSystem { |
---|
| 41 | |
---|
| 42 | private static Log log = LogFactory.getLog(GlobalManagementSystem.class); |
---|
| 43 | |
---|
| 44 | public GlobalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin, |
---|
| 45 | ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues) |
---|
| 46 | throws Exception { |
---|
| 47 | super(providerId, entityName, execTimeEstimationPlugin, queues); |
---|
| 48 | |
---|
| 49 | if(schedPlugin == null){ |
---|
| 50 | throw new Exception("Can not create global scheduling plugin instance"); |
---|
| 51 | } |
---|
| 52 | |
---|
| 53 | this.schedulingPlugin = schedPlugin; |
---|
| 54 | } |
---|
| 55 | |
---|
| 56 | public void processEvent(Sim_event ev) { |
---|
| 57 | |
---|
| 58 | int tag = ev.get_tag(); |
---|
| 59 | switch (tag) { |
---|
| 60 | |
---|
[539] | 61 | case DCWormsTags.TIMER: |
---|
| 62 | if (pluginSupportsEvent(DCWormsTags.TIMER)) { |
---|
[477] | 63 | TimerEvent event = new TimerEvent(); |
---|
[539] | 64 | SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, |
---|
[477] | 65 | queues, getJobRegistry(), getResourceManager(), moduleList); |
---|
| 66 | executeSchedulingPlan(decision); |
---|
| 67 | } |
---|
| 68 | sendTimerEvent(); |
---|
| 69 | break; |
---|
| 70 | } |
---|
| 71 | } |
---|
| 72 | |
---|
[539] | 73 | public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) { |
---|
[477] | 74 | if (!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)) { |
---|
| 75 | log.error("Plugin " + schedulingPlugin.getClass() |
---|
| 76 | + " does not provide support for TASK_ARRIVED event.\n" |
---|
| 77 | + "Check plugin configuration or use default one."); |
---|
| 78 | return; |
---|
| 79 | } |
---|
| 80 | |
---|
| 81 | registerWorkloadUnit(wu); |
---|
| 82 | |
---|
| 83 | } |
---|
| 84 | |
---|
[539] | 85 | private void registerWorkloadUnit(WorkloadUnit wu){ |
---|
[477] | 86 | if(!wu.isRegistered()){ |
---|
| 87 | wu.register(jobRegistry); |
---|
| 88 | } |
---|
| 89 | wu.accept(getWorkloadUnitHandler()); |
---|
| 90 | } |
---|
| 91 | |
---|
| 92 | |
---|
[539] | 93 | protected void schedule(JobInterface<?> job){ |
---|
[477] | 94 | List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); |
---|
| 95 | jobsList.add(job); |
---|
[539] | 96 | TaskListImpl readyTasks = new TaskListImpl(); |
---|
| 97 | readyTasks.addAll(jobRegistry.getAvailableTasks(jobsList)); |
---|
| 98 | |
---|
| 99 | schedulingPlugin.placeTasksInQueues(readyTasks, queues, getResourceManager(), moduleList); |
---|
| 100 | SchedulingPlanInterface<?> decision = schedulingPlugin.schedule( |
---|
| 101 | new TaskArrivedEvent(), queues, getJobRegistry(), getResourceManager(), moduleList); |
---|
| 102 | if (decision != null) |
---|
| 103 | executeSchedulingPlan(decision); |
---|
[477] | 104 | } |
---|
| 105 | |
---|
[539] | 106 | public void notifyReturnedWorkloadUnit(WorkloadUnit wu) { |
---|
[477] | 107 | Executable exec = (Executable) wu; |
---|
| 108 | |
---|
| 109 | long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue(); |
---|
| 110 | log.debug("Executable " + exec.getJobId() + "_" + exec.getId() + "\nstart time: " + |
---|
| 111 | new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000) |
---|
| 112 | + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000) |
---|
| 113 | + "\nduration: " + new Duration(duration * 1000)); |
---|
| 114 | |
---|
| 115 | try { |
---|
[539] | 116 | Job job = jobRegistry.getJob(exec.getJobId()); |
---|
| 117 | Task task = job.getTask(exec.getTaskId()); |
---|
[477] | 118 | if(exec.getProcessesId() == null){ |
---|
| 119 | try { |
---|
| 120 | task.setStatus(exec.getStatus()); |
---|
| 121 | } catch (Exception e) { |
---|
[539] | 122 | |
---|
[477] | 123 | } |
---|
| 124 | } else { |
---|
| 125 | List<AbstractProcesses> processesList = task.getProcesses(); |
---|
| 126 | for(int i = 0; i < processesList.size(); i++){ |
---|
| 127 | AbstractProcesses processes = processesList.get(i); |
---|
| 128 | if(processes.getId().equals(exec.getProcessesId())){ |
---|
| 129 | processes.setStatus(exec.getStatus()); |
---|
| 130 | break; |
---|
| 131 | } |
---|
| 132 | } |
---|
[539] | 133 | } |
---|
[477] | 134 | |
---|
| 135 | if(job.isFinished()){ |
---|
| 136 | scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); |
---|
| 137 | } |
---|
| 138 | else { |
---|
[539] | 139 | schedule(job); |
---|
[477] | 140 | } |
---|
| 141 | |
---|
| 142 | } catch (Exception e) { |
---|
| 143 | e.printStackTrace(); |
---|
| 144 | } |
---|
| 145 | |
---|
| 146 | } |
---|
| 147 | |
---|
[539] | 148 | protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) { |
---|
[477] | 149 | |
---|
[539] | 150 | ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks(); |
---|
| 151 | for (int i = 0; i < taskSchedulingDecisions.size(); i++) { |
---|
[477] | 152 | |
---|
[539] | 153 | ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i); |
---|
| 154 | |
---|
| 155 | //log.info(decision.getDocument()); |
---|
| 156 | |
---|
| 157 | String jobID = taskDecision.getJobId(); |
---|
| 158 | String taskID = taskDecision.getTaskId(); |
---|
[477] | 159 | |
---|
[539] | 160 | // Task allocations that were rejected because of lack of resources or which were canceled and |
---|
| 161 | // not scheduled again are returned to the user. |
---|
| 162 | if(taskDecision.getStatus() == AllocationStatus.REJECTED){ |
---|
| 163 | Job job = jobRegistry.getJob(jobID); |
---|
| 164 | scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); |
---|
| 165 | continue; |
---|
| 166 | } |
---|
[477] | 167 | |
---|
[539] | 168 | Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); |
---|
| 169 | |
---|
| 170 | ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations(); |
---|
| 171 | for (int j = 0; j < allocations.size(); j++) { |
---|
| 172 | |
---|
| 173 | AllocationInterface<?> allocation = allocations.get(j); |
---|
| 174 | Executable exec = createExecutable(task, allocation); |
---|
| 175 | submitTask(exec, allocation); |
---|
| 176 | task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); |
---|
| 177 | } |
---|
[477] | 178 | } |
---|
| 179 | } |
---|
| 180 | |
---|
[539] | 181 | private Executable createExecutable(Task task, AllocationInterface<?> allocation) { |
---|
[477] | 182 | |
---|
[539] | 183 | String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); |
---|
| 184 | if(refersTo == null) |
---|
| 185 | refersTo = task.getId(); |
---|
| 186 | |
---|
| 187 | Executable exec = null; |
---|
| 188 | |
---|
| 189 | if(refersTo.equals(task.getId())){ |
---|
| 190 | exec = new Executable(task); |
---|
| 191 | } else { |
---|
| 192 | List<AbstractProcesses> processes = task.getProcesses(); |
---|
| 193 | if(processes == null) { |
---|
| 194 | try { |
---|
| 195 | log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + |
---|
| 196 | " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); |
---|
| 197 | } catch (Exception e) { |
---|
| 198 | e.printStackTrace(); |
---|
[477] | 199 | } |
---|
[539] | 200 | } |
---|
| 201 | boolean found = false; |
---|
| 202 | for(int j = 0; j < processes.size() && !found; j++){ |
---|
| 203 | AbstractProcesses procesesSet = processes.get(j); |
---|
| 204 | if(refersTo.equals(procesesSet.getId())){ |
---|
| 205 | exec = new Executable(task, procesesSet); |
---|
| 206 | found = true; |
---|
| 207 | } |
---|
| 208 | } |
---|
| 209 | if(!found){ |
---|
| 210 | log.error("Allocation refers to unknown proceses set."); |
---|
| 211 | } |
---|
| 212 | } |
---|
[477] | 213 | |
---|
[539] | 214 | exec.setReservationId(allocation.getReservationId()); |
---|
| 215 | |
---|
| 216 | /*HostInterface<?> host = allocation.getHost(); |
---|
| 217 | ComputingResourceTypeInterface<?> crt = host.getMachineParameters(); |
---|
| 218 | if(crt != null){ |
---|
| 219 | ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0); |
---|
| 220 | if(crti != null){ |
---|
| 221 | ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty(); |
---|
| 222 | for(int p = 0; p < properties.length; p++){ |
---|
| 223 | ParameterPropertyInterface<?> property = properties[p]; |
---|
| 224 | if("chosenCPUs".equals(property.getName())){ |
---|
| 225 | Object cpuNames = property.getValue(); |
---|
| 226 | exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); |
---|
| 227 | } |
---|
| 228 | } |
---|
[477] | 229 | } |
---|
[539] | 230 | }*/ |
---|
| 231 | return exec; |
---|
[477] | 232 | } |
---|
[539] | 233 | |
---|
| 234 | protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) { |
---|
[477] | 235 | |
---|
| 236 | String providerName = allocation.getProviderName(); |
---|
| 237 | if (providerName == null) { |
---|
| 238 | return; |
---|
| 239 | } |
---|
| 240 | removeFromQueue(task); |
---|
| 241 | |
---|
| 242 | int resID = GridSim.getEntityId(providerName); |
---|
[539] | 243 | IO_data data = new IO_data(task, 0, resID); |
---|
[477] | 244 | scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); |
---|
[539] | 245 | //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job); |
---|
[477] | 246 | |
---|
| 247 | if(log.isDebugEnabled()) |
---|
[539] | 248 | log.debug("Submitted job " + task.getId() + " to " + providerName); |
---|
| 249 | |
---|
[477] | 250 | } |
---|
| 251 | |
---|
| 252 | class GlobalWorkloadUnitHandler implements WorkloadUnitHandler{ |
---|
| 253 | |
---|
[539] | 254 | public void handleJob(JobInterface<?> job){ |
---|
[477] | 255 | if (log.isInfoEnabled()) |
---|
| 256 | log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); |
---|
| 257 | |
---|
[539] | 258 | jobRegistry.addJob(job); |
---|
| 259 | schedule(job); |
---|
[477] | 260 | } |
---|
| 261 | |
---|
| 262 | public void handleTask(TaskInterface<?> task) { |
---|
| 263 | throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler."); |
---|
| 264 | } |
---|
| 265 | |
---|
| 266 | public void handleExecutable(ExecTask task) { |
---|
| 267 | throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler."); |
---|
| 268 | } |
---|
| 269 | } |
---|
| 270 | |
---|
| 271 | public WorkloadUnitHandler getWorkloadUnitHandler() { |
---|
| 272 | return new GlobalWorkloadUnitHandler(); |
---|
| 273 | } |
---|
| 274 | |
---|
| 275 | |
---|
| 276 | } |
---|