source: xssim/src/test/rewolucja/scheduling/implementation/LocalManagementSystem.java @ 104

Revision 104, 18.1 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package test.rewolucja.scheduling.implementation;
2
3import eduni.simjava.Sim_event;
4import eduni.simjava.Sim_system;
5import gridsim.Accumulator;
6import gridsim.GridSim;
7import gridsim.GridSimTags;
8import gridsim.Gridlet;
9import gridsim.ResourceCalendar;
10import gridsim.gssim.GssimConstants;
11import gridsim.gssim.GssimTags;
12import gridsim.gssim.ResourceHistoryItem;
13import gridsim.gssim.SubmittedTask;
14import gridsim.gssim.filter.SubTaskFilter;
15import grms.shared.constants.BrokerConstants;
16import gssim.schedframe.scheduling.AbstractExecutable;
17import gssim.schedframe.scheduling.ExecTaskInterface;
18import gssim.schedframe.scheduling.Executable;
19
20import java.util.ArrayList;
21import java.util.HashMap;
22import java.util.Iterator;
23import java.util.List;
24import java.util.Map;
25import java.util.Properties;
26
27import org.apache.commons.logging.Log;
28import org.apache.commons.logging.LogFactory;
29import org.joda.time.DateTime;
30import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
31
32import schedframe.resources.units.ResourceUnit;
33import schedframe.scheduling.events.SchedulingEvent;
34import schedframe.scheduling.events.SchedulingEventReason;
35import schedframe.scheduling.events.SchedulingEventType;
36import schedframe.scheduling.events.StartTaskExecutionEvent;
37import schedframe.scheduling.events.TaskCanceledEvent;
38import schedframe.scheduling.events.TaskFinishedEvent;
39import schedframe.scheduling.plugin.SchedulingPluginConfiguration;
40import schedframe.scheduling.plugin.estimation.ExecTimeEstimationPlugin;
41import schedframe.scheduling.plugin.grid.ModuleListImpl;
42import schedframe.scheduling.plugin.grid.ModuleType;
43import schedframe.scheduling.plugin.local.LocalSchedulingPlugin;
44import schedframe.scheduling.utils.ResourceParameterName;
45import simulator.utils.DoubleMath;
46import simulator.utils.InstanceFactory;
47import test.rewolucja.GSSIMJobInterface;
48import test.rewolucja.energy.EnergyEvent;
49import test.rewolucja.energy.EnergyEventType;
50import test.rewolucja.resources.ProcessingElements;
51import test.rewolucja.resources.ResourceStatus;
52import test.rewolucja.resources.ResourceType;
53import test.rewolucja.resources.description.ExecResourceDescription;
54import test.rewolucja.resources.exception.ResourceException;
55import test.rewolucja.resources.logical.LogicalResource;
56import test.rewolucja.resources.manager.factory.ResourceManagerFactory;
57import test.rewolucja.resources.manager.implementation.ResourceManager;
58import test.rewolucja.resources.manager.interfaces.ResourceManagerInterface;
59import test.rewolucja.resources.manager.utils.ResourceManagerUtils;
60import test.rewolucja.resources.physical.base.ComputingResource;
61import test.rewolucja.scheduling.plan.AllocationInterfaceNew;
62import test.rewolucja.scheduling.plan.ScheduledTaskInterfaceNew;
63import test.rewolucja.scheduling.plan.SchedulingPlanInterfaceNew;
64import test.rewolucja.task.JobList;
65
66public class LocalManagementSystem extends ManagementSystem {
67
68        private Log log = LogFactory.getLog(LocalManagementSystem.class);
69
70        protected double lastUpdateTime;
71
72        protected Accumulator accTotalLoad_;
73
74        public LocalManagementSystem(String providerId, String entityName, String schedulingPluginClassName,
75                        ExecTimeEstimationPlugin execTimeEstimationPlugin, ExecResourceDescription resourceDescription)
76                        throws Exception {
77
78                super(providerId, entityName, schedulingPluginClassName, execTimeEstimationPlugin, resourceDescription);
79
80                schedulingPlugin = (LocalSchedulingPlugin) InstanceFactory.createInstance(schedulingPluginClassName, LocalSchedulingPlugin.class);
81                if (schedulingPlugin == null) {
82                        throw new Exception("Can not create local scheduling plugin instance.");
83                }
84
85                accTotalLoad_ = new Accumulator();
86                moduleList = new ModuleListImpl(1);
87
88        }
89
90        public void init(LogicalResource logRes) {
91                logicalResource = logRes;
92                resourceManager = (ResourceManager) ResourceManagerFactory.createResourceManager(logicalResource);
93                double load = 0;
94                accTotalLoad_.add(load);
95        }
96
97        public void processEvent(Sim_event ev) {
98
99                updateProcessingProgress();
100
101                int tag = ev.get_tag();
102                Object obj;
103
104                switch (tag) {
105
106                case GssimTags.TIMER:
107                        if (pluginSupportsEvent(tag)) {
108                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
109                                SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
110                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
111                                executeSchedulingPlan(decision);
112                        }
113                        sendTimerEvent();
114
115                        break;
116
117                case GssimTags.TASK_READY_FOR_EXECUTION:
118                        Executable data = (Executable) ev.get_data();
119
120                        try {
121                                data.setGridletStatus(Gridlet.READY);
122                                if (pluginSupportsEvent(tag)) {
123                                        SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId());
124                                        SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
125                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
126                                        executeSchedulingPlan(decision);
127
128                                }
129                        } catch (Exception e) {
130                                e.printStackTrace();
131                        }
132                        break;
133
134                case GssimTags.TASK_EXECUTION_FINISHED:
135                        obj = ev.get_data();
136                        SubmittedTask task = (SubmittedTask) obj;
137                        if (task.getStatus() == Gridlet.INEXEC) {
138                                task.setGridletStatus(Gridlet.SUCCESS);
139                                task.finalizeGridlet();
140                                log.debug(task.getJobId() + "_" + task.getId() + " finished execution on " + new DateTime());
141                                log.info(GssimConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
142                                List<ResourceHistoryItem> lastUsedList = task.getUsedResources();
143                                Map<ResourceParameterName, ResourceUnit> lastUsed = lastUsedList.get(lastUsedList.size() - 1)
144                                                .getResourceUnits();
145                                getAllocationManager().freeResources(lastUsed);
146                                ProcessingElements pes = (ProcessingElements) lastUsed.get(ResourceParameterName.PROCESSINGELEMENTS);
147                                for (ComputingResource resource : pes) {
148                                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, task));
149                                }
150                                super.sendFinishJob((AbstractExecutable) task.getGridlet());
151                        }
152
153                        if (pluginSupportsEvent(tag)) {
154                                SchedulingEvent event = new TaskFinishedEvent(task.getJobId(), task.getId());
155                                SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event,
156                                                queues, getJobRegistry(), getResourceManager(), moduleList);
157                                executeSchedulingPlan(decision);
158                        }
159
160                        break;
161                case GssimTags.TASK_REQUESTED_TIME_EXPIRED:
162                        obj = ev.get_data();
163
164                        if (pluginSupportsEvent(tag)) {
165                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_REQUESTED_TIME_EXPIRED);
166                                SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event,
167                                                queues, getJobRegistry(), getResourceManager(), moduleList);
168                                executeSchedulingPlan(decision);
169                        }
170
171                        break;
172                case GssimTags.UPDATE:
173                        updateProcessingTimes(ev);
174                        break;
175                }
176        }
177       
178        public void notifySubmittedJob(GSSIMJobInterface<?> job, boolean ack) {
179                if (job instanceof AbstractExecutable) {
180                        AbstractExecutable executable = (AbstractExecutable) job;
181                        // int cost =
182                        // this.resourceManager.getResourceCharacteristic().getResUnits() !=
183                        // null ?
184                        // this.resourceManager.getResourceCharacteristic().getResUnits().get(ResourceParameterName.COST).getAmount()
185                        // : 1;
186                        executable.setResourceParameter(logicalResource.get_id(), 1);
187
188                        updateProcessingProgress();
189                        JobList newTasks = new JobList();
190                        SubmittedTask submittedTask = jobRegistry.getSubmittedTask(executable.getJobId(), executable.getId());
191                        if(submittedTask == null)
192                        {       submittedTask = new SubmittedTask((Executable) executable);
193                                jobRegistry.addTask(submittedTask);
194                        }
195                        LogicalResource logicalRes = logicalResource;
196                        submittedTask.addToResPath(logicalRes.get_name());
197                        while (logicalRes != null && !submittedTask.getResPath().contains(logicalRes.get_name())) {
198                                submittedTask.addToResPath(logicalRes.get_name());
199                                logicalRes = logicalRes.getParent();
200                        }
201                        newTasks.add(submittedTask);
202                        schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList);
203
204                        if (job.getStatus() == Gridlet.QUEUED) {
205                                sendJobReadyEvent(job);
206                        }
207                }
208        }
209
210        public void notifyReturnedJob(GSSIMJobInterface<?> job) {
211                if (pluginSupportsEvent(GssimTags.TASK_EXECUTION_FINISHED)) {
212                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
213                        SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
214                                        queues, getJobRegistry(), getResourceManager(), moduleList);
215                        executeSchedulingPlan(decision);
216                }
217                if(logicalResource.getParent() != null){
218                        sendFinishJob((AbstractExecutable)job);
219                }
220        }
221
222        public void notifyCanceledJob(GSSIMJobInterface<?> job) {
223
224                if (!pluginSupportsEvent(GridSimTags.GRIDLET_CANCEL))
225                        return;
226
227                Executable executable = (Executable) job;
228                String jobID = executable.getJobId();
229
230                SchedulingPlanInterfaceNew decision = null;
231
232                try {
233
234                        executable.setStatus((int) BrokerConstants.JOB_STATUS_CANCELED);
235
236                        TaskCanceledEvent event = new TaskCanceledEvent(executable.getJobId(), executable.getTaskId());
237                        event.setReason(SchedulingEventReason.RESERVATION_EXCEEDED);
238                        decision = schedulingPlugin
239                                        .schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList);
240
241                        if (decision == null)
242                                return;
243
244                        executeSchedulingPlan(decision);
245
246                } catch (Exception e) {
247                        log.error("Exception during scheduling. " + e.getMessage());
248                        e.printStackTrace();
249                }
250        }
251       
252        protected void executeSchedulingPlan(SchedulingPlanInterfaceNew decision) {
253
254                ArrayList<ScheduledTaskInterfaceNew> taskSchedulingDecisions = decision.getTasks();
255                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
256                        try {
257                                ScheduledTaskInterfaceNew taskDecision = taskSchedulingDecisions.get(i);
258
259                                // not scheduled again are returned to the user.
260                                if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
261                                        continue;
262                                }
263
264                                ArrayList<AllocationInterfaceNew> allocations = taskDecision.getAllocations();
265
266                                GSSIMJobInterface<?> task = taskDecision.getTask();
267                                for (int j = 0; j < allocations.size(); j++) {
268
269                                        AllocationInterfaceNew allocation = allocations.get(j);
270                                        if (allocation.isProcessing()) {
271                                                executeTask(task, allocation.getRequestedResources());
272                                        //} else if(GridSim.getEntityId(allocation.getProviderName()) != -1 || logicalResource.getLogicalResource(allocation.getProviderName())!=null){
273                                        } else if(resourceManager.getResourceProvider(allocation.getProviderName()) != null){
274                                                allocation.setProviderName(resourceManager.getResourceProvider(allocation.getProviderName()));
275                                                submitJob(task, allocation);
276                                        } else {
277                                                executeTask(task, chooseResourcesForExecution(allocation.getProviderName(), (ExecTaskInterface)task));
278                                        }
279                                }
280
281                        } catch (Exception e) {
282                                e.printStackTrace();
283                        }
284                }
285        }
286
287        protected void executeTask(GSSIMJobInterface<?> job, Map<ResourceParameterName, ResourceUnit> choosenResources) {
288                ExecTaskInterface task = (ExecTaskInterface) job;
289                SubmittedTask submittedTask = (SubmittedTask) task;
290
291                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
292                if(allocationStatus == false)
293                        return;
294                removeFromQueue(task);
295                double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength();
296                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION);
297                int time = Double.valueOf(
298                                forecastFinishTimePlugin.execTimeEstimation(event, choosenResources, task, completionPercentage)).intValue();
299                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime()
300                                + " will finish after " + time);
301
302                if (time < 0.0)
303                        return;
304                submittedTask.setGridletStatus(Gridlet.INEXEC);
305                log.info(GssimConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
306               
307                jobRegistry.saveHistory(submittedTask, time, choosenResources);
308               
309                logicalResource.sendInternal(time, GssimTags.TASK_EXECUTION_FINISHED,
310                                submittedTask);
311
312                try {
313                        long expectedDuration = submittedTask.getExpectedDuration().getMillis() / 1000;
314                        logicalResource.sendInternal(expectedDuration, GssimTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask);
315                } catch (NoSuchFieldException e) {
316                        double t = submittedTask.getEstimatedDuration();
317                        logicalResource.sendInternal(t, GssimTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask);
318                }
319               
320                ProcessingElements pes = (ProcessingElements) choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS);
321                for (ComputingResource resource : pes) {
322                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, submittedTask));
323                }
324
325
326        }
327       
328        protected void updateProcessingProgress() {
329                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
330                if (timeSpan <= 0.0) {
331                        // don't update when nothing changed
332                        return;
333                }
334                lastUpdateTime = Sim_system.clock();
335                Iterator<ExecTaskInterface> iter = jobRegistry.getRunningTasks().iterator();
336                while (iter.hasNext()) {
337                        ExecTaskInterface task = iter.next();
338                        SubmittedTask subTask = (SubmittedTask)task;
339                        List<ResourceHistoryItem> usedResourcesList = subTask.getUsedResources();
340                        ResourceUnit unit = usedResourcesList.get(usedResourcesList.size() - 1).getResourceUnits()
341                                        .get(ResourceParameterName.PROCESSINGELEMENTS);
342
343                        double load = getMIShare(timeSpan, (ProcessingElements) unit);
344                        subTask.updateGridletFinishedSoFar(load);
345                        addTotalLoad(load);
346                }
347        }
348
349        private double getMIShare(double timeSpan, ProcessingElements pes) {
350                double localLoad;
351                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
352                if (resCalendar == null)
353                        localLoad = 0;
354                else
355                        // 1 - localLoad_ = available MI share percentage
356                        localLoad = resCalendar.getCurrentLoad();
357
358                int speed = pes.getSpeed();
359                int cnt = pes.getAmount();
360
361                double totalMI = speed * cnt * timeSpan * (1 - localLoad);
362                return totalMI;
363        }
364
365        protected void updateProcessingTimes(Sim_event ev) {
366                updateProcessingProgress();
367                for (ExecTaskInterface task : jobRegistry.getRunningTasks()) {
368                        SubmittedTask subTask = (SubmittedTask)task;
369                        Map<ResourceParameterName, ResourceUnit> choosenResources = subTask.getUsedResources()
370                                        .get(subTask.getUsedResources().size() - 1).getResourceUnits();
371                        double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength();
372                        double time = forecastFinishTimePlugin.execTimeEstimation(null, choosenResources, task,
373                                        completionPercentage);
374                        if(!subTask.getResPath().contains(ev.get_data().toString())) {
375                                continue;
376                        } else if( DoubleMath.subtract(subTask.getEstimatedDuration(), (time + lastUpdateTime)) == 0.0){
377                                continue;
378                        }
379                        SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(), GssimTags.TASK_EXECUTION_FINISHED);
380                        logicalResource.sim_cancel(filter, null);
381                        logicalResource.sendInternal(time, GssimTags.TASK_EXECUTION_FINISHED, task);
382
383                }
384        }       
385
386        public boolean pluginSupportsEvent(int eventType) {
387                SchedulingPluginConfiguration config = (SchedulingPluginConfiguration) schedulingPlugin.getConfiguration();
388                if (config == null)
389                        return false;
390
391                Map<SchedulingEventType, Object> servedEvent = config.getServedEvents();
392                if (servedEvent == null)
393                        return false;
394
395                switch (eventType) {
396
397                case GssimTags.TIMER:
398                        return servedEvent.containsKey(SchedulingEventType.TIMER);
399
400                case GssimTags.GRIDLET_SUBMIT:
401                        return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
402
403                case GssimTags.TASK_READY_FOR_EXECUTION:
404                        return servedEvent.containsKey(SchedulingEventType.START_TASK_EXECUTION);
405                case GssimTags.TASK_EXECUTION_FINISHED:
406                        return servedEvent.containsKey(SchedulingEventType.TASK_FINISHED);
407                case GssimTags.GRIDLET_CANCEL:
408                        return servedEvent.containsKey(SchedulingEventType.TASK_CANCELED);
409                case GssimTags.GRIDLET_RESUME:
410                        return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
411
412                case GssimTags.GRIDRESOURCE_FAILURE:
413                        return servedEvent.containsKey(SchedulingEventType.RESOURCE_FAILED);
414
415
416                case GssimTags.TASK_REQUESTED_TIME_EXPIRED:
417                        return servedEvent.containsKey(SchedulingEventType.TASK_REQUESTED_TIME_EXPIRED);
418
419                default:
420                        return false;
421                }
422        }
423
424        public double calculateTotalLoad(int size) {
425                // background load, defined during initialization
426                double load;
427                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
428                if (resCalendar == null)
429                        load = 0;
430                else
431                        load = resCalendar.getCurrentLoad();
432
433                double numberOfPE;
434                try {
435                        numberOfPE = resourceManager.getResourcesOfType(ResourceType.CPU).size();
436                } catch (Exception e) {
437                        numberOfPE = 1;
438                }
439                double tasksPerPE = (double) size / numberOfPE;
440                load += Math.min(1.0 - load, tasksPerPE);
441
442                return load;
443        }
444
445        public Accumulator getTotalLoad() {
446                return accTotalLoad_;
447        }
448
449        protected void addTotalLoad(double load) {
450                accTotalLoad_.add(load);
451        }
452       
453        private HashMap<ResourceParameterName, ResourceUnit> chooseResourcesForExecution(String resourceName,
454                        ExecTaskInterface task) {
455
456                ResourceManagerInterface resourceManager = this.resourceManager;
457                if(resourceName != null){
458                        ComputingResource resource = null;
459                        try {
460                                resource = resourceManager.getResourceByName(resourceName);
461                        } catch (ResourceException e) {
462                                return null;
463                        }
464
465                        resourceManager = new ResourceManager(resource);
466                }
467                HashMap<ResourceParameterName, ResourceUnit> map = new HashMap<ResourceParameterName, ResourceUnit>();
468
469                List<ComputingResource> choosenResources = null;
470                int cpuRequest;
471                try {
472                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
473                } catch (NoSuchFieldException e) {
474                        cpuRequest = 1;
475                }
476
477                if (cpuRequest != 0) {
478                        List<? extends ComputingResource> processingElements = null;
479                        try {
480                                Properties properties = new Properties();
481                                properties.setProperty("type", ResourceType.CPU.toString());
482                                processingElements = resourceManager.filterResources(properties);
483                        } catch (Exception e) {
484                                e.printStackTrace();
485                        }
486
487                        choosenResources = new ArrayList<ComputingResource>();
488
489                        for (int i = 0; i < processingElements.size() && cpuRequest > 0; i++) {
490                                if (processingElements.get(i).getStatus() == ResourceStatus.FREE) {
491                                        choosenResources.add(processingElements.get(i));
492                                        cpuRequest--;
493                                }
494                        }
495                        if (cpuRequest > 0)
496                        {       
497                                return null;
498                        }
499                       
500                        ProcessingElements result = new ProcessingElements(ResourceManagerUtils.getCommonParent(choosenResources).getName());
501                        result.addAll(choosenResources);
502                        map.put(ResourceParameterName.PROCESSINGELEMENTS, result);
503                }
504                return  map;
505        }
506
507       
508}
Note: See TracBrowser for help on using the repository browser.