source: DCWoRMS/trunk/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 822

Revision 822, 22.1 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
RevLine 
[477]1package schedframe.scheduling.policy.local;
2
[490]3import dcworms.schedframe.scheduling.ExecTask;
4import dcworms.schedframe.scheduling.Executable;
[477]5import eduni.simjava.Sim_event;
6import eduni.simjava.Sim_system;
7import gridsim.Accumulator;
8import gridsim.ResourceCalendar;
[493]9import gridsim.dcworms.DCWormsTags;
10import gridsim.dcworms.filter.ExecTaskFilter;
[477]11
12import java.util.ArrayList;
13import java.util.HashMap;
14import java.util.Iterator;
15import java.util.List;
16import java.util.Map;
17
18import org.apache.commons.lang.ArrayUtils;
19import org.apache.commons.logging.Log;
20import org.apache.commons.logging.LogFactory;
21import org.joda.time.DateTime;
22import org.joda.time.DateTimeUtilsExt;
23import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
24
25import qcg.shared.constants.BrokerConstants;
26import schedframe.ResourceController;
27import schedframe.events.scheduling.SchedulingEvent;
28import schedframe.events.scheduling.SchedulingEventType;
29import schedframe.events.scheduling.StartTaskExecutionEvent;
30import schedframe.events.scheduling.TaskFinishedEvent;
31import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
32import schedframe.exceptions.ResourceException;
33import schedframe.resources.computing.ComputingResource;
34import schedframe.resources.computing.profiles.energy.EnergyEvent;
35import schedframe.resources.computing.profiles.energy.EnergyEventType;
36import schedframe.resources.units.PEUnit;
37import schedframe.resources.units.ProcessingElements;
38import schedframe.resources.units.ResourceUnit;
39import schedframe.resources.units.ResourceUnitName;
40import schedframe.resources.units.StandardResourceUnitName;
41import schedframe.scheduling.ResourceHistoryItem;
42import schedframe.scheduling.Scheduler;
[490]43import schedframe.scheduling.TaskList;
[481]44import schedframe.scheduling.TaskListImpl;
[490]45import schedframe.scheduling.UsedResourcesList;
[477]46import schedframe.scheduling.WorkloadUnitHandler;
47import schedframe.scheduling.manager.resources.LocalResourceManager;
48import schedframe.scheduling.manager.resources.ManagedResources;
49import schedframe.scheduling.manager.resources.ResourceManager;
50import schedframe.scheduling.plan.AllocationInterface;
51import schedframe.scheduling.plan.ScheduledTaskInterface;
52import schedframe.scheduling.plan.SchedulingPlanInterface;
53import schedframe.scheduling.plugin.SchedulingPlugin;
54import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
55import schedframe.scheduling.plugin.grid.ModuleListImpl;
56import schedframe.scheduling.plugin.grid.ModuleType;
57import schedframe.scheduling.policy.AbstractManagementSystem;
58import schedframe.scheduling.queue.TaskQueueList;
59import schedframe.scheduling.tasks.AbstractProcesses;
60import schedframe.scheduling.tasks.Job;
61import schedframe.scheduling.tasks.JobInterface;
62import schedframe.scheduling.tasks.Task;
63import schedframe.scheduling.tasks.TaskInterface;
64import schedframe.scheduling.tasks.WorkloadUnit;
[481]65import simulator.DCWormsConstants;
[477]66import simulator.utils.DoubleMath;
67
68public class LocalManagementSystem extends AbstractManagementSystem {
69
70        private Log log = LogFactory.getLog(LocalManagementSystem.class);
71
72        protected double lastUpdateTime;
73
74        protected Accumulator accTotalLoad;
75
76        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
77                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
78                        throws Exception {
79
80                super(providerId, entityName, execTimeEstimationPlugin, queues);
81               
82                if (schedPlugin == null) {
83                        throw new Exception("Can not create local scheduling plugin instance.");
84                }
85                this.schedulingPlugin =  schedPlugin;
[490]86                this.moduleList = new ModuleListImpl(1);
87               
88                this.accTotalLoad = new Accumulator();
[477]89        }
90
91        public void init(Scheduler sched, ManagedResources managedResources) {
92                super.init(sched, managedResources);
93                double load = 0;
94                accTotalLoad.add(load);
95        }
96
97        public void processEvent(Sim_event ev) {
98
99                updateProcessingProgress();
100
101                int tag = ev.get_tag();
102
103                switch (tag) {
104
[481]105                case DCWormsTags.TIMER:
[477]106                        if (pluginSupportsEvent(tag)) {
107                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
[481]108                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
[477]109                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
110                                executeSchedulingPlan(decision);
111                        }
112                        sendTimerEvent();
113                        break;
114
[481]115                case DCWormsTags.TASK_READY_FOR_EXECUTION:
[490]116                        ExecTask execTask = (ExecTask) ev.get_data();
[477]117                        try {
[490]118                                execTask.setStatus(DCWormsTags.READY);
[477]119                                if (pluginSupportsEvent(tag)) {
[490]120                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
[481]121                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
[477]122                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
123                                        executeSchedulingPlan(decision);
124                                }
125                        } catch (Exception e) {
126                                e.printStackTrace();
127                        }
128                        break;
129
[481]130                case DCWormsTags.TASK_EXECUTION_FINISHED:
[822]131                        execTask = (ExecTask) ev.get_data();
[490]132                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
133                               
134                                finalizeExecutable(execTask);
135                                sendFinishedWorkloadUnit(execTask);
136                                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
[481]137                                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
[490]138                                if (pluginSupportsEvent(tag)) {
139                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
140                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
141                                                        queues, getJobRegistry(), getResourceManager(), moduleList);
142                                        executeSchedulingPlan(decision);
[477]143                                }
144                        }
[490]145
146                        Job job = jobRegistry.getJob(execTask.getJobId());
[477]147                        if(!job.isFinished()){
148                                getWorkloadUnitHandler().handleJob(job);
149                        }
150                        break;
[490]151                       
[481]152                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
[822]153                        execTask = (Executable) ev.get_data();
[477]154                        if (pluginSupportsEvent(tag)) {
[490]155                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
[481]156                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
[477]157                                                queues, getJobRegistry(), getResourceManager(), moduleList);
158                                executeSchedulingPlan(decision);
159                        }
160                        break;
[490]161                       
[822]162                case DCWormsTags.UPDATE_PROCESSING:
[477]163                        updateProcessingTimes(ev);
164                        break;
[822]165                       
166                case DCWormsTags.PHASE_CHANGED:
167                        execTask = (ExecTask) ev.get_data();
168                        //updatePhases(execTask);
169                        break;
[477]170                }
171        }
172
[478]173        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
[481]174                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
[477]175                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
[481]176                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
[477]177                                        queues, getJobRegistry(), getResourceManager(), moduleList);
178                        executeSchedulingPlan(decision);
179                }
180                //if(scheduler.getParent() != null){
181                        sendFinishedWorkloadUnit(wu);
182                //}
183        }
184       
[481]185        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
[477]186
[481]187                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
[477]188                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
[481]189                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
[477]190
[481]191                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
192                                continue;
193                        }
[477]194
[481]195                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
[477]196
[481]197                        TaskInterface<?> task = taskDecision.getTask();
198                        for (int j = 0; j < allocations.size(); j++) {
[477]199
[481]200                                AllocationInterface<?> allocation = allocations.get(j);
[555]201                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
[481]202                                        ExecTask exec = (ExecTask) task;                                       
203                                        executeTask(exec, allocation.getRequestedResources());
204                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
205                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
206                                        submitTask(task, allocation);
207                                } else {
208                                        ExecTask exec = (ExecTask) task;
[490]209                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
[477]210                                }
[481]211                        }
[477]212                }
213        }
214
215        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
[481]216
217                Executable exec = (Executable)task;
[477]218                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
[555]219                if(allocationStatus == false){
[822]220                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
[477]221                        return;
[555]222                }
223
[477]224                removeFromQueue(task);
[490]225
[477]226                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION);
227                int time = Double.valueOf(
[481]228                                execTimeEstimationPlugin.execTimeEstimation(event, task, choosenResources, exec.getCompletionPercentage())).intValue();
[477]229                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime()
230                                + " will finish after " + time);
231
232                if (time < 0.0)
233                        return;
234
[481]235                exec.setEstimatedDuration(time);
[477]236                DateTime currentTime = new DateTime();
237                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
[481]238                exec.addUsedResources(resHistItem);
[490]239                try {
240                        exec.setStatus(DCWormsTags.INEXEC);
241                } catch (Exception e) {
242                        // TODO Auto-generated catch block
243                        e.printStackTrace();
244                }
[822]245               
246                //updatePhases(exec);
[490]247                scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, exec);
[477]248
249                try {
[481]250                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
251                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
[477]252                } catch (NoSuchFieldException e) {
[481]253                        double t = exec.getEstimatedDuration();
254                        scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
[477]255                }
[490]256
[481]257                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
[477]258               
259                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
[490]260               
261                notifyComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
262               
263                /*if(peUnit instanceof ProcessingElements){
[477]264                        ProcessingElements pes = (ProcessingElements) peUnit;
265                        for (ComputingResource resource : pes) {
[490]266                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec));
[477]267                        }
268                } else {
269                        ComputingResource resource = null;
270                        try {
271                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
272                        } catch (ResourceException e) {
[490]273                                return;
[477]274                        }
[490]275                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec));
[477]276                }
[490]277*/
[477]278
279                /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){
280                        System.out.println(etask.getJobId());
281                        for(String taskId: etask.getVisitedResources())
282                                System.out.println("====="+taskId);
283                }*/
284        }
285       
[481]286        public void finalizeExecutable(ExecTask execTask){
[477]287               
[481]288                Executable exec = (Executable)execTask;
289                exec.finalizeExecutable();
[477]290               
[481]291                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
[477]292                scheduler.sim_cancel(filter, null);
293               
[490]294                Task task;
[481]295                Job job = jobRegistry.getJob(exec.getJobId());
[477]296                try {
[481]297                        task = job.getTask(exec.getTaskId());
[477]298                } catch (NoSuchFieldException e) {
[490]299                        return;
[477]300                }
[481]301                if(exec.getProcessesId() == null){
[477]302                        try {
[481]303                                task.setStatus(exec.getStatus());
[477]304                        } catch (Exception e) {
305                                e.printStackTrace();
306                        }
307                } else {
308                        List<AbstractProcesses> processesList = task.getProcesses();
309                        for(int i = 0; i < processesList.size(); i++){
310                                AbstractProcesses processes = processesList.get(i);
[481]311                                if(processes.getId().equals(exec.getProcessesId())){
312                                        processes.setStatus(exec.getStatus());
[477]313                                        break;
314                                }
315                        }
316                }
[490]317               
318                UsedResourcesList lastUsedList = exec.getUsedResources();
319                Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast()
320                                .getResourceUnits();
321                getAllocationManager().freeResources(lastUsed);
322               
323                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
324                notifyComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
325                /*if(peUnit instanceof ProcessingElements){
326                        ProcessingElements pes = (ProcessingElements) peUnit;
327                        for (ComputingResource resource : pes) {
328                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec));
329                        }
330                } else {
331                        ComputingResource resource = null;
332                        try {
333                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
334                        } catch (ResourceException e) {
335                                return;
336                        }
337                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec));
338                }*/
339
[477]340                //sendFinishedWorkloadUnit(executable);
341        }
342       
343        protected void updateProcessingProgress() {
344                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
345                if (timeSpan <= 0.0) {
346                        // don't update when nothing changed
347                        return;
348                }
349                lastUpdateTime = Sim_system.clock();
350                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
351                while (iter.hasNext()) {
352                        ExecTask task = iter.next();
[481]353                        Executable exec = (Executable)task;
[678]354                        //exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * timeSpan/exec.getEstimatedDuration());
[822]355                        exec.setCompletionPercentage(exec.getCompletionPercentage() + (100 - exec.getCompletionPercentage()) * timeSpan/(exec.getEstimatedDuration() - new DateTime().getMillis()/1000 + exec.getExecStartTime() + timeSpan));
[497]356                       
[490]357                        UsedResourcesList usedResourcesList = exec.getUsedResources();
358                        PEUnit peUnit = (PEUnit)usedResourcesList.getLast().getResourceUnits()
[477]359                                        .get(StandardResourceUnitName.PE);
[490]360                        double load = getMIShare(timeSpan, peUnit);
[477]361                        addTotalLoad(load);
362                }
363        }
[767]364       
[490]365        private void notifyComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){
[477]366
[490]367                if(peUnit instanceof ProcessingElements){
368                        ProcessingElements pes = (ProcessingElements) peUnit;
369                        for (ComputingResource resource : pes) {
370                                resource.handleEvent(new EnergyEvent(eventType, obj));
371                        }
[822]372                        /*try {
373                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
374                                        resource.handleEvent(new EnergyEvent(eventType, obj));
375                                }
376                        } catch (ResourceException e) {
377                                // TODO Auto-generated catch block
378                                e.printStackTrace();
379                        }*/
[490]380                } else {
381                        ComputingResource resource = null;
382                        try {
383                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
384                        } catch (ResourceException e) {
385                                return;
386                        }
387                        resource.handleEvent(new EnergyEvent(eventType, obj));
388                }
389        }
390       
[477]391        private double getMIShare(double timeSpan, PEUnit pes) {
392                double localLoad;
393                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
394                if (resCalendar == null)
395                        localLoad = 0;
396                else
397                        // 1 - localLoad_ = available MI share percentage
398                        localLoad = resCalendar.getCurrentLoad();
399
400                int speed = pes.getSpeed();
401                int cnt = pes.getAmount();
402
403                double totalMI = speed * cnt * timeSpan * (1 - localLoad);
404                return totalMI;
405        }
406
407        protected void updateProcessingTimes(Sim_event ev) {
408                updateProcessingProgress();
[490]409                for (ExecTask execTask : jobRegistry.getRunningTasks()) {
410                        Executable exec = (Executable)execTask;
[481]411                        List<String> visitedResource = exec.getVisitedResources();
[477]412                        String originResource = ev.get_data().toString();
413                        if(!ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), originResource)){
414                                continue;
415                        }
416                       
[481]417                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
[763]418                        int time = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
[497]419                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
[477]420
421                        //check if the new estimated end time is equal to the previous one; if yes the continue without update
[763]422                        if(DoubleMath.subtract((exec.getExecStartTime() + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){
[477]423                                continue;
424                        }
[678]425                        //exec.setEstimatedDuration(time);
426                        exec.setEstimatedDuration(Long.valueOf(new DateTime().getMillis()/1000).intValue() - Double.valueOf(exec.getExecStartTime()).intValue() + time);
[481]427                        ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
[477]428                        scheduler.sim_cancel(filter, null);
[490]429                        scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
[477]430                }
431        }       
[822]432       
433       
434        /*protected void updatePhases(ExecTask execTask) {
435                updateProcessingProgress();
[477]436
[822]437                if (execTask.getStatus() == DCWormsTags.INEXEC) {
438                        Executable exec = (Executable)execTask;
439
440                        double phaseLength = 0;
441                        try{
442                                phaseLength = exec.getResourceConsumptionProfile().getResourceConsumption(exec.getCurrentPhase()).getResourceConsumptionTypeChoice().getPercentage()/100;
443                        } catch(Exception e){
444                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
445                                scheduler.sim_cancel(filter, null);
446                                double t = DoubleMath.subtract((exec.getExecStartTime() + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000));
447                                scheduler.sendInternal(t, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
448                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
449                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
450                        }
451                        if(phaseLength != 0){                   
452                                exec.setCurrentPhase(exec.getCurrentPhase() + 1);
453                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.PHASE_CHANGED);
454                                scheduler.sim_cancel(filter, null);
455                                scheduler.sendInternal(phaseLength * exec.getEstimatedDuration(), DCWormsTags.PHASE_CHANGED, execTask);
456                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
457                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
458                        }
459                        System.out.println("===" + exec.getJobId() + ":" + phaseLength);
460                }
461        }       */
462
[477]463        public double calculateTotalLoad(int size) {
464                // background load, defined during initialization
465                double load;
466                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
467                if (resCalendar == null)
468                        load = 0;
469                else
470                        load = resCalendar.getCurrentLoad();
471
472                int numberOfPE = 0;
473                try {
474                        for(ResourceUnit resUnit : getResourceManager().getPE()){
475                                numberOfPE = numberOfPE + resUnit.getAmount();
476                        }
477                } catch (Exception e) {
478                        numberOfPE = 1;
479                }
480                double tasksPerPE = (double) size / numberOfPE;
481                load += Math.min(1.0 - load, tasksPerPE);
482
483                return load;
484        }
485
486        public Accumulator getTotalLoad() {
487                return accTotalLoad;
488        }
489
490        protected void addTotalLoad(double load) {
491                accTotalLoad.add(load);
492        }
493       
494        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
495                        ExecTask task) {
496
[490]497                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
[497]498                LocalResourceManager resourceManager = getResourceManager();
[477]499                if(resourceName != null){
500                        ComputingResource resource = null;
501                        try {
502                                resource = resourceManager.getResourceByName(resourceName);
503                        } catch (ResourceException e) {
504                                return null;
505                        }
506                        resourceManager = new LocalResourceManager(resource);
507                }
508
509                int cpuRequest;
510                try {
511                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
512                } catch (NoSuchFieldException e) {
513                        cpuRequest = 1;
514                }
515
516                if (cpuRequest != 0) {
517                       
518                        List<ResourceUnit> availableUnits = null;
519                        try {
[497]520                                availableUnits = resourceManager.getPE();
[477]521                        } catch (ResourceException e) {
522                                return null;
523                        }
[490]524                       
[477]525                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
526                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
[763]527                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
[477]528                                if(peUnit.getFreeAmount() > 0){
[763]529                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
[477]530                                        cpuRequest = cpuRequest - allocPE;
531                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
532                                }       
533                        }
534                       
535                        if(cpuRequest > 0){
536                                return null;
537                        }
538                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
539                }
540
541                return  map;
542        }
543       
[481]544        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) {
[477]545                updateProcessingProgress();
[481]546                registerWorkloadUnit(wu);
[477]547        }
548
[478]549        private void registerWorkloadUnit(WorkloadUnit wu){
[477]550                if(!wu.isRegistered()){
551                        wu.register(jobRegistry);
552                }
553                wu.accept(getWorkloadUnitHandler());
554        }
555       
556        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
557               
[490]558                public void handleJob(JobInterface<?> job){
[477]559
560                        if (log.isInfoEnabled())
561                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
562
563                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
564                        jobsList.add(job);
[490]565                        TaskListImpl availableTasks = new TaskListImpl();
[481]566                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
[477]567                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
[490]568                                availableTasks.add(task);
[477]569                        }
570
[490]571                        for(TaskInterface<?> task: availableTasks){     
572                                registerWorkloadUnit(task);
[477]573                        }
574                }
575               
[490]576                public void handleTask(TaskInterface<?> t){
577                        Task task = (Task)t;
[481]578                        List<AbstractProcesses> processes = task.getProcesses();
579
580                        if(processes == null || processes.size() == 0){
[477]581                                Executable exec = new Executable(task);
582                                registerWorkloadUnit(exec);
583                        } else {
[481]584                                for(int j = 0; j < processes.size(); j++){
585                                        AbstractProcesses procesesSet = processes.get(j);
586                                        Executable exec = new Executable(task, procesesSet);
[477]587                                        registerWorkloadUnit(exec);
588                                }
589                        }
590                }
591               
592                public void handleExecutable(ExecTask task){
[490]593                       
[477]594                        Executable exec = (Executable) task;
[490]595                        jobRegistry.addExecTask(exec);
596                       
597                        exec.trackResource(scheduler.get_name());
[477]598                        Scheduler parentScheduler = scheduler.getParent();
[490]599                        List<String> visitedResource = exec.getVisitedResources();
600                        String [] visitedResourcesArray = visitedResource.toArray(new String[visitedResource.size()]);
601                        while (parentScheduler != null && !ArrayUtils.contains(visitedResourcesArray, parentScheduler.get_name())) {
602                                exec.trackResource(parentScheduler.get_name());
[477]603                                parentScheduler = parentScheduler.getParent();
604                        }
[490]605                        exec.setSchedulerName(scheduler.get_id());
[477]606                       
[490]607                        TaskList newTasks = new TaskListImpl();
[481]608                        newTasks.add(exec);
[477]609               
[481]610                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
[477]611
[481]612                        if (exec.getStatus() == DCWormsTags.QUEUED) {
[477]613                                sendExecutableReadyEvent(exec);
614                        }
615                }
616        }
617
618        public WorkloadUnitHandler getWorkloadUnitHandler() {
619                return new LocalWorkloadUnitHandler();
620        }
621       
622       
623        public LocalResourceManager getResourceManager() {
624                if (resourceManager instanceof ResourceManager)
625                        return (LocalResourceManager) resourceManager;
626                else
627                        return null;
628        }
629       
630}
Note: See TracBrowser for help on using the repository browser.