source: DCWoRMS/trunk/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 478

Revision 478, 26.0 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import eduni.simjava.Sim_event;
4import eduni.simjava.Sim_system;
5import gridsim.Accumulator;
6import gridsim.GridSimTags;
7import gridsim.Gridlet;
8import gridsim.ResourceCalendar;
9import gridsim.gssim.WormsTags;
10import gridsim.gssim.filter.SubTaskFilter;
11import gssim.schedframe.scheduling.ExecTask;
12import gssim.schedframe.scheduling.Executable;
13
14import java.util.ArrayList;
15import java.util.HashMap;
16import java.util.Iterator;
17import java.util.List;
18import java.util.Map;
19
20import org.apache.commons.lang.ArrayUtils;
21import org.apache.commons.logging.Log;
22import org.apache.commons.logging.LogFactory;
23import org.joda.time.DateTime;
24import org.joda.time.DateTimeUtilsExt;
25import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
26
27import qcg.shared.constants.BrokerConstants;
28import schedframe.ResourceController;
29import schedframe.events.scheduling.EventReason;
30import schedframe.events.scheduling.SchedulingEvent;
31import schedframe.events.scheduling.SchedulingEventType;
32import schedframe.events.scheduling.StartTaskExecutionEvent;
33import schedframe.events.scheduling.TaskCanceledEvent;
34import schedframe.events.scheduling.TaskFinishedEvent;
35import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
36import schedframe.exceptions.ResourceException;
37import schedframe.resources.computing.ComputingResource;
38import schedframe.resources.computing.profiles.energy.EnergyEvent;
39import schedframe.resources.computing.profiles.energy.EnergyEventType;
40import schedframe.resources.units.PEUnit;
41import schedframe.resources.units.ProcessingElements;
42import schedframe.resources.units.ResourceUnit;
43import schedframe.resources.units.ResourceUnitName;
44import schedframe.resources.units.StandardResourceUnitName;
45import schedframe.scheduling.ResourceHistoryItem;
46import schedframe.scheduling.Scheduler;
47import schedframe.scheduling.UsedResourceList;
48import schedframe.scheduling.WorkloadUnitHandler;
49import schedframe.scheduling.WorkloadUnitListImpl;
50import schedframe.scheduling.manager.resources.LocalResourceManager;
51import schedframe.scheduling.manager.resources.ManagedResources;
52import schedframe.scheduling.manager.resources.ResourceManager;
53import schedframe.scheduling.plan.AllocationInterface;
54import schedframe.scheduling.plan.ScheduledTaskInterface;
55import schedframe.scheduling.plan.SchedulingPlanInterface;
56import schedframe.scheduling.plugin.SchedulingPlugin;
57import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
58import schedframe.scheduling.plugin.grid.ModuleListImpl;
59import schedframe.scheduling.plugin.grid.ModuleType;
60import schedframe.scheduling.policy.AbstractManagementSystem;
61import schedframe.scheduling.queue.TaskQueueList;
62import schedframe.scheduling.tasks.AbstractProcesses;
63import schedframe.scheduling.tasks.Job;
64import schedframe.scheduling.tasks.JobInterface;
65import schedframe.scheduling.tasks.SubmittedTask;
66import schedframe.scheduling.tasks.Task;
67import schedframe.scheduling.tasks.TaskInterface;
68import schedframe.scheduling.tasks.WorkloadUnit;
69import simulator.WormsConstants;
70import simulator.utils.DoubleMath;
71
72public class LocalManagementSystem extends AbstractManagementSystem {
73
74        private Log log = LogFactory.getLog(LocalManagementSystem.class);
75
76        protected double lastUpdateTime;
77
78        protected Accumulator accTotalLoad;
79
80        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
81                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
82                        throws Exception {
83
84                super(providerId, entityName, execTimeEstimationPlugin, queues);
85
86                //schedulingPlugin = (LocalSchedulingPlugin) InstanceFactory.createInstance(schedulingPluginClassName, LocalSchedulingPlugin.class);
87               
88                if (schedPlugin == null) {
89                        throw new Exception("Can not create local scheduling plugin instance.");
90                }
91                this.schedulingPlugin =  schedPlugin;
92                accTotalLoad = new Accumulator();
93                moduleList = new ModuleListImpl(1);
94
95        }
96
97        public void init(Scheduler sched, ManagedResources managedResources) {
98                super.init(sched, managedResources);
99                //scheduler = sched;
100                //resourceManager = ResourceManagerFactory.createResourceManager(scheduler);
101                double load = 0;
102                accTotalLoad.add(load);
103        }
104
105        public void processEvent(Sim_event ev) {
106
107                updateProcessingProgress();
108
109                int tag = ev.get_tag();
110                Object obj;
111
112                switch (tag) {
113
114                case WormsTags.TIMER:
115                        if (pluginSupportsEvent(tag)) {
116                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
117                                SchedulingPlanInterface decision =  schedulingPlugin.schedule(event,
118                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
119                                executeSchedulingPlan(decision);
120                        }
121                        sendTimerEvent();
122
123                        break;
124
125                case WormsTags.TASK_READY_FOR_EXECUTION:
126                       
127                        ExecTask data = (ExecTask) ev.get_data();
128                        try {
129                                data.setStatus(Gridlet.READY);
130                                if (pluginSupportsEvent(tag)) {
131                                        SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId());
132                                        SchedulingPlanInterface decision =  schedulingPlugin.schedule(event,
133                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
134                                        executeSchedulingPlan(decision);
135                                }
136                        } catch (Exception e) {
137                                e.printStackTrace();
138                        }
139                        break;
140
141                case WormsTags.TASK_EXECUTION_FINISHED:
142                        obj = ev.get_data();
143                        ExecTask task = (ExecTask) obj;
144                        if (task.getStatus() == Gridlet.INEXEC) {
145                                finalizeExecutable(task);
146                                SubmittedTask subTask = (SubmittedTask)task;
147                                sendFinishedWorkloadUnit((ExecTask)subTask.getGridlet());
148                                //task.setGridletStatus(Gridlet.SUCCESS);
149                                //task.finalizeGridlet();
150                                log.debug(task.getJobId() + "_" + task.getId() + " finished execution on " + new DateTime());
151                                log.info(WormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
152                                /*UsedResourceList<ResourceHistoryItem> lastUsedList = task.getUsedResources();
153                                Map<ResourceUnitName, AbstractResourceUnit> lastUsed = lastUsedList.getLast()
154                                                .getResourceUnits();
155                                getAllocationManager().freeResources(lastUsed);
156                                ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PROCESSINGELEMENTS);
157                                for (ComputingResource resource : pes) {
158                                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, task));
159                                }
160                                SubTaskFilter filter = new SubTaskFilter(task.getGridletID(), GssimTags.TASK_REQUESTED_TIME_EXPIRED);
161                                scheduler.sim_cancel(filter, null);
162                                super.sendFinishJob((Executable) task.getGridlet());*/
163                        }
164                        if (pluginSupportsEvent(tag)) {
165                                SchedulingEvent event = new TaskFinishedEvent(task.getJobId(), task.getId());
166                                SchedulingPlanInterface decision = schedulingPlugin.schedule(event,
167                                                queues, getJobRegistry(), getResourceManager(), moduleList);
168                                executeSchedulingPlan(decision);
169                        }
170                        Job job = jobRegistry.get(task.getJobId());
171                        if(!job.isFinished()){
172                                getWorkloadUnitHandler().handleJob(job);
173                        }
174
175                        break;
176                case WormsTags.TASK_REQUESTED_TIME_EXPIRED:
177                        obj = ev.get_data();
178                        task = (SubmittedTask) obj;
179                        if (pluginSupportsEvent(tag)) {
180                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(task.getJobId(), task.getId());
181                                SchedulingPlanInterface decision = schedulingPlugin.schedule(event,
182                                                queues, getJobRegistry(), getResourceManager(), moduleList);
183                                executeSchedulingPlan(decision);
184                        }
185
186                        break;
187                case WormsTags.UPDATE:
188                        updateProcessingTimes(ev);
189                        break;
190                }
191        }
192       
193
194        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
195                if (pluginSupportsEvent(WormsTags.TASK_EXECUTION_FINISHED)) {
196                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
197                        SchedulingPlanInterface decision =  schedulingPlugin.schedule(event,
198                                        queues, getJobRegistry(), getResourceManager(), moduleList);
199                        executeSchedulingPlan(decision);
200                }
201                //if(scheduler.getParent() != null){
202                        sendFinishedWorkloadUnit(wu);
203                //}
204        }
205
206        public void notifyCanceledWorkloadUnit(WorkloadUnit job) {
207
208                if (!pluginSupportsEvent(GridSimTags.GRIDLET_CANCEL))
209                        return;
210
211                Executable executable = (Executable) job;
212                String jobID = executable.getJobId();
213
214                SchedulingPlanInterface decision = null;
215
216                try {
217                        executable.setStatus((int) BrokerConstants.JOB_STATUS_CANCELED);
218
219                        TaskCanceledEvent event = new TaskCanceledEvent(executable.getJobId(), executable.getTaskId());
220                        event.setReason(EventReason.RESERVATION_EXCEEDED);
221                        decision = schedulingPlugin
222                                        .schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList);
223
224                        if (decision == null)
225                                return;
226
227                        executeSchedulingPlan(decision);
228
229                } catch (Exception e) {
230                        log.error("Exception during scheduling. " + e.getMessage());
231                        e.printStackTrace();
232                }
233        }
234       
235        protected void executeSchedulingPlan(SchedulingPlanInterface decision) {
236
237                ArrayList<ScheduledTaskInterface> taskSchedulingDecisions = decision.getTasks();
238                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
239                        try {
240                                ScheduledTaskInterface taskDecision = taskSchedulingDecisions.get(i);
241
242                                // not scheduled again are returned to the user.
243                                if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
244                                        continue;
245                                }
246
247                                ArrayList<AllocationInterface> allocations = taskDecision.getAllocations();
248
249                                WorkloadUnit task = taskDecision.getTask();
250                                for (int j = 0; j < allocations.size(); j++) {
251
252                                        AllocationInterface allocation = allocations.get(j);
253                                        if (allocation.isProcessing()) {
254                                               
255                                                ExecTask exec = (ExecTask) task;
256                                               
257
258                                                //Executable e = (Executable)task;
259                                                /*SubmittedTask submittedTask = jobRegistry.getSubmittedTask(e.getJobId(), e.getId());
260                                                if(submittedTask == null)
261                                                {       submittedTask = new SubmittedTask(e);
262                                                        jobRegistry.addTask(submittedTask);
263                                                }*/
264
265                                                /*e.visitResource(scheduler.get_name());
266                                                Scheduler parentScheduler = scheduler.getParent();
267                                                while (parentScheduler != null && !e.getVisitedResources().contains(parentScheduler.get_name())) {
268                                                        e.visitResource(parentScheduler.get_name());
269                                                        parentScheduler = parentScheduler.getParent();
270                                                }*/
271                                               
272                                                                                               
273                                                executeTask(exec, allocation.getRequestedResources());
274                                        //} else if(GridSim.getEntityId(allocation.getProviderName()) != -1 || scheduler.getScheduler(allocation.getProviderName())!=null){
275                                        } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
276                                                allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
277                                                submitWorkloadUnit(task, allocation);
278                                        } else {
279
280                                                ExecTask exec = (ExecTask) task;
281                                               
282                                                        //Executable exec = jobRegistry.createExecutable(t, allocation);
283                                                        //exec.setResourceParameter(scheduler.get_id(), 1);
284                                                /*e.visitResource(scheduler.get_name());
285                                                Scheduler parentScheduler = scheduler.getParent();
286                                                while (parentScheduler != null && !e.getVisitedResources().contains(parentScheduler.get_name())) {
287                                                        e.visitResource(parentScheduler.get_name());
288                                                        parentScheduler = parentScheduler.getParent();
289                                                }*/
290                                                executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), (ExecTask)task));
291                                        }
292                                }
293
294                        } catch (Exception e) {
295                                e.printStackTrace();
296                        }
297                }
298        }
299
300        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
301        //      Executable exec = (Executable) task;
302       
303                SubmittedTask submittedTask = (SubmittedTask)task;
304                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
305                if(allocationStatus == false)
306                        return;
307                removeFromQueue(task);
308                //SubmittedTask submittedTask = (SubmittedTask)task;
309                /* submittedTask = jobRegistry.getSubmittedTask(exec.getJobId(), exec.getId());
310                if(submittedTask == null)
311                {       submittedTask = new SubmittedTask(exec);
312                        jobRegistry.addTask(submittedTask);
313                }*/
314                double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength();
315                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION);
316                int time = Double.valueOf(
317                                execTimeEstimationPlugin.execTimeEstimation(event, task, choosenResources, completionPercentage)).intValue();
318                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime()
319                                + " will finish after " + time);
320
321                if (time < 0.0)
322                        return;
323
324                submittedTask.setEstimatedDuration(time);
325                DateTime currentTime = new DateTime();
326                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
327                submittedTask.addUsedResources(resHistItem);
328                submittedTask.setFinishTime(currentTime.getMillis() / 1000);
329               
330                jobRegistry.saveHistory(submittedTask, time, choosenResources);
331               
332                scheduler.sendInternal(time, WormsTags.TASK_EXECUTION_FINISHED,
333                                submittedTask);
334
335                try {
336                        long expectedDuration = submittedTask.getExpectedDuration().getMillis() / 1000;
337                        scheduler.sendInternal(expectedDuration, WormsTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask);
338                } catch (NoSuchFieldException e) {
339                        double t = submittedTask.getEstimatedDuration();
340                        scheduler.sendInternal(t, WormsTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask);
341                }
342               
343                submittedTask.setGridletStatus(Gridlet.INEXEC);
344                log.info(WormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
345               
346                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
347                if(peUnit instanceof ProcessingElements){
348                        ProcessingElements pes = (ProcessingElements) peUnit;
349                        for (ComputingResource resource : pes) {
350                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, submittedTask));
351                        }
352                } else {
353                        ComputingResource resource = null;
354                        try {
355                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
356                        } catch (ResourceException e) {
357                               
358                        }
359                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, submittedTask));
360                }
361                /*ProcessingElements pes = (ProcessingElements) choosenResources.get(StandardResourceUnitName.PE);
362                for (ComputingResource resource : pes) {
363                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, submittedTask));
364                }*/
365
366                /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){
367                        System.out.println(etask.getJobId());
368                        for(String taskId: etask.getVisitedResources())
369                                System.out.println("====="+taskId);
370                }*/
371        }
372       
373        public void finalizeExecutable(ExecTask exec){
374               
375                SubmittedTask subTask = (SubmittedTask)exec;
376                subTask.setGridletStatus(Gridlet.SUCCESS);
377                subTask.finalizeGridlet();
378                UsedResourceList<ResourceHistoryItem> lastUsedList = subTask.getUsedResources();
379                Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast()
380                                .getResourceUnits();
381                getAllocationManager().freeResources(lastUsed);
382               
383                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
384                if(peUnit instanceof ProcessingElements){
385                        ProcessingElements pes = (ProcessingElements) peUnit;
386                        for (ComputingResource resource : pes) {
387                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask));
388                        }
389                } else {
390                        ComputingResource resource = null;
391                        try {
392                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
393                        } catch (ResourceException e) {
394                               
395                        }
396                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask));
397                }
398                /*ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PE);
399                for (ComputingResource resource : pes) {
400                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask));
401                }*/
402                SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(), WormsTags.TASK_REQUESTED_TIME_EXPIRED);
403                scheduler.sim_cancel(filter, null);
404               
405                Executable executable = (Executable) subTask.getGridlet();
406                Job job = jobRegistry.get(executable.getJobId());
407
408                Task task = null;
409                try {
410                        task = job.getTask(executable.getTaskId());
411                } catch (NoSuchFieldException e) {
412                        e.printStackTrace();
413                }
414                if(executable.getProcessesId() == null){
415                        try {
416                                task.setStatus(executable.getStatus());
417                        } catch (Exception e) {
418                                e.printStackTrace();
419                        }
420                } else {
421                        List<AbstractProcesses> processesList = task.getProcesses();
422                        for(int i = 0; i < processesList.size(); i++){
423                                AbstractProcesses processes = processesList.get(i);
424                                if(processes.getId().equals(executable.getProcessesId())){
425                                        processes.setStatus(executable.getStatus());
426                                        break;
427                                }
428                        }
429                }
430                //sendFinishedWorkloadUnit(executable);
431        }
432       
433        protected void updateProcessingProgress() {
434                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
435                if (timeSpan <= 0.0) {
436                        // don't update when nothing changed
437                        return;
438                }
439                lastUpdateTime = Sim_system.clock();
440                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
441                while (iter.hasNext()) {
442                        ExecTask task = iter.next();
443                        SubmittedTask subTask = (SubmittedTask)task;
444                        UsedResourceList<ResourceHistoryItem> usedResourcesList = subTask.getUsedResources();
445                        ResourceUnit unit = usedResourcesList.getLast().getResourceUnits()
446                                        .get(StandardResourceUnitName.PE);
447
448                        double load = getMIShare(timeSpan, (PEUnit) unit);
449                        subTask.updateGridletFinishedSoFar(load);
450                        addTotalLoad(load);
451                }
452        }
453
454        private double getMIShare(double timeSpan, PEUnit pes) {
455                double localLoad;
456                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
457                if (resCalendar == null)
458                        localLoad = 0;
459                else
460                        // 1 - localLoad_ = available MI share percentage
461                        localLoad = resCalendar.getCurrentLoad();
462
463                int speed = pes.getSpeed();
464                int cnt = pes.getAmount();
465
466                double totalMI = speed * cnt * timeSpan * (1 - localLoad);
467                return totalMI;
468        }
469
470        protected void updateProcessingTimes(Sim_event ev) {
471                updateProcessingProgress();
472                for (ExecTask task : jobRegistry.getRunningTasks()) {
473                        SubmittedTask subTask = (SubmittedTask)task;
474                        List<String> visitedResource = subTask.getVisitedResources();
475                        String originResource = ev.get_data().toString();
476                        if(!ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), originResource)){
477                                continue;
478                        }
479                       
480                        Map<ResourceUnitName, ResourceUnit> choosenResources = subTask.getUsedResources().getLast().getResourceUnits();
481                        double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength();
482                        double time = execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
483                                        task, choosenResources, completionPercentage);
484
485                        /*if(!subTask.getVisitedResources().contains(ev.get_data().toString())) {
486                                continue;
487                        }*/
488                        //check if the new estimated end time is equal to the previous one; if yes the continue without update
489                        if( DoubleMath.subtract((subTask.getExecStartTime() + subTask.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){
490                                continue;
491                        }
492                        SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(), WormsTags.TASK_EXECUTION_FINISHED);
493                        scheduler.sim_cancel(filter, null);
494                        scheduler.sendInternal(time, WormsTags.TASK_EXECUTION_FINISHED, task);
495
496                }
497        }       
498
499        public double calculateTotalLoad(int size) {
500                // background load, defined during initialization
501                double load;
502                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
503                if (resCalendar == null)
504                        load = 0;
505                else
506                        load = resCalendar.getCurrentLoad();
507
508                int numberOfPE = 0;
509                try {
510                        for(ResourceUnit resUnit : getResourceManager().getPE()){
511                                numberOfPE = numberOfPE + resUnit.getAmount();
512                        }
513                        //numberOfPE = getResourceManager().getPE().size();
514                } catch (Exception e) {
515                        numberOfPE = 1;
516                }
517                double tasksPerPE = (double) size / numberOfPE;
518                load += Math.min(1.0 - load, tasksPerPE);
519
520                return load;
521        }
522
523        public Accumulator getTotalLoad() {
524                return accTotalLoad;
525        }
526
527        protected void addTotalLoad(double load) {
528                accTotalLoad.add(load);
529        }
530       
531        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
532                        ExecTask task) {
533
534                ResourceManager resourceManager = this.resourceManager;
535                if(resourceName != null){
536                        ComputingResource resource = null;
537                        try {
538                                resource = resourceManager.getResourceByName(resourceName);
539                        } catch (ResourceException e) {
540                                return null;
541                        }
542
543                        resourceManager = new LocalResourceManager(resource);
544                }
545                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
546
547
548                int cpuRequest;
549                try {
550                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
551                } catch (NoSuchFieldException e) {
552                        cpuRequest = 1;
553                }
554
555                //PEUnit processingUnits = null;
556                if (cpuRequest != 0) {
557                       
558                        List<ResourceUnit> availableUnits = null;
559                        try {
560                                availableUnits = getResourceManager().getPE();
561                        } catch (ResourceException e) {
562                                return null;
563                        }
564                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
565
566                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
567                                PEUnit peUnit = (PEUnit) availableUnits .get(i);
568                                if(peUnit.getFreeAmount() > 0){
569                                        int  allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
570                                        cpuRequest = cpuRequest - allocPE;
571                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
572                                }       
573                        }
574                       
575                        if(cpuRequest > 0){
576                                return null;
577                        }
578
579                        /*try {
580                                List<? extends ComputingResource> processingElements = resourceManager.getResourcesOfType(StandardResourceType.Processor);
581                                List<ComputingResource> choosenResources = new ArrayList<ComputingResource>();
582                                int peSize = processingElements.size();
583                                for (int i = 0; i < peSize && cpuRequest > 0; i++) {
584                                        if (processingElements.get(i).getStatus() == ResourceStatus.FREE) {
585                                                choosenResources.add(processingElements.get(i));
586                                                cpuRequest--;
587                                        }
588                                }
589                                if (cpuRequest > 0)
590                                {       
591                                        return null;
592                                }
593                                processingUnits = new ProcessingElements(choosenResources);
594                        } catch (Exception e) {
595       
596                                List<ResourceUnit> procResUnit = resourceManager.getDistributedResourceUnits(StandardResourceUnitName.PE);
597
598                                for(ResourceUnit resUnit: procResUnit){
599                                        if (resUnit.getFreeAmount() >= cpuRequest)
600                                        {       
601                                                processingUnits = new PEUnit(resUnit.getResourceId(), cpuRequest, cpuRequest);
602                                                break;
603                                        }
604                                }
605                        }*/
606                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
607                }
608                /*int memoryRequest;
609                try {
610                        memoryRequest = Double.valueOf(task.getMemoryRequest()).intValue();
611                } catch (NoSuchFieldException e) {
612                        memoryRequest = 0;
613                }
614                if (memoryRequest != 0) {
615                        List<ResourceUnit> resUnit = resourceManager.getSharedResourceUnits().get(StandardResourceUnitName.MEMORY);
616
617                        Memory memory = null;
618                        for (ResourceUnit memUnit : resUnit) {
619                                Memory m = (Memory) memUnit;
620
621                                if (m.getFreeAmount() >= memoryRequest) {       
622                                        System.out.println(m.getResourceId()+ ";"+m.getAmount()+";"+m.getFreeAmount());
623                                        memory = new Memory(m, memoryRequest, memoryRequest);
624                                }
625                        }
626                        if(memory == null)
627                                return null;
628                        map.put(StandardResourceUnitName.MEMORY, memory);
629                }*/
630                return  map;
631        }
632
633
634       
635        public void notifySubmittedWorkloadUnit(WorkloadUnit job, boolean ack) {
636                updateProcessingProgress();
637                registerWorkloadUnit(job);
638        }
639
640        private void registerWorkloadUnit(WorkloadUnit wu){
641                if(!wu.isRegistered()){
642                        wu.register(jobRegistry);
643                }
644                wu.accept(getWorkloadUnitHandler());
645        }
646       
647        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
648               
649                public void handleJob(Job job){
650
651                        if (log.isInfoEnabled())
652                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
653
654                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
655                        jobsList.add(job);
656                        WorkloadUnitListImpl readyTasks = new WorkloadUnitListImpl();
657                        for(Task task: jobRegistry.getReadyTasks(jobsList)){
658                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
659                                readyTasks.add(task);
660                        }
661
662                        for(WorkloadUnit e:readyTasks){
663                                registerWorkloadUnit(e);
664                        }
665                }
666               
667                public void handleTask(TaskInterface<?> ti){
668                        Task task = (Task)ti;
669                       
670                        if(task.getProcesses() == null){
671                                Executable exec = new Executable(task);
672                                exec.setUserID(task.getSenderId());
673                                exec.setLength(task.getLength());
674                                registerWorkloadUnit(exec);
675                        } else {
676                                List<AbstractProcesses> processesList = task.getProcesses();
677                                for(int i = 0; i < processesList.size(); i++){ 
678                                        AbstractProcesses processes = processesList.get(i);
679                                        Executable exec = new Executable(task, processes);
680                                        exec.setUserID(task.getSenderId());
681                                        exec.setLength(task.getLength());
682                                        registerWorkloadUnit(exec);
683                                }
684                        }
685                }
686               
687                public void handleExecutable(ExecTask task){
688                        Executable exec = (Executable) task;
689
690                        // int cost =
691                        // this.resourceManager.getResourceCharacteristic().getResUnits() !=
692                        // null ?
693                        // this.resourceManager.getResourceCharacteristic().getResUnits().get(ResourceParameterName.COST).getAmount()
694                        // : 1;
695
696                        exec.visitResource(scheduler.get_name());
697                        Scheduler parentScheduler = scheduler.getParent();
698                        while (parentScheduler != null && !exec.getVisitedResources().contains(parentScheduler.get_name())) {
699                                exec.visitResource(parentScheduler.get_name());
700                                parentScheduler = parentScheduler.getParent();
701                        }
702                       
703                        exec.setResourceParameter(scheduler.get_id(), 1);
704                        SubmittedTask subTask = new SubmittedTask(exec);
705                        jobRegistry.addTask(subTask);
706                        WorkloadUnitListImpl newTasks = new WorkloadUnitListImpl();
707                        newTasks.add(subTask);
708               
709                        schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList);
710
711                        if (subTask.getStatus() == Gridlet.QUEUED) {
712                                sendExecutableReadyEvent(exec);
713                        }
714                }
715               
716                public void handleSubmittedTask(SubmittedTask task){
717
718                        task.visitResource(scheduler.get_name());
719                        Scheduler parentScheduler = scheduler.getParent();
720                        while (parentScheduler != null && !task.getVisitedResources().contains(parentScheduler.get_name())) {
721                                task.visitResource(parentScheduler.get_name());
722                                parentScheduler = parentScheduler.getParent();
723                        }
724
725                        jobRegistry.addTask(task);
726                        WorkloadUnitListImpl newTasks = new WorkloadUnitListImpl();
727                        newTasks.add(task);
728               
729                        schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList);
730
731                        if (task.getStatus() == Gridlet.QUEUED) {
732                                sendExecutableReadyEvent(task);
733                        }
734                }
735        }
736
737        public WorkloadUnitHandler getWorkloadUnitHandler() {
738                return new LocalWorkloadUnitHandler();
739        }
740       
741       
742        public LocalResourceManager getResourceManager() {
743                if (resourceManager instanceof ResourceManager)
744                        return (LocalResourceManager) resourceManager;
745                else
746                        return null;
747        }
748       
749}
Note: See TracBrowser for help on using the repository browser.