source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 1574

Revision 1574, 27.7 KB checked in by wojtekp, 9 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.Iterator;
6import java.util.LinkedHashSet;
7import java.util.List;
8import java.util.Map;
9import java.util.Set;
10
11import org.apache.commons.logging.Log;
12import org.apache.commons.logging.LogFactory;
13import org.joda.time.DateTime;
14import org.joda.time.DateTimeUtilsExt;
15import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
16
17import qcg.shared.constants.BrokerConstants;
18import schedframe.SimulatedEnvironment;
19import schedframe.events.scheduling.SchedulingEvent;
20import schedframe.events.scheduling.SchedulingEventType;
21import schedframe.events.scheduling.StartTaskExecutionEvent;
22import schedframe.events.scheduling.TaskFinishedEvent;
23import schedframe.events.scheduling.TaskPausedEvent;
24import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
25import schedframe.events.scheduling.TaskResumedEvent;
26import schedframe.exceptions.ResourceException;
27import schedframe.resources.StandardResourceType;
28import schedframe.resources.computing.ComputingResource;
29import schedframe.resources.computing.profiles.energy.ResourceEvent;
30import schedframe.resources.computing.profiles.energy.ResourceEventType;
31import schedframe.resources.units.PEUnit;
32import schedframe.resources.units.ProcessingElements;
33import schedframe.resources.units.ResourceUnit;
34import schedframe.resources.units.ResourceUnitName;
35import schedframe.resources.units.StandardResourceUnitName;
36import schedframe.scheduling.ExecutionHistoryItem;
37import schedframe.scheduling.ResourceItem;
38import schedframe.scheduling.Scheduler;
39import schedframe.scheduling.TaskList;
40import schedframe.scheduling.TaskListImpl;
41import schedframe.scheduling.WorkloadUnitHandler;
42import schedframe.scheduling.manager.resources.LocalResourceManager;
43import schedframe.scheduling.manager.resources.ManagedResources;
44import schedframe.scheduling.manager.resources.ResourceManager;
45import schedframe.scheduling.plan.AllocationInterface;
46import schedframe.scheduling.plan.ScheduledTaskInterface;
47import schedframe.scheduling.plan.SchedulingPlanInterface;
48import schedframe.scheduling.plugin.ModuleListImpl;
49import schedframe.scheduling.plugin.SchedulingPlugin;
50import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
51import schedframe.scheduling.policy.AbstractManagementSystem;
52import schedframe.scheduling.queue.TaskQueueList;
53import schedframe.scheduling.tasks.AbstractProcesses;
54import schedframe.scheduling.tasks.Job;
55import schedframe.scheduling.tasks.JobInterface;
56import schedframe.scheduling.tasks.Task;
57import schedframe.scheduling.tasks.TaskInterface;
58import schedframe.scheduling.tasks.WorkloadUnit;
59import simulator.DCWormsConstants;
60import simulator.stats.DCwormsAccumulator;
61import simulator.utils.DoubleMath;
62import dcworms.schedframe.scheduling.ExecTask;
63import dcworms.schedframe.scheduling.Executable;
64import eduni.simjava.Sim_event;
65import eduni.simjava.Sim_system;
66import gridsim.dcworms.DCWormsTags;
67import gridsim.dcworms.filter.ExecTaskFilter;
68
69public class LocalManagementSystem extends AbstractManagementSystem {
70
71        private Log log = LogFactory.getLog(LocalManagementSystem.class);
72
73        protected double lastUpdateTime;
74
75        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
76                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
77                        throws Exception {
78
79                super(providerId, entityName, execTimeEstimationPlugin, queues);
80               
81                if (schedPlugin == null) {
82                        throw new Exception("Can not create local scheduling plugin instance.");
83                }
84                this.schedulingPlugin =  schedPlugin;
85                this.moduleList = new ModuleListImpl(1);
86        }
87
88        public void init(Scheduler sched, ManagedResources managedResources) {
89                super.init(sched, managedResources);
90        }
91
92        public void processEvent(Sim_event ev) {
93
94                updateProcessingProgress();
95
96                int tag = ev.get_tag();
97                SchedulingEvent event;
98                SchedulingPlanInterface<?> decision;
99                switch (tag) {
100                case DCWormsTags.TIMER:
101
102                        event = new SchedulingEvent(SchedulingEventType.TIMER);
103                        decision = schedulingPlugin.schedule(event,
104                                        queues, jobRegistry, getResourceManager(), moduleList);
105                        executeSchedulingPlan(decision);
106
107                        sendTimerEvent();
108                        break;
109
110                case DCWormsTags.TASK_READY_FOR_EXECUTION:
111                        ExecTask execTask = (ExecTask) ev.get_data();
112                        try {
113                                execTask.setStatus(DCWormsTags.READY);
114
115                                event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
116                                decision =  schedulingPlugin.schedule(event,
117                                                queues, jobRegistry, getResourceManager(), moduleList);
118                                executeSchedulingPlan(decision);
119
120                        } catch (Exception e) {
121                                e.printStackTrace();
122                        }
123                        break;
124
125                case DCWormsTags.TASK_EXECUTION_FINISHED:
126                        execTask = (ExecTask) ev.get_data();
127                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
128                               
129                                finalizeExecutable(execTask);
130                                sendFinishedWorkloadUnit(execTask);
131                                event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
132                                decision = schedulingPlugin.schedule(event,
133                                                queues, jobRegistry, getResourceManager(), moduleList);
134                                executeSchedulingPlan(decision);
135                        }
136
137                        Job job = jobRegistry.getJob(execTask.getJobId());
138                        if(!job.isFinished()){
139                                getWorkloadUnitHandler().handleJob(job);
140                        }
141                        //SUPPORTS PRECEDING CONSTRAINST COMING ALSO FROM SWF FILES
142                        /*else {
143                                try {
144                                        job.setStatus((int)BrokerConstants.JOB_STATUS_FINISHED);
145                                } catch (Exception e) {
146
147                                }
148                                for(JobInterface<?> j: jobRegistry.getJobs((int)BrokerConstants.JOB_STATUS_SUBMITTED)){
149                                        getWorkloadUnitHandler().handleJob(j); 
150                                }
151                        }*/
152                        break;
153                       
154                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
155                        execTask = (Executable) ev.get_data();
156                        event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
157                        decision = schedulingPlugin.schedule(event,
158                                        queues, jobRegistry, getResourceManager(), moduleList);
159                        executeSchedulingPlan(decision);
160                        break;
161                       
162                case DCWormsTags.TASK_PAUSE:{
163                        String[] ids = (String[]) ev.get_data();
164                        execTask = jobRegistry.getTask(ids[0], ids[1]);
165                        pauseTask(execTask);
166       
167                        event = new TaskPausedEvent(ids[0], ids[1]);
168                        decision = schedulingPlugin.schedule(event,
169                                        queues, jobRegistry, getResourceManager(), moduleList);
170                        executeSchedulingPlan(decision);
171                        }
172                        break;
173               
174                case DCWormsTags.TASK_RESUME:{
175                        String[] ids = (String[]) ev.get_data();
176                        execTask = jobRegistry.getTask(ids[0], ids[1]);
177                        resumeTask(execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), true);
178                        event = new TaskResumedEvent(ids[0], ids[1]);
179                        decision = schedulingPlugin.schedule(event,
180                                        queues, jobRegistry, getResourceManager(), moduleList);
181                        executeSchedulingPlan(decision);
182                        }
183                        break;
184               
185                case DCWormsTags.TASK_MIGRATE:{
186                        Object[] data = (Object[]) ev.get_data();
187                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
188                        double migrationTime = execTimeEstimationPlugin.estimateMigrationTime(new StartTaskExecutionEvent((String)data[0],
189                                        (String)data[1]), execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), (Map<ResourceUnitName, ResourceUnit>)data[2]);
190                        scheduler.sendInternal(migrationTime, DCWormsTags.TASK_MOVE, data);
191                        }
192                        break;
193               
194                case DCWormsTags.TASK_MOVE:{
195                        Object[] data = (Object[]) ev.get_data();
196                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
197                        moveTask(execTask, (Map<ResourceUnitName, ResourceUnit>)data[2]);
198
199                        event = new StartTaskExecutionEvent((String)data[0], (String)data[1]);
200                        decision = schedulingPlugin.schedule(event,
201                                        queues, jobRegistry, getResourceManager(), moduleList);
202                        executeSchedulingPlan(decision);
203                        }
204                        break;
205                       
206                case DCWormsTags.RESOURCE_POWER_STATE_CHANGED:
207                        event = new SchedulingEvent(SchedulingEventType.RESOURCE_POWER_STATE_CHANGED);
208                        String source;
209                        try{
210                                source = ev.get_data().toString();
211                        } catch(Exception e){
212                                source = null;
213                        }
214                        event.setSource(source);
215                        decision =  schedulingPlugin.schedule(event,
216                                        queues, jobRegistry, getResourceManager(), moduleList);
217                        executeSchedulingPlan(decision);
218                        break;
219
220                case DCWormsTags.RESOURCE_POWER_LIMIT_EXCEEDED:
221                        event = new SchedulingEvent(SchedulingEventType.RESOURCE_POWER_LIMIT_EXCEEDED);
222                        try{
223                                source = ev.get_data().toString();
224                        } catch(Exception e){
225                                source = null;
226                        }
227                        event.setSource(source);
228                        decision = schedulingPlugin.schedule(event,
229                                        queues, jobRegistry, getResourceManager(), moduleList);
230                        executeSchedulingPlan(decision);
231                        break;
232               
233                case DCWormsTags.RESOURCE_TEMPERATURE_LIMIT_EXCEEDED:
234                        event = new SchedulingEvent(SchedulingEventType.RESOURCE_TEMPERATURE_LIMIT_EXCEEDED);
235                        try{
236                                source = ev.get_data().toString();
237                        } catch(Exception e){
238                                source = null;
239                        }
240                        event.setSource(source);
241                        decision = schedulingPlugin.schedule(event,
242                                        queues, jobRegistry, getResourceManager(), moduleList);
243                        executeSchedulingPlan(decision);
244                        break;
245                       
246                case DCWormsTags.TASK_EXECUTION_CHANGED:
247                        execTask = (ExecTask) ev.get_data();
248                        updateTaskExecutionPhase(execTask);
249                        break;
250                case DCWormsTags.UPDATE_PROCESSING:
251                        updateProcessingTimes();
252                        break;
253                       
254                default:               
255                        break;
256                }
257        }
258
259
260        protected void pauseTask(ExecTask execTask) {
261                if (execTask == null) {
262                        return;
263                } else {
264                        try {
265                                execTask.setStatus(DCWormsTags.PAUSED);
266                               
267                                Executable exec = (Executable) execTask;
268                                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
269                                getAllocationManager().freeResources(lastUsed, true);
270                               
271                                saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), exec.getEstimatedDuration());
272                               
273                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), -1);
274                                scheduler.sim_cancel(filter, null);
275                               
276                                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
277                                updateComputingResources(peUnit, ResourceEventType.TASK_FINISHED, exec);
278                        } catch (Exception e) {
279                                // TODO Auto-generated catch block
280                                e.printStackTrace();
281                        }
282                }               
283        }
284       
285        protected void resumeTask(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> resources, boolean exclusive) {
286                if (execTask == null) {
287                        return;
288                } else if (execTask.getStatus() == DCWormsTags.PAUSED) {
289                        try {
290                                execTask.setStatus(DCWormsTags.RESUMED);
291                                Executable exec = (Executable) execTask;
292
293                                boolean status = allocateResources(exec, resources, exclusive);
294                                if(status == false){
295                                        TaskList newTasks = new TaskListImpl();
296                                        newTasks.add(exec);
297                                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
298                                        exec.setStatus(DCWormsTags.READY);
299                                } else {
300                                        runTask(execTask);
301                                       
302                                        PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
303                                        updateComputingResources(peUnit, ResourceEventType.TASK_STARTED, exec);
304                                }
305
306                        } catch (Exception e) {
307                                // TODO Auto-generated catch block
308                                e.printStackTrace();
309                        }
310                }                       
311        }
312       
313        protected void moveTask(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> map) {
314                pauseTask(execTask);
315                resumeTask(execTask, map, false);
316        }
317       
318        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
319
320                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
321                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
322                                queues, jobRegistry, getResourceManager(), moduleList);
323                executeSchedulingPlan(decision);
324
325        //if(scheduler.getParent() != null){
326                sendFinishedWorkloadUnit(wu);
327        //}
328        }
329       
330        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
331
332                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
333                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
334                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
335
336                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
337                                continue;
338                        }
339
340                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
341
342                        TaskInterface<?> task = taskDecision.getTask();
343                        for (int j = 0; j < allocations.size(); j++) {
344
345                                AllocationInterface<?> allocation = allocations.get(j);
346                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
347                                        ExecTask exec = (ExecTask) task;                                       
348                                        executeTask(exec, allocation.getRequestedResources());
349                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
350                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
351                                        submitTask(task, allocation);
352                                } else {
353                                        ExecTask exec = (ExecTask) task;
354                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
355                                }
356                        }
357                }
358        }
359
360        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
361
362                Executable exec = (Executable)task;
363                boolean allocationStatus = allocateResources(exec, choosenResources, false);
364                if(allocationStatus == false){
365                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
366                        return;
367                }
368
369                removeFromQueue(task);
370
371                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
372
373                runTask(exec);
374               
375                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
376                updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
377               
378
379                try {
380                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
381                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
382                } catch (NoSuchFieldException e) {
383                        //double t = exec.getEstimatedDuration();
384                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
385                }
386
387                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());           
388        }
389       
390        private void runTask(ExecTask execTask){
391                Executable exec = (Executable) execTask;
392                Map<ResourceUnitName, ResourceUnit> resources = exec.getAllocatedResources().getLast().getResourceUnits();
393               
394                try {
395                        exec.setStatus(DCWormsTags.INEXEC);
396                } catch (Exception e) {
397                        // TODO Auto-generated catch block
398                        e.printStackTrace();
399                }
400
401                int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION),
402                                execTask, resources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
403
404                saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
405                if(exec.getExecutionProfile().isLast()){
406                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
407                } else {
408                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
409                }
410               
411                PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
412                updateComputingResources(peUnit, ResourceEventType.TASK_STARTED, exec);
413        }
414
415        protected void finalizeExecutable(ExecTask execTask){
416               
417                Executable exec = (Executable)execTask;
418                exec.finalizeExecutable();
419               
420                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
421                scheduler.sim_cancel(filter, null);
422               
423                Task task;
424                Job job = jobRegistry.getJob(exec.getJobId());
425                try {
426                        task = job.getTask(exec.getTaskId());
427                } catch (NoSuchFieldException e) {
428                        return;
429                }
430                if(exec.getProcessesId() == null){
431                        try {
432                                task.setStatus(exec.getStatus());
433                        } catch (Exception e) {
434                                e.printStackTrace();
435                        }
436                } else {
437                        List<AbstractProcesses> processesList = task.getProcesses();
438                        for(int i = 0; i < processesList.size(); i++){
439                                AbstractProcesses processes = processesList.get(i);
440                                if(processes.getId().equals(exec.getProcessesId())){
441                                        processes.setStatus(exec.getStatus());
442                                        break;
443                                }
444                        }
445                }
446
447                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
448                getAllocationManager().freeResources(lastUsed, true);
449               
450                //TODO calculate the value of completion
451                saveExecutionHistory(exec, 100,0);
452               
453                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
454                updateComputingResources(peUnit, ResourceEventType.TASK_FINISHED, exec);
455               
456                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
457                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
458               
459                updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
460        }
461       
462        protected void updateProcessingProgress() {
463
464                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
465                if (timeSpan <= 0.0) {
466                        // don't update when nothing changed
467                        return;
468                }
469                lastUpdateTime = Sim_system.clock();
470                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
471                while (iter.hasNext()) {
472                        ExecTask task = iter.next();
473                        Executable exec = (Executable)task;
474                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
475                        //System.out.println("--- upadteProgressX: " + Sim_system.sim_clock() );
476                        //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " +  execHistItem.getCompletionPercentage());
477                        exec.getExecutionProfile().setCompletionPercentage(exec.getExecutionProfile().getCompletionPercentage() + 100 * (timeSpan / (execHistItem.getEstimatedDuration()) * (1.0 - execHistItem.getCompletionPercentage()/100.0)));
478                        exec.setTotalCompletionPercentage(exec.getTotalCompletionPercentage() + 100 * (timeSpan / execHistItem.getEstimatedDuration()) * exec.getExecutionProfile().getCurrentExecutionPhase().getLenght() / exec.getExecutionProfile().getLength());
479                        //System.out.println("newProgress: " + exec.getCompletionPercentage()  );       
480
481                }
482        }
483       
484        private void updateComputingResources(PEUnit peUnit, ResourceEventType eventType, Object obj){
485                if(peUnit instanceof ProcessingElements){
486                        /*ProcessingElements pes = (ProcessingElements) peUnit;
487                        for (ComputingResource resource : pes) {
488                                resource.handleEvent(new ResourceEvent(eventType, obj, scheduler.getFullName()));
489                                //DataCenterWorkloadSimulator.getEventManager().sendToResources(resource.getType(), 0, new EnergyEvent(eventType, obj));
490                        }*/
491                        /*try {
492                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
493                                        resource.handleEvent(new EnergyEvent(eventType, obj));
494                                }
495                        } catch (ResourceException e) {
496                                // TODO Auto-generated catch block
497                                e.printStackTrace();
498                        }*/
499                       
500                        ProcessingElements pes = (ProcessingElements) peUnit;
501                        ResourceEvent resEvent = new ResourceEvent(eventType, obj, scheduler.getFullName());           
502                        try{
503                                Set<ComputingResource> resourcesToUpdate = new LinkedHashSet<ComputingResource>();
504                       
505                                for (ComputingResource compResource : pes) {
506                                        resourcesToUpdate.add(compResource);
507                                }
508                                while(resourcesToUpdate.size() > 0){
509                                        Set<ComputingResource> newSet = new LinkedHashSet<ComputingResource>();
510                                        for(ComputingResource compResource: resourcesToUpdate){
511                                                compResource.updateState(resEvent);
512                                                if(compResource.getParent() != null){
513                                                        newSet.add(compResource.getParent());
514                                                }
515                                        }
516                                        resourcesToUpdate = new LinkedHashSet<ComputingResource>(newSet);
517                                }
518                        }catch(Exception e){
519                        }
520                } else {
521                        ComputingResource resource = null;
522                        try {
523                                resource = SimulatedEnvironment.getComputingResourceByName(peUnit.getResourceId());
524                        } catch (ResourceException e) {
525                                return;
526                        }
527                        resource.handleEvent(new ResourceEvent(eventType, obj, scheduler.getFullName()));
528                }
529        }
530
531        protected void updateProcessingTimes() {
532                for (ExecTask execTask: jobRegistry.getRunningTasks()) {
533                        Executable exec = (Executable)execTask;
534
535                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
536
537                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
538                                        execTask, choosenResources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
539
540                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
541                        double lastTimeStamp = execHistItem.getTimeStamp() / 1000;
542                        if(DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis() / 1000 + phaseDuration)) == 0.0){
543                                continue;
544                        }
545                        //System.out.println("=== upadteTIme: " + Sim_system.sim_clock() );
546                        //System.out.println("execId: " + exec.getId() +  "; estimatedDuration " + execHistItem.getEstimatedDuration());
547                        //System.out.println("execId: " + exec.getId() + "; difference " + DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)));
548                        //System.out.println("completionPercantage: " + exec.getCompletionPercentage() + "; basic duration: " +exec.getResourceConsumptionProfile().getCurrentResourceConsumption().getDuration() +   "; phaseDuration: " +  phaseDuration);
549
550                        saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
551                       
552                        if(exec.getExecutionProfile().isLast()){
553                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
554                                scheduler.sim_cancel(filter, null);
555                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
556                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
557                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
558                        } else{
559                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
560                                scheduler.sim_cancel(filter, null);
561                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
562                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
563                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
564                        }
565                }
566        }       
567
568        protected void updateTaskExecutionPhase(ExecTask execTask) {
569
570                if (execTask.getStatus() == DCWormsTags.INEXEC) {
571                        Executable exec = (Executable)execTask;
572                       
573                        try {
574                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
575                        } catch (Exception e) {
576                               
577                        }
578
579                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
580
581                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.TASK_EXECUTION_CHANGED),
582                                        execTask, choosenResources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
583                       
584                        saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
585                       
586                        if(exec.getExecutionProfile().isLast()){
587                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
588                        } else {
589                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
590                        }
591                       
592                        PEUnit peUnit = (PEUnit)exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
593                        updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
594                }
595        }       
596
597        protected double calculateTotalLoad() {
598
599                DCwormsAccumulator loadAcc = new DCwormsAccumulator();
600
601                for(ComputingResource compRes: getResourceManager().getResourcesOfType(StandardResourceType.Node)){
602                        loadAcc.add(compRes.getLoadInterface().getRecentUtilization().getValue());                             
603                }
604
605                return loadAcc.getMean();
606        }
607
608        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
609                        ExecTask task) {
610
611                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
612                LocalResourceManager resourceManager = getResourceManager();
613                if(resourceName != null){
614                        ComputingResource resource = resourceManager.getResourceByName(resourceName);
615                        if(resource == null){
616                                return null;
617                        }
618                        resourceManager = new LocalResourceManager(resource);
619                }
620
621                int cpuRequest;
622                try {
623                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
624                } catch (NoSuchFieldException e) {
625                        cpuRequest = 1;
626                }
627
628                if (cpuRequest != 0) {
629                       
630                        List<ResourceUnit> availableUnits = null;
631                        try {
632                                availableUnits = resourceManager.getPE();
633                        } catch (ResourceException e) {
634                                return null;
635                        }
636                       
637                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
638                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
639                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
640                                if(peUnit.getFreeAmount() > 0){
641                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
642                                        cpuRequest = cpuRequest - allocPE;
643                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
644                                }       
645                        }
646                       
647                        if(cpuRequest > 0){
648                                return null;
649                        }
650                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
651                }
652
653                return  map;
654        }
655       
656        public void notifySubmittedWorkloadUnit(WorkloadUnit wu) {
657                //250314
658                //updateProcessingProgress();
659                if (log.isInfoEnabled())
660                        log.info("Received job " + wu.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
661                registerWorkloadUnit(wu);
662        }
663
664        private void registerWorkloadUnit(WorkloadUnit wu){
665                if(!wu.isRegistered()){
666                        wu.register(jobRegistry);
667                }
668                wu.accept(getWorkloadUnitHandler());
669        }
670       
671        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
672               
673                public void handleJob(JobInterface<?> job){
674
675                        if (log.isInfoEnabled())
676                                log.info("Handle " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
677                       
678                        try {
679                                if(job.getStatus() == BrokerConstants.JOB_STATUS_UNSUBMITTED)
680                                        job.setStatus((int)BrokerConstants.JOB_STATUS_SUBMITTED);
681                        } catch (Exception e) {
682                                e.printStackTrace();
683                        }
684                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
685                        jobsList.add(job);
686                        TaskListImpl availableTasks = new TaskListImpl();
687                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
688                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
689                                availableTasks.add(task);
690                        }
691
692                        for(TaskInterface<?> task: availableTasks){     
693                                registerWorkloadUnit(task);
694                        }
695                }
696               
697                public void handleTask(TaskInterface<?> t){
698                        Task task = (Task)t;
699                        List<AbstractProcesses> processes = task.getProcesses();
700
701                        if(processes == null || processes.size() == 0){
702                                Executable exec = new Executable(task);
703                                registerWorkloadUnit(exec);
704                        } else {
705                                for(int j = 0; j < processes.size(); j++){
706                                        AbstractProcesses procesesSet = processes.get(j);
707                                        Executable exec = new Executable(task, procesesSet);
708                                        registerWorkloadUnit(exec);
709                                }
710                        }
711                }
712               
713                public void handleExecutable(ExecTask task){
714                       
715                        Executable exec = (Executable) task;
716                        jobRegistry.addExecTask(exec);
717
718                        exec.setSchedulerName(scheduler.getFullName());
719                       
720                        TaskList newTasks = new TaskListImpl();
721                        newTasks.add(exec);
722               
723                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
724
725                        if (exec.getStatus() == DCWormsTags.QUEUED) {
726                                sendExecutableReadyEvent(exec);
727                        }
728                }
729        }
730
731        public WorkloadUnitHandler getWorkloadUnitHandler() {
732                return new LocalWorkloadUnitHandler();
733        }
734       
735       
736        public LocalResourceManager getResourceManager() {
737                if (resourceManager instanceof ResourceManager)
738                        return (LocalResourceManager) resourceManager;
739                else
740                        return null;
741        }
742       
743       
744        private void saveExecutionHistory(Executable exec, double completionPercentage, double estimatedDuration){
745                ExecutionHistoryItem execHistoryItem = new ExecutionHistoryItem(new DateTime().getMillis());
746                execHistoryItem.setCompletionPercentage(completionPercentage);
747                execHistoryItem.setEstimatedDuration(estimatedDuration);
748                execHistoryItem.setResIndex(exec.getAllocatedResources().size() - 1);
749                execHistoryItem.setStatus(exec.getStatus());
750                exec.addExecHistory(execHistoryItem);
751        }
752       
753        public boolean allocateResources(Executable exec, Map<ResourceUnitName, ResourceUnit> choosenResources, boolean exclusive){
754                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources, exclusive);
755                if(allocationStatus){
756                        ResourceItem resourceHistoryItem = new ResourceItem(choosenResources);
757                        exec.addAllocatedResources(resourceHistoryItem);
758                        return true;
759                }
760                return false;
761        }
762       
763
764       
765}
Note: See TracBrowser for help on using the repository browser.