source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 1396

Revision 1396, 26.4 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.Iterator;
6import java.util.List;
7import java.util.Map;
8
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11import org.joda.time.DateTime;
12import org.joda.time.DateTimeUtilsExt;
13import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
14
15import qcg.shared.constants.BrokerConstants;
16import schedframe.SimulatedEnvironment;
17import schedframe.events.scheduling.SchedulingEvent;
18import schedframe.events.scheduling.SchedulingEventType;
19import schedframe.events.scheduling.StartTaskExecutionEvent;
20import schedframe.events.scheduling.TaskFinishedEvent;
21import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
22import schedframe.exceptions.ResourceException;
23import schedframe.resources.StandardResourceType;
24import schedframe.resources.computing.ComputingResource;
25import schedframe.resources.computing.profiles.energy.EnergyEvent;
26import schedframe.resources.computing.profiles.energy.EnergyEventType;
27import schedframe.resources.units.PEUnit;
28import schedframe.resources.units.ProcessingElements;
29import schedframe.resources.units.ResourceUnit;
30import schedframe.resources.units.ResourceUnitName;
31import schedframe.resources.units.StandardResourceUnitName;
32import schedframe.scheduling.ExecutionHistoryItem;
33import schedframe.scheduling.ResourceItem;
34import schedframe.scheduling.Scheduler;
35import schedframe.scheduling.TaskList;
36import schedframe.scheduling.TaskListImpl;
37import schedframe.scheduling.WorkloadUnitHandler;
38import schedframe.scheduling.manager.resources.LocalResourceManager;
39import schedframe.scheduling.manager.resources.ManagedResources;
40import schedframe.scheduling.manager.resources.ResourceManager;
41import schedframe.scheduling.plan.AllocationInterface;
42import schedframe.scheduling.plan.ScheduledTaskInterface;
43import schedframe.scheduling.plan.SchedulingPlanInterface;
44import schedframe.scheduling.plugin.ModuleListImpl;
45import schedframe.scheduling.plugin.SchedulingPlugin;
46import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
47import schedframe.scheduling.policy.AbstractManagementSystem;
48import schedframe.scheduling.queue.TaskQueueList;
49import schedframe.scheduling.tasks.AbstractProcesses;
50import schedframe.scheduling.tasks.Job;
51import schedframe.scheduling.tasks.JobInterface;
52import schedframe.scheduling.tasks.Task;
53import schedframe.scheduling.tasks.TaskInterface;
54import schedframe.scheduling.tasks.WorkloadUnit;
55import simulator.DCWormsConstants;
56import simulator.stats.DCwormsAccumulator;
57import simulator.utils.DoubleMath;
58import dcworms.schedframe.scheduling.ExecTask;
59import dcworms.schedframe.scheduling.Executable;
60import eduni.simjava.Sim_event;
61import eduni.simjava.Sim_system;
62import gridsim.dcworms.DCWormsTags;
63import gridsim.dcworms.filter.ExecTaskFilter;
64
65public class LocalManagementSystem extends AbstractManagementSystem {
66
67        private Log log = LogFactory.getLog(LocalManagementSystem.class);
68
69        protected double lastUpdateTime;
70
71        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
72                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
73                        throws Exception {
74
75                super(providerId, entityName, execTimeEstimationPlugin, queues);
76               
77                if (schedPlugin == null) {
78                        throw new Exception("Can not create local scheduling plugin instance.");
79                }
80                this.schedulingPlugin =  schedPlugin;
81                this.moduleList = new ModuleListImpl(1);
82        }
83
84        public void init(Scheduler sched, ManagedResources managedResources) {
85                super.init(sched, managedResources);
86        }
87
88        public void processEvent(Sim_event ev) {
89
90                updateProcessingProgress();
91
92                int tag = ev.get_tag();
93
94                switch (tag) {
95
96                case DCWormsTags.TIMER:
97                        if (pluginSupportsEvent(tag)) {
98                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
99                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
100                                                queues, jobRegistry, getResourceManager(), moduleList);
101                                executeSchedulingPlan(decision);
102                        }
103                        sendTimerEvent();
104                        break;
105
106                case DCWormsTags.TASK_READY_FOR_EXECUTION:
107                        ExecTask execTask = (ExecTask) ev.get_data();
108                        try {
109                                execTask.setStatus(DCWormsTags.READY);
110                                if (pluginSupportsEvent(tag)) {
111                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
112                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
113                                                        queues, jobRegistry, getResourceManager(), moduleList);
114                                        executeSchedulingPlan(decision);
115                                }
116                        } catch (Exception e) {
117                                e.printStackTrace();
118                        }
119                        break;
120
121                case DCWormsTags.TASK_EXECUTION_FINISHED:
122                        execTask = (ExecTask) ev.get_data();
123                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
124                               
125                                finalizeExecutable(execTask);
126                                sendFinishedWorkloadUnit(execTask);
127                                if (pluginSupportsEvent(tag)) {
128                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
129                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
130                                                        queues, jobRegistry, getResourceManager(), moduleList);
131                                        executeSchedulingPlan(decision);
132                                }
133                        }
134
135                        Job job = jobRegistry.getJob(execTask.getJobId());
136                        if(!job.isFinished()){
137                                getWorkloadUnitHandler().handleJob(job);
138                        }
139                        //SUPPORTS PRECEDING CONSTRAINST COMING BOTH FROM SWF FILES
140                        /*else {
141                                try {
142                                        job.setStatus((int)BrokerConstants.JOB_STATUS_FINISHED);
143                                } catch (Exception e) {
144
145                                }
146                                for(JobInterface<?> j: jobRegistry.getJobs((int)BrokerConstants.JOB_STATUS_SUBMITTED)){
147                                        getWorkloadUnitHandler().handleJob(j); 
148                                }
149                        }*/
150                        break;
151                       
152                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
153                        execTask = (Executable) ev.get_data();
154                        if (pluginSupportsEvent(tag)) {
155                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
156                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
157                                                queues, jobRegistry, getResourceManager(), moduleList);
158                                executeSchedulingPlan(decision);
159                        }
160                        break;
161                       
162                case DCWormsTags.TASK_PAUSE:{
163                        String[] ids = (String[]) ev.get_data();
164                        execTask = jobRegistry.getTask(ids[0], ids[1]);
165                        taskPause(execTask);
166                        if (pluginSupportsEvent(tag)) {
167                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_PAUSED);
168                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
169                                                queues, jobRegistry, getResourceManager(), moduleList);
170                                executeSchedulingPlan(decision);
171                        }
172                }
173                break;
174               
175                case DCWormsTags.TASK_RESUME:{
176                        String[] ids = (String[]) ev.get_data();
177                        execTask = jobRegistry.getTask(ids[0], ids[1]);
178                        taskResume(execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), true);
179                        if (pluginSupportsEvent(tag)) {
180                                SchedulingEvent event = new StartTaskExecutionEvent(ids[0], ids[1]);
181                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
182                                                queues, jobRegistry, getResourceManager(), moduleList);
183                                executeSchedulingPlan(decision);
184                        }
185                }
186                break;
187               
188                case DCWormsTags.TASK_MIGRATE:{
189                        Object[] data = (Object[]) ev.get_data();
190                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
191                        double migrationTime = execTimeEstimationPlugin.estimateMigrationTime(new StartTaskExecutionEvent((String)data[0], (String)data[1]), execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), (Map<ResourceUnitName, ResourceUnit>)data[2]);
192                        scheduler.sendInternal(migrationTime, DCWormsTags.TASK_MOVE, data);
193                }
194                break;
195               
196                case DCWormsTags.TASK_MOVE:{
197                        Object[] data = (Object[]) ev.get_data();
198                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
199                        taskMove(execTask, (Map<ResourceUnitName, ResourceUnit>)data[2]);
200                        if (pluginSupportsEvent(tag)) {
201                                SchedulingEvent event = new StartTaskExecutionEvent((String)data[0], (String)data[1]);
202                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
203                                                queues, jobRegistry, getResourceManager(), moduleList);
204                                executeSchedulingPlan(decision);
205                        }
206                }
207                break;
208                       
209                case DCWormsTags.TASK_EXECUTION_CHANGED:
210                        execTask = (ExecTask) ev.get_data();
211                        updateTaskExecutionPhase(execTask, SchedulingEventType.RESOURCE_STATE_CHANGED);
212                        break;
213
214                case DCWormsTags.UPDATE_PROCESSING:
215                        updateProcessingTimes();
216                        break;
217                       
218                case DCWormsTags.POWER_LIMIT_EXCEEDED:
219                        if (pluginSupportsEvent(tag)) {
220                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.POWER_LIMIT_EXCEEDED);
221                                String source;
222                                try{
223                                        source = ev.get_data().toString();
224                                } catch(Exception e){
225                                        source = null;
226                                }
227                                event.setSource(source);
228                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
229                                                queues, jobRegistry, getResourceManager(), moduleList);
230                                executeSchedulingPlan(decision);
231                        }
232                        break;
233                }
234        }
235
236
237        public void taskPause(ExecTask execTask) {
238                if (execTask == null) {
239                        return;
240                } else {
241                        try {
242                                execTask.setStatus(DCWormsTags.PAUSED);
243                               
244                                Executable exec = (Executable) execTask;
245                                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
246                                getAllocationManager().freeResources(lastUsed, true);
247                               
248                                saveExecutionHistory(exec, exec.getCompletionPercentage(), exec.getEstimatedDuration());
249                               
250                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), -1);
251                                scheduler.sim_cancel(filter, null);
252                               
253                                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
254                                updateComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
255                        } catch (Exception e) {
256                                // TODO Auto-generated catch block
257                                e.printStackTrace();
258                        }
259                }               
260        }
261       
262        public void taskResume(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> resources, boolean exclusive) {
263                if (execTask == null) {
264                        return;
265                } else if (execTask.getStatus() == DCWormsTags.PAUSED) {
266                        try {
267                                execTask.setStatus(DCWormsTags.RESUMED);
268                                Executable exec = (Executable) execTask;
269
270                                boolean status = allocateResources(exec, resources, exclusive);
271                                if(status == false){
272                                        TaskList newTasks = new TaskListImpl();
273                                        newTasks.add(exec);
274                                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
275                                        exec.setStatus(DCWormsTags.READY);
276                                } else {
277                                        runTask(execTask);
278                                       
279                                        PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
280                                        updateComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
281                                }
282
283                        } catch (Exception e) {
284                                // TODO Auto-generated catch block
285                                e.printStackTrace();
286                        }
287                }                       
288        }
289       
290        public void taskMove(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> map) {
291                taskPause(execTask);
292                taskResume(execTask, map, false);
293        }
294       
295        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
296                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
297                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
298                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
299                                        queues, jobRegistry, getResourceManager(), moduleList);
300                        executeSchedulingPlan(decision);
301                }
302                //if(scheduler.getParent() != null){
303                        sendFinishedWorkloadUnit(wu);
304                //}
305        }
306       
307        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
308
309                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
310                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
311                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
312
313                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
314                                continue;
315                        }
316
317                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
318
319                        TaskInterface<?> task = taskDecision.getTask();
320                        for (int j = 0; j < allocations.size(); j++) {
321
322                                AllocationInterface<?> allocation = allocations.get(j);
323                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
324                                        ExecTask exec = (ExecTask) task;                                       
325                                        executeTask(exec, allocation.getRequestedResources());
326                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
327                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
328                                        submitTask(task, allocation);
329                                } else {
330                                        ExecTask exec = (ExecTask) task;
331                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
332                                }
333                        }
334                }
335        }
336
337        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
338
339                Executable exec = (Executable)task;
340                boolean allocationStatus = allocateResources(exec, choosenResources, false);
341                if(allocationStatus == false){
342                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
343                        return;
344                }
345
346                removeFromQueue(task);
347
348                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
349
350                runTask(exec);
351               
352                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
353                updateComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
354               
355
356                try {
357                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
358                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
359                } catch (NoSuchFieldException e) {
360                        //double t = exec.getEstimatedDuration();
361                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
362                }
363
364                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());           
365        }
366       
367        private void runTask(ExecTask execTask){
368                Executable exec = (Executable) execTask;
369                Map<ResourceUnitName, ResourceUnit> resources = exec.getAllocatedResources().getLast().getResourceUnits();
370               
371                try {
372                        exec.setStatus(DCWormsTags.INEXEC);
373                } catch (Exception e) {
374                        // TODO Auto-generated catch block
375                        e.printStackTrace();
376                }
377
378                int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION),
379                                execTask, resources, exec.getCompletionPercentage())).intValue();
380
381                saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration);
382                if(exec.getExecutionProfile().isLast()){
383                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);;
384                } else {
385                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
386                }
387               
388                PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
389                updateComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
390               
391        }
392        protected void finalizeExecutable(ExecTask execTask){
393               
394                Executable exec = (Executable)execTask;
395                exec.finalizeExecutable();
396               
397                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
398                scheduler.sim_cancel(filter, null);
399               
400                Task task;
401                Job job = jobRegistry.getJob(exec.getJobId());
402                try {
403                        task = job.getTask(exec.getTaskId());
404                } catch (NoSuchFieldException e) {
405                        return;
406                }
407                if(exec.getProcessesId() == null){
408                        try {
409                                task.setStatus(exec.getStatus());
410                        } catch (Exception e) {
411                                e.printStackTrace();
412                        }
413                } else {
414                        List<AbstractProcesses> processesList = task.getProcesses();
415                        for(int i = 0; i < processesList.size(); i++){
416                                AbstractProcesses processes = processesList.get(i);
417                                if(processes.getId().equals(exec.getProcessesId())){
418                                        processes.setStatus(exec.getStatus());
419                                        break;
420                                }
421                        }
422                }
423
424                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
425                getAllocationManager().freeResources(lastUsed, true);
426               
427                //TODO calculate the value of completion
428                saveExecutionHistory(exec, 100,0);
429               
430                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
431                updateComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
432               
433                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
434                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
435               
436                updateComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
437        }
438       
439        protected void updateProcessingProgress() {
440
441                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
442                if (timeSpan <= 0.0) {
443                        // don't update when nothing changed
444                        return;
445                }
446                lastUpdateTime = Sim_system.clock();
447                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
448                while (iter.hasNext()) {
449                        ExecTask task = iter.next();
450                        Executable exec = (Executable)task;
451                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
452                        //System.out.println("--- upadteProgressX: " + Sim_system.sim_clock() );
453                        //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " +  execHistItem.getCompletionPercentage());
454                        exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / (execHistItem.getEstimatedDuration()) * (1.0 - execHistItem.getCompletionPercentage()/100.0)));
455                        //System.out.println("newProgress: " + exec.getCompletionPercentage()  );       
456
457                }
458        }
459       
460        private void updateComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){
461                if(peUnit instanceof ProcessingElements){
462                        ProcessingElements pes = (ProcessingElements) peUnit;
463                        for (ComputingResource resource : pes) {
464                                resource.handleEvent(new EnergyEvent(eventType, obj, scheduler.getFullName()));
465                                //DataCenterWorkloadSimulator.getEventManager().sendToResources(resource.getType(), 0, new EnergyEvent(eventType, obj));
466                        }
467                        /*try {
468                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
469                                        resource.handleEvent(new EnergyEvent(eventType, obj));
470                                }
471                        } catch (ResourceException e) {
472                                // TODO Auto-generated catch block
473                                e.printStackTrace();
474                        }*/
475                } else {
476                        ComputingResource resource = null;
477                        try {
478                                resource = SimulatedEnvironment.getComputingResourceByName(peUnit.getResourceId());
479                        } catch (ResourceException e) {
480                                return;
481                        }
482                        resource.handleEvent(new EnergyEvent(eventType, obj, scheduler.getFullName()));
483                }
484        }
485
486        protected void updateProcessingTimes() {
487                for (ExecTask execTask : jobRegistry.getRunningTasks()) {
488                        Executable exec = (Executable)execTask;
489
490
491                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
492
493                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
494                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
495
496                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
497                        double lastTimeStamp = execHistItem.getTimeStamp().getMillis()/1000;
498                        if(DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)) == 0.0){
499                                continue;
500                        }
501                        //System.out.println("=== upadteTIme: " + Sim_system.sim_clock() );
502                        //System.out.println("execId: " + exec.getId() +  "; estimatedDuration " + execHistItem.getEstimatedDuration());
503                        //System.out.println("execId: " + exec.getId() + "; difference " + DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)));
504                        //System.out.println("completionPercantage: " + exec.getCompletionPercentage() + "; basic duration: " +exec.getResourceConsumptionProfile().getCurrentResourceConsumption().getDuration() +   "; phaseDuration: " +  phaseDuration);
505
506                        saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration);
507                       
508                        if(exec.getExecutionProfile().isLast()){
509                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
510                                scheduler.sim_cancel(filter, null);
511                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
512                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
513                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
514                        } else{
515                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
516                                scheduler.sim_cancel(filter, null);
517                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
518                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
519                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
520                        }
521                }
522        }       
523
524        protected void updateTaskExecutionPhase(ExecTask execTask, SchedulingEventType schedEvType) {
525
526                if (execTask.getStatus() == DCWormsTags.INEXEC) {
527                        Executable exec = (Executable)execTask;
528                       
529                        try {
530                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
531                        } catch (Exception e) {
532                               
533                        }
534
535                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
536
537                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(schedEvType),
538                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
539                       
540                        saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration);
541                       
542                        if(exec.getExecutionProfile().isLast()){
543                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
544                        } else {
545                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
546                        }
547                       
548                        PEUnit peUnit = (PEUnit)exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
549                        updateComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
550                }
551        }       
552
553        public double calculateTotalLoad() {
554
555                DCwormsAccumulator loadAcc = new DCwormsAccumulator();
556                try {
557                        for(ComputingResource compRes: getResourceManager().getResourcesOfType(StandardResourceType.Node)){
558                                loadAcc.add(compRes.getLoadInterface().getRecentUtilization().getValue());                             
559                        }
560                } catch (ResourceException e) {
561                        // TODO Auto-generated catch block
562                        e.printStackTrace();
563                }
564               
565                return loadAcc.getMean();
566        }
567
568        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
569                        ExecTask task) {
570
571                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
572                LocalResourceManager resourceManager = getResourceManager();
573                if(resourceName != null){
574                        ComputingResource resource = resourceManager.getResourceByName(resourceName);
575                        if(resource == null){
576                                return null;
577                        }
578                        resourceManager = new LocalResourceManager(resource);
579                }
580
581                int cpuRequest;
582                try {
583                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
584                } catch (NoSuchFieldException e) {
585                        cpuRequest = 1;
586                }
587
588                if (cpuRequest != 0) {
589                       
590                        List<ResourceUnit> availableUnits = null;
591                        try {
592                                availableUnits = resourceManager.getPE();
593                        } catch (ResourceException e) {
594                                return null;
595                        }
596                       
597                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
598                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
599                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
600                                if(peUnit.getFreeAmount() > 0){
601                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
602                                        cpuRequest = cpuRequest - allocPE;
603                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
604                                }       
605                        }
606                       
607                        if(cpuRequest > 0){
608                                return null;
609                        }
610                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
611                }
612
613                return  map;
614        }
615       
616        public void notifySubmittedWorkloadUnit(WorkloadUnit wu) {
617                //250314
618                //updateProcessingProgress();
619                if (log.isInfoEnabled())
620                        log.info("Received job " + wu.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
621                registerWorkloadUnit(wu);
622        }
623
624        private void registerWorkloadUnit(WorkloadUnit wu){
625                if(!wu.isRegistered()){
626                        wu.register(jobRegistry);
627                }
628                wu.accept(getWorkloadUnitHandler());
629        }
630       
631        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
632               
633                public void handleJob(JobInterface<?> job){
634
635                        if (log.isInfoEnabled())
636                                log.info("Handle " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
637                       
638                        try {
639                                if(job.getStatus() == BrokerConstants.JOB_STATUS_UNSUBMITTED)
640                                        job.setStatus((int)BrokerConstants.JOB_STATUS_SUBMITTED);
641                        } catch (Exception e) {
642                                e.printStackTrace();
643                        }
644                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
645                        jobsList.add(job);
646                        TaskListImpl availableTasks = new TaskListImpl();
647                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
648                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
649                                availableTasks.add(task);
650                        }
651
652                        for(TaskInterface<?> task: availableTasks){     
653                                registerWorkloadUnit(task);
654                        }
655                }
656               
657                public void handleTask(TaskInterface<?> t){
658                        Task task = (Task)t;
659                        List<AbstractProcesses> processes = task.getProcesses();
660
661                        if(processes == null || processes.size() == 0){
662                                Executable exec = new Executable(task);
663                                registerWorkloadUnit(exec);
664                        } else {
665                                for(int j = 0; j < processes.size(); j++){
666                                        AbstractProcesses procesesSet = processes.get(j);
667                                        Executable exec = new Executable(task, procesesSet);
668                                        registerWorkloadUnit(exec);
669                                }
670                        }
671                }
672               
673                public void handleExecutable(ExecTask task){
674                       
675                        Executable exec = (Executable) task;
676                        jobRegistry.addExecTask(exec);
677
678                        exec.setSchedulerName(scheduler.getFullName());
679                       
680                        TaskList newTasks = new TaskListImpl();
681                        newTasks.add(exec);
682               
683                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
684
685                        if (exec.getStatus() == DCWormsTags.QUEUED) {
686                                sendExecutableReadyEvent(exec);
687                        }
688                }
689        }
690
691        public WorkloadUnitHandler getWorkloadUnitHandler() {
692                return new LocalWorkloadUnitHandler();
693        }
694       
695       
696        public LocalResourceManager getResourceManager() {
697                if (resourceManager instanceof ResourceManager)
698                        return (LocalResourceManager) resourceManager;
699                else
700                        return null;
701        }
702       
703       
704        private void saveExecutionHistory(Executable exec, double completionPercentage, double estimatedDuration){
705
706                ExecutionHistoryItem execHistoryItem = new ExecutionHistoryItem(new DateTime());
707                execHistoryItem.setCompletionPercentage(completionPercentage);
708                execHistoryItem.setEstimatedDuration(estimatedDuration);
709                execHistoryItem.setResIndex(exec.getAllocatedResources().size() -1);
710                execHistoryItem.setStatus(exec.getStatus());
711                exec.addExecHistory(execHistoryItem);
712               
713        }
714       
715        public boolean allocateResources(Executable exec, Map<ResourceUnitName, ResourceUnit> choosenResources, boolean exclusive){
716                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources, exclusive);
717                if(allocationStatus){
718                        ResourceItem resourceHistoryItem = new ResourceItem(choosenResources);
719                        exec.addAllocatedResources(resourceHistoryItem);
720                        return true;
721                }
722                return false;
723        }
724       
725
726       
727}
Note: See TracBrowser for help on using the repository browser.