Changeset 490 for DCWoRMS/trunk/src/schedframe/scheduling/policy/local
- Timestamp:
- 10/09/12 13:58:06 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
DCWoRMS/trunk/src/schedframe/scheduling/policy/local/LocalManagementSystem.java
r481 r490 1 1 package schedframe.scheduling.policy.local; 2 2 3 import dcworms.schedframe.scheduling.ExecTask; 4 import dcworms.schedframe.scheduling.Executable; 3 5 import eduni.simjava.Sim_event; 4 6 import eduni.simjava.Sim_system; … … 7 9 import gridsim.gssim.DCWormsTags; 8 10 import gridsim.gssim.filter.ExecTaskFilter; 9 import gssim.schedframe.scheduling.ExecTask;10 import gssim.schedframe.scheduling.Executable;11 11 12 12 import java.util.ArrayList; … … 41 41 import schedframe.scheduling.ResourceHistoryItem; 42 42 import schedframe.scheduling.Scheduler; 43 import schedframe.scheduling.TaskList; 43 44 import schedframe.scheduling.TaskListImpl; 44 import schedframe.scheduling.UsedResource List;45 import schedframe.scheduling.UsedResourcesList; 45 46 import schedframe.scheduling.WorkloadUnitHandler; 46 47 import schedframe.scheduling.manager.resources.LocalResourceManager; … … 78 79 79 80 super(providerId, entityName, execTimeEstimationPlugin, queues); 80 81 //schedulingPlugin = (LocalSchedulingPlugin) InstanceFactory.createInstance(schedulingPluginClassName, LocalSchedulingPlugin.class);82 81 83 82 if (schedPlugin == null) { … … 85 84 } 86 85 this.schedulingPlugin = schedPlugin; 87 accTotalLoad = new Accumulator();88 moduleList = new ModuleListImpl(1);89 86 this.moduleList = new ModuleListImpl(1); 87 88 this.accTotalLoad = new Accumulator(); 90 89 } 91 90 92 91 public void init(Scheduler sched, ManagedResources managedResources) { 93 92 super.init(sched, managedResources); 94 //scheduler = sched;95 //resourceManager = ResourceManagerFactory.createResourceManager(scheduler);96 93 double load = 0; 97 94 accTotalLoad.add(load); … … 115 112 } 116 113 sendTimerEvent(); 117 118 114 break; 119 115 120 116 case DCWormsTags.TASK_READY_FOR_EXECUTION: 121 117 122 ExecTask data= (ExecTask) ev.get_data();123 try { 124 data.setStatus(DCWormsTags.READY);118 ExecTask execTask = (ExecTask) ev.get_data(); 119 try { 120 execTask.setStatus(DCWormsTags.READY); 125 121 if (pluginSupportsEvent(tag)) { 126 SchedulingEvent event = new StartTaskExecutionEvent( data.getJobId(), data.getId());122 SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId()); 127 123 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 128 124 queues, getJobRegistry(), getResourceManager(), moduleList); … … 136 132 case DCWormsTags.TASK_EXECUTION_FINISHED: 137 133 obj = ev.get_data(); 138 ExecTask exec = (ExecTask) obj; 139 if (exec.getStatus() == DCWormsTags.INEXEC) { 140 finalizeExecutable(exec); 141 142 sendFinishedWorkloadUnit(exec); 143 //task.setGridletStatus(Gridlet.SUCCESS); 144 //task.finalizeGridlet(); 145 log.debug(exec.getJobId() + "_" + exec.getId() + " finished execution on " + new DateTime()); 134 execTask = (ExecTask) obj; 135 if (execTask.getStatus() == DCWormsTags.INEXEC) { 136 137 finalizeExecutable(execTask); 138 sendFinishedWorkloadUnit(execTask); 139 log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime()); 146 140 log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 147 /*UsedResourceList<ResourceHistoryItem> lastUsedList = task.getUsedResources(); 148 Map<ResourceUnitName, AbstractResourceUnit> lastUsed = lastUsedList.getLast() 149 .getResourceUnits(); 150 getAllocationManager().freeResources(lastUsed); 151 ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PROCESSINGELEMENTS); 152 for (ComputingResource resource : pes) { 153 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, task)); 141 if (pluginSupportsEvent(tag)) { 142 SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId()); 143 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 144 queues, getJobRegistry(), getResourceManager(), moduleList); 145 executeSchedulingPlan(decision); 154 146 } 155 SubTaskFilter filter = new SubTaskFilter(task.getGridletID(), GssimTags.TASK_REQUESTED_TIME_EXPIRED); 156 scheduler.sim_cancel(filter, null); 157 super.sendFinishJob((Executable) task.getGridlet());*/ 158 } 147 } 148 149 Job job = jobRegistry.getJob(execTask.getJobId()); 150 if(!job.isFinished()){ 151 getWorkloadUnitHandler().handleJob(job); 152 } 153 break; 154 155 case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED: 156 obj = ev.get_data(); 157 execTask = (Executable) obj; 159 158 if (pluginSupportsEvent(tag)) { 160 SchedulingEvent event = new Task FinishedEvent(exec.getJobId(), exec.getId());159 SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId()); 161 160 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 162 161 queues, getJobRegistry(), getResourceManager(), moduleList); 163 162 executeSchedulingPlan(decision); 164 163 } 165 Job job = jobRegistry.getJob(exec.getJobId());166 if(!job.isFinished()){167 getWorkloadUnitHandler().handleJob(job);168 }169 170 164 break; 171 case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED: 172 obj = ev.get_data(); 173 exec = (Executable) obj; 174 if (pluginSupportsEvent(tag)) { 175 SchedulingEvent event = new TaskRequestedTimeExpiredEvent(exec.getJobId(), exec.getId()); 176 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 177 queues, getJobRegistry(), getResourceManager(), moduleList); 178 executeSchedulingPlan(decision); 179 } 180 181 break; 165 182 166 case DCWormsTags.UPDATE: 183 167 updateProcessingTimes(ev); … … 185 169 } 186 170 } 187 188 171 189 172 public void notifyReturnedWorkloadUnit(WorkloadUnit wu) { … … 223 206 } else { 224 207 ExecTask exec = (ExecTask) task; 225 executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), (ExecTask)task));208 executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec)); 226 209 } 227 210 } … … 237 220 return; 238 221 removeFromQueue(task); 239 //double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength(); 222 240 223 SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION); 241 224 int time = Double.valueOf( … … 251 234 ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); 252 235 exec.addUsedResources(resHistItem); 253 254 scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, 255 exec); 236 try { 237 exec.setStatus(DCWormsTags.INEXEC); 238 } catch (Exception e) { 239 // TODO Auto-generated catch block 240 e.printStackTrace(); 241 } 242 scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, exec); 256 243 257 244 try { … … 262 249 scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec); 263 250 } 264 265 try { 266 exec.setStatus(DCWormsTags.INEXEC); 267 } catch (Exception e1) { 268 // TODO Auto-generated catch block 269 e1.printStackTrace(); 270 } 251 271 252 log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 272 253 273 254 PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE); 274 if(peUnit instanceof ProcessingElements){ 255 256 notifyComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec); 257 258 /*if(peUnit instanceof ProcessingElements){ 275 259 ProcessingElements pes = (ProcessingElements) peUnit; 276 260 for (ComputingResource resource : pes) { 277 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_ FINISHED, exec));261 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec)); 278 262 } 279 263 } else { … … 282 266 resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 283 267 } catch (ResourceException e) { 284 285 } 286 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 287 } 288 /*ProcessingElements pes = (ProcessingElements) choosenResources.get(StandardResourceUnitName.PE); 289 for (ComputingResource resource : pes) { 290 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, submittedTask)); 291 }*/ 268 return; 269 } 270 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec)); 271 } 272 */ 292 273 293 274 /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){ … … 301 282 302 283 Executable exec = (Executable)execTask; 303 try {304 exec.setStatus(DCWormsTags.SUCCESS);305 } catch (Exception e1) {306 // TODO Auto-generated catch block307 e1.printStackTrace();308 }309 284 exec.finalizeExecutable(); 310 UsedResourceList<ResourceHistoryItem> lastUsedList = exec.getUsedResources(); 311 Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast() 312 .getResourceUnits(); 313 getAllocationManager().freeResources(lastUsed); 314 315 PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE); 316 if(peUnit instanceof ProcessingElements){ 317 ProcessingElements pes = (ProcessingElements) peUnit; 318 for (ComputingResource resource : pes) { 319 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 320 } 321 } else { 322 ComputingResource resource = null; 323 try { 324 resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 325 } catch (ResourceException e) { 326 327 } 328 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 329 } 330 /*ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PE); 331 for (ComputingResource resource : pes) { 332 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask)); 333 }*/ 285 334 286 ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED); 335 287 scheduler.sim_cancel(filter, null); 336 288 337 289 Task task; 338 290 Job job = jobRegistry.getJob(exec.getJobId()); 339 340 Task task = null;341 291 try { 342 292 task = job.getTask(exec.getTaskId()); 343 293 } catch (NoSuchFieldException e) { 344 e.printStackTrace();294 return; 345 295 } 346 296 if(exec.getProcessesId() == null){ … … 360 310 } 361 311 } 312 313 UsedResourcesList lastUsedList = exec.getUsedResources(); 314 Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast() 315 .getResourceUnits(); 316 getAllocationManager().freeResources(lastUsed); 317 318 PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE); 319 notifyComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec); 320 /*if(peUnit instanceof ProcessingElements){ 321 ProcessingElements pes = (ProcessingElements) peUnit; 322 for (ComputingResource resource : pes) { 323 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 324 } 325 } else { 326 ComputingResource resource = null; 327 try { 328 resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 329 } catch (ResourceException e) { 330 return; 331 } 332 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 333 }*/ 334 362 335 //sendFinishedWorkloadUnit(executable); 363 336 } … … 374 347 ExecTask task = iter.next(); 375 348 Executable exec = (Executable)task; 376 UsedResource List<ResourceHistoryItem>usedResourcesList = exec.getUsedResources();377 ResourceUnit unit =usedResourcesList.getLast().getResourceUnits()349 UsedResourcesList usedResourcesList = exec.getUsedResources(); 350 PEUnit peUnit = (PEUnit)usedResourcesList.getLast().getResourceUnits() 378 351 .get(StandardResourceUnitName.PE); 379 352 380 double load = getMIShare(timeSpan, (PEUnit) unit);353 double load = getMIShare(timeSpan, peUnit); 381 354 exec.setCompletionPercentage(100 * timeSpan/exec.getEstimatedDuration()); 382 355 addTotalLoad(load); 383 356 } 384 357 } 385 358 private void notifyComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){ 359 360 if(peUnit instanceof ProcessingElements){ 361 ProcessingElements pes = (ProcessingElements) peUnit; 362 for (ComputingResource resource : pes) { 363 resource.handleEvent(new EnergyEvent(eventType, obj)); 364 } 365 } else { 366 ComputingResource resource = null; 367 try { 368 resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 369 } catch (ResourceException e) { 370 return; 371 } 372 resource.handleEvent(new EnergyEvent(eventType, obj)); 373 } 374 } 375 386 376 private double getMIShare(double timeSpan, PEUnit pes) { 387 377 double localLoad; … … 402 392 protected void updateProcessingTimes(Sim_event ev) { 403 393 updateProcessingProgress(); 404 for (ExecTask task : jobRegistry.getRunningTasks()) {405 Executable exec = (Executable) task;394 for (ExecTask execTask : jobRegistry.getRunningTasks()) { 395 Executable exec = (Executable)execTask; 406 396 List<String> visitedResource = exec.getVisitedResources(); 407 397 String originResource = ev.get_data().toString(); … … 411 401 412 402 Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits(); 413 //double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength();414 403 double time = execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED), 415 task, choosenResources, exec.getCompletionPercentage()); 416 417 /*if(!subTask.getVisitedResources().contains(ev.get_data().toString())) { 418 continue; 419 }*/ 404 execTask, choosenResources, exec.getCompletionPercentage()); 405 420 406 //check if the new estimated end time is equal to the previous one; if yes the continue without update 421 407 if( DoubleMath.subtract((exec.getExecStartTime() + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){ … … 424 410 ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED); 425 411 scheduler.sim_cancel(filter, null); 426 scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, task); 427 412 scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, execTask); 428 413 } 429 414 } … … 443 428 numberOfPE = numberOfPE + resUnit.getAmount(); 444 429 } 445 //numberOfPE = getResourceManager().getPE().size();446 430 } catch (Exception e) { 447 431 numberOfPE = 1; … … 464 448 ExecTask task) { 465 449 450 Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>(); 466 451 ResourceManager resourceManager = this.resourceManager; 467 452 if(resourceName != null){ … … 472 457 return null; 473 458 } 474 475 459 resourceManager = new LocalResourceManager(resource); 476 460 } 477 Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();478 479 461 480 462 int cpuRequest; … … 485 467 } 486 468 487 //PEUnit processingUnits = null;488 469 if (cpuRequest != 0) { 489 470 … … 494 475 return null; 495 476 } 477 496 478 List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>(); 497 498 479 for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) { 499 480 PEUnit peUnit = (PEUnit) availableUnits .get(i); … … 508 489 return null; 509 490 } 510 511 491 map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0)); 512 492 } … … 529 509 class LocalWorkloadUnitHandler implements WorkloadUnitHandler{ 530 510 531 public void handleJob(Job job){511 public void handleJob(JobInterface<?> job){ 532 512 533 513 if (log.isInfoEnabled()) … … 536 516 List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 537 517 jobsList.add(job); 538 TaskListImpl readyTasks = new TaskListImpl();518 TaskListImpl availableTasks = new TaskListImpl(); 539 519 for(Task task: jobRegistry.getAvailableTasks(jobsList)){ 540 520 task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 541 readyTasks.add(task);542 } 543 544 for( WorkloadUnit e:readyTasks){545 registerWorkloadUnit( e);546 } 547 } 548 549 public void handleTask(TaskInterface<?> t i){550 Task task = (Task)t i;521 availableTasks.add(task); 522 } 523 524 for(TaskInterface<?> task: availableTasks){ 525 registerWorkloadUnit(task); 526 } 527 } 528 529 public void handleTask(TaskInterface<?> t){ 530 Task task = (Task)t; 551 531 List<AbstractProcesses> processes = task.getProcesses(); 552 532 … … 564 544 565 545 public void handleExecutable(ExecTask task){ 546 566 547 Executable exec = (Executable) task; 567 568 // int cost = 569 // this.resourceManager.getResourceCharacteristic().getResUnits() != 570 // null ? 571 // this.resourceManager.getResourceCharacteristic().getResUnits().get(ResourceParameterName.COST).getAmount() 572 // : 1; 573 574 exec.visitResource(scheduler.get_name()); 548 jobRegistry.addExecTask(exec); 549 550 exec.trackResource(scheduler.get_name()); 575 551 Scheduler parentScheduler = scheduler.getParent(); 576 while (parentScheduler != null && !exec.getVisitedResources().contains(parentScheduler.get_name())) { 577 exec.visitResource(parentScheduler.get_name()); 552 List<String> visitedResource = exec.getVisitedResources(); 553 String [] visitedResourcesArray = visitedResource.toArray(new String[visitedResource.size()]); 554 while (parentScheduler != null && !ArrayUtils.contains(visitedResourcesArray, parentScheduler.get_name())) { 555 exec.trackResource(parentScheduler.get_name()); 578 556 parentScheduler = parentScheduler.getParent(); 579 557 } 580 581 558 exec.setSchedulerName(scheduler.get_id()); 582 jobRegistry.addExecTask(exec);583 TaskList ImplnewTasks = new TaskListImpl();559 560 TaskList newTasks = new TaskListImpl(); 584 561 newTasks.add(exec); 585 562
Note: See TracChangeset
for help on using the changeset viewer.