source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 1444

Revision 1444, 27.3 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.Iterator;
6import java.util.LinkedHashSet;
7import java.util.List;
8import java.util.Map;
9import java.util.Set;
10
11import org.apache.commons.logging.Log;
12import org.apache.commons.logging.LogFactory;
13import org.joda.time.DateTime;
14import org.joda.time.DateTimeUtilsExt;
15import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
16
17import qcg.shared.constants.BrokerConstants;
18import schedframe.SimulatedEnvironment;
19import schedframe.events.scheduling.SchedulingEvent;
20import schedframe.events.scheduling.SchedulingEventType;
21import schedframe.events.scheduling.StartTaskExecutionEvent;
22import schedframe.events.scheduling.TaskFinishedEvent;
23import schedframe.events.scheduling.TaskPausedEvent;
24import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
25import schedframe.events.scheduling.TaskResumedEvent;
26import schedframe.exceptions.ResourceException;
27import schedframe.resources.StandardResourceType;
28import schedframe.resources.computing.ComputingResource;
29import schedframe.resources.computing.profiles.energy.ResourceEvent;
30import schedframe.resources.computing.profiles.energy.ResourceEventType;
31import schedframe.resources.units.PEUnit;
32import schedframe.resources.units.ProcessingElements;
33import schedframe.resources.units.ResourceUnit;
34import schedframe.resources.units.ResourceUnitName;
35import schedframe.resources.units.StandardResourceUnitName;
36import schedframe.scheduling.ExecutionHistoryItem;
37import schedframe.scheduling.ResourceItem;
38import schedframe.scheduling.Scheduler;
39import schedframe.scheduling.TaskList;
40import schedframe.scheduling.TaskListImpl;
41import schedframe.scheduling.WorkloadUnitHandler;
42import schedframe.scheduling.manager.resources.LocalResourceManager;
43import schedframe.scheduling.manager.resources.ManagedResources;
44import schedframe.scheduling.manager.resources.ResourceManager;
45import schedframe.scheduling.plan.AllocationInterface;
46import schedframe.scheduling.plan.ScheduledTaskInterface;
47import schedframe.scheduling.plan.SchedulingPlanInterface;
48import schedframe.scheduling.plugin.ModuleListImpl;
49import schedframe.scheduling.plugin.SchedulingPlugin;
50import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
51import schedframe.scheduling.policy.AbstractManagementSystem;
52import schedframe.scheduling.queue.TaskQueueList;
53import schedframe.scheduling.tasks.AbstractProcesses;
54import schedframe.scheduling.tasks.Job;
55import schedframe.scheduling.tasks.JobInterface;
56import schedframe.scheduling.tasks.Task;
57import schedframe.scheduling.tasks.TaskInterface;
58import schedframe.scheduling.tasks.WorkloadUnit;
59import simulator.DCWormsConstants;
60import simulator.stats.DCwormsAccumulator;
61import simulator.utils.DoubleMath;
62import dcworms.schedframe.scheduling.ExecTask;
63import dcworms.schedframe.scheduling.Executable;
64import eduni.simjava.Sim_event;
65import eduni.simjava.Sim_system;
66import gridsim.dcworms.DCWormsTags;
67import gridsim.dcworms.filter.ExecTaskFilter;
68
69public class LocalManagementSystem extends AbstractManagementSystem {
70
71        private Log log = LogFactory.getLog(LocalManagementSystem.class);
72
73        protected double lastUpdateTime;
74
75        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
76                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
77                        throws Exception {
78
79                super(providerId, entityName, execTimeEstimationPlugin, queues);
80               
81                if (schedPlugin == null) {
82                        throw new Exception("Can not create local scheduling plugin instance.");
83                }
84                this.schedulingPlugin =  schedPlugin;
85                this.moduleList = new ModuleListImpl(1);
86        }
87
88        public void init(Scheduler sched, ManagedResources managedResources) {
89                super.init(sched, managedResources);
90        }
91
92        public void processEvent(Sim_event ev) {
93
94                updateProcessingProgress();
95
96                int tag = ev.get_tag();
97                SchedulingEvent event;
98                SchedulingPlanInterface<?> decision;
99                switch (tag) {
100                case DCWormsTags.TIMER:
101
102                        event = new SchedulingEvent(SchedulingEventType.TIMER);
103                        decision =  schedulingPlugin.schedule(event,
104                                        queues, jobRegistry, getResourceManager(), moduleList);
105                        executeSchedulingPlan(decision);
106
107                        sendTimerEvent();
108                        break;
109
110                case DCWormsTags.TASK_READY_FOR_EXECUTION:
111                        ExecTask execTask = (ExecTask) ev.get_data();
112                        try {
113                                execTask.setStatus(DCWormsTags.READY);
114
115                                event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
116                                decision =  schedulingPlugin.schedule(event,
117                                                queues, jobRegistry, getResourceManager(), moduleList);
118                                executeSchedulingPlan(decision);
119
120                        } catch (Exception e) {
121                                e.printStackTrace();
122                        }
123                        break;
124
125                case DCWormsTags.TASK_EXECUTION_FINISHED:
126                        execTask = (ExecTask) ev.get_data();
127                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
128                               
129                                finalizeExecutable(execTask);
130                                sendFinishedWorkloadUnit(execTask);
131                                event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
132                                decision = schedulingPlugin.schedule(event,
133                                                queues, jobRegistry, getResourceManager(), moduleList);
134                                executeSchedulingPlan(decision);
135                        }
136
137                        Job job = jobRegistry.getJob(execTask.getJobId());
138                        if(!job.isFinished()){
139                                getWorkloadUnitHandler().handleJob(job);
140                        }
141                        //SUPPORTS PRECEDING CONSTRAINST COMING ALSO FROM SWF FILES
142                        /*else {
143                                try {
144                                        job.setStatus((int)BrokerConstants.JOB_STATUS_FINISHED);
145                                } catch (Exception e) {
146
147                                }
148                                for(JobInterface<?> j: jobRegistry.getJobs((int)BrokerConstants.JOB_STATUS_SUBMITTED)){
149                                        getWorkloadUnitHandler().handleJob(j); 
150                                }
151                        }*/
152                        break;
153                       
154                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
155                        execTask = (Executable) ev.get_data();
156                        event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
157                        decision = schedulingPlugin.schedule(event,
158                                        queues, jobRegistry, getResourceManager(), moduleList);
159                        executeSchedulingPlan(decision);
160                        break;
161                       
162                case DCWormsTags.TASK_PAUSE:{
163                        String[] ids = (String[]) ev.get_data();
164                        execTask = jobRegistry.getTask(ids[0], ids[1]);
165                        pauseTask(execTask);
166       
167                        event = new TaskPausedEvent(ids[0], ids[1]);
168                        decision = schedulingPlugin.schedule(event,
169                                        queues, jobRegistry, getResourceManager(), moduleList);
170                        executeSchedulingPlan(decision);
171                        }
172                        break;
173               
174                case DCWormsTags.TASK_RESUME:{
175                        String[] ids = (String[]) ev.get_data();
176                        execTask = jobRegistry.getTask(ids[0], ids[1]);
177                        resumeTask(execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), true);
178                        event = new TaskResumedEvent(ids[0], ids[1]);
179                        decision = schedulingPlugin.schedule(event,
180                                        queues, jobRegistry, getResourceManager(), moduleList);
181                        executeSchedulingPlan(decision);
182                        }
183                        break;
184               
185                case DCWormsTags.TASK_MIGRATE:{
186                        Object[] data = (Object[]) ev.get_data();
187                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
188                        double migrationTime = execTimeEstimationPlugin.estimateMigrationTime(new StartTaskExecutionEvent((String)data[0],
189                                        (String)data[1]), execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), (Map<ResourceUnitName, ResourceUnit>)data[2]);
190                        scheduler.sendInternal(migrationTime, DCWormsTags.TASK_MOVE, data);
191                        }
192                        break;
193               
194                case DCWormsTags.TASK_MOVE:{
195                        Object[] data = (Object[]) ev.get_data();
196                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
197                        moveTask(execTask, (Map<ResourceUnitName, ResourceUnit>)data[2]);
198
199                        event = new StartTaskExecutionEvent((String)data[0], (String)data[1]);
200                        decision = schedulingPlugin.schedule(event,
201                                        queues, jobRegistry, getResourceManager(), moduleList);
202                        executeSchedulingPlan(decision);
203                        }
204                        break;
205                       
206                case DCWormsTags.RESOURCE_POWER_STATE_CHANGED:
207                        event = new SchedulingEvent(SchedulingEventType.RESOURCE_POWER_STATE_CHANGED);
208                        String source;
209                        try{
210                                source = ev.get_data().toString();
211                        } catch(Exception e){
212                                source = null;
213                        }
214                        event.setSource(source);
215                        decision =  schedulingPlugin.schedule(event,
216                                        queues, jobRegistry, getResourceManager(), moduleList);
217                        executeSchedulingPlan(decision);
218                        break;
219
220                case DCWormsTags.RESOURCE_POWER_LIMIT_EXCEEDED:
221                        event = new SchedulingEvent(SchedulingEventType.RESOURCE_POWER_LIMIT_EXCEEDED);
222                        try{
223                                source = ev.get_data().toString();
224                        } catch(Exception e){
225                                source = null;
226                        }
227                        event.setSource(source);
228                        decision = schedulingPlugin.schedule(event,
229                                        queues, jobRegistry, getResourceManager(), moduleList);
230                        executeSchedulingPlan(decision);
231                        break;
232                case DCWormsTags.TASK_EXECUTION_CHANGED:
233                        execTask = (ExecTask) ev.get_data();
234                        updateTaskExecutionPhase(execTask);
235                        break;
236                case DCWormsTags.UPDATE_PROCESSING:
237                        updateProcessingTimes();
238                        break;
239                       
240                default:               
241                        break;
242                }
243        }
244
245
246        protected void pauseTask(ExecTask execTask) {
247                if (execTask == null) {
248                        return;
249                } else {
250                        try {
251                                execTask.setStatus(DCWormsTags.PAUSED);
252                               
253                                Executable exec = (Executable) execTask;
254                                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
255                                getAllocationManager().freeResources(lastUsed, true);
256                               
257                                saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), exec.getEstimatedDuration());
258                               
259                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), -1);
260                                scheduler.sim_cancel(filter, null);
261                               
262                                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
263                                updateComputingResources(peUnit, ResourceEventType.TASK_FINISHED, exec);
264                        } catch (Exception e) {
265                                // TODO Auto-generated catch block
266                                e.printStackTrace();
267                        }
268                }               
269        }
270       
271        protected void resumeTask(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> resources, boolean exclusive) {
272                if (execTask == null) {
273                        return;
274                } else if (execTask.getStatus() == DCWormsTags.PAUSED) {
275                        try {
276                                execTask.setStatus(DCWormsTags.RESUMED);
277                                Executable exec = (Executable) execTask;
278
279                                boolean status = allocateResources(exec, resources, exclusive);
280                                if(status == false){
281                                        TaskList newTasks = new TaskListImpl();
282                                        newTasks.add(exec);
283                                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
284                                        exec.setStatus(DCWormsTags.READY);
285                                } else {
286                                        runTask(execTask);
287                                       
288                                        PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
289                                        updateComputingResources(peUnit, ResourceEventType.TASK_STARTED, exec);
290                                }
291
292                        } catch (Exception e) {
293                                // TODO Auto-generated catch block
294                                e.printStackTrace();
295                        }
296                }                       
297        }
298       
299        protected void moveTask(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> map) {
300                pauseTask(execTask);
301                resumeTask(execTask, map, false);
302        }
303       
304        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
305
306                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
307                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
308                                queues, jobRegistry, getResourceManager(), moduleList);
309                executeSchedulingPlan(decision);
310
311        //if(scheduler.getParent() != null){
312                sendFinishedWorkloadUnit(wu);
313        //}
314        }
315       
316        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
317
318                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
319                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
320                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
321
322                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
323                                continue;
324                        }
325
326                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
327
328                        TaskInterface<?> task = taskDecision.getTask();
329                        for (int j = 0; j < allocations.size(); j++) {
330
331                                AllocationInterface<?> allocation = allocations.get(j);
332                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
333                                        ExecTask exec = (ExecTask) task;                                       
334                                        executeTask(exec, allocation.getRequestedResources());
335                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
336                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
337                                        submitTask(task, allocation);
338                                } else {
339                                        ExecTask exec = (ExecTask) task;
340                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
341                                }
342                        }
343                }
344        }
345
346        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
347
348                Executable exec = (Executable)task;
349                boolean allocationStatus = allocateResources(exec, choosenResources, false);
350                if(allocationStatus == false){
351                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
352                        return;
353                }
354
355                removeFromQueue(task);
356
357                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
358
359                runTask(exec);
360               
361                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
362                updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
363               
364
365                try {
366                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
367                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
368                } catch (NoSuchFieldException e) {
369                        //double t = exec.getEstimatedDuration();
370                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
371                }
372
373                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());           
374        }
375       
376        private void runTask(ExecTask execTask){
377                Executable exec = (Executable) execTask;
378                Map<ResourceUnitName, ResourceUnit> resources = exec.getAllocatedResources().getLast().getResourceUnits();
379               
380                try {
381                        exec.setStatus(DCWormsTags.INEXEC);
382                } catch (Exception e) {
383                        // TODO Auto-generated catch block
384                        e.printStackTrace();
385                }
386
387                int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION),
388                                execTask, resources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
389
390                saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
391                if(exec.getExecutionProfile().isLast()){
392                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);;
393                } else {
394                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
395                }
396               
397                PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
398                updateComputingResources(peUnit, ResourceEventType.TASK_STARTED, exec);
399        }
400
401        protected void finalizeExecutable(ExecTask execTask){
402               
403                Executable exec = (Executable)execTask;
404                exec.finalizeExecutable();
405               
406                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
407                scheduler.sim_cancel(filter, null);
408               
409                Task task;
410                Job job = jobRegistry.getJob(exec.getJobId());
411                try {
412                        task = job.getTask(exec.getTaskId());
413                } catch (NoSuchFieldException e) {
414                        return;
415                }
416                if(exec.getProcessesId() == null){
417                        try {
418                                task.setStatus(exec.getStatus());
419                        } catch (Exception e) {
420                                e.printStackTrace();
421                        }
422                } else {
423                        List<AbstractProcesses> processesList = task.getProcesses();
424                        for(int i = 0; i < processesList.size(); i++){
425                                AbstractProcesses processes = processesList.get(i);
426                                if(processes.getId().equals(exec.getProcessesId())){
427                                        processes.setStatus(exec.getStatus());
428                                        break;
429                                }
430                        }
431                }
432
433                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
434                getAllocationManager().freeResources(lastUsed, true);
435               
436                //TODO calculate the value of completion
437                saveExecutionHistory(exec, 100,0);
438               
439                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
440                updateComputingResources(peUnit, ResourceEventType.TASK_FINISHED, exec);
441               
442                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
443                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
444               
445                updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
446        }
447       
448        protected void updateProcessingProgress() {
449
450                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
451                if (timeSpan <= 0.0) {
452                        // don't update when nothing changed
453                        return;
454                }
455                lastUpdateTime = Sim_system.clock();
456                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
457                while (iter.hasNext()) {
458                        ExecTask task = iter.next();
459                        Executable exec = (Executable)task;
460                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
461                        //System.out.println("--- upadteProgressX: " + Sim_system.sim_clock() );
462                        //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " +  execHistItem.getCompletionPercentage());
463                        exec.getExecutionProfile().setCompletionPercentage(exec.getExecutionProfile().getCompletionPercentage() + 100 * (timeSpan / (execHistItem.getEstimatedDuration()) * (1.0 - execHistItem.getCompletionPercentage()/100.0)));
464                        exec.setTotalCompletionPercentage(exec.getTotalCompletionPercentage() + 100 * (timeSpan / execHistItem.getEstimatedDuration()) * exec.getExecutionProfile().getCurrentExecutionPhase().getLenght() / exec.getExecutionProfile().getLength());
465                        //System.out.println("newProgress: " + exec.getCompletionPercentage()  );       
466
467                }
468        }
469       
470        private void updateComputingResources(PEUnit peUnit, ResourceEventType eventType, Object obj){
471                if(peUnit instanceof ProcessingElements){
472                        /*ProcessingElements pes = (ProcessingElements) peUnit;
473                        for (ComputingResource resource : pes) {
474                                resource.handleEvent(new ResourceEvent(eventType, obj, scheduler.getFullName()));
475                                //DataCenterWorkloadSimulator.getEventManager().sendToResources(resource.getType(), 0, new EnergyEvent(eventType, obj));
476                        }*/
477                        /*try {
478                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
479                                        resource.handleEvent(new EnergyEvent(eventType, obj));
480                                }
481                        } catch (ResourceException e) {
482                                // TODO Auto-generated catch block
483                                e.printStackTrace();
484                        }*/
485                       
486                        ProcessingElements pes = (ProcessingElements) peUnit;
487                        ResourceEvent resEvent = new ResourceEvent(eventType, obj, scheduler.getFullName());           
488                        try{
489                                Set<ComputingResource> resourcesToUpdate = new LinkedHashSet<ComputingResource>();
490                       
491                                for (ComputingResource compResource : pes) {
492                                        resourcesToUpdate.add(compResource);
493                                }
494                                while(resourcesToUpdate.size() > 0){
495                                        Set<ComputingResource> newSet = new LinkedHashSet<ComputingResource>();
496                                        for(ComputingResource compResource: resourcesToUpdate){
497                                                compResource.updateState(resEvent);
498                                                if(compResource.getParent() != null){
499                                                        newSet.add(compResource.getParent());
500                                                }
501                                        }
502                                        resourcesToUpdate = new LinkedHashSet<ComputingResource>(newSet);
503                                }
504                        }catch(Exception e){
505                        }
506                } else {
507                        ComputingResource resource = null;
508                        try {
509                                resource = SimulatedEnvironment.getComputingResourceByName(peUnit.getResourceId());
510                        } catch (ResourceException e) {
511                                return;
512                        }
513                        resource.handleEvent(new ResourceEvent(eventType, obj, scheduler.getFullName()));
514                }
515        }
516
517        protected void updateProcessingTimes() {
518                for (ExecTask execTask: jobRegistry.getRunningTasks()) {
519                        Executable exec = (Executable)execTask;
520
521                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
522
523                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
524                                        execTask, choosenResources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
525
526                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
527                        double lastTimeStamp = execHistItem.getTimeStamp() / 1000;
528                        if(DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis() / 1000 + phaseDuration)) == 0.0){
529                                continue;
530                        }
531                        //System.out.println("=== upadteTIme: " + Sim_system.sim_clock() );
532                        //System.out.println("execId: " + exec.getId() +  "; estimatedDuration " + execHistItem.getEstimatedDuration());
533                        //System.out.println("execId: " + exec.getId() + "; difference " + DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)));
534                        //System.out.println("completionPercantage: " + exec.getCompletionPercentage() + "; basic duration: " +exec.getResourceConsumptionProfile().getCurrentResourceConsumption().getDuration() +   "; phaseDuration: " +  phaseDuration);
535
536                        saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
537                       
538                        if(exec.getExecutionProfile().isLast()){
539                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
540                                scheduler.sim_cancel(filter, null);
541                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
542                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
543                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
544                        } else{
545                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
546                                scheduler.sim_cancel(filter, null);
547                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
548                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
549                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
550                        }
551                }
552        }       
553
554        protected void updateTaskExecutionPhase(ExecTask execTask) {
555
556                if (execTask.getStatus() == DCWormsTags.INEXEC) {
557                        Executable exec = (Executable)execTask;
558                       
559                        try {
560                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
561                        } catch (Exception e) {
562                               
563                        }
564
565                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
566
567                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.TASK_EXECUTION_CHANGED),
568                                        execTask, choosenResources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
569                       
570                        saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
571                       
572                        if(exec.getExecutionProfile().isLast()){
573                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
574                        } else {
575                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
576                        }
577                       
578                        PEUnit peUnit = (PEUnit)exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
579                        updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
580                }
581        }       
582
583        protected double calculateTotalLoad() {
584
585                DCwormsAccumulator loadAcc = new DCwormsAccumulator();
586
587                for(ComputingResource compRes: getResourceManager().getResourcesOfType(StandardResourceType.Node)){
588                        loadAcc.add(compRes.getLoadInterface().getRecentUtilization().getValue());                             
589                }
590
591                return loadAcc.getMean();
592        }
593
594        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
595                        ExecTask task) {
596
597                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
598                LocalResourceManager resourceManager = getResourceManager();
599                if(resourceName != null){
600                        ComputingResource resource = resourceManager.getResourceByName(resourceName);
601                        if(resource == null){
602                                return null;
603                        }
604                        resourceManager = new LocalResourceManager(resource);
605                }
606
607                int cpuRequest;
608                try {
609                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
610                } catch (NoSuchFieldException e) {
611                        cpuRequest = 1;
612                }
613
614                if (cpuRequest != 0) {
615                       
616                        List<ResourceUnit> availableUnits = null;
617                        try {
618                                availableUnits = resourceManager.getPE();
619                        } catch (ResourceException e) {
620                                return null;
621                        }
622                       
623                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
624                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
625                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
626                                if(peUnit.getFreeAmount() > 0){
627                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
628                                        cpuRequest = cpuRequest - allocPE;
629                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
630                                }       
631                        }
632                       
633                        if(cpuRequest > 0){
634                                return null;
635                        }
636                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
637                }
638
639                return  map;
640        }
641       
642        public void notifySubmittedWorkloadUnit(WorkloadUnit wu) {
643                //250314
644                //updateProcessingProgress();
645                if (log.isInfoEnabled())
646                        log.info("Received job " + wu.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
647                registerWorkloadUnit(wu);
648        }
649
650        private void registerWorkloadUnit(WorkloadUnit wu){
651                if(!wu.isRegistered()){
652                        wu.register(jobRegistry);
653                }
654                wu.accept(getWorkloadUnitHandler());
655        }
656       
657        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
658               
659                public void handleJob(JobInterface<?> job){
660
661                        if (log.isInfoEnabled())
662                                log.info("Handle " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
663                       
664                        try {
665                                if(job.getStatus() == BrokerConstants.JOB_STATUS_UNSUBMITTED)
666                                        job.setStatus((int)BrokerConstants.JOB_STATUS_SUBMITTED);
667                        } catch (Exception e) {
668                                e.printStackTrace();
669                        }
670                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
671                        jobsList.add(job);
672                        TaskListImpl availableTasks = new TaskListImpl();
673                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
674                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
675                                availableTasks.add(task);
676                        }
677
678                        for(TaskInterface<?> task: availableTasks){     
679                                registerWorkloadUnit(task);
680                        }
681                }
682               
683                public void handleTask(TaskInterface<?> t){
684                        Task task = (Task)t;
685                        List<AbstractProcesses> processes = task.getProcesses();
686
687                        if(processes == null || processes.size() == 0){
688                                Executable exec = new Executable(task);
689                                registerWorkloadUnit(exec);
690                        } else {
691                                for(int j = 0; j < processes.size(); j++){
692                                        AbstractProcesses procesesSet = processes.get(j);
693                                        Executable exec = new Executable(task, procesesSet);
694                                        registerWorkloadUnit(exec);
695                                }
696                        }
697                }
698               
699                public void handleExecutable(ExecTask task){
700                       
701                        Executable exec = (Executable) task;
702                        jobRegistry.addExecTask(exec);
703
704                        exec.setSchedulerName(scheduler.getFullName());
705                       
706                        TaskList newTasks = new TaskListImpl();
707                        newTasks.add(exec);
708               
709                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
710
711                        if (exec.getStatus() == DCWormsTags.QUEUED) {
712                                sendExecutableReadyEvent(exec);
713                        }
714                }
715        }
716
717        public WorkloadUnitHandler getWorkloadUnitHandler() {
718                return new LocalWorkloadUnitHandler();
719        }
720       
721       
722        public LocalResourceManager getResourceManager() {
723                if (resourceManager instanceof ResourceManager)
724                        return (LocalResourceManager) resourceManager;
725                else
726                        return null;
727        }
728       
729       
730        private void saveExecutionHistory(Executable exec, double completionPercentage, double estimatedDuration){
731                ExecutionHistoryItem execHistoryItem = new ExecutionHistoryItem(new DateTime().getMillis());
732                execHistoryItem.setCompletionPercentage(completionPercentage);
733                execHistoryItem.setEstimatedDuration(estimatedDuration);
734                execHistoryItem.setResIndex(exec.getAllocatedResources().size() - 1);
735                execHistoryItem.setStatus(exec.getStatus());
736                exec.addExecHistory(execHistoryItem);
737        }
738       
739        public boolean allocateResources(Executable exec, Map<ResourceUnitName, ResourceUnit> choosenResources, boolean exclusive){
740                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources, exclusive);
741                if(allocationStatus){
742                        ResourceItem resourceHistoryItem = new ResourceItem(choosenResources);
743                        exec.addAllocatedResources(resourceHistoryItem);
744                        return true;
745                }
746                return false;
747        }
748       
749
750       
751}
Note: See TracBrowser for help on using the repository browser.