source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 1427

Revision 1427, 27.6 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.Iterator;
6import java.util.LinkedHashSet;
7import java.util.List;
8import java.util.Map;
9import java.util.Set;
10
11import org.apache.commons.logging.Log;
12import org.apache.commons.logging.LogFactory;
13import org.joda.time.DateTime;
14import org.joda.time.DateTimeUtilsExt;
15import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
16
17import qcg.shared.constants.BrokerConstants;
18import schedframe.SimulatedEnvironment;
19import schedframe.events.scheduling.SchedulingEvent;
20import schedframe.events.scheduling.SchedulingEventType;
21import schedframe.events.scheduling.StartTaskExecutionEvent;
22import schedframe.events.scheduling.TaskFinishedEvent;
23import schedframe.events.scheduling.TaskPausedEvent;
24import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
25import schedframe.events.scheduling.TaskResumedEvent;
26import schedframe.exceptions.ResourceException;
27import schedframe.resources.StandardResourceType;
28import schedframe.resources.computing.ComputingResource;
29import schedframe.resources.computing.profiles.energy.ResourceEvent;
30import schedframe.resources.computing.profiles.energy.ResourceEventType;
31import schedframe.resources.units.PEUnit;
32import schedframe.resources.units.ProcessingElements;
33import schedframe.resources.units.ResourceUnit;
34import schedframe.resources.units.ResourceUnitName;
35import schedframe.resources.units.StandardResourceUnitName;
36import schedframe.scheduling.ExecutionHistoryItem;
37import schedframe.scheduling.ResourceItem;
38import schedframe.scheduling.Scheduler;
39import schedframe.scheduling.TaskList;
40import schedframe.scheduling.TaskListImpl;
41import schedframe.scheduling.WorkloadUnitHandler;
42import schedframe.scheduling.manager.resources.LocalResourceManager;
43import schedframe.scheduling.manager.resources.ManagedResources;
44import schedframe.scheduling.manager.resources.ResourceManager;
45import schedframe.scheduling.plan.AllocationInterface;
46import schedframe.scheduling.plan.ScheduledTaskInterface;
47import schedframe.scheduling.plan.SchedulingPlanInterface;
48import schedframe.scheduling.plugin.ModuleListImpl;
49import schedframe.scheduling.plugin.SchedulingPlugin;
50import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
51import schedframe.scheduling.policy.AbstractManagementSystem;
52import schedframe.scheduling.queue.TaskQueueList;
53import schedframe.scheduling.tasks.AbstractProcesses;
54import schedframe.scheduling.tasks.Job;
55import schedframe.scheduling.tasks.JobInterface;
56import schedframe.scheduling.tasks.Task;
57import schedframe.scheduling.tasks.TaskInterface;
58import schedframe.scheduling.tasks.WorkloadUnit;
59import simulator.DCWormsConstants;
60import simulator.stats.DCwormsAccumulator;
61import simulator.utils.DoubleMath;
62import dcworms.schedframe.scheduling.ExecTask;
63import dcworms.schedframe.scheduling.Executable;
64import eduni.simjava.Sim_event;
65import eduni.simjava.Sim_system;
66import gridsim.dcworms.DCWormsTags;
67import gridsim.dcworms.filter.ExecTaskFilter;
68
69public class LocalManagementSystem extends AbstractManagementSystem {
70
71        private Log log = LogFactory.getLog(LocalManagementSystem.class);
72
73        protected double lastUpdateTime;
74
75        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
76                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
77                        throws Exception {
78
79                super(providerId, entityName, execTimeEstimationPlugin, queues);
80               
81                if (schedPlugin == null) {
82                        throw new Exception("Can not create local scheduling plugin instance.");
83                }
84                this.schedulingPlugin =  schedPlugin;
85                this.moduleList = new ModuleListImpl(1);
86        }
87
88        public void init(Scheduler sched, ManagedResources managedResources) {
89                super.init(sched, managedResources);
90        }
91
92        public void processEvent(Sim_event ev) {
93
94                updateProcessingProgress();
95
96                int tag = ev.get_tag();
97                switch (tag) {
98
99                case DCWormsTags.TIMER:
100                        if (pluginSupportsEvent(tag)) {
101                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
102                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
103                                                queues, jobRegistry, getResourceManager(), moduleList);
104                                executeSchedulingPlan(decision);
105                        }
106                        sendTimerEvent();
107                        break;
108
109                case DCWormsTags.TASK_READY_FOR_EXECUTION:
110                        ExecTask execTask = (ExecTask) ev.get_data();
111                        try {
112                                execTask.setStatus(DCWormsTags.READY);
113                                if (pluginSupportsEvent(tag)) {
114                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
115                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
116                                                        queues, jobRegistry, getResourceManager(), moduleList);
117                                        executeSchedulingPlan(decision);
118                                }
119                        } catch (Exception e) {
120                                e.printStackTrace();
121                        }
122                        break;
123
124                case DCWormsTags.TASK_EXECUTION_FINISHED:
125                        execTask = (ExecTask) ev.get_data();
126                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
127                               
128                                finalizeExecutable(execTask);
129                                sendFinishedWorkloadUnit(execTask);
130                                if (pluginSupportsEvent(tag)) {
131                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
132                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
133                                                        queues, jobRegistry, getResourceManager(), moduleList);
134                                        executeSchedulingPlan(decision);
135                                }
136                        }
137
138                        Job job = jobRegistry.getJob(execTask.getJobId());
139                        if(!job.isFinished()){
140                                getWorkloadUnitHandler().handleJob(job);
141                        }
142                        //SUPPORTS PRECEDING CONSTRAINST COMING ALSO FROM SWF FILES
143                        /*else {
144                                try {
145                                        job.setStatus((int)BrokerConstants.JOB_STATUS_FINISHED);
146                                } catch (Exception e) {
147
148                                }
149                                for(JobInterface<?> j: jobRegistry.getJobs((int)BrokerConstants.JOB_STATUS_SUBMITTED)){
150                                        getWorkloadUnitHandler().handleJob(j); 
151                                }
152                        }*/
153                        break;
154                       
155                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
156                        execTask = (Executable) ev.get_data();
157                        if (pluginSupportsEvent(tag)) {
158                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
159                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
160                                                queues, jobRegistry, getResourceManager(), moduleList);
161                                executeSchedulingPlan(decision);
162                        }
163                        break;
164                       
165                case DCWormsTags.TASK_PAUSE:{
166                        String[] ids = (String[]) ev.get_data();
167                        execTask = jobRegistry.getTask(ids[0], ids[1]);
168                        pauseTask(execTask);
169                        if (pluginSupportsEvent(tag)) {
170                                SchedulingEvent event = new TaskPausedEvent(ids[0], ids[1]);
171                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
172                                                queues, jobRegistry, getResourceManager(), moduleList);
173                                executeSchedulingPlan(decision);
174                        }
175                }
176                break;
177               
178                case DCWormsTags.TASK_RESUME:{
179                        String[] ids = (String[]) ev.get_data();
180                        execTask = jobRegistry.getTask(ids[0], ids[1]);
181                        resumeTask(execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), true);
182                        if (pluginSupportsEvent(tag)) {
183                                SchedulingEvent event = new TaskResumedEvent(ids[0], ids[1]);
184                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
185                                                queues, jobRegistry, getResourceManager(), moduleList);
186                                executeSchedulingPlan(decision);
187                        }
188                }
189                break;
190               
191                case DCWormsTags.TASK_MIGRATE:{
192                        Object[] data = (Object[]) ev.get_data();
193                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
194                        double migrationTime = execTimeEstimationPlugin.estimateMigrationTime(new StartTaskExecutionEvent((String)data[0],
195                                        (String)data[1]), execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), (Map<ResourceUnitName, ResourceUnit>)data[2]);
196                        scheduler.sendInternal(migrationTime, DCWormsTags.TASK_MOVE, data);
197                }
198                break;
199               
200                case DCWormsTags.TASK_MOVE:{
201                        Object[] data = (Object[]) ev.get_data();
202                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
203                        moveTask(execTask, (Map<ResourceUnitName, ResourceUnit>)data[2]);
204                        if (pluginSupportsEvent(tag)) {
205                                SchedulingEvent event = new StartTaskExecutionEvent((String)data[0], (String)data[1]);
206                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
207                                                queues, jobRegistry, getResourceManager(), moduleList);
208                                executeSchedulingPlan(decision);
209                        }
210                }
211                break;
212                       
213                case DCWormsTags.TASK_EXECUTION_CHANGED:
214                        execTask = (ExecTask) ev.get_data();
215                        updateTaskExecutionPhase(execTask);
216                        break;
217
218                case DCWormsTags.UPDATE_PROCESSING:
219                        updateProcessingTimes();
220                        break;
221                       
222                case DCWormsTags.POWER_LIMIT_EXCEEDED:
223                        if (pluginSupportsEvent(tag)) {
224                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.POWER_LIMIT_EXCEEDED);
225                                String source;
226                                try{
227                                        source = ev.get_data().toString();
228                                } catch(Exception e){
229                                        source = null;
230                                }
231                                event.setSource(source);
232                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
233                                                queues, jobRegistry, getResourceManager(), moduleList);
234                                executeSchedulingPlan(decision);
235                        }
236                        break;
237                default:               
238                       
239                        break;
240                }
241        }
242
243
244        protected void pauseTask(ExecTask execTask) {
245                if (execTask == null) {
246                        return;
247                } else {
248                        try {
249                                execTask.setStatus(DCWormsTags.PAUSED);
250                               
251                                Executable exec = (Executable) execTask;
252                                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
253                                getAllocationManager().freeResources(lastUsed, true);
254                               
255                                saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), exec.getEstimatedDuration());
256                               
257                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), -1);
258                                scheduler.sim_cancel(filter, null);
259                               
260                                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
261                                updateComputingResources(peUnit, ResourceEventType.TASK_FINISHED, exec);
262                        } catch (Exception e) {
263                                // TODO Auto-generated catch block
264                                e.printStackTrace();
265                        }
266                }               
267        }
268       
269        protected void resumeTask(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> resources, boolean exclusive) {
270                if (execTask == null) {
271                        return;
272                } else if (execTask.getStatus() == DCWormsTags.PAUSED) {
273                        try {
274                                execTask.setStatus(DCWormsTags.RESUMED);
275                                Executable exec = (Executable) execTask;
276
277                                boolean status = allocateResources(exec, resources, exclusive);
278                                if(status == false){
279                                        TaskList newTasks = new TaskListImpl();
280                                        newTasks.add(exec);
281                                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
282                                        exec.setStatus(DCWormsTags.READY);
283                                } else {
284                                        runTask(execTask);
285                                       
286                                        PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
287                                        updateComputingResources(peUnit, ResourceEventType.TASK_STARTED, exec);
288                                }
289
290                        } catch (Exception e) {
291                                // TODO Auto-generated catch block
292                                e.printStackTrace();
293                        }
294                }                       
295        }
296       
297        protected void moveTask(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> map) {
298                pauseTask(execTask);
299                resumeTask(execTask, map, false);
300        }
301       
302        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
303                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
304                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
305                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
306                                        queues, jobRegistry, getResourceManager(), moduleList);
307                        executeSchedulingPlan(decision);
308                }
309                //if(scheduler.getParent() != null){
310                        sendFinishedWorkloadUnit(wu);
311                //}
312        }
313       
314        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
315
316                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
317                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
318                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
319
320                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
321                                continue;
322                        }
323
324                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
325
326                        TaskInterface<?> task = taskDecision.getTask();
327                        for (int j = 0; j < allocations.size(); j++) {
328
329                                AllocationInterface<?> allocation = allocations.get(j);
330                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
331                                        ExecTask exec = (ExecTask) task;                                       
332                                        executeTask(exec, allocation.getRequestedResources());
333                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
334                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
335                                        submitTask(task, allocation);
336                                } else {
337                                        ExecTask exec = (ExecTask) task;
338                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
339                                }
340                        }
341                }
342        }
343
344        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
345
346                Executable exec = (Executable)task;
347                boolean allocationStatus = allocateResources(exec, choosenResources, false);
348                if(allocationStatus == false){
349                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
350                        return;
351                }
352
353                removeFromQueue(task);
354
355                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
356
357                runTask(exec);
358               
359                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
360                updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
361               
362
363                try {
364                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
365                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
366                } catch (NoSuchFieldException e) {
367                        //double t = exec.getEstimatedDuration();
368                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
369                }
370
371                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());           
372        }
373       
374        private void runTask(ExecTask execTask){
375                Executable exec = (Executable) execTask;
376                Map<ResourceUnitName, ResourceUnit> resources = exec.getAllocatedResources().getLast().getResourceUnits();
377               
378                try {
379                        exec.setStatus(DCWormsTags.INEXEC);
380                } catch (Exception e) {
381                        // TODO Auto-generated catch block
382                        e.printStackTrace();
383                }
384
385                int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION),
386                                execTask, resources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
387
388                saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
389                if(exec.getExecutionProfile().isLast()){
390                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);;
391                } else {
392                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
393                }
394               
395                PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
396                updateComputingResources(peUnit, ResourceEventType.TASK_STARTED, exec);
397        }
398
399        protected void finalizeExecutable(ExecTask execTask){
400               
401                Executable exec = (Executable)execTask;
402                exec.finalizeExecutable();
403               
404                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
405                scheduler.sim_cancel(filter, null);
406               
407                Task task;
408                Job job = jobRegistry.getJob(exec.getJobId());
409                try {
410                        task = job.getTask(exec.getTaskId());
411                } catch (NoSuchFieldException e) {
412                        return;
413                }
414                if(exec.getProcessesId() == null){
415                        try {
416                                task.setStatus(exec.getStatus());
417                        } catch (Exception e) {
418                                e.printStackTrace();
419                        }
420                } else {
421                        List<AbstractProcesses> processesList = task.getProcesses();
422                        for(int i = 0; i < processesList.size(); i++){
423                                AbstractProcesses processes = processesList.get(i);
424                                if(processes.getId().equals(exec.getProcessesId())){
425                                        processes.setStatus(exec.getStatus());
426                                        break;
427                                }
428                        }
429                }
430
431                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
432                getAllocationManager().freeResources(lastUsed, true);
433               
434                //TODO calculate the value of completion
435                saveExecutionHistory(exec, 100,0);
436               
437                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
438                updateComputingResources(peUnit, ResourceEventType.TASK_FINISHED, exec);
439               
440                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
441                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
442               
443                updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
444        }
445       
446        protected void updateProcessingProgress() {
447
448                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
449                if (timeSpan <= 0.0) {
450                        // don't update when nothing changed
451                        return;
452                }
453                lastUpdateTime = Sim_system.clock();
454                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
455                while (iter.hasNext()) {
456                        ExecTask task = iter.next();
457                        Executable exec = (Executable)task;
458                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
459                        //System.out.println("--- upadteProgressX: " + Sim_system.sim_clock() );
460                        //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " +  execHistItem.getCompletionPercentage());
461                        exec.getExecutionProfile().setCompletionPercentage(exec.getExecutionProfile().getCompletionPercentage() + 100 * (timeSpan / (execHistItem.getEstimatedDuration()) * (1.0 - execHistItem.getCompletionPercentage()/100.0)));
462                        exec.setTotalCompletionPercentage(exec.getTotalCompletionPercentage() + 100 * (timeSpan / execHistItem.getEstimatedDuration()) * exec.getExecutionProfile().getCurrentExecutionPhase().getLenght() / exec.getExecutionProfile().getLength());
463                        //System.out.println("newProgress: " + exec.getCompletionPercentage()  );       
464
465                }
466        }
467       
468        private void updateComputingResources(PEUnit peUnit, ResourceEventType eventType, Object obj){
469                if(peUnit instanceof ProcessingElements){
470                        /*ProcessingElements pes = (ProcessingElements) peUnit;
471                        for (ComputingResource resource : pes) {
472                                resource.handleEvent(new ResourceEvent(eventType, obj, scheduler.getFullName()));
473                                //DataCenterWorkloadSimulator.getEventManager().sendToResources(resource.getType(), 0, new EnergyEvent(eventType, obj));
474                        }*/
475                        /*try {
476                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
477                                        resource.handleEvent(new EnergyEvent(eventType, obj));
478                                }
479                        } catch (ResourceException e) {
480                                // TODO Auto-generated catch block
481                                e.printStackTrace();
482                        }*/
483                       
484                        ProcessingElements pes = (ProcessingElements) peUnit;
485                        ResourceEvent resEvent = new ResourceEvent(eventType, obj, scheduler.getFullName());           
486                        try{
487                                Set<ComputingResource> resourcesToUpdate = new LinkedHashSet<ComputingResource>();
488                       
489                                for (ComputingResource compResource : pes) {
490                                        resourcesToUpdate.add(compResource);
491                                }
492                                while(resourcesToUpdate.size() > 0){
493                                        Set<ComputingResource> newSet = new LinkedHashSet<ComputingResource>();
494                                        for(ComputingResource compResource: resourcesToUpdate){
495                                                compResource.updateState(resEvent);
496                                                if(compResource.getParent() != null){
497                                                        newSet.add(compResource.getParent());
498                                                }
499                                        }
500                                        resourcesToUpdate = new LinkedHashSet<ComputingResource>(newSet);
501                                }
502                        }catch(Exception e){
503                        }
504                } else {
505                        ComputingResource resource = null;
506                        try {
507                                resource = SimulatedEnvironment.getComputingResourceByName(peUnit.getResourceId());
508                        } catch (ResourceException e) {
509                                return;
510                        }
511                        resource.handleEvent(new ResourceEvent(eventType, obj, scheduler.getFullName()));
512                }
513        }
514
515        protected void updateProcessingTimes() {
516                for (ExecTask execTask: jobRegistry.getRunningTasks()) {
517                        Executable exec = (Executable)execTask;
518
519                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
520
521                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
522                                        execTask, choosenResources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
523
524                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
525                        double lastTimeStamp = execHistItem.getTimeStamp() / 1000;
526                        if(DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis() / 1000 + phaseDuration)) == 0.0){
527                                continue;
528                        }
529                        //System.out.println("=== upadteTIme: " + Sim_system.sim_clock() );
530                        //System.out.println("execId: " + exec.getId() +  "; estimatedDuration " + execHistItem.getEstimatedDuration());
531                        //System.out.println("execId: " + exec.getId() + "; difference " + DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)));
532                        //System.out.println("completionPercantage: " + exec.getCompletionPercentage() + "; basic duration: " +exec.getResourceConsumptionProfile().getCurrentResourceConsumption().getDuration() +   "; phaseDuration: " +  phaseDuration);
533
534                        saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
535                       
536                        if(exec.getExecutionProfile().isLast()){
537                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
538                                scheduler.sim_cancel(filter, null);
539                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
540                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
541                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
542                        } else{
543                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
544                                scheduler.sim_cancel(filter, null);
545                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
546                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
547                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
548                        }
549                }
550        }       
551
552        protected void updateTaskExecutionPhase(ExecTask execTask) {
553
554                if (execTask.getStatus() == DCWormsTags.INEXEC) {
555                        Executable exec = (Executable)execTask;
556                       
557                        try {
558                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
559                        } catch (Exception e) {
560                               
561                        }
562
563                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
564
565                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.TASK_EXECUTION_CHANGED),
566                                        execTask, choosenResources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
567                       
568                        saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
569                       
570                        if(exec.getExecutionProfile().isLast()){
571                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
572                        } else {
573                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
574                        }
575                       
576                        PEUnit peUnit = (PEUnit)exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
577                        updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
578                }
579        }       
580
581        protected double calculateTotalLoad() {
582
583                DCwormsAccumulator loadAcc = new DCwormsAccumulator();
584
585                for(ComputingResource compRes: getResourceManager().getResourcesOfType(StandardResourceType.Node)){
586                        loadAcc.add(compRes.getLoadInterface().getRecentUtilization().getValue());                             
587                }
588
589                return loadAcc.getMean();
590        }
591
592        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
593                        ExecTask task) {
594
595                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
596                LocalResourceManager resourceManager = getResourceManager();
597                if(resourceName != null){
598                        ComputingResource resource = resourceManager.getResourceByName(resourceName);
599                        if(resource == null){
600                                return null;
601                        }
602                        resourceManager = new LocalResourceManager(resource);
603                }
604
605                int cpuRequest;
606                try {
607                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
608                } catch (NoSuchFieldException e) {
609                        cpuRequest = 1;
610                }
611
612                if (cpuRequest != 0) {
613                       
614                        List<ResourceUnit> availableUnits = null;
615                        try {
616                                availableUnits = resourceManager.getPE();
617                        } catch (ResourceException e) {
618                                return null;
619                        }
620                       
621                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
622                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
623                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
624                                if(peUnit.getFreeAmount() > 0){
625                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
626                                        cpuRequest = cpuRequest - allocPE;
627                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
628                                }       
629                        }
630                       
631                        if(cpuRequest > 0){
632                                return null;
633                        }
634                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
635                }
636
637                return  map;
638        }
639       
640        public void notifySubmittedWorkloadUnit(WorkloadUnit wu) {
641                //250314
642                //updateProcessingProgress();
643                if (log.isInfoEnabled())
644                        log.info("Received job " + wu.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
645                registerWorkloadUnit(wu);
646        }
647
648        private void registerWorkloadUnit(WorkloadUnit wu){
649                if(!wu.isRegistered()){
650                        wu.register(jobRegistry);
651                }
652                wu.accept(getWorkloadUnitHandler());
653        }
654       
655        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
656               
657                public void handleJob(JobInterface<?> job){
658
659                        if (log.isInfoEnabled())
660                                log.info("Handle " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
661                       
662                        try {
663                                if(job.getStatus() == BrokerConstants.JOB_STATUS_UNSUBMITTED)
664                                        job.setStatus((int)BrokerConstants.JOB_STATUS_SUBMITTED);
665                        } catch (Exception e) {
666                                e.printStackTrace();
667                        }
668                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
669                        jobsList.add(job);
670                        TaskListImpl availableTasks = new TaskListImpl();
671                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
672                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
673                                availableTasks.add(task);
674                        }
675
676                        for(TaskInterface<?> task: availableTasks){     
677                                registerWorkloadUnit(task);
678                        }
679                }
680               
681                public void handleTask(TaskInterface<?> t){
682                        Task task = (Task)t;
683                        List<AbstractProcesses> processes = task.getProcesses();
684
685                        if(processes == null || processes.size() == 0){
686                                Executable exec = new Executable(task);
687                                registerWorkloadUnit(exec);
688                        } else {
689                                for(int j = 0; j < processes.size(); j++){
690                                        AbstractProcesses procesesSet = processes.get(j);
691                                        Executable exec = new Executable(task, procesesSet);
692                                        registerWorkloadUnit(exec);
693                                }
694                        }
695                }
696               
697                public void handleExecutable(ExecTask task){
698                       
699                        Executable exec = (Executable) task;
700                        jobRegistry.addExecTask(exec);
701
702                        exec.setSchedulerName(scheduler.getFullName());
703                       
704                        TaskList newTasks = new TaskListImpl();
705                        newTasks.add(exec);
706               
707                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
708
709                        if (exec.getStatus() == DCWormsTags.QUEUED) {
710                                sendExecutableReadyEvent(exec);
711                        }
712                }
713        }
714
715        public WorkloadUnitHandler getWorkloadUnitHandler() {
716                return new LocalWorkloadUnitHandler();
717        }
718       
719       
720        public LocalResourceManager getResourceManager() {
721                if (resourceManager instanceof ResourceManager)
722                        return (LocalResourceManager) resourceManager;
723                else
724                        return null;
725        }
726       
727       
728        private void saveExecutionHistory(Executable exec, double completionPercentage, double estimatedDuration){
729                ExecutionHistoryItem execHistoryItem = new ExecutionHistoryItem(new DateTime().getMillis());
730                execHistoryItem.setCompletionPercentage(completionPercentage);
731                execHistoryItem.setEstimatedDuration(estimatedDuration);
732                execHistoryItem.setResIndex(exec.getAllocatedResources().size() -1);
733                execHistoryItem.setStatus(exec.getStatus());
734                exec.addExecHistory(execHistoryItem);
735        }
736       
737        public boolean allocateResources(Executable exec, Map<ResourceUnitName, ResourceUnit> choosenResources, boolean exclusive){
738                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources, exclusive);
739                if(allocationStatus){
740                        ResourceItem resourceHistoryItem = new ResourceItem(choosenResources);
741                        exec.addAllocatedResources(resourceHistoryItem);
742                        return true;
743                }
744                return false;
745        }
746       
747
748       
749}
Note: See TracBrowser for help on using the repository browser.