source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 1374

Revision 1374, 26.7 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.Iterator;
6import java.util.LinkedList;
7import java.util.List;
8import java.util.Map;
9import java.util.Set;
10
11import org.apache.commons.logging.Log;
12import org.apache.commons.logging.LogFactory;
13import org.joda.time.DateTime;
14import org.joda.time.DateTimeUtilsExt;
15import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
16
17import qcg.shared.constants.BrokerConstants;
18import schedframe.SimulatedEnvironment;
19import schedframe.events.scheduling.SchedulingEvent;
20import schedframe.events.scheduling.SchedulingEventType;
21import schedframe.events.scheduling.StartTaskExecutionEvent;
22import schedframe.events.scheduling.TaskFinishedEvent;
23import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
24import schedframe.exceptions.ResourceException;
25import schedframe.resources.StandardResourceType;
26import schedframe.resources.computing.ComputingResource;
27import schedframe.resources.computing.profiles.energy.EnergyEvent;
28import schedframe.resources.computing.profiles.energy.EnergyEventType;
29import schedframe.resources.units.PEUnit;
30import schedframe.resources.units.ProcessingElements;
31import schedframe.resources.units.ResourceUnit;
32import schedframe.resources.units.ResourceUnitName;
33import schedframe.resources.units.StandardResourceUnitName;
34import schedframe.scheduling.ExecutionHistoryItem;
35import schedframe.scheduling.ResourceHistoryItem;
36import schedframe.scheduling.Scheduler;
37import schedframe.scheduling.TaskList;
38import schedframe.scheduling.TaskListImpl;
39import schedframe.scheduling.WorkloadUnitHandler;
40import schedframe.scheduling.manager.resources.LocalResourceManager;
41import schedframe.scheduling.manager.resources.ManagedResources;
42import schedframe.scheduling.manager.resources.ResourceManager;
43import schedframe.scheduling.plan.AllocationInterface;
44import schedframe.scheduling.plan.ScheduledTaskInterface;
45import schedframe.scheduling.plan.SchedulingPlanInterface;
46import schedframe.scheduling.plugin.SchedulingPlugin;
47import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
48import schedframe.scheduling.plugin.grid.ModuleListImpl;
49import schedframe.scheduling.policy.AbstractManagementSystem;
50import schedframe.scheduling.queue.TaskQueueList;
51import schedframe.scheduling.tasks.AbstractProcesses;
52import schedframe.scheduling.tasks.Job;
53import schedframe.scheduling.tasks.JobInterface;
54import schedframe.scheduling.tasks.Task;
55import schedframe.scheduling.tasks.TaskInterface;
56import schedframe.scheduling.tasks.WorkloadUnit;
57import simulator.DCWormsConstants;
58import simulator.stats.GSSAccumulator;
59import simulator.utils.DoubleMath;
60import dcworms.schedframe.scheduling.ExecTask;
61import dcworms.schedframe.scheduling.Executable;
62import eduni.simjava.Sim_event;
63import eduni.simjava.Sim_system;
64import gridsim.dcworms.DCWormsTags;
65import gridsim.dcworms.filter.ExecTaskFilter;
66
67public class LocalManagementSystem extends AbstractManagementSystem {
68
69        private Log log = LogFactory.getLog(LocalManagementSystem.class);
70
71        protected double lastUpdateTime;
72
73        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
74                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
75                        throws Exception {
76
77                super(providerId, entityName, execTimeEstimationPlugin, queues);
78               
79                if (schedPlugin == null) {
80                        throw new Exception("Can not create local scheduling plugin instance.");
81                }
82                this.schedulingPlugin =  schedPlugin;
83                this.moduleList = new ModuleListImpl(1);
84        }
85
86        public void init(Scheduler sched, ManagedResources managedResources) {
87                super.init(sched, managedResources);
88        }
89
90        public void processEvent(Sim_event ev) {
91
92                updateProcessingProgress();
93
94                int tag = ev.get_tag();
95
96                switch (tag) {
97
98                case DCWormsTags.TIMER:
99                        if (pluginSupportsEvent(tag)) {
100                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
101                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
102                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
103                                executeSchedulingPlan(decision);
104                        }
105                        sendTimerEvent();
106                        break;
107
108                case DCWormsTags.TASK_READY_FOR_EXECUTION:
109                        ExecTask execTask = (ExecTask) ev.get_data();
110                        try {
111                                execTask.setStatus(DCWormsTags.READY);
112                                if (pluginSupportsEvent(tag)) {
113                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
114                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
115                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
116                                        executeSchedulingPlan(decision);
117                                }
118                        } catch (Exception e) {
119                                e.printStackTrace();
120                        }
121                        break;
122
123                case DCWormsTags.TASK_EXECUTION_FINISHED:
124                        execTask = (ExecTask) ev.get_data();
125                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
126                               
127                                finalizeExecutable(execTask);
128                                sendFinishedWorkloadUnit(execTask);
129                                if (pluginSupportsEvent(tag)) {
130                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
131                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
132                                                        queues, getJobRegistry(), getResourceManager(), moduleList);
133                                        executeSchedulingPlan(decision);
134                                }
135                        }
136
137                        Job job = jobRegistry.getJob(execTask.getJobId());
138                        if(!job.isFinished()){
139                                getWorkloadUnitHandler().handleJob(job);
140                        }
141                        //SUPPORTS PRECEDING CONSTRAINST COMING BOTH FROM SWF FILES
142                        /*else {
143                                try {
144                                        job.setStatus((int)BrokerConstants.JOB_STATUS_FINISHED);
145                                } catch (Exception e) {
146
147                                }
148                                for(JobInterface<?> j: jobRegistry.getJobs((int)BrokerConstants.JOB_STATUS_SUBMITTED)){
149                                        getWorkloadUnitHandler().handleJob(j); 
150                                }
151                        }*/
152                        break;
153                       
154                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
155                        execTask = (Executable) ev.get_data();
156                        if (pluginSupportsEvent(tag)) {
157                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
158                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
159                                                queues, getJobRegistry(), getResourceManager(), moduleList);
160                                executeSchedulingPlan(decision);
161                        }
162                        break;
163                       
164                case DCWormsTags.TASK_PAUSE:{
165                        String[] ids = (String[]) ev.get_data();
166                        execTask = jobRegistry.getTask(ids[0], ids[1]);
167                        taskPause(execTask);
168                        if (pluginSupportsEvent(tag)) {
169                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_PAUSED);
170                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
171                                                queues, getJobRegistry(), getResourceManager(), moduleList);
172                                executeSchedulingPlan(decision);
173                        }
174                }
175                break;
176               
177                case DCWormsTags.TASK_RESUME:{
178                        String[] ids = (String[]) ev.get_data();
179                        execTask = jobRegistry.getTask(ids[0], ids[1]);
180                        taskResume(execTask, execTask.getAllocatedResources().getLast().getResourceUnits());
181                        if (pluginSupportsEvent(tag)) {
182                                SchedulingEvent event = new StartTaskExecutionEvent(ids[0], ids[1]);
183                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
184                                                queues, getJobRegistry(), getResourceManager(), moduleList);
185                                executeSchedulingPlan(decision);
186                        }
187                }
188                break;
189                       
190                case DCWormsTags.TASK_MOVE:{
191                        Object[] data = (Object[]) ev.get_data();
192                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
193                        taskMove(execTask, (Map<ResourceUnitName, ResourceUnit>)data[2]);
194                        if (pluginSupportsEvent(tag)) {
195                                SchedulingEvent event = new StartTaskExecutionEvent((String)data[0], (String)data[1]);
196                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
197                                                queues, getJobRegistry(), getResourceManager(), moduleList);
198                                executeSchedulingPlan(decision);
199                        }
200                }
201                break;
202                       
203                case DCWormsTags.TASK_EXECUTION_CHANGED:
204                        execTask = (ExecTask) ev.get_data();
205                        updateTaskExecutionPhase(execTask, SchedulingEventType.RESOURCE_STATE_CHANGED);
206                        break;
207
208                case DCWormsTags.UPDATE_PROCESSING:
209                        updateProcessingTimes();
210                        break;
211                       
212                case DCWormsTags.POWER_LIMIT_EXCEEDED:
213                        if (pluginSupportsEvent(tag)) {
214                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.POWER_LIMIT_EXCEEDED);
215                                String source;
216                                try{
217                                        source = ev.get_data().toString();
218                                } catch(Exception e){
219                                        source = null;
220                                }
221                                event.setSource(source);
222                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
223                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
224                                executeSchedulingPlan(decision);
225                        }
226                        break;
227                }
228        }
229
230
231        public void taskPause(ExecTask execTask) {
232                if (execTask == null) {
233                        return;
234                } else {
235                        try {
236                                execTask.setStatus(DCWormsTags.PAUSED);
237                               
238                                Executable exec = (Executable) execTask;
239                                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
240                                getAllocationManager().freeResources(lastUsed);
241                               
242                                saveExecutionHistory(exec, exec.getCompletionPercentage(), exec.getEstimatedDuration());
243                               
244                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), -1);
245                                scheduler.sim_cancel(filter, null);
246                               
247                                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
248                                updateComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
249                        } catch (Exception e) {
250                                // TODO Auto-generated catch block
251                                e.printStackTrace();
252                        }
253                }               
254        }
255       
256        public void taskResume(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> resources) {
257                if (execTask == null) {
258                        return;
259                } else if (execTask.getStatus() == DCWormsTags.PAUSED) {
260                        try {
261                                execTask.setStatus(DCWormsTags.RESUMED);
262                                Executable exec = (Executable) execTask;
263
264                                boolean status = allocateResources(exec, resources);
265                                if(status == false){
266                                        TaskList newTasks = new TaskListImpl();
267                                        newTasks.add(exec);
268                                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
269                                        exec.setStatus(DCWormsTags.READY);
270                                } else {
271                                        runTask(execTask);
272                                       
273                                        PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
274                                        updateComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
275                                }
276
277                        } catch (Exception e) {
278                                // TODO Auto-generated catch block
279                                e.printStackTrace();
280                        }
281                }                       
282        }
283       
284        public void taskMove(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> map) {
285                taskPause(execTask);
286                taskResume(execTask, map);
287        }
288       
289        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
290                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
291                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
292                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
293                                        queues, getJobRegistry(), getResourceManager(), moduleList);
294                        executeSchedulingPlan(decision);
295                }
296                //if(scheduler.getParent() != null){
297                        sendFinishedWorkloadUnit(wu);
298                //}
299        }
300       
301        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
302
303                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
304                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
305                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
306
307                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
308                                continue;
309                        }
310
311                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
312
313                        TaskInterface<?> task = taskDecision.getTask();
314                        for (int j = 0; j < allocations.size(); j++) {
315
316                                AllocationInterface<?> allocation = allocations.get(j);
317                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
318                                        ExecTask exec = (ExecTask) task;                                       
319                                        executeTask(exec, allocation.getRequestedResources());
320                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
321                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
322                                        submitTask(task, allocation);
323                                } else {
324                                        ExecTask exec = (ExecTask) task;
325                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
326                                }
327                        }
328                }
329        }
330
331        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
332
333                Executable exec = (Executable)task;
334                boolean allocationStatus = allocateResources(exec, choosenResources);
335                if(allocationStatus == false){
336                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
337                        return;
338                }
339
340                removeFromQueue(task);
341
342                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
343
344                runTask(exec);
345               
346                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
347                updateComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
348               
349
350                try {
351                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
352                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
353                } catch (NoSuchFieldException e) {
354                        //double t = exec.getEstimatedDuration();
355                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
356                }
357
358                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());           
359        }
360       
361        private void runTask(ExecTask execTask){
362                Executable exec = (Executable) execTask;
363                Map<ResourceUnitName, ResourceUnit> resources = exec.getAllocatedResources().getLast().getResourceUnits();
364               
365                try {
366                        exec.setStatus(DCWormsTags.INEXEC);
367                } catch (Exception e) {
368                        // TODO Auto-generated catch block
369                        e.printStackTrace();
370                }
371
372                int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION),
373                                execTask, resources, exec.getCompletionPercentage())).intValue();
374
375                saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration);
376                if(exec.getResourceConsumptionProfile().isLast()){
377                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);;
378                } else {
379                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
380                }
381               
382                PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
383                updateComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
384               
385        }
386        protected void finalizeExecutable(ExecTask execTask){
387               
388                Executable exec = (Executable)execTask;
389                exec.finalizeExecutable();
390               
391                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
392                scheduler.sim_cancel(filter, null);
393               
394                Task task;
395                Job job = jobRegistry.getJob(exec.getJobId());
396                try {
397                        task = job.getTask(exec.getTaskId());
398                } catch (NoSuchFieldException e) {
399                        return;
400                }
401                if(exec.getProcessesId() == null){
402                        try {
403                                task.setStatus(exec.getStatus());
404                        } catch (Exception e) {
405                                e.printStackTrace();
406                        }
407                } else {
408                        List<AbstractProcesses> processesList = task.getProcesses();
409                        for(int i = 0; i < processesList.size(); i++){
410                                AbstractProcesses processes = processesList.get(i);
411                                if(processes.getId().equals(exec.getProcessesId())){
412                                        processes.setStatus(exec.getStatus());
413                                        break;
414                                }
415                        }
416                }
417
418                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
419                getAllocationManager().freeResources(lastUsed);
420               
421                //TODO calculate the value of completion
422                saveExecutionHistory(exec, 100,0);
423               
424                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
425                updateComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
426               
427                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
428                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
429        }
430       
431        protected void updateProcessingProgress() {
432
433                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
434                if (timeSpan <= 0.0) {
435                        // don't update when nothing changed
436                        return;
437                }
438                lastUpdateTime = Sim_system.clock();
439                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
440                while (iter.hasNext()) {
441                        ExecTask task = iter.next();
442                        Executable exec = (Executable)task;
443                        ExecutionHistoryItem execHistItem = exec.getExecHistory().getLast();
444                        //System.out.println("--- upadteProgressX: " + Sim_system.sim_clock() );
445                        //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " +  execHistItem.getCompletionPercentage());
446                        exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / (execHistItem.getEstimatedDuration()) * (1.0 - execHistItem.getCompletionPercentage()/100.0)));
447                        //System.out.println("newProgress: " + exec.getCompletionPercentage()  );       
448
449                }
450        }
451       
452        private void updateComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){
453                if(peUnit instanceof ProcessingElements){
454                        ProcessingElements pes = (ProcessingElements) peUnit;
455                        for (ComputingResource resource : pes) {
456                                resource.handleEvent(new EnergyEvent(eventType, obj, scheduler.getFullName()));
457                                //DataCenterWorkloadSimulator.getEventManager().sendToResources(resource.getType(), 0, new EnergyEvent(eventType, obj));
458                        }
459                        /*try {
460                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
461                                        resource.handleEvent(new EnergyEvent(eventType, obj));
462                                }
463                        } catch (ResourceException e) {
464                                // TODO Auto-generated catch block
465                                e.printStackTrace();
466                        }*/
467                } else {
468                        ComputingResource resource = null;
469                        try {
470                                resource = SimulatedEnvironment.getComputingResourceByName(peUnit.getResourceId());
471                        } catch (ResourceException e) {
472                                return;
473                        }
474                        resource.handleEvent(new EnergyEvent(eventType, obj, scheduler.getFullName()));
475                }
476        }
477
478        protected void updateProcessingTimes() {
479                for (ExecTask execTask : jobRegistry.getRunningTasks()) {
480                        Executable exec = (Executable)execTask;
481
482
483                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
484
485                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
486                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
487
488                        ExecutionHistoryItem execHistItem = exec.getExecHistory().getLast();
489                        double lastTimeStamp = execHistItem.getTimeStamp().getMillis()/1000;
490                        if(DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)) == 0.0){
491                                continue;
492                        }
493                        //System.out.println("=== upadteTIme: " + Sim_system.sim_clock() );
494                        //System.out.println("execId: " + exec.getId() +  "; estimatedDuration " + execHistItem.getEstimatedDuration());
495                        //System.out.println("execId: " + exec.getId() + "; difference " + DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)));
496                        //System.out.println("completionPercantage: " + exec.getCompletionPercentage() + "; basic duration: " +exec.getResourceConsumptionProfile().getCurrentResourceConsumption().getDuration() +   "; phaseDuration: " +  phaseDuration);
497
498                        saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration);
499                       
500                        if(exec.getResourceConsumptionProfile().isLast()){
501                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
502                                scheduler.sim_cancel(filter, null);
503                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
504                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
505                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
506                        } else{
507                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
508                                scheduler.sim_cancel(filter, null);
509                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
510                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
511                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
512                        }
513                }
514        }       
515
516        protected void updateTaskExecutionPhase(ExecTask execTask, SchedulingEventType schedEvType) {
517
518                if (execTask.getStatus() == DCWormsTags.INEXEC) {
519                        Executable exec = (Executable)execTask;
520                       
521                        try {
522                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
523                        } catch (Exception e) {
524                               
525                        }
526
527                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
528
529                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(schedEvType),
530                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
531                       
532                        saveExecutionHistory(exec, exec.getCompletionPercentage(), phaseDuration);
533                       
534                        if(exec.getResourceConsumptionProfile().isLast()){
535                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
536                        } else {
537                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
538                        }
539                       
540                        PEUnit peUnit = (PEUnit)exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
541                        updateComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
542                }
543        }       
544
545        public double calculateTotalLoad() {
546
547                GSSAccumulator loadAcc = new GSSAccumulator();
548                try {
549                        for(ComputingResource compRes: getResourceManager().getResourcesOfType(StandardResourceType.Node)){
550                                loadAcc.add(compRes.getLoadInterface().getRecentUtilization().getValue());                             
551                        }
552                } catch (ResourceException e) {
553                        // TODO Auto-generated catch block
554                        e.printStackTrace();
555                }
556               
557                return loadAcc.getMean();
558        }
559
560        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
561                        ExecTask task) {
562
563                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
564                LocalResourceManager resourceManager = getResourceManager();
565                if(resourceName != null){
566                        ComputingResource resource = resourceManager.getResourceByName(resourceName);
567                        if(resource == null){
568                                return null;
569                        }
570                        resourceManager = new LocalResourceManager(resource);
571                }
572
573                int cpuRequest;
574                try {
575                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
576                } catch (NoSuchFieldException e) {
577                        cpuRequest = 1;
578                }
579
580                if (cpuRequest != 0) {
581                       
582                        List<ResourceUnit> availableUnits = null;
583                        try {
584                                availableUnits = resourceManager.getPE();
585                        } catch (ResourceException e) {
586                                return null;
587                        }
588                       
589                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
590                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
591                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
592                                if(peUnit.getFreeAmount() > 0){
593                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
594                                        cpuRequest = cpuRequest - allocPE;
595                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
596                                }       
597                        }
598                       
599                        if(cpuRequest > 0){
600                                return null;
601                        }
602                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
603                }
604
605                return  map;
606        }
607       
608        public void notifySubmittedWorkloadUnit(WorkloadUnit wu) {
609                //250314
610                //updateProcessingProgress();
611                if (log.isInfoEnabled())
612                        log.info("Received job " + wu.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
613                registerWorkloadUnit(wu);
614        }
615
616        private void registerWorkloadUnit(WorkloadUnit wu){
617                if(!wu.isRegistered()){
618                        wu.register(jobRegistry);
619                }
620                wu.accept(getWorkloadUnitHandler());
621        }
622       
623        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
624               
625                public void handleJob(JobInterface<?> job){
626
627                        if (log.isInfoEnabled())
628                                log.info("Handle " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
629                       
630                        try {
631                                if(job.getStatus() == BrokerConstants.JOB_STATUS_UNSUBMITTED)
632                                        job.setStatus((int)BrokerConstants.JOB_STATUS_SUBMITTED);
633                        } catch (Exception e) {
634                                e.printStackTrace();
635                        }
636                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
637                        jobsList.add(job);
638                        TaskListImpl availableTasks = new TaskListImpl();
639                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
640                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
641                                availableTasks.add(task);
642                        }
643
644                        for(TaskInterface<?> task: availableTasks){     
645                                registerWorkloadUnit(task);
646                        }
647                }
648               
649                public void handleTask(TaskInterface<?> t){
650                        Task task = (Task)t;
651                        List<AbstractProcesses> processes = task.getProcesses();
652
653                        if(processes == null || processes.size() == 0){
654                                Executable exec = new Executable(task);
655                                registerWorkloadUnit(exec);
656                        } else {
657                                for(int j = 0; j < processes.size(); j++){
658                                        AbstractProcesses procesesSet = processes.get(j);
659                                        Executable exec = new Executable(task, procesesSet);
660                                        registerWorkloadUnit(exec);
661                                }
662                        }
663                }
664               
665                public void handleExecutable(ExecTask task){
666                       
667                        Executable exec = (Executable) task;
668                        jobRegistry.addExecTask(exec);
669
670                        exec.setSchedulerName(scheduler.getFullName());
671                       
672                        TaskList newTasks = new TaskListImpl();
673                        newTasks.add(exec);
674               
675                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
676
677                        if (exec.getStatus() == DCWormsTags.QUEUED) {
678                                sendExecutableReadyEvent(exec);
679                        }
680                }
681        }
682
683        public WorkloadUnitHandler getWorkloadUnitHandler() {
684                return new LocalWorkloadUnitHandler();
685        }
686       
687       
688        public LocalResourceManager getResourceManager() {
689                if (resourceManager instanceof ResourceManager)
690                        return (LocalResourceManager) resourceManager;
691                else
692                        return null;
693        }
694       
695       
696        public void saveExecutionHistory(Executable exec, double completionPercentage, double estimatedDuration){
697
698                ExecutionHistoryItem execHistoryItem = new ExecutionHistoryItem(new DateTime());
699                execHistoryItem.setCompletionPercentage(completionPercentage);
700                execHistoryItem.setEstimatedDuration(estimatedDuration);
701                execHistoryItem.setResIndex(exec.getAllocatedResources().size() -1);
702                execHistoryItem.setStatus(exec.getStatus());
703                exec.addExecHistory(execHistoryItem);
704               
705                ProcessingElements pes = (ProcessingElements) exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
706                for (ComputingResource resource : pes) {
707
708                        LinkedList<ComputingResource> toExamine = new LinkedList<ComputingResource>();
709                        toExamine.push(resource);
710
711                        while (!toExamine.isEmpty()) {
712                                ComputingResource compResource = toExamine.pop();
713                                List<ComputingResource> resources = compResource.getChildren();
714                                if(resources.isEmpty()){
715                                        Set<String> visitedResources = exec.getAllocatedResources().getLast().getVisitedResources();
716                                        if(!visitedResources.contains(compResource.getFullName())){
717                                                exec.getAllocatedResources().getLast().trackResource(compResource.getFullName());
718                                        }
719                                } else {
720                                        for (int i = 0; i < resources.size(); i++) {
721                                                ComputingResource resourceChild = resources.get(i);
722                                                toExamine.addLast(resourceChild);
723                                        }
724                                }
725                        }
726                }
727        }
728       
729        public boolean allocateResources(Executable exec, Map<ResourceUnitName, ResourceUnit> choosenResources){
730                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
731                if(allocationStatus){
732                        ResourceHistoryItem resourceHistoryItem = new ResourceHistoryItem(choosenResources);
733                        exec.addAllocatedResources(resourceHistoryItem);
734                        return true;
735                }
736                return false;
737        }
738}
Note: See TracBrowser for help on using the repository browser.