source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 1440

Revision 1440, 27.2 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.Iterator;
6import java.util.LinkedHashSet;
7import java.util.List;
8import java.util.Map;
9import java.util.Set;
10
11import org.apache.commons.logging.Log;
12import org.apache.commons.logging.LogFactory;
13import org.joda.time.DateTime;
14import org.joda.time.DateTimeUtilsExt;
15import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
16
17import qcg.shared.constants.BrokerConstants;
18import schedframe.SimulatedEnvironment;
19import schedframe.events.scheduling.SchedulingEvent;
20import schedframe.events.scheduling.SchedulingEventType;
21import schedframe.events.scheduling.StartTaskExecutionEvent;
22import schedframe.events.scheduling.TaskFinishedEvent;
23import schedframe.events.scheduling.TaskPausedEvent;
24import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
25import schedframe.events.scheduling.TaskResumedEvent;
26import schedframe.exceptions.ResourceException;
27import schedframe.resources.StandardResourceType;
28import schedframe.resources.computing.ComputingResource;
29import schedframe.resources.computing.profiles.energy.ResourceEvent;
30import schedframe.resources.computing.profiles.energy.ResourceEventType;
31import schedframe.resources.units.PEUnit;
32import schedframe.resources.units.ProcessingElements;
33import schedframe.resources.units.ResourceUnit;
34import schedframe.resources.units.ResourceUnitName;
35import schedframe.resources.units.StandardResourceUnitName;
36import schedframe.scheduling.ExecutionHistoryItem;
37import schedframe.scheduling.ResourceItem;
38import schedframe.scheduling.Scheduler;
39import schedframe.scheduling.TaskList;
40import schedframe.scheduling.TaskListImpl;
41import schedframe.scheduling.WorkloadUnitHandler;
42import schedframe.scheduling.manager.resources.LocalResourceManager;
43import schedframe.scheduling.manager.resources.ManagedResources;
44import schedframe.scheduling.manager.resources.ResourceManager;
45import schedframe.scheduling.plan.AllocationInterface;
46import schedframe.scheduling.plan.ScheduledTaskInterface;
47import schedframe.scheduling.plan.SchedulingPlanInterface;
48import schedframe.scheduling.plugin.ModuleListImpl;
49import schedframe.scheduling.plugin.SchedulingPlugin;
50import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
51import schedframe.scheduling.policy.AbstractManagementSystem;
52import schedframe.scheduling.queue.TaskQueueList;
53import schedframe.scheduling.tasks.AbstractProcesses;
54import schedframe.scheduling.tasks.Job;
55import schedframe.scheduling.tasks.JobInterface;
56import schedframe.scheduling.tasks.Task;
57import schedframe.scheduling.tasks.TaskInterface;
58import schedframe.scheduling.tasks.WorkloadUnit;
59import simulator.DCWormsConstants;
60import simulator.stats.DCwormsAccumulator;
61import simulator.utils.DoubleMath;
62import dcworms.schedframe.scheduling.ExecTask;
63import dcworms.schedframe.scheduling.Executable;
64import eduni.simjava.Sim_event;
65import eduni.simjava.Sim_system;
66import gridsim.dcworms.DCWormsTags;
67import gridsim.dcworms.filter.ExecTaskFilter;
68
69public class LocalManagementSystem extends AbstractManagementSystem {
70
71        private Log log = LogFactory.getLog(LocalManagementSystem.class);
72
73        protected double lastUpdateTime;
74
75        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
76                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
77                        throws Exception {
78
79                super(providerId, entityName, execTimeEstimationPlugin, queues);
80               
81                if (schedPlugin == null) {
82                        throw new Exception("Can not create local scheduling plugin instance.");
83                }
84                this.schedulingPlugin =  schedPlugin;
85                this.moduleList = new ModuleListImpl(1);
86        }
87
88        public void init(Scheduler sched, ManagedResources managedResources) {
89                super.init(sched, managedResources);
90        }
91
92        public void processEvent(Sim_event ev) {
93
94                updateProcessingProgress();
95
96                int tag = ev.get_tag();
97                SchedulingEvent event;
98                SchedulingPlanInterface<?> decision;
99                switch (tag) {
100                case DCWormsTags.TIMER:
101
102                        event = new SchedulingEvent(SchedulingEventType.TIMER);
103                        decision =  schedulingPlugin.schedule(event,
104                                        queues, jobRegistry, getResourceManager(), moduleList);
105                        executeSchedulingPlan(decision);
106
107                        sendTimerEvent();
108                        break;
109
110                case DCWormsTags.TASK_READY_FOR_EXECUTION:
111                        ExecTask execTask = (ExecTask) ev.get_data();
112                        try {
113                                execTask.setStatus(DCWormsTags.READY);
114
115                                event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
116                                decision =  schedulingPlugin.schedule(event,
117                                                queues, jobRegistry, getResourceManager(), moduleList);
118                                executeSchedulingPlan(decision);
119
120                        } catch (Exception e) {
121                                e.printStackTrace();
122                        }
123                        break;
124
125                case DCWormsTags.TASK_EXECUTION_FINISHED:
126                        execTask = (ExecTask) ev.get_data();
127                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
128                               
129                                finalizeExecutable(execTask);
130                                sendFinishedWorkloadUnit(execTask);
131                                event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
132                                decision = schedulingPlugin.schedule(event,
133                                                queues, jobRegistry, getResourceManager(), moduleList);
134                                executeSchedulingPlan(decision);
135                        }
136
137                        Job job = jobRegistry.getJob(execTask.getJobId());
138                        if(!job.isFinished()){
139                                getWorkloadUnitHandler().handleJob(job);
140                        }
141                        //SUPPORTS PRECEDING CONSTRAINST COMING ALSO FROM SWF FILES
142                        /*else {
143                                try {
144                                        job.setStatus((int)BrokerConstants.JOB_STATUS_FINISHED);
145                                } catch (Exception e) {
146
147                                }
148                                for(JobInterface<?> j: jobRegistry.getJobs((int)BrokerConstants.JOB_STATUS_SUBMITTED)){
149                                        getWorkloadUnitHandler().handleJob(j); 
150                                }
151                        }*/
152                        break;
153                       
154                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
155                        execTask = (Executable) ev.get_data();
156                        event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
157                        decision = schedulingPlugin.schedule(event,
158                                        queues, jobRegistry, getResourceManager(), moduleList);
159                        executeSchedulingPlan(decision);
160                        break;
161                       
162                case DCWormsTags.TASK_PAUSE:{
163                        String[] ids = (String[]) ev.get_data();
164                        execTask = jobRegistry.getTask(ids[0], ids[1]);
165                        pauseTask(execTask);
166       
167                        event = new TaskPausedEvent(ids[0], ids[1]);
168                        decision = schedulingPlugin.schedule(event,
169                                        queues, jobRegistry, getResourceManager(), moduleList);
170                        executeSchedulingPlan(decision);
171                        }
172                        break;
173               
174                case DCWormsTags.TASK_RESUME:{
175                        String[] ids = (String[]) ev.get_data();
176                        execTask = jobRegistry.getTask(ids[0], ids[1]);
177                        resumeTask(execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), true);
178                        event = new TaskResumedEvent(ids[0], ids[1]);
179                        decision = schedulingPlugin.schedule(event,
180                                        queues, jobRegistry, getResourceManager(), moduleList);
181                        executeSchedulingPlan(decision);
182                        }
183                        break;
184               
185                case DCWormsTags.TASK_MIGRATE:{
186                        Object[] data = (Object[]) ev.get_data();
187                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
188                        double migrationTime = execTimeEstimationPlugin.estimateMigrationTime(new StartTaskExecutionEvent((String)data[0],
189                                        (String)data[1]), execTask, execTask.getAllocatedResources().getLast().getResourceUnits(), (Map<ResourceUnitName, ResourceUnit>)data[2]);
190                        scheduler.sendInternal(migrationTime, DCWormsTags.TASK_MOVE, data);
191                        }
192                        break;
193               
194                case DCWormsTags.TASK_MOVE:{
195                        Object[] data = (Object[]) ev.get_data();
196                        execTask = jobRegistry.getTask((String)data[0], (String)data[1]);
197                        moveTask(execTask, (Map<ResourceUnitName, ResourceUnit>)data[2]);
198
199                        event = new StartTaskExecutionEvent((String)data[0], (String)data[1]);
200                        decision = schedulingPlugin.schedule(event,
201                                        queues, jobRegistry, getResourceManager(), moduleList);
202                        executeSchedulingPlan(decision);
203                        }
204                        break;
205                       
206                case DCWormsTags.RESOURCE_POWER_STATE_CHANGED:
207                        event = new SchedulingEvent(SchedulingEventType.RESOURCE_POWER_STATE_CHANGED);
208                        decision =  schedulingPlugin.schedule(event,
209                                        queues, jobRegistry, getResourceManager(), moduleList);
210                        executeSchedulingPlan(decision);
211                        break;
212
213                case DCWormsTags.RESOURCE_POWER_LIMIT_EXCEEDED:
214                        event = new SchedulingEvent(SchedulingEventType.RESOURCE_POWER_LIMIT_EXCEEDED);
215                        String source;
216                        try{
217                                source = ev.get_data().toString();
218                        } catch(Exception e){
219                                source = null;
220                        }
221                        event.setSource(source);
222                        decision = schedulingPlugin.schedule(event,
223                                        queues, jobRegistry, getResourceManager(), moduleList);
224                        executeSchedulingPlan(decision);
225                        break;
226                case DCWormsTags.TASK_EXECUTION_CHANGED:
227                        execTask = (ExecTask) ev.get_data();
228                        updateTaskExecutionPhase(execTask);
229                        break;
230                case DCWormsTags.UPDATE_PROCESSING:
231                        updateProcessingTimes();
232                        break;
233                       
234                default:               
235                        break;
236                }
237        }
238
239
240        protected void pauseTask(ExecTask execTask) {
241                if (execTask == null) {
242                        return;
243                } else {
244                        try {
245                                execTask.setStatus(DCWormsTags.PAUSED);
246                               
247                                Executable exec = (Executable) execTask;
248                                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
249                                getAllocationManager().freeResources(lastUsed, true);
250                               
251                                saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), exec.getEstimatedDuration());
252                               
253                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), -1);
254                                scheduler.sim_cancel(filter, null);
255                               
256                                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
257                                updateComputingResources(peUnit, ResourceEventType.TASK_FINISHED, exec);
258                        } catch (Exception e) {
259                                // TODO Auto-generated catch block
260                                e.printStackTrace();
261                        }
262                }               
263        }
264       
265        protected void resumeTask(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> resources, boolean exclusive) {
266                if (execTask == null) {
267                        return;
268                } else if (execTask.getStatus() == DCWormsTags.PAUSED) {
269                        try {
270                                execTask.setStatus(DCWormsTags.RESUMED);
271                                Executable exec = (Executable) execTask;
272
273                                boolean status = allocateResources(exec, resources, exclusive);
274                                if(status == false){
275                                        TaskList newTasks = new TaskListImpl();
276                                        newTasks.add(exec);
277                                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
278                                        exec.setStatus(DCWormsTags.READY);
279                                } else {
280                                        runTask(execTask);
281                                       
282                                        PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
283                                        updateComputingResources(peUnit, ResourceEventType.TASK_STARTED, exec);
284                                }
285
286                        } catch (Exception e) {
287                                // TODO Auto-generated catch block
288                                e.printStackTrace();
289                        }
290                }                       
291        }
292       
293        protected void moveTask(ExecTask execTask, Map<ResourceUnitName, ResourceUnit> map) {
294                pauseTask(execTask);
295                resumeTask(execTask, map, false);
296        }
297       
298        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
299
300                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
301                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
302                                queues, jobRegistry, getResourceManager(), moduleList);
303                executeSchedulingPlan(decision);
304
305        //if(scheduler.getParent() != null){
306                sendFinishedWorkloadUnit(wu);
307        //}
308        }
309       
310        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
311
312                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
313                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
314                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
315
316                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
317                                continue;
318                        }
319
320                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
321
322                        TaskInterface<?> task = taskDecision.getTask();
323                        for (int j = 0; j < allocations.size(); j++) {
324
325                                AllocationInterface<?> allocation = allocations.get(j);
326                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
327                                        ExecTask exec = (ExecTask) task;                                       
328                                        executeTask(exec, allocation.getRequestedResources());
329                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
330                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
331                                        submitTask(task, allocation);
332                                } else {
333                                        ExecTask exec = (ExecTask) task;
334                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
335                                }
336                        }
337                }
338        }
339
340        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
341
342                Executable exec = (Executable)task;
343                boolean allocationStatus = allocateResources(exec, choosenResources, false);
344                if(allocationStatus == false){
345                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
346                        return;
347                }
348
349                removeFromQueue(task);
350
351                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
352
353                runTask(exec);
354               
355                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
356                updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
357               
358
359                try {
360                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
361                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
362                } catch (NoSuchFieldException e) {
363                        //double t = exec.getEstimatedDuration();
364                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
365                }
366
367                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());           
368        }
369       
370        private void runTask(ExecTask execTask){
371                Executable exec = (Executable) execTask;
372                Map<ResourceUnitName, ResourceUnit> resources = exec.getAllocatedResources().getLast().getResourceUnits();
373               
374                try {
375                        exec.setStatus(DCWormsTags.INEXEC);
376                } catch (Exception e) {
377                        // TODO Auto-generated catch block
378                        e.printStackTrace();
379                }
380
381                int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION),
382                                execTask, resources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
383
384                saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
385                if(exec.getExecutionProfile().isLast()){
386                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);;
387                } else {
388                        scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
389                }
390               
391                PEUnit peUnit = (PEUnit)resources.get(StandardResourceUnitName.PE);
392                updateComputingResources(peUnit, ResourceEventType.TASK_STARTED, exec);
393        }
394
395        protected void finalizeExecutable(ExecTask execTask){
396               
397                Executable exec = (Executable)execTask;
398                exec.finalizeExecutable();
399               
400                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
401                scheduler.sim_cancel(filter, null);
402               
403                Task task;
404                Job job = jobRegistry.getJob(exec.getJobId());
405                try {
406                        task = job.getTask(exec.getTaskId());
407                } catch (NoSuchFieldException e) {
408                        return;
409                }
410                if(exec.getProcessesId() == null){
411                        try {
412                                task.setStatus(exec.getStatus());
413                        } catch (Exception e) {
414                                e.printStackTrace();
415                        }
416                } else {
417                        List<AbstractProcesses> processesList = task.getProcesses();
418                        for(int i = 0; i < processesList.size(); i++){
419                                AbstractProcesses processes = processesList.get(i);
420                                if(processes.getId().equals(exec.getProcessesId())){
421                                        processes.setStatus(exec.getStatus());
422                                        break;
423                                }
424                        }
425                }
426
427                Map<ResourceUnitName, ResourceUnit> lastUsed = exec.getAllocatedResources().getLast().getResourceUnits();
428                getAllocationManager().freeResources(lastUsed, true);
429               
430                //TODO calculate the value of completion
431                saveExecutionHistory(exec, 100,0);
432               
433                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
434                updateComputingResources(peUnit, ResourceEventType.TASK_FINISHED, exec);
435               
436                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
437                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
438               
439                updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
440        }
441       
442        protected void updateProcessingProgress() {
443
444                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
445                if (timeSpan <= 0.0) {
446                        // don't update when nothing changed
447                        return;
448                }
449                lastUpdateTime = Sim_system.clock();
450                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
451                while (iter.hasNext()) {
452                        ExecTask task = iter.next();
453                        Executable exec = (Executable)task;
454                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
455                        //System.out.println("--- upadteProgressX: " + Sim_system.sim_clock() );
456                        //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " +  execHistItem.getCompletionPercentage());
457                        exec.getExecutionProfile().setCompletionPercentage(exec.getExecutionProfile().getCompletionPercentage() + 100 * (timeSpan / (execHistItem.getEstimatedDuration()) * (1.0 - execHistItem.getCompletionPercentage()/100.0)));
458                        exec.setTotalCompletionPercentage(exec.getTotalCompletionPercentage() + 100 * (timeSpan / execHistItem.getEstimatedDuration()) * exec.getExecutionProfile().getCurrentExecutionPhase().getLenght() / exec.getExecutionProfile().getLength());
459                        //System.out.println("newProgress: " + exec.getCompletionPercentage()  );       
460
461                }
462        }
463       
464        private void updateComputingResources(PEUnit peUnit, ResourceEventType eventType, Object obj){
465                if(peUnit instanceof ProcessingElements){
466                        /*ProcessingElements pes = (ProcessingElements) peUnit;
467                        for (ComputingResource resource : pes) {
468                                resource.handleEvent(new ResourceEvent(eventType, obj, scheduler.getFullName()));
469                                //DataCenterWorkloadSimulator.getEventManager().sendToResources(resource.getType(), 0, new EnergyEvent(eventType, obj));
470                        }*/
471                        /*try {
472                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
473                                        resource.handleEvent(new EnergyEvent(eventType, obj));
474                                }
475                        } catch (ResourceException e) {
476                                // TODO Auto-generated catch block
477                                e.printStackTrace();
478                        }*/
479                       
480                        ProcessingElements pes = (ProcessingElements) peUnit;
481                        ResourceEvent resEvent = new ResourceEvent(eventType, obj, scheduler.getFullName());           
482                        try{
483                                Set<ComputingResource> resourcesToUpdate = new LinkedHashSet<ComputingResource>();
484                       
485                                for (ComputingResource compResource : pes) {
486                                        resourcesToUpdate.add(compResource);
487                                }
488                                while(resourcesToUpdate.size() > 0){
489                                        Set<ComputingResource> newSet = new LinkedHashSet<ComputingResource>();
490                                        for(ComputingResource compResource: resourcesToUpdate){
491                                                compResource.updateState(resEvent);
492                                                if(compResource.getParent() != null){
493                                                        newSet.add(compResource.getParent());
494                                                }
495                                        }
496                                        resourcesToUpdate = new LinkedHashSet<ComputingResource>(newSet);
497                                }
498                        }catch(Exception e){
499                        }
500                } else {
501                        ComputingResource resource = null;
502                        try {
503                                resource = SimulatedEnvironment.getComputingResourceByName(peUnit.getResourceId());
504                        } catch (ResourceException e) {
505                                return;
506                        }
507                        resource.handleEvent(new ResourceEvent(eventType, obj, scheduler.getFullName()));
508                }
509        }
510
511        protected void updateProcessingTimes() {
512                for (ExecTask execTask: jobRegistry.getRunningTasks()) {
513                        Executable exec = (Executable)execTask;
514
515                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
516
517                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
518                                        execTask, choosenResources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
519
520                        ExecutionHistoryItem execHistItem = exec.getExecutionHistory().getLast();
521                        double lastTimeStamp = execHistItem.getTimeStamp() / 1000;
522                        if(DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis() / 1000 + phaseDuration)) == 0.0){
523                                continue;
524                        }
525                        //System.out.println("=== upadteTIme: " + Sim_system.sim_clock() );
526                        //System.out.println("execId: " + exec.getId() +  "; estimatedDuration " + execHistItem.getEstimatedDuration());
527                        //System.out.println("execId: " + exec.getId() + "; difference " + DoubleMath.subtract((lastTimeStamp + execHistItem.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)));
528                        //System.out.println("completionPercantage: " + exec.getCompletionPercentage() + "; basic duration: " +exec.getResourceConsumptionProfile().getCurrentResourceConsumption().getDuration() +   "; phaseDuration: " +  phaseDuration);
529
530                        saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
531                       
532                        if(exec.getExecutionProfile().isLast()){
533                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
534                                scheduler.sim_cancel(filter, null);
535                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
536                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
537                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
538                        } else{
539                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
540                                scheduler.sim_cancel(filter, null);
541                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
542                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
543                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
544                        }
545                }
546        }       
547
548        protected void updateTaskExecutionPhase(ExecTask execTask) {
549
550                if (execTask.getStatus() == DCWormsTags.INEXEC) {
551                        Executable exec = (Executable)execTask;
552                       
553                        try {
554                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
555                        } catch (Exception e) {
556                               
557                        }
558
559                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getAllocatedResources().getLast().getResourceUnits();
560
561                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.TASK_EXECUTION_CHANGED),
562                                        execTask, choosenResources, exec.getExecutionProfile().getCompletionPercentage())).intValue();
563                       
564                        saveExecutionHistory(exec, exec.getExecutionProfile().getCompletionPercentage(), phaseDuration);
565                       
566                        if(exec.getExecutionProfile().isLast()){
567                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
568                        } else {
569                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
570                        }
571                       
572                        PEUnit peUnit = (PEUnit)exec.getAllocatedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
573                        updateComputingResources(peUnit, ResourceEventType.UTILIZATION_CHANGED, exec);
574                }
575        }       
576
577        protected double calculateTotalLoad() {
578
579                DCwormsAccumulator loadAcc = new DCwormsAccumulator();
580
581                for(ComputingResource compRes: getResourceManager().getResourcesOfType(StandardResourceType.Node)){
582                        loadAcc.add(compRes.getLoadInterface().getRecentUtilization().getValue());                             
583                }
584
585                return loadAcc.getMean();
586        }
587
588        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
589                        ExecTask task) {
590
591                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
592                LocalResourceManager resourceManager = getResourceManager();
593                if(resourceName != null){
594                        ComputingResource resource = resourceManager.getResourceByName(resourceName);
595                        if(resource == null){
596                                return null;
597                        }
598                        resourceManager = new LocalResourceManager(resource);
599                }
600
601                int cpuRequest;
602                try {
603                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
604                } catch (NoSuchFieldException e) {
605                        cpuRequest = 1;
606                }
607
608                if (cpuRequest != 0) {
609                       
610                        List<ResourceUnit> availableUnits = null;
611                        try {
612                                availableUnits = resourceManager.getPE();
613                        } catch (ResourceException e) {
614                                return null;
615                        }
616                       
617                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
618                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
619                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
620                                if(peUnit.getFreeAmount() > 0){
621                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
622                                        cpuRequest = cpuRequest - allocPE;
623                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
624                                }       
625                        }
626                       
627                        if(cpuRequest > 0){
628                                return null;
629                        }
630                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
631                }
632
633                return  map;
634        }
635       
636        public void notifySubmittedWorkloadUnit(WorkloadUnit wu) {
637                //250314
638                //updateProcessingProgress();
639                if (log.isInfoEnabled())
640                        log.info("Received job " + wu.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
641                registerWorkloadUnit(wu);
642        }
643
644        private void registerWorkloadUnit(WorkloadUnit wu){
645                if(!wu.isRegistered()){
646                        wu.register(jobRegistry);
647                }
648                wu.accept(getWorkloadUnitHandler());
649        }
650       
651        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
652               
653                public void handleJob(JobInterface<?> job){
654
655                        if (log.isInfoEnabled())
656                                log.info("Handle " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
657                       
658                        try {
659                                if(job.getStatus() == BrokerConstants.JOB_STATUS_UNSUBMITTED)
660                                        job.setStatus((int)BrokerConstants.JOB_STATUS_SUBMITTED);
661                        } catch (Exception e) {
662                                e.printStackTrace();
663                        }
664                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
665                        jobsList.add(job);
666                        TaskListImpl availableTasks = new TaskListImpl();
667                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
668                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
669                                availableTasks.add(task);
670                        }
671
672                        for(TaskInterface<?> task: availableTasks){     
673                                registerWorkloadUnit(task);
674                        }
675                }
676               
677                public void handleTask(TaskInterface<?> t){
678                        Task task = (Task)t;
679                        List<AbstractProcesses> processes = task.getProcesses();
680
681                        if(processes == null || processes.size() == 0){
682                                Executable exec = new Executable(task);
683                                registerWorkloadUnit(exec);
684                        } else {
685                                for(int j = 0; j < processes.size(); j++){
686                                        AbstractProcesses procesesSet = processes.get(j);
687                                        Executable exec = new Executable(task, procesesSet);
688                                        registerWorkloadUnit(exec);
689                                }
690                        }
691                }
692               
693                public void handleExecutable(ExecTask task){
694                       
695                        Executable exec = (Executable) task;
696                        jobRegistry.addExecTask(exec);
697
698                        exec.setSchedulerName(scheduler.getFullName());
699                       
700                        TaskList newTasks = new TaskListImpl();
701                        newTasks.add(exec);
702               
703                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
704
705                        if (exec.getStatus() == DCWormsTags.QUEUED) {
706                                sendExecutableReadyEvent(exec);
707                        }
708                }
709        }
710
711        public WorkloadUnitHandler getWorkloadUnitHandler() {
712                return new LocalWorkloadUnitHandler();
713        }
714       
715       
716        public LocalResourceManager getResourceManager() {
717                if (resourceManager instanceof ResourceManager)
718                        return (LocalResourceManager) resourceManager;
719                else
720                        return null;
721        }
722       
723       
724        private void saveExecutionHistory(Executable exec, double completionPercentage, double estimatedDuration){
725                ExecutionHistoryItem execHistoryItem = new ExecutionHistoryItem(new DateTime().getMillis());
726                execHistoryItem.setCompletionPercentage(completionPercentage);
727                execHistoryItem.setEstimatedDuration(estimatedDuration);
728                execHistoryItem.setResIndex(exec.getAllocatedResources().size() - 1);
729                execHistoryItem.setStatus(exec.getStatus());
730                exec.addExecHistory(execHistoryItem);
731        }
732       
733        public boolean allocateResources(Executable exec, Map<ResourceUnitName, ResourceUnit> choosenResources, boolean exclusive){
734                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources, exclusive);
735                if(allocationStatus){
736                        ResourceItem resourceHistoryItem = new ResourceItem(choosenResources);
737                        exec.addAllocatedResources(resourceHistoryItem);
738                        return true;
739                }
740                return false;
741        }
742       
743
744       
745}
Note: See TracBrowser for help on using the repository browser.