Changeset 539 for DCWoRMS/trunk/build/classes/schedframe/scheduling/policy
- Timestamp:
- 10/31/12 13:52:06 (12 years ago)
- Location:
- DCWoRMS/trunk/build/classes/schedframe/scheduling/policy
- Files:
-
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
DCWoRMS/trunk/build/classes/schedframe/scheduling/policy/AbstractManagementSystem.java
r477 r539 8 8 import org.joda.time.DateTimeUtilsExt; 9 9 10 import dcworms.schedframe.scheduling.ExecTask; 11 import dcworms.schedframe.scheduling.Executable; 12 import dcworms.schedframe.scheduling.queues.AbstractStatsSupportingQueue; 13 10 14 import schedframe.PluginConfiguration; 11 15 import schedframe.events.scheduling.SchedulingEventType; 12 import schedframe.resources.units.StandardResourceUnitName;13 16 import schedframe.scheduling.Scheduler; 14 17 import schedframe.scheduling.WorkloadUnitHandler; … … 16 19 import schedframe.scheduling.manager.resources.ResourceManager; 17 20 import schedframe.scheduling.manager.resources.ResourceManagerFactory; 21 import schedframe.scheduling.manager.tasks.JobRegistry; 18 22 import schedframe.scheduling.manager.tasks.JobRegistryImpl; 19 import schedframe.scheduling.manager.tasks.JobRegistry;20 23 import schedframe.scheduling.plan.AllocationInterface; 21 24 import schedframe.scheduling.plan.SchedulingPlanInterface; … … 27 30 import schedframe.scheduling.queue.TaskQueueList; 28 31 import schedframe.scheduling.tasks.Job; 32 import schedframe.scheduling.tasks.TaskInterface; 29 33 import schedframe.scheduling.tasks.WorkloadUnit; 30 import simulator. WormsConstants;34 import simulator.DCWormsConstants; 31 35 import eduni.simjava.Sim_event; 32 36 import gridsim.GridSim; 33 37 import gridsim.GridSimTags; 34 import gridsim.Gridlet;35 38 import gridsim.IO_data; 36 import gridsim.gssim.WormsTags; 37 import gssim.schedframe.scheduling.ExecTask; 38 import gssim.schedframe.scheduling.Executable; 39 import gssim.schedframe.scheduling.queues.AbstractStatsSupportingQueue; 39 import gridsim.dcworms.DCWormsTags; 40 40 41 41 public abstract class AbstractManagementSystem { … … 45 45 protected String name; 46 46 47 protected TaskQueueList queues; 47 48 protected ResourceManager resourceManager; 48 49 protected TaskQueueList queues;50 49 protected JobRegistryImpl jobRegistry; 50 protected ModuleList moduleList; 51 51 52 protected SchedulingPlugin schedulingPlugin; 52 53 53 protected ExecutionTimeEstimationPlugin execTimeEstimationPlugin; 54 54 55 protected ModuleList moduleList; 56 57 protected JobRegistryImpl jobRegistry; 55 protected Scheduler scheduler; 58 56 59 57 … … 67 65 } 68 66 67 public void init(Scheduler sched, ManagedResources managedResources) { 68 scheduler = sched; 69 resourceManager = ResourceManagerFactory.createResourceManager(scheduler, managedResources); 70 scheduler.set_stat(DCWormsConstants.getResourcesStatisticsObject(queues.size())); 71 for(int i = 0; i < queues.size(); i++){ 72 TaskQueue q = queues.get(i); 73 if(q instanceof AbstractStatsSupportingQueue<?>){ 74 AbstractStatsSupportingQueue<?> queue = (AbstractStatsSupportingQueue<?>) q; 75 queue.setStats(scheduler.get_stat(), DCWormsConstants.TASKS_QUEUE_LENGTH_MEASURE_NAME + "_" + Integer.toString(i)); 76 } 77 } 78 } 79 69 80 public void processEvent(Sim_event ev) { 70 81 processOtherEvent(ev); … … 81 92 } 82 93 94 83 95 public String getName() { 84 96 return name; 85 }86 87 public PluginConfiguration getSchedulingPluginConfiguration() {88 return schedulingPlugin.getConfiguration();89 97 } 90 98 … … 106 114 return jobRegistry; 107 115 } 116 117 public Scheduler getScheduler() { 118 return scheduler; 119 } 120 121 public PluginConfiguration getSchedulingPluginConfiguration() { 122 return schedulingPlugin.getConfiguration(); 123 } 108 124 109 125 public boolean pluginSupportsEvent(int eventType){ … … 111 127 } 112 128 113 public abstract void notifySubmittedWorkloadUnit(WorkloadUnit<?> wu, boolean ack); 114 115 public abstract void notifyCanceledWorkloadUnit(WorkloadUnit<?> wu); 116 117 public abstract void notifyReturnedWorkloadUnit(WorkloadUnit<?> wu); 118 119 protected abstract void executeSchedulingPlan(SchedulingPlanInterface decision); 120 121 122 129 public TaskQueueList getQueues(){ 130 return queues; 131 } 132 133 public Map<String, Integer> getQueuesSize() { 134 Map<String, Integer> queue_size = new HashMap<String, Integer>(); 135 for (TaskQueue queue : queues) { 136 queue_size.put(queue.getName(), queue.size()); 137 } 138 return queue_size; 139 } 140 123 141 //POPRAWIC (ale co? bo teraz chyba jest ok) 124 protected void submit WorkloadUnit(WorkloadUnit<?> wu, AllocationInterfaceallocation) {142 protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) { 125 143 String providerName = allocation.getProviderName(); 126 144 if (providerName == null) { 127 145 return; 128 146 } 129 //Executable exec = (Executable) wu; 130 removeFromQueue(wu); 131 scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, wu); 132 } 133 134 protected boolean sendCanceledWorkloadUnit(int tag, Executable task, int executableId, int destId) { 135 136 if (tag != GridSimTags.GRIDLET_CANCEL) { 137 return false; 138 } 139 140 long taskSize = 0; 141 if (task != null) { 142 taskSize = task.getGridletOutputSize(); 143 } 144 145 // if no Gridlet found, then create a new Gridlet but set its status 146 // to FAILED. Then, most importantly, set the resource parameters 147 // because the user will search/filter based on a resource ID. 148 else if (task == null) { 149 try { 150 taskSize = 100; 151 task = jobRegistry.getTaskExecutable(executableId); 152 task.setGridletStatus(Gridlet.FAILED); 153 int cost = resourceManager.getSharedResourceUnits().get(StandardResourceUnitName.COST) != null ? resourceManager 154 .getSharedResourceUnits().get(StandardResourceUnitName.COST).get(0).getAmount() 155 : 1; 156 task.setResourceParameter(scheduler.get_id(), cost); 157 } catch (Exception e) { 158 // empty ... 159 } 160 } 161 scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, tag, new IO_data(task, taskSize, destId)); 162 163 return true; 164 } 165 166 protected boolean sendFinishedWorkloadUnit(WorkloadUnit<?> wu) { 147 removeFromQueue(task); 148 scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, task); 149 } 150 151 protected boolean sendFinishedWorkloadUnit(WorkloadUnit wu) { 167 152 168 153 Executable exec = (Executable) wu; 169 154 if(scheduler.getParent() == null) 170 155 { 171 Job job = jobRegistry.get (exec.getJobId());156 Job job = jobRegistry.getJob(exec.getJobId()); 172 157 173 158 if(job.isFinished()){ 174 159 scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); 175 160 return true; 176 }else return true; 161 } 162 else return true; 177 163 } 178 164 … … 183 169 184 170 protected void sendExecutableReadyEvent(ExecTask exec) { 185 186 /*if (wu instanceof JobInterface) {187 scheduler.sendInternal(Long.valueOf(0).doubleValue(), GssimTags.TASK_READY_FOR_EXECUTION,188 wu);189 return;190 }*/191 171 192 172 long delay = 0; … … 201 181 } 202 182 203 scheduler.sendInternal(Long.valueOf(delay).doubleValue(), WormsTags.TASK_READY_FOR_EXECUTION,183 scheduler.sendInternal(Long.valueOf(delay).doubleValue(), DCWormsTags.TASK_READY_FOR_EXECUTION, 204 184 exec); 205 185 } … … 213 193 if (obj != null) { 214 194 int delay = (Integer) obj; 215 scheduler.sendInternal(delay, WormsTags.TIMER, null);195 scheduler.sendInternal(delay, DCWormsTags.TIMER, null); 216 196 } 217 197 } … … 219 199 } 220 200 221 protected boolean removeFromQueue( WorkloadUnit<?> wu) {201 protected boolean removeFromQueue(TaskInterface<?> task) { 222 202 for(TaskQueue queue : queues){ 223 if(queue.contains( wu)){224 queue.remove( wu);203 if(queue.contains(task)){ 204 queue.remove(task); 225 205 return true; 226 206 } … … 228 208 return false; 229 209 } 230 231 public TaskQueueList getAccessQueues(){232 return queues;233 }234 235 public Map<String, Integer> getQueuesSize() {236 Map<String, Integer> queue_size = new HashMap<String, Integer>();237 for (TaskQueue queue : queues) {238 queue_size.put(queue.getName(), queue.size());239 }240 return queue_size;241 }242 243 public void init(Scheduler sched, ManagedResources managedResources) {244 scheduler = sched;245 resourceManager = ResourceManagerFactory.createResourceManager(scheduler, managedResources);246 scheduler.set_stat(WormsConstants.getResourcesStatisticsObject(queues.size()));247 for(int i = 0; i < queues.size(); i++){248 TaskQueue q = queues.get(i);249 if(q instanceof AbstractStatsSupportingQueue<?>){250 AbstractStatsSupportingQueue<?> queue = (AbstractStatsSupportingQueue<?>) q;251 queue.setStats(scheduler.get_stat(), WormsConstants.TASKS_QUEUE_LENGTH_MEASURE_NAME + "_" + Integer.toString(i));252 }253 }254 }255 256 protected Scheduler scheduler;257 258 public Scheduler getScheduler() {259 return scheduler;260 }261 210 262 211 public abstract WorkloadUnitHandler getWorkloadUnitHandler(); 212 213 public abstract void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack); 214 215 public abstract void notifyReturnedWorkloadUnit(WorkloadUnit wu); 216 217 protected abstract void executeSchedulingPlan(SchedulingPlanInterface<?> decision); 263 218 264 219 } -
DCWoRMS/trunk/build/classes/schedframe/scheduling/policy/global/GlobalManagementSystem.java
r477 r539 11 11 import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus; 12 12 13 import dcworms.schedframe.scheduling.ExecTask; 14 import dcworms.schedframe.scheduling.Executable; 15 13 16 import qcg.shared.constants.BrokerConstants; 14 15 import schedframe.events.scheduling.EventReason;16 import schedframe.events.scheduling.SchedulingEvent;17 17 import schedframe.events.scheduling.TaskArrivedEvent; 18 import schedframe.events.scheduling.TaskCanceledEvent;19 18 import schedframe.events.scheduling.TimerEvent; 20 import schedframe.scheduling. Scheduler;19 import schedframe.scheduling.TaskListImpl; 21 20 import schedframe.scheduling.WorkloadUnitHandler; 22 import schedframe.scheduling.WorkloadUnitListImpl;23 21 import schedframe.scheduling.plan.AllocationInterface; 24 22 import schedframe.scheduling.plan.ScheduledTaskInterface; … … 31 29 import schedframe.scheduling.tasks.Job; 32 30 import schedframe.scheduling.tasks.JobInterface; 33 import schedframe.scheduling.tasks.SubmittedTask;34 31 import schedframe.scheduling.tasks.Task; 35 32 import schedframe.scheduling.tasks.TaskInterface; 36 33 import schedframe.scheduling.tasks.WorkloadUnit; 37 38 34 import eduni.simjava.Sim_event; 39 35 import gridsim.GridSim; 40 36 import gridsim.GridSimTags; 41 import gridsim.Gridlet;42 37 import gridsim.IO_data; 43 import gridsim.gssim.WormsTags; 44 import gssim.schedframe.scheduling.ExecTask; 45 import gssim.schedframe.scheduling.Executable; 38 import gridsim.dcworms.DCWormsTags; 46 39 47 40 public class GlobalManagementSystem extends AbstractManagementSystem { … … 54 47 super(providerId, entityName, execTimeEstimationPlugin, queues); 55 48 56 /*schedulingPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance(57 schedulingPluginClassName,58 GlobalSchedulingPlugin.class);*/59 49 if(schedPlugin == null){ 60 50 throw new Exception("Can not create global scheduling plugin instance"); … … 69 59 switch (tag) { 70 60 71 case WormsTags.TIMER:72 if (pluginSupportsEvent( WormsTags.TIMER)) {61 case DCWormsTags.TIMER: 62 if (pluginSupportsEvent(DCWormsTags.TIMER)) { 73 63 TimerEvent event = new TimerEvent(); 74 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,64 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 75 65 queues, getJobRegistry(), getResourceManager(), moduleList); 76 66 executeSchedulingPlan(decision); … … 81 71 } 82 72 83 public void notifySubmittedWorkloadUnit(WorkloadUnit <?>wu, boolean ack) {73 public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) { 84 74 if (!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)) { 85 75 log.error("Plugin " + schedulingPlugin.getClass() … … 90 80 91 81 registerWorkloadUnit(wu); 92 /*Job job = (Job) wu; 93 jobRegistry.addJob(job); 94 95 if (log.isInfoEnabled()) 96 log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); 97 82 83 } 84 85 private void registerWorkloadUnit(WorkloadUnit wu){ 86 if(!wu.isRegistered()){ 87 wu.register(jobRegistry); 88 } 89 wu.accept(getWorkloadUnitHandler()); 90 } 91 92 93 protected void schedule(JobInterface<?> job){ 98 94 List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 99 95 jobsList.add(job); 100 WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList(); 101 readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList)); 102 schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList); 103 104 schedule(new TaskArrivedEvent());*/ 105 106 } 107 108 private void registerWorkloadUnit(WorkloadUnit<?> wu){ 109 if(!wu.isRegistered()){ 110 wu.register(jobRegistry); 111 } 112 wu.accept(getWorkloadUnitHandler()); 113 } 114 115 116 protected void scheduleReadyTasks(Job job){ 117 List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 118 jobsList.add(job); 119 WorkloadUnitListImpl readyWorkloadUnits = new WorkloadUnitListImpl(); 120 readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList)); 121 schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList); 122 123 schedule(new TaskArrivedEvent()); 124 } 125 126 protected void schedule(SchedulingEvent schedulingEvent) { 127 128 try { 129 SchedulingPlanInterface decision = schedulingPlugin.schedule( 130 schedulingEvent, queues, getJobRegistry(), getResourceManager(), moduleList); 131 if (decision == null) 132 return; 133 96 TaskListImpl readyTasks = new TaskListImpl(); 97 readyTasks.addAll(jobRegistry.getAvailableTasks(jobsList)); 98 99 schedulingPlugin.placeTasksInQueues(readyTasks, queues, getResourceManager(), moduleList); 100 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule( 101 new TaskArrivedEvent(), queues, getJobRegistry(), getResourceManager(), moduleList); 102 if (decision != null) 134 103 executeSchedulingPlan(decision); 135 136 } catch (Exception e) { 137 e.printStackTrace(); 138 } 139 } 140 141 public void notifyReturnedWorkloadUnit(WorkloadUnit<?> wu) { 104 } 105 106 public void notifyReturnedWorkloadUnit(WorkloadUnit wu) { 142 107 Executable exec = (Executable) wu; 143 108 … … 149 114 150 115 try { 151 Job job = jobRegistry.get (exec.getJobId());152 /*Task task = job.getTask(exec.getTaskId());116 Job job = jobRegistry.getJob(exec.getJobId()); 117 Task task = job.getTask(exec.getTaskId()); 153 118 if(exec.getProcessesId() == null){ 154 119 try { 155 120 task.setStatus(exec.getStatus()); 156 121 } catch (Exception e) { 157 // TODO Auto-generated catch block 158 e.printStackTrace(); 122 159 123 } 160 124 } else { … … 167 131 } 168 132 } 169 } */133 } 170 134 171 135 if(job.isFinished()){ … … 173 137 } 174 138 else { 175 scheduleReadyTasks(job); 176 /*List<JobInterface<?>> jobs = new ArrayList<JobInterface<?>>(); 177 jobs.add(jobRegistry.getJobInfo(job.getId())); 178 WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList(); 179 readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobs)); 180 schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, 181 getResourceManager(), moduleList); 182 schedule(new TaskArrivedEvent());*/ 139 schedule(job); 183 140 } 184 141 … … 189 146 } 190 147 191 public void notifyCanceledWorkloadUnit(WorkloadUnit<?> wu){; 192 193 Executable task = (Executable) wu; 194 String jobID = task.getJobId(); 195 String taskID = task.getId(); 196 197 if(log.isDebugEnabled()) 198 log.debug("Received canceled job" + jobID + "_" + taskID); 199 200 TaskInterface<?> ti = jobRegistry.getTaskInfo(jobID, taskID) ; 201 try { 202 203 ti.setStatus((int)BrokerConstants.JOB_STATUS_CANCELED); 204 205 TaskCanceledEvent event = new TaskCanceledEvent(jobID, taskID); 206 event.setReason(EventReason.RESERVATION_EXCEEDED); 207 schedule(event); 208 209 } catch (Exception e) { 210 log.error("Exception during scheduling. " + e.getMessage()); 211 e.printStackTrace(); 212 } 213 } 214 215 protected void executeSchedulingPlan(SchedulingPlanInterface decision) { 216 217 ArrayList<ScheduledTaskInterface> taskSchedulingDecisions = decision.getTasks(); 148 protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) { 149 150 ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks(); 218 151 for (int i = 0; i < taskSchedulingDecisions.size(); i++) { 219 152 220 try { 221 ScheduledTaskInterface taskDecision = taskSchedulingDecisions.get(i); 222 223 //log.info(decision.getDocument()); 224 225 String jobID = taskDecision.getJobId(); 226 String taskID = taskDecision.getTaskId(); 227 228 // Task allocations that were rejected because of lack of resources or which were canceled and 229 // not scheduled again are returned to the user. 230 if(taskDecision.getStatus() == AllocationStatus.REJECTED){ 231 Job job = jobRegistry.get(jobID); 232 scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); 233 continue; 234 } 235 236 ArrayList<AllocationInterface> allocations = taskDecision.getAllocations(); 237 238 Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); 239 for (int j = 0; j < allocations.size(); j++) { 240 241 AllocationInterface allocation = allocations.get(j); 242 Executable exec = jobRegistry.createExecutable(task, allocation); 243 submitWorkloadUnit(exec, allocation); 244 task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 245 } 246 247 }catch (Exception e){ 248 e.printStackTrace(); 249 } 250 } 251 } 252 253 protected void submitWorkloadUnit(WorkloadUnit<?> job, AllocationInterface allocation) { 153 ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i); 154 155 //log.info(decision.getDocument()); 156 157 String jobID = taskDecision.getJobId(); 158 String taskID = taskDecision.getTaskId(); 159 160 // Task allocations that were rejected because of lack of resources or which were canceled and 161 // not scheduled again are returned to the user. 162 if(taskDecision.getStatus() == AllocationStatus.REJECTED){ 163 Job job = jobRegistry.getJob(jobID); 164 scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); 165 continue; 166 } 167 168 Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); 169 170 ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations(); 171 for (int j = 0; j < allocations.size(); j++) { 172 173 AllocationInterface<?> allocation = allocations.get(j); 174 Executable exec = createExecutable(task, allocation); 175 submitTask(exec, allocation); 176 task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 177 } 178 } 179 } 180 181 private Executable createExecutable(Task task, AllocationInterface<?> allocation) { 182 183 String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); 184 if(refersTo == null) 185 refersTo = task.getId(); 186 187 Executable exec = null; 188 189 if(refersTo.equals(task.getId())){ 190 exec = new Executable(task); 191 } else { 192 List<AbstractProcesses> processes = task.getProcesses(); 193 if(processes == null) { 194 try { 195 log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + 196 " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); 197 } catch (Exception e) { 198 e.printStackTrace(); 199 } 200 } 201 boolean found = false; 202 for(int j = 0; j < processes.size() && !found; j++){ 203 AbstractProcesses procesesSet = processes.get(j); 204 if(refersTo.equals(procesesSet.getId())){ 205 exec = new Executable(task, procesesSet); 206 found = true; 207 } 208 } 209 if(!found){ 210 log.error("Allocation refers to unknown proceses set."); 211 } 212 } 213 214 exec.setReservationId(allocation.getReservationId()); 215 216 /*HostInterface<?> host = allocation.getHost(); 217 ComputingResourceTypeInterface<?> crt = host.getMachineParameters(); 218 if(crt != null){ 219 ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0); 220 if(crti != null){ 221 ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty(); 222 for(int p = 0; p < properties.length; p++){ 223 ParameterPropertyInterface<?> property = properties[p]; 224 if("chosenCPUs".equals(property.getName())){ 225 Object cpuNames = property.getValue(); 226 exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); 227 } 228 } 229 } 230 }*/ 231 return exec; 232 } 233 234 protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) { 254 235 255 236 String providerName = allocation.getProviderName(); … … 257 238 return; 258 239 } 259 TaskInterface<?> task = (TaskInterface<?>) job;260 240 removeFromQueue(task); 261 241 262 242 int resID = GridSim.getEntityId(providerName); 263 IO_data data = new IO_data( job, 0, resID);243 IO_data data = new IO_data(task, 0, resID); 264 244 scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); 265 266 //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job);245 //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job); 246 267 247 if(log.isDebugEnabled()) 268 try { 269 log.debug("Submitted job " + job.getId() + " to " + providerName); 270 } catch (NoSuchFieldException e) { 271 // TODO Auto-generated catch block 272 e.printStackTrace(); 273 } 248 log.debug("Submitted job " + task.getId() + " to " + providerName); 249 274 250 } 275 251 276 252 class GlobalWorkloadUnitHandler implements WorkloadUnitHandler{ 277 253 278 public void handleJob(Job job){ 279 280 jobRegistry.addJob(job); 254 public void handleJob(JobInterface<?> job){ 281 255 if (log.isInfoEnabled()) 282 256 log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); 283 257 284 scheduleReadyTasks(job); 258 jobRegistry.addJob(job); 259 schedule(job); 285 260 } 286 261 … … 292 267 throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler."); 293 268 } 294 295 public void handleSubmittedTask(SubmittedTask task) {296 throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");297 }298 269 } 299 270 … … 301 272 return new GlobalWorkloadUnitHandler(); 302 273 } 303 304 274 305 275 -
DCWoRMS/trunk/build/classes/schedframe/scheduling/policy/global/GridBroker.java
r477 r539 19 19 public class GridBroker extends GlobalManagementSystem { 20 20 21 22 21 private static Log log = LogFactory.getLog(GridBroker.class); 23 22 … … 27 26 public GridBroker(String name, SchedulingPlugin schedulingPlugin, ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues) throws Exception { 28 27 super(name, "BROKER", schedulingPlugin, execTimeEstimationPlugin, queues); 29 30 //make use of plug-in interface 31 32 //Properties prop = new Properties(); 33 //prop.put("plugin.name", name); 34 //prop.put("plugin.utils.timeoperations", "gssim.scheduling.plugin.local.GssimTimeOperations"); 35 //schedulingPlugin.init(prop); 28 36 29 otherGridSchedulersIds = new HashSet<Integer>(); 37 38 30 moduleList = new ModuleListImpl(2); 39 //this.moduleList.add(new GridResourceDiscovery(this.getScheduler()));40 //moduleList.add(new GridReservationManagerNew(this));41 42 if(log.isDebugEnabled())43 log.debug(name + ": Creating a broker interface object");44 31 } 45 32 46 33 public void init(Scheduler scheduler, ManagedResources managedResources) { 47 34 super.init(scheduler, managedResources); 48 //this.scheduler = scheduler;49 //this.resourceManager = ResourceManagerFactory.createResourceManager(scheduler, managedResources);50 35 this.moduleList.add((GridResourceDiscovery)resourceManager); 51 36 } … … 57 42 } 58 43 return providerIds; 59 //return GridSim.getGridResourceList();60 44 } 61 45 -
DCWoRMS/trunk/build/classes/schedframe/scheduling/policy/local/LocalManagementSystem.java
r477 r539 1 1 package schedframe.scheduling.policy.local; 2 2 3 import dcworms.schedframe.scheduling.ExecTask; 4 import dcworms.schedframe.scheduling.Executable; 3 5 import eduni.simjava.Sim_event; 4 6 import eduni.simjava.Sim_system; 5 7 import gridsim.Accumulator; 6 8 import gridsim.GridSimTags; 7 import gridsim.Gridlet;8 9 import gridsim.ResourceCalendar; 9 import gridsim.gssim.WormsTags; 10 import gridsim.gssim.filter.SubTaskFilter; 11 import gssim.schedframe.scheduling.ExecTask; 12 import gssim.schedframe.scheduling.Executable; 10 import gridsim.dcworms.DCWormsTags; 11 import gridsim.dcworms.filter.ExecTaskFilter; 13 12 14 13 import java.util.ArrayList; … … 27 26 import qcg.shared.constants.BrokerConstants; 28 27 import schedframe.ResourceController; 29 import schedframe.events.scheduling.EventReason;30 28 import schedframe.events.scheduling.SchedulingEvent; 31 29 import schedframe.events.scheduling.SchedulingEventType; 32 30 import schedframe.events.scheduling.StartTaskExecutionEvent; 33 import schedframe.events.scheduling.TaskCanceledEvent;34 31 import schedframe.events.scheduling.TaskFinishedEvent; 35 32 import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent; … … 45 42 import schedframe.scheduling.ResourceHistoryItem; 46 43 import schedframe.scheduling.Scheduler; 47 import schedframe.scheduling.UsedResourceList; 44 import schedframe.scheduling.TaskList; 45 import schedframe.scheduling.TaskListImpl; 46 import schedframe.scheduling.UsedResourcesList; 48 47 import schedframe.scheduling.WorkloadUnitHandler; 49 import schedframe.scheduling.WorkloadUnitListImpl;50 48 import schedframe.scheduling.manager.resources.LocalResourceManager; 51 49 import schedframe.scheduling.manager.resources.ManagedResources; … … 63 61 import schedframe.scheduling.tasks.Job; 64 62 import schedframe.scheduling.tasks.JobInterface; 65 import schedframe.scheduling.tasks.SubmittedTask;66 63 import schedframe.scheduling.tasks.Task; 67 64 import schedframe.scheduling.tasks.TaskInterface; 68 65 import schedframe.scheduling.tasks.WorkloadUnit; 69 import simulator. WormsConstants;66 import simulator.DCWormsConstants; 70 67 import simulator.utils.DoubleMath; 71 68 … … 83 80 84 81 super(providerId, entityName, execTimeEstimationPlugin, queues); 85 86 //schedulingPlugin = (LocalSchedulingPlugin) InstanceFactory.createInstance(schedulingPluginClassName, LocalSchedulingPlugin.class);87 82 88 83 if (schedPlugin == null) { … … 90 85 } 91 86 this.schedulingPlugin = schedPlugin; 92 accTotalLoad = new Accumulator();93 moduleList = new ModuleListImpl(1);94 87 this.moduleList = new ModuleListImpl(1); 88 89 this.accTotalLoad = new Accumulator(); 95 90 } 96 91 97 92 public void init(Scheduler sched, ManagedResources managedResources) { 98 93 super.init(sched, managedResources); 99 //scheduler = sched;100 //resourceManager = ResourceManagerFactory.createResourceManager(scheduler);101 94 double load = 0; 102 95 accTotalLoad.add(load); … … 112 105 switch (tag) { 113 106 114 case WormsTags.TIMER:107 case DCWormsTags.TIMER: 115 108 if (pluginSupportsEvent(tag)) { 116 109 SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER); 117 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,110 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 118 111 queues, getJobRegistry(), getResourceManager(), moduleList); 119 112 executeSchedulingPlan(decision); 120 113 } 121 114 sendTimerEvent(); 122 123 115 break; 124 116 125 case WormsTags.TASK_READY_FOR_EXECUTION: 126 127 ExecTask data = (ExecTask) ev.get_data(); 128 try { 129 data.setStatus(Gridlet.READY); 117 case DCWormsTags.TASK_READY_FOR_EXECUTION: 118 ExecTask execTask = (ExecTask) ev.get_data(); 119 try { 120 execTask.setStatus(DCWormsTags.READY); 130 121 if (pluginSupportsEvent(tag)) { 131 SchedulingEvent event = new StartTaskExecutionEvent( data.getJobId(), data.getId());132 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,122 SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId()); 123 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 133 124 queues, getJobRegistry(), getResourceManager(), moduleList); 134 125 executeSchedulingPlan(decision); … … 139 130 break; 140 131 141 case WormsTags.TASK_EXECUTION_FINISHED:132 case DCWormsTags.TASK_EXECUTION_FINISHED: 142 133 obj = ev.get_data(); 143 ExecTask task = (ExecTask) obj; 144 if (task.getStatus() == Gridlet.INEXEC) { 145 finalizeExecutable(task); 146 SubmittedTask subTask = (SubmittedTask)task; 147 sendFinishedWorkloadUnit((ExecTask)subTask.getGridlet()); 148 //task.setGridletStatus(Gridlet.SUCCESS); 149 //task.finalizeGridlet(); 150 log.debug(task.getJobId() + "_" + task.getId() + " finished execution on " + new DateTime()); 151 log.info(WormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 152 /*UsedResourceList<ResourceHistoryItem> lastUsedList = task.getUsedResources(); 153 Map<ResourceUnitName, AbstractResourceUnit> lastUsed = lastUsedList.getLast() 154 .getResourceUnits(); 155 getAllocationManager().freeResources(lastUsed); 156 ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PROCESSINGELEMENTS); 157 for (ComputingResource resource : pes) { 158 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, task)); 134 execTask = (ExecTask) obj; 135 if (execTask.getStatus() == DCWormsTags.INEXEC) { 136 137 finalizeExecutable(execTask); 138 sendFinishedWorkloadUnit(execTask); 139 log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime()); 140 log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 141 if (pluginSupportsEvent(tag)) { 142 SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId()); 143 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 144 queues, getJobRegistry(), getResourceManager(), moduleList); 145 executeSchedulingPlan(decision); 159 146 } 160 SubTaskFilter filter = new SubTaskFilter(task.getGridletID(), GssimTags.TASK_REQUESTED_TIME_EXPIRED); 161 scheduler.sim_cancel(filter, null); 162 super.sendFinishJob((Executable) task.getGridlet());*/ 163 } 147 } 148 149 Job job = jobRegistry.getJob(execTask.getJobId()); 150 if(!job.isFinished()){ 151 getWorkloadUnitHandler().handleJob(job); 152 } 153 break; 154 155 case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED: 156 obj = ev.get_data(); 157 execTask = (Executable) obj; 164 158 if (pluginSupportsEvent(tag)) { 165 SchedulingEvent event = new Task FinishedEvent(task.getJobId(), task.getId());166 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,159 SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId()); 160 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 167 161 queues, getJobRegistry(), getResourceManager(), moduleList); 168 162 executeSchedulingPlan(decision); 169 163 } 170 Job job = jobRegistry.get(task.getJobId());171 if(!job.isFinished()){172 getWorkloadUnitHandler().handleJob(job);173 }174 175 164 break; 176 case WormsTags.TASK_REQUESTED_TIME_EXPIRED: 177 obj = ev.get_data(); 178 task = (SubmittedTask) obj; 179 if (pluginSupportsEvent(tag)) { 180 SchedulingEvent event = new TaskRequestedTimeExpiredEvent(task.getJobId(), task.getId()); 181 SchedulingPlanInterface decision = schedulingPlugin.schedule(event, 182 queues, getJobRegistry(), getResourceManager(), moduleList); 183 executeSchedulingPlan(decision); 184 } 185 186 break; 187 case WormsTags.UPDATE: 165 166 case DCWormsTags.UPDATE: 188 167 updateProcessingTimes(ev); 189 168 break; 190 169 } 191 170 } 192 193 194 public void notifyReturnedWorkloadUnit(WorkloadUnit<?> wu) { 195 if (pluginSupportsEvent(WormsTags.TASK_EXECUTION_FINISHED)) { 171 172 public void notifyReturnedWorkloadUnit(WorkloadUnit wu) { 173 if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) { 196 174 SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED); 197 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,175 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 198 176 queues, getJobRegistry(), getResourceManager(), moduleList); 199 177 executeSchedulingPlan(decision); … … 203 181 //} 204 182 } 205 206 public void notifyCanceledWorkloadUnit(WorkloadUnit<?> job) { 207 208 if (!pluginSupportsEvent(GridSimTags.GRIDLET_CANCEL)) 209 return; 210 211 Executable executable = (Executable) job; 212 String jobID = executable.getJobId(); 213 214 SchedulingPlanInterface decision = null; 215 216 try { 217 executable.setStatus((int) BrokerConstants.JOB_STATUS_CANCELED); 218 219 TaskCanceledEvent event = new TaskCanceledEvent(executable.getJobId(), executable.getTaskId()); 220 event.setReason(EventReason.RESERVATION_EXCEEDED); 221 decision = schedulingPlugin 222 .schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList); 223 224 if (decision == null) 225 return; 226 227 executeSchedulingPlan(decision); 228 229 } catch (Exception e) { 230 log.error("Exception during scheduling. " + e.getMessage()); 231 e.printStackTrace(); 232 } 233 } 234 235 protected void executeSchedulingPlan(SchedulingPlanInterface decision) { 236 237 ArrayList<ScheduledTaskInterface> taskSchedulingDecisions = decision.getTasks(); 183 184 protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) { 185 186 ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks(); 238 187 for (int i = 0; i < taskSchedulingDecisions.size(); i++) { 239 try { 240 ScheduledTaskInterface taskDecision = taskSchedulingDecisions.get(i); 241 242 // not scheduled again are returned to the user. 243 if (taskDecision.getStatus() == AllocationStatus.REJECTED) { 244 continue; 188 ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i); 189 190 if (taskDecision.getStatus() == AllocationStatus.REJECTED) { 191 continue; 192 } 193 194 ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations(); 195 196 TaskInterface<?> task = taskDecision.getTask(); 197 for (int j = 0; j < allocations.size(); j++) { 198 199 AllocationInterface<?> allocation = allocations.get(j); 200 if (allocation.isProcessing()) { 201 ExecTask exec = (ExecTask) task; 202 executeTask(exec, allocation.getRequestedResources()); 203 } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){ 204 allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName())); 205 submitTask(task, allocation); 206 } else { 207 ExecTask exec = (ExecTask) task; 208 executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec)); 245 209 } 246 247 ArrayList<AllocationInterface> allocations = taskDecision.getAllocations(); 248 249 WorkloadUnit<?> task = taskDecision.getTask(); 250 for (int j = 0; j < allocations.size(); j++) { 251 252 AllocationInterface allocation = allocations.get(j); 253 if (allocation.isProcessing()) { 254 255 ExecTask exec = (ExecTask) task; 256 257 258 //Executable e = (Executable)task; 259 /*SubmittedTask submittedTask = jobRegistry.getSubmittedTask(e.getJobId(), e.getId()); 260 if(submittedTask == null) 261 { submittedTask = new SubmittedTask(e); 262 jobRegistry.addTask(submittedTask); 263 }*/ 264 265 /*e.visitResource(scheduler.get_name()); 266 Scheduler parentScheduler = scheduler.getParent(); 267 while (parentScheduler != null && !e.getVisitedResources().contains(parentScheduler.get_name())) { 268 e.visitResource(parentScheduler.get_name()); 269 parentScheduler = parentScheduler.getParent(); 270 }*/ 271 272 273 executeTask(exec, allocation.getRequestedResources()); 274 //} else if(GridSim.getEntityId(allocation.getProviderName()) != -1 || scheduler.getScheduler(allocation.getProviderName())!=null){ 275 } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){ 276 allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName())); 277 submitWorkloadUnit(task, allocation); 278 } else { 279 280 ExecTask exec = (ExecTask) task; 281 282 //Executable exec = jobRegistry.createExecutable(t, allocation); 283 //exec.setResourceParameter(scheduler.get_id(), 1); 284 /*e.visitResource(scheduler.get_name()); 285 Scheduler parentScheduler = scheduler.getParent(); 286 while (parentScheduler != null && !e.getVisitedResources().contains(parentScheduler.get_name())) { 287 e.visitResource(parentScheduler.get_name()); 288 parentScheduler = parentScheduler.getParent(); 289 }*/ 290 executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), (ExecTask)task)); 291 } 292 } 293 294 } catch (Exception e) { 295 e.printStackTrace(); 296 } 210 } 211 297 212 } 298 213 } 299 214 300 215 protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) { 301 // Executable exec = (Executable) task; 302 303 SubmittedTask submittedTask = (SubmittedTask)task; 216 217 Executable exec = (Executable)task; 304 218 boolean allocationStatus = getAllocationManager().allocateResources(choosenResources); 305 219 if(allocationStatus == false) 306 220 return; 307 221 removeFromQueue(task); 308 //SubmittedTask submittedTask = (SubmittedTask)task; 309 /* submittedTask = jobRegistry.getSubmittedTask(exec.getJobId(), exec.getId()); 310 if(submittedTask == null) 311 { submittedTask = new SubmittedTask(exec); 312 jobRegistry.addTask(submittedTask); 313 }*/ 314 double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength(); 222 315 223 SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION); 316 224 int time = Double.valueOf( 317 execTimeEstimationPlugin.execTimeEstimation(event, choosenResources, task, completionPercentage)).intValue();225 execTimeEstimationPlugin.execTimeEstimation(event, task, choosenResources, exec.getCompletionPercentage())).intValue(); 318 226 log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime() 319 227 + " will finish after " + time); … … 322 230 return; 323 231 324 submittedTask.setEstimatedDuration(time);232 exec.setEstimatedDuration(time); 325 233 DateTime currentTime = new DateTime(); 326 234 ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); 327 submittedTask.addUsedResources(resHistItem); 328 submittedTask.setFinishTime(currentTime.getMillis() / 1000); 329 330 jobRegistry.saveHistory(submittedTask, time, choosenResources); 331 332 scheduler.sendInternal(time, WormsTags.TASK_EXECUTION_FINISHED, 333 submittedTask); 334 235 exec.addUsedResources(resHistItem); 335 236 try { 336 long expectedDuration = submittedTask.getExpectedDuration().getMillis() / 1000; 337 scheduler.sendInternal(expectedDuration, WormsTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask); 237 exec.setStatus(DCWormsTags.INEXEC); 238 } catch (Exception e) { 239 // TODO Auto-generated catch block 240 e.printStackTrace(); 241 } 242 scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, exec); 243 244 try { 245 long expectedDuration = exec.getExpectedDuration().getMillis() / 1000; 246 scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec); 338 247 } catch (NoSuchFieldException e) { 339 double t = submittedTask.getEstimatedDuration(); 340 scheduler.sendInternal(t, WormsTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask); 341 } 342 343 submittedTask.setGridletStatus(Gridlet.INEXEC); 344 log.info(WormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 248 double t = exec.getEstimatedDuration(); 249 scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec); 250 } 251 252 log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 345 253 346 254 PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE); 347 if(peUnit instanceof ProcessingElements){ 255 256 notifyComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec); 257 258 /*if(peUnit instanceof ProcessingElements){ 348 259 ProcessingElements pes = (ProcessingElements) peUnit; 349 260 for (ComputingResource resource : pes) { 350 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_ FINISHED, submittedTask));261 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec)); 351 262 } 352 263 } else { … … 355 266 resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 356 267 } catch (ResourceException e) { 357 358 } 359 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, submittedTask)); 360 } 361 /*ProcessingElements pes = (ProcessingElements) choosenResources.get(StandardResourceUnitName.PE); 362 for (ComputingResource resource : pes) { 363 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, submittedTask)); 364 }*/ 268 return; 269 } 270 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec)); 271 } 272 */ 365 273 366 274 /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){ … … 371 279 } 372 280 373 public void finalizeExecutable(ExecTask exec){ 374 375 SubmittedTask subTask = (SubmittedTask)exec; 376 subTask.setGridletStatus(Gridlet.SUCCESS); 377 subTask.finalizeGridlet(); 378 UsedResourceList<ResourceHistoryItem> lastUsedList = subTask.getUsedResources(); 379 Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast() 380 .getResourceUnits(); 381 getAllocationManager().freeResources(lastUsed); 382 383 PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE); 384 if(peUnit instanceof ProcessingElements){ 385 ProcessingElements pes = (ProcessingElements) peUnit; 386 for (ComputingResource resource : pes) { 387 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask)); 388 } 389 } else { 390 ComputingResource resource = null; 391 try { 392 resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 393 } catch (ResourceException e) { 394 395 } 396 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask)); 397 } 398 /*ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PE); 399 for (ComputingResource resource : pes) { 400 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask)); 401 }*/ 402 SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(), WormsTags.TASK_REQUESTED_TIME_EXPIRED); 281 public void finalizeExecutable(ExecTask execTask){ 282 283 Executable exec = (Executable)execTask; 284 exec.finalizeExecutable(); 285 286 ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED); 403 287 scheduler.sim_cancel(filter, null); 404 288 405 Executable executable = (Executable) subTask.getGridlet(); 406 Job job = jobRegistry.get(executable.getJobId()); 407 408 Task task = null; 289 Task task; 290 Job job = jobRegistry.getJob(exec.getJobId()); 409 291 try { 410 task = job.getTask(exec utable.getTaskId());292 task = job.getTask(exec.getTaskId()); 411 293 } catch (NoSuchFieldException e) { 412 e.printStackTrace();413 } 414 if(exec utable.getProcessesId() == null){415 try { 416 task.setStatus(exec utable.getStatus());294 return; 295 } 296 if(exec.getProcessesId() == null){ 297 try { 298 task.setStatus(exec.getStatus()); 417 299 } catch (Exception e) { 418 300 e.printStackTrace(); … … 422 304 for(int i = 0; i < processesList.size(); i++){ 423 305 AbstractProcesses processes = processesList.get(i); 424 if(processes.getId().equals(exec utable.getProcessesId())){425 processes.setStatus(exec utable.getStatus());306 if(processes.getId().equals(exec.getProcessesId())){ 307 processes.setStatus(exec.getStatus()); 426 308 break; 427 309 } 428 310 } 429 311 } 312 313 UsedResourcesList lastUsedList = exec.getUsedResources(); 314 Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast() 315 .getResourceUnits(); 316 getAllocationManager().freeResources(lastUsed); 317 318 PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE); 319 notifyComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec); 320 /*if(peUnit instanceof ProcessingElements){ 321 ProcessingElements pes = (ProcessingElements) peUnit; 322 for (ComputingResource resource : pes) { 323 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 324 } 325 } else { 326 ComputingResource resource = null; 327 try { 328 resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 329 } catch (ResourceException e) { 330 return; 331 } 332 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 333 }*/ 334 430 335 //sendFinishedWorkloadUnit(executable); 431 336 } … … 441 346 while (iter.hasNext()) { 442 347 ExecTask task = iter.next(); 443 SubmittedTask subTask = (SubmittedTask)task; 444 UsedResourceList<ResourceHistoryItem> usedResourcesList = subTask.getUsedResources(); 445 ResourceUnit unit = usedResourcesList.getLast().getResourceUnits() 348 Executable exec = (Executable)task; 349 exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * timeSpan/exec.getEstimatedDuration()); 350 351 UsedResourcesList usedResourcesList = exec.getUsedResources(); 352 PEUnit peUnit = (PEUnit)usedResourcesList.getLast().getResourceUnits() 446 353 .get(StandardResourceUnitName.PE); 447 448 double load = getMIShare(timeSpan, (PEUnit) unit); 449 subTask.updateGridletFinishedSoFar(load); 354 double load = getMIShare(timeSpan, peUnit); 450 355 addTotalLoad(load); 451 356 } 452 357 } 453 358 private void notifyComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){ 359 360 if(peUnit instanceof ProcessingElements){ 361 ProcessingElements pes = (ProcessingElements) peUnit; 362 for (ComputingResource resource : pes) { 363 resource.handleEvent(new EnergyEvent(eventType, obj)); 364 } 365 } else { 366 ComputingResource resource = null; 367 try { 368 resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 369 } catch (ResourceException e) { 370 return; 371 } 372 resource.handleEvent(new EnergyEvent(eventType, obj)); 373 } 374 } 375 454 376 private double getMIShare(double timeSpan, PEUnit pes) { 455 377 double localLoad; … … 470 392 protected void updateProcessingTimes(Sim_event ev) { 471 393 updateProcessingProgress(); 472 for (ExecTask task : jobRegistry.getRunningTasks()) {473 SubmittedTask subTask = (SubmittedTask)task;474 List<String> visitedResource = subTask.getVisitedResources();394 for (ExecTask execTask : jobRegistry.getRunningTasks()) { 395 Executable exec = (Executable)execTask; 396 List<String> visitedResource = exec.getVisitedResources(); 475 397 String originResource = ev.get_data().toString(); 476 398 if(!ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), originResource)){ … … 478 400 } 479 401 480 Map<ResourceUnitName, ResourceUnit> choosenResources = subTask.getUsedResources().getLast().getResourceUnits();481 double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength();482 double time = execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),483 choosenResources, task, completionPercentage); 484 485 /*if(!subTask.getVisitedResources().contains(ev.get_data().toString())){402 Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits(); 403 int time = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED), 404 execTask, choosenResources, exec.getCompletionPercentage())).intValue(); 405 406 //check if the new estimated end time is equal to the previous one; if yes the continue without update 407 if( DoubleMath.subtract((exec.getExecStartTime() + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){ 486 408 continue; 487 }*/ 488 //check if the new estimated end time is equal to the previous one; if yes the continue without update 489 if( DoubleMath.subtract((subTask.getExecStartTime() + subTask.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){ 490 continue; 491 } 492 SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(), WormsTags.TASK_EXECUTION_FINISHED); 409 } 410 exec.setEstimatedDuration(time); 411 ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED); 493 412 scheduler.sim_cancel(filter, null); 494 scheduler.sendInternal(time, WormsTags.TASK_EXECUTION_FINISHED, task); 495 413 scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, execTask); 496 414 } 497 415 } … … 511 429 numberOfPE = numberOfPE + resUnit.getAmount(); 512 430 } 513 //numberOfPE = getResourceManager().getPE().size();514 431 } catch (Exception e) { 515 432 numberOfPE = 1; … … 532 449 ExecTask task) { 533 450 534 ResourceManager resourceManager = this.resourceManager; 451 Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>(); 452 LocalResourceManager resourceManager = getResourceManager(); 535 453 if(resourceName != null){ 536 454 ComputingResource resource = null; … … 540 458 return null; 541 459 } 542 543 460 resourceManager = new LocalResourceManager(resource); 544 461 } 545 Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();546 547 462 548 463 int cpuRequest; … … 553 468 } 554 469 555 //PEUnit processingUnits = null;556 470 if (cpuRequest != 0) { 557 471 558 472 List<ResourceUnit> availableUnits = null; 559 473 try { 560 availableUnits = getResourceManager().getPE();474 availableUnits = resourceManager.getPE(); 561 475 } catch (ResourceException e) { 562 476 return null; 563 477 } 478 564 479 List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>(); 565 566 480 for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) { 567 481 PEUnit peUnit = (PEUnit) availableUnits .get(i); … … 576 490 return null; 577 491 } 578 579 /*try {580 List<? extends ComputingResource> processingElements = resourceManager.getResourcesOfType(StandardResourceType.Processor);581 List<ComputingResource> choosenResources = new ArrayList<ComputingResource>();582 int peSize = processingElements.size();583 for (int i = 0; i < peSize && cpuRequest > 0; i++) {584 if (processingElements.get(i).getStatus() == ResourceStatus.FREE) {585 choosenResources.add(processingElements.get(i));586 cpuRequest--;587 }588 }589 if (cpuRequest > 0)590 {591 return null;592 }593 processingUnits = new ProcessingElements(choosenResources);594 } catch (Exception e) {595 596 List<ResourceUnit> procResUnit = resourceManager.getDistributedResourceUnits(StandardResourceUnitName.PE);597 598 for(ResourceUnit resUnit: procResUnit){599 if (resUnit.getFreeAmount() >= cpuRequest)600 {601 processingUnits = new PEUnit(resUnit.getResourceId(), cpuRequest, cpuRequest);602 break;603 }604 }605 }*/606 492 map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0)); 607 493 } 608 /*int memoryRequest; 609 try { 610 memoryRequest = Double.valueOf(task.getMemoryRequest()).intValue(); 611 } catch (NoSuchFieldException e) { 612 memoryRequest = 0; 613 } 614 if (memoryRequest != 0) { 615 List<ResourceUnit> resUnit = resourceManager.getSharedResourceUnits().get(StandardResourceUnitName.MEMORY); 616 617 Memory memory = null; 618 for (ResourceUnit memUnit : resUnit) { 619 Memory m = (Memory) memUnit; 620 621 if (m.getFreeAmount() >= memoryRequest) { 622 System.out.println(m.getResourceId()+ ";"+m.getAmount()+";"+m.getFreeAmount()); 623 memory = new Memory(m, memoryRequest, memoryRequest); 624 } 625 } 626 if(memory == null) 627 return null; 628 map.put(StandardResourceUnitName.MEMORY, memory); 629 }*/ 494 630 495 return map; 631 496 } 632 633 634 635 public void notifySubmittedWorkloadUnit(WorkloadUnit<?> job, boolean ack) { 497 498 public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) { 636 499 updateProcessingProgress(); 637 registerWorkloadUnit( job);638 } 639 640 private void registerWorkloadUnit(WorkloadUnit <?>wu){500 registerWorkloadUnit(wu); 501 } 502 503 private void registerWorkloadUnit(WorkloadUnit wu){ 641 504 if(!wu.isRegistered()){ 642 505 wu.register(jobRegistry); … … 647 510 class LocalWorkloadUnitHandler implements WorkloadUnitHandler{ 648 511 649 public void handleJob(Job job){512 public void handleJob(JobInterface<?> job){ 650 513 651 514 if (log.isInfoEnabled()) … … 654 517 List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 655 518 jobsList.add(job); 656 WorkloadUnitListImpl readyTasks = new WorkloadUnitListImpl();657 for(Task task: jobRegistry.get ReadyTasks(jobsList)){519 TaskListImpl availableTasks = new TaskListImpl(); 520 for(Task task: jobRegistry.getAvailableTasks(jobsList)){ 658 521 task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 659 readyTasks.add(task); 660 } 661 662 for(WorkloadUnit<?> e:readyTasks){ 663 registerWorkloadUnit(e); 664 } 665 } 666 667 public void handleTask(TaskInterface<?> ti){ 668 Task task = (Task)ti; 669 670 if(task.getProcesses() == null){ 522 availableTasks.add(task); 523 } 524 525 for(TaskInterface<?> task: availableTasks){ 526 registerWorkloadUnit(task); 527 } 528 } 529 530 public void handleTask(TaskInterface<?> t){ 531 Task task = (Task)t; 532 List<AbstractProcesses> processes = task.getProcesses(); 533 534 if(processes == null || processes.size() == 0){ 671 535 Executable exec = new Executable(task); 672 exec.setUserID(task.getSenderId());673 exec.setLength(task.getLength());674 536 registerWorkloadUnit(exec); 675 537 } else { 676 List<AbstractProcesses> processesList = task.getProcesses(); 677 for(int i = 0; i < processesList.size(); i++){ 678 AbstractProcesses processes = processesList.get(i); 679 Executable exec = new Executable(task, processes); 680 exec.setUserID(task.getSenderId()); 681 exec.setLength(task.getLength()); 538 for(int j = 0; j < processes.size(); j++){ 539 AbstractProcesses procesesSet = processes.get(j); 540 Executable exec = new Executable(task, procesesSet); 682 541 registerWorkloadUnit(exec); 683 542 } … … 686 545 687 546 public void handleExecutable(ExecTask task){ 547 688 548 Executable exec = (Executable) task; 689 690 // int cost = 691 // this.resourceManager.getResourceCharacteristic().getResUnits() != 692 // null ? 693 // this.resourceManager.getResourceCharacteristic().getResUnits().get(ResourceParameterName.COST).getAmount() 694 // : 1; 695 696 exec.visitResource(scheduler.get_name()); 549 jobRegistry.addExecTask(exec); 550 551 exec.trackResource(scheduler.get_name()); 697 552 Scheduler parentScheduler = scheduler.getParent(); 698 while (parentScheduler != null && !exec.getVisitedResources().contains(parentScheduler.get_name())) { 699 exec.visitResource(parentScheduler.get_name()); 553 List<String> visitedResource = exec.getVisitedResources(); 554 String [] visitedResourcesArray = visitedResource.toArray(new String[visitedResource.size()]); 555 while (parentScheduler != null && !ArrayUtils.contains(visitedResourcesArray, parentScheduler.get_name())) { 556 exec.trackResource(parentScheduler.get_name()); 700 557 parentScheduler = parentScheduler.getParent(); 701 558 } 702 703 exec.setResourceParameter(scheduler.get_id(), 1); 704 SubmittedTask subTask = new SubmittedTask(exec); 705 jobRegistry.addTask(subTask); 706 WorkloadUnitListImpl newTasks = new WorkloadUnitListImpl(); 707 newTasks.add(subTask); 708 709 schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList); 710 711 if (subTask.getStatus() == Gridlet.QUEUED) { 559 exec.setSchedulerName(scheduler.get_id()); 560 561 TaskList newTasks = new TaskListImpl(); 562 newTasks.add(exec); 563 564 schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList); 565 566 if (exec.getStatus() == DCWormsTags.QUEUED) { 712 567 sendExecutableReadyEvent(exec); 713 }714 }715 716 public void handleSubmittedTask(SubmittedTask task){717 718 task.visitResource(scheduler.get_name());719 Scheduler parentScheduler = scheduler.getParent();720 while (parentScheduler != null && !task.getVisitedResources().contains(parentScheduler.get_name())) {721 task.visitResource(parentScheduler.get_name());722 parentScheduler = parentScheduler.getParent();723 }724 725 jobRegistry.addTask(task);726 WorkloadUnitListImpl newTasks = new WorkloadUnitListImpl();727 newTasks.add(task);728 729 schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList);730 731 if (task.getStatus() == Gridlet.QUEUED) {732 sendExecutableReadyEvent(task);733 568 } 734 569 }
Note: See TracChangeset
for help on using the changeset viewer.