Changeset 481 for DCWoRMS/trunk/src/schedframe/scheduling/policy/global
- Timestamp:
- 10/08/12 10:23:45 (13 years ago)
- Location:
- DCWoRMS/trunk/src/schedframe/scheduling/policy/global
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
DCWoRMS/trunk/src/schedframe/scheduling/policy/global/GlobalManagementSystem.java
r480 r481 12 12 13 13 import qcg.shared.constants.BrokerConstants; 14 15 import schedframe.events.scheduling.EventReason;16 14 import schedframe.events.scheduling.SchedulingEvent; 17 15 import schedframe.events.scheduling.TaskArrivedEvent; 18 import schedframe.events.scheduling.TaskCanceledEvent;19 16 import schedframe.events.scheduling.TimerEvent; 20 import schedframe.scheduling.Scheduler;21 17 import schedframe.scheduling.WorkloadUnitHandler; 22 import schedframe.scheduling. WorkloadUnitListImpl;18 import schedframe.scheduling.TaskListImpl; 23 19 import schedframe.scheduling.plan.AllocationInterface; 24 20 import schedframe.scheduling.plan.ScheduledTaskInterface; … … 31 27 import schedframe.scheduling.tasks.Job; 32 28 import schedframe.scheduling.tasks.JobInterface; 33 import schedframe.scheduling.tasks.SubmittedTask;34 29 import schedframe.scheduling.tasks.Task; 35 30 import schedframe.scheduling.tasks.TaskInterface; 36 31 import schedframe.scheduling.tasks.WorkloadUnit; 37 38 32 import eduni.simjava.Sim_event; 39 33 import gridsim.GridSim; 40 34 import gridsim.GridSimTags; 41 import gridsim.Gridlet;42 35 import gridsim.IO_data; 43 import gridsim.gssim. WormsTags;36 import gridsim.gssim.DCWormsTags; 44 37 import gssim.schedframe.scheduling.ExecTask; 45 38 import gssim.schedframe.scheduling.Executable; … … 54 47 super(providerId, entityName, execTimeEstimationPlugin, queues); 55 48 56 /*schedulingPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance(57 schedulingPluginClassName,58 GlobalSchedulingPlugin.class);*/59 49 if(schedPlugin == null){ 60 50 throw new Exception("Can not create global scheduling plugin instance"); … … 69 59 switch (tag) { 70 60 71 case WormsTags.TIMER:72 if (pluginSupportsEvent( WormsTags.TIMER)) {61 case DCWormsTags.TIMER: 62 if (pluginSupportsEvent(DCWormsTags.TIMER)) { 73 63 TimerEvent event = new TimerEvent(); 74 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,64 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 75 65 queues, getJobRegistry(), getResourceManager(), moduleList); 76 66 executeSchedulingPlan(decision); … … 114 104 115 105 116 protected void schedule ReadyTasks(Job job){106 protected void scheduleAvaialbleTasks(Job job){ 117 107 List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 118 108 jobsList.add(job); 119 WorkloadUnitListImpl readyWorkloadUnits = new WorkloadUnitListImpl();120 ready WorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList));121 schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList);122 109 TaskListImpl readyTasks = new TaskListImpl(); 110 readyTasks.addAll(jobRegistry.getAvailableTasks(jobsList)); 111 112 schedulingPlugin.placeTasksInQueues(readyTasks, queues, getResourceManager(), moduleList); 123 113 schedule(new TaskArrivedEvent()); 124 114 } … … 126 116 protected void schedule(SchedulingEvent schedulingEvent) { 127 117 128 try { 129 SchedulingPlanInterface decision = schedulingPlugin.schedule( 130 schedulingEvent, queues, getJobRegistry(), getResourceManager(), moduleList); 131 if (decision == null) 132 return; 133 118 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule( 119 schedulingEvent, queues, getJobRegistry(), getResourceManager(), moduleList); 120 if (decision != null) 134 121 executeSchedulingPlan(decision); 135 136 } catch (Exception e) {137 e.printStackTrace();138 }139 122 } 140 123 … … 173 156 } 174 157 else { 175 schedule ReadyTasks(job);158 scheduleAvaialbleTasks(job); 176 159 /*List<JobInterface<?>> jobs = new ArrayList<JobInterface<?>>(); 177 160 jobs.add(jobRegistry.getJobInfo(job.getId())); … … 189 172 } 190 173 191 public void notifyCanceledWorkloadUnit(WorkloadUnit wu){; 192 193 Executable task = (Executable) wu; 194 String jobID = task.getJobId(); 195 String taskID = task.getId(); 196 197 if(log.isDebugEnabled()) 198 log.debug("Received canceled job" + jobID + "_" + taskID); 199 200 TaskInterface<?> ti = jobRegistry.getTaskInfo(jobID, taskID) ; 201 try { 202 203 ti.setStatus((int)BrokerConstants.JOB_STATUS_CANCELED); 204 205 TaskCanceledEvent event = new TaskCanceledEvent(jobID, taskID); 206 event.setReason(EventReason.RESERVATION_EXCEEDED); 207 schedule(event); 208 209 } catch (Exception e) { 210 log.error("Exception during scheduling. " + e.getMessage()); 211 e.printStackTrace(); 212 } 213 } 214 215 protected void executeSchedulingPlan(SchedulingPlanInterface decision) { 216 217 ArrayList<ScheduledTaskInterface> taskSchedulingDecisions = decision.getTasks(); 174 protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) { 175 176 ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks(); 218 177 for (int i = 0; i < taskSchedulingDecisions.size(); i++) { 219 178 220 try { 221 ScheduledTaskInterface taskDecision = taskSchedulingDecisions.get(i); 222 223 //log.info(decision.getDocument()); 224 225 String jobID = taskDecision.getJobId(); 226 String taskID = taskDecision.getTaskId(); 227 228 // Task allocations that were rejected because of lack of resources or which were canceled and 229 // not scheduled again are returned to the user. 230 if(taskDecision.getStatus() == AllocationStatus.REJECTED){ 231 Job job = jobRegistry.getJob(jobID); 232 scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); 233 continue; 234 } 235 236 ArrayList<AllocationInterface> allocations = taskDecision.getAllocations(); 237 238 Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); 239 for (int j = 0; j < allocations.size(); j++) { 240 241 AllocationInterface allocation = allocations.get(j); 242 Executable exec = jobRegistry.createExecutable(task, allocation); 243 submitWorkloadUnit(exec, allocation); 244 task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 245 } 246 247 }catch (Exception e){ 248 e.printStackTrace(); 249 } 250 } 251 } 252 253 protected void submitWorkloadUnit(WorkloadUnit job, AllocationInterface allocation) { 179 ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i); 180 181 //log.info(decision.getDocument()); 182 183 String jobID = taskDecision.getJobId(); 184 String taskID = taskDecision.getTaskId(); 185 186 // Task allocations that were rejected because of lack of resources or which were canceled and 187 // not scheduled again are returned to the user. 188 if(taskDecision.getStatus() == AllocationStatus.REJECTED){ 189 Job job = jobRegistry.getJob(jobID); 190 scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); 191 continue; 192 } 193 194 Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); 195 196 ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations(); 197 for (int j = 0; j < allocations.size(); j++) { 198 199 AllocationInterface<?> allocation = allocations.get(j); 200 Executable exec = createExecutable(task, allocation); 201 submitTask(exec, allocation); 202 task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 203 } 204 } 205 } 206 207 private Executable createExecutable(Task task, AllocationInterface<?> allocation) { 208 209 String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); 210 if(refersTo == null) 211 refersTo = task.getId(); 212 213 Executable exec = null; 214 215 if(refersTo.equals(task.getId())){ 216 exec = new Executable(task); 217 } else { 218 List<AbstractProcesses> processes = task.getProcesses(); 219 if(processes == null) { 220 try { 221 log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + 222 " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); 223 } catch (Exception e) { 224 e.printStackTrace(); 225 } 226 } 227 boolean found = false; 228 for(int j = 0; j < processes.size() && !found; j++){ 229 AbstractProcesses procesesSet = processes.get(j); 230 if(refersTo.equals(procesesSet.getId())){ 231 exec = new Executable(task, procesesSet); 232 found = true; 233 } 234 } 235 if(!found){ 236 log.error("Allocation refers to unknown proceses set."); 237 } 238 } 239 240 exec.setReservationId(allocation.getReservationId()); 241 242 /*HostInterface<?> host = allocation.getHost(); 243 ComputingResourceTypeInterface<?> crt = host.getMachineParameters(); 244 if(crt != null){ 245 ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0); 246 if(crti != null){ 247 ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty(); 248 for(int p = 0; p < properties.length; p++){ 249 ParameterPropertyInterface<?> property = properties[p]; 250 if("chosenCPUs".equals(property.getName())){ 251 Object cpuNames = property.getValue(); 252 exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); 253 } 254 } 255 } 256 }*/ 257 return exec; 258 } 259 260 protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) { 254 261 255 262 String providerName = allocation.getProviderName(); … … 257 264 return; 258 265 } 259 TaskInterface<?> task = (TaskInterface<?>) job;260 266 removeFromQueue(task); 261 267 262 268 int resID = GridSim.getEntityId(providerName); 263 IO_data data = new IO_data( job, 0, resID);269 IO_data data = new IO_data(task, 0, resID); 264 270 scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); 265 271 266 272 //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job); 267 273 if(log.isDebugEnabled()) 268 log.debug("Submitted job " + job.getId() + " to " + providerName);274 log.debug("Submitted job " + task.getId() + " to " + providerName); 269 275 270 276 } … … 274 280 public void handleJob(Job job){ 275 281 276 jobRegistry.addJob(job);277 282 if (log.isInfoEnabled()) 278 283 log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); 279 284 280 scheduleReadyTasks(job); 285 jobRegistry.addJob(job); 286 scheduleAvaialbleTasks(job); 281 287 } 282 288 … … 289 295 } 290 296 291 public void handleSubmittedTask(SubmittedTask task) {292 throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");293 }294 297 } 295 298 … … 297 300 return new GlobalWorkloadUnitHandler(); 298 301 } 299 300 302 301 303 -
DCWoRMS/trunk/src/schedframe/scheduling/policy/global/GridBroker.java
r477 r481 19 19 public class GridBroker extends GlobalManagementSystem { 20 20 21 22 21 private static Log log = LogFactory.getLog(GridBroker.class); 23 22 … … 27 26 public GridBroker(String name, SchedulingPlugin schedulingPlugin, ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues) throws Exception { 28 27 super(name, "BROKER", schedulingPlugin, execTimeEstimationPlugin, queues); 29 30 //make use of plug-in interface 31 32 //Properties prop = new Properties(); 33 //prop.put("plugin.name", name); 34 //prop.put("plugin.utils.timeoperations", "gssim.scheduling.plugin.local.GssimTimeOperations"); 35 //schedulingPlugin.init(prop); 28 36 29 otherGridSchedulersIds = new HashSet<Integer>(); 37 38 30 moduleList = new ModuleListImpl(2); 39 //this.moduleList.add(new GridResourceDiscovery(this.getScheduler()));40 //moduleList.add(new GridReservationManagerNew(this));41 42 if(log.isDebugEnabled())43 log.debug(name + ": Creating a broker interface object");44 31 } 45 32 46 33 public void init(Scheduler scheduler, ManagedResources managedResources) { 47 34 super.init(scheduler, managedResources); 48 //this.scheduler = scheduler;49 //this.resourceManager = ResourceManagerFactory.createResourceManager(scheduler, managedResources);50 35 this.moduleList.add((GridResourceDiscovery)resourceManager); 51 36 } … … 57 42 } 58 43 return providerIds; 59 //return GridSim.getGridResourceList();60 44 } 61 45
Note: See TracChangeset
for help on using the changeset viewer.