source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 1415

Revision 1415, 26.8 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.Iterator;
6import java.util.List;
7import java.util.Map;
8
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11import org.joda.time.DateTime;
12import org.joda.time.DateTimeUtilsExt;
13import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
14
15import qcg.shared.constants.BrokerConstants;
16import schedframe.SimulatedEnvironment;
17import schedframe.events.scheduling.SchedulingEvent;
18import schedframe.events.scheduling.SchedulingEventType;
19import schedframe.events.scheduling.StartTaskExecutionEvent;
20import schedframe.events.scheduling.TaskFinishedEvent;
21import schedframe.events.scheduling.TaskPausedEvent;
22import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
23import schedframe.events.scheduling.TaskResumedEvent;
24import schedframe.exceptions.ResourceException;
25import schedframe.resources.StandardResourceType;
26import schedframe.resources.computing.ComputingResource;
27import schedframe.resources.computing.profiles.energy.ResourceEvent;
28import schedframe.resources.computing.profiles.energy.ResourceEventType;
29import schedframe.resources.units.PEUnit;
30import schedframe.resources.units.ProcessingElements;
31import schedframe.resources.units.ResourceUnit;
32import schedframe.resources.units.ResourceUnitName;
33import schedframe.resources.units.StandardResourceUnitName;
34import schedframe.scheduling.ExecutionHistoryItem;
35import schedframe.scheduling.ResourceItem;
36import schedframe.scheduling.Scheduler;
37import schedframe.scheduling.TaskList;
38import schedframe.scheduling.TaskListImpl;
39import schedframe.scheduling.WorkloadUnitHandler;
40import schedframe.scheduling.manager.resources.LocalResourceManager;
41import schedframe.scheduling.manager.resources.ManagedResources;
42import schedframe.scheduling.manager.resources.ResourceManager;
43import schedframe.scheduling.plan.AllocationInterface;
44import schedframe.scheduling.plan.ScheduledTaskInterface;
45import schedframe.scheduling.plan.SchedulingPlanInterface;
46import schedframe.scheduling.plugin.ModuleListImpl;
47import schedframe.scheduling.plugin.SchedulingPlugin;
48import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
49import schedframe.scheduling.policy.AbstractManagementSystem;
50import schedframe.scheduling.queue.TaskQueueList;
51import schedframe.scheduling.tasks.AbstractProcesses;
52import schedframe.scheduling.tasks.Job;
53import schedframe.scheduling.tasks.JobInterface;
54import schedframe.scheduling.tasks.Task;
55import schedframe.scheduling.tasks.TaskInterface;
56import schedframe.scheduling.tasks.WorkloadUnit;
57import simulator.DCWormsConstants;
58import simulator.stats.DCwormsAccumulator;
59import simulator.utils.DoubleMath;
60import dcworms.schedframe.scheduling.ExecTask;
61import dcworms.schedframe.scheduling.Executable;
62import eduni.simjava.Sim_event;
63import eduni.simjava.Sim_system;
64import gridsim.dcworms.DCWormsTags;
65import gridsim.dcworms.filter.ExecTaskFilter;
66
67public class LocalManagementSystem extends AbstractManagementSystem {
68
69        private Log log = LogFactory.getLog(LocalManagementSystem.class);
70
71        protected double lastUpdateTime;
72
73        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
74                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
75                        throws Exception {
76
77                super(providerId, entityName, execTimeEstimationPlugin, queues);
78               
79                if (schedPlugin == null) {
80                        throw new Exception("Can not create local scheduling plugin instance.");
81                }
82                this.schedulingPlugin =  schedPlugin;
83                this.moduleList = new ModuleListImpl(1);
84        }
85
86        public void init(Scheduler sched, ManagedResources managedResources) {
87                super.init(sched, managedResources);
88        }
89
90        public void processEvent(Sim_event ev) {
91
92                updateProcessingProgress();
93
94                int tag = ev.get_tag();
95
96                switch (tag) {
97
98                case DCWormsTags.TIMER:
99                        if (pluginSupportsEvent(tag)) {
100                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
101                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
102                                                queues, jobRegistry, getResourceManager(), moduleList);
103                                executeSchedulingPlan(decision);
104                        }
105                        sendTimerEvent();
106                        break;
107
108                case DCWormsTags.TASK_READY_FOR_EXECUTION:
109                        ExecTask execTask = (ExecTask) ev.get_data();
110                        try {
111                                execTask.setStatus(DCWormsTags.READY);
112                                if (pluginSupportsEvent(tag)) {
113                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
114                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
115                                                        queues, jobRegistry, getResourceManager(), moduleList);
116                                        executeSchedulingPlan(decision);
117                                }
118                        } catch (Exception e) {
119                                e.printStackTrace();
120                        }
121                        break;
122
123                case DCWormsTags.TASK_EXECUTION_FINISHED:
124                        execTask = (ExecTask) ev.get_data();
125                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
126                               
127                                finalizeExecutable(execTask);
128                                sendFinishedWorkloadUnit(execTask);
129                                if (pluginSupportsEvent(tag)) {
130                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
131                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
132                                                        queues, jobRegistry, getResourceManager(), moduleList);
133                                        executeSchedulingPlan(decision);
134                                }
135                        }
136
137                        Job job = jobRegistry.getJob(execTask.getJobId());
138                        if(!job.isFinished()){
139                                getWorkloadUnitHandler().handleJob(job);
140                        }
141                        //SUPPORTS PRECEDING CONSTRAINST COMING BOTH FROM SWF FILES
142                        /*else {
143                                try {
144                                        job.setStatus((int)BrokerConstants.JOB_STATUS_FINISHED);
145                                } catch (Exception e) {
146
147                                }
148                                for(JobInterface<?> j: jobRegistry.getJobs((int)BrokerConstants.JOB_STATUS_SUBMITTED)){
149                                        getWorkloadUnitHandler().handleJob(j); 
150                                }
151                        }*/
152                        break;
153                       
154                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
155                        execTask = (Executable) ev.get_data();
156                        if (pluginSupportsEvent(tag)) {
157                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
158                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
159                                                queues, jobRegistry, getResourceManager(), moduleList);
160                                executeSchedulingPlan(decision);
161                        }
162                        break;
163                       
164                case DCWormsTags.TASK_PAUSE:{
165                        String[] ids = (String[]) ev.get_data();
166                        execTask = jobRegistry.getTask(ids[0], ids[1]);
167                        pauseTask(execTask);
168                        if (pluginSupportsEvent(tag)) {
169                                SchedulingEvent event = new TaskPausedEvent(ids[0], ids[1]);
170                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
171                                                queues, jobRegistry, getResourceManager(), moduleList);
172                                executeSchedulingPlan(decision);
173                        }
174                }
175                break;
176               
177                case DCWormsTags.TASK_RESUME:{
178                        String[] ids = (String[]) ev.get_data();
179                        execTask = jobRegistry.getTask(ids[0], ids[1]);
180                        resumeTask(execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), true);
181                        if (pluginSupportsEvent(tag)) {
182                                SchedulingEvent event = new TaskResumedEvent(ids[0], ids[1]);
183                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
184                                                queues, jobRegistry, getResourceManager(), moduleList);
185                                executeSchedulingPlan(decision);
186                        }
187                }
188                break;
189               
190                case DCWormsTags.TASK_MIGRATE:{
191                        Object[] data = (Object[]) ev.get_data();
192                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
193                        double migrationTime = execTimeEstimationPlugin.estimateMigrationTime(new StartTaskExecutionEvent((String)data[0],
194                                        (String)data[1]), execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), (Map<ResourceUnitName, ResourceUnit>)data[2]);
195                        scheduler.sendInternal(migrationTime, DCWormsTags.TASK_MOVE, data);
196                }
197                break;
198               
199                case DCWormsTags.TASK_MOVE:{
200                        Object[] data = (Object[]) ev.get_data();
201                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
202                        moveTask(execTask, (Map<ResourceUnitName, ResourceUnit>)data[2]);
203                        if (pluginSupportsEvent(tag)) {
204                                SchedulingEvent event = new StartTaskExecutionEvent((String)data[0], (String)data[1]);
205                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
206                                                queues, jobRegistry, getResourceManager(), moduleList);
207                                executeSchedulingPlan(decision);
208                        }
209                }
210                break;
211                       
212                case DCWormsTags.TASK_EXECUTION_CHANGED:
213                        execTask = (ExecTask) ev.get_data();
214                        updateTaskExecutionPhase(execTask);
215                        break;
216
217                case DCWormsTags.UPDATE_PROCESSING:
218                        updateProcessingTimes();
219                        break;
220                       
221                case DCWormsTags.POWER_LIMIT_EXCEEDED:
222                        if (pluginSupportsEvent(tag)) {
223                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.POWER_LIMIT_EXCEEDED);
224                                String source;
225                                try{
226                                        source = ev.get_data().toString();
227                                } catch(Exception e){
228                                        source = null;
229                                }
230                                event.setSource(source);
231                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
232                                                queues, jobRegistry, getResourceManager(), moduleList);
233                                executeSchedulingPlan(decision);
234                        }
235                        break;
236                default:               
237                       
238                        break;
239                }
240        }
241
242
243        protected void pauseTask(ExecTask execTask) {
244                if (execTask == null) {
245                        return;
246                } else {
247                        try {
248                                execTask.setStatus(DCWormsTags.PAUSED);
249                               
250                                Executable exec = (Executable) execTask;
251                                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
252                                getAllocationManager().freeResources(lastUsed, true);
253                               
254                                saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), exec.getEstimatedDuration());
255                               
256                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), -1);
257                                scheduler.sim_cancel(filter, null);
258                               
259                                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
260                                updateComputingResources(peUnit, ResourceEventType.TASK_FINISHED, exec);
261                        } catch (Exception e) {
262                                // TODO Auto-generated catch block
263                                e.printStackTrace();
264                        }
265                }               
266        }
267       
268        protected void resumeTask(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> resources, boolean exclusive) {
269                if (execTask == null) {
270                        return;
271                } else if (execTask.getStatus() == DCWormsTags.PAUSED) {
272                        try {
273                                execTask.setStatus(DCWormsTags.RESUMED);
274                                Executable exec = (Executable) execTask;
275
276                                boolean status = allocateResources(exec, resources, exclusive);
277                                if(status == false){
278                                        TaskList newTasks = new TaskListImpl();
279                                        newTasks.add(exec);
280                                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
281                                        exec.setStatus(DCWormsTags.READY);
282                                } else {
283                                        runTask(execTask);
284                                       
285                                        PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
286                                        updateComputingResources(peUnit, ResourceEventType.TASK_STARTED, exec);
287                                }
288
289                        } catch (Exception e) {
290                                // TODO Auto-generated catch block
291                                e.printStackTrace();
292                        }
293                }                       
294        }
295       
296        protected void moveTask(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> map) {
297                pauseTask(execTask);
298                resumeTask(execTask, map, false);
299        }
300       
301        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
302                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
303                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
304                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
305                                        queues, jobRegistry, getResourceManager(), moduleList);
306                        executeSchedulingPlan(decision);
307                }
308                //if(scheduler.getParent() != null){
309                        sendFinishedWorkloadUnit(wu);
310                //}
311        }
312       
313        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
314
315                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
316                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
317                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
318
319                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
320                                continue;
321                        }
322
323                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
324
325                        TaskInterface<?> task = taskDecision.getTask();
326                        for (int j = 0; j < allocations.size(); j++) {
327
328                                AllocationInterface<?> allocation = allocations.get(j);
329                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
330                                        ExecTask exec = (ExecTask) task;                                       
331                                        executeTask(exec, allocation.getRequestedResources());
332                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
333                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
334                                        submitTask(task, allocation);
335                                } else {
336                                        ExecTask exec = (ExecTask) task;
337                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
338                                }
339                        }
340                }
341        }
342
343        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
344
345                Executable exec = (Executable)task;
346                boolean allocationStatus = allocateResources(exec, choosenResources, false);
347                if(allocationStatus == false){
348                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
349                        return;
350                }
351
352                removeFromQueue(task);
353
354                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
355
356                runTask(exec);
357               
358                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
359                updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
360               
361
362                try {
363                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
364                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
365                } catch (NoSuchFieldException e) {
366                        //double t = exec.getEstimatedDuration();
367                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
368                }
369
370                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());           
371        }
372       
373        private void runTask(ExecTask execTask){
374                Executable exec = (Executable) execTask;
375                Map<ResourceUnitName, ResourceUnit> resources = exec.getAllocatedResources().getLast().getResourceUnits();
376               
377                try {
378                        exec.setStatus(DCWormsTags.INEXEC);
379                } catch (Exception e) {
380                        // TODO Auto-generated catch block
381                        e.printStackTrace();
382                }
383
384                int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION),
385                                execTask, resources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
386
387                saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
388                if(exec.getExecutionProfile().isLast()){
389                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);;
390                } else {
391                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
392                }
393               
394                PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
395                updateComputingResources(peUnit, ResourceEventType.TASK_STARTED, exec);
396               
397        }
398        protected void finalizeExecutable(ExecTask execTask){
399               
400                Executable exec = (Executable)execTask;
401                exec.finalizeExecutable();
402               
403                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
404                scheduler.sim_cancel(filter, null);
405               
406                Task task;
407                Job job = jobRegistry.getJob(exec.getJobId());
408                try {
409                        task = job.getTask(exec.getTaskId());
410                } catch (NoSuchFieldException e) {
411                        return;
412                }
413                if(exec.getProcessesId() == null){
414                        try {
415                                task.setStatus(exec.getStatus());
416                        } catch (Exception e) {
417                                e.printStackTrace();
418                        }
419                } else {
420                        List<AbstractProcesses> processesList = task.getProcesses();
421                        for(int i = 0; i < processesList.size(); i++){
422                                AbstractProcesses processes = processesList.get(i);
423                                if(processes.getId().equals(exec.getProcessesId())){
424                                        processes.setStatus(exec.getStatus());
425                                        break;
426                                }
427                        }
428                }
429
430                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
431                getAllocationManager().freeResources(lastUsed, true);
432               
433                //TODO calculate the value of completion
434                saveExecutionHistory(exec, 100,0);
435               
436                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
437                updateComputingResources(peUnit, ResourceEventType.TASK_FINISHED, exec);
438               
439                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
440                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
441               
442                updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
443        }
444       
445        protected void updateProcessingProgress() {
446
447                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
448                if (timeSpan <= 0.0) {
449                        // don't update when nothing changed
450                        return;
451                }
452                lastUpdateTime = Sim_system.clock();
453                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
454                while (iter.hasNext()) {
455                        ExecTask task = iter.next();
456                        Executable exec = (Executable)task;
457                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
458                        //System.out.println("--- upadteProgressX: " + Sim_system.sim_clock() );
459                        //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " +  execHistItem.getCompletionPercentage());
460                        exec.getExecutionProfile().setCompletionPercentage(exec.getExecutionProfile().getCompletionPercentage() + 100 * (timeSpan / (execHistItem.getEstimatedDuration()) * (1.0 - execHistItem.getCompletionPercentage()/100.0)));
461                        exec.setTotalCompletionPercentage(exec.getTotalCompletionPercentage() + 100 * (timeSpan / execHistItem.getEstimatedDuration()) * exec.getExecutionProfile().getCurrentResourceConsumption().getLenght() / exec.getExecutionProfile().getLength());
462                        //System.out.println("newProgress: " + exec.getCompletionPercentage()  );       
463
464                }
465        }
466       
467        private void updateComputingResources(PEUnit peUnit, ResourceEventType eventType, Object obj){
468                if(peUnit instanceof ProcessingElements){
469                        ProcessingElements pes = (ProcessingElements) peUnit;
470                        for (ComputingResource resource : pes) {
471                                resource.handleEvent(new ResourceEvent(eventType, obj, scheduler.getFullName()));
472                                //DataCenterWorkloadSimulator.getEventManager().sendToResources(resource.getType(), 0, new EnergyEvent(eventType, obj));
473                        }
474                        /*try {
475                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
476                                        resource.handleEvent(new EnergyEvent(eventType, obj));
477                                }
478                        } catch (ResourceException e) {
479                                // TODO Auto-generated catch block
480                                e.printStackTrace();
481                        }*/
482                } else {
483                        ComputingResource resource = null;
484                        try {
485                                resource = SimulatedEnvironment.getComputingResourceByName(peUnit.getResourceId());
486                        } catch (ResourceException e) {
487                                return;
488                        }
489                        resource.handleEvent(new ResourceEvent(eventType, obj, scheduler.getFullName()));
490                }
491        }
492
493        protected void updateProcessingTimes() {
494                for (ExecTask execTask : jobRegistry.getRunningTasks()) {
495                        Executable exec = (Executable)execTask;
496
497
498                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
499
500                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
501                                        execTask, choosenResources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
502
503                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
504                        double lastTimeStamp = execHistItem.getTimeStamp().getMillis() / 1000;
505                        if(DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis() / 1000 + phaseDuration)) == 0.0){
506                                continue;
507                        }
508                        //System.out.println("=== upadteTIme: " + Sim_system.sim_clock() );
509                        //System.out.println("execId: " + exec.getId() +  "; estimatedDuration " + execHistItem.getEstimatedDuration());
510                        //System.out.println("execId: " + exec.getId() + "; difference " + DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)));
511                        //System.out.println("completionPercantage: " + exec.getCompletionPercentage() + "; basic duration: " +exec.getResourceConsumptionProfile().getCurrentResourceConsumption().getDuration() +   "; phaseDuration: " +  phaseDuration);
512
513                        saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
514                       
515                        if(exec.getExecutionProfile().isLast()){
516                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
517                                scheduler.sim_cancel(filter, null);
518                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
519                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
520                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
521                        } else{
522                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
523                                scheduler.sim_cancel(filter, null);
524                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
525                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
526                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
527                        }
528                }
529        }       
530
531        protected void updateTaskExecutionPhase(ExecTask execTask) {
532
533                if (execTask.getStatus() == DCWormsTags.INEXEC) {
534                        Executable exec = (Executable)execTask;
535                       
536                        try {
537                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
538                        } catch (Exception e) {
539                               
540                        }
541
542                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
543
544                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.TASK_EXECUTION_CHANGED),
545                                        execTask, choosenResources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
546                       
547                        saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
548                       
549                        if(exec.getExecutionProfile().isLast()){
550                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
551                        } else {
552                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
553                        }
554                       
555                        PEUnit peUnit = (PEUnit)exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
556                        updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
557                }
558        }       
559
560        protected double calculateTotalLoad() {
561
562                DCwormsAccumulator loadAcc = new DCwormsAccumulator();
563
564                for(ComputingResource compRes: getResourceManager().getResourcesOfType(StandardResourceType.Node)){
565                        loadAcc.add(compRes.getLoadInterface().getRecentUtilization().getValue());                             
566                }
567
568                return loadAcc.getMean();
569        }
570
571        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
572                        ExecTask task) {
573
574                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
575                LocalResourceManager resourceManager = getResourceManager();
576                if(resourceName != null){
577                        ComputingResource resource = resourceManager.getResourceByName(resourceName);
578                        if(resource == null){
579                                return null;
580                        }
581                        resourceManager = new LocalResourceManager(resource);
582                }
583
584                int cpuRequest;
585                try {
586                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
587                } catch (NoSuchFieldException e) {
588                        cpuRequest = 1;
589                }
590
591                if (cpuRequest != 0) {
592                       
593                        List<ResourceUnit> availableUnits = null;
594                        try {
595                                availableUnits = resourceManager.getPE();
596                        } catch (ResourceException e) {
597                                return null;
598                        }
599                       
600                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
601                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
602                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
603                                if(peUnit.getFreeAmount() > 0){
604                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
605                                        cpuRequest = cpuRequest - allocPE;
606                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
607                                }       
608                        }
609                       
610                        if(cpuRequest > 0){
611                                return null;
612                        }
613                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
614                }
615
616                return  map;
617        }
618       
619        public void notifySubmittedWorkloadUnit(WorkloadUnit wu) {
620                //250314
621                //updateProcessingProgress();
622                if (log.isInfoEnabled())
623                        log.info("Received job " + wu.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
624                registerWorkloadUnit(wu);
625        }
626
627        private void registerWorkloadUnit(WorkloadUnit wu){
628                if(!wu.isRegistered()){
629                        wu.register(jobRegistry);
630                }
631                wu.accept(getWorkloadUnitHandler());
632        }
633       
634        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
635               
636                public void handleJob(JobInterface<?> job){
637
638                        if (log.isInfoEnabled())
639                                log.info("Handle " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
640                       
641                        try {
642                                if(job.getStatus() == BrokerConstants.JOB_STATUS_UNSUBMITTED)
643                                        job.setStatus((int)BrokerConstants.JOB_STATUS_SUBMITTED);
644                        } catch (Exception e) {
645                                e.printStackTrace();
646                        }
647                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
648                        jobsList.add(job);
649                        TaskListImpl availableTasks = new TaskListImpl();
650                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
651                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
652                                availableTasks.add(task);
653                        }
654
655                        for(TaskInterface<?> task: availableTasks){     
656                                registerWorkloadUnit(task);
657                        }
658                }
659               
660                public void handleTask(TaskInterface<?> t){
661                        Task task = (Task)t;
662                        List<AbstractProcesses> processes = task.getProcesses();
663
664                        if(processes == null || processes.size() == 0){
665                                Executable exec = new Executable(task);
666                                registerWorkloadUnit(exec);
667                        } else {
668                                for(int j = 0; j < processes.size(); j++){
669                                        AbstractProcesses procesesSet = processes.get(j);
670                                        Executable exec = new Executable(task, procesesSet);
671                                        registerWorkloadUnit(exec);
672                                }
673                        }
674                }
675               
676                public void handleExecutable(ExecTask task){
677                       
678                        Executable exec = (Executable) task;
679                        jobRegistry.addExecTask(exec);
680
681                        exec.setSchedulerName(scheduler.getFullName());
682                       
683                        TaskList newTasks = new TaskListImpl();
684                        newTasks.add(exec);
685               
686                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
687
688                        if (exec.getStatus() == DCWormsTags.QUEUED) {
689                                sendExecutableReadyEvent(exec);
690                        }
691                }
692        }
693
694        public WorkloadUnitHandler getWorkloadUnitHandler() {
695                return new LocalWorkloadUnitHandler();
696        }
697       
698       
699        public LocalResourceManager getResourceManager() {
700                if (resourceManager instanceof ResourceManager)
701                        return (LocalResourceManager) resourceManager;
702                else
703                        return null;
704        }
705       
706       
707        private void saveExecutionHistory(Executable exec, double completionPercentage, double estimatedDuration){
708                ExecutionHistoryItem execHistoryItem = new ExecutionHistoryItem(new DateTime());
709                execHistoryItem.setCompletionPercentage(completionPercentage);
710                execHistoryItem.setEstimatedDuration(estimatedDuration);
711                execHistoryItem.setResIndex(exec.getAllocatedResources().size() -1);
712                execHistoryItem.setStatus(exec.getStatus());
713                exec.addExecHistory(execHistoryItem);
714        }
715       
716        public boolean allocateResources(Executable exec, Map<ResourceUnitName, ResourceUnit> choosenResources, boolean exclusive){
717                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources, exclusive);
718                if(allocationStatus){
719                        ResourceItem resourceHistoryItem = new ResourceItem(choosenResources);
720                        exec.addAllocatedResources(resourceHistoryItem);
721                        return true;
722                }
723                return false;
724        }
725       
726
727       
728}
Note: See TracBrowser for help on using the repository browser.