source: xssim/trunk/src/test/rewolucja/scheduling/implementation/LocalManagementSystem.java @ 239

Revision 239, 19.4 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package test.rewolucja.scheduling.implementation;
2
3import eduni.simjava.Sim_event;
4import eduni.simjava.Sim_system;
5import gridsim.Accumulator;
6import gridsim.GridSimTags;
7import gridsim.Gridlet;
8import gridsim.ResourceCalendar;
9import gridsim.gssim.GssimConstants;
10import gridsim.gssim.GssimTags;
11import gridsim.gssim.ResourceHistoryItem;
12import gridsim.gssim.SubmittedTask;
13import gridsim.gssim.filter.SubTaskFilter;
14import grms.shared.constants.BrokerConstants;
15import gssim.schedframe.scheduling.AbstractExecutable;
16import gssim.schedframe.scheduling.ExecTaskInterface;
17import gssim.schedframe.scheduling.Executable;
18
19import java.util.ArrayList;
20import java.util.HashMap;
21import java.util.Iterator;
22import java.util.List;
23import java.util.Map;
24import java.util.Properties;
25
26import org.apache.commons.logging.Log;
27import org.apache.commons.logging.LogFactory;
28import org.joda.time.DateTime;
29import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
30
31import schedframe.resources.units.ResourceUnit;
32import schedframe.scheduling.events.SchedulingEvent;
33import schedframe.scheduling.events.SchedulingEventReason;
34import schedframe.scheduling.events.SchedulingEventType;
35import schedframe.scheduling.events.StartTaskExecutionEvent;
36import schedframe.scheduling.events.TaskCanceledEvent;
37import schedframe.scheduling.events.TaskFinishedEvent;
38import schedframe.scheduling.events.TaskRequestedTimeExpiredEvent;
39import schedframe.scheduling.plugin.SchedulingPluginConfiguration;
40import schedframe.scheduling.plugin.estimation.ExecTimeEstimationPlugin;
41import schedframe.scheduling.plugin.grid.ModuleListImpl;
42import schedframe.scheduling.plugin.grid.ModuleType;
43import schedframe.scheduling.plugin.local.LocalSchedulingPlugin;
44import schedframe.scheduling.utils.ResourceParameterName;
45import simulator.utils.DoubleMath;
46import simulator.utils.InstanceFactory;
47import test.rewolucja.GSSIMJobInterface;
48import test.rewolucja.energy.EnergyEvent;
49import test.rewolucja.energy.EnergyEventType;
50import test.rewolucja.resources.ProcessingElements;
51import test.rewolucja.resources.ResourceStatus;
52import test.rewolucja.resources.ResourceType;
53import test.rewolucja.resources.description.ExecResourceDescription;
54import test.rewolucja.resources.exception.ResourceException;
55import test.rewolucja.resources.logical.base.LogicalResource;
56import test.rewolucja.resources.manager.factory.ResourceManagerFactory;
57import test.rewolucja.resources.manager.implementation.ResourceManager;
58import test.rewolucja.resources.manager.interfaces.ResourceManagerInterface;
59import test.rewolucja.resources.manager.utils.ResourceManagerUtils;
60import test.rewolucja.resources.physical.base.ComputingResource;
61import test.rewolucja.scheduling.UsedResourceList;
62import test.rewolucja.scheduling.plan.AllocationInterfaceNew;
63import test.rewolucja.scheduling.plan.ScheduledTaskInterfaceNew;
64import test.rewolucja.scheduling.plan.SchedulingPlanInterfaceNew;
65import test.rewolucja.task.JobList;
66
67public class LocalManagementSystem extends ManagementSystem {
68
69        private Log log = LogFactory.getLog(LocalManagementSystem.class);
70
71        protected double lastUpdateTime;
72
73        protected Accumulator accTotalLoad_;
74
75        public LocalManagementSystem(String providerId, String entityName, String schedulingPluginClassName,
76                        ExecTimeEstimationPlugin execTimeEstimationPlugin, ExecResourceDescription resourceDescription)
77                        throws Exception {
78
79                super(providerId, entityName, schedulingPluginClassName, execTimeEstimationPlugin, resourceDescription);
80
81                schedulingPlugin = (LocalSchedulingPlugin) InstanceFactory.createInstance(schedulingPluginClassName, LocalSchedulingPlugin.class);
82                if (schedulingPlugin == null) {
83                        throw new Exception("Can not create local scheduling plugin instance.");
84                }
85
86                accTotalLoad_ = new Accumulator();
87                moduleList = new ModuleListImpl(1);
88
89        }
90
91        public void init(LogicalResource logRes) {
92                logicalResource = logRes;
93                resourceManager = (ResourceManager) ResourceManagerFactory.createResourceManager(logicalResource);
94                double load = 0;
95                accTotalLoad_.add(load);
96        }
97
98        public void processEvent(Sim_event ev) {
99
100                updateProcessingProgress();
101
102                int tag = ev.get_tag();
103                Object obj;
104
105                switch (tag) {
106
107                case GssimTags.TIMER:
108                        if (pluginSupportsEvent(tag)) {
109                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
110                                SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
111                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
112                                executeSchedulingPlan(decision);
113                        }
114                        sendTimerEvent();
115
116                        break;
117
118                case GssimTags.TASK_READY_FOR_EXECUTION:
119                        Executable data = (Executable) ev.get_data();
120
121                        try {
122                                data.setGridletStatus(Gridlet.READY);
123                                if (pluginSupportsEvent(tag)) {
124                                        SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId());
125                                        SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
126                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
127                                        executeSchedulingPlan(decision);
128
129                                }
130                        } catch (Exception e) {
131                                e.printStackTrace();
132                        }
133                        break;
134
135                case GssimTags.TASK_EXECUTION_FINISHED:
136                        obj = ev.get_data();
137                        SubmittedTask task = (SubmittedTask) obj;
138                        if (task.getStatus() == Gridlet.INEXEC) {
139                                task.setGridletStatus(Gridlet.SUCCESS);
140                                task.finalizeGridlet();
141                                log.debug(task.getJobId() + "_" + task.getId() + " finished execution on " + new DateTime());
142                                log.info(GssimConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
143                                UsedResourceList<ResourceHistoryItem> lastUsedList = task.getUsedResources();
144                                Map<ResourceParameterName, ResourceUnit> lastUsed = lastUsedList.getLast()
145                                                .getResourceUnits();
146                                getAllocationManager().freeResources(lastUsed);
147                                ProcessingElements pes = (ProcessingElements) lastUsed.get(ResourceParameterName.PROCESSINGELEMENTS);
148                                for (ComputingResource resource : pes) {
149                                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, task));
150                                }
151                                SubTaskFilter filter = new SubTaskFilter(task.getGridletID(), GssimTags.TASK_REQUESTED_TIME_EXPIRED);
152                                logicalResource.sim_cancel(filter, null);
153                                super.sendFinishJob((AbstractExecutable) task.getGridlet());
154                        }
155
156                        if (pluginSupportsEvent(tag)) {
157                                SchedulingEvent event = new TaskFinishedEvent(task.getJobId(), task.getId());
158                                SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event,
159                                                queues, getJobRegistry(), getResourceManager(), moduleList);
160                                executeSchedulingPlan(decision);
161                        }
162
163                        break;
164                case GssimTags.TASK_REQUESTED_TIME_EXPIRED:
165                        obj = ev.get_data();
166                        task = (SubmittedTask) obj;
167                        if (pluginSupportsEvent(tag)) {
168                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(task.getJobId(), task.getId());
169                                SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event,
170                                                queues, getJobRegistry(), getResourceManager(), moduleList);
171                                executeSchedulingPlan(decision);
172                        }
173
174                        break;
175                case GssimTags.UPDATE:
176                        updateProcessingTimes(ev);
177                        break;
178                }
179        }
180       
181        public void notifySubmittedJob(GSSIMJobInterface<?> job, boolean ack) {
182                if (job instanceof AbstractExecutable) {
183                        AbstractExecutable executable = (AbstractExecutable) job;
184                        // int cost =
185                        // this.resourceManager.getResourceCharacteristic().getResUnits() !=
186                        // null ?
187                        // this.resourceManager.getResourceCharacteristic().getResUnits().get(ResourceParameterName.COST).getAmount()
188                        // : 1;
189                        executable.setResourceParameter(logicalResource.get_id(), 1);
190
191                        updateProcessingProgress();
192                        JobList newTasks = new JobList();
193                        SubmittedTask submittedTask = jobRegistry.getSubmittedTask(executable.getJobId(), executable.getId());
194                        if(submittedTask == null)
195                        {       submittedTask = new SubmittedTask((Executable) executable);
196                                jobRegistry.addTask(submittedTask);
197                        }
198
199                        //submittedTask.addToResPath(logicalRes.get_name());
200                        submittedTask.visitResource(logicalResource.get_name());
201                        LogicalResource logicalRes = logicalResource.getParent();
202                        /*while (logicalRes != null && !submittedTask.getResPath().contains(logicalRes.get_name())) {
203                                submittedTask.addToResPath(logicalRes.get_name());
204                                logicalRes = logicalRes.getParent();
205                        }*/
206                        while (logicalRes != null && !submittedTask.getVisitedResources().contains(logicalRes.get_name())) {
207                                submittedTask.visitResource(logicalRes.get_name());
208                                logicalRes = logicalRes.getParent();
209                        }
210                        newTasks.add(submittedTask);
211                        schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList);
212
213                        if (job.getStatus() == Gridlet.QUEUED) {
214                                sendJobReadyEvent(job);
215                        }
216                }
217        }
218
219        public void notifyReturnedJob(GSSIMJobInterface<?> job) {
220                if (pluginSupportsEvent(GssimTags.TASK_EXECUTION_FINISHED)) {
221                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
222                        SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
223                                        queues, getJobRegistry(), getResourceManager(), moduleList);
224                        executeSchedulingPlan(decision);
225                }
226                if(logicalResource.getParent() != null){
227                        sendFinishJob((AbstractExecutable)job);
228                }
229        }
230
231        public void notifyCanceledJob(GSSIMJobInterface<?> job) {
232
233                if (!pluginSupportsEvent(GridSimTags.GRIDLET_CANCEL))
234                        return;
235
236                Executable executable = (Executable) job;
237                String jobID = executable.getJobId();
238
239                SchedulingPlanInterfaceNew decision = null;
240
241                try {
242
243                        executable.setStatus((int) BrokerConstants.JOB_STATUS_CANCELED);
244
245                        TaskCanceledEvent event = new TaskCanceledEvent(executable.getJobId(), executable.getTaskId());
246                        event.setReason(SchedulingEventReason.RESERVATION_EXCEEDED);
247                        decision = schedulingPlugin
248                                        .schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList);
249
250                        if (decision == null)
251                                return;
252
253                        executeSchedulingPlan(decision);
254
255                } catch (Exception e) {
256                        log.error("Exception during scheduling. " + e.getMessage());
257                        e.printStackTrace();
258                }
259        }
260       
261        protected void executeSchedulingPlan(SchedulingPlanInterfaceNew decision) {
262
263                ArrayList<ScheduledTaskInterfaceNew> taskSchedulingDecisions = decision.getTasks();
264                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
265                        try {
266                                ScheduledTaskInterfaceNew taskDecision = taskSchedulingDecisions.get(i);
267
268                                // not scheduled again are returned to the user.
269                                if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
270                                        continue;
271                                }
272
273                                ArrayList<AllocationInterfaceNew> allocations = taskDecision.getAllocations();
274
275                                GSSIMJobInterface<?> task = taskDecision.getTask();
276                                for (int j = 0; j < allocations.size(); j++) {
277
278                                        AllocationInterfaceNew allocation = allocations.get(j);
279                                        if (allocation.isProcessing()) {
280                                                executeTask(task, allocation.getRequestedResources());
281                                        //} else if(GridSim.getEntityId(allocation.getProviderName()) != -1 || logicalResource.getLogicalResource(allocation.getProviderName())!=null){
282                                        } else if(resourceManager.getResourceProvider(allocation.getProviderName()) != null){
283                                                allocation.setProviderName(resourceManager.getResourceProvider(allocation.getProviderName()));
284                                                submitJob(task, allocation);
285                                        } else {
286                                                executeTask(task, chooseResourcesForExecution(allocation.getProviderName(), (ExecTaskInterface)task));
287                                        }
288                                }
289
290                        } catch (Exception e) {
291                                e.printStackTrace();
292                        }
293                }
294        }
295
296        protected void executeTask(GSSIMJobInterface<?> job, Map<ResourceParameterName, ResourceUnit> choosenResources) {
297                ExecTaskInterface task = (ExecTaskInterface) job;
298                SubmittedTask submittedTask = (SubmittedTask) task;
299
300                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
301                if(allocationStatus == false)
302                        return;
303                removeFromQueue(task);
304                double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength();
305                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION);
306                int time = Double.valueOf(
307                                forecastFinishTimePlugin.execTimeEstimation(event, choosenResources, task, completionPercentage)).intValue();
308                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime()
309                                + " will finish after " + time);
310
311                if (time < 0.0)
312                        return;
313               
314                submittedTask.setEstimatedDuration(time);
315                DateTime currentTime = new DateTime();
316                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
317                submittedTask.addUsedResources(resHistItem);
318                submittedTask.setFinishTime(currentTime.getMillis() / 1000);
319               
320                jobRegistry.saveHistory(submittedTask, time, choosenResources);
321               
322                logicalResource.sendInternal(time, GssimTags.TASK_EXECUTION_FINISHED,
323                                submittedTask);
324
325                try {
326                        long expectedDuration = submittedTask.getExpectedDuration().getMillis() / 1000;
327                        logicalResource.sendInternal(expectedDuration, GssimTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask);
328                } catch (NoSuchFieldException e) {
329                        double t = submittedTask.getEstimatedDuration();
330                        logicalResource.sendInternal(t, GssimTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask);
331                }
332               
333                submittedTask.setGridletStatus(Gridlet.INEXEC);
334                log.info(GssimConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
335               
336                ProcessingElements pes = (ProcessingElements) choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS);
337                for (ComputingResource resource : pes) {
338                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, submittedTask));
339                }
340
341                /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){
342                        System.out.println(etask.getJobId());
343                        for(String taskId: etask.getVisitedResources())
344                                System.out.println("====="+taskId);
345                }*/
346
347        }
348       
349        protected void updateProcessingProgress() {
350                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
351                if (timeSpan <= 0.0) {
352                        // don't update when nothing changed
353                        return;
354                }
355                lastUpdateTime = Sim_system.clock();
356                Iterator<ExecTaskInterface> iter = jobRegistry.getRunningTasks().iterator();
357                while (iter.hasNext()) {
358                        ExecTaskInterface task = iter.next();
359                        SubmittedTask subTask = (SubmittedTask)task;
360                        UsedResourceList<ResourceHistoryItem> usedResourcesList = subTask.getUsedResources();
361                        ResourceUnit unit = usedResourcesList.getLast().getResourceUnits()
362                                        .get(ResourceParameterName.PROCESSINGELEMENTS);
363
364                        double load = getMIShare(timeSpan, (ProcessingElements) unit);
365                        subTask.updateGridletFinishedSoFar(load);
366                        addTotalLoad(load);
367                }
368        }
369
370        private double getMIShare(double timeSpan, ProcessingElements pes) {
371                double localLoad;
372                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
373                if (resCalendar == null)
374                        localLoad = 0;
375                else
376                        // 1 - localLoad_ = available MI share percentage
377                        localLoad = resCalendar.getCurrentLoad();
378
379                int speed = pes.getSpeed();
380                int cnt = pes.getAmount();
381
382                double totalMI = speed * cnt * timeSpan * (1 - localLoad);
383                return totalMI;
384        }
385
386        protected void updateProcessingTimes(Sim_event ev) {
387                updateProcessingProgress();
388                for (ExecTaskInterface task : jobRegistry.getRunningTasks()) {
389                        SubmittedTask subTask = (SubmittedTask)task;
390                        Map<ResourceParameterName, ResourceUnit> choosenResources = subTask.getUsedResources().getLast().getResourceUnits();
391                        double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength();
392                        double time = forecastFinishTimePlugin.execTimeEstimation(null, choosenResources, task,
393                                        completionPercentage);
394                        /*if(!subTask.getResPath().contains(ev.get_data().toString())) {
395                                continue;*/
396                        if(!subTask.getVisitedResources().contains(ev.get_data().toString())) {
397                                continue;
398                        }// else if( DoubleMath.subtract(subTask.getEstimatedDuration(), (time + lastUpdateTime)) == 0.0 || completionPercentage == 0){
399                        else if( DoubleMath.subtract((subTask.getExecStartTime()+subTask.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){
400                                continue;
401                        }
402                        SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(), GssimTags.TASK_EXECUTION_FINISHED);
403                        logicalResource.sim_cancel(filter, null);
404                        logicalResource.sendInternal(time, GssimTags.TASK_EXECUTION_FINISHED, task);
405
406                }
407        }       
408
409        public boolean pluginSupportsEvent(int eventType) {
410                SchedulingPluginConfiguration config = (SchedulingPluginConfiguration) schedulingPlugin.getConfiguration();
411                if (config == null)
412                        return false;
413
414                Map<SchedulingEventType, Object> servedEvent = config.getServedEvents();
415                if (servedEvent == null)
416                        return false;
417
418                switch (eventType) {
419
420                case GssimTags.TIMER:
421                        return servedEvent.containsKey(SchedulingEventType.TIMER);
422
423                case GssimTags.GRIDLET_SUBMIT:
424                        return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
425
426                case GssimTags.TASK_READY_FOR_EXECUTION:
427                        return servedEvent.containsKey(SchedulingEventType.START_TASK_EXECUTION);
428                case GssimTags.TASK_EXECUTION_FINISHED:
429                        return servedEvent.containsKey(SchedulingEventType.TASK_FINISHED);
430                case GssimTags.GRIDLET_CANCEL:
431                        return servedEvent.containsKey(SchedulingEventType.TASK_CANCELED);
432                case GssimTags.GRIDLET_RESUME:
433                        return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
434
435                case GssimTags.GRIDRESOURCE_FAILURE:
436                        return servedEvent.containsKey(SchedulingEventType.RESOURCE_FAILED);
437
438
439                case GssimTags.TASK_REQUESTED_TIME_EXPIRED:
440                        return servedEvent.containsKey(SchedulingEventType.TASK_REQUESTED_TIME_EXPIRED);
441
442                default:
443                        return false;
444                }
445        }
446
447        public double calculateTotalLoad(int size) {
448                // background load, defined during initialization
449                double load;
450                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
451                if (resCalendar == null)
452                        load = 0;
453                else
454                        load = resCalendar.getCurrentLoad();
455
456                double numberOfPE;
457                try {
458                        numberOfPE = resourceManager.getResourcesOfType(ResourceType.CPU).size();
459                } catch (Exception e) {
460                        numberOfPE = 1;
461                }
462                double tasksPerPE = (double) size / numberOfPE;
463                load += Math.min(1.0 - load, tasksPerPE);
464
465                return load;
466        }
467
468        public Accumulator getTotalLoad() {
469                return accTotalLoad_;
470        }
471
472        protected void addTotalLoad(double load) {
473                accTotalLoad_.add(load);
474        }
475       
476        private HashMap<ResourceParameterName, ResourceUnit> chooseResourcesForExecution(String resourceName,
477                        ExecTaskInterface task) {
478
479                ResourceManagerInterface resourceManager = this.resourceManager;
480                if(resourceName != null){
481                        ComputingResource resource = null;
482                        try {
483                                resource = resourceManager.getResourceByName(resourceName);
484                        } catch (ResourceException e) {
485                                return null;
486                        }
487
488                        resourceManager = new ResourceManager(resource);
489                }
490                HashMap<ResourceParameterName, ResourceUnit> map = new HashMap<ResourceParameterName, ResourceUnit>();
491
492                List<ComputingResource> choosenResources = null;
493                int cpuRequest;
494                try {
495                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
496                } catch (NoSuchFieldException e) {
497                        cpuRequest = 1;
498                }
499
500                if (cpuRequest != 0) {
501                        List<? extends ComputingResource> processingElements = null;
502                        try {
503                                Properties properties = new Properties();
504                                properties.setProperty("type", ResourceType.CPU.toString());
505                                processingElements = resourceManager.filterResources(properties);
506                        } catch (Exception e) {
507                                e.printStackTrace();
508                        }
509
510                        choosenResources = new ArrayList<ComputingResource>();
511
512                        for (int i = 0; i < processingElements.size() && cpuRequest > 0; i++) {
513                                if (processingElements.get(i).getStatus() == ResourceStatus.FREE) {
514                                        choosenResources.add(processingElements.get(i));
515                                        cpuRequest--;
516                                }
517                        }
518                        if (cpuRequest > 0)
519                        {       
520                                return null;
521                        }
522                       
523                        ProcessingElements result = new ProcessingElements(ResourceManagerUtils.getCommonParent(choosenResources).getName());
524                        result.addAll(choosenResources);
525                        map.put(ResourceParameterName.PROCESSINGELEMENTS, result);
526                }
527                return  map;
528        }
529
530       
531}
Note: See TracBrowser for help on using the repository browser.