source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 1207

Revision 1207, 21.7 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import dcworms.schedframe.scheduling.ExecTask;
4import dcworms.schedframe.scheduling.Executable;
5import eduni.simjava.Sim_event;
6import eduni.simjava.Sim_system;
7import gridsim.Accumulator;
8import gridsim.ResourceCalendar;
9import gridsim.dcworms.DCWormsTags;
10import gridsim.dcworms.filter.ExecTaskFilter;
11
12import java.util.ArrayList;
13import java.util.HashMap;
14import java.util.Iterator;
15import java.util.List;
16import java.util.Map;
17
18import org.apache.commons.lang.ArrayUtils;
19import org.apache.commons.logging.Log;
20import org.apache.commons.logging.LogFactory;
21import org.joda.time.DateTime;
22import org.joda.time.DateTimeUtilsExt;
23import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
24
25import qcg.shared.constants.BrokerConstants;
26import schedframe.SimulatedEnvironment;
27import schedframe.events.scheduling.SchedulingEvent;
28import schedframe.events.scheduling.SchedulingEventType;
29import schedframe.events.scheduling.StartTaskExecutionEvent;
30import schedframe.events.scheduling.TaskFinishedEvent;
31import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
32import schedframe.exceptions.ResourceException;
33import schedframe.resources.computing.ComputingResource;
34import schedframe.resources.computing.profiles.energy.EnergyEvent;
35import schedframe.resources.computing.profiles.energy.EnergyEventType;
36import schedframe.resources.units.PEUnit;
37import schedframe.resources.units.ProcessingElements;
38import schedframe.resources.units.ResourceUnit;
39import schedframe.resources.units.ResourceUnitName;
40import schedframe.resources.units.StandardResourceUnitName;
41import schedframe.scheduling.ResourceHistoryItem;
42import schedframe.scheduling.Scheduler;
43import schedframe.scheduling.TaskList;
44import schedframe.scheduling.TaskListImpl;
45import schedframe.scheduling.UsedResourcesList;
46import schedframe.scheduling.WorkloadUnitHandler;
47import schedframe.scheduling.manager.resources.LocalResourceManager;
48import schedframe.scheduling.manager.resources.ManagedResources;
49import schedframe.scheduling.manager.resources.ResourceManager;
50import schedframe.scheduling.plan.AllocationInterface;
51import schedframe.scheduling.plan.ScheduledTaskInterface;
52import schedframe.scheduling.plan.SchedulingPlanInterface;
53import schedframe.scheduling.plugin.SchedulingPlugin;
54import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
55import schedframe.scheduling.plugin.grid.ModuleListImpl;
56import schedframe.scheduling.plugin.grid.ModuleType;
57import schedframe.scheduling.policy.AbstractManagementSystem;
58import schedframe.scheduling.queue.TaskQueueList;
59import schedframe.scheduling.tasks.AbstractProcesses;
60import schedframe.scheduling.tasks.Job;
61import schedframe.scheduling.tasks.JobInterface;
62import schedframe.scheduling.tasks.Task;
63import schedframe.scheduling.tasks.TaskInterface;
64import schedframe.scheduling.tasks.WorkloadUnit;
65import simulator.DCWormsConstants;
66import simulator.utils.DoubleMath;
67
68public class LocalManagementSystem extends AbstractManagementSystem {
69
70        private Log log = LogFactory.getLog(LocalManagementSystem.class);
71
72        protected double lastUpdateTime;
73
74        protected Accumulator accTotalLoad;
75
76        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
77                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
78                        throws Exception {
79
80                super(providerId, entityName, execTimeEstimationPlugin, queues);
81               
82                if (schedPlugin == null) {
83                        throw new Exception("Can not create local scheduling plugin instance.");
84                }
85                this.schedulingPlugin =  schedPlugin;
86                this.moduleList = new ModuleListImpl(1);
87               
88                this.accTotalLoad = new Accumulator();
89        }
90
91        public void init(Scheduler sched, ManagedResources managedResources) {
92                super.init(sched, managedResources);
93                double load = 0;
94                accTotalLoad.add(load);
95        }
96
97        public void processEvent(Sim_event ev) {
98
99                updateProcessingProgress();
100
101                int tag = ev.get_tag();
102
103                switch (tag) {
104
105                case DCWormsTags.TIMER:
106                        if (pluginSupportsEvent(tag)) {
107                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
108                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
109                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
110                                executeSchedulingPlan(decision);
111                        }
112                        sendTimerEvent();
113                        break;
114
115                case DCWormsTags.TASK_READY_FOR_EXECUTION:
116                        ExecTask execTask = (ExecTask) ev.get_data();
117                        try {
118                                execTask.setStatus(DCWormsTags.READY);
119                                if (pluginSupportsEvent(tag)) {
120                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
121                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
122                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
123                                        executeSchedulingPlan(decision);
124                                }
125                        } catch (Exception e) {
126                                e.printStackTrace();
127                        }
128                        break;
129
130                case DCWormsTags.TASK_EXECUTION_FINISHED:
131                        execTask = (ExecTask) ev.get_data();
132                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
133                               
134                                finalizeExecutable(execTask);
135                                sendFinishedWorkloadUnit(execTask);
136                                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
137                                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
138                                if (pluginSupportsEvent(tag)) {
139                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
140                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
141                                                        queues, getJobRegistry(), getResourceManager(), moduleList);
142                                        executeSchedulingPlan(decision);
143                                }
144                        }
145
146                        Job job = jobRegistry.getJob(execTask.getJobId());
147                        if(!job.isFinished()){
148                                getWorkloadUnitHandler().handleJob(job);
149                        }
150                        //SUPPORTS PRECEDING CONSTRAINST COMING BOTH FROM SWF FILES
151                        /*else {
152                                try {
153                                        job.setStatus((int)BrokerConstants.JOB_STATUS_FINISHED);
154                                } catch (Exception e) {
155
156                                }
157                                for(JobInterface<?> j: jobRegistry.getJobs((int)BrokerConstants.JOB_STATUS_SUBMITTED)){
158                                        getWorkloadUnitHandler().handleJob(j); 
159                                }
160                        }*/
161                        break;
162                       
163                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
164                        execTask = (Executable) ev.get_data();
165                        if (pluginSupportsEvent(tag)) {
166                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
167                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
168                                                queues, getJobRegistry(), getResourceManager(), moduleList);
169                                executeSchedulingPlan(decision);
170                        }
171                        break;
172                       
173                case DCWormsTags.UPDATE_PROCESSING:
174                        updateProcessingTimes(ev);
175                        break;
176                       
177                case DCWormsTags.TASK_EXECUTION_CHANGED:
178                        execTask = (ExecTask) ev.get_data();
179                        updateTaskExecution(execTask, SchedulingEventType.RESOURCE_STATE_CHANGED);
180                        break;
181                }
182        }
183
184        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
185                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
186                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
187                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
188                                        queues, getJobRegistry(), getResourceManager(), moduleList);
189                        executeSchedulingPlan(decision);
190                }
191                //if(scheduler.getParent() != null){
192                        sendFinishedWorkloadUnit(wu);
193                //}
194        }
195       
196        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
197
198                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
199                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
200                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
201
202                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
203                                continue;
204                        }
205
206                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
207
208                        TaskInterface<?> task = taskDecision.getTask();
209                        for (int j = 0; j < allocations.size(); j++) {
210
211                                AllocationInterface<?> allocation = allocations.get(j);
212                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
213                                        ExecTask exec = (ExecTask) task;                                       
214                                        executeTask(exec, allocation.getRequestedResources());
215                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
216                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
217                                        submitTask(task, allocation);
218                                } else {
219                                        ExecTask exec = (ExecTask) task;
220                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
221                                }
222                        }
223                }
224        }
225
226        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
227
228                Executable exec = (Executable)task;
229                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
230                if(allocationStatus == false){
231                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
232                        return;
233                }
234
235                removeFromQueue(task);
236
237                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
238
239                //if (phaseDuration < 0.0)
240                //      return;
241
242                //exec.setEstimatedDuration(exec.getEstimatedDuration() + phaseDuration);
243                DateTime currentTime = new DateTime();
244                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
245                exec.addUsedResources(resHistItem);
246               
247                try {
248                        exec.setStatus(DCWormsTags.INEXEC);
249                } catch (Exception e) {
250                        // TODO Auto-generated catch block
251                        e.printStackTrace();
252                }
253               
254                updateTaskExecution(exec, SchedulingEventType.START_TASK_EXECUTION);
255                //scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, exec);
256
257                try {
258                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
259                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
260                } catch (NoSuchFieldException e) {
261                        //double t = exec.getEstimatedDuration();
262                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
263                }
264
265                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
266               
267                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
268               
269                notifyComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
270               
271        }
272       
273        protected void finalizeExecutable(ExecTask execTask){
274               
275                Executable exec = (Executable)execTask;
276                exec.finalizeExecutable();
277               
278                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
279                scheduler.sim_cancel(filter, null);
280               
281                Task task;
282                Job job = jobRegistry.getJob(exec.getJobId());
283                try {
284                        task = job.getTask(exec.getTaskId());
285                } catch (NoSuchFieldException e) {
286                        return;
287                }
288                if(exec.getProcessesId() == null){
289                        try {
290                                task.setStatus(exec.getStatus());
291                        } catch (Exception e) {
292                                e.printStackTrace();
293                        }
294                } else {
295                        List<AbstractProcesses> processesList = task.getProcesses();
296                        for(int i = 0; i < processesList.size(); i++){
297                                AbstractProcesses processes = processesList.get(i);
298                                if(processes.getId().equals(exec.getProcessesId())){
299                                        processes.setStatus(exec.getStatus());
300                                        break;
301                                }
302                        }
303                }
304               
305                UsedResourcesList lastUsedList = exec.getUsedResources();
306                Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast()
307                                .getResourceUnits();
308                getAllocationManager().freeResources(lastUsed);
309               
310                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
311                notifyComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
312        }
313       
314        protected void updateProcessingProgress() {
315                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
316                if (timeSpan <= 0.0) {
317                        // don't update when nothing changed
318                        return;
319                }
320                lastUpdateTime = Sim_system.clock();
321                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
322                while (iter.hasNext()) {
323                        ExecTask task = iter.next();
324                        Executable exec = (Executable)task;
325                        exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / exec.getEstimatedDuration()));
326                        UsedResourcesList usedResourcesList = exec.getUsedResources();
327                        PEUnit peUnit = (PEUnit)usedResourcesList.getLast().getResourceUnits()
328                                        .get(StandardResourceUnitName.PE);
329                        double load = getMIShare(timeSpan, peUnit);
330                        addTotalLoad(load);
331                }
332        }
333       
334        private void notifyComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){
335                if(peUnit instanceof ProcessingElements){
336                        ProcessingElements pes = (ProcessingElements) peUnit;
337                        for (ComputingResource resource : pes) {
338                                resource.handleEvent(new EnergyEvent(eventType, obj));
339                                //DataCenterWorkloadSimulator.getEventManager().sendToResources(resource.getType(), 0, new EnergyEvent(eventType, obj));
340                        }
341                        /*try {
342                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
343                                        resource.handleEvent(new EnergyEvent(eventType, obj));
344                                }
345                        } catch (ResourceException e) {
346                                // TODO Auto-generated catch block
347                                e.printStackTrace();
348                        }*/
349                } else {
350                        ComputingResource resource = null;
351                        try {
352                                resource = SimulatedEnvironment.getComputingResourceByName(peUnit.getResourceId());
353                        } catch (ResourceException e) {
354                                return;
355                        }
356                        resource.handleEvent(new EnergyEvent(eventType, obj));
357                }
358        }
359       
360        private double getMIShare(double timeSpan, PEUnit pes) {
361                double localLoad;
362                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
363                if (resCalendar == null)
364                        localLoad = 0;
365                else
366                        // 1 - localLoad_ = available MI share percentage
367                        localLoad = resCalendar.getCurrentLoad();
368
369                int speed = pes.getSpeed();
370                int cnt = pes.getAmount();
371
372                double totalMI = speed * cnt * timeSpan * (1 - localLoad);
373                return totalMI;
374        }
375
376        protected void updateProcessingTimes(Sim_event ev) {
377                updateProcessingProgress();
378                for (ExecTask execTask : jobRegistry.getRunningTasks()) {
379                        Executable exec = (Executable)execTask;
380
381                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
382                        double lastTimeStamp = exec.getUsedResources().getLast().getTimeStamp().getMillis()/1000;
383                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
384                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
385
386                        if(DoubleMath.subtract((lastTimeStamp + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)) == 0.0){
387                                return;
388                        }
389
390                        exec.setEstimatedDuration(phaseDuration);
391                        DateTime currentTime = new DateTime();
392                        ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
393                        exec.addUsedResources(resHistItem);
394                       
395                        if(exec.getResourceConsumptionProfile().getCurrentResourceConsumption() == exec.getResourceConsumptionProfile().getResourceConsumptionList().getLast()){
396                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
397                                scheduler.sim_cancel(filter, null);
398                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
399                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
400                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
401                        } else{
402                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
403                                scheduler.sim_cancel(filter, null);
404                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
405                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
406                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
407                        }
408                }
409        }       
410
411        protected void updateTaskExecution(ExecTask execTask, SchedulingEventType schedEvType) {
412
413                if (execTask.getStatus() == DCWormsTags.INEXEC) {
414                        Executable exec = (Executable)execTask;
415                       
416                        try {
417                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
418                        } catch (Exception e) {
419                        }
420                       
421                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
422
423                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(schedEvType),
424                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
425
426                        exec.setEstimatedDuration(phaseDuration);
427                       
428                        if(exec.getResourceConsumptionProfile().getCurrentResourceConsumption() == exec.getResourceConsumptionProfile().getResourceConsumptionList().getLast()){
429                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
430                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
431                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
432                        } else {
433                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
434                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
435                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
436                        }
437                }
438        }       
439
440        public double calculateTotalLoad(int size) {
441                // background load, defined during initialization
442                double load;
443                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
444                if (resCalendar == null)
445                        load = 0;
446                else
447                        load = resCalendar.getCurrentLoad();
448
449                int numberOfPE = 0;
450                try {
451                        for(ResourceUnit resUnit : getResourceManager().getPE()){
452                                numberOfPE = numberOfPE + resUnit.getAmount();
453                        }
454                } catch (Exception e) {
455                        numberOfPE = 1;
456                }
457                double tasksPerPE = (double) size / numberOfPE;
458                load += Math.min(1.0 - load, tasksPerPE);
459
460                return load;
461        }
462
463        public Accumulator getTotalLoad() {
464                return accTotalLoad;
465        }
466
467        protected void addTotalLoad(double load) {
468                accTotalLoad.add(load);
469        }
470       
471        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
472                        ExecTask task) {
473
474                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
475                LocalResourceManager resourceManager = getResourceManager();
476                if(resourceName != null){
477                        ComputingResource resource = null;
478                        try {
479                                resource = resourceManager.getResourceByName(resourceName);
480                        } catch (ResourceException e) {
481                                return null;
482                        }
483                        resourceManager = new LocalResourceManager(resource);
484                }
485
486                int cpuRequest;
487                try {
488                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
489                } catch (NoSuchFieldException e) {
490                        cpuRequest = 1;
491                }
492
493                if (cpuRequest != 0) {
494                       
495                        List<ResourceUnit> availableUnits = null;
496                        try {
497                                availableUnits = resourceManager.getPE();
498                        } catch (ResourceException e) {
499                                return null;
500                        }
501                       
502                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
503                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
504                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
505                                if(peUnit.getFreeAmount() > 0){
506                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
507                                        cpuRequest = cpuRequest - allocPE;
508                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
509                                }       
510                        }
511                       
512                        if(cpuRequest > 0){
513                                return null;
514                        }
515                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
516                }
517
518                return  map;
519        }
520       
521        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) {
522                updateProcessingProgress();
523                if (log.isInfoEnabled())
524                        log.info("Received job " + wu.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
525                registerWorkloadUnit(wu);
526        }
527
528        private void registerWorkloadUnit(WorkloadUnit wu){
529                if(!wu.isRegistered()){
530                        wu.register(jobRegistry);
531                }
532                wu.accept(getWorkloadUnitHandler());
533        }
534       
535        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
536               
537                public void handleJob(JobInterface<?> job){
538
539                        if (log.isInfoEnabled())
540                                log.info("Handle " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
541                       
542                        try {
543                                if(job.getStatus() == BrokerConstants.JOB_STATUS_UNSUBMITTED)
544                                        job.setStatus((int)BrokerConstants.JOB_STATUS_SUBMITTED);
545                        } catch (Exception e) {
546                                e.printStackTrace();
547                        }
548                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
549                        jobsList.add(job);
550                        TaskListImpl availableTasks = new TaskListImpl();
551                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
552                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
553                                availableTasks.add(task);
554                        }
555
556                        for(TaskInterface<?> task: availableTasks){     
557                                registerWorkloadUnit(task);
558                        }
559                }
560               
561                public void handleTask(TaskInterface<?> t){
562                        Task task = (Task)t;
563                        List<AbstractProcesses> processes = task.getProcesses();
564
565                        if(processes == null || processes.size() == 0){
566                                Executable exec = new Executable(task);
567                                registerWorkloadUnit(exec);
568                        } else {
569                                for(int j = 0; j < processes.size(); j++){
570                                        AbstractProcesses procesesSet = processes.get(j);
571                                        Executable exec = new Executable(task, procesesSet);
572                                        registerWorkloadUnit(exec);
573                                }
574                        }
575                }
576               
577                public void handleExecutable(ExecTask task){
578                       
579                        Executable exec = (Executable) task;
580                        jobRegistry.addExecTask(exec);
581                       
582                        exec.trackResource(scheduler.get_name());
583                        Scheduler parentScheduler = scheduler.getParent();
584                        List<String> visitedResource = exec.getVisitedResources();
585                        String [] visitedResourcesArray = visitedResource.toArray(new String[visitedResource.size()]);
586                        while (parentScheduler != null && !ArrayUtils.contains(visitedResourcesArray, parentScheduler.get_name())) {
587                                exec.trackResource(parentScheduler.get_name());
588                                parentScheduler = parentScheduler.getParent();
589                        }
590                        exec.setSchedulerName(scheduler.get_id());
591                       
592                        TaskList newTasks = new TaskListImpl();
593                        newTasks.add(exec);
594               
595                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
596
597                        if (exec.getStatus() == DCWormsTags.QUEUED) {
598                                sendExecutableReadyEvent(exec);
599                        }
600                }
601        }
602
603        public WorkloadUnitHandler getWorkloadUnitHandler() {
604                return new LocalWorkloadUnitHandler();
605        }
606       
607       
608        public LocalResourceManager getResourceManager() {
609                if (resourceManager instanceof ResourceManager)
610                        return (LocalResourceManager) resourceManager;
611                else
612                        return null;
613        }
614       
615}
Note: See TracBrowser for help on using the repository browser.