source: xssim/trunk/src/test/rewolucja/scheduling/implementation/LocalManagementSystem.java @ 142

Revision 142, 19.2 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package test.rewolucja.scheduling.implementation;
2
3import eduni.simjava.Sim_event;
4import eduni.simjava.Sim_system;
5import gridsim.Accumulator;
6import gridsim.GridSimTags;
7import gridsim.Gridlet;
8import gridsim.ResourceCalendar;
9import gridsim.gssim.GssimConstants;
10import gridsim.gssim.GssimTags;
11import gridsim.gssim.ResourceHistoryItem;
12import gridsim.gssim.SubmittedTask;
13import gridsim.gssim.filter.SubTaskFilter;
14import grms.shared.constants.BrokerConstants;
15import gssim.schedframe.scheduling.AbstractExecutable;
16import gssim.schedframe.scheduling.ExecTaskInterface;
17import gssim.schedframe.scheduling.Executable;
18
19import java.util.ArrayList;
20import java.util.HashMap;
21import java.util.Iterator;
22import java.util.List;
23import java.util.Map;
24import java.util.Properties;
25
26import org.apache.commons.logging.Log;
27import org.apache.commons.logging.LogFactory;
28import org.joda.time.DateTime;
29import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
30
31import schedframe.resources.units.ResourceUnit;
32import schedframe.scheduling.events.SchedulingEvent;
33import schedframe.scheduling.events.SchedulingEventReason;
34import schedframe.scheduling.events.SchedulingEventType;
35import schedframe.scheduling.events.StartTaskExecutionEvent;
36import schedframe.scheduling.events.TaskCanceledEvent;
37import schedframe.scheduling.events.TaskFinishedEvent;
38import schedframe.scheduling.events.TaskRequestedTimeExpiredEvent;
39import schedframe.scheduling.plugin.SchedulingPluginConfiguration;
40import schedframe.scheduling.plugin.estimation.ExecTimeEstimationPlugin;
41import schedframe.scheduling.plugin.grid.ModuleListImpl;
42import schedframe.scheduling.plugin.grid.ModuleType;
43import schedframe.scheduling.plugin.local.LocalSchedulingPlugin;
44import schedframe.scheduling.utils.ResourceParameterName;
45import simulator.utils.DoubleMath;
46import simulator.utils.InstanceFactory;
47import test.rewolucja.GSSIMJobInterface;
48import test.rewolucja.energy.EnergyEvent;
49import test.rewolucja.energy.EnergyEventType;
50import test.rewolucja.resources.ProcessingElements;
51import test.rewolucja.resources.ResourceStatus;
52import test.rewolucja.resources.ResourceType;
53import test.rewolucja.resources.description.ExecResourceDescription;
54import test.rewolucja.resources.exception.ResourceException;
55import test.rewolucja.resources.logical.LogicalResource;
56import test.rewolucja.resources.manager.factory.ResourceManagerFactory;
57import test.rewolucja.resources.manager.implementation.ResourceManager;
58import test.rewolucja.resources.manager.interfaces.ResourceManagerInterface;
59import test.rewolucja.resources.manager.utils.ResourceManagerUtils;
60import test.rewolucja.resources.physical.base.ComputingResource;
61import test.rewolucja.scheduling.plan.AllocationInterfaceNew;
62import test.rewolucja.scheduling.plan.ScheduledTaskInterfaceNew;
63import test.rewolucja.scheduling.plan.SchedulingPlanInterfaceNew;
64import test.rewolucja.task.JobList;
65
66public class LocalManagementSystem extends ManagementSystem {
67
68        private Log log = LogFactory.getLog(LocalManagementSystem.class);
69
70        protected double lastUpdateTime;
71
72        protected Accumulator accTotalLoad_;
73
74        public LocalManagementSystem(String providerId, String entityName, String schedulingPluginClassName,
75                        ExecTimeEstimationPlugin execTimeEstimationPlugin, ExecResourceDescription resourceDescription)
76                        throws Exception {
77
78                super(providerId, entityName, schedulingPluginClassName, execTimeEstimationPlugin, resourceDescription);
79
80                schedulingPlugin = (LocalSchedulingPlugin) InstanceFactory.createInstance(schedulingPluginClassName, LocalSchedulingPlugin.class);
81                if (schedulingPlugin == null) {
82                        throw new Exception("Can not create local scheduling plugin instance.");
83                }
84
85                accTotalLoad_ = new Accumulator();
86                moduleList = new ModuleListImpl(1);
87
88        }
89
90        public void init(LogicalResource logRes) {
91                logicalResource = logRes;
92                resourceManager = (ResourceManager) ResourceManagerFactory.createResourceManager(logicalResource);
93                double load = 0;
94                accTotalLoad_.add(load);
95        }
96
97        public void processEvent(Sim_event ev) {
98
99                updateProcessingProgress();
100
101                int tag = ev.get_tag();
102                Object obj;
103
104                switch (tag) {
105
106                case GssimTags.TIMER:
107                        if (pluginSupportsEvent(tag)) {
108                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
109                                SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
110                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
111                                executeSchedulingPlan(decision);
112                        }
113                        sendTimerEvent();
114
115                        break;
116
117                case GssimTags.TASK_READY_FOR_EXECUTION:
118                        Executable data = (Executable) ev.get_data();
119
120                        try {
121                                data.setGridletStatus(Gridlet.READY);
122                                if (pluginSupportsEvent(tag)) {
123                                        SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId());
124                                        SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
125                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
126                                        executeSchedulingPlan(decision);
127
128                                }
129                        } catch (Exception e) {
130                                e.printStackTrace();
131                        }
132                        break;
133
134                case GssimTags.TASK_EXECUTION_FINISHED:
135                        obj = ev.get_data();
136                        SubmittedTask task = (SubmittedTask) obj;
137                        if (task.getStatus() == Gridlet.INEXEC) {
138                                task.setGridletStatus(Gridlet.SUCCESS);
139                                task.finalizeGridlet();
140                                log.debug(task.getJobId() + "_" + task.getId() + " finished execution on " + new DateTime());
141                                log.info(GssimConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
142                                List<ResourceHistoryItem> lastUsedList = task.getUsedResources();
143                                Map<ResourceParameterName, ResourceUnit> lastUsed = lastUsedList.get(lastUsedList.size() - 1)
144                                                .getResourceUnits();
145                                getAllocationManager().freeResources(lastUsed);
146                                ProcessingElements pes = (ProcessingElements) lastUsed.get(ResourceParameterName.PROCESSINGELEMENTS);
147                                for (ComputingResource resource : pes) {
148                                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, task));
149                                }
150                                super.sendFinishJob((AbstractExecutable) task.getGridlet());
151                        }
152
153                        if (pluginSupportsEvent(tag)) {
154                                SchedulingEvent event = new TaskFinishedEvent(task.getJobId(), task.getId());
155                                SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event,
156                                                queues, getJobRegistry(), getResourceManager(), moduleList);
157                                executeSchedulingPlan(decision);
158                        }
159
160                        break;
161                case GssimTags.TASK_REQUESTED_TIME_EXPIRED:
162                        obj = ev.get_data();
163                        task = (SubmittedTask) obj;
164                        if (pluginSupportsEvent(tag)) {
165                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(task.getJobId(), task.getId());
166                                SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event,
167                                                queues, getJobRegistry(), getResourceManager(), moduleList);
168                                executeSchedulingPlan(decision);
169                        }
170
171                        break;
172                case GssimTags.UPDATE:
173                        updateProcessingTimes(ev);
174                        break;
175                }
176        }
177       
178        public void notifySubmittedJob(GSSIMJobInterface<?> job, boolean ack) {
179                if (job instanceof AbstractExecutable) {
180                        AbstractExecutable executable = (AbstractExecutable) job;
181                        // int cost =
182                        // this.resourceManager.getResourceCharacteristic().getResUnits() !=
183                        // null ?
184                        // this.resourceManager.getResourceCharacteristic().getResUnits().get(ResourceParameterName.COST).getAmount()
185                        // : 1;
186                        executable.setResourceParameter(logicalResource.get_id(), 1);
187
188                        updateProcessingProgress();
189                        JobList newTasks = new JobList();
190                        SubmittedTask submittedTask = jobRegistry.getSubmittedTask(executable.getJobId(), executable.getId());
191                        if(submittedTask == null)
192                        {       submittedTask = new SubmittedTask((Executable) executable);
193                                jobRegistry.addTask(submittedTask);
194                        }
195
196                        //submittedTask.addToResPath(logicalRes.get_name());
197                        submittedTask.visitResource(logicalResource.get_name());
198                        LogicalResource logicalRes = logicalResource.getParent();
199                        /*while (logicalRes != null && !submittedTask.getResPath().contains(logicalRes.get_name())) {
200                                submittedTask.addToResPath(logicalRes.get_name());
201                                logicalRes = logicalRes.getParent();
202                        }*/
203                        while (logicalRes != null && !submittedTask.getVisitedResources().contains(logicalRes.get_name())) {
204                                submittedTask.visitResource(logicalRes.get_name());
205                                logicalRes = logicalRes.getParent();
206                        }
207                        newTasks.add(submittedTask);
208                        schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList);
209
210                        if (job.getStatus() == Gridlet.QUEUED) {
211                                sendJobReadyEvent(job);
212                        }
213                }
214        }
215
216        public void notifyReturnedJob(GSSIMJobInterface<?> job) {
217                if (pluginSupportsEvent(GssimTags.TASK_EXECUTION_FINISHED)) {
218                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
219                        SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
220                                        queues, getJobRegistry(), getResourceManager(), moduleList);
221                        executeSchedulingPlan(decision);
222                }
223                if(logicalResource.getParent() != null){
224                        sendFinishJob((AbstractExecutable)job);
225                }
226        }
227
228        public void notifyCanceledJob(GSSIMJobInterface<?> job) {
229
230                if (!pluginSupportsEvent(GridSimTags.GRIDLET_CANCEL))
231                        return;
232
233                Executable executable = (Executable) job;
234                String jobID = executable.getJobId();
235
236                SchedulingPlanInterfaceNew decision = null;
237
238                try {
239
240                        executable.setStatus((int) BrokerConstants.JOB_STATUS_CANCELED);
241
242                        TaskCanceledEvent event = new TaskCanceledEvent(executable.getJobId(), executable.getTaskId());
243                        event.setReason(SchedulingEventReason.RESERVATION_EXCEEDED);
244                        decision = schedulingPlugin
245                                        .schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList);
246
247                        if (decision == null)
248                                return;
249
250                        executeSchedulingPlan(decision);
251
252                } catch (Exception e) {
253                        log.error("Exception during scheduling. " + e.getMessage());
254                        e.printStackTrace();
255                }
256        }
257       
258        protected void executeSchedulingPlan(SchedulingPlanInterfaceNew decision) {
259
260                ArrayList<ScheduledTaskInterfaceNew> taskSchedulingDecisions = decision.getTasks();
261                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
262                        try {
263                                ScheduledTaskInterfaceNew taskDecision = taskSchedulingDecisions.get(i);
264
265                                // not scheduled again are returned to the user.
266                                if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
267                                        continue;
268                                }
269
270                                ArrayList<AllocationInterfaceNew> allocations = taskDecision.getAllocations();
271
272                                GSSIMJobInterface<?> task = taskDecision.getTask();
273                                for (int j = 0; j < allocations.size(); j++) {
274
275                                        AllocationInterfaceNew allocation = allocations.get(j);
276                                        if (allocation.isProcessing()) {
277                                                executeTask(task, allocation.getRequestedResources());
278                                        //} else if(GridSim.getEntityId(allocation.getProviderName()) != -1 || logicalResource.getLogicalResource(allocation.getProviderName())!=null){
279                                        } else if(resourceManager.getResourceProvider(allocation.getProviderName()) != null){
280                                                allocation.setProviderName(resourceManager.getResourceProvider(allocation.getProviderName()));
281                                                submitJob(task, allocation);
282                                        } else {
283                                                executeTask(task, chooseResourcesForExecution(allocation.getProviderName(), (ExecTaskInterface)task));
284                                        }
285                                }
286
287                        } catch (Exception e) {
288                                e.printStackTrace();
289                        }
290                }
291        }
292
293        protected void executeTask(GSSIMJobInterface<?> job, Map<ResourceParameterName, ResourceUnit> choosenResources) {
294                ExecTaskInterface task = (ExecTaskInterface) job;
295                SubmittedTask submittedTask = (SubmittedTask) task;
296
297                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
298                if(allocationStatus == false)
299                        return;
300                removeFromQueue(task);
301                double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength();
302                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION);
303                int time = Double.valueOf(
304                                forecastFinishTimePlugin.execTimeEstimation(event, choosenResources, task, completionPercentage)).intValue();
305                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime()
306                                + " will finish after " + time);
307
308                if (time < 0.0)
309                        return;
310               
311                submittedTask.setEstimatedDuration(time);
312                DateTime currentTime = new DateTime();
313                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
314                submittedTask.addUsedResources(resHistItem);
315                submittedTask.setFinishTime(currentTime.getMillis() / 1000);
316               
317                jobRegistry.saveHistory(submittedTask, time, choosenResources);
318               
319                logicalResource.sendInternal(time, GssimTags.TASK_EXECUTION_FINISHED,
320                                submittedTask);
321
322                try {
323                        long expectedDuration = submittedTask.getExpectedDuration().getMillis() / 1000;
324                        logicalResource.sendInternal(expectedDuration, GssimTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask);
325                } catch (NoSuchFieldException e) {
326                        double t = submittedTask.getEstimatedDuration();
327                        logicalResource.sendInternal(t, GssimTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask);
328                }
329               
330                submittedTask.setGridletStatus(Gridlet.INEXEC);
331                log.info(GssimConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
332               
333                ProcessingElements pes = (ProcessingElements) choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS);
334                for (ComputingResource resource : pes) {
335                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, submittedTask));
336                }
337
338                /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){
339                        System.out.println(etask.getJobId());
340                        for(String taskId: etask.getVisitedResources())
341                                System.out.println("====="+taskId);
342                }*/
343
344        }
345       
346        protected void updateProcessingProgress() {
347                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
348                if (timeSpan <= 0.0) {
349                        // don't update when nothing changed
350                        return;
351                }
352                lastUpdateTime = Sim_system.clock();
353                Iterator<ExecTaskInterface> iter = jobRegistry.getRunningTasks().iterator();
354                while (iter.hasNext()) {
355                        ExecTaskInterface task = iter.next();
356                        SubmittedTask subTask = (SubmittedTask)task;
357                        List<ResourceHistoryItem> usedResourcesList = subTask.getUsedResources();
358                        ResourceUnit unit = usedResourcesList.get(usedResourcesList.size() - 1).getResourceUnits()
359                                        .get(ResourceParameterName.PROCESSINGELEMENTS);
360
361                        double load = getMIShare(timeSpan, (ProcessingElements) unit);
362                        subTask.updateGridletFinishedSoFar(load);
363                        addTotalLoad(load);
364                }
365        }
366
367        private double getMIShare(double timeSpan, ProcessingElements pes) {
368                double localLoad;
369                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
370                if (resCalendar == null)
371                        localLoad = 0;
372                else
373                        // 1 - localLoad_ = available MI share percentage
374                        localLoad = resCalendar.getCurrentLoad();
375
376                int speed = pes.getSpeed();
377                int cnt = pes.getAmount();
378
379                double totalMI = speed * cnt * timeSpan * (1 - localLoad);
380                return totalMI;
381        }
382
383        protected void updateProcessingTimes(Sim_event ev) {
384                updateProcessingProgress();
385                for (ExecTaskInterface task : jobRegistry.getRunningTasks()) {
386                        SubmittedTask subTask = (SubmittedTask)task;
387                        Map<ResourceParameterName, ResourceUnit> choosenResources = subTask.getUsedResources()
388                                        .get(subTask.getUsedResources().size() - 1).getResourceUnits();
389                        double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength();
390                        double time = forecastFinishTimePlugin.execTimeEstimation(null, choosenResources, task,
391                                        completionPercentage);
392                        /*if(!subTask.getResPath().contains(ev.get_data().toString())) {
393                                continue;*/
394                        if(!subTask.getVisitedResources().contains(ev.get_data().toString())) {
395                                continue;
396                        }// else if( DoubleMath.subtract(subTask.getEstimatedDuration(), (time + lastUpdateTime)) == 0.0 || completionPercentage == 0){
397                        else if( DoubleMath.subtract((subTask.getExecStartTime()+subTask.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){
398                                continue;
399                        }
400                        SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(), GssimTags.TASK_EXECUTION_FINISHED);
401                        logicalResource.sim_cancel(filter, null);
402                        logicalResource.sendInternal(time, GssimTags.TASK_EXECUTION_FINISHED, task);
403
404                }
405        }       
406
407        public boolean pluginSupportsEvent(int eventType) {
408                SchedulingPluginConfiguration config = (SchedulingPluginConfiguration) schedulingPlugin.getConfiguration();
409                if (config == null)
410                        return false;
411
412                Map<SchedulingEventType, Object> servedEvent = config.getServedEvents();
413                if (servedEvent == null)
414                        return false;
415
416                switch (eventType) {
417
418                case GssimTags.TIMER:
419                        return servedEvent.containsKey(SchedulingEventType.TIMER);
420
421                case GssimTags.GRIDLET_SUBMIT:
422                        return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
423
424                case GssimTags.TASK_READY_FOR_EXECUTION:
425                        return servedEvent.containsKey(SchedulingEventType.START_TASK_EXECUTION);
426                case GssimTags.TASK_EXECUTION_FINISHED:
427                        return servedEvent.containsKey(SchedulingEventType.TASK_FINISHED);
428                case GssimTags.GRIDLET_CANCEL:
429                        return servedEvent.containsKey(SchedulingEventType.TASK_CANCELED);
430                case GssimTags.GRIDLET_RESUME:
431                        return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
432
433                case GssimTags.GRIDRESOURCE_FAILURE:
434                        return servedEvent.containsKey(SchedulingEventType.RESOURCE_FAILED);
435
436
437                case GssimTags.TASK_REQUESTED_TIME_EXPIRED:
438                        return servedEvent.containsKey(SchedulingEventType.TASK_REQUESTED_TIME_EXPIRED);
439
440                default:
441                        return false;
442                }
443        }
444
445        public double calculateTotalLoad(int size) {
446                // background load, defined during initialization
447                double load;
448                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
449                if (resCalendar == null)
450                        load = 0;
451                else
452                        load = resCalendar.getCurrentLoad();
453
454                double numberOfPE;
455                try {
456                        numberOfPE = resourceManager.getResourcesOfType(ResourceType.CPU).size();
457                } catch (Exception e) {
458                        numberOfPE = 1;
459                }
460                double tasksPerPE = (double) size / numberOfPE;
461                load += Math.min(1.0 - load, tasksPerPE);
462
463                return load;
464        }
465
466        public Accumulator getTotalLoad() {
467                return accTotalLoad_;
468        }
469
470        protected void addTotalLoad(double load) {
471                accTotalLoad_.add(load);
472        }
473       
474        private HashMap<ResourceParameterName, ResourceUnit> chooseResourcesForExecution(String resourceName,
475                        ExecTaskInterface task) {
476
477                ResourceManagerInterface resourceManager = this.resourceManager;
478                if(resourceName != null){
479                        ComputingResource resource = null;
480                        try {
481                                resource = resourceManager.getResourceByName(resourceName);
482                        } catch (ResourceException e) {
483                                return null;
484                        }
485
486                        resourceManager = new ResourceManager(resource);
487                }
488                HashMap<ResourceParameterName, ResourceUnit> map = new HashMap<ResourceParameterName, ResourceUnit>();
489
490                List<ComputingResource> choosenResources = null;
491                int cpuRequest;
492                try {
493                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
494                } catch (NoSuchFieldException e) {
495                        cpuRequest = 1;
496                }
497
498                if (cpuRequest != 0) {
499                        List<? extends ComputingResource> processingElements = null;
500                        try {
501                                Properties properties = new Properties();
502                                properties.setProperty("type", ResourceType.CPU.toString());
503                                processingElements = resourceManager.filterResources(properties);
504                        } catch (Exception e) {
505                                e.printStackTrace();
506                        }
507
508                        choosenResources = new ArrayList<ComputingResource>();
509
510                        for (int i = 0; i < processingElements.size() && cpuRequest > 0; i++) {
511                                if (processingElements.get(i).getStatus() == ResourceStatus.FREE) {
512                                        choosenResources.add(processingElements.get(i));
513                                        cpuRequest--;
514                                }
515                        }
516                        if (cpuRequest > 0)
517                        {       
518                                return null;
519                        }
520                       
521                        ProcessingElements result = new ProcessingElements(ResourceManagerUtils.getCommonParent(choosenResources).getName());
522                        result.addAll(choosenResources);
523                        map.put(ResourceParameterName.PROCESSINGELEMENTS, result);
524                }
525                return  map;
526        }
527
528       
529}
Note: See TracBrowser for help on using the repository browser.