source: DCWoRMS/trunk/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 822

Revision 822, 22.1 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import dcworms.schedframe.scheduling.ExecTask;
4import dcworms.schedframe.scheduling.Executable;
5import eduni.simjava.Sim_event;
6import eduni.simjava.Sim_system;
7import gridsim.Accumulator;
8import gridsim.ResourceCalendar;
9import gridsim.dcworms.DCWormsTags;
10import gridsim.dcworms.filter.ExecTaskFilter;
11
12import java.util.ArrayList;
13import java.util.HashMap;
14import java.util.Iterator;
15import java.util.List;
16import java.util.Map;
17
18import org.apache.commons.lang.ArrayUtils;
19import org.apache.commons.logging.Log;
20import org.apache.commons.logging.LogFactory;
21import org.joda.time.DateTime;
22import org.joda.time.DateTimeUtilsExt;
23import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
24
25import qcg.shared.constants.BrokerConstants;
26import schedframe.ResourceController;
27import schedframe.events.scheduling.SchedulingEvent;
28import schedframe.events.scheduling.SchedulingEventType;
29import schedframe.events.scheduling.StartTaskExecutionEvent;
30import schedframe.events.scheduling.TaskFinishedEvent;
31import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
32import schedframe.exceptions.ResourceException;
33import schedframe.resources.computing.ComputingResource;
34import schedframe.resources.computing.profiles.energy.EnergyEvent;
35import schedframe.resources.computing.profiles.energy.EnergyEventType;
36import schedframe.resources.units.PEUnit;
37import schedframe.resources.units.ProcessingElements;
38import schedframe.resources.units.ResourceUnit;
39import schedframe.resources.units.ResourceUnitName;
40import schedframe.resources.units.StandardResourceUnitName;
41import schedframe.scheduling.ResourceHistoryItem;
42import schedframe.scheduling.Scheduler;
43import schedframe.scheduling.TaskList;
44import schedframe.scheduling.TaskListImpl;
45import schedframe.scheduling.UsedResourcesList;
46import schedframe.scheduling.WorkloadUnitHandler;
47import schedframe.scheduling.manager.resources.LocalResourceManager;
48import schedframe.scheduling.manager.resources.ManagedResources;
49import schedframe.scheduling.manager.resources.ResourceManager;
50import schedframe.scheduling.plan.AllocationInterface;
51import schedframe.scheduling.plan.ScheduledTaskInterface;
52import schedframe.scheduling.plan.SchedulingPlanInterface;
53import schedframe.scheduling.plugin.SchedulingPlugin;
54import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
55import schedframe.scheduling.plugin.grid.ModuleListImpl;
56import schedframe.scheduling.plugin.grid.ModuleType;
57import schedframe.scheduling.policy.AbstractManagementSystem;
58import schedframe.scheduling.queue.TaskQueueList;
59import schedframe.scheduling.tasks.AbstractProcesses;
60import schedframe.scheduling.tasks.Job;
61import schedframe.scheduling.tasks.JobInterface;
62import schedframe.scheduling.tasks.Task;
63import schedframe.scheduling.tasks.TaskInterface;
64import schedframe.scheduling.tasks.WorkloadUnit;
65import simulator.DCWormsConstants;
66import simulator.utils.DoubleMath;
67
68public class LocalManagementSystem extends AbstractManagementSystem {
69
70        private Log log = LogFactory.getLog(LocalManagementSystem.class);
71
72        protected double lastUpdateTime;
73
74        protected Accumulator accTotalLoad;
75
76        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
77                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
78                        throws Exception {
79
80                super(providerId, entityName, execTimeEstimationPlugin, queues);
81               
82                if (schedPlugin == null) {
83                        throw new Exception("Can not create local scheduling plugin instance.");
84                }
85                this.schedulingPlugin =  schedPlugin;
86                this.moduleList = new ModuleListImpl(1);
87               
88                this.accTotalLoad = new Accumulator();
89        }
90
91        public void init(Scheduler sched, ManagedResources managedResources) {
92                super.init(sched, managedResources);
93                double load = 0;
94                accTotalLoad.add(load);
95        }
96
97        public void processEvent(Sim_event ev) {
98
99                updateProcessingProgress();
100
101                int tag = ev.get_tag();
102
103                switch (tag) {
104
105                case DCWormsTags.TIMER:
106                        if (pluginSupportsEvent(tag)) {
107                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
108                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
109                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
110                                executeSchedulingPlan(decision);
111                        }
112                        sendTimerEvent();
113                        break;
114
115                case DCWormsTags.TASK_READY_FOR_EXECUTION:
116                        ExecTask execTask = (ExecTask) ev.get_data();
117                        try {
118                                execTask.setStatus(DCWormsTags.READY);
119                                if (pluginSupportsEvent(tag)) {
120                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
121                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
122                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
123                                        executeSchedulingPlan(decision);
124                                }
125                        } catch (Exception e) {
126                                e.printStackTrace();
127                        }
128                        break;
129
130                case DCWormsTags.TASK_EXECUTION_FINISHED:
131                        execTask = (ExecTask) ev.get_data();
132                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
133                               
134                                finalizeExecutable(execTask);
135                                sendFinishedWorkloadUnit(execTask);
136                                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
137                                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
138                                if (pluginSupportsEvent(tag)) {
139                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
140                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
141                                                        queues, getJobRegistry(), getResourceManager(), moduleList);
142                                        executeSchedulingPlan(decision);
143                                }
144                        }
145
146                        Job job = jobRegistry.getJob(execTask.getJobId());
147                        if(!job.isFinished()){
148                                getWorkloadUnitHandler().handleJob(job);
149                        }
150                        break;
151                       
152                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
153                        execTask = (Executable) ev.get_data();
154                        if (pluginSupportsEvent(tag)) {
155                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
156                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
157                                                queues, getJobRegistry(), getResourceManager(), moduleList);
158                                executeSchedulingPlan(decision);
159                        }
160                        break;
161                       
162                case DCWormsTags.UPDATE_PROCESSING:
163                        updateProcessingTimes(ev);
164                        break;
165                       
166                case DCWormsTags.PHASE_CHANGED:
167                        execTask = (ExecTask) ev.get_data();
168                        //updatePhases(execTask);
169                        break;
170                }
171        }
172
173        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
174                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
175                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
176                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
177                                        queues, getJobRegistry(), getResourceManager(), moduleList);
178                        executeSchedulingPlan(decision);
179                }
180                //if(scheduler.getParent() != null){
181                        sendFinishedWorkloadUnit(wu);
182                //}
183        }
184       
185        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
186
187                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
188                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
189                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
190
191                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
192                                continue;
193                        }
194
195                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
196
197                        TaskInterface<?> task = taskDecision.getTask();
198                        for (int j = 0; j < allocations.size(); j++) {
199
200                                AllocationInterface<?> allocation = allocations.get(j);
201                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
202                                        ExecTask exec = (ExecTask) task;                                       
203                                        executeTask(exec, allocation.getRequestedResources());
204                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
205                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
206                                        submitTask(task, allocation);
207                                } else {
208                                        ExecTask exec = (ExecTask) task;
209                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
210                                }
211                        }
212                }
213        }
214
215        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
216
217                Executable exec = (Executable)task;
218                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
219                if(allocationStatus == false){
220                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
221                        return;
222                }
223
224                removeFromQueue(task);
225
226                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION);
227                int time = Double.valueOf(
228                                execTimeEstimationPlugin.execTimeEstimation(event, task, choosenResources, exec.getCompletionPercentage())).intValue();
229                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime()
230                                + " will finish after " + time);
231
232                if (time < 0.0)
233                        return;
234
235                exec.setEstimatedDuration(time);
236                DateTime currentTime = new DateTime();
237                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
238                exec.addUsedResources(resHistItem);
239                try {
240                        exec.setStatus(DCWormsTags.INEXEC);
241                } catch (Exception e) {
242                        // TODO Auto-generated catch block
243                        e.printStackTrace();
244                }
245               
246                //updatePhases(exec);
247                scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, exec);
248
249                try {
250                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
251                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
252                } catch (NoSuchFieldException e) {
253                        double t = exec.getEstimatedDuration();
254                        scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
255                }
256
257                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
258               
259                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
260               
261                notifyComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
262               
263                /*if(peUnit instanceof ProcessingElements){
264                        ProcessingElements pes = (ProcessingElements) peUnit;
265                        for (ComputingResource resource : pes) {
266                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec));
267                        }
268                } else {
269                        ComputingResource resource = null;
270                        try {
271                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
272                        } catch (ResourceException e) {
273                                return;
274                        }
275                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec));
276                }
277*/
278
279                /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){
280                        System.out.println(etask.getJobId());
281                        for(String taskId: etask.getVisitedResources())
282                                System.out.println("====="+taskId);
283                }*/
284        }
285       
286        public void finalizeExecutable(ExecTask execTask){
287               
288                Executable exec = (Executable)execTask;
289                exec.finalizeExecutable();
290               
291                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
292                scheduler.sim_cancel(filter, null);
293               
294                Task task;
295                Job job = jobRegistry.getJob(exec.getJobId());
296                try {
297                        task = job.getTask(exec.getTaskId());
298                } catch (NoSuchFieldException e) {
299                        return;
300                }
301                if(exec.getProcessesId() == null){
302                        try {
303                                task.setStatus(exec.getStatus());
304                        } catch (Exception e) {
305                                e.printStackTrace();
306                        }
307                } else {
308                        List<AbstractProcesses> processesList = task.getProcesses();
309                        for(int i = 0; i < processesList.size(); i++){
310                                AbstractProcesses processes = processesList.get(i);
311                                if(processes.getId().equals(exec.getProcessesId())){
312                                        processes.setStatus(exec.getStatus());
313                                        break;
314                                }
315                        }
316                }
317               
318                UsedResourcesList lastUsedList = exec.getUsedResources();
319                Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast()
320                                .getResourceUnits();
321                getAllocationManager().freeResources(lastUsed);
322               
323                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
324                notifyComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
325                /*if(peUnit instanceof ProcessingElements){
326                        ProcessingElements pes = (ProcessingElements) peUnit;
327                        for (ComputingResource resource : pes) {
328                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec));
329                        }
330                } else {
331                        ComputingResource resource = null;
332                        try {
333                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
334                        } catch (ResourceException e) {
335                                return;
336                        }
337                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec));
338                }*/
339
340                //sendFinishedWorkloadUnit(executable);
341        }
342       
343        protected void updateProcessingProgress() {
344                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
345                if (timeSpan <= 0.0) {
346                        // don't update when nothing changed
347                        return;
348                }
349                lastUpdateTime = Sim_system.clock();
350                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
351                while (iter.hasNext()) {
352                        ExecTask task = iter.next();
353                        Executable exec = (Executable)task;
354                        //exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * timeSpan/exec.getEstimatedDuration());
355                        exec.setCompletionPercentage(exec.getCompletionPercentage() + (100 - exec.getCompletionPercentage()) * timeSpan/(exec.getEstimatedDuration() - new DateTime().getMillis()/1000 + exec.getExecStartTime() + timeSpan));
356                       
357                        UsedResourcesList usedResourcesList = exec.getUsedResources();
358                        PEUnit peUnit = (PEUnit)usedResourcesList.getLast().getResourceUnits()
359                                        .get(StandardResourceUnitName.PE);
360                        double load = getMIShare(timeSpan, peUnit);
361                        addTotalLoad(load);
362                }
363        }
364       
365        private void notifyComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){
366
367                if(peUnit instanceof ProcessingElements){
368                        ProcessingElements pes = (ProcessingElements) peUnit;
369                        for (ComputingResource resource : pes) {
370                                resource.handleEvent(new EnergyEvent(eventType, obj));
371                        }
372                        /*try {
373                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
374                                        resource.handleEvent(new EnergyEvent(eventType, obj));
375                                }
376                        } catch (ResourceException e) {
377                                // TODO Auto-generated catch block
378                                e.printStackTrace();
379                        }*/
380                } else {
381                        ComputingResource resource = null;
382                        try {
383                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
384                        } catch (ResourceException e) {
385                                return;
386                        }
387                        resource.handleEvent(new EnergyEvent(eventType, obj));
388                }
389        }
390       
391        private double getMIShare(double timeSpan, PEUnit pes) {
392                double localLoad;
393                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
394                if (resCalendar == null)
395                        localLoad = 0;
396                else
397                        // 1 - localLoad_ = available MI share percentage
398                        localLoad = resCalendar.getCurrentLoad();
399
400                int speed = pes.getSpeed();
401                int cnt = pes.getAmount();
402
403                double totalMI = speed * cnt * timeSpan * (1 - localLoad);
404                return totalMI;
405        }
406
407        protected void updateProcessingTimes(Sim_event ev) {
408                updateProcessingProgress();
409                for (ExecTask execTask : jobRegistry.getRunningTasks()) {
410                        Executable exec = (Executable)execTask;
411                        List<String> visitedResource = exec.getVisitedResources();
412                        String originResource = ev.get_data().toString();
413                        if(!ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), originResource)){
414                                continue;
415                        }
416                       
417                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
418                        int time = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
419                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
420
421                        //check if the new estimated end time is equal to the previous one; if yes the continue without update
422                        if(DoubleMath.subtract((exec.getExecStartTime() + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){
423                                continue;
424                        }
425                        //exec.setEstimatedDuration(time);
426                        exec.setEstimatedDuration(Long.valueOf(new DateTime().getMillis()/1000).intValue() - Double.valueOf(exec.getExecStartTime()).intValue() + time);
427                        ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
428                        scheduler.sim_cancel(filter, null);
429                        scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
430                }
431        }       
432       
433       
434        /*protected void updatePhases(ExecTask execTask) {
435                updateProcessingProgress();
436
437                if (execTask.getStatus() == DCWormsTags.INEXEC) {
438                        Executable exec = (Executable)execTask;
439
440                        double phaseLength = 0;
441                        try{
442                                phaseLength = exec.getResourceConsumptionProfile().getResourceConsumption(exec.getCurrentPhase()).getResourceConsumptionTypeChoice().getPercentage()/100;
443                        } catch(Exception e){
444                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
445                                scheduler.sim_cancel(filter, null);
446                                double t = DoubleMath.subtract((exec.getExecStartTime() + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000));
447                                scheduler.sendInternal(t, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
448                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
449                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
450                        }
451                        if(phaseLength != 0){                   
452                                exec.setCurrentPhase(exec.getCurrentPhase() + 1);
453                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.PHASE_CHANGED);
454                                scheduler.sim_cancel(filter, null);
455                                scheduler.sendInternal(phaseLength * exec.getEstimatedDuration(), DCWormsTags.PHASE_CHANGED, execTask);
456                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
457                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
458                        }
459                        System.out.println("===" + exec.getJobId() + ":" + phaseLength);
460                }
461        }       */
462
463        public double calculateTotalLoad(int size) {
464                // background load, defined during initialization
465                double load;
466                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
467                if (resCalendar == null)
468                        load = 0;
469                else
470                        load = resCalendar.getCurrentLoad();
471
472                int numberOfPE = 0;
473                try {
474                        for(ResourceUnit resUnit : getResourceManager().getPE()){
475                                numberOfPE = numberOfPE + resUnit.getAmount();
476                        }
477                } catch (Exception e) {
478                        numberOfPE = 1;
479                }
480                double tasksPerPE = (double) size / numberOfPE;
481                load += Math.min(1.0 - load, tasksPerPE);
482
483                return load;
484        }
485
486        public Accumulator getTotalLoad() {
487                return accTotalLoad;
488        }
489
490        protected void addTotalLoad(double load) {
491                accTotalLoad.add(load);
492        }
493       
494        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
495                        ExecTask task) {
496
497                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
498                LocalResourceManager resourceManager = getResourceManager();
499                if(resourceName != null){
500                        ComputingResource resource = null;
501                        try {
502                                resource = resourceManager.getResourceByName(resourceName);
503                        } catch (ResourceException e) {
504                                return null;
505                        }
506                        resourceManager = new LocalResourceManager(resource);
507                }
508
509                int cpuRequest;
510                try {
511                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
512                } catch (NoSuchFieldException e) {
513                        cpuRequest = 1;
514                }
515
516                if (cpuRequest != 0) {
517                       
518                        List<ResourceUnit> availableUnits = null;
519                        try {
520                                availableUnits = resourceManager.getPE();
521                        } catch (ResourceException e) {
522                                return null;
523                        }
524                       
525                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
526                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
527                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
528                                if(peUnit.getFreeAmount() > 0){
529                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
530                                        cpuRequest = cpuRequest - allocPE;
531                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
532                                }       
533                        }
534                       
535                        if(cpuRequest > 0){
536                                return null;
537                        }
538                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
539                }
540
541                return  map;
542        }
543       
544        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) {
545                updateProcessingProgress();
546                registerWorkloadUnit(wu);
547        }
548
549        private void registerWorkloadUnit(WorkloadUnit wu){
550                if(!wu.isRegistered()){
551                        wu.register(jobRegistry);
552                }
553                wu.accept(getWorkloadUnitHandler());
554        }
555       
556        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
557               
558                public void handleJob(JobInterface<?> job){
559
560                        if (log.isInfoEnabled())
561                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
562
563                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
564                        jobsList.add(job);
565                        TaskListImpl availableTasks = new TaskListImpl();
566                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
567                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
568                                availableTasks.add(task);
569                        }
570
571                        for(TaskInterface<?> task: availableTasks){     
572                                registerWorkloadUnit(task);
573                        }
574                }
575               
576                public void handleTask(TaskInterface<?> t){
577                        Task task = (Task)t;
578                        List<AbstractProcesses> processes = task.getProcesses();
579
580                        if(processes == null || processes.size() == 0){
581                                Executable exec = new Executable(task);
582                                registerWorkloadUnit(exec);
583                        } else {
584                                for(int j = 0; j < processes.size(); j++){
585                                        AbstractProcesses procesesSet = processes.get(j);
586                                        Executable exec = new Executable(task, procesesSet);
587                                        registerWorkloadUnit(exec);
588                                }
589                        }
590                }
591               
592                public void handleExecutable(ExecTask task){
593                       
594                        Executable exec = (Executable) task;
595                        jobRegistry.addExecTask(exec);
596                       
597                        exec.trackResource(scheduler.get_name());
598                        Scheduler parentScheduler = scheduler.getParent();
599                        List<String> visitedResource = exec.getVisitedResources();
600                        String [] visitedResourcesArray = visitedResource.toArray(new String[visitedResource.size()]);
601                        while (parentScheduler != null && !ArrayUtils.contains(visitedResourcesArray, parentScheduler.get_name())) {
602                                exec.trackResource(parentScheduler.get_name());
603                                parentScheduler = parentScheduler.getParent();
604                        }
605                        exec.setSchedulerName(scheduler.get_id());
606                       
607                        TaskList newTasks = new TaskListImpl();
608                        newTasks.add(exec);
609               
610                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
611
612                        if (exec.getStatus() == DCWormsTags.QUEUED) {
613                                sendExecutableReadyEvent(exec);
614                        }
615                }
616        }
617
618        public WorkloadUnitHandler getWorkloadUnitHandler() {
619                return new LocalWorkloadUnitHandler();
620        }
621       
622       
623        public LocalResourceManager getResourceManager() {
624                if (resourceManager instanceof ResourceManager)
625                        return (LocalResourceManager) resourceManager;
626                else
627                        return null;
628        }
629       
630}
Note: See TracBrowser for help on using the repository browser.