source: xssim/trunk/src/test/rewolucja/scheduling/implementation/LocalManagementSystem.java @ 175

Revision 175, 19.2 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package test.rewolucja.scheduling.implementation;
2
3import eduni.simjava.Sim_event;
4import eduni.simjava.Sim_system;
5import gridsim.Accumulator;
6import gridsim.GridSimTags;
7import gridsim.Gridlet;
8import gridsim.ResourceCalendar;
9import gridsim.gssim.GssimConstants;
10import gridsim.gssim.GssimTags;
11import gridsim.gssim.ResourceHistoryItem;
12import gridsim.gssim.SubmittedTask;
13import gridsim.gssim.filter.SubTaskFilter;
14import grms.shared.constants.BrokerConstants;
15import gssim.schedframe.scheduling.AbstractExecutable;
16import gssim.schedframe.scheduling.ExecTaskInterface;
17import gssim.schedframe.scheduling.Executable;
18
19import java.util.ArrayList;
20import java.util.HashMap;
21import java.util.Iterator;
22import java.util.List;
23import java.util.Map;
24import java.util.Properties;
25
26import org.apache.commons.logging.Log;
27import org.apache.commons.logging.LogFactory;
28import org.joda.time.DateTime;
29import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
30
31import schedframe.resources.units.ResourceUnit;
32import schedframe.scheduling.events.SchedulingEvent;
33import schedframe.scheduling.events.SchedulingEventReason;
34import schedframe.scheduling.events.SchedulingEventType;
35import schedframe.scheduling.events.StartTaskExecutionEvent;
36import schedframe.scheduling.events.TaskCanceledEvent;
37import schedframe.scheduling.events.TaskFinishedEvent;
38import schedframe.scheduling.events.TaskRequestedTimeExpiredEvent;
39import schedframe.scheduling.plugin.SchedulingPluginConfiguration;
40import schedframe.scheduling.plugin.estimation.ExecTimeEstimationPlugin;
41import schedframe.scheduling.plugin.grid.ModuleListImpl;
42import schedframe.scheduling.plugin.grid.ModuleType;
43import schedframe.scheduling.plugin.local.LocalSchedulingPlugin;
44import schedframe.scheduling.utils.ResourceParameterName;
45import simulator.utils.DoubleMath;
46import simulator.utils.InstanceFactory;
47import test.rewolucja.GSSIMJobInterface;
48import test.rewolucja.energy.EnergyEvent;
49import test.rewolucja.energy.EnergyEventType;
50import test.rewolucja.resources.ProcessingElements;
51import test.rewolucja.resources.ResourceStatus;
52import test.rewolucja.resources.ResourceType;
53import test.rewolucja.resources.description.ExecResourceDescription;
54import test.rewolucja.resources.exception.ResourceException;
55import test.rewolucja.resources.logical.LogicalResource;
56import test.rewolucja.resources.manager.factory.ResourceManagerFactory;
57import test.rewolucja.resources.manager.implementation.ResourceManager;
58import test.rewolucja.resources.manager.interfaces.ResourceManagerInterface;
59import test.rewolucja.resources.manager.utils.ResourceManagerUtils;
60import test.rewolucja.resources.physical.base.ComputingResource;
61import test.rewolucja.scheduling.UsedResourceList;
62import test.rewolucja.scheduling.plan.AllocationInterfaceNew;
63import test.rewolucja.scheduling.plan.ScheduledTaskInterfaceNew;
64import test.rewolucja.scheduling.plan.SchedulingPlanInterfaceNew;
65import test.rewolucja.task.JobList;
66
67public class LocalManagementSystem extends ManagementSystem {
68
69        private Log log = LogFactory.getLog(LocalManagementSystem.class);
70
71        protected double lastUpdateTime;
72
73        protected Accumulator accTotalLoad_;
74
75        public LocalManagementSystem(String providerId, String entityName, String schedulingPluginClassName,
76                        ExecTimeEstimationPlugin execTimeEstimationPlugin, ExecResourceDescription resourceDescription)
77                        throws Exception {
78
79                super(providerId, entityName, schedulingPluginClassName, execTimeEstimationPlugin, resourceDescription);
80
81                schedulingPlugin = (LocalSchedulingPlugin) InstanceFactory.createInstance(schedulingPluginClassName, LocalSchedulingPlugin.class);
82                if (schedulingPlugin == null) {
83                        throw new Exception("Can not create local scheduling plugin instance.");
84                }
85
86                accTotalLoad_ = new Accumulator();
87                moduleList = new ModuleListImpl(1);
88
89        }
90
91        public void init(LogicalResource logRes) {
92                logicalResource = logRes;
93                resourceManager = (ResourceManager) ResourceManagerFactory.createResourceManager(logicalResource);
94                double load = 0;
95                accTotalLoad_.add(load);
96        }
97
98        public void processEvent(Sim_event ev) {
99
100                updateProcessingProgress();
101
102                int tag = ev.get_tag();
103                Object obj;
104
105                switch (tag) {
106
107                case GssimTags.TIMER:
108                        if (pluginSupportsEvent(tag)) {
109                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
110                                SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
111                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
112                                executeSchedulingPlan(decision);
113                        }
114                        sendTimerEvent();
115
116                        break;
117
118                case GssimTags.TASK_READY_FOR_EXECUTION:
119                        Executable data = (Executable) ev.get_data();
120
121                        try {
122                                data.setGridletStatus(Gridlet.READY);
123                                if (pluginSupportsEvent(tag)) {
124                                        SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId());
125                                        SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
126                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
127                                        executeSchedulingPlan(decision);
128
129                                }
130                        } catch (Exception e) {
131                                e.printStackTrace();
132                        }
133                        break;
134
135                case GssimTags.TASK_EXECUTION_FINISHED:
136                        obj = ev.get_data();
137                        SubmittedTask task = (SubmittedTask) obj;
138                        if (task.getStatus() == Gridlet.INEXEC) {
139                                task.setGridletStatus(Gridlet.SUCCESS);
140                                task.finalizeGridlet();
141                                log.debug(task.getJobId() + "_" + task.getId() + " finished execution on " + new DateTime());
142                                log.info(GssimConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
143                                UsedResourceList<ResourceHistoryItem> lastUsedList = task.getUsedResources();
144                                Map<ResourceParameterName, ResourceUnit> lastUsed = lastUsedList.getLast()
145                                                .getResourceUnits();
146                                getAllocationManager().freeResources(lastUsed);
147                                ProcessingElements pes = (ProcessingElements) lastUsed.get(ResourceParameterName.PROCESSINGELEMENTS);
148                                for (ComputingResource resource : pes) {
149                                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, task));
150                                }
151                                super.sendFinishJob((AbstractExecutable) task.getGridlet());
152                        }
153
154                        if (pluginSupportsEvent(tag)) {
155                                SchedulingEvent event = new TaskFinishedEvent(task.getJobId(), task.getId());
156                                SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event,
157                                                queues, getJobRegistry(), getResourceManager(), moduleList);
158                                executeSchedulingPlan(decision);
159                        }
160
161                        break;
162                case GssimTags.TASK_REQUESTED_TIME_EXPIRED:
163                        obj = ev.get_data();
164                        task = (SubmittedTask) obj;
165                        if (pluginSupportsEvent(tag)) {
166                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(task.getJobId(), task.getId());
167                                SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event,
168                                                queues, getJobRegistry(), getResourceManager(), moduleList);
169                                executeSchedulingPlan(decision);
170                        }
171
172                        break;
173                case GssimTags.UPDATE:
174                        updateProcessingTimes(ev);
175                        break;
176                }
177        }
178       
179        public void notifySubmittedJob(GSSIMJobInterface<?> job, boolean ack) {
180                if (job instanceof AbstractExecutable) {
181                        AbstractExecutable executable = (AbstractExecutable) job;
182                        // int cost =
183                        // this.resourceManager.getResourceCharacteristic().getResUnits() !=
184                        // null ?
185                        // this.resourceManager.getResourceCharacteristic().getResUnits().get(ResourceParameterName.COST).getAmount()
186                        // : 1;
187                        executable.setResourceParameter(logicalResource.get_id(), 1);
188
189                        updateProcessingProgress();
190                        JobList newTasks = new JobList();
191                        SubmittedTask submittedTask = jobRegistry.getSubmittedTask(executable.getJobId(), executable.getId());
192                        if(submittedTask == null)
193                        {       submittedTask = new SubmittedTask((Executable) executable);
194                                jobRegistry.addTask(submittedTask);
195                        }
196
197                        //submittedTask.addToResPath(logicalRes.get_name());
198                        submittedTask.visitResource(logicalResource.get_name());
199                        LogicalResource logicalRes = logicalResource.getParent();
200                        /*while (logicalRes != null && !submittedTask.getResPath().contains(logicalRes.get_name())) {
201                                submittedTask.addToResPath(logicalRes.get_name());
202                                logicalRes = logicalRes.getParent();
203                        }*/
204                        while (logicalRes != null && !submittedTask.getVisitedResources().contains(logicalRes.get_name())) {
205                                submittedTask.visitResource(logicalRes.get_name());
206                                logicalRes = logicalRes.getParent();
207                        }
208                        newTasks.add(submittedTask);
209                        schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList);
210
211                        if (job.getStatus() == Gridlet.QUEUED) {
212                                sendJobReadyEvent(job);
213                        }
214                }
215        }
216
217        public void notifyReturnedJob(GSSIMJobInterface<?> job) {
218                if (pluginSupportsEvent(GssimTags.TASK_EXECUTION_FINISHED)) {
219                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
220                        SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
221                                        queues, getJobRegistry(), getResourceManager(), moduleList);
222                        executeSchedulingPlan(decision);
223                }
224                if(logicalResource.getParent() != null){
225                        sendFinishJob((AbstractExecutable)job);
226                }
227        }
228
229        public void notifyCanceledJob(GSSIMJobInterface<?> job) {
230
231                if (!pluginSupportsEvent(GridSimTags.GRIDLET_CANCEL))
232                        return;
233
234                Executable executable = (Executable) job;
235                String jobID = executable.getJobId();
236
237                SchedulingPlanInterfaceNew decision = null;
238
239                try {
240
241                        executable.setStatus((int) BrokerConstants.JOB_STATUS_CANCELED);
242
243                        TaskCanceledEvent event = new TaskCanceledEvent(executable.getJobId(), executable.getTaskId());
244                        event.setReason(SchedulingEventReason.RESERVATION_EXCEEDED);
245                        decision = schedulingPlugin
246                                        .schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList);
247
248                        if (decision == null)
249                                return;
250
251                        executeSchedulingPlan(decision);
252
253                } catch (Exception e) {
254                        log.error("Exception during scheduling. " + e.getMessage());
255                        e.printStackTrace();
256                }
257        }
258       
259        protected void executeSchedulingPlan(SchedulingPlanInterfaceNew decision) {
260
261                ArrayList<ScheduledTaskInterfaceNew> taskSchedulingDecisions = decision.getTasks();
262                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
263                        try {
264                                ScheduledTaskInterfaceNew taskDecision = taskSchedulingDecisions.get(i);
265
266                                // not scheduled again are returned to the user.
267                                if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
268                                        continue;
269                                }
270
271                                ArrayList<AllocationInterfaceNew> allocations = taskDecision.getAllocations();
272
273                                GSSIMJobInterface<?> task = taskDecision.getTask();
274                                for (int j = 0; j < allocations.size(); j++) {
275
276                                        AllocationInterfaceNew allocation = allocations.get(j);
277                                        if (allocation.isProcessing()) {
278                                                executeTask(task, allocation.getRequestedResources());
279                                        //} else if(GridSim.getEntityId(allocation.getProviderName()) != -1 || logicalResource.getLogicalResource(allocation.getProviderName())!=null){
280                                        } else if(resourceManager.getResourceProvider(allocation.getProviderName()) != null){
281                                                allocation.setProviderName(resourceManager.getResourceProvider(allocation.getProviderName()));
282                                                submitJob(task, allocation);
283                                        } else {
284                                                executeTask(task, chooseResourcesForExecution(allocation.getProviderName(), (ExecTaskInterface)task));
285                                        }
286                                }
287
288                        } catch (Exception e) {
289                                e.printStackTrace();
290                        }
291                }
292        }
293
294        protected void executeTask(GSSIMJobInterface<?> job, Map<ResourceParameterName, ResourceUnit> choosenResources) {
295                ExecTaskInterface task = (ExecTaskInterface) job;
296                SubmittedTask submittedTask = (SubmittedTask) task;
297
298                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
299                if(allocationStatus == false)
300                        return;
301                removeFromQueue(task);
302                double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength();
303                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION);
304                int time = Double.valueOf(
305                                forecastFinishTimePlugin.execTimeEstimation(event, choosenResources, task, completionPercentage)).intValue();
306                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime()
307                                + " will finish after " + time);
308
309                if (time < 0.0)
310                        return;
311               
312                submittedTask.setEstimatedDuration(time);
313                DateTime currentTime = new DateTime();
314                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
315                submittedTask.addUsedResources(resHistItem);
316                submittedTask.setFinishTime(currentTime.getMillis() / 1000);
317               
318                jobRegistry.saveHistory(submittedTask, time, choosenResources);
319               
320                logicalResource.sendInternal(time, GssimTags.TASK_EXECUTION_FINISHED,
321                                submittedTask);
322
323                try {
324                        long expectedDuration = submittedTask.getExpectedDuration().getMillis() / 1000;
325                        logicalResource.sendInternal(expectedDuration, GssimTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask);
326                } catch (NoSuchFieldException e) {
327                        double t = submittedTask.getEstimatedDuration();
328                        logicalResource.sendInternal(t, GssimTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask);
329                }
330               
331                submittedTask.setGridletStatus(Gridlet.INEXEC);
332                log.info(GssimConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
333               
334                ProcessingElements pes = (ProcessingElements) choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS);
335                for (ComputingResource resource : pes) {
336                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, submittedTask));
337                }
338
339                /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){
340                        System.out.println(etask.getJobId());
341                        for(String taskId: etask.getVisitedResources())
342                                System.out.println("====="+taskId);
343                }*/
344
345        }
346       
347        protected void updateProcessingProgress() {
348                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
349                if (timeSpan <= 0.0) {
350                        // don't update when nothing changed
351                        return;
352                }
353                lastUpdateTime = Sim_system.clock();
354                Iterator<ExecTaskInterface> iter = jobRegistry.getRunningTasks().iterator();
355                while (iter.hasNext()) {
356                        ExecTaskInterface task = iter.next();
357                        SubmittedTask subTask = (SubmittedTask)task;
358                        UsedResourceList<ResourceHistoryItem> usedResourcesList = subTask.getUsedResources();
359                        ResourceUnit unit = usedResourcesList.getLast().getResourceUnits()
360                                        .get(ResourceParameterName.PROCESSINGELEMENTS);
361
362                        double load = getMIShare(timeSpan, (ProcessingElements) unit);
363                        subTask.updateGridletFinishedSoFar(load);
364                        addTotalLoad(load);
365                }
366        }
367
368        private double getMIShare(double timeSpan, ProcessingElements pes) {
369                double localLoad;
370                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
371                if (resCalendar == null)
372                        localLoad = 0;
373                else
374                        // 1 - localLoad_ = available MI share percentage
375                        localLoad = resCalendar.getCurrentLoad();
376
377                int speed = pes.getSpeed();
378                int cnt = pes.getAmount();
379
380                double totalMI = speed * cnt * timeSpan * (1 - localLoad);
381                return totalMI;
382        }
383
384        protected void updateProcessingTimes(Sim_event ev) {
385                updateProcessingProgress();
386                for (ExecTaskInterface task : jobRegistry.getRunningTasks()) {
387                        SubmittedTask subTask = (SubmittedTask)task;
388                        Map<ResourceParameterName, ResourceUnit> choosenResources = subTask.getUsedResources().getLast().getResourceUnits();
389                        double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength();
390                        double time = forecastFinishTimePlugin.execTimeEstimation(null, choosenResources, task,
391                                        completionPercentage);
392                        /*if(!subTask.getResPath().contains(ev.get_data().toString())) {
393                                continue;*/
394                        if(!subTask.getVisitedResources().contains(ev.get_data().toString())) {
395                                continue;
396                        }// else if( DoubleMath.subtract(subTask.getEstimatedDuration(), (time + lastUpdateTime)) == 0.0 || completionPercentage == 0){
397                        else if( DoubleMath.subtract((subTask.getExecStartTime()+subTask.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){
398                                continue;
399                        }
400                        SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(), GssimTags.TASK_EXECUTION_FINISHED);
401                        logicalResource.sim_cancel(filter, null);
402                        logicalResource.sendInternal(time, GssimTags.TASK_EXECUTION_FINISHED, task);
403
404                }
405        }       
406
407        public boolean pluginSupportsEvent(int eventType) {
408                SchedulingPluginConfiguration config = (SchedulingPluginConfiguration) schedulingPlugin.getConfiguration();
409                if (config == null)
410                        return false;
411
412                Map<SchedulingEventType, Object> servedEvent = config.getServedEvents();
413                if (servedEvent == null)
414                        return false;
415
416                switch (eventType) {
417
418                case GssimTags.TIMER:
419                        return servedEvent.containsKey(SchedulingEventType.TIMER);
420
421                case GssimTags.GRIDLET_SUBMIT:
422                        return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
423
424                case GssimTags.TASK_READY_FOR_EXECUTION:
425                        return servedEvent.containsKey(SchedulingEventType.START_TASK_EXECUTION);
426                case GssimTags.TASK_EXECUTION_FINISHED:
427                        return servedEvent.containsKey(SchedulingEventType.TASK_FINISHED);
428                case GssimTags.GRIDLET_CANCEL:
429                        return servedEvent.containsKey(SchedulingEventType.TASK_CANCELED);
430                case GssimTags.GRIDLET_RESUME:
431                        return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
432
433                case GssimTags.GRIDRESOURCE_FAILURE:
434                        return servedEvent.containsKey(SchedulingEventType.RESOURCE_FAILED);
435
436
437                case GssimTags.TASK_REQUESTED_TIME_EXPIRED:
438                        return servedEvent.containsKey(SchedulingEventType.TASK_REQUESTED_TIME_EXPIRED);
439
440                default:
441                        return false;
442                }
443        }
444
445        public double calculateTotalLoad(int size) {
446                // background load, defined during initialization
447                double load;
448                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
449                if (resCalendar == null)
450                        load = 0;
451                else
452                        load = resCalendar.getCurrentLoad();
453
454                double numberOfPE;
455                try {
456                        numberOfPE = resourceManager.getResourcesOfType(ResourceType.CPU).size();
457                } catch (Exception e) {
458                        numberOfPE = 1;
459                }
460                double tasksPerPE = (double) size / numberOfPE;
461                load += Math.min(1.0 - load, tasksPerPE);
462
463                return load;
464        }
465
466        public Accumulator getTotalLoad() {
467                return accTotalLoad_;
468        }
469
470        protected void addTotalLoad(double load) {
471                accTotalLoad_.add(load);
472        }
473       
474        private HashMap<ResourceParameterName, ResourceUnit> chooseResourcesForExecution(String resourceName,
475                        ExecTaskInterface task) {
476
477                ResourceManagerInterface resourceManager = this.resourceManager;
478                if(resourceName != null){
479                        ComputingResource resource = null;
480                        try {
481                                resource = resourceManager.getResourceByName(resourceName);
482                        } catch (ResourceException e) {
483                                return null;
484                        }
485
486                        resourceManager = new ResourceManager(resource);
487                }
488                HashMap<ResourceParameterName, ResourceUnit> map = new HashMap<ResourceParameterName, ResourceUnit>();
489
490                List<ComputingResource> choosenResources = null;
491                int cpuRequest;
492                try {
493                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
494                } catch (NoSuchFieldException e) {
495                        cpuRequest = 1;
496                }
497
498                if (cpuRequest != 0) {
499                        List<? extends ComputingResource> processingElements = null;
500                        try {
501                                Properties properties = new Properties();
502                                properties.setProperty("type", ResourceType.CPU.toString());
503                                processingElements = resourceManager.filterResources(properties);
504                        } catch (Exception e) {
505                                e.printStackTrace();
506                        }
507
508                        choosenResources = new ArrayList<ComputingResource>();
509
510                        for (int i = 0; i < processingElements.size() && cpuRequest > 0; i++) {
511                                if (processingElements.get(i).getStatus() == ResourceStatus.FREE) {
512                                        choosenResources.add(processingElements.get(i));
513                                        cpuRequest--;
514                                }
515                        }
516                        if (cpuRequest > 0)
517                        {       
518                                return null;
519                        }
520                       
521                        ProcessingElements result = new ProcessingElements(ResourceManagerUtils.getCommonParent(choosenResources).getName());
522                        result.addAll(choosenResources);
523                        map.put(ResourceParameterName.PROCESSINGELEMENTS, result);
524                }
525                return  map;
526        }
527
528       
529}
Note: See TracBrowser for help on using the repository browser.