Changeset 1362 for DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java
- Timestamp:
- 06/03/14 15:12:11 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java
r1357 r1362 1 1 package schedframe.scheduling.policy.local; 2 3 import dcworms.schedframe.scheduling.ExecTask;4 import dcworms.schedframe.scheduling.Executable;5 import eduni.simjava.Sim_event;6 import eduni.simjava.Sim_system;7 import gridsim.dcworms.DCWormsTags;8 import gridsim.dcworms.filter.ExecTaskFilter;9 2 10 3 import java.util.ArrayList; 11 4 import java.util.HashMap; 12 5 import java.util.Iterator; 6 import java.util.LinkedList; 13 7 import java.util.List; 14 8 import java.util.Map; 15 16 import org.apache.commons.lang.ArrayUtils; 9 import java.util.Set; 10 17 11 import org.apache.commons.logging.Log; 18 12 import org.apache.commons.logging.LogFactory; … … 38 32 import schedframe.resources.units.ResourceUnitName; 39 33 import schedframe.resources.units.StandardResourceUnitName; 34 import schedframe.scheduling.ExecutionHistoryItem; 40 35 import schedframe.scheduling.ResourceHistoryItem; 41 36 import schedframe.scheduling.Scheduler; 42 37 import schedframe.scheduling.TaskList; 43 38 import schedframe.scheduling.TaskListImpl; 44 import schedframe.scheduling.UsedResourcesList;45 39 import schedframe.scheduling.WorkloadUnitHandler; 46 40 import schedframe.scheduling.manager.resources.LocalResourceManager; … … 64 58 import simulator.stats.GSSAccumulator; 65 59 import simulator.utils.DoubleMath; 60 import dcworms.schedframe.scheduling.ExecTask; 61 import dcworms.schedframe.scheduling.Executable; 62 import eduni.simjava.Sim_event; 63 import eduni.simjava.Sim_system; 64 import gridsim.dcworms.DCWormsTags; 65 import gridsim.dcworms.filter.ExecTaskFilter; 66 66 67 67 public class LocalManagementSystem extends AbstractManagementSystem { … … 127 127 finalizeExecutable(execTask); 128 128 sendFinishedWorkloadUnit(execTask); 129 log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());130 log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());131 129 if (pluginSupportsEvent(tag)) { 132 130 SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId()); … … 164 162 break; 165 163 164 case DCWormsTags.TASK_PAUSE:{ 165 String[] ids = (String[]) ev.get_data(); 166 execTask = jobRegistry.getTask(ids[0], ids[1]); 167 taskPause(execTask); 168 if (pluginSupportsEvent(tag)) { 169 SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_PAUSED); 170 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 171 queues, getJobRegistry(), getResourceManager(), moduleList); 172 executeSchedulingPlan(decision); 173 } 174 } 175 break; 176 177 case DCWormsTags.TASK_RESUME:{ 178 String[] ids = (String[]) ev.get_data(); 179 execTask = jobRegistry.getTask(ids[0], ids[1]); 180 taskResume(execTask, execTask.getAllocatedResources().getLast().getResourceUnits()); 181 if (pluginSupportsEvent(tag)) { 182 SchedulingEvent event = new StartTaskExecutionEvent(ids[0], ids[1]); 183 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 184 queues, getJobRegistry(), getResourceManager(), moduleList); 185 executeSchedulingPlan(decision); 186 } 187 } 188 break; 189 190 case DCWormsTags.TASK_MOVE:{ 191 Object[] data = (Object[]) ev.get_data(); 192 execTask = jobRegistry.getTask((String)data[0], (String)data[1]); 193 taskMove(execTask, (Map<ResourceUnitName, ResourceUnit>)data[2]); 194 if (pluginSupportsEvent(tag)) { 195 SchedulingEvent event = new StartTaskExecutionEvent((String)data[0], (String)data[1]); 196 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 197 queues, getJobRegistry(), getResourceManager(), moduleList); 198 executeSchedulingPlan(decision); 199 } 200 } 201 break; 166 202 167 203 case DCWormsTags.TASK_EXECUTION_CHANGED: 168 204 execTask = (ExecTask) ev.get_data(); 169 updateTaskExecution (execTask, SchedulingEventType.RESOURCE_STATE_CHANGED);205 updateTaskExecutionPhase(execTask, SchedulingEventType.RESOURCE_STATE_CHANGED); 170 206 break; 171 207 172 208 case DCWormsTags.UPDATE_PROCESSING: 173 updateProcessingTimes( ev);209 updateProcessingTimes(); 174 210 break; 175 211 … … 177 213 if (pluginSupportsEvent(tag)) { 178 214 SchedulingEvent event = new SchedulingEvent(SchedulingEventType.POWER_LIMIT_EXCEEDED); 179 schedulingPlugin.schedule(event,215 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 180 216 queues, getJobRegistry(), getResourceManager(), moduleList); 217 executeSchedulingPlan(decision); 181 218 } 182 219 break; … … 184 221 } 185 222 223 224 public void taskPause(ExecTask execTask) { 225 if (execTask == null) { 226 return; 227 } else { 228 try { 229 execTask.setStatus(DCWormsTags.PAUSED); 230 231 Executable exec = (Executable) execTask; 232 Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits(); 233 getAllocationManager().freeResources(lastUsed); 234 235 saveExecutionHistory(exec, exec.getCompletionPercentage(), exec.getEstimatedDuration()); 236 237 ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), -1); 238 scheduler.sim_cancel(filter, null); 239 240 PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE); 241 updateComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec); 242 } catch (Exception e) { 243 // TODO Auto-generated catch block 244 e.printStackTrace(); 245 } 246 } 247 } 248 249 public void taskResume(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> resources) { 250 if (execTask == null) { 251 return; 252 } else if (execTask.getStatus() == DCWormsTags.PAUSED) { 253 try { 254 execTask.setStatus(DCWormsTags.RESUMED); 255 Executable exec = (Executable) execTask; 256 257 boolean status = allocateResources(exec, resources); 258 if(status == false){ 259 TaskList newTasks = new TaskListImpl(); 260 newTasks.add(exec); 261 schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList); 262 exec.setStatus(DCWormsTags.READY); 263 } else { 264 runTask(execTask); 265 266 PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE); 267 updateComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec); 268 } 269 270 } catch (Exception e) { 271 // TODO Auto-generated catch block 272 e.printStackTrace(); 273 } 274 } 275 } 276 277 public void taskMove(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> map) { 278 taskPause(execTask); 279 taskResume(execTask, map); 280 } 281 186 282 public void notifyReturnedWorkloadUnit(WorkloadUnit wu) { 187 283 if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) { … … 229 325 230 326 Executable exec = (Executable)task; 231 boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);327 boolean allocationStatus = allocateResources(exec, choosenResources); 232 328 if(allocationStatus == false){ 233 329 log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment."); … … 239 335 log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime()); 240 336 241 //if (phaseDuration < 0.0) 242 // return; 243 244 //exec.setEstimatedDuration(exec.getEstimatedDuration() + phaseDuration); 245 DateTime currentTime = new DateTime(); 246 ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); 247 resHistItem.setCompletionPercentage(0); 248 exec.addUsedResources(resHistItem); 249 250 try { 251 exec.setStatus(DCWormsTags.INEXEC); 252 } catch (Exception e) { 253 // TODO Auto-generated catch block 254 e.printStackTrace(); 255 } 337 runTask(exec); 256 338 257 339 PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE); 258 340 updateComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec); 259 341 260 updateTaskExecution(exec, SchedulingEventType.START_TASK_EXECUTION);261 //scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, exec);262 342 263 343 try { … … 269 349 } 270 350 271 log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad()); 272 273 } 274 351 log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad()); 352 } 353 354 private void runTask(ExecTask execTask){ 355 Executable exec = (Executable) execTask; 356 Map<ResourceUnitName, ResourceUnit> resources = exec.getAllocatedResources().getLast().getResourceUnits(); 357 358 try { 359 exec.setStatus(DCWormsTags.INEXEC); 360 } catch (Exception e) { 361 // TODO Auto-generated catch block 362 e.printStackTrace(); 363 } 364 365 int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION), 366 execTask, resources, exec.getCompletionPercentage())).intValue(); 367 368 saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration); 369 if(exec.getResourceConsumptionProfile().isLast()){ 370 scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);; 371 } else { 372 scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask); 373 } 374 375 PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE); 376 updateComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec); 377 378 } 275 379 protected void finalizeExecutable(ExecTask execTask){ 276 380 … … 304 408 } 305 409 } 306 307 UsedResourcesList lastUsedList = exec.getUsedResources(); 308 Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast() 309 .getResourceUnits(); 410 411 Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits(); 310 412 getAllocationManager().freeResources(lastUsed); 413 414 saveExecutionHistory(exec, 100,0); 311 415 312 416 PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE); 313 417 updateComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec); 418 419 log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime()); 420 log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad()); 314 421 } 315 422 … … 326 433 ExecTask task = iter.next(); 327 434 Executable exec = (Executable)task; 328 329 if(exec.getUsedResources().size() > 1){ 330 //System.out.println("--- upadteProgressX: " + Sim_system.sim_clock() ); 331 //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " + exec.getEstimatedDuration()); 332 exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / (exec.getEstimatedDuration()) * (1.0 - exec.getUsedResources().get(exec.getUsedResources().size()-1).getCompletionPercentage()/100.0))); 333 //System.out.println("newProgress: " + exec.getCompletionPercentage() ); 334 } 335 else { 336 //System.out.println("--- upadteProgress2: " + Sim_system.sim_clock() ); 337 //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " + exec.getEstimatedDuration()); 338 exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / exec.getEstimatedDuration())); 339 //System.out.println("newProgress: " + exec.getCompletionPercentage() ); 340 } 435 ExecutionHistoryItem execHistItem = exec.getExecHistory().getLast(); 436 //System.out.println("--- upadteProgressX: " + Sim_system.sim_clock() ); 437 //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " + execHistItem.getCompletionPercentage()); 438 exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / (execHistItem.getEstimatedDuration()) * (1.0 - execHistItem.getCompletionPercentage()/100.0))); 439 //System.out.println("newProgress: " + exec.getCompletionPercentage() ); 440 341 441 } 342 442 } … … 368 468 } 369 469 370 protected void updateProcessingTimes( Sim_event ev) {470 protected void updateProcessingTimes() { 371 471 for (ExecTask execTask : jobRegistry.getRunningTasks()) { 372 472 Executable exec = (Executable)execTask; 373 473 374 Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits(); 375 double lastTimeStamp = exec.getUsedResources().getLast().getTimeStamp().getMillis()/1000; 474 475 Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits(); 476 376 477 int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED), 377 478 execTask, choosenResources, exec.getCompletionPercentage())).intValue(); 378 479 379 if(DoubleMath.subtract((lastTimeStamp + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)) == 0.0){ 480 ExecutionHistoryItem execHistItem = exec.getExecHistory().getLast(); 481 double lastTimeStamp = execHistItem.getTimeStamp().getMillis()/1000; 482 if(DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)) == 0.0){ 380 483 continue; 381 484 } 382 485 //System.out.println("=== upadteTIme: " + Sim_system.sim_clock() ); 383 //System.out.println("execId: " + exec.getId() + "; estimatedDuration " + exec .getEstimatedDuration());384 //System.out.println("execId: " + exec.getId() + "; difference " + DoubleMath.subtract((lastTimeStamp + exec .getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)));486 //System.out.println("execId: " + exec.getId() + "; estimatedDuration " + execHistItem.getEstimatedDuration()); 487 //System.out.println("execId: " + exec.getId() + "; difference " + DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration))); 385 488 //System.out.println("completionPercantage: " + exec.getCompletionPercentage() + "; basic duration: " +exec.getResourceConsumptionProfile().getCurrentResourceConsumption().getDuration() + "; phaseDuration: " + phaseDuration); 386 489 387 exec.setEstimatedDuration(phaseDuration); 388 DateTime currentTime = new DateTime(); 389 ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); 390 resHistItem.setCompletionPercentage(exec.getCompletionPercentage()); 391 exec.addUsedResources(resHistItem); 392 393 if(exec.getResourceConsumptionProfile().getCurrentResourceConsumption() == exec.getResourceConsumptionProfile().getResourceConsumptionList().getLast()){ 490 saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration); 491 492 if(exec.getResourceConsumptionProfile().isLast()){ 394 493 ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED); 395 494 scheduler.sim_cancel(filter, null); … … 407 506 } 408 507 409 protected void updateTaskExecution (ExecTask execTask, SchedulingEventType schedEvType) {508 protected void updateTaskExecutionPhase(ExecTask execTask, SchedulingEventType schedEvType) { 410 509 411 510 if (execTask.getStatus() == DCWormsTags.INEXEC) { … … 415 514 exec.setStatus(DCWormsTags.NEW_EXEC_PHASE); 416 515 } catch (Exception e) { 417 } 418 419 Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits(); 516 517 } 518 519 Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits(); 420 520 421 521 int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(schedEvType), 422 522 execTask, choosenResources, exec.getCompletionPercentage())).intValue(); 423 424 exec.setEstimatedDuration(phaseDuration);425 426 if(exec.getResourceConsumptionProfile(). getCurrentResourceConsumption() == exec.getResourceConsumptionProfile().getResourceConsumptionList().getLast()){523 524 saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration); 525 526 if(exec.getResourceConsumptionProfile().isLast()){ 427 527 scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask); 428 PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);429 updateComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);430 528 } else { 431 529 scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask); 432 PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE); 433 updateComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec); 434 } 530 } 531 532 PEUnit peUnit = (PEUnit)exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE); 533 updateComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec); 435 534 } 436 535 } … … 501 600 } 502 601 503 public void notifySubmittedWorkloadUnit(WorkloadUnit wu , boolean ack) {602 public void notifySubmittedWorkloadUnit(WorkloadUnit wu) { 504 603 //250314 505 604 //updateProcessingProgress(); … … 562 661 Executable exec = (Executable) task; 563 662 jobRegistry.addExecTask(exec); 564 565 exec.trackResource(scheduler.get_name()); 566 Scheduler parentScheduler = scheduler.getParent(); 567 List<String> visitedResource = exec.getVisitedResources(); 568 String [] visitedResourcesArray = visitedResource.toArray(new String[visitedResource.size()]); 569 while (parentScheduler != null && !ArrayUtils.contains(visitedResourcesArray, parentScheduler.get_name())) { 570 exec.trackResource(parentScheduler.get_name()); 571 parentScheduler = parentScheduler.getParent(); 572 } 573 exec.setSchedulerName(scheduler.get_id()); 663 664 exec.setSchedulerName(scheduler.getFullName()); 574 665 575 666 TaskList newTasks = new TaskListImpl(); … … 596 687 } 597 688 689 690 public void saveExecutionHistory(Executable exec, double completionPercentage, double estimatedDuration){ 691 692 ExecutionHistoryItem execHistoryItem = new ExecutionHistoryItem(new DateTime()); 693 execHistoryItem.setCompletionPercentage(completionPercentage); 694 execHistoryItem.setEstimatedDuration(estimatedDuration); 695 execHistoryItem.setResIndex(exec.getAllocatedResources().size() -1); 696 execHistoryItem.setStatus(exec.getStatus()); 697 exec.addExecHistory(execHistoryItem); 698 699 ProcessingElements pes = (ProcessingElements) exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE); 700 for (ComputingResource resource : pes) { 701 702 LinkedList<ComputingResource> toExamine = new LinkedList<ComputingResource>(); 703 toExamine.push(resource); 704 705 while (!toExamine.isEmpty()) { 706 ComputingResource compResource = toExamine.pop(); 707 List<ComputingResource> resources = compResource.getChildren(); 708 if(resources.isEmpty()){ 709 Set<String> visitedResources = exec.getAllocatedResources().getLast().getVisitedResources(); 710 if(!visitedResources.contains(compResource.getFullName())){ 711 exec.getAllocatedResources().getLast().trackResource(compResource.getFullName()); 712 } 713 } else { 714 for (int i = 0; i < resources.size(); i++) { 715 ComputingResource resourceChild = resources.get(i); 716 toExamine.addLast(resourceChild); 717 } 718 } 719 } 720 } 721 } 722 723 public boolean allocateResources(Executable exec, Map<ResourceUnitName, ResourceUnit> choosenResources){ 724 boolean allocationStatus = getAllocationManager().allocateResources(choosenResources); 725 if(allocationStatus){ 726 ResourceHistoryItem resourceHistoryItem = new ResourceHistoryItem(choosenResources); 727 exec.addAllocatedResources(resourceHistoryItem); 728 return true; 729 } 730 return false; 731 } 598 732 }
Note: See TracChangeset
for help on using the changeset viewer.