source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 1392

Revision 1392, 27.4 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.Iterator;
6import java.util.LinkedList;
7import java.util.List;
8import java.util.Map;
9import java.util.Set;
10
11import org.apache.commons.logging.Log;
12import org.apache.commons.logging.LogFactory;
13import org.joda.time.DateTime;
14import org.joda.time.DateTimeUtilsExt;
15import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
16
17import qcg.shared.constants.BrokerConstants;
18import schedframe.SimulatedEnvironment;
19import schedframe.events.scheduling.SchedulingEvent;
20import schedframe.events.scheduling.SchedulingEventType;
21import schedframe.events.scheduling.StartTaskExecutionEvent;
22import schedframe.events.scheduling.TaskFinishedEvent;
23import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
24import schedframe.exceptions.ResourceException;
25import schedframe.resources.StandardResourceType;
26import schedframe.resources.computing.ComputingResource;
27import schedframe.resources.computing.profiles.energy.EnergyEvent;
28import schedframe.resources.computing.profiles.energy.EnergyEventType;
29import schedframe.resources.units.PEUnit;
30import schedframe.resources.units.ProcessingElements;
31import schedframe.resources.units.ResourceUnit;
32import schedframe.resources.units.ResourceUnitName;
33import schedframe.resources.units.StandardResourceUnitName;
34import schedframe.scheduling.ExecutionHistoryItem;
35import schedframe.scheduling.ResourceHistoryItem;
36import schedframe.scheduling.Scheduler;
37import schedframe.scheduling.TaskList;
38import schedframe.scheduling.TaskListImpl;
39import schedframe.scheduling.WorkloadUnitHandler;
40import schedframe.scheduling.manager.resources.LocalResourceManager;
41import schedframe.scheduling.manager.resources.ManagedResources;
42import schedframe.scheduling.manager.resources.ResourceManager;
43import schedframe.scheduling.plan.AllocationInterface;
44import schedframe.scheduling.plan.ScheduledTaskInterface;
45import schedframe.scheduling.plan.SchedulingPlanInterface;
46import schedframe.scheduling.plugin.SchedulingPlugin;
47import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
48import schedframe.scheduling.plugin.grid.ModuleListImpl;
49import schedframe.scheduling.policy.AbstractManagementSystem;
50import schedframe.scheduling.queue.TaskQueueList;
51import schedframe.scheduling.tasks.AbstractProcesses;
52import schedframe.scheduling.tasks.Job;
53import schedframe.scheduling.tasks.JobInterface;
54import schedframe.scheduling.tasks.Task;
55import schedframe.scheduling.tasks.TaskInterface;
56import schedframe.scheduling.tasks.WorkloadUnit;
57import simulator.DCWormsConstants;
58import simulator.stats.GSSAccumulator;
59import simulator.utils.DoubleMath;
60import dcworms.schedframe.scheduling.ExecTask;
61import dcworms.schedframe.scheduling.Executable;
62import eduni.simjava.Sim_event;
63import eduni.simjava.Sim_system;
64import gridsim.dcworms.DCWormsTags;
65import gridsim.dcworms.filter.ExecTaskFilter;
66
67public class LocalManagementSystem extends AbstractManagementSystem {
68
69        private Log log = LogFactory.getLog(LocalManagementSystem.class);
70
71        protected double lastUpdateTime;
72
73        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
74                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
75                        throws Exception {
76
77                super(providerId, entityName, execTimeEstimationPlugin, queues);
78               
79                if (schedPlugin == null) {
80                        throw new Exception("Can not create local scheduling plugin instance.");
81                }
82                this.schedulingPlugin =  schedPlugin;
83                this.moduleList = new ModuleListImpl(1);
84        }
85
86        public void init(Scheduler sched, ManagedResources managedResources) {
87                super.init(sched, managedResources);
88        }
89
90        public void processEvent(Sim_event ev) {
91
92                updateProcessingProgress();
93
94                int tag = ev.get_tag();
95
96                switch (tag) {
97
98                case DCWormsTags.TIMER:
99                        if (pluginSupportsEvent(tag)) {
100                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
101                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
102                                                queues, jobRegistry, getResourceManager(), moduleList);
103                                executeSchedulingPlan(decision);
104                        }
105                        sendTimerEvent();
106                        break;
107
108                case DCWormsTags.TASK_READY_FOR_EXECUTION:
109                        ExecTask execTask = (ExecTask) ev.get_data();
110                        try {
111                                execTask.setStatus(DCWormsTags.READY);
112                                if (pluginSupportsEvent(tag)) {
113                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
114                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
115                                                        queues, jobRegistry, getResourceManager(), moduleList);
116                                        executeSchedulingPlan(decision);
117                                }
118                        } catch (Exception e) {
119                                e.printStackTrace();
120                        }
121                        break;
122
123                case DCWormsTags.TASK_EXECUTION_FINISHED:
124                        execTask = (ExecTask) ev.get_data();
125                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
126                               
127                                finalizeExecutable(execTask);
128                                sendFinishedWorkloadUnit(execTask);
129                                if (pluginSupportsEvent(tag)) {
130                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
131                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
132                                                        queues, jobRegistry, getResourceManager(), moduleList);
133                                        executeSchedulingPlan(decision);
134                                }
135                        }
136
137                        Job job = jobRegistry.getJob(execTask.getJobId());
138                        if(!job.isFinished()){
139                                getWorkloadUnitHandler().handleJob(job);
140                        }
141                        //SUPPORTS PRECEDING CONSTRAINST COMING BOTH FROM SWF FILES
142                        /*else {
143                                try {
144                                        job.setStatus((int)BrokerConstants.JOB_STATUS_FINISHED);
145                                } catch (Exception e) {
146
147                                }
148                                for(JobInterface<?> j: jobRegistry.getJobs((int)BrokerConstants.JOB_STATUS_SUBMITTED)){
149                                        getWorkloadUnitHandler().handleJob(j); 
150                                }
151                        }*/
152                        break;
153                       
154                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
155                        execTask = (Executable) ev.get_data();
156                        if (pluginSupportsEvent(tag)) {
157                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
158                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
159                                                queues, jobRegistry, getResourceManager(), moduleList);
160                                executeSchedulingPlan(decision);
161                        }
162                        break;
163                       
164                case DCWormsTags.TASK_PAUSE:{
165                        String[] ids = (String[]) ev.get_data();
166                        execTask = jobRegistry.getTask(ids[0], ids[1]);
167                        taskPause(execTask);
168                        if (pluginSupportsEvent(tag)) {
169                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_PAUSED);
170                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
171                                                queues, jobRegistry, getResourceManager(), moduleList);
172                                executeSchedulingPlan(decision);
173                        }
174                }
175                break;
176               
177                case DCWormsTags.TASK_RESUME:{
178                        String[] ids = (String[]) ev.get_data();
179                        execTask = jobRegistry.getTask(ids[0], ids[1]);
180                        taskResume(execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), true);
181                        if (pluginSupportsEvent(tag)) {
182                                SchedulingEvent event = new StartTaskExecutionEvent(ids[0], ids[1]);
183                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
184                                                queues, jobRegistry, getResourceManager(), moduleList);
185                                executeSchedulingPlan(decision);
186                        }
187                }
188                break;
189               
190                case DCWormsTags.TASK_MIGRATE:{
191                        Object[] data = (Object[]) ev.get_data();
192                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
193                        double migrationTime = execTimeEstimationPlugin.estimateMigrationTime(new StartTaskExecutionEvent((String)data[0], (String)data[1]), execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), (Map<ResourceUnitName, ResourceUnit>)data[2]);
194                        scheduler.sendInternal(migrationTime, DCWormsTags.TASK_MOVE, data);
195                }
196                break;
197               
198                case DCWormsTags.TASK_MOVE:{
199                        Object[] data = (Object[]) ev.get_data();
200                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
201                        taskMove(execTask, (Map<ResourceUnitName, ResourceUnit>)data[2]);
202                        if (pluginSupportsEvent(tag)) {
203                                SchedulingEvent event = new StartTaskExecutionEvent((String)data[0], (String)data[1]);
204                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
205                                                queues, jobRegistry, getResourceManager(), moduleList);
206                                executeSchedulingPlan(decision);
207                        }
208                }
209                break;
210                       
211                case DCWormsTags.TASK_EXECUTION_CHANGED:
212                        execTask = (ExecTask) ev.get_data();
213                        updateTaskExecutionPhase(execTask, SchedulingEventType.RESOURCE_STATE_CHANGED);
214                        break;
215
216                case DCWormsTags.UPDATE_PROCESSING:
217                        updateProcessingTimes();
218                        break;
219                       
220                case DCWormsTags.POWER_LIMIT_EXCEEDED:
221                        if (pluginSupportsEvent(tag)) {
222                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.POWER_LIMIT_EXCEEDED);
223                                String source;
224                                try{
225                                        source = ev.get_data().toString();
226                                } catch(Exception e){
227                                        source = null;
228                                }
229                                event.setSource(source);
230                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
231                                                queues, jobRegistry, getResourceManager(), moduleList);
232                                executeSchedulingPlan(decision);
233                        }
234                        break;
235                }
236        }
237
238
239        public void taskPause(ExecTask execTask) {
240                if (execTask == null) {
241                        return;
242                } else {
243                        try {
244                                execTask.setStatus(DCWormsTags.PAUSED);
245                               
246                                Executable exec = (Executable) execTask;
247                                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
248                                getAllocationManager().freeResources(lastUsed, true);
249                               
250                                saveExecutionHistory(exec, exec.getCompletionPercentage(), exec.getEstimatedDuration());
251                               
252                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), -1);
253                                scheduler.sim_cancel(filter, null);
254                               
255                                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
256                                updateComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
257                        } catch (Exception e) {
258                                // TODO Auto-generated catch block
259                                e.printStackTrace();
260                        }
261                }               
262        }
263       
264        public void taskResume(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> resources, boolean exclusive) {
265                if (execTask == null) {
266                        return;
267                } else if (execTask.getStatus() == DCWormsTags.PAUSED) {
268                        try {
269                                execTask.setStatus(DCWormsTags.RESUMED);
270                                Executable exec = (Executable) execTask;
271
272                                boolean status = allocateResources(exec, resources, exclusive);
273                                if(status == false){
274                                        TaskList newTasks = new TaskListImpl();
275                                        newTasks.add(exec);
276                                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
277                                        exec.setStatus(DCWormsTags.READY);
278                                } else {
279                                        runTask(execTask);
280                                       
281                                        PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
282                                        updateComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
283                                }
284
285                        } catch (Exception e) {
286                                // TODO Auto-generated catch block
287                                e.printStackTrace();
288                        }
289                }                       
290        }
291       
292        public void taskMove(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> map) {
293                taskPause(execTask);
294                taskResume(execTask, map, false);
295        }
296       
297        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
298                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
299                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
300                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
301                                        queues, jobRegistry, getResourceManager(), moduleList);
302                        executeSchedulingPlan(decision);
303                }
304                //if(scheduler.getParent() != null){
305                        sendFinishedWorkloadUnit(wu);
306                //}
307        }
308       
309        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
310
311                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
312                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
313                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
314
315                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
316                                continue;
317                        }
318
319                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
320
321                        TaskInterface<?> task = taskDecision.getTask();
322                        for (int j = 0; j < allocations.size(); j++) {
323
324                                AllocationInterface<?> allocation = allocations.get(j);
325                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
326                                        ExecTask exec = (ExecTask) task;                                       
327                                        executeTask(exec, allocation.getRequestedResources());
328                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
329                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
330                                        submitTask(task, allocation);
331                                } else {
332                                        ExecTask exec = (ExecTask) task;
333                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
334                                }
335                        }
336                }
337        }
338
339        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
340
341                Executable exec = (Executable)task;
342                boolean allocationStatus = allocateResources(exec, choosenResources, false);
343                if(allocationStatus == false){
344                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
345                        return;
346                }
347
348                removeFromQueue(task);
349
350                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
351
352                runTask(exec);
353               
354                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
355                updateComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
356               
357
358                try {
359                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
360                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
361                } catch (NoSuchFieldException e) {
362                        //double t = exec.getEstimatedDuration();
363                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
364                }
365
366                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());           
367        }
368       
369        private void runTask(ExecTask execTask){
370                Executable exec = (Executable) execTask;
371                Map<ResourceUnitName, ResourceUnit> resources = exec.getAllocatedResources().getLast().getResourceUnits();
372               
373                try {
374                        exec.setStatus(DCWormsTags.INEXEC);
375                } catch (Exception e) {
376                        // TODO Auto-generated catch block
377                        e.printStackTrace();
378                }
379
380                int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION),
381                                execTask, resources, exec.getCompletionPercentage())).intValue();
382
383                saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration);
384                if(exec.getResourceConsumptionProfile().isLast()){
385                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);;
386                } else {
387                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
388                }
389               
390                PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
391                updateComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
392               
393        }
394        protected void finalizeExecutable(ExecTask execTask){
395               
396                Executable exec = (Executable)execTask;
397                exec.finalizeExecutable();
398               
399                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
400                scheduler.sim_cancel(filter, null);
401               
402                Task task;
403                Job job = jobRegistry.getJob(exec.getJobId());
404                try {
405                        task = job.getTask(exec.getTaskId());
406                } catch (NoSuchFieldException e) {
407                        return;
408                }
409                if(exec.getProcessesId() == null){
410                        try {
411                                task.setStatus(exec.getStatus());
412                        } catch (Exception e) {
413                                e.printStackTrace();
414                        }
415                } else {
416                        List<AbstractProcesses> processesList = task.getProcesses();
417                        for(int i = 0; i < processesList.size(); i++){
418                                AbstractProcesses processes = processesList.get(i);
419                                if(processes.getId().equals(exec.getProcessesId())){
420                                        processes.setStatus(exec.getStatus());
421                                        break;
422                                }
423                        }
424                }
425
426                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
427                getAllocationManager().freeResources(lastUsed, true);
428               
429                //TODO calculate the value of completion
430                saveExecutionHistory(exec, 100,0);
431               
432                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
433                updateComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
434               
435                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
436                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
437               
438                updateComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
439        }
440       
441        protected void updateProcessingProgress() {
442
443                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
444                if (timeSpan <= 0.0) {
445                        // don't update when nothing changed
446                        return;
447                }
448                lastUpdateTime = Sim_system.clock();
449                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
450                while (iter.hasNext()) {
451                        ExecTask task = iter.next();
452                        Executable exec = (Executable)task;
453                        ExecutionHistoryItem execHistItem = exec.getExecHistory().getLast();
454                        //System.out.println("--- upadteProgressX: " + Sim_system.sim_clock() );
455                        //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " +  execHistItem.getCompletionPercentage());
456                        exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / (execHistItem.getEstimatedDuration()) * (1.0 - execHistItem.getCompletionPercentage()/100.0)));
457                        //System.out.println("newProgress: " + exec.getCompletionPercentage()  );       
458
459                }
460        }
461       
462        private void updateComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){
463                if(peUnit instanceof ProcessingElements){
464                        ProcessingElements pes = (ProcessingElements) peUnit;
465                        for (ComputingResource resource : pes) {
466                                resource.handleEvent(new EnergyEvent(eventType, obj, scheduler.getFullName()));
467                                //DataCenterWorkloadSimulator.getEventManager().sendToResources(resource.getType(), 0, new EnergyEvent(eventType, obj));
468                        }
469                        /*try {
470                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
471                                        resource.handleEvent(new EnergyEvent(eventType, obj));
472                                }
473                        } catch (ResourceException e) {
474                                // TODO Auto-generated catch block
475                                e.printStackTrace();
476                        }*/
477                } else {
478                        ComputingResource resource = null;
479                        try {
480                                resource = SimulatedEnvironment.getComputingResourceByName(peUnit.getResourceId());
481                        } catch (ResourceException e) {
482                                return;
483                        }
484                        resource.handleEvent(new EnergyEvent(eventType, obj, scheduler.getFullName()));
485                }
486        }
487
488        protected void updateProcessingTimes() {
489                for (ExecTask execTask : jobRegistry.getRunningTasks()) {
490                        Executable exec = (Executable)execTask;
491
492
493                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
494
495                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
496                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
497
498                        ExecutionHistoryItem execHistItem = exec.getExecHistory().getLast();
499                        double lastTimeStamp = execHistItem.getTimeStamp().getMillis()/1000;
500                        if(DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)) == 0.0){
501                                continue;
502                        }
503                        //System.out.println("=== upadteTIme: " + Sim_system.sim_clock() );
504                        //System.out.println("execId: " + exec.getId() +  "; estimatedDuration " + execHistItem.getEstimatedDuration());
505                        //System.out.println("execId: " + exec.getId() + "; difference " + DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)));
506                        //System.out.println("completionPercantage: " + exec.getCompletionPercentage() + "; basic duration: " +exec.getResourceConsumptionProfile().getCurrentResourceConsumption().getDuration() +   "; phaseDuration: " +  phaseDuration);
507
508                        saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration);
509                       
510                        if(exec.getResourceConsumptionProfile().isLast()){
511                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
512                                scheduler.sim_cancel(filter, null);
513                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
514                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
515                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
516                        } else{
517                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
518                                scheduler.sim_cancel(filter, null);
519                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
520                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
521                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
522                        }
523                }
524        }       
525
526        protected void updateTaskExecutionPhase(ExecTask execTask, SchedulingEventType schedEvType) {
527
528                if (execTask.getStatus() == DCWormsTags.INEXEC) {
529                        Executable exec = (Executable)execTask;
530                       
531                        try {
532                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
533                        } catch (Exception e) {
534                               
535                        }
536
537                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
538
539                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(schedEvType),
540                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
541                       
542                        saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration);
543                       
544                        if(exec.getResourceConsumptionProfile().isLast()){
545                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
546                        } else {
547                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
548                        }
549                       
550                        PEUnit peUnit = (PEUnit)exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
551                        updateComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
552                }
553        }       
554
555        public double calculateTotalLoad() {
556
557                GSSAccumulator loadAcc = new GSSAccumulator();
558                try {
559                        for(ComputingResource compRes: getResourceManager().getResourcesOfType(StandardResourceType.Node)){
560                                loadAcc.add(compRes.getLoadInterface().getRecentUtilization().getValue());                             
561                        }
562                } catch (ResourceException e) {
563                        // TODO Auto-generated catch block
564                        e.printStackTrace();
565                }
566               
567                return loadAcc.getMean();
568        }
569
570        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
571                        ExecTask task) {
572
573                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
574                LocalResourceManager resourceManager = getResourceManager();
575                if(resourceName != null){
576                        ComputingResource resource = resourceManager.getResourceByName(resourceName);
577                        if(resource == null){
578                                return null;
579                        }
580                        resourceManager = new LocalResourceManager(resource);
581                }
582
583                int cpuRequest;
584                try {
585                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
586                } catch (NoSuchFieldException e) {
587                        cpuRequest = 1;
588                }
589
590                if (cpuRequest != 0) {
591                       
592                        List<ResourceUnit> availableUnits = null;
593                        try {
594                                availableUnits = resourceManager.getPE();
595                        } catch (ResourceException e) {
596                                return null;
597                        }
598                       
599                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
600                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
601                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
602                                if(peUnit.getFreeAmount() > 0){
603                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
604                                        cpuRequest = cpuRequest - allocPE;
605                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
606                                }       
607                        }
608                       
609                        if(cpuRequest > 0){
610                                return null;
611                        }
612                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
613                }
614
615                return  map;
616        }
617       
618        public void notifySubmittedWorkloadUnit(WorkloadUnit wu) {
619                //250314
620                //updateProcessingProgress();
621                if (log.isInfoEnabled())
622                        log.info("Received job " + wu.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
623                registerWorkloadUnit(wu);
624        }
625
626        private void registerWorkloadUnit(WorkloadUnit wu){
627                if(!wu.isRegistered()){
628                        wu.register(jobRegistry);
629                }
630                wu.accept(getWorkloadUnitHandler());
631        }
632       
633        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
634               
635                public void handleJob(JobInterface<?> job){
636
637                        if (log.isInfoEnabled())
638                                log.info("Handle " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
639                       
640                        try {
641                                if(job.getStatus() == BrokerConstants.JOB_STATUS_UNSUBMITTED)
642                                        job.setStatus((int)BrokerConstants.JOB_STATUS_SUBMITTED);
643                        } catch (Exception e) {
644                                e.printStackTrace();
645                        }
646                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
647                        jobsList.add(job);
648                        TaskListImpl availableTasks = new TaskListImpl();
649                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
650                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
651                                availableTasks.add(task);
652                        }
653
654                        for(TaskInterface<?> task: availableTasks){     
655                                registerWorkloadUnit(task);
656                        }
657                }
658               
659                public void handleTask(TaskInterface<?> t){
660                        Task task = (Task)t;
661                        List<AbstractProcesses> processes = task.getProcesses();
662
663                        if(processes == null || processes.size() == 0){
664                                Executable exec = new Executable(task);
665                                registerWorkloadUnit(exec);
666                        } else {
667                                for(int j = 0; j < processes.size(); j++){
668                                        AbstractProcesses procesesSet = processes.get(j);
669                                        Executable exec = new Executable(task, procesesSet);
670                                        registerWorkloadUnit(exec);
671                                }
672                        }
673                }
674               
675                public void handleExecutable(ExecTask task){
676                       
677                        Executable exec = (Executable) task;
678                        jobRegistry.addExecTask(exec);
679
680                        exec.setSchedulerName(scheduler.getFullName());
681                       
682                        TaskList newTasks = new TaskListImpl();
683                        newTasks.add(exec);
684               
685                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
686
687                        if (exec.getStatus() == DCWormsTags.QUEUED) {
688                                sendExecutableReadyEvent(exec);
689                        }
690                }
691        }
692
693        public WorkloadUnitHandler getWorkloadUnitHandler() {
694                return new LocalWorkloadUnitHandler();
695        }
696       
697       
698        public LocalResourceManager getResourceManager() {
699                if (resourceManager instanceof ResourceManager)
700                        return (LocalResourceManager) resourceManager;
701                else
702                        return null;
703        }
704       
705       
706        private void saveExecutionHistory(Executable exec, double completionPercentage, double estimatedDuration){
707
708                ExecutionHistoryItem execHistoryItem = new ExecutionHistoryItem(new DateTime());
709                execHistoryItem.setCompletionPercentage(completionPercentage);
710                execHistoryItem.setEstimatedDuration(estimatedDuration);
711                execHistoryItem.setResIndex(exec.getAllocatedResources().size() -1);
712                execHistoryItem.setStatus(exec.getStatus());
713                exec.addExecHistory(execHistoryItem);
714               
715        }
716       
717        public boolean allocateResources(Executable exec, Map<ResourceUnitName, ResourceUnit> choosenResources, boolean exclusive){
718                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources, exclusive);
719                if(allocationStatus){
720                        ResourceHistoryItem resourceHistoryItem = new ResourceHistoryItem(choosenResources);
721                        exec.addAllocatedResources(resourceHistoryItem);
722                        saveResourceHistory(exec);
723                        return true;
724                }
725                return false;
726        }
727       
728        private void saveResourceHistory(Executable exec){
729                ProcessingElements pes = (ProcessingElements) exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
730                for (ComputingResource resource : pes) {
731
732                        LinkedList<ComputingResource> toExamine = new LinkedList<ComputingResource>();
733                        toExamine.push(resource);
734
735                        while (!toExamine.isEmpty()) {
736                                ComputingResource compResource = toExamine.pop();
737                                List<ComputingResource> resources = compResource.getChildren();
738                                if(resources.isEmpty()){
739                                        Set<String> visitedResources = exec.getAllocatedResources().getLast().getVisitedResources();
740                                        if(!visitedResources.contains(compResource.getFullName())){
741                                                exec.getAllocatedResources().getLast().trackResource(compResource.getFullName());
742                                        }
743                                } else {
744                                        for (int i = 0; i < resources.size(); i++) {
745                                                ComputingResource resourceChild = resources.get(i);
746                                                toExamine.addLast(resourceChild);
747                                        }
748                                }
749                        }
750                }
751        }
752       
753}
Note: See TracBrowser for help on using the repository browser.