package schedframe.scheduling.manager.tasks; import gridsim.dcworms.DCWormsTags; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Set; import org.qcg.broker.schemas.resreqs.ParentType; import org.qcg.broker.schemas.resreqs.Workflow; import org.qcg.broker.schemas.resreqs.types.TaskStatesName; import qcg.shared.constants.BrokerConstants; import schedframe.ExecutablesList; import schedframe.resources.computing.ComputingResource; import schedframe.scheduling.tasks.JobInterface; import schedframe.scheduling.tasks.Task; import dcworms.schedframe.scheduling.ExecTask; public class JobRegistryImpl extends AbstractJobRegistry { private String context; private ComputingResource cr; //TO DO - consider data structure protected static final ExecutablesList executables = new ExecutablesList(); //protected static final List executables = Collections.synchronizedList(new ArrayList());; //protected static final List executables = new CopyOnWriteArrayList(); public JobRegistryImpl(String context) { this.context = context; } public JobRegistryImpl(ComputingResource cr) { this.cr = cr; } public JobRegistryImpl() { this(""); } public boolean addExecTask(ExecTask newTask) { if(getTask(newTask.getJobId(), newTask.getId()) == null) { synchronized (executables) { executables.add(newTask); } return true; } return false; } public ExecutablesList getTasks() { return executables; } public List getTasks(int status) { List taskList = new ArrayList(); synchronized (executables) { for (ExecTask task: executables) { if (task.getStatus() == status) { if(cr != null){ if(!task.getAllocatedResources().isEmpty()){ Set visitedResource = task.getAllocatedResources().getLast().getResources(); if(visitedResource.contains(cr)){ taskList.add(task); } else { for(ComputingResource res: visitedResource){ if(cr.contains(res)){ taskList.add(task); break; } } } } } else { if(!task.getAllocatedResources().isEmpty()){ Set visitedResource = task.getAllocatedResources().getLast().getResourceNames(); for(String res: visitedResource){ if(res.equals(context) || res.substring(0, res.lastIndexOf("/")).contains(context)){ taskList.add(task); break; } } } if(task.getSchedulerName().equals(context)) { taskList.add(task); } } } } } return taskList; //return getTasks(Arrays.asList(status), true); } public List getTasks(List statusList, boolean match) { List taskList = new ArrayList(); synchronized (executables) { for (ExecTask task: executables) { for(int status: statusList){ if (compareStatus(task.getStatus(), status, match)) { if(cr != null){ if(!task.getAllocatedResources().isEmpty()){ Set visitedResource = task.getAllocatedResources().getLast().getResources(); if(visitedResource.contains(cr)){ taskList.add(task); } else { for(ComputingResource res: visitedResource){ if(cr.contains(res)){ taskList.add(task); break; } } } } } else { if(!task.getAllocatedResources().isEmpty()){ Set visitedResource = task.getAllocatedResources().getLast().getResourceNames(); for(String res: visitedResource){ if(res.equals(context) || res.substring(0, res.lastIndexOf("/")).contains(context)){ taskList.add(task); break; } } } if(task.getSchedulerName().equals(context)) { taskList.add(task); } } } } } } return taskList; } private boolean compareStatus(int taskStatus, int status, boolean match){ if(match){ if(taskStatus == status) return true; } else { if(taskStatus !=status) return true; } return false; } public List getQueuedTasks() { return getTasks(DCWormsTags.QUEUED); } public List getRunningTasks() { return getTasks(DCWormsTags.INEXEC); } public List getReadyTasks() { return getTasks(DCWormsTags.READY); } public List getFinishedTasks() { return getTasks(DCWormsTags.SUCCESS); } public ExecTask getTask(String jobId, String taskId){ synchronized (executables) { for (ExecTask task : executables) { if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) { return task; } } } return null; } @SuppressWarnings("unchecked") public List getAvailableTasks(List> jobList) { List availableTasks = new ArrayList(); List waitingTasks = new ArrayList(); for(int i = 0; i < jobList.size(); i++){ JobInterface job = (JobInterface)jobList.get(i); waitingTasks.addAll((List)job.getTask()); } availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks)); return availableTasks; } private List getPrecedenceConstrainedAvailableTasks(List tasks){ List availableTasks = new ArrayList(); int size = tasks.size(); for(int i = 0; i < size; i++){ int parCnt; int previousTaskSucceedCnt = 0; Task task = tasks.get(i); if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED) continue; //the following procedure supports only one nested structure Workflow w = task.getDescription().getWorkflow(); if (w == null){ availableTasks.add(task); continue; } if(w.getAnd() != null) { parCnt = w.getAnd().getParentOpTypeItemCount(); if(parCnt == 0) { availableTasks.add(task); } else { for(int j = 0; j < parCnt; j++){ ParentType par = w.getAnd().getParentOpTypeItem(j).getParent(); if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){ if(!checkTaskState(task.getJobId(), par.getContent(), TaskStatesName.FINISHED)){ //if(!checkTaskCompletion(task.getJobId(), par.getContent())){ break; } } previousTaskSucceedCnt++; } if(previousTaskSucceedCnt == parCnt) availableTasks.add(task); } } else if(w.getOr() != null) { parCnt = w.getOr().getParentOpTypeItemCount(); if(parCnt == 0) { availableTasks.add(task); } else { for(int j = 0; j < parCnt; j++){ ParentType par = w.getOr().getParentOpTypeItem(j).getParent(); if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){ if(!checkTaskState(task.getJobId(), par.getContent(), TaskStatesName.FINISHED)){ //if(!checkTaskCompletion(task.getJobId(), par.getContent())){ continue; } } previousTaskSucceedCnt++; } if(previousTaskSucceedCnt > 0) availableTasks.add(task); } } else { parCnt = w.getParentCount(); if(parCnt == 0) { availableTasks.add(task); } else { for(int j = 0; j < parCnt; j++){ ParentType par = w.getParent(j); if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){ if(par.getContent().contains("_")){ if(!checkTaskState(par.getContent().split("_")[0], par.getContent().split("_")[1], TaskStatesName.FINISHED)){ //if(!checkTaskCompletion(par.getContent().split("_")[0], par.getContent().split("_")[1])){ continue; } } else if(!checkTaskState(task.getJobId(), par.getContent(), TaskStatesName.FINISHED)){ //else if(!checkTaskCompletion(task.getJobId(), par.getContent())){ continue; } } previousTaskSucceedCnt++; } if(previousTaskSucceedCnt == parCnt) availableTasks.add(task); } } /*try{ parCnt = task.getDescription().getWorkflow().getParentCount(); } catch(Exception e){ parCnt = 0; } if(parCnt == 0){ availableTasks.add(task); } else { for(int j = 0; j < parCnt; j++){ ParentType par = task.getDescription().getWorkflow().getParent(j); if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){ if(!checkTaskCompletion(task.getJobId(), par.getContent())){ break; } } previousTaskSucceedCnt++; } if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null) availableTasks.add(task); else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null) availableTasks.add(task); else if (previousTaskSucceedCnt == parCnt) availableTasks.add(task); }*/ } return availableTasks; } private boolean checkTaskCompletion(String jobID, String taskID){ JobInterface job = getJobInfo(jobID); try { if(job.getTask(taskID).isFinished()) return true; } catch (NoSuchFieldException e) { //e.printStackTrace(); } return false; } private boolean checkTaskState(String jobID, String taskID, TaskStatesName taskState){ JobInterface job = getJobInfo(jobID); try { switch (taskState){ case QUEUED:{ if(job.getTask(taskID).getStatus() == (int)BrokerConstants.JOB_STATUS_SUBMITTED) return true; } case FINISHED:{ if(job.getTask(taskID).isFinished()) return true; } default: break; } } catch (NoSuchFieldException e) { //e.printStackTrace(); } return false; } public static void reset(){ jobs.clear(); executables.clear(); } }