Changeset 481 for DCWoRMS/trunk/src/schedframe/scheduling/policy
- Timestamp:
- 10/08/12 10:23:45 (13 years ago)
- Location:
- DCWoRMS/trunk/src/schedframe/scheduling/policy
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
DCWoRMS/trunk/src/schedframe/scheduling/policy/AbstractManagementSystem.java
r480 r481 10 10 import schedframe.PluginConfiguration; 11 11 import schedframe.events.scheduling.SchedulingEventType; 12 import schedframe.resources.units.StandardResourceUnitName;13 12 import schedframe.scheduling.Scheduler; 14 13 import schedframe.scheduling.WorkloadUnitHandler; … … 16 15 import schedframe.scheduling.manager.resources.ResourceManager; 17 16 import schedframe.scheduling.manager.resources.ResourceManagerFactory; 17 import schedframe.scheduling.manager.tasks.JobRegistry; 18 18 import schedframe.scheduling.manager.tasks.JobRegistryImpl; 19 import schedframe.scheduling.manager.tasks.JobRegistry;20 19 import schedframe.scheduling.plan.AllocationInterface; 21 20 import schedframe.scheduling.plan.SchedulingPlanInterface; … … 27 26 import schedframe.scheduling.queue.TaskQueueList; 28 27 import schedframe.scheduling.tasks.Job; 28 import schedframe.scheduling.tasks.TaskInterface; 29 29 import schedframe.scheduling.tasks.WorkloadUnit; 30 import simulator. WormsConstants;30 import simulator.DCWormsConstants; 31 31 import eduni.simjava.Sim_event; 32 32 import gridsim.GridSim; 33 33 import gridsim.GridSimTags; 34 import gridsim.Gridlet;35 34 import gridsim.IO_data; 36 import gridsim.gssim. WormsTags;35 import gridsim.gssim.DCWormsTags; 37 36 import gssim.schedframe.scheduling.ExecTask; 38 37 import gssim.schedframe.scheduling.Executable; … … 45 44 protected String name; 46 45 46 protected TaskQueueList queues; 47 47 protected ResourceManager resourceManager; 48 49 protected TaskQueueList queues;50 48 protected JobRegistryImpl jobRegistry; 49 protected ModuleList moduleList; 50 51 51 protected SchedulingPlugin schedulingPlugin; 52 53 52 protected ExecutionTimeEstimationPlugin execTimeEstimationPlugin; 54 53 55 protected ModuleList moduleList; 56 57 protected JobRegistryImpl jobRegistry; 54 protected Scheduler scheduler; 58 55 59 56 … … 67 64 } 68 65 66 public void init(Scheduler sched, ManagedResources managedResources) { 67 scheduler = sched; 68 resourceManager = ResourceManagerFactory.createResourceManager(scheduler, managedResources); 69 scheduler.set_stat(DCWormsConstants.getResourcesStatisticsObject(queues.size())); 70 for(int i = 0; i < queues.size(); i++){ 71 TaskQueue q = queues.get(i); 72 if(q instanceof AbstractStatsSupportingQueue<?>){ 73 AbstractStatsSupportingQueue<?> queue = (AbstractStatsSupportingQueue<?>) q; 74 queue.setStats(scheduler.get_stat(), DCWormsConstants.TASKS_QUEUE_LENGTH_MEASURE_NAME + "_" + Integer.toString(i)); 75 } 76 } 77 } 78 69 79 public void processEvent(Sim_event ev) { 70 80 processOtherEvent(ev); … … 81 91 } 82 92 93 83 94 public String getName() { 84 95 return name; 85 }86 87 public PluginConfiguration getSchedulingPluginConfiguration() {88 return schedulingPlugin.getConfiguration();89 96 } 90 97 … … 106 113 return jobRegistry; 107 114 } 115 116 public Scheduler getScheduler() { 117 return scheduler; 118 } 119 120 public PluginConfiguration getSchedulingPluginConfiguration() { 121 return schedulingPlugin.getConfiguration(); 122 } 108 123 109 124 public boolean pluginSupportsEvent(int eventType){ … … 111 126 } 112 127 113 public abstract void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack); 114 115 public abstract void notifyCanceledWorkloadUnit(WorkloadUnit wu); 116 117 public abstract void notifyReturnedWorkloadUnit(WorkloadUnit wu); 118 119 protected abstract void executeSchedulingPlan(SchedulingPlanInterface decision); 120 121 122 128 public TaskQueueList getQueues(){ 129 return queues; 130 } 131 132 public Map<String, Integer> getQueuesSize() { 133 Map<String, Integer> queue_size = new HashMap<String, Integer>(); 134 for (TaskQueue queue : queues) { 135 queue_size.put(queue.getName(), queue.size()); 136 } 137 return queue_size; 138 } 139 123 140 //POPRAWIC (ale co? bo teraz chyba jest ok) 124 protected void submit WorkloadUnit(WorkloadUnit wu, AllocationInterfaceallocation) {141 protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) { 125 142 String providerName = allocation.getProviderName(); 126 143 if (providerName == null) { … … 128 145 } 129 146 //Executable exec = (Executable) wu; 130 removeFromQueue(wu); 131 scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, wu); 132 } 133 134 protected boolean sendCanceledWorkloadUnit(int tag, Executable task, int executableId, int destId) { 135 136 if (tag != GridSimTags.GRIDLET_CANCEL) { 137 return false; 138 } 139 140 long taskSize = 0; 141 if (task != null) { 142 taskSize = task.getGridletOutputSize(); 143 } 144 145 // if no Gridlet found, then create a new Gridlet but set its status 146 // to FAILED. Then, most importantly, set the resource parameters 147 // because the user will search/filter based on a resource ID. 148 else if (task == null) { 149 try { 150 taskSize = 100; 151 task = jobRegistry.getTaskExecutable(executableId); 152 task.setGridletStatus(Gridlet.FAILED); 153 int cost = resourceManager.getSharedResourceUnits().get(StandardResourceUnitName.COST) != null ? resourceManager 154 .getSharedResourceUnits().get(StandardResourceUnitName.COST).get(0).getAmount() 155 : 1; 156 task.setResourceParameter(scheduler.get_id(), cost); 157 } catch (Exception e) { 158 // empty ... 159 } 160 } 161 scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, tag, new IO_data(task, taskSize, destId)); 162 163 return true; 147 removeFromQueue(task); 148 scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, task); 164 149 } 165 150 … … 184 169 185 170 protected void sendExecutableReadyEvent(ExecTask exec) { 186 187 /*if (wu instanceof JobInterface) {188 scheduler.sendInternal(Long.valueOf(0).doubleValue(), GssimTags.TASK_READY_FOR_EXECUTION,189 wu);190 return;191 }*/192 171 193 172 long delay = 0; … … 202 181 } 203 182 204 scheduler.sendInternal(Long.valueOf(delay).doubleValue(), WormsTags.TASK_READY_FOR_EXECUTION,183 scheduler.sendInternal(Long.valueOf(delay).doubleValue(), DCWormsTags.TASK_READY_FOR_EXECUTION, 205 184 exec); 206 185 } … … 214 193 if (obj != null) { 215 194 int delay = (Integer) obj; 216 scheduler.sendInternal(delay, WormsTags.TIMER, null);195 scheduler.sendInternal(delay, DCWormsTags.TIMER, null); 217 196 } 218 197 } … … 229 208 return false; 230 209 } 231 232 public TaskQueueList getAccessQueues(){233 return queues;234 }235 236 public Map<String, Integer> getQueuesSize() {237 Map<String, Integer> queue_size = new HashMap<String, Integer>();238 for (TaskQueue queue : queues) {239 queue_size.put(queue.getName(), queue.size());240 }241 return queue_size;242 }243 244 public void init(Scheduler sched, ManagedResources managedResources) {245 scheduler = sched;246 resourceManager = ResourceManagerFactory.createResourceManager(scheduler, managedResources);247 scheduler.set_stat(WormsConstants.getResourcesStatisticsObject(queues.size()));248 for(int i = 0; i < queues.size(); i++){249 TaskQueue q = queues.get(i);250 if(q instanceof AbstractStatsSupportingQueue<?>){251 AbstractStatsSupportingQueue<?> queue = (AbstractStatsSupportingQueue<?>) q;252 queue.setStats(scheduler.get_stat(), WormsConstants.TASKS_QUEUE_LENGTH_MEASURE_NAME + "_" + Integer.toString(i));253 }254 }255 }256 257 protected Scheduler scheduler;258 259 public Scheduler getScheduler() {260 return scheduler;261 }262 210 263 211 public abstract WorkloadUnitHandler getWorkloadUnitHandler(); 212 213 public abstract void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack); 214 215 public abstract void notifyReturnedWorkloadUnit(WorkloadUnit wu); 216 217 protected abstract void executeSchedulingPlan(SchedulingPlanInterface<?> decision); 264 218 265 219 } -
DCWoRMS/trunk/src/schedframe/scheduling/policy/global/GlobalManagementSystem.java
r480 r481 12 12 13 13 import qcg.shared.constants.BrokerConstants; 14 15 import schedframe.events.scheduling.EventReason;16 14 import schedframe.events.scheduling.SchedulingEvent; 17 15 import schedframe.events.scheduling.TaskArrivedEvent; 18 import schedframe.events.scheduling.TaskCanceledEvent;19 16 import schedframe.events.scheduling.TimerEvent; 20 import schedframe.scheduling.Scheduler;21 17 import schedframe.scheduling.WorkloadUnitHandler; 22 import schedframe.scheduling. WorkloadUnitListImpl;18 import schedframe.scheduling.TaskListImpl; 23 19 import schedframe.scheduling.plan.AllocationInterface; 24 20 import schedframe.scheduling.plan.ScheduledTaskInterface; … … 31 27 import schedframe.scheduling.tasks.Job; 32 28 import schedframe.scheduling.tasks.JobInterface; 33 import schedframe.scheduling.tasks.SubmittedTask;34 29 import schedframe.scheduling.tasks.Task; 35 30 import schedframe.scheduling.tasks.TaskInterface; 36 31 import schedframe.scheduling.tasks.WorkloadUnit; 37 38 32 import eduni.simjava.Sim_event; 39 33 import gridsim.GridSim; 40 34 import gridsim.GridSimTags; 41 import gridsim.Gridlet;42 35 import gridsim.IO_data; 43 import gridsim.gssim. WormsTags;36 import gridsim.gssim.DCWormsTags; 44 37 import gssim.schedframe.scheduling.ExecTask; 45 38 import gssim.schedframe.scheduling.Executable; … … 54 47 super(providerId, entityName, execTimeEstimationPlugin, queues); 55 48 56 /*schedulingPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance(57 schedulingPluginClassName,58 GlobalSchedulingPlugin.class);*/59 49 if(schedPlugin == null){ 60 50 throw new Exception("Can not create global scheduling plugin instance"); … … 69 59 switch (tag) { 70 60 71 case WormsTags.TIMER:72 if (pluginSupportsEvent( WormsTags.TIMER)) {61 case DCWormsTags.TIMER: 62 if (pluginSupportsEvent(DCWormsTags.TIMER)) { 73 63 TimerEvent event = new TimerEvent(); 74 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,64 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 75 65 queues, getJobRegistry(), getResourceManager(), moduleList); 76 66 executeSchedulingPlan(decision); … … 114 104 115 105 116 protected void schedule ReadyTasks(Job job){106 protected void scheduleAvaialbleTasks(Job job){ 117 107 List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 118 108 jobsList.add(job); 119 WorkloadUnitListImpl readyWorkloadUnits = new WorkloadUnitListImpl();120 ready WorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList));121 schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList);122 109 TaskListImpl readyTasks = new TaskListImpl(); 110 readyTasks.addAll(jobRegistry.getAvailableTasks(jobsList)); 111 112 schedulingPlugin.placeTasksInQueues(readyTasks, queues, getResourceManager(), moduleList); 123 113 schedule(new TaskArrivedEvent()); 124 114 } … … 126 116 protected void schedule(SchedulingEvent schedulingEvent) { 127 117 128 try { 129 SchedulingPlanInterface decision = schedulingPlugin.schedule( 130 schedulingEvent, queues, getJobRegistry(), getResourceManager(), moduleList); 131 if (decision == null) 132 return; 133 118 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule( 119 schedulingEvent, queues, getJobRegistry(), getResourceManager(), moduleList); 120 if (decision != null) 134 121 executeSchedulingPlan(decision); 135 136 } catch (Exception e) {137 e.printStackTrace();138 }139 122 } 140 123 … … 173 156 } 174 157 else { 175 schedule ReadyTasks(job);158 scheduleAvaialbleTasks(job); 176 159 /*List<JobInterface<?>> jobs = new ArrayList<JobInterface<?>>(); 177 160 jobs.add(jobRegistry.getJobInfo(job.getId())); … … 189 172 } 190 173 191 public void notifyCanceledWorkloadUnit(WorkloadUnit wu){; 192 193 Executable task = (Executable) wu; 194 String jobID = task.getJobId(); 195 String taskID = task.getId(); 196 197 if(log.isDebugEnabled()) 198 log.debug("Received canceled job" + jobID + "_" + taskID); 199 200 TaskInterface<?> ti = jobRegistry.getTaskInfo(jobID, taskID) ; 201 try { 202 203 ti.setStatus((int)BrokerConstants.JOB_STATUS_CANCELED); 204 205 TaskCanceledEvent event = new TaskCanceledEvent(jobID, taskID); 206 event.setReason(EventReason.RESERVATION_EXCEEDED); 207 schedule(event); 208 209 } catch (Exception e) { 210 log.error("Exception during scheduling. " + e.getMessage()); 211 e.printStackTrace(); 212 } 213 } 214 215 protected void executeSchedulingPlan(SchedulingPlanInterface decision) { 216 217 ArrayList<ScheduledTaskInterface> taskSchedulingDecisions = decision.getTasks(); 174 protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) { 175 176 ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks(); 218 177 for (int i = 0; i < taskSchedulingDecisions.size(); i++) { 219 178 220 try { 221 ScheduledTaskInterface taskDecision = taskSchedulingDecisions.get(i); 222 223 //log.info(decision.getDocument()); 224 225 String jobID = taskDecision.getJobId(); 226 String taskID = taskDecision.getTaskId(); 227 228 // Task allocations that were rejected because of lack of resources or which were canceled and 229 // not scheduled again are returned to the user. 230 if(taskDecision.getStatus() == AllocationStatus.REJECTED){ 231 Job job = jobRegistry.getJob(jobID); 232 scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); 233 continue; 234 } 235 236 ArrayList<AllocationInterface> allocations = taskDecision.getAllocations(); 237 238 Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); 239 for (int j = 0; j < allocations.size(); j++) { 240 241 AllocationInterface allocation = allocations.get(j); 242 Executable exec = jobRegistry.createExecutable(task, allocation); 243 submitWorkloadUnit(exec, allocation); 244 task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 245 } 246 247 }catch (Exception e){ 248 e.printStackTrace(); 249 } 250 } 251 } 252 253 protected void submitWorkloadUnit(WorkloadUnit job, AllocationInterface allocation) { 179 ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i); 180 181 //log.info(decision.getDocument()); 182 183 String jobID = taskDecision.getJobId(); 184 String taskID = taskDecision.getTaskId(); 185 186 // Task allocations that were rejected because of lack of resources or which were canceled and 187 // not scheduled again are returned to the user. 188 if(taskDecision.getStatus() == AllocationStatus.REJECTED){ 189 Job job = jobRegistry.getJob(jobID); 190 scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); 191 continue; 192 } 193 194 Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); 195 196 ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations(); 197 for (int j = 0; j < allocations.size(); j++) { 198 199 AllocationInterface<?> allocation = allocations.get(j); 200 Executable exec = createExecutable(task, allocation); 201 submitTask(exec, allocation); 202 task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 203 } 204 } 205 } 206 207 private Executable createExecutable(Task task, AllocationInterface<?> allocation) { 208 209 String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); 210 if(refersTo == null) 211 refersTo = task.getId(); 212 213 Executable exec = null; 214 215 if(refersTo.equals(task.getId())){ 216 exec = new Executable(task); 217 } else { 218 List<AbstractProcesses> processes = task.getProcesses(); 219 if(processes == null) { 220 try { 221 log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + 222 " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); 223 } catch (Exception e) { 224 e.printStackTrace(); 225 } 226 } 227 boolean found = false; 228 for(int j = 0; j < processes.size() && !found; j++){ 229 AbstractProcesses procesesSet = processes.get(j); 230 if(refersTo.equals(procesesSet.getId())){ 231 exec = new Executable(task, procesesSet); 232 found = true; 233 } 234 } 235 if(!found){ 236 log.error("Allocation refers to unknown proceses set."); 237 } 238 } 239 240 exec.setReservationId(allocation.getReservationId()); 241 242 /*HostInterface<?> host = allocation.getHost(); 243 ComputingResourceTypeInterface<?> crt = host.getMachineParameters(); 244 if(crt != null){ 245 ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0); 246 if(crti != null){ 247 ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty(); 248 for(int p = 0; p < properties.length; p++){ 249 ParameterPropertyInterface<?> property = properties[p]; 250 if("chosenCPUs".equals(property.getName())){ 251 Object cpuNames = property.getValue(); 252 exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); 253 } 254 } 255 } 256 }*/ 257 return exec; 258 } 259 260 protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) { 254 261 255 262 String providerName = allocation.getProviderName(); … … 257 264 return; 258 265 } 259 TaskInterface<?> task = (TaskInterface<?>) job;260 266 removeFromQueue(task); 261 267 262 268 int resID = GridSim.getEntityId(providerName); 263 IO_data data = new IO_data( job, 0, resID);269 IO_data data = new IO_data(task, 0, resID); 264 270 scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); 265 271 266 272 //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job); 267 273 if(log.isDebugEnabled()) 268 log.debug("Submitted job " + job.getId() + " to " + providerName);274 log.debug("Submitted job " + task.getId() + " to " + providerName); 269 275 270 276 } … … 274 280 public void handleJob(Job job){ 275 281 276 jobRegistry.addJob(job);277 282 if (log.isInfoEnabled()) 278 283 log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); 279 284 280 scheduleReadyTasks(job); 285 jobRegistry.addJob(job); 286 scheduleAvaialbleTasks(job); 281 287 } 282 288 … … 289 295 } 290 296 291 public void handleSubmittedTask(SubmittedTask task) {292 throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");293 }294 297 } 295 298 … … 297 300 return new GlobalWorkloadUnitHandler(); 298 301 } 299 300 302 301 303 -
DCWoRMS/trunk/src/schedframe/scheduling/policy/global/GridBroker.java
r477 r481 19 19 public class GridBroker extends GlobalManagementSystem { 20 20 21 22 21 private static Log log = LogFactory.getLog(GridBroker.class); 23 22 … … 27 26 public GridBroker(String name, SchedulingPlugin schedulingPlugin, ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues) throws Exception { 28 27 super(name, "BROKER", schedulingPlugin, execTimeEstimationPlugin, queues); 29 30 //make use of plug-in interface 31 32 //Properties prop = new Properties(); 33 //prop.put("plugin.name", name); 34 //prop.put("plugin.utils.timeoperations", "gssim.scheduling.plugin.local.GssimTimeOperations"); 35 //schedulingPlugin.init(prop); 28 36 29 otherGridSchedulersIds = new HashSet<Integer>(); 37 38 30 moduleList = new ModuleListImpl(2); 39 //this.moduleList.add(new GridResourceDiscovery(this.getScheduler()));40 //moduleList.add(new GridReservationManagerNew(this));41 42 if(log.isDebugEnabled())43 log.debug(name + ": Creating a broker interface object");44 31 } 45 32 46 33 public void init(Scheduler scheduler, ManagedResources managedResources) { 47 34 super.init(scheduler, managedResources); 48 //this.scheduler = scheduler;49 //this.resourceManager = ResourceManagerFactory.createResourceManager(scheduler, managedResources);50 35 this.moduleList.add((GridResourceDiscovery)resourceManager); 51 36 } … … 57 42 } 58 43 return providerIds; 59 //return GridSim.getGridResourceList();60 44 } 61 45 -
DCWoRMS/trunk/src/schedframe/scheduling/policy/local/LocalManagementSystem.java
r480 r481 4 4 import eduni.simjava.Sim_system; 5 5 import gridsim.Accumulator; 6 import gridsim.GridSimTags;7 import gridsim.Gridlet;8 6 import gridsim.ResourceCalendar; 9 import gridsim.gssim. WormsTags;10 import gridsim.gssim.filter. SubTaskFilter;7 import gridsim.gssim.DCWormsTags; 8 import gridsim.gssim.filter.ExecTaskFilter; 11 9 import gssim.schedframe.scheduling.ExecTask; 12 10 import gssim.schedframe.scheduling.Executable; … … 27 25 import qcg.shared.constants.BrokerConstants; 28 26 import schedframe.ResourceController; 29 import schedframe.events.scheduling.EventReason;30 27 import schedframe.events.scheduling.SchedulingEvent; 31 28 import schedframe.events.scheduling.SchedulingEventType; 32 29 import schedframe.events.scheduling.StartTaskExecutionEvent; 33 import schedframe.events.scheduling.TaskCanceledEvent;34 30 import schedframe.events.scheduling.TaskFinishedEvent; 35 31 import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent; … … 45 41 import schedframe.scheduling.ResourceHistoryItem; 46 42 import schedframe.scheduling.Scheduler; 43 import schedframe.scheduling.TaskListImpl; 47 44 import schedframe.scheduling.UsedResourceList; 48 45 import schedframe.scheduling.WorkloadUnitHandler; 49 import schedframe.scheduling.WorkloadUnitListImpl;50 46 import schedframe.scheduling.manager.resources.LocalResourceManager; 51 47 import schedframe.scheduling.manager.resources.ManagedResources; … … 63 59 import schedframe.scheduling.tasks.Job; 64 60 import schedframe.scheduling.tasks.JobInterface; 65 import schedframe.scheduling.tasks.SubmittedTask;66 61 import schedframe.scheduling.tasks.Task; 67 62 import schedframe.scheduling.tasks.TaskInterface; 68 63 import schedframe.scheduling.tasks.WorkloadUnit; 69 import simulator. WormsConstants;64 import simulator.DCWormsConstants; 70 65 import simulator.utils.DoubleMath; 71 66 … … 112 107 switch (tag) { 113 108 114 case WormsTags.TIMER:109 case DCWormsTags.TIMER: 115 110 if (pluginSupportsEvent(tag)) { 116 111 SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER); 117 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,112 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 118 113 queues, getJobRegistry(), getResourceManager(), moduleList); 119 114 executeSchedulingPlan(decision); … … 123 118 break; 124 119 125 case WormsTags.TASK_READY_FOR_EXECUTION:120 case DCWormsTags.TASK_READY_FOR_EXECUTION: 126 121 127 122 ExecTask data = (ExecTask) ev.get_data(); 128 123 try { 129 data.setStatus( Gridlet.READY);124 data.setStatus(DCWormsTags.READY); 130 125 if (pluginSupportsEvent(tag)) { 131 126 SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId()); 132 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,127 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 133 128 queues, getJobRegistry(), getResourceManager(), moduleList); 134 129 executeSchedulingPlan(decision); … … 139 134 break; 140 135 141 case WormsTags.TASK_EXECUTION_FINISHED:136 case DCWormsTags.TASK_EXECUTION_FINISHED: 142 137 obj = ev.get_data(); 143 ExecTask task= (ExecTask) obj;144 if ( task.getStatus() == Gridlet.INEXEC) {145 finalizeExecutable( task);146 SubmittedTask subTask = (SubmittedTask)task; 147 sendFinishedWorkloadUnit( (ExecTask)subTask.getGridlet());138 ExecTask exec = (ExecTask) obj; 139 if (exec.getStatus() == DCWormsTags.INEXEC) { 140 finalizeExecutable(exec); 141 142 sendFinishedWorkloadUnit(exec); 148 143 //task.setGridletStatus(Gridlet.SUCCESS); 149 144 //task.finalizeGridlet(); 150 log.debug( task.getJobId() + "_" + task.getId() + " finished execution on " + new DateTime());151 log.info( WormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));145 log.debug(exec.getJobId() + "_" + exec.getId() + " finished execution on " + new DateTime()); 146 log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 152 147 /*UsedResourceList<ResourceHistoryItem> lastUsedList = task.getUsedResources(); 153 148 Map<ResourceUnitName, AbstractResourceUnit> lastUsed = lastUsedList.getLast() … … 163 158 } 164 159 if (pluginSupportsEvent(tag)) { 165 SchedulingEvent event = new TaskFinishedEvent( task.getJobId(), task.getId());166 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,160 SchedulingEvent event = new TaskFinishedEvent(exec.getJobId(), exec.getId()); 161 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 167 162 queues, getJobRegistry(), getResourceManager(), moduleList); 168 163 executeSchedulingPlan(decision); 169 164 } 170 Job job = jobRegistry.getJob( task.getJobId());165 Job job = jobRegistry.getJob(exec.getJobId()); 171 166 if(!job.isFinished()){ 172 167 getWorkloadUnitHandler().handleJob(job); … … 174 169 175 170 break; 176 case WormsTags.TASK_REQUESTED_TIME_EXPIRED:171 case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED: 177 172 obj = ev.get_data(); 178 task = (SubmittedTask) obj;173 exec = (Executable) obj; 179 174 if (pluginSupportsEvent(tag)) { 180 SchedulingEvent event = new TaskRequestedTimeExpiredEvent( task.getJobId(), task.getId());181 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,175 SchedulingEvent event = new TaskRequestedTimeExpiredEvent(exec.getJobId(), exec.getId()); 176 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 182 177 queues, getJobRegistry(), getResourceManager(), moduleList); 183 178 executeSchedulingPlan(decision); … … 185 180 186 181 break; 187 case WormsTags.UPDATE:182 case DCWormsTags.UPDATE: 188 183 updateProcessingTimes(ev); 189 184 break; … … 193 188 194 189 public void notifyReturnedWorkloadUnit(WorkloadUnit wu) { 195 if (pluginSupportsEvent( WormsTags.TASK_EXECUTION_FINISHED)) {190 if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) { 196 191 SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED); 197 SchedulingPlanInterface decision = schedulingPlugin.schedule(event,192 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 198 193 queues, getJobRegistry(), getResourceManager(), moduleList); 199 194 executeSchedulingPlan(decision); … … 203 198 //} 204 199 } 205 206 public void notifyCanceledWorkloadUnit(WorkloadUnit job) { 207 208 if (!pluginSupportsEvent(GridSimTags.GRIDLET_CANCEL)) 209 return; 210 211 Executable executable = (Executable) job; 212 String jobID = executable.getJobId(); 213 214 SchedulingPlanInterface decision = null; 215 216 try { 217 executable.setStatus((int) BrokerConstants.JOB_STATUS_CANCELED); 218 219 TaskCanceledEvent event = new TaskCanceledEvent(executable.getJobId(), executable.getTaskId()); 220 event.setReason(EventReason.RESERVATION_EXCEEDED); 221 decision = schedulingPlugin 222 .schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList); 223 224 if (decision == null) 225 return; 226 227 executeSchedulingPlan(decision); 228 229 } catch (Exception e) { 230 log.error("Exception during scheduling. " + e.getMessage()); 231 e.printStackTrace(); 232 } 233 } 234 235 protected void executeSchedulingPlan(SchedulingPlanInterface decision) { 236 237 ArrayList<ScheduledTaskInterface> taskSchedulingDecisions = decision.getTasks(); 200 201 protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) { 202 203 ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks(); 238 204 for (int i = 0; i < taskSchedulingDecisions.size(); i++) { 239 try { 240 ScheduledTaskInterface taskDecision = taskSchedulingDecisions.get(i); 241 242 // not scheduled again are returned to the user. 243 if (taskDecision.getStatus() == AllocationStatus.REJECTED) { 244 continue; 205 ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i); 206 207 if (taskDecision.getStatus() == AllocationStatus.REJECTED) { 208 continue; 209 } 210 211 ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations(); 212 213 TaskInterface<?> task = taskDecision.getTask(); 214 for (int j = 0; j < allocations.size(); j++) { 215 216 AllocationInterface<?> allocation = allocations.get(j); 217 if (allocation.isProcessing()) { 218 ExecTask exec = (ExecTask) task; 219 executeTask(exec, allocation.getRequestedResources()); 220 } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){ 221 allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName())); 222 submitTask(task, allocation); 223 } else { 224 ExecTask exec = (ExecTask) task; 225 executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), (ExecTask)task)); 245 226 } 246 247 ArrayList<AllocationInterface> allocations = taskDecision.getAllocations(); 248 249 WorkloadUnit task = taskDecision.getTask(); 250 for (int j = 0; j < allocations.size(); j++) { 251 252 AllocationInterface allocation = allocations.get(j); 253 if (allocation.isProcessing()) { 254 255 ExecTask exec = (ExecTask) task; 256 257 258 //Executable e = (Executable)task; 259 /*SubmittedTask submittedTask = jobRegistry.getSubmittedTask(e.getJobId(), e.getId()); 260 if(submittedTask == null) 261 { submittedTask = new SubmittedTask(e); 262 jobRegistry.addTask(submittedTask); 263 }*/ 264 265 /*e.visitResource(scheduler.get_name()); 266 Scheduler parentScheduler = scheduler.getParent(); 267 while (parentScheduler != null && !e.getVisitedResources().contains(parentScheduler.get_name())) { 268 e.visitResource(parentScheduler.get_name()); 269 parentScheduler = parentScheduler.getParent(); 270 }*/ 271 272 273 executeTask(exec, allocation.getRequestedResources()); 274 //} else if(GridSim.getEntityId(allocation.getProviderName()) != -1 || scheduler.getScheduler(allocation.getProviderName())!=null){ 275 } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){ 276 allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName())); 277 submitWorkloadUnit(task, allocation); 278 } else { 279 280 ExecTask exec = (ExecTask) task; 281 282 //Executable exec = jobRegistry.createExecutable(t, allocation); 283 //exec.setResourceParameter(scheduler.get_id(), 1); 284 /*e.visitResource(scheduler.get_name()); 285 Scheduler parentScheduler = scheduler.getParent(); 286 while (parentScheduler != null && !e.getVisitedResources().contains(parentScheduler.get_name())) { 287 e.visitResource(parentScheduler.get_name()); 288 parentScheduler = parentScheduler.getParent(); 289 }*/ 290 executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), (ExecTask)task)); 291 } 292 } 293 294 } catch (Exception e) { 295 e.printStackTrace(); 296 } 227 } 228 297 229 } 298 230 } 299 231 300 232 protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) { 301 // Executable exec = (Executable) task; 302 303 SubmittedTask submittedTask = (SubmittedTask)task; 233 234 Executable exec = (Executable)task; 304 235 boolean allocationStatus = getAllocationManager().allocateResources(choosenResources); 305 236 if(allocationStatus == false) 306 237 return; 307 238 removeFromQueue(task); 308 //SubmittedTask submittedTask = (SubmittedTask)task; 309 /* submittedTask = jobRegistry.getSubmittedTask(exec.getJobId(), exec.getId()); 310 if(submittedTask == null) 311 { submittedTask = new SubmittedTask(exec); 312 jobRegistry.addTask(submittedTask); 313 }*/ 314 double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength(); 239 //double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength(); 315 240 SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION); 316 241 int time = Double.valueOf( 317 execTimeEstimationPlugin.execTimeEstimation(event, task, choosenResources, completionPercentage)).intValue();242 execTimeEstimationPlugin.execTimeEstimation(event, task, choosenResources, exec.getCompletionPercentage())).intValue(); 318 243 log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime() 319 244 + " will finish after " + time); … … 322 247 return; 323 248 324 submittedTask.setEstimatedDuration(time);249 exec.setEstimatedDuration(time); 325 250 DateTime currentTime = new DateTime(); 326 251 ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); 327 submittedTask.addUsedResources(resHistItem); 328 submittedTask.setFinishTime(currentTime.getMillis() / 1000); 329 330 jobRegistry.saveHistory(submittedTask, time, choosenResources); 331 332 scheduler.sendInternal(time, WormsTags.TASK_EXECUTION_FINISHED, 333 submittedTask); 252 exec.addUsedResources(resHistItem); 253 254 scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, 255 exec); 334 256 335 257 try { 336 long expectedDuration = submittedTask.getExpectedDuration().getMillis() / 1000;337 scheduler.sendInternal(expectedDuration, WormsTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask);258 long expectedDuration = exec.getExpectedDuration().getMillis() / 1000; 259 scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec); 338 260 } catch (NoSuchFieldException e) { 339 double t = submittedTask.getEstimatedDuration(); 340 scheduler.sendInternal(t, WormsTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask); 341 } 342 343 submittedTask.setGridletStatus(Gridlet.INEXEC); 344 log.info(WormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 261 double t = exec.getEstimatedDuration(); 262 scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec); 263 } 264 265 try { 266 exec.setStatus(DCWormsTags.INEXEC); 267 } catch (Exception e1) { 268 // TODO Auto-generated catch block 269 e1.printStackTrace(); 270 } 271 log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 345 272 346 273 PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE); … … 348 275 ProcessingElements pes = (ProcessingElements) peUnit; 349 276 for (ComputingResource resource : pes) { 350 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, submittedTask));277 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 351 278 } 352 279 } else { … … 357 284 358 285 } 359 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, submittedTask));286 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 360 287 } 361 288 /*ProcessingElements pes = (ProcessingElements) choosenResources.get(StandardResourceUnitName.PE); … … 371 298 } 372 299 373 public void finalizeExecutable(ExecTask exec){ 374 375 SubmittedTask subTask = (SubmittedTask)exec; 376 subTask.setGridletStatus(Gridlet.SUCCESS); 377 subTask.finalizeGridlet(); 378 UsedResourceList<ResourceHistoryItem> lastUsedList = subTask.getUsedResources(); 300 public void finalizeExecutable(ExecTask execTask){ 301 302 Executable exec = (Executable)execTask; 303 try { 304 exec.setStatus(DCWormsTags.SUCCESS); 305 } catch (Exception e1) { 306 // TODO Auto-generated catch block 307 e1.printStackTrace(); 308 } 309 exec.finalizeExecutable(); 310 UsedResourceList<ResourceHistoryItem> lastUsedList = exec.getUsedResources(); 379 311 Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast() 380 312 .getResourceUnits(); … … 385 317 ProcessingElements pes = (ProcessingElements) peUnit; 386 318 for (ComputingResource resource : pes) { 387 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask));319 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 388 320 } 389 321 } else { … … 394 326 395 327 } 396 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask));328 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 397 329 } 398 330 /*ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PE); … … 400 332 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask)); 401 333 }*/ 402 SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(),WormsTags.TASK_REQUESTED_TIME_EXPIRED);334 ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED); 403 335 scheduler.sim_cancel(filter, null); 404 336 405 Executable executable = (Executable) subTask.getGridlet(); 406 Job job = jobRegistry.getJob(exec utable.getJobId());337 338 Job job = jobRegistry.getJob(exec.getJobId()); 407 339 408 340 Task task = null; 409 341 try { 410 task = job.getTask(exec utable.getTaskId());342 task = job.getTask(exec.getTaskId()); 411 343 } catch (NoSuchFieldException e) { 412 344 e.printStackTrace(); 413 345 } 414 if(exec utable.getProcessesId() == null){346 if(exec.getProcessesId() == null){ 415 347 try { 416 task.setStatus(exec utable.getStatus());348 task.setStatus(exec.getStatus()); 417 349 } catch (Exception e) { 418 350 e.printStackTrace(); … … 422 354 for(int i = 0; i < processesList.size(); i++){ 423 355 AbstractProcesses processes = processesList.get(i); 424 if(processes.getId().equals(exec utable.getProcessesId())){425 processes.setStatus(exec utable.getStatus());356 if(processes.getId().equals(exec.getProcessesId())){ 357 processes.setStatus(exec.getStatus()); 426 358 break; 427 359 } … … 441 373 while (iter.hasNext()) { 442 374 ExecTask task = iter.next(); 443 SubmittedTask subTask = (SubmittedTask)task;444 UsedResourceList<ResourceHistoryItem> usedResourcesList = subTask.getUsedResources();375 Executable exec = (Executable)task; 376 UsedResourceList<ResourceHistoryItem> usedResourcesList = exec.getUsedResources(); 445 377 ResourceUnit unit = usedResourcesList.getLast().getResourceUnits() 446 378 .get(StandardResourceUnitName.PE); 447 379 448 380 double load = getMIShare(timeSpan, (PEUnit) unit); 449 subTask.updateGridletFinishedSoFar(load);381 exec.setCompletionPercentage(100 * timeSpan/exec.getEstimatedDuration()); 450 382 addTotalLoad(load); 451 383 } … … 471 403 updateProcessingProgress(); 472 404 for (ExecTask task : jobRegistry.getRunningTasks()) { 473 SubmittedTask subTask = (SubmittedTask)task;474 List<String> visitedResource = subTask.getVisitedResources();405 Executable exec = (Executable)task; 406 List<String> visitedResource = exec.getVisitedResources(); 475 407 String originResource = ev.get_data().toString(); 476 408 if(!ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), originResource)){ … … 478 410 } 479 411 480 Map<ResourceUnitName, ResourceUnit> choosenResources = subTask.getUsedResources().getLast().getResourceUnits();481 double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength();412 Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits(); 413 //double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength(); 482 414 double time = execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED), 483 task, choosenResources, completionPercentage);415 task, choosenResources, exec.getCompletionPercentage()); 484 416 485 417 /*if(!subTask.getVisitedResources().contains(ev.get_data().toString())) { … … 487 419 }*/ 488 420 //check if the new estimated end time is equal to the previous one; if yes the continue without update 489 if( DoubleMath.subtract(( subTask.getExecStartTime() + subTask.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){421 if( DoubleMath.subtract((exec.getExecStartTime() + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){ 490 422 continue; 491 423 } 492 SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(),WormsTags.TASK_EXECUTION_FINISHED);424 ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED); 493 425 scheduler.sim_cancel(filter, null); 494 scheduler.sendInternal(time, WormsTags.TASK_EXECUTION_FINISHED, task);426 scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, task); 495 427 496 428 } … … 577 509 } 578 510 579 /*try {580 List<? extends ComputingResource> processingElements = resourceManager.getResourcesOfType(StandardResourceType.Processor);581 List<ComputingResource> choosenResources = new ArrayList<ComputingResource>();582 int peSize = processingElements.size();583 for (int i = 0; i < peSize && cpuRequest > 0; i++) {584 if (processingElements.get(i).getStatus() == ResourceStatus.FREE) {585 choosenResources.add(processingElements.get(i));586 cpuRequest--;587 }588 }589 if (cpuRequest > 0)590 {591 return null;592 }593 processingUnits = new ProcessingElements(choosenResources);594 } catch (Exception e) {595 596 List<ResourceUnit> procResUnit = resourceManager.getDistributedResourceUnits(StandardResourceUnitName.PE);597 598 for(ResourceUnit resUnit: procResUnit){599 if (resUnit.getFreeAmount() >= cpuRequest)600 {601 processingUnits = new PEUnit(resUnit.getResourceId(), cpuRequest, cpuRequest);602 break;603 }604 }605 }*/606 511 map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0)); 607 512 } 608 /*int memoryRequest; 609 try { 610 memoryRequest = Double.valueOf(task.getMemoryRequest()).intValue(); 611 } catch (NoSuchFieldException e) { 612 memoryRequest = 0; 613 } 614 if (memoryRequest != 0) { 615 List<ResourceUnit> resUnit = resourceManager.getSharedResourceUnits().get(StandardResourceUnitName.MEMORY); 616 617 Memory memory = null; 618 for (ResourceUnit memUnit : resUnit) { 619 Memory m = (Memory) memUnit; 620 621 if (m.getFreeAmount() >= memoryRequest) { 622 System.out.println(m.getResourceId()+ ";"+m.getAmount()+";"+m.getFreeAmount()); 623 memory = new Memory(m, memoryRequest, memoryRequest); 624 } 625 } 626 if(memory == null) 627 return null; 628 map.put(StandardResourceUnitName.MEMORY, memory); 629 }*/ 513 630 514 return map; 631 515 } 632 633 634 635 public void notifySubmittedWorkloadUnit(WorkloadUnit job, boolean ack) { 516 517 public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) { 636 518 updateProcessingProgress(); 637 registerWorkloadUnit( job);519 registerWorkloadUnit(wu); 638 520 } 639 521 … … 654 536 List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 655 537 jobsList.add(job); 656 WorkloadUnitListImpl readyTasks = new WorkloadUnitListImpl();657 for(Task task: jobRegistry.get ReadyTasks(jobsList)){538 TaskListImpl readyTasks = new TaskListImpl(); 539 for(Task task: jobRegistry.getAvailableTasks(jobsList)){ 658 540 task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 659 541 readyTasks.add(task); … … 667 549 public void handleTask(TaskInterface<?> ti){ 668 550 Task task = (Task)ti; 669 670 if(task.getProcesses() == null){ 551 List<AbstractProcesses> processes = task.getProcesses(); 552 553 if(processes == null || processes.size() == 0){ 671 554 Executable exec = new Executable(task); 672 exec.setUserID(task.getSenderId());673 exec.setLength(task.getLength());674 555 registerWorkloadUnit(exec); 675 556 } else { 676 List<AbstractProcesses> processesList = task.getProcesses(); 677 for(int i = 0; i < processesList.size(); i++){ 678 AbstractProcesses processes = processesList.get(i); 679 Executable exec = new Executable(task, processes); 680 exec.setUserID(task.getSenderId()); 681 exec.setLength(task.getLength()); 557 for(int j = 0; j < processes.size(); j++){ 558 AbstractProcesses procesesSet = processes.get(j); 559 Executable exec = new Executable(task, procesesSet); 682 560 registerWorkloadUnit(exec); 683 561 } … … 701 579 } 702 580 703 exec.setResourceParameter(scheduler.get_id(), 1); 704 SubmittedTask subTask = new SubmittedTask(exec); 705 jobRegistry.addTask(subTask); 706 WorkloadUnitListImpl newTasks = new WorkloadUnitListImpl(); 707 newTasks.add(subTask); 708 709 schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList); 710 711 if (subTask.getStatus() == Gridlet.QUEUED) { 581 exec.setSchedulerName(scheduler.get_id()); 582 jobRegistry.addExecTask(exec); 583 TaskListImpl newTasks = new TaskListImpl(); 584 newTasks.add(exec); 585 586 schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList); 587 588 if (exec.getStatus() == DCWormsTags.QUEUED) { 712 589 sendExecutableReadyEvent(exec); 713 }714 }715 716 public void handleSubmittedTask(SubmittedTask task){717 718 task.visitResource(scheduler.get_name());719 Scheduler parentScheduler = scheduler.getParent();720 while (parentScheduler != null && !task.getVisitedResources().contains(parentScheduler.get_name())) {721 task.visitResource(parentScheduler.get_name());722 parentScheduler = parentScheduler.getParent();723 }724 725 jobRegistry.addTask(task);726 WorkloadUnitListImpl newTasks = new WorkloadUnitListImpl();727 newTasks.add(task);728 729 schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList);730 731 if (task.getStatus() == Gridlet.QUEUED) {732 sendExecutableReadyEvent(task);733 590 } 734 591 }
Note: See TracChangeset
for help on using the changeset viewer.