Changeset 481 for DCWoRMS/trunk/src/schedframe/scheduling/manager
- Timestamp:
- 10/08/12 10:23:45 (13 years ago)
- Location:
- DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks/AbstractJobRegistry.java
r480 r481 13 13 14 14 15 public abstract class AbstractJobRegistry /*extends ConcurrentHashMap<String, Job> */implements JobRegistry, Cloneable{15 public abstract class AbstractJobRegistry /*extends ConcurrentHashMap<String, Job>*/ implements JobRegistry, Cloneable{ 16 16 17 17 private static final long serialVersionUID = 8409060063583755824L; 18 18 19 19 private static Log log = LogFactory.getLog(AbstractJobRegistry.class); 20 21 protected static final ConcurrentHashMap<String, Job> jobs = new ConcurrentHashMap<String, Job>(); 20 protected static final ConcurrentHashMap<String, JobInterface<?>> jobs = new ConcurrentHashMap<String, JobInterface<?>>(); 22 21 23 22 protected AbstractJobRegistry(){ 24 //log.warn("Methods from JobRegistry interface are not implemented.");25 23 } 26 24 27 25 public boolean addJob(JobInterface<?> job) { 28 jobs.put(job.getId(), (Job)job);26 jobs.put(job.getId(), job); 29 27 return true; 30 28 } … … 32 30 public boolean addTask(TaskInterface<?> task) { 33 31 if(jobs.containsKey(task.getJobId())){ 34 jobs.get(task.getJobId()).add((Task)task);32 getJob(task.getJobId()).add((Task)task); 35 33 return true; 36 34 } else { … … 39 37 } 40 38 41 public Job getJob(String jobId){39 public JobInterface<?> getJobInfo(String jobId) { 42 40 return jobs.get(jobId); 43 41 } 44 45 public JobInterface<?> getJobInfo(String jobID) {46 return jobs.get(jobID);47 }48 42 49 public TaskInterface<?> getTaskInfo(String jobI D, String taskId) {43 public TaskInterface<?> getTaskInfo(String jobId, String taskId) { 50 44 Task task = null; 51 Job job = jobs.get(jobID);45 Job job = getJob(jobId); 52 46 53 47 if(job == null) … … 57 51 task = job.getTask(taskId); 58 52 } catch (NoSuchFieldException e) { 59 log.error(e.getMessage());60 53 } 61 54 return task; 62 55 } 63 56 64 /*public List<JobInterface<?>> getActiveJobs() { 65 log.error("getActiveJobs() not implemented."); 66 return null; 57 public Job getJob(String jobId){ 58 return (Job)jobs.get(jobId); 67 59 } 68 60 69 public List<TaskInterface<?>> getActiveTasks() {70 log.error("getActiveTasks() not implemented.");71 return null;72 }*/73 74 61 } -
DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks/JobRegistry.java
r477 r481 5 5 import java.util.List; 6 6 7 import schedframe.ExecutablesList; 7 8 import schedframe.scheduling.tasks.JobInterface; 8 9 import schedframe.scheduling.tasks.TaskInterface; … … 11 12 public interface JobRegistry { 12 13 13 //public List<JobInterface<?>> getActiveJobs();14 15 //public List<TaskInterface<?>> getActiveTasks();16 17 14 public JobInterface<?> getJobInfo(String jobId); 18 15 … … 31 28 32 29 33 //public List<SubmittedTask> getSubmittedTasks();30 public ExecutablesList getExecutableTasks(); 34 31 35 public ExecTask get SubmittedTask(String jobId, String taskId);32 public ExecTask getExecutable(String jobId, String taskId); 36 33 37 34 38 public List<? extends TaskInterface<?>> get ReadyTasks(List<JobInterface<?>> jobsList);35 public List<? extends TaskInterface<?>> getAvailableTasks(List<JobInterface<?>> jobsList); 39 36 40 37 } -
DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java
r477 r481 1 1 package schedframe.scheduling.manager.tasks; 2 2 3 import gridsim. Gridlet;3 import gridsim.gssim.DCWormsTags; 4 4 import gssim.schedframe.scheduling.ExecTask; 5 import gssim.schedframe.scheduling.Executable;6 5 7 6 import java.util.ArrayList; 8 import java.util.Collections;9 import java.util.HashMap;10 7 import java.util.List; 11 import java.util.Map;12 8 13 9 import org.apache.commons.lang.ArrayUtils; 14 10 import org.apache.commons.logging.Log; 15 11 import org.apache.commons.logging.LogFactory; 16 import org.joda.time.DateTime;17 12 import org.qcg.broker.schemas.resreqs.ParentType; 18 13 import org.qcg.broker.schemas.resreqs.types.TaskStatesName; 19 14 20 15 import qcg.shared.constants.BrokerConstants; 21 import schedframe.resources.units.ResourceUnit; 22 import schedframe.resources.units.ResourceUnitName; 23 import schedframe.scheduling.ResourceHistoryItem; 24 import schedframe.scheduling.plan.AllocationInterface; 25 import schedframe.scheduling.tasks.AbstractProcesses; 16 import schedframe.ExecutablesList; 26 17 import schedframe.scheduling.tasks.JobInterface; 27 import schedframe.scheduling.tasks.SubmittedTask;28 18 import schedframe.scheduling.tasks.Task; 29 import simulator.WormsConstants;30 19 31 20 public class JobRegistryImpl extends AbstractJobRegistry { … … 37 26 private String context; 38 27 39 //TO DO - change data structure 40 protected static final List<ExecTask> submittedTasks = Collections.synchronizedList(new ArrayList<ExecTask>());; 41 //protected static final List<ExecTaskInterface> submittedTasks = new CopyOnWriteArrayList<ExecTaskInterface>(); 28 //TO DO - consider data structure 29 protected static final ExecutablesList executables = new ExecutablesList(); 30 //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());; 31 //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>(); 42 32 43 public JobRegistryImpl(String context _) {44 context = context_;33 public JobRegistryImpl(String context) { 34 this.context = context; 45 35 } 46 36 47 /*protected void setContext(String context_) { 48 context = context_; 49 }*/ 50 51 public boolean addTask(ExecTask newTask) { 52 if(getSubmittedTask(newTask.getJobId(), newTask.getId()) == null) 53 { 54 synchronized (submittedTasks) { 55 submittedTasks.add(newTask); 37 public boolean addExecTask(ExecTask newTask) { 38 if(getExecutable(newTask.getJobId(), newTask.getId()) == null) { 39 synchronized (executables) { 40 executables.add(newTask); 56 41 } 57 42 return true; … … 60 45 } 61 46 47 public ExecutablesList getExecutableTasks() { 48 return executables; 49 } 62 50 public List<ExecTask> getTasks(int status) { 63 51 List<ExecTask> taskList = new ArrayList<ExecTask>(); 64 synchronized ( submittedTasks) {65 for (ExecTask task: submittedTasks) {52 synchronized (executables) { 53 for (ExecTask task: executables) { 66 54 if (task.getStatus() == status) { 67 //SubmittedTask subTask = (SubmittedTask) task;68 55 List<String> visitedResource = task.getVisitedResources(); 69 if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) {56 if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) { 70 57 taskList.add(task); 71 58 } 72 /*if(subTask.getVisitedResources().contains(context)){73 taskList.add(subTask);74 }*/75 59 } 76 60 } … … 80 64 81 65 public List<ExecTask> getQueuedTasks() { 82 return getTasks( Gridlet.QUEUED);66 return getTasks(DCWormsTags.QUEUED); 83 67 } 84 68 85 69 public List<ExecTask> getRunningTasks() { 86 return getTasks( Gridlet.INEXEC);70 return getTasks(DCWormsTags.INEXEC); 87 71 } 88 72 89 73 public List<ExecTask> getReadyTasks() { 90 return getTasks( Gridlet.READY);74 return getTasks(DCWormsTags.READY); 91 75 } 92 76 93 77 public List<ExecTask> getFinishedTasks() { 94 return getTasks( Gridlet.SUCCESS);78 return getTasks(DCWormsTags.SUCCESS); 95 79 } 96 80 97 98 public List<ExecTask> getAllSubmittedTasks() { 99 List<ExecTask> taskList; 100 synchronized (submittedTasks) { 101 taskList = new ArrayList<ExecTask>(submittedTasks); 102 } 103 return taskList; 104 } 105 106 public List<SubmittedTask> getSubmittedTasks() { 107 List<SubmittedTask> taskList = new ArrayList<SubmittedTask>(); 108 synchronized (submittedTasks) { 109 for (ExecTask task : submittedTasks) { 110 SubmittedTask subTask = (SubmittedTask) task; 111 List<String> visitedResource = subTask.getVisitedResources(); 112 if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)){ 113 taskList.add(subTask); 114 } 115 /*if(subTask.getVisitedResources().contains(context)){ 116 taskList.add(subTask); 117 }*/ 118 } 119 } 120 return taskList; 121 } 122 123 public ExecTask getSubmittedTask(String jobId, String taskId){ 124 synchronized (submittedTasks) { 125 for (ExecTask task : submittedTasks) { 81 public ExecTask getExecutable(String jobId, String taskId){ 82 synchronized (executables) { 83 for (ExecTask task : executables) { 126 84 if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId)==0) { 127 85 return task; … … 131 89 return null; 132 90 } 133 134 91 135 92 @SuppressWarnings("unchecked") 136 public List<Task> get ReadyTasks(List<JobInterface<?>> wuList) {137 List<Task> readyTasks = new ArrayList<Task>();93 public List<Task> getAvailableTasks(List<JobInterface<?>> wuList) { 94 List<Task> availableTasks = new ArrayList<Task>(); 138 95 List<Task> waitingTasks = new ArrayList<Task>(); 139 96 … … 143 100 } 144 101 145 readyTasks.addAll(getPrecedenceConstrainedReadyTasks(waitingTasks)); 146 return readyTasks; 147 } 148 149 public Executable getTaskExecutable(Integer executableId){ 150 synchronized (submittedTasks) { 151 for (ExecTask task : submittedTasks) { 152 SubmittedTask subTask = (SubmittedTask) task; 153 Executable exec = (Executable)subTask.getGridlet(); 154 if (exec.getGridletID() == executableId) { 155 return exec; 156 } 157 } 158 } 159 return null; 160 } 161 162 public List<Executable> getJobExecutables(String jobId){ 163 List<Executable> list = new ArrayList<Executable>(); 164 synchronized (submittedTasks) { 165 for(int i = 0; i < submittedTasks.size(); i++){ 166 SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i); 167 Executable exec = (Executable)subTask.getGridlet(); 168 169 if(exec.getJobId().equals(jobId)) 170 list.add(exec); 171 } 172 } 173 return list; 174 } 175 176 public JobRegistryImpl clone() { 177 JobRegistryImpl jr = null; 178 try { 179 jr = (JobRegistryImpl) super.clone(); 180 } catch (CloneNotSupportedException e) { 181 // TODO Auto-generated catch block 182 e.printStackTrace(); 183 } 184 185 return jr; 186 } 187 188 /*public AbstractExecutable getTaskExecutabls(String jobId, String taskId){ 189 List<AbstractExecutable> list = new ArrayList<AbstractExecutable>(); 190 synchronized (submittedTasks) { 191 for(int i = 0; i < size(); i++){ 192 SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i); 193 AbstractExecutable exec = (AbstractExecutable)subTask.getGridlet(); 194 195 if(exec.getJobId().equals(jobId) && exec.getId().equals(taskId)) 196 return exec; 197 } 198 } 199 return null; 200 }*/ 201 202 203 public Executable createExecutable(Task task, AllocationInterface allocation) { 204 205 String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); 206 if(refersTo == null) 207 refersTo = task.getId(); 208 209 Executable exec = null; 210 211 if(refersTo.equals(task.getId())){ 212 exec = new Executable(task); 213 } else { 214 List<AbstractProcesses> processes = task.getProcesses(); 215 if(processes == null) { 216 try { 217 log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + 218 " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); 219 } catch (Exception e) { 220 e.printStackTrace(); 221 } 222 } 223 boolean found = false; 224 for(int j = 0; j < processes.size() && !found; j++){ 225 AbstractProcesses procesesSet = processes.get(j); 226 if(refersTo.equals(procesesSet.getId())){ 227 exec = new Executable(task, procesesSet); 228 found = true; 229 } 230 } 231 if(!found){ 232 log.error("Allocation refers to unknown proceses set."); 233 } 234 } 235 236 // exec.setUserID(task.getSenderId()); 237 exec.setLength(task.getLength()); 238 exec.setReservationId(allocation.getReservationId()); 239 240 /*HostInterface<?> host = allocation.getHost(); 241 ComputingResourceTypeInterface<?> crt = host.getMachineParameters(); 242 if(crt != null){ 243 ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0); 244 if(crti != null){ 245 ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty(); 246 for(int p = 0; p < properties.length; p++){ 247 ParameterPropertyInterface<?> property = properties[p]; 248 if("chosenCPUs".equals(property.getName())){ 249 Object cpuNames = property.getValue(); 250 exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); 251 } 252 } 253 } 254 }*/ 255 return exec; 256 } 257 258 259 public List<Executable> createExecutables(Task task) { 260 261 List<AbstractProcesses> processes = task.getProcesses(); 262 263 List<Executable> executables = new ArrayList<Executable>(); 264 265 if(processes == null || processes.size()==0){ 266 Executable exec = new Executable(task); 267 exec.setUserID(task.getSenderId()); 268 exec.setLength(task.getLength()); 269 executables.add(exec); 270 } else { 271 272 boolean found = false; 273 for(int j = 0; j < processes.size() && !found; j++){ 274 AbstractProcesses procesesSet = processes.get(j); 275 Executable exec = new Executable(task, procesesSet); 276 exec.setUserID(task.getSenderId()); 277 exec.setLength(task.getLength()); 278 executables.add(exec); 279 } 280 } 281 282 return executables; 102 availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks)); 103 return availableTasks; 283 104 } 284 105 285 106 286 /**************************************/ 287 protected static Map<Integer, Map<String, Object>> history = new HashMap<Integer, Map<String,Object>>(); 288 289 public static Map<Integer, Map<String, Object>> getAllocationHistory(){ 290 return history; 291 } 292 293 public void saveHistory (SubmittedTask submittedTask, int estimatedTime, Map<ResourceUnitName, ResourceUnit> choosenResources){ 107 private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){ 294 108 295 /*submittedTask.setEstimatedDuration(estimatedTime); 296 297 DateTime currentTime = new DateTime(); 298 ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); 299 submittedTask.addUsedResources(resHistItem);*/ 300 301 ResourceHistoryItem resHistItem = submittedTask.getUsedResources().getLast(); 302 DateTime currentTime = new DateTime(); 303 Map<String, Object> historyItem = new HashMap<String, Object>(); 304 List<ResourceHistoryItem> list = new ArrayList<ResourceHistoryItem>(1); 305 list.add(resHistItem); 306 historyItem.put(WormsConstants.RESOURCES, list); 307 historyItem.put(WormsConstants.START_TIME, currentTime); 308 currentTime = currentTime.plusSeconds(estimatedTime); 309 historyItem.put(WormsConstants.END_TIME, currentTime); 310 311 history.put(Integer.valueOf(submittedTask.getGridletID()), historyItem); 312 /*ProcessingElements pes = (ProcessingElements) choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS); 313 for (ComputingResource resource : pes) { 314 //submittedTask.addToResPath(resource.getName()); 315 submittedTask.visitResource(resource.getName()); 316 ComputingResource parent = resource.getParent(); 317 while (parent != null && !submittedTask.getResPath().contains(parent.getName() + "_")) { 318 submittedTask.addToResPath(parent.getName()); 319 parent = parent.getParent(); 320 } 321 while (parent != null && !submittedTask.getVisitedResources().contains(parent.getName() + "_")) { 322 submittedTask.visitResource(parent.getName()); 323 parent = parent.getParent(); 324 } 325 }*/ 326 } 327 328 private List<Task> getPrecedenceConstrainedReadyTasks(List<Task> tasks){ 109 List<Task> availableTasks = new ArrayList<Task>(); 110 int size = tasks.size(); 329 111 330 List<Task> readyTasks = new ArrayList<Task>();331 332 int size = tasks.size();333 112 for(int i = 0; i < size; i++){ 334 113 int parCnt; 335 int previousTask ReadyCnt = 0;114 int previousTaskSucceedCnt = 0; 336 115 Task task = tasks.get(i); 337 116 if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED) … … 341 120 } catch(Exception e){ 342 121 parCnt = 0; 343 //e.printStackTrace();344 122 } 345 if(parCnt == 0) 346 { 347 readyTasks.add(task); 123 if(parCnt == 0){ 124 availableTasks.add(task); 348 125 } 349 else 350 { 126 else { 351 127 for(int j = 0; j < parCnt; j++){ 352 128 ParentType par = task.getDescription().getWorkflow().getParent(j); … … 356 132 } 357 133 } 358 previousTask ReadyCnt++;134 previousTaskSucceedCnt++; 359 135 } 360 136 361 if(previousTask ReadyCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)362 readyTasks.add(task);363 else if(previousTask ReadyCnt > 0 && task.getDescription().getWorkflow().getOr() != null)364 readyTasks.add(task);365 else if (previousTask ReadyCnt == parCnt)366 readyTasks.add(task);137 if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null) 138 availableTasks.add(task); 139 else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null) 140 availableTasks.add(task); 141 else if (previousTaskSucceedCnt == parCnt) 142 availableTasks.add(task); 367 143 } 368 144 } 369 return readyTasks;145 return availableTasks; 370 146 } 371 147 … … 380 156 return false; 381 157 } 158 382 159 }
Note: See TracChangeset
for help on using the changeset viewer.