package example.localplugin; import gridsim.Gridlet; import gridsim.gssim.ResourceHistoryItem; import gridsim.gssim.SubmittedTask; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import schedframe.resources.units.ResourceUnit; import schedframe.scheduling.TaskInterface; import schedframe.scheduling.events.SchedulingEvent; import schedframe.scheduling.events.TaskFinishedEvent; import schedframe.scheduling.events.TaskRequestedTimeExpiredEvent; import schedframe.scheduling.plugin.grid.ModuleList; import schedframe.scheduling.utils.ResourceParameterName; import test.rewolucja.GSSIMJobInterface; import test.rewolucja.energy.profile.PStateType; import test.rewolucja.resources.ProcessingElements; import test.rewolucja.resources.ResourceStatus; import test.rewolucja.resources.manager.implementation.ClusterResourceManager; import test.rewolucja.resources.manager.interfaces.ResourceManagerInterface; import test.rewolucja.resources.physical.base.ComputingResource; import test.rewolucja.resources.physical.implementation.Processor; import test.rewolucja.scheduling.JobRegistryInterface; import test.rewolucja.scheduling.UsedResourceList; import test.rewolucja.scheduling.plan.SchedulingPlanInterfaceNew; import test.rewolucja.scheduling.plan.SchedulingPlanNew; import test.rewolucja.scheduling.queue.Queue; import test.rewolucja.scheduling.queue.QueueList; public class FCFSCPUFreqScalingClusterLocalPlugin extends BaseLocalPlugin { List allocatedCPUs; public FCFSCPUFreqScalingClusterLocalPlugin () { allocatedCPUs = new ArrayList(); } public SchedulingPlanInterfaceNew schedule(SchedulingEvent event, QueueList queues, JobRegistryInterface jobRegistry, ResourceManagerInterface resManager, ModuleList modules) { ClusterResourceManager resourceManager = (ClusterResourceManager) resManager; SchedulingPlanNew plan = new SchedulingPlanNew(); // our tasks are placed only in first queue (see // BaseLocalPlugin.placeJobsInQueues() method) Queue q = queues.get(0); // chose the events types to serve. // Different actions for different events are possible. switch (event.getType()) { case START_TASK_EXECUTION: // check all tasks in queue for (int i = 0; i < q.size(); i++) { GSSIMJobInterface job = q.get(i); TaskInterface task = (TaskInterface) job; // if status of the tasks in READY if (task.getStatus() == Gridlet.READY) { Map choosenResources = chooseResourcesForExecution(resourceManager, task); if (choosenResources != null) { addToSchedulingPlan(plan, task, choosenResources); ProcessingElements pes = (ProcessingElements)choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS); List processors = new ArrayList(); for(ComputingResource res : pes){ processors.add((Processor) res); } adjustFrequency(ResourceStatus.BUSY,processors); } } } break; case TASK_FINISHED: TaskFinishedEvent finEvent = (TaskFinishedEvent) event; SubmittedTask subTask = jobRegistry.getSubmittedTask(finEvent.getJobId(), finEvent.getTaskId()); UsedResourceList usedResourcesList = subTask.getUsedResources(); ProcessingElements pes = (ProcessingElements)usedResourcesList.getLast().getResourceUnits().get(ResourceParameterName.PROCESSINGELEMENTS); List processors = new ArrayList(); for(ComputingResource res : pes){ processors.add((Processor) res); allocatedCPUs.add((Processor) res); } adjustFrequency(ResourceStatus.FREE, processors); break; case TASK_REQUESTED_TIME_EXPIRED: TaskRequestedTimeExpiredEvent timExpEvent = (TaskRequestedTimeExpiredEvent) event; subTask = jobRegistry.getSubmittedTask(timExpEvent.getJobId(), timExpEvent.getTaskId()); usedResourcesList = subTask.getUsedResources(); pes = (ProcessingElements)usedResourcesList.getLast().getResourceUnits().get(ResourceParameterName.PROCESSINGELEMENTS); processors = new ArrayList(); for(ComputingResource res : pes){ allocatedCPUs.remove((Processor) res); } // check all tasks in queue for (int i = 0; i < q.size(); i++) { GSSIMJobInterface job = q.get(i); TaskInterface task = (TaskInterface) job; // if status of the tasks in READY if (task.getStatus() == Gridlet.READY) { Map choosenResources = chooseResourcesForExecution(resourceManager, task); if (choosenResources != null) { addToSchedulingPlan(plan, task, choosenResources); pes = (ProcessingElements)choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS); processors = new ArrayList(); for(ComputingResource res : pes){ processors.add((Processor) res); } adjustFrequency(ResourceStatus.BUSY, processors); } } } break; } return plan; } private HashMap chooseResourcesForExecution( ClusterResourceManager resourceManager, TaskInterface task) { HashMap map = new HashMap(); int cpuRequest; try { cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue(); } catch (NoSuchFieldException e) { cpuRequest = 1; } if (cpuRequest != 0) { List choosenResources = null; List processors = resourceManager.getProcessors(); processors.removeAll(allocatedCPUs); if (processors.size() < cpuRequest) { // log.warn("Task requires more cpus than is availiable in this resource."); return null; } choosenResources = new ArrayList(); for (int i = 0; i < processors.size() && cpuRequest > 0; i++) { if (processors.get(i).getStatus() == ResourceStatus.FREE) { choosenResources.add(processors.get(i)); cpuRequest--; } } if (cpuRequest > 0) { // log.info("Task " + task.getJobId() + "_" + task.getId() + // " requires more cpus than is availiable in this moment."); return null; } ProcessingElements result = new ProcessingElements(); result.addAll(choosenResources); map.put(ResourceParameterName.PROCESSINGELEMENTS, result); } return map; } private void adjustFrequency(ResourceStatus status, List processors){ switch(status){ case BUSY: for(Processor cpu: processors){ if(cpu.getPowerInterface().getSupportedPStates().containsKey(PStateType.P0)) cpu.getPowerInterface().setPState(PStateType.P0); } break; case FREE: for(Processor cpu: processors){ if(cpu.getPowerInterface().getSupportedPStates().containsKey(PStateType.P3)) cpu.getPowerInterface().setPState(PStateType.P3); } break; } } public String getName() { return getClass().getName(); } public void init(Properties properties) { // no extra initialization is expected. } }