package schedframe.scheduling.policy;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTimeUtilsExt;

import schedframe.PluginConfiguration;
import schedframe.events.scheduling.SchedulingEventType;
import schedframe.resources.units.StandardResourceUnitName;
import schedframe.scheduling.Scheduler;
import schedframe.scheduling.WorkloadUnitHandler;
import schedframe.scheduling.manager.resources.ManagedResources;
import schedframe.scheduling.manager.resources.ResourceManager;
import schedframe.scheduling.manager.resources.ResourceManagerFactory;
import schedframe.scheduling.manager.tasks.JobRegistryImpl;
import schedframe.scheduling.manager.tasks.JobRegistry;
import schedframe.scheduling.plan.AllocationInterface;
import schedframe.scheduling.plan.SchedulingPlanInterface;
import schedframe.scheduling.plugin.SchedulingPlugin;
import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
import schedframe.scheduling.plugin.grid.ModuleList;
import schedframe.scheduling.plugin.local.ResourceAllocationInterface;
import schedframe.scheduling.queue.TaskQueue;
import schedframe.scheduling.queue.TaskQueueList;
import schedframe.scheduling.tasks.Job;
import schedframe.scheduling.tasks.WorkloadUnit;
import simulator.WormsConstants;
import eduni.simjava.Sim_event;
import gridsim.GridSim;
import gridsim.GridSimTags;
import gridsim.Gridlet;
import gridsim.IO_data;
import gridsim.gssim.WormsTags;
import gssim.schedframe.scheduling.ExecTask;
import gssim.schedframe.scheduling.Executable;
import gssim.schedframe.scheduling.queues.AbstractStatsSupportingQueue;

/**
 * Base class for management systems that are attached to a {@link Scheduler}.
 * It holds the task queues, the job registry, the resource manager and the
 * plugins, and offers helpers that send workload units and events through the
 * scheduler entity.
 *
 * <p>{@link #init(Scheduler, ManagedResources)} must be called before any of
 * the {@code send*} / {@code submit*} helpers, because they all use the
 * scheduler reference assigned there. The {@code schedulingPlugin} and
 * {@code moduleList} fields are not assigned by this class; presumably
 * subclasses set them.
 */
public abstract class AbstractManagementSystem {

    private static final Log log = LogFactory.getLog(AbstractManagementSystem.class);

    /** Identifier of this system, built as {@code entityName + "@" + providerId}. */
    protected String name;

    /** Created in {@link #init(Scheduler, ManagedResources)}; {@code null} before that. */
    protected ResourceManager resourceManager;

    protected TaskQueueList queues;
    protected SchedulingPlugin schedulingPlugin;
    protected ExecutionTimeEstimationPlugin execTimeEstimationPlugin;
    protected ModuleList moduleList;
    protected JobRegistryImpl jobRegistry;

    /** Assigned in {@link #init(Scheduler, ManagedResources)}; {@code null} before that. */
    protected Scheduler scheduler;

    /**
     * Creates the management system and its job registry.
     *
     * @param providerId        second part of the system name
     * @param entityName        first part of the system name
     * @param execTimeEstPlugin execution time estimation plugin to store
     * @param queues            task queues managed by this system
     */
    public AbstractManagementSystem(String providerId, String entityName,
            ExecutionTimeEstimationPlugin execTimeEstPlugin, TaskQueueList queues) {
        this.name = entityName + "@" + providerId;
        this.queues = queues;
        this.jobRegistry = new JobRegistryImpl(name);
        this.execTimeEstimationPlugin = execTimeEstPlugin;
    }

    /**
     * Binds this system to its scheduler, creates the resource manager and
     * registers a statistics object with every queue that supports statistics.
     *
     * @param sched            scheduler entity that owns this system
     * @param managedResources resources handed to the resource manager factory
     */
    public void init(Scheduler sched, ManagedResources managedResources) {
        scheduler = sched;
        resourceManager = ResourceManagerFactory.createResourceManager(scheduler, managedResources);
        scheduler.set_stat(WormsConstants.getResourcesStatisticsObject(queues.size()));

        for (int i = 0; i < queues.size(); i++) {
            TaskQueue q = queues.get(i);
            if (q instanceof AbstractStatsSupportingQueue) {
                AbstractStatsSupportingQueue queue = (AbstractStatsSupportingQueue) q;
                // One measure per queue, suffixed with the queue index.
                queue.setStats(scheduler.get_stat(), WormsConstants.TASKS_QUEUE_LENGTH_MEASURE_NAME + "_" + i);
            }
        }
    }

    /**
     * Entry point for simulation events. The base implementation forwards
     * every event to {@link #processOtherEvent(Sim_event)}.
     *
     * @param ev event to process
     */
    public void processEvent(Sim_event ev) {
        processOtherEvent(ev);
    }

    /**
     * Fallback handler for events nobody else handled: it only logs an error.
     *
     * @param ev unhandled event, may be {@code null}
     */
    protected void processOtherEvent(Sim_event ev) {
        if (ev == null) {
            // Reported through the logger, like the unknown-tag case below.
            log.error(name + ".processOtherEvent(): " + "Error - an event is null.");
            return;
        }

        log.error(name + ".processOtherEvent(): Unable to "
                + "handle request from an event with a tag number " + ev.get_tag());
    }

    /**
     * @return the name of this system ({@code entityName@providerId})
     */
    public String getName() {
        return name;
    }

    /**
     * @return the configuration reported by the scheduling plugin
     */
    public PluginConfiguration getSchedulingPluginConfiguration() {
        return schedulingPlugin.getConfiguration();
    }

    /**
     * @return the resource manager, or {@code null} if {@link #init} has not run yet
     */
    public ResourceManager getResourceManager() {
        return resourceManager;
    }

    /**
     * @return the resource manager viewed as a {@link ResourceAllocationInterface},
     *         or {@code null} if it is absent or does not implement that interface
     */
    public ResourceAllocationInterface getAllocationManager() {
        if (resourceManager instanceof ResourceAllocationInterface) {
            return (ResourceAllocationInterface) resourceManager;
        }
        return null;
    }

    /**
     * @return the job registry created in the constructor
     */
    protected JobRegistry getJobRegistry() {
        return jobRegistry;
    }

    /**
     * Tells whether the plugin supports the given event type. The base
     * implementation accepts every event type.
     *
     * @param eventType event type identifier (ignored here)
     * @return always {@code true}
     */
    public boolean pluginSupportsEvent(int eventType) {
        return true;
    }

    public abstract void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack);

    public abstract void notifyCanceledWorkloadUnit(WorkloadUnit wu);

    public abstract void notifyReturnedWorkloadUnit(WorkloadUnit wu);

    protected abstract void executeSchedulingPlan(SchedulingPlanInterface decision);

    public abstract WorkloadUnitHandler getWorkloadUnitHandler();

    // TODO: to be fixed (but what exactly? it seems to be fine now)
    /**
     * Removes the workload unit from the local queues and submits it to the
     * provider named in the allocation. Does nothing when the allocation has
     * no provider name.
     *
     * @param wu         workload unit to submit
     * @param allocation allocation that names the target provider
     */
    protected void submitWorkloadUnit(WorkloadUnit wu, AllocationInterface allocation) {
        String providerName = allocation.getProviderName();
        if (providerName == null) {
            return;
        }
        removeFromQueue(wu);
        scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, wu);
    }

    /**
     * Sends a canceled task through the scheduler's output port.
     *
     * <p>When {@code task} is {@code null}, the executable is looked up in the
     * job registry by {@code executableId}, marked as {@code Gridlet.FAILED}
     * and given this scheduler's id and cost as resource parameters. If that
     * lookup or update fails, the problem is logged and the message is still
     * sent (possibly with a {@code null} task), as before.
     *
     * @param tag          event tag; anything other than {@code GRIDLET_CANCEL} is rejected
     * @param task         canceled task, or {@code null} to look it up in the registry
     * @param executableId id used for the registry lookup when {@code task} is {@code null}
     * @param destId       destination entity id placed in the outgoing {@link IO_data}
     * @return {@code false} if the tag is not {@code GRIDLET_CANCEL}, {@code true} otherwise
     */
    protected boolean sendCanceledWorkloadUnit(int tag, Executable task, int executableId, int destId) {
        if (tag != GridSimTags.GRIDLET_CANCEL) {
            return false;
        }

        long taskSize;
        if (task != null) {
            taskSize = task.getGridletOutputSize();
        } else {
            // No task was passed in: fall back to the registry entry and flag
            // it as FAILED. The resource parameters are set because the user
            // will search/filter based on a resource ID.
            taskSize = 100;
            try {
                task = jobRegistry.getTaskExecutable(executableId);
                if (task == null) {
                    log.warn(name + ".sendCanceledWorkloadUnit(): no executable with id "
                            + executableId + " found in the job registry");
                } else {
                    task.setGridletStatus(Gridlet.FAILED);
                    // Default cost is 1 when no COST resource unit is shared.
                    int cost = resourceManager.getSharedResourceUnits().get(StandardResourceUnitName.COST) != null
                            ? resourceManager.getSharedResourceUnits().get(StandardResourceUnitName.COST).get(0).getAmount()
                            : 1;
                    task.setResourceParameter(scheduler.get_id(), cost);
                }
            } catch (Exception e) {
                // Best effort: the cancel message is still delivered below.
                log.error(name + ".sendCanceledWorkloadUnit(): unable to prepare executable "
                        + executableId + " for cancellation", e);
            }
        }

        scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, tag,
                new IO_data(task, taskSize, destId));
        return true;
    }

    /**
     * Reports a finished executable.
     *
     * <p>If the scheduler has no parent, the owning job is looked up in the
     * registry and returned to its sender once the whole job is finished.
     * Otherwise the executable is sent through the output port, addressed to
     * the parent scheduler.
     *
     * @param wu finished workload unit; must be an {@link Executable}
     * @return {@code false} if the scheduler has no parent and the owning job
     *         is not in the registry, {@code true} otherwise
     */
    protected boolean sendFinishedWorkloadUnit(WorkloadUnit wu) {
        Executable exec = (Executable) wu;

        if (scheduler.getParent() == null) {
            Job job = jobRegistry.get(exec.getJobId());
            if (job == null) {
                log.error(name + ".sendFinishedWorkloadUnit(): job " + exec.getJobId()
                        + " not found in the job registry");
                return false;
            }
            if (job.isFinished()) {
                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
            }
            return true;
        }

        // The data size is deliberately 0 (not the gridlet output size).
        IO_data obj = new IO_data(exec, 0, GridSim.getEntityId(scheduler.getParent().get_name()));
        scheduler.send(scheduler.getOutputPort(), 0.0, GridSimTags.GRIDLET_RETURN, obj);
        return true;
    }

    /**
     * Schedules an internal {@code TASK_READY_FOR_EXECUTION} event for the
     * task. The delay is the number of whole seconds between now and the
     * task's execution start time, or 0 when that time is in the past or not
     * available.
     *
     * @param exec task that becomes ready for execution
     */
    protected void sendExecutableReadyEvent(ExecTask exec) {
        long delay = 0;
        try {
            long expectedStartTime = exec.getExecutionStartTime().getMillis() / 1000;
            long currentTime = DateTimeUtilsExt.currentTimeMillis() / 1000;
            delay = Math.max(0, expectedStartTime - currentTime);
        } catch (NoSuchFieldException e) {
            // No start time set on the task: fire immediately.
            delay = 0;
        }

        scheduler.sendInternal((double) delay, WormsTags.TASK_READY_FOR_EXECUTION, exec);
    }

    /**
     * Schedules an internal {@code TIMER} event if the scheduling plugin's
     * configuration lists {@code SchedulingEventType.TIMER} among its served
     * events. The value stored for that key is used as the delay.
     */
    protected void sendTimerEvent() {
        PluginConfiguration pluginConfig = schedulingPlugin.getConfiguration();
        if (pluginConfig == null) {
            return;
        }

        Map<?, ?> events = pluginConfig.getServedEvents();
        if (events == null) {
            return;
        }

        Object obj = events.get(SchedulingEventType.TIMER);
        if (obj != null) {
            int delay = (Integer) obj;
            scheduler.sendInternal(delay, WormsTags.TIMER, null);
        }
    }

    /**
     * Removes the workload unit from the first queue that contains it.
     *
     * @param wu workload unit to remove
     * @return {@code true} if a queue contained the unit, {@code false} otherwise
     */
    protected boolean removeFromQueue(WorkloadUnit wu) {
        for (TaskQueue queue : queues) {
            if (queue.contains(wu)) {
                queue.remove(wu);
                return true;
            }
        }
        return false;
    }

    /**
     * @return the task queues managed by this system (the internal list, not a copy)
     */
    public TaskQueueList getAccessQueues() {
        return queues;
    }

    /**
     * Builds a snapshot that maps every queue name to its current size.
     *
     * @return a new map from queue name to queue size
     */
    public Map getQueuesSize() {
        Map<Object, Object> queueSizes = new HashMap<Object, Object>();
        for (TaskQueue queue : queues) {
            queueSizes.put(queue.getName(), queue.size());
        }
        return queueSizes;
    }

    /**
     * @return the scheduler assigned in {@link #init}, or {@code null} before that
     */
    public Scheduler getScheduler() {
        return scheduler;
    }
}