- Timestamp:
- 10/31/12 13:52:06 (12 years ago)
- Location:
- DCWoRMS/trunk/build/classes/schedframe/scheduling/policy/global
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
DCWoRMS/trunk/build/classes/schedframe/scheduling/policy/global/GlobalManagementSystem.java
r477 r539 11 11 import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus; 12 12 13 import dcworms.schedframe.scheduling.ExecTask; 14 import dcworms.schedframe.scheduling.Executable; 15 13 16 import qcg.shared.constants.BrokerConstants; 14 15 import schedframe.events.scheduling.EventReason;16 import schedframe.events.scheduling.SchedulingEvent;17 17 import schedframe.events.scheduling.TaskArrivedEvent; 18 import schedframe.events.scheduling.TaskCanceledEvent;19 18 import schedframe.events.scheduling.TimerEvent; 20 import schedframe.scheduling. Scheduler;19 import schedframe.scheduling.TaskListImpl; 21 20 import schedframe.scheduling.WorkloadUnitHandler; 22 import schedframe.scheduling.WorkloadUnitListImpl;23 21 import schedframe.scheduling.plan.AllocationInterface; 24 22 import schedframe.scheduling.plan.ScheduledTaskInterface; … … 31 29 import schedframe.scheduling.tasks.Job; 32 30 import schedframe.scheduling.tasks.JobInterface; 33 import schedframe.scheduling.tasks.SubmittedTask;34 31 import schedframe.scheduling.tasks.Task; 35 32 import schedframe.scheduling.tasks.TaskInterface; 36 33 import schedframe.scheduling.tasks.WorkloadUnit; 37 38 34 import eduni.simjava.Sim_event; 39 35 import gridsim.GridSim; 40 36 import gridsim.GridSimTags; 41 import gridsim.Gridlet;42 37 import gridsim.IO_data; 43 import gridsim.gssim.WormsTags; 44 import gssim.schedframe.scheduling.ExecTask; 45 import gssim.schedframe.scheduling.Executable; 38 import gridsim.dcworms.DCWormsTags; 46 39 47 40 public class GlobalManagementSystem extends AbstractManagementSystem { … … 54 47 super(providerId, entityName, execTimeEstimationPlugin, queues); 55 48 56 /*schedulingPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance(57 schedulingPluginClassName,58 GlobalSchedulingPlugin.class);*/59 49 if(schedPlugin == null){ 60 50 throw new Exception("Can not create global scheduling plugin instance"); … … 69 59 switch (tag) { 70 60 71 case WormsTags.TIMER:72 if (pluginSupportsEvent( WormsTags.TIMER)) {61 case DCWormsTags.TIMER: 62 if (pluginSupportsEvent(DCWormsTags.TIMER)) { 73 63 TimerEvent event = new TimerEvent(); 74 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,64 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 75 65 queues, getJobRegistry(), getResourceManager(), moduleList); 76 66 executeSchedulingPlan(decision); … … 81 71 } 82 72 83 public void notifySubmittedWorkloadUnit(WorkloadUnit <?>wu, boolean ack) {73 public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) { 84 74 if (!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)) { 85 75 log.error("Plugin " + schedulingPlugin.getClass() … … 90 80 91 81 registerWorkloadUnit(wu); 92 /*Job job = (Job) wu; 93 jobRegistry.addJob(job); 94 95 if (log.isInfoEnabled()) 96 log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); 97 82 83 } 84 85 private void registerWorkloadUnit(WorkloadUnit wu){ 86 if(!wu.isRegistered()){ 87 wu.register(jobRegistry); 88 } 89 wu.accept(getWorkloadUnitHandler()); 90 } 91 92 93 protected void schedule(JobInterface<?> job){ 98 94 List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 99 95 jobsList.add(job); 100 WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList(); 101 readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList)); 102 schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList); 103 104 schedule(new TaskArrivedEvent());*/ 105 106 } 107 108 private void registerWorkloadUnit(WorkloadUnit<?> wu){ 109 if(!wu.isRegistered()){ 110 wu.register(jobRegistry); 111 } 112 wu.accept(getWorkloadUnitHandler()); 113 } 114 115 116 protected void scheduleReadyTasks(Job job){ 117 List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 118 jobsList.add(job); 119 WorkloadUnitListImpl readyWorkloadUnits = new WorkloadUnitListImpl(); 120 readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList)); 121 schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList); 122 123 schedule(new TaskArrivedEvent()); 124 } 125 126 protected void schedule(SchedulingEvent schedulingEvent) { 127 128 try { 129 SchedulingPlanInterface decision = schedulingPlugin.schedule( 130 schedulingEvent, queues, getJobRegistry(), getResourceManager(), moduleList); 131 if (decision == null) 132 return; 133 96 TaskListImpl readyTasks = new TaskListImpl(); 97 readyTasks.addAll(jobRegistry.getAvailableTasks(jobsList)); 98 99 schedulingPlugin.placeTasksInQueues(readyTasks, queues, getResourceManager(), moduleList); 100 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule( 101 new TaskArrivedEvent(), queues, getJobRegistry(), getResourceManager(), moduleList); 102 if (decision != null) 134 103 executeSchedulingPlan(decision); 135 136 } catch (Exception e) { 137 e.printStackTrace(); 138 } 139 } 140 141 public void notifyReturnedWorkloadUnit(WorkloadUnit<?> wu) { 104 } 105 106 public void notifyReturnedWorkloadUnit(WorkloadUnit wu) { 142 107 Executable exec = (Executable) wu; 143 108 … … 149 114 150 115 try { 151 Job job = jobRegistry.get (exec.getJobId());152 /*Task task = job.getTask(exec.getTaskId());116 Job job = jobRegistry.getJob(exec.getJobId()); 117 Task task = job.getTask(exec.getTaskId()); 153 118 if(exec.getProcessesId() == null){ 154 119 try { 155 120 task.setStatus(exec.getStatus()); 156 121 } catch (Exception e) { 157 // TODO Auto-generated catch block 158 e.printStackTrace(); 122 159 123 } 160 124 } else { … … 167 131 } 168 132 } 169 } */133 } 170 134 171 135 if(job.isFinished()){ … … 173 137 } 174 138 else { 175 scheduleReadyTasks(job); 176 /*List<JobInterface<?>> jobs = new ArrayList<JobInterface<?>>(); 177 jobs.add(jobRegistry.getJobInfo(job.getId())); 178 WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList(); 179 readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobs)); 180 schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, 181 getResourceManager(), moduleList); 182 schedule(new TaskArrivedEvent());*/ 139 schedule(job); 183 140 } 184 141 … … 189 146 } 190 147 191 public void notifyCanceledWorkloadUnit(WorkloadUnit<?> wu){; 192 193 Executable task = (Executable) wu; 194 String jobID = task.getJobId(); 195 String taskID = task.getId(); 196 197 if(log.isDebugEnabled()) 198 log.debug("Received canceled job" + jobID + "_" + taskID); 199 200 TaskInterface<?> ti = jobRegistry.getTaskInfo(jobID, taskID) ; 201 try { 202 203 ti.setStatus((int)BrokerConstants.JOB_STATUS_CANCELED); 204 205 TaskCanceledEvent event = new TaskCanceledEvent(jobID, taskID); 206 event.setReason(EventReason.RESERVATION_EXCEEDED); 207 schedule(event); 208 209 } catch (Exception e) { 210 log.error("Exception during scheduling. " + e.getMessage()); 211 e.printStackTrace(); 212 } 213 } 214 215 protected void executeSchedulingPlan(SchedulingPlanInterface decision) { 216 217 ArrayList<ScheduledTaskInterface> taskSchedulingDecisions = decision.getTasks(); 148 protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) { 149 150 ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks(); 218 151 for (int i = 0; i < taskSchedulingDecisions.size(); i++) { 219 152 220 try { 221 ScheduledTaskInterface taskDecision = taskSchedulingDecisions.get(i); 222 223 //log.info(decision.getDocument()); 224 225 String jobID = taskDecision.getJobId(); 226 String taskID = taskDecision.getTaskId(); 227 228 // Task allocations that were rejected because of lack of resources or which were canceled and 229 // not scheduled again are returned to the user. 230 if(taskDecision.getStatus() == AllocationStatus.REJECTED){ 231 Job job = jobRegistry.get(jobID); 232 scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); 233 continue; 234 } 235 236 ArrayList<AllocationInterface> allocations = taskDecision.getAllocations(); 237 238 Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); 239 for (int j = 0; j < allocations.size(); j++) { 240 241 AllocationInterface allocation = allocations.get(j); 242 Executable exec = jobRegistry.createExecutable(task, allocation); 243 submitWorkloadUnit(exec, allocation); 244 task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 245 } 246 247 }catch (Exception e){ 248 e.printStackTrace(); 249 } 250 } 251 } 252 253 protected void submitWorkloadUnit(WorkloadUnit<?> job, AllocationInterface allocation) { 153 ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i); 154 155 //log.info(decision.getDocument()); 156 157 String jobID = taskDecision.getJobId(); 158 String taskID = taskDecision.getTaskId(); 159 160 // Task allocations that were rejected because of lack of resources or which were canceled and 161 // not scheduled again are returned to the user. 162 if(taskDecision.getStatus() == AllocationStatus.REJECTED){ 163 Job job = jobRegistry.getJob(jobID); 164 scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); 165 continue; 166 } 167 168 Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); 169 170 ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations(); 171 for (int j = 0; j < allocations.size(); j++) { 172 173 AllocationInterface<?> allocation = allocations.get(j); 174 Executable exec = createExecutable(task, allocation); 175 submitTask(exec, allocation); 176 task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 177 } 178 } 179 } 180 181 private Executable createExecutable(Task task, AllocationInterface<?> allocation) { 182 183 String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); 184 if(refersTo == null) 185 refersTo = task.getId(); 186 187 Executable exec = null; 188 189 if(refersTo.equals(task.getId())){ 190 exec = new Executable(task); 191 } else { 192 List<AbstractProcesses> processes = task.getProcesses(); 193 if(processes == null) { 194 try { 195 log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + 196 " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); 197 } catch (Exception e) { 198 e.printStackTrace(); 199 } 200 } 201 boolean found = false; 202 for(int j = 0; j < processes.size() && !found; j++){ 203 AbstractProcesses procesesSet = processes.get(j); 204 if(refersTo.equals(procesesSet.getId())){ 205 exec = new Executable(task, procesesSet); 206 found = true; 207 } 208 } 209 if(!found){ 210 log.error("Allocation refers to unknown proceses set."); 211 } 212 } 213 214 exec.setReservationId(allocation.getReservationId()); 215 216 /*HostInterface<?> host = allocation.getHost(); 217 ComputingResourceTypeInterface<?> crt = host.getMachineParameters(); 218 if(crt != null){ 219 ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0); 220 if(crti != null){ 221 ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty(); 222 for(int p = 0; p < properties.length; p++){ 223 ParameterPropertyInterface<?> property = properties[p]; 224 if("chosenCPUs".equals(property.getName())){ 225 Object cpuNames = property.getValue(); 226 exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); 227 } 228 } 229 } 230 }*/ 231 return exec; 232 } 233 234 protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) { 254 235 255 236 String providerName = allocation.getProviderName(); … … 257 238 return; 258 239 } 259 TaskInterface<?> task = (TaskInterface<?>) job;260 240 removeFromQueue(task); 261 241 262 242 int resID = GridSim.getEntityId(providerName); 263 IO_data data = new IO_data( job, 0, resID);243 IO_data data = new IO_data(task, 0, resID); 264 244 scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); 265 266 //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job);245 //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job); 246 267 247 if(log.isDebugEnabled()) 268 try { 269 log.debug("Submitted job " + job.getId() + " to " + providerName); 270 } catch (NoSuchFieldException e) { 271 // TODO Auto-generated catch block 272 e.printStackTrace(); 273 } 248 log.debug("Submitted job " + task.getId() + " to " + providerName); 249 274 250 } 275 251 276 252 class GlobalWorkloadUnitHandler implements WorkloadUnitHandler{ 277 253 278 public void handleJob(Job job){ 279 280 jobRegistry.addJob(job); 254 public void handleJob(JobInterface<?> job){ 281 255 if (log.isInfoEnabled()) 282 256 log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); 283 257 284 scheduleReadyTasks(job); 258 jobRegistry.addJob(job); 259 schedule(job); 285 260 } 286 261 … … 292 267 throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler."); 293 268 } 294 295 public void handleSubmittedTask(SubmittedTask task) {296 throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");297 }298 269 } 299 270 … … 301 272 return new GlobalWorkloadUnitHandler(); 302 273 } 303 304 274 305 275 -
DCWoRMS/trunk/build/classes/schedframe/scheduling/policy/global/GridBroker.java
r477 r539 19 19 public class GridBroker extends GlobalManagementSystem { 20 20 21 22 21 private static Log log = LogFactory.getLog(GridBroker.class); 23 22 … … 27 26 public GridBroker(String name, SchedulingPlugin schedulingPlugin, ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues) throws Exception { 28 27 super(name, "BROKER", schedulingPlugin, execTimeEstimationPlugin, queues); 29 30 //make use of plug-in interface 31 32 //Properties prop = new Properties(); 33 //prop.put("plugin.name", name); 34 //prop.put("plugin.utils.timeoperations", "gssim.scheduling.plugin.local.GssimTimeOperations"); 35 //schedulingPlugin.init(prop); 28 36 29 otherGridSchedulersIds = new HashSet<Integer>(); 37 38 30 moduleList = new ModuleListImpl(2); 39 //this.moduleList.add(new GridResourceDiscovery(this.getScheduler()));40 //moduleList.add(new GridReservationManagerNew(this));41 42 if(log.isDebugEnabled())43 log.debug(name + ": Creating a broker interface object");44 31 } 45 32 46 33 public void init(Scheduler scheduler, ManagedResources managedResources) { 47 34 super.init(scheduler, managedResources); 48 //this.scheduler = scheduler;49 //this.resourceManager = ResourceManagerFactory.createResourceManager(scheduler, managedResources);50 35 this.moduleList.add((GridResourceDiscovery)resourceManager); 51 36 } … … 57 42 } 58 43 return providerIds; 59 //return GridSim.getGridResourceList();60 44 } 61 45
Note: See TracChangeset
for help on using the changeset viewer.