source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 1332

Revision 1332, 22.5 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import dcworms.schedframe.scheduling.ExecTask;
4import dcworms.schedframe.scheduling.Executable;
5import eduni.simjava.Sim_event;
6import eduni.simjava.Sim_system;
7import gridsim.dcworms.DCWormsTags;
8import gridsim.dcworms.filter.ExecTaskFilter;
9
10import java.util.ArrayList;
11import java.util.HashMap;
12import java.util.Iterator;
13import java.util.List;
14import java.util.Map;
15
16import org.apache.commons.lang.ArrayUtils;
17import org.apache.commons.logging.Log;
18import org.apache.commons.logging.LogFactory;
19import org.joda.time.DateTime;
20import org.joda.time.DateTimeUtilsExt;
21import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
22
23import qcg.shared.constants.BrokerConstants;
24import schedframe.SimulatedEnvironment;
25import schedframe.events.scheduling.SchedulingEvent;
26import schedframe.events.scheduling.SchedulingEventType;
27import schedframe.events.scheduling.StartTaskExecutionEvent;
28import schedframe.events.scheduling.TaskFinishedEvent;
29import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
30import schedframe.exceptions.ResourceException;
31import schedframe.resources.StandardResourceType;
32import schedframe.resources.computing.ComputingResource;
33import schedframe.resources.computing.profiles.energy.EnergyEvent;
34import schedframe.resources.computing.profiles.energy.EnergyEventType;
35import schedframe.resources.units.PEUnit;
36import schedframe.resources.units.ProcessingElements;
37import schedframe.resources.units.ResourceUnit;
38import schedframe.resources.units.ResourceUnitName;
39import schedframe.resources.units.StandardResourceUnitName;
40import schedframe.scheduling.ResourceHistoryItem;
41import schedframe.scheduling.Scheduler;
42import schedframe.scheduling.TaskList;
43import schedframe.scheduling.TaskListImpl;
44import schedframe.scheduling.UsedResourcesList;
45import schedframe.scheduling.WorkloadUnitHandler;
46import schedframe.scheduling.manager.resources.LocalResourceManager;
47import schedframe.scheduling.manager.resources.ManagedResources;
48import schedframe.scheduling.manager.resources.ResourceManager;
49import schedframe.scheduling.plan.AllocationInterface;
50import schedframe.scheduling.plan.ScheduledTaskInterface;
51import schedframe.scheduling.plan.SchedulingPlanInterface;
52import schedframe.scheduling.plugin.SchedulingPlugin;
53import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
54import schedframe.scheduling.plugin.grid.ModuleListImpl;
55import schedframe.scheduling.policy.AbstractManagementSystem;
56import schedframe.scheduling.queue.TaskQueueList;
57import schedframe.scheduling.tasks.AbstractProcesses;
58import schedframe.scheduling.tasks.Job;
59import schedframe.scheduling.tasks.JobInterface;
60import schedframe.scheduling.tasks.Task;
61import schedframe.scheduling.tasks.TaskInterface;
62import schedframe.scheduling.tasks.WorkloadUnit;
63import simulator.DCWormsConstants;
64import simulator.stats.GSSAccumulator;
65import simulator.utils.DoubleMath;
66
67public class LocalManagementSystem extends AbstractManagementSystem {
68
69        private Log log = LogFactory.getLog(LocalManagementSystem.class);
70
71        protected double lastUpdateTime;
72
73        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
74                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
75                        throws Exception {
76
77                super(providerId, entityName, execTimeEstimationPlugin, queues);
78               
79                if (schedPlugin == null) {
80                        throw new Exception("Can not create local scheduling plugin instance.");
81                }
82                this.schedulingPlugin =  schedPlugin;
83                this.moduleList = new ModuleListImpl(1);
84        }
85
86        public void init(Scheduler sched, ManagedResources managedResources) {
87                super.init(sched, managedResources);
88        }
89
90        public void processEvent(Sim_event ev) {
91
92                updateProcessingProgress();
93
94                int tag = ev.get_tag();
95
96                switch (tag) {
97
98                case DCWormsTags.TIMER:
99                        if (pluginSupportsEvent(tag)) {
100                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
101                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
102                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
103                                executeSchedulingPlan(decision);
104                        }
105                        sendTimerEvent();
106                        break;
107
108                case DCWormsTags.TASK_READY_FOR_EXECUTION:
109                        ExecTask execTask = (ExecTask) ev.get_data();
110                        try {
111                                execTask.setStatus(DCWormsTags.READY);
112                                if (pluginSupportsEvent(tag)) {
113                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
114                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
115                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
116                                        executeSchedulingPlan(decision);
117                                }
118                        } catch (Exception e) {
119                                e.printStackTrace();
120                        }
121                        break;
122
123                case DCWormsTags.TASK_EXECUTION_FINISHED:
124                        execTask = (ExecTask) ev.get_data();
125                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
126                               
127                                finalizeExecutable(execTask);
128                                sendFinishedWorkloadUnit(execTask);
129                                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
130                                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
131                                if (pluginSupportsEvent(tag)) {
132                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
133                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
134                                                        queues, getJobRegistry(), getResourceManager(), moduleList);
135                                        executeSchedulingPlan(decision);
136                                }
137                        }
138
139                        Job job = jobRegistry.getJob(execTask.getJobId());
140                        if(!job.isFinished()){
141                                getWorkloadUnitHandler().handleJob(job);
142                        }
143                        //SUPPORTS PRECEDING CONSTRAINST COMING BOTH FROM SWF FILES
144                        /*else {
145                                try {
146                                        job.setStatus((int)BrokerConstants.JOB_STATUS_FINISHED);
147                                } catch (Exception e) {
148
149                                }
150                                for(JobInterface<?> j: jobRegistry.getJobs((int)BrokerConstants.JOB_STATUS_SUBMITTED)){
151                                        getWorkloadUnitHandler().handleJob(j); 
152                                }
153                        }*/
154                        break;
155                       
156                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
157                        execTask = (Executable) ev.get_data();
158                        if (pluginSupportsEvent(tag)) {
159                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
160                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
161                                                queues, getJobRegistry(), getResourceManager(), moduleList);
162                                executeSchedulingPlan(decision);
163                        }
164                        break;
165                       
166                       
167                case DCWormsTags.TASK_EXECUTION_CHANGED:
168                        execTask = (ExecTask) ev.get_data();
169                        updateTaskExecution(execTask, SchedulingEventType.RESOURCE_STATE_CHANGED);
170                        break;
171                       
172                case DCWormsTags.UPDATE_PROCESSING:
173                        updateProcessingTimes(ev);
174                        break;
175                       
176                case DCWormsTags.POWER_LIMIT_EXCEEDED:
177                        if (pluginSupportsEvent(tag)) {
178                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.POWER_LIMIT_EXCEEDED);
179                                schedulingPlugin.schedule(event,
180                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
181                        }
182                        break;
183                }
184        }
185
186        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
187                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
188                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
189                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
190                                        queues, getJobRegistry(), getResourceManager(), moduleList);
191                        executeSchedulingPlan(decision);
192                }
193                //if(scheduler.getParent() != null){
194                        sendFinishedWorkloadUnit(wu);
195                //}
196        }
197       
198        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
199
200                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
201                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
202                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
203
204                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
205                                continue;
206                        }
207
208                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
209
210                        TaskInterface<?> task = taskDecision.getTask();
211                        for (int j = 0; j < allocations.size(); j++) {
212
213                                AllocationInterface<?> allocation = allocations.get(j);
214                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
215                                        ExecTask exec = (ExecTask) task;                                       
216                                        executeTask(exec, allocation.getRequestedResources());
217                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
218                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
219                                        submitTask(task, allocation);
220                                } else {
221                                        ExecTask exec = (ExecTask) task;
222                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
223                                }
224                        }
225                }
226        }
227
228        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
229
230                Executable exec = (Executable)task;
231                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
232                if(allocationStatus == false){
233                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
234                        return;
235                }
236
237                removeFromQueue(task);
238
239                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
240
241                //if (phaseDuration < 0.0)
242                //      return;
243
244                //exec.setEstimatedDuration(exec.getEstimatedDuration() + phaseDuration);
245                DateTime currentTime = new DateTime();
246                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
247                resHistItem.setCompletionPercentage(0);
248                exec.addUsedResources(resHistItem);
249               
250                try {
251                        exec.setStatus(DCWormsTags.INEXEC);
252                } catch (Exception e) {
253                        // TODO Auto-generated catch block
254                        e.printStackTrace();
255                }
256               
257                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
258                notifyComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
259               
260                updateTaskExecution(exec, SchedulingEventType.START_TASK_EXECUTION);
261                //scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, exec);
262
263                try {
264                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
265                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
266                } catch (NoSuchFieldException e) {
267                        //double t = exec.getEstimatedDuration();
268                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
269                }
270
271                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
272               
273        }
274       
275        protected void finalizeExecutable(ExecTask execTask){
276               
277                Executable exec = (Executable)execTask;
278                exec.finalizeExecutable();
279               
280                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
281                scheduler.sim_cancel(filter, null);
282               
283                Task task;
284                Job job = jobRegistry.getJob(exec.getJobId());
285                try {
286                        task = job.getTask(exec.getTaskId());
287                } catch (NoSuchFieldException e) {
288                        return;
289                }
290                if(exec.getProcessesId() == null){
291                        try {
292                                task.setStatus(exec.getStatus());
293                        } catch (Exception e) {
294                                e.printStackTrace();
295                        }
296                } else {
297                        List<AbstractProcesses> processesList = task.getProcesses();
298                        for(int i = 0; i < processesList.size(); i++){
299                                AbstractProcesses processes = processesList.get(i);
300                                if(processes.getId().equals(exec.getProcessesId())){
301                                        processes.setStatus(exec.getStatus());
302                                        break;
303                                }
304                        }
305                }
306               
307                UsedResourcesList lastUsedList = exec.getUsedResources();
308                Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast()
309                                .getResourceUnits();
310                getAllocationManager().freeResources(lastUsed);
311               
312                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
313                notifyComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
314        }
315       
316        protected void updateProcessingProgress() {
317
318                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
319                if (timeSpan <= 0.0) {
320                        // don't update when nothing changed
321                        return;
322                }
323                lastUpdateTime = Sim_system.clock();
324                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
325                while (iter.hasNext()) {
326                        ExecTask task = iter.next();
327                        Executable exec = (Executable)task;
328
329                        if(exec.getUsedResources().size() > 1){
330                                //System.out.println("--- upadteProgressX: " + Sim_system.sim_clock() );
331                                //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " +  exec.getEstimatedDuration());
332                                exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / (exec.getEstimatedDuration()) * (1.0 - exec.getUsedResources().get(exec.getUsedResources().size()-1).getCompletionPercentage()/100.0)));
333                                //System.out.println("newProgress: " + exec.getCompletionPercentage()  );       
334                        }
335                        else {
336                                //System.out.println("--- upadteProgress2: " + Sim_system.sim_clock() );
337                                //System.out.println("taskId: " + exec.getId() + "; completion percentage: " + exec.getCompletionPercentage() + "; timespan: " + timeSpan + "; estimatedDuration: " +  exec.getEstimatedDuration());
338                                exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / exec.getEstimatedDuration()));
339                                //System.out.println("newProgress: " + exec.getCompletionPercentage()  );       
340                        }
341                }
342        }
343       
344        private void notifyComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){
345                if(peUnit instanceof ProcessingElements){
346                        ProcessingElements pes = (ProcessingElements) peUnit;
347                        for (ComputingResource resource : pes) {
348                                resource.handleEvent(new EnergyEvent(eventType, obj, scheduler.getFullName()));
349                                //DataCenterWorkloadSimulator.getEventManager().sendToResources(resource.getType(), 0, new EnergyEvent(eventType, obj));
350                        }
351                        /*try {
352                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
353                                        resource.handleEvent(new EnergyEvent(eventType, obj));
354                                }
355                        } catch (ResourceException e) {
356                                // TODO Auto-generated catch block
357                                e.printStackTrace();
358                        }*/
359                } else {
360                        ComputingResource resource = null;
361                        try {
362                                resource = SimulatedEnvironment.getComputingResourceByName(peUnit.getResourceId());
363                        } catch (ResourceException e) {
364                                return;
365                        }
366                        resource.handleEvent(new EnergyEvent(eventType, obj, scheduler.getFullName()));
367                }
368        }
369
370        protected void updateProcessingTimes(Sim_event ev) {
371                for (ExecTask execTask : jobRegistry.getRunningTasks()) {
372                        Executable exec = (Executable)execTask;
373
374                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
375                        double lastTimeStamp = exec.getUsedResources().getLast().getTimeStamp().getMillis()/1000;
376                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
377                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
378
379                        if(DoubleMath.subtract((lastTimeStamp + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)) == 0.0){
380                                continue;
381                        }
382                        //System.out.println("=== upadteTIme: " + Sim_system.sim_clock() );
383                        //System.out.println("execId: " + exec.getId() +  "; estimatedDuration " + exec.getEstimatedDuration());
384                        //System.out.println("execId: " + exec.getId() + "; difference " + DoubleMath.subtract((lastTimeStamp + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)));
385                        //System.out.println("completionPercantage: " + exec.getCompletionPercentage() + "; basic duration: " +exec.getResourceConsumptionProfile().getCurrentResourceConsumption().getDuration() +   "; phaseDuration: " +  phaseDuration);
386
387                        exec.setEstimatedDuration(phaseDuration);
388                        DateTime currentTime = new DateTime();
389                        ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
390                        resHistItem.setCompletionPercentage(exec.getCompletionPercentage());
391                        exec.addUsedResources(resHistItem);
392                       
393                        if(exec.getResourceConsumptionProfile().getCurrentResourceConsumption() == exec.getResourceConsumptionProfile().getResourceConsumptionList().getLast()){
394                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
395                                scheduler.sim_cancel(filter, null);
396                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
397                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
398                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
399                        } else{
400                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
401                                scheduler.sim_cancel(filter, null);
402                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
403                                //PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
404                                //notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
405                        }
406                }
407        }       
408
409        protected void updateTaskExecution(ExecTask execTask, SchedulingEventType schedEvType) {
410
411                if (execTask.getStatus() == DCWormsTags.INEXEC) {
412                        Executable exec = (Executable)execTask;
413                       
414                        try {
415                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
416                        } catch (Exception e) {
417                        }
418                       
419                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
420
421                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(schedEvType),
422                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
423
424                        exec.setEstimatedDuration(phaseDuration);
425                       
426                        if(exec.getResourceConsumptionProfile().getCurrentResourceConsumption() == exec.getResourceConsumptionProfile().getResourceConsumptionList().getLast()){
427                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
428                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
429                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
430                        } else {
431                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
432                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
433                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
434                        }
435                }
436        }       
437
438        public double calculateTotalLoad() {
439
440                GSSAccumulator loadAcc = new GSSAccumulator();
441                try {
442                        for(ComputingResource compRes: getResourceManager().getResourcesOfType(StandardResourceType.Node)){
443                                loadAcc.add(compRes.getLoadInterface().getRecentUtilization().getValue());                             
444                        }
445                } catch (ResourceException e) {
446                        // TODO Auto-generated catch block
447                        e.printStackTrace();
448                }
449               
450                return loadAcc.getMean();
451        }
452
453        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
454                        ExecTask task) {
455
456                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
457                LocalResourceManager resourceManager = getResourceManager();
458                if(resourceName != null){
459                        ComputingResource resource = null;
460                        try {
461                                resource = resourceManager.getResourceByName(resourceName);
462                        } catch (ResourceException e) {
463                                return null;
464                        }
465                        resourceManager = new LocalResourceManager(resource);
466                }
467
468                int cpuRequest;
469                try {
470                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
471                } catch (NoSuchFieldException e) {
472                        cpuRequest = 1;
473                }
474
475                if (cpuRequest != 0) {
476                       
477                        List<ResourceUnit> availableUnits = null;
478                        try {
479                                availableUnits = resourceManager.getPE();
480                        } catch (ResourceException e) {
481                                return null;
482                        }
483                       
484                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
485                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
486                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
487                                if(peUnit.getFreeAmount() > 0){
488                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
489                                        cpuRequest = cpuRequest - allocPE;
490                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
491                                }       
492                        }
493                       
494                        if(cpuRequest > 0){
495                                return null;
496                        }
497                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
498                }
499
500                return  map;
501        }
502       
503        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) {
504                //250314
505                //updateProcessingProgress();
506                if (log.isInfoEnabled())
507                        log.info("Received job " + wu.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
508                registerWorkloadUnit(wu);
509        }
510
511        private void registerWorkloadUnit(WorkloadUnit wu){
512                if(!wu.isRegistered()){
513                        wu.register(jobRegistry);
514                }
515                wu.accept(getWorkloadUnitHandler());
516        }
517       
518        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
519               
520                public void handleJob(JobInterface<?> job){
521
522                        if (log.isInfoEnabled())
523                                log.info("Handle " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
524                       
525                        try {
526                                if(job.getStatus() == BrokerConstants.JOB_STATUS_UNSUBMITTED)
527                                        job.setStatus((int)BrokerConstants.JOB_STATUS_SUBMITTED);
528                        } catch (Exception e) {
529                                e.printStackTrace();
530                        }
531                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
532                        jobsList.add(job);
533                        TaskListImpl availableTasks = new TaskListImpl();
534                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
535                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
536                                availableTasks.add(task);
537                        }
538
539                        for(TaskInterface<?> task: availableTasks){     
540                                registerWorkloadUnit(task);
541                        }
542                }
543               
544                public void handleTask(TaskInterface<?> t){
545                        Task task = (Task)t;
546                        List<AbstractProcesses> processes = task.getProcesses();
547
548                        if(processes == null || processes.size() == 0){
549                                Executable exec = new Executable(task);
550                                registerWorkloadUnit(exec);
551                        } else {
552                                for(int j = 0; j < processes.size(); j++){
553                                        AbstractProcesses procesesSet = processes.get(j);
554                                        Executable exec = new Executable(task, procesesSet);
555                                        registerWorkloadUnit(exec);
556                                }
557                        }
558                }
559               
560                public void handleExecutable(ExecTask task){
561                       
562                        Executable exec = (Executable) task;
563                        jobRegistry.addExecTask(exec);
564                       
565                        exec.trackResource(scheduler.get_name());
566                        Scheduler parentScheduler = scheduler.getParent();
567                        List<String> visitedResource = exec.getVisitedResources();
568                        String [] visitedResourcesArray = visitedResource.toArray(new String[visitedResource.size()]);
569                        while (parentScheduler != null && !ArrayUtils.contains(visitedResourcesArray, parentScheduler.get_name())) {
570                                exec.trackResource(parentScheduler.get_name());
571                                parentScheduler = parentScheduler.getParent();
572                        }
573                        exec.setSchedulerName(scheduler.get_id());
574                       
575                        TaskList newTasks = new TaskListImpl();
576                        newTasks.add(exec);
577               
578                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
579
580                        if (exec.getStatus() == DCWormsTags.QUEUED) {
581                                sendExecutableReadyEvent(exec);
582                        }
583                }
584        }
585
586        public WorkloadUnitHandler getWorkloadUnitHandler() {
587                return new LocalWorkloadUnitHandler();
588        }
589       
590       
591        public LocalResourceManager getResourceManager() {
592                if (resourceManager instanceof ResourceManager)
593                        return (LocalResourceManager) resourceManager;
594                else
595                        return null;
596        }
597       
598}
Note: See TracBrowser for help on using the repository browser.