- Timestamp:
- 10/31/12 13:52:06 (12 years ago)
- Location:
- DCWoRMS/trunk/build/classes/schedframe/scheduling/manager/tasks
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
DCWoRMS/trunk/build/classes/schedframe/scheduling/manager/tasks/AbstractJobRegistry.java
r477 r539 2 2 3 3 4 import java.util.Map;5 4 import java.util.concurrent.ConcurrentHashMap; 6 5 … … 14 13 15 14 16 public abstract class AbstractJobRegistry /*extends ConcurrentHashMap<String, Job> */implements JobRegistry, Cloneable{15 public abstract class AbstractJobRegistry /*extends ConcurrentHashMap<String, Job>*/ implements JobRegistry, Cloneable{ 17 16 18 17 private static final long serialVersionUID = 8409060063583755824L; 18 19 19 20 private static Log log = LogFactory.getLog(AbstractJobRegistry.class); 21 22 protected static final ConcurrentHashMap<String, Job> jobs = new ConcurrentHashMap<String, Job>(); 20 protected static final ConcurrentHashMap<String, JobInterface<?>> jobs = new ConcurrentHashMap<String, JobInterface<?>>(); 23 21 24 22 protected AbstractJobRegistry(){ 25 //log.warn("Methods from JobRegistry interface are not implemented.");26 23 } 27 24 28 25 public boolean addJob(JobInterface<?> job) { 29 try { 30 jobs.put(job.getId(), (Job) job); 31 } catch (NoSuchFieldException e) { 32 log.error(e.getMessage()); 33 return false; 34 } 26 jobs.put(job.getId(), job); 35 27 return true; 36 28 } … … 38 30 public boolean addTask(TaskInterface<?> task) { 39 31 if(jobs.containsKey(task.getJobId())){ 40 jobs.get(task.getJobId()).add((Task)task);32 getJob(task.getJobId()).add((Task)task); 41 33 return true; 42 34 } else { … … 45 37 } 46 38 47 public Job get(String jobId){39 public JobInterface<?> getJobInfo(String jobId) { 48 40 return jobs.get(jobId); 49 41 } 50 51 public JobInterface<?> getJobInfo(String jobID) {52 return jobs.get(jobID);53 }54 42 55 public TaskInterface<?> getTaskInfo(String jobI D, String taskId) {43 public TaskInterface<?> getTaskInfo(String jobId, String taskId) { 56 44 Task task = null; 57 Job job = jobs.get(jobID);45 Job job = getJob(jobId); 58 46 59 47 if(job == null) … … 63 51 task = job.getTask(taskId); 64 52 } catch (NoSuchFieldException e) { 65 log.error(e.getMessage());66 53 } 67 54 return task; 68 55 } 69 56 70 /*public List<JobInterface<?>> getActiveJobs() { 71 log.error("getActiveJobs() not implemented."); 72 return null; 57 public Job getJob(String jobId){ 58 return (Job)jobs.get(jobId); 73 59 } 74 60 75 public List<TaskInterface<?>> getActiveTasks() {76 log.error("getActiveTasks() not implemented.");77 return null;78 }*/79 80 81 82 83 61 } -
DCWoRMS/trunk/build/classes/schedframe/scheduling/manager/tasks/JobRegistry.java
r477 r539 1 1 package schedframe.scheduling.manager.tasks; 2 2 3 import gssim.schedframe.scheduling.ExecTask;4 3 5 4 import java.util.List; 6 5 6 import dcworms.schedframe.scheduling.ExecTask; 7 8 import schedframe.ExecutablesList; 7 9 import schedframe.scheduling.tasks.JobInterface; 8 10 import schedframe.scheduling.tasks.TaskInterface; … … 11 13 public interface JobRegistry { 12 14 13 //public List<JobInterface<?>> getActiveJobs();14 15 //public List<TaskInterface<?>> getActiveTasks();16 17 15 public JobInterface<?> getJobInfo(String jobId); 18 16 … … 31 29 32 30 33 //public List<SubmittedTask> getSubmittedTasks();31 public ExecutablesList getExecutableTasks(); 34 32 35 public ExecTask get SubmittedTask(String jobId, String taskId);33 public ExecTask getExecutable(String jobId, String taskId); 36 34 37 35 38 public List<? extends TaskInterface<?>> get ReadyTasks(List<JobInterface<?>> jobsList);36 public List<? extends TaskInterface<?>> getAvailableTasks(List<JobInterface<?>> jobsList); 39 37 40 38 } -
DCWoRMS/trunk/build/classes/schedframe/scheduling/manager/tasks/JobRegistryImpl.java
r477 r539 1 1 package schedframe.scheduling.manager.tasks; 2 2 3 import gridsim.Gridlet; 4 import gssim.schedframe.scheduling.ExecTask; 5 import gssim.schedframe.scheduling.Executable; 3 import gridsim.dcworms.DCWormsTags; 6 4 7 5 import java.util.ArrayList; 8 import java.util.Collections;9 import java.util.HashMap;10 6 import java.util.List; 11 import java.util.Map;12 7 13 8 import org.apache.commons.lang.ArrayUtils; 14 9 import org.apache.commons.logging.Log; 15 10 import org.apache.commons.logging.LogFactory; 16 import org.joda.time.DateTime;17 11 import org.qcg.broker.schemas.resreqs.ParentType; 18 12 import org.qcg.broker.schemas.resreqs.types.TaskStatesName; 19 13 14 import dcworms.schedframe.scheduling.ExecTask; 15 20 16 import qcg.shared.constants.BrokerConstants; 21 import schedframe.resources.units.ResourceUnit; 22 import schedframe.resources.units.ResourceUnitName; 23 import schedframe.scheduling.ResourceHistoryItem; 24 import schedframe.scheduling.plan.AllocationInterface; 25 import schedframe.scheduling.tasks.AbstractProcesses; 17 import schedframe.ExecutablesList; 26 18 import schedframe.scheduling.tasks.JobInterface; 27 import schedframe.scheduling.tasks.SubmittedTask;28 19 import schedframe.scheduling.tasks.Task; 29 import simulator.WormsConstants;30 20 31 21 public class JobRegistryImpl extends AbstractJobRegistry { … … 37 27 private String context; 38 28 39 //TO DO - change data structure 40 protected static final List<ExecTask> submittedTasks = Collections.synchronizedList(new ArrayList<ExecTask>());; 41 //protected static final List<ExecTaskInterface> submittedTasks = new CopyOnWriteArrayList<ExecTaskInterface>(); 29 //TO DO - consider data structure 30 protected static final ExecutablesList executables = new ExecutablesList(); 31 //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());; 32 //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>(); 42 33 43 public JobRegistryImpl(String context _) {44 context = context_;34 public JobRegistryImpl(String context) { 35 this.context = context; 45 36 } 46 37 47 /*protected void setContext(String context_) { 48 context = context_; 49 }*/ 50 51 public boolean addTask(ExecTask newTask) { 52 if(getSubmittedTask(newTask.getJobId(), newTask.getId()) == null) 53 { 54 synchronized (submittedTasks) { 55 submittedTasks.add(newTask); 38 public boolean addExecTask(ExecTask newTask) { 39 if(getExecutable(newTask.getJobId(), newTask.getId()) == null) { 40 synchronized (executables) { 41 executables.add(newTask); 56 42 } 57 43 return true; … … 60 46 } 61 47 48 public ExecutablesList getExecutableTasks() { 49 return executables; 50 } 62 51 public List<ExecTask> getTasks(int status) { 63 52 List<ExecTask> taskList = new ArrayList<ExecTask>(); 64 synchronized ( submittedTasks) {65 for (ExecTask task: submittedTasks) {53 synchronized (executables) { 54 for (ExecTask task: executables) { 66 55 if (task.getStatus() == status) { 67 //SubmittedTask subTask = (SubmittedTask) task;68 56 List<String> visitedResource = task.getVisitedResources(); 69 if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) {57 if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) { 70 58 taskList.add(task); 71 59 } 72 /*if(subTask.getVisitedResources().contains(context)){73 taskList.add(subTask);74 }*/75 60 } 76 61 } … … 80 65 81 66 public List<ExecTask> getQueuedTasks() { 82 return getTasks( Gridlet.QUEUED);67 return getTasks(DCWormsTags.QUEUED); 83 68 } 84 69 85 70 public List<ExecTask> getRunningTasks() { 86 return getTasks( Gridlet.INEXEC);71 return getTasks(DCWormsTags.INEXEC); 87 72 } 88 73 89 74 public List<ExecTask> getReadyTasks() { 90 return getTasks( Gridlet.READY);75 return getTasks(DCWormsTags.READY); 91 76 } 92 77 93 78 public List<ExecTask> getFinishedTasks() { 94 return getTasks( Gridlet.SUCCESS);79 return getTasks(DCWormsTags.SUCCESS); 95 80 } 96 81 97 98 public List<ExecTask> getAllSubmittedTasks() { 99 List<ExecTask> taskList; 100 synchronized (submittedTasks) { 101 taskList = new ArrayList<ExecTask>(submittedTasks); 102 } 103 return taskList; 104 } 105 106 public List<SubmittedTask> getSubmittedTasks() { 107 List<SubmittedTask> taskList = new ArrayList<SubmittedTask>(); 108 synchronized (submittedTasks) { 109 for (ExecTask task : submittedTasks) { 110 SubmittedTask subTask = (SubmittedTask) task; 111 List<String> visitedResource = subTask.getVisitedResources(); 112 if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)){ 113 taskList.add(subTask); 114 } 115 /*if(subTask.getVisitedResources().contains(context)){ 116 taskList.add(subTask); 117 }*/ 118 } 119 } 120 return taskList; 121 } 122 123 public ExecTask getSubmittedTask(String jobId, String taskId){ 124 synchronized (submittedTasks) { 125 for (ExecTask task : submittedTasks) { 126 if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId)==0) { 82 public ExecTask getExecutable(String jobId, String taskId){ 83 synchronized (executables) { 84 for (ExecTask task : executables) { 85 if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) { 127 86 return task; 128 87 } … … 131 90 return null; 132 91 } 133 134 92 135 93 @SuppressWarnings("unchecked") 136 public List<Task> get ReadyTasks(List<JobInterface<?>> wuList) {137 List<Task> readyTasks = new ArrayList<Task>();94 public List<Task> getAvailableTasks(List<JobInterface<?>> wuList) { 95 List<Task> availableTasks = new ArrayList<Task>(); 138 96 List<Task> waitingTasks = new ArrayList<Task>(); 139 97 … … 143 101 } 144 102 145 readyTasks.addAll(getPrecedenceConstrainedReadyTasks(waitingTasks)); 146 return readyTasks; 147 } 148 149 public Executable getTaskExecutable(Integer executableId){ 150 synchronized (submittedTasks) { 151 for (ExecTask task : submittedTasks) { 152 SubmittedTask subTask = (SubmittedTask) task; 153 Executable exec = (Executable)subTask.getGridlet(); 154 if (exec.getGridletID() == executableId) { 155 return exec; 156 } 157 } 158 } 159 return null; 160 } 161 162 public List<Executable> getJobExecutables(String jobId){ 163 List<Executable> list = new ArrayList<Executable>(); 164 synchronized (submittedTasks) { 165 for(int i = 0; i < submittedTasks.size(); i++){ 166 SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i); 167 Executable exec = (Executable)subTask.getGridlet(); 168 169 if(exec.getJobId().equals(jobId)) 170 list.add(exec); 171 } 172 } 173 return list; 174 } 175 176 public JobRegistryImpl clone() { 177 JobRegistryImpl jr = null; 178 try { 179 jr = (JobRegistryImpl) super.clone(); 180 } catch (CloneNotSupportedException e) { 181 // TODO Auto-generated catch block 182 e.printStackTrace(); 183 } 184 185 return jr; 186 } 187 188 /*public AbstractExecutable getTaskExecutabls(String jobId, String taskId){ 189 List<AbstractExecutable> list = new ArrayList<AbstractExecutable>(); 190 synchronized (submittedTasks) { 191 for(int i = 0; i < size(); i++){ 192 SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i); 193 AbstractExecutable exec = (AbstractExecutable)subTask.getGridlet(); 194 195 if(exec.getJobId().equals(jobId) && exec.getId().equals(taskId)) 196 return exec; 197 } 198 } 199 return null; 200 }*/ 201 202 203 public Executable createExecutable(Task task, AllocationInterface allocation) { 204 205 String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); 206 if(refersTo == null) 207 refersTo = task.getId(); 208 209 Executable exec = null; 210 211 if(refersTo.equals(task.getId())){ 212 exec = new Executable(task); 213 } else { 214 List<AbstractProcesses> processes = task.getProcesses(); 215 if(processes == null) { 216 try { 217 log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + 218 " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); 219 } catch (Exception e) { 220 e.printStackTrace(); 221 } 222 } 223 boolean found = false; 224 for(int j = 0; j < processes.size() && !found; j++){ 225 AbstractProcesses procesesSet = processes.get(j); 226 if(refersTo.equals(procesesSet.getId())){ 227 exec = new Executable(task, procesesSet); 228 found = true; 229 } 230 } 231 if(!found){ 232 log.error("Allocation refers to unknown proceses set."); 233 } 234 } 235 236 // exec.setUserID(task.getSenderId()); 237 exec.setLength(task.getLength()); 238 exec.setReservationId(allocation.getReservationId()); 239 240 /*HostInterface<?> host = allocation.getHost(); 241 ComputingResourceTypeInterface<?> crt = host.getMachineParameters(); 242 if(crt != null){ 243 ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0); 244 if(crti != null){ 245 ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty(); 246 for(int p = 0; p < properties.length; p++){ 247 ParameterPropertyInterface<?> property = properties[p]; 248 if("chosenCPUs".equals(property.getName())){ 249 Object cpuNames = property.getValue(); 250 exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); 251 } 252 } 253 } 254 }*/ 255 return exec; 256 } 257 258 259 public List<Executable> createExecutables(Task task) { 260 261 List<AbstractProcesses> processes = task.getProcesses(); 262 263 List<Executable> executables = new ArrayList<Executable>(); 264 265 if(processes == null || processes.size()==0){ 266 Executable exec = new Executable(task); 267 exec.setUserID(task.getSenderId()); 268 exec.setLength(task.getLength()); 269 executables.add(exec); 270 } else { 271 272 boolean found = false; 273 for(int j = 0; j < processes.size() && !found; j++){ 274 AbstractProcesses procesesSet = processes.get(j); 275 Executable exec = new Executable(task, procesesSet); 276 exec.setUserID(task.getSenderId()); 277 exec.setLength(task.getLength()); 278 executables.add(exec); 279 } 280 } 281 282 return executables; 103 availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks)); 104 return availableTasks; 283 105 } 284 106 285 107 286 /**************************************/ 287 protected static Map<Integer, Map<String, Object>> history = new HashMap<Integer, Map<String,Object>>(); 288 289 public static Map<Integer, Map<String, Object>> getAllocationHistory(){ 290 return history; 291 } 292 293 public void saveHistory (SubmittedTask submittedTask, int estimatedTime, Map<ResourceUnitName, ResourceUnit> choosenResources){ 108 private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){ 294 109 295 /*submittedTask.setEstimatedDuration(estimatedTime); 296 297 DateTime currentTime = new DateTime(); 298 ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); 299 submittedTask.addUsedResources(resHistItem);*/ 300 301 ResourceHistoryItem resHistItem = submittedTask.getUsedResources().getLast(); 302 DateTime currentTime = new DateTime(); 303 Map<String, Object> historyItem = new HashMap<String, Object>(); 304 List<ResourceHistoryItem> list = new ArrayList<ResourceHistoryItem>(1); 305 list.add(resHistItem); 306 historyItem.put(WormsConstants.RESOURCES, list); 307 historyItem.put(WormsConstants.START_TIME, currentTime); 308 currentTime = currentTime.plusSeconds(estimatedTime); 309 historyItem.put(WormsConstants.END_TIME, currentTime); 310 311 history.put(Integer.valueOf(submittedTask.getGridletID()), historyItem); 312 /*ProcessingElements pes = (ProcessingElements) choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS); 313 for (ComputingResource resource : pes) { 314 //submittedTask.addToResPath(resource.getName()); 315 submittedTask.visitResource(resource.getName()); 316 ComputingResource parent = resource.getParent(); 317 while (parent != null && !submittedTask.getResPath().contains(parent.getName() + "_")) { 318 submittedTask.addToResPath(parent.getName()); 319 parent = parent.getParent(); 320 } 321 while (parent != null && !submittedTask.getVisitedResources().contains(parent.getName() + "_")) { 322 submittedTask.visitResource(parent.getName()); 323 parent = parent.getParent(); 324 } 325 }*/ 326 } 327 328 private List<Task> getPrecedenceConstrainedReadyTasks(List<Task> tasks){ 110 List<Task> availableTasks = new ArrayList<Task>(); 111 int size = tasks.size(); 329 112 330 List<Task> readyTasks = new ArrayList<Task>();331 332 int size = tasks.size();333 113 for(int i = 0; i < size; i++){ 334 114 int parCnt; 335 int previousTask ReadyCnt = 0;115 int previousTaskSucceedCnt = 0; 336 116 Task task = tasks.get(i); 337 117 if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED) … … 341 121 } catch(Exception e){ 342 122 parCnt = 0; 343 //e.printStackTrace();344 123 } 345 if(parCnt == 0) 346 { 347 readyTasks.add(task); 124 if(parCnt == 0){ 125 availableTasks.add(task); 348 126 } 349 else 350 { 127 else { 351 128 for(int j = 0; j < parCnt; j++){ 352 129 ParentType par = task.getDescription().getWorkflow().getParent(j); … … 356 133 } 357 134 } 358 previousTask ReadyCnt++;135 previousTaskSucceedCnt++; 359 136 } 360 137 361 if(previousTask ReadyCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)362 readyTasks.add(task);363 else if(previousTask ReadyCnt > 0 && task.getDescription().getWorkflow().getOr() != null)364 readyTasks.add(task);365 else if (previousTask ReadyCnt == parCnt)366 readyTasks.add(task);138 if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null) 139 availableTasks.add(task); 140 else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null) 141 availableTasks.add(task); 142 else if (previousTaskSucceedCnt == parCnt) 143 availableTasks.add(task); 367 144 } 368 145 } 369 return readyTasks;146 return availableTasks; 370 147 } 371 148 … … 380 157 return false; 381 158 } 159 382 160 }
Note: See TracChangeset
for help on using the changeset viewer.