package test.rewolucja.scheduling.implementation; import eduni.simjava.Sim_event; import eduni.simjava.Sim_system; import gridsim.Accumulator; import gridsim.GridSimTags; import gridsim.Gridlet; import gridsim.ResourceCalendar; import gridsim.gssim.GssimConstants; import gridsim.gssim.GssimTags; import gridsim.gssim.ResourceHistoryItem; import gridsim.gssim.SubmittedTask; import gridsim.gssim.filter.SubTaskFilter; import grms.shared.constants.BrokerConstants; import gssim.schedframe.scheduling.AbstractExecutable; import gssim.schedframe.scheduling.ExecTaskInterface; import gssim.schedframe.scheduling.Executable; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.joda.time.DateTime; import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus; import schedframe.resources.units.ResourceUnit; import schedframe.scheduling.events.SchedulingEvent; import schedframe.scheduling.events.SchedulingEventReason; import schedframe.scheduling.events.SchedulingEventType; import schedframe.scheduling.events.StartTaskExecutionEvent; import schedframe.scheduling.events.TaskCanceledEvent; import schedframe.scheduling.events.TaskFinishedEvent; import schedframe.scheduling.events.TaskRequestedTimeExpiredEvent; import schedframe.scheduling.plugin.SchedulingPluginConfiguration; import schedframe.scheduling.plugin.estimation.ExecTimeEstimationPlugin; import schedframe.scheduling.plugin.grid.ModuleListImpl; import schedframe.scheduling.plugin.grid.ModuleType; import schedframe.scheduling.plugin.local.LocalSchedulingPlugin; import schedframe.scheduling.utils.ResourceParameterName; import simulator.utils.DoubleMath; import simulator.utils.InstanceFactory; import test.rewolucja.GSSIMJobInterface; import test.rewolucja.energy.EnergyEvent; import test.rewolucja.energy.EnergyEventType; import test.rewolucja.resources.ProcessingElements; import test.rewolucja.resources.ResourceStatus; import test.rewolucja.resources.ResourceType; import test.rewolucja.resources.description.ExecResourceDescription; import test.rewolucja.resources.exception.ResourceException; import test.rewolucja.resources.logical.LogicalResource; import test.rewolucja.resources.manager.factory.ResourceManagerFactory; import test.rewolucja.resources.manager.implementation.ResourceManager; import test.rewolucja.resources.manager.interfaces.ResourceManagerInterface; import test.rewolucja.resources.manager.utils.ResourceManagerUtils; import test.rewolucja.resources.physical.base.ComputingResource; import test.rewolucja.scheduling.UsedResourceList; import test.rewolucja.scheduling.plan.AllocationInterfaceNew; import test.rewolucja.scheduling.plan.ScheduledTaskInterfaceNew; import test.rewolucja.scheduling.plan.SchedulingPlanInterfaceNew; import test.rewolucja.task.JobList; public class LocalManagementSystem extends ManagementSystem { private Log log = LogFactory.getLog(LocalManagementSystem.class); protected double lastUpdateTime; protected Accumulator accTotalLoad_; public LocalManagementSystem(String providerId, String entityName, String schedulingPluginClassName, ExecTimeEstimationPlugin execTimeEstimationPlugin, ExecResourceDescription resourceDescription) throws Exception { super(providerId, entityName, schedulingPluginClassName, execTimeEstimationPlugin, resourceDescription); schedulingPlugin = (LocalSchedulingPlugin) InstanceFactory.createInstance(schedulingPluginClassName, LocalSchedulingPlugin.class); if (schedulingPlugin == null) { throw new Exception("Can not create local scheduling plugin instance."); } accTotalLoad_ = new Accumulator(); moduleList = new ModuleListImpl(1); } public void init(LogicalResource logRes) { logicalResource = logRes; resourceManager = (ResourceManager) ResourceManagerFactory.createResourceManager(logicalResource); double load = 0; accTotalLoad_.add(load); } public void processEvent(Sim_event ev) { updateProcessingProgress(); int tag = ev.get_tag(); Object obj; switch (tag) { case GssimTags.TIMER: if (pluginSupportsEvent(tag)) { SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER); SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList); executeSchedulingPlan(decision); } sendTimerEvent(); break; case GssimTags.TASK_READY_FOR_EXECUTION: Executable data = (Executable) ev.get_data(); try { data.setGridletStatus(Gridlet.READY); if (pluginSupportsEvent(tag)) { SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId()); SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList); executeSchedulingPlan(decision); } } catch (Exception e) { e.printStackTrace(); } break; case GssimTags.TASK_EXECUTION_FINISHED: obj = ev.get_data(); SubmittedTask task = (SubmittedTask) obj; if (task.getStatus() == Gridlet.INEXEC) { task.setGridletStatus(Gridlet.SUCCESS); task.finalizeGridlet(); log.debug(task.getJobId() + "_" + task.getId() + " finished execution on " + new DateTime()); log.info(GssimConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); UsedResourceList lastUsedList = task.getUsedResources(); Map lastUsed = lastUsedList.getLast() .getResourceUnits(); getAllocationManager().freeResources(lastUsed); ProcessingElements pes = (ProcessingElements) lastUsed.get(ResourceParameterName.PROCESSINGELEMENTS); for (ComputingResource resource : pes) { resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, task)); } super.sendFinishJob((AbstractExecutable) task.getGridlet()); } if (pluginSupportsEvent(tag)) { SchedulingEvent event = new TaskFinishedEvent(task.getJobId(), task.getId()); SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList); executeSchedulingPlan(decision); } break; case GssimTags.TASK_REQUESTED_TIME_EXPIRED: obj = ev.get_data(); task = (SubmittedTask) obj; if (pluginSupportsEvent(tag)) { SchedulingEvent event = new TaskRequestedTimeExpiredEvent(task.getJobId(), task.getId()); SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList); executeSchedulingPlan(decision); } break; case GssimTags.UPDATE: updateProcessingTimes(ev); break; } } public void notifySubmittedJob(GSSIMJobInterface job, boolean ack) { if (job instanceof AbstractExecutable) { AbstractExecutable executable = (AbstractExecutable) job; // int cost = // this.resourceManager.getResourceCharacteristic().getResUnits() != // null ? // this.resourceManager.getResourceCharacteristic().getResUnits().get(ResourceParameterName.COST).getAmount() // : 1; executable.setResourceParameter(logicalResource.get_id(), 1); updateProcessingProgress(); JobList newTasks = new JobList(); SubmittedTask submittedTask = jobRegistry.getSubmittedTask(executable.getJobId(), executable.getId()); if(submittedTask == null) { submittedTask = new SubmittedTask((Executable) executable); jobRegistry.addTask(submittedTask); } //submittedTask.addToResPath(logicalRes.get_name()); submittedTask.visitResource(logicalResource.get_name()); LogicalResource logicalRes = logicalResource.getParent(); /*while (logicalRes != null && !submittedTask.getResPath().contains(logicalRes.get_name())) { submittedTask.addToResPath(logicalRes.get_name()); logicalRes = logicalRes.getParent(); }*/ while (logicalRes != null && !submittedTask.getVisitedResources().contains(logicalRes.get_name())) { submittedTask.visitResource(logicalRes.get_name()); logicalRes = logicalRes.getParent(); } newTasks.add(submittedTask); schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList); if (job.getStatus() == Gridlet.QUEUED) { sendJobReadyEvent(job); } } } public void notifyReturnedJob(GSSIMJobInterface job) { if (pluginSupportsEvent(GssimTags.TASK_EXECUTION_FINISHED)) { SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED); SchedulingPlanInterfaceNew decision = schedulingPlugin.schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList); executeSchedulingPlan(decision); } if(logicalResource.getParent() != null){ sendFinishJob((AbstractExecutable)job); } } public void notifyCanceledJob(GSSIMJobInterface job) { if (!pluginSupportsEvent(GridSimTags.GRIDLET_CANCEL)) return; Executable executable = (Executable) job; String jobID = executable.getJobId(); SchedulingPlanInterfaceNew decision = null; try { executable.setStatus((int) BrokerConstants.JOB_STATUS_CANCELED); TaskCanceledEvent event = new TaskCanceledEvent(executable.getJobId(), executable.getTaskId()); event.setReason(SchedulingEventReason.RESERVATION_EXCEEDED); decision = schedulingPlugin .schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList); if (decision == null) return; executeSchedulingPlan(decision); } catch (Exception e) { log.error("Exception during scheduling. " + e.getMessage()); e.printStackTrace(); } } protected void executeSchedulingPlan(SchedulingPlanInterfaceNew decision) { ArrayList taskSchedulingDecisions = decision.getTasks(); for (int i = 0; i < taskSchedulingDecisions.size(); i++) { try { ScheduledTaskInterfaceNew taskDecision = taskSchedulingDecisions.get(i); // not scheduled again are returned to the user. if (taskDecision.getStatus() == AllocationStatus.REJECTED) { continue; } ArrayList allocations = taskDecision.getAllocations(); GSSIMJobInterface task = taskDecision.getTask(); for (int j = 0; j < allocations.size(); j++) { AllocationInterfaceNew allocation = allocations.get(j); if (allocation.isProcessing()) { executeTask(task, allocation.getRequestedResources()); //} else if(GridSim.getEntityId(allocation.getProviderName()) != -1 || logicalResource.getLogicalResource(allocation.getProviderName())!=null){ } else if(resourceManager.getResourceProvider(allocation.getProviderName()) != null){ allocation.setProviderName(resourceManager.getResourceProvider(allocation.getProviderName())); submitJob(task, allocation); } else { executeTask(task, chooseResourcesForExecution(allocation.getProviderName(), (ExecTaskInterface)task)); } } } catch (Exception e) { e.printStackTrace(); } } } protected void executeTask(GSSIMJobInterface job, Map choosenResources) { ExecTaskInterface task = (ExecTaskInterface) job; SubmittedTask submittedTask = (SubmittedTask) task; boolean allocationStatus = getAllocationManager().allocateResources(choosenResources); if(allocationStatus == false) return; removeFromQueue(task); double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength(); SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION); int time = Double.valueOf( forecastFinishTimePlugin.execTimeEstimation(event, choosenResources, task, completionPercentage)).intValue(); log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime() + " will finish after " + time); if (time < 0.0) return; submittedTask.setEstimatedDuration(time); DateTime currentTime = new DateTime(); ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); submittedTask.addUsedResources(resHistItem); submittedTask.setFinishTime(currentTime.getMillis() / 1000); jobRegistry.saveHistory(submittedTask, time, choosenResources); logicalResource.sendInternal(time, GssimTags.TASK_EXECUTION_FINISHED, submittedTask); try { long expectedDuration = submittedTask.getExpectedDuration().getMillis() / 1000; logicalResource.sendInternal(expectedDuration, GssimTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask); } catch (NoSuchFieldException e) { double t = submittedTask.getEstimatedDuration(); logicalResource.sendInternal(t, GssimTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask); } submittedTask.setGridletStatus(Gridlet.INEXEC); log.info(GssimConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); ProcessingElements pes = (ProcessingElements) choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS); for (ComputingResource resource : pes) { resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, submittedTask)); } /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){ System.out.println(etask.getJobId()); for(String taskId: etask.getVisitedResources()) System.out.println("====="+taskId); }*/ } protected void updateProcessingProgress() { double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime); if (timeSpan <= 0.0) { // don't update when nothing changed return; } lastUpdateTime = Sim_system.clock(); Iterator iter = jobRegistry.getRunningTasks().iterator(); while (iter.hasNext()) { ExecTaskInterface task = iter.next(); SubmittedTask subTask = (SubmittedTask)task; UsedResourceList usedResourcesList = subTask.getUsedResources(); ResourceUnit unit = usedResourcesList.getLast().getResourceUnits() .get(ResourceParameterName.PROCESSINGELEMENTS); double load = getMIShare(timeSpan, (ProcessingElements) unit); subTask.updateGridletFinishedSoFar(load); addTotalLoad(load); } } private double getMIShare(double timeSpan, ProcessingElements pes) { double localLoad; ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR); if (resCalendar == null) localLoad = 0; else // 1 - localLoad_ = available MI share percentage localLoad = resCalendar.getCurrentLoad(); int speed = pes.getSpeed(); int cnt = pes.getAmount(); double totalMI = speed * cnt * timeSpan * (1 - localLoad); return totalMI; } protected void updateProcessingTimes(Sim_event ev) { updateProcessingProgress(); for (ExecTaskInterface task : jobRegistry.getRunningTasks()) { SubmittedTask subTask = (SubmittedTask)task; Map choosenResources = subTask.getUsedResources().getLast().getResourceUnits(); double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength(); double time = forecastFinishTimePlugin.execTimeEstimation(null, choosenResources, task, completionPercentage); /*if(!subTask.getResPath().contains(ev.get_data().toString())) { continue;*/ if(!subTask.getVisitedResources().contains(ev.get_data().toString())) { continue; }// else if( DoubleMath.subtract(subTask.getEstimatedDuration(), (time + lastUpdateTime)) == 0.0 || completionPercentage == 0){ else if( DoubleMath.subtract((subTask.getExecStartTime()+subTask.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){ continue; } SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(), GssimTags.TASK_EXECUTION_FINISHED); logicalResource.sim_cancel(filter, null); logicalResource.sendInternal(time, GssimTags.TASK_EXECUTION_FINISHED, task); } } public boolean pluginSupportsEvent(int eventType) { SchedulingPluginConfiguration config = (SchedulingPluginConfiguration) schedulingPlugin.getConfiguration(); if (config == null) return false; Map servedEvent = config.getServedEvents(); if (servedEvent == null) return false; switch (eventType) { case GssimTags.TIMER: return servedEvent.containsKey(SchedulingEventType.TIMER); case GssimTags.GRIDLET_SUBMIT: return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED); case GssimTags.TASK_READY_FOR_EXECUTION: return servedEvent.containsKey(SchedulingEventType.START_TASK_EXECUTION); case GssimTags.TASK_EXECUTION_FINISHED: return servedEvent.containsKey(SchedulingEventType.TASK_FINISHED); case GssimTags.GRIDLET_CANCEL: return servedEvent.containsKey(SchedulingEventType.TASK_CANCELED); case GssimTags.GRIDLET_RESUME: return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED); case GssimTags.GRIDRESOURCE_FAILURE: return servedEvent.containsKey(SchedulingEventType.RESOURCE_FAILED); case GssimTags.TASK_REQUESTED_TIME_EXPIRED: return servedEvent.containsKey(SchedulingEventType.TASK_REQUESTED_TIME_EXPIRED); default: return false; } } public double calculateTotalLoad(int size) { // background load, defined during initialization double load; ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR); if (resCalendar == null) load = 0; else load = resCalendar.getCurrentLoad(); double numberOfPE; try { numberOfPE = resourceManager.getResourcesOfType(ResourceType.CPU).size(); } catch (Exception e) { numberOfPE = 1; } double tasksPerPE = (double) size / numberOfPE; load += Math.min(1.0 - load, tasksPerPE); return load; } public Accumulator getTotalLoad() { return accTotalLoad_; } protected void addTotalLoad(double load) { accTotalLoad_.add(load); } private HashMap chooseResourcesForExecution(String resourceName, ExecTaskInterface task) { ResourceManagerInterface resourceManager = this.resourceManager; if(resourceName != null){ ComputingResource resource = null; try { resource = resourceManager.getResourceByName(resourceName); } catch (ResourceException e) { return null; } resourceManager = new ResourceManager(resource); } HashMap map = new HashMap(); List choosenResources = null; int cpuRequest; try { cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue(); } catch (NoSuchFieldException e) { cpuRequest = 1; } if (cpuRequest != 0) { List processingElements = null; try { Properties properties = new Properties(); properties.setProperty("type", ResourceType.CPU.toString()); processingElements = resourceManager.filterResources(properties); } catch (Exception e) { e.printStackTrace(); } choosenResources = new ArrayList(); for (int i = 0; i < processingElements.size() && cpuRequest > 0; i++) { if (processingElements.get(i).getStatus() == ResourceStatus.FREE) { choosenResources.add(processingElements.get(i)); cpuRequest--; } } if (cpuRequest > 0) { return null; } ProcessingElements result = new ProcessingElements(ResourceManagerUtils.getCommonParent(choosenResources).getName()); result.addAll(choosenResources); map.put(ResourceParameterName.PROCESSINGELEMENTS, result); } return map; } }