package schedframe.scheduling.manager.tasks; import gridsim.Gridlet; import gssim.schedframe.scheduling.ExecTask; import gssim.schedframe.scheduling.Executable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.joda.time.DateTime; import org.qcg.broker.schemas.resreqs.ParentType; import org.qcg.broker.schemas.resreqs.types.TaskStatesName; import qcg.shared.constants.BrokerConstants; import schedframe.resources.units.ResourceUnit; import schedframe.resources.units.ResourceUnitName; import schedframe.scheduling.ResourceHistoryItem; import schedframe.scheduling.plan.AllocationInterface; import schedframe.scheduling.tasks.AbstractProcesses; import schedframe.scheduling.tasks.JobInterface; import schedframe.scheduling.tasks.SubmittedTask; import schedframe.scheduling.tasks.Task; import simulator.WormsConstants; public class JobRegistryImpl extends AbstractJobRegistry { private static final long serialVersionUID = 8030555906990767342L; private static Log log = LogFactory.getLog(JobRegistryImpl.class); private String context; //TO DO - change data structure protected static final List submittedTasks = Collections.synchronizedList(new ArrayList());; //protected static final List submittedTasks = new CopyOnWriteArrayList(); public JobRegistryImpl(String context_) { context = context_; } /*protected void setContext(String context_) { context = context_; }*/ public boolean addTask(ExecTask newTask) { if(getSubmittedTask(newTask.getJobId(), newTask.getId()) == null) { synchronized (submittedTasks) { submittedTasks.add(newTask); } return true; } return false; } public List getTasks(int status) { List taskList = new ArrayList(); synchronized (submittedTasks) { for (ExecTask task: submittedTasks) { if (task.getStatus() == status) { //SubmittedTask subTask = (SubmittedTask) task; List visitedResource = task.getVisitedResources(); if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)){ taskList.add(task); } /*if(subTask.getVisitedResources().contains(context)){ taskList.add(subTask); }*/ } } } return taskList; } public List getQueuedTasks() { return getTasks(Gridlet.QUEUED); } public List getRunningTasks() { return getTasks(Gridlet.INEXEC); } public List getReadyTasks() { return getTasks(Gridlet.READY); } public List getFinishedTasks() { return getTasks(Gridlet.SUCCESS); } public List getAllSubmittedTasks() { List taskList; synchronized (submittedTasks) { taskList = new ArrayList(submittedTasks); } return taskList; } public List getSubmittedTasks() { List taskList = new ArrayList(); synchronized (submittedTasks) { for (ExecTask task : submittedTasks) { SubmittedTask subTask = (SubmittedTask) task; List visitedResource = subTask.getVisitedResources(); if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)){ taskList.add(subTask); } /*if(subTask.getVisitedResources().contains(context)){ taskList.add(subTask); }*/ } } return taskList; } public ExecTask getSubmittedTask(String jobId, String taskId){ synchronized (submittedTasks) { for (ExecTask task : submittedTasks) { if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId)==0) { return task; } } } return null; } @SuppressWarnings("unchecked") public List getReadyTasks(List> wuList) { List readyTasks = new ArrayList(); List waitingTasks = new ArrayList(); for(int i = 0; i < wuList.size(); i++){ JobInterface wu = (JobInterface)wuList.get(i); waitingTasks.addAll((List)wu.getTask()); } readyTasks.addAll(getPrecedenceConstrainedReadyTasks(waitingTasks)); return readyTasks; } public Executable getTaskExecutable(Integer executableId){ synchronized (submittedTasks) { for (ExecTask task : submittedTasks) { SubmittedTask subTask = (SubmittedTask) task; Executable exec = (Executable)subTask.getGridlet(); if (exec.getGridletID() == executableId) { return exec; } } } return null; } public List getJobExecutables(String jobId){ List list = new ArrayList(); synchronized (submittedTasks) { for(int i = 0; i < submittedTasks.size(); i++){ SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i); Executable exec = (Executable)subTask.getGridlet(); if(exec.getJobId().equals(jobId)) list.add(exec); } } return list; } public JobRegistryImpl clone() { JobRegistryImpl jr = null; try { jr = (JobRegistryImpl) super.clone(); } catch (CloneNotSupportedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return jr; } /*public AbstractExecutable getTaskExecutabls(String jobId, String taskId){ List list = new ArrayList(); synchronized (submittedTasks) { for(int i = 0; i < size(); i++){ SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i); AbstractExecutable exec = (AbstractExecutable)subTask.getGridlet(); if(exec.getJobId().equals(jobId) && exec.getId().equals(taskId)) return exec; } } return null; }*/ public Executable createExecutable(Task task, AllocationInterface allocation) { String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); if(refersTo == null) refersTo = task.getId(); Executable exec = null; if(refersTo.equals(task.getId())){ exec = new Executable(task); } else { List processes = task.getProcesses(); if(processes == null) { try { log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); } catch (Exception e) { e.printStackTrace(); } } boolean found = false; for(int j = 0; j < processes.size() && !found; j++){ AbstractProcesses procesesSet = processes.get(j); if(refersTo.equals(procesesSet.getId())){ exec = new Executable(task, procesesSet); found = true; } } if(!found){ log.error("Allocation refers to unknown proceses set."); } } // exec.setUserID(task.getSenderId()); exec.setLength(task.getLength()); exec.setReservationId(allocation.getReservationId()); /*HostInterface host = allocation.getHost(); ComputingResourceTypeInterface crt = host.getMachineParameters(); if(crt != null){ ComputingResourceTypeItemInterface crti = crt.getComputingResourceTypeItem(0); if(crti != null){ ParameterPropertyInterface properties[] = crti.getHostParameter().getProperty(); for(int p = 0; p < properties.length; p++){ ParameterPropertyInterface property = properties[p]; if("chosenCPUs".equals(property.getName())){ Object cpuNames = property.getValue(); exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); } } } }*/ return exec; } public List createExecutables(Task task) { List processes = task.getProcesses(); List executables = new ArrayList(); if(processes == null || processes.size()==0){ Executable exec = new Executable(task); exec.setUserID(task.getSenderId()); exec.setLength(task.getLength()); executables.add(exec); } else { boolean found = false; for(int j = 0; j < processes.size() && !found; j++){ AbstractProcesses procesesSet = processes.get(j); Executable exec = new Executable(task, procesesSet); exec.setUserID(task.getSenderId()); exec.setLength(task.getLength()); executables.add(exec); } } return executables; } /**************************************/ protected static Map> history = new HashMap>(); public static Map> getAllocationHistory(){ return history; } public void saveHistory (SubmittedTask submittedTask, int estimatedTime, Map choosenResources){ /*submittedTask.setEstimatedDuration(estimatedTime); DateTime currentTime = new DateTime(); ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); submittedTask.addUsedResources(resHistItem);*/ ResourceHistoryItem resHistItem = submittedTask.getUsedResources().getLast(); DateTime currentTime = new DateTime(); Map historyItem = new HashMap(); List list = new ArrayList(1); list.add(resHistItem); historyItem.put(WormsConstants.RESOURCES, list); historyItem.put(WormsConstants.START_TIME, currentTime); currentTime = currentTime.plusSeconds(estimatedTime); historyItem.put(WormsConstants.END_TIME, currentTime); history.put(Integer.valueOf(submittedTask.getGridletID()), historyItem); /*ProcessingElements pes = (ProcessingElements) choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS); for (ComputingResource resource : pes) { //submittedTask.addToResPath(resource.getName()); submittedTask.visitResource(resource.getName()); ComputingResource parent = resource.getParent(); while (parent != null && !submittedTask.getResPath().contains(parent.getName() + "_")) { submittedTask.addToResPath(parent.getName()); parent = parent.getParent(); } while (parent != null && !submittedTask.getVisitedResources().contains(parent.getName() + "_")) { submittedTask.visitResource(parent.getName()); parent = parent.getParent(); } }*/ } private List getPrecedenceConstrainedReadyTasks(List tasks){ List readyTasks = new ArrayList(); int size = tasks.size(); for(int i = 0; i < size; i++){ int parCnt; int previousTaskReadyCnt = 0; Task task = tasks.get(i); if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED) continue; try{ parCnt = task.getDescription().getWorkflow().getParentCount(); } catch(Exception e){ parCnt = 0; //e.printStackTrace(); } if(parCnt == 0) { readyTasks.add(task); } else { for(int j = 0; j < parCnt; j++){ ParentType par = task.getDescription().getWorkflow().getParent(j); if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){ if(!checkTaskCompletion(task.getJobId(), par.getContent())){ break; } } previousTaskReadyCnt++; } if(previousTaskReadyCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null) readyTasks.add(task); else if(previousTaskReadyCnt > 0 && task.getDescription().getWorkflow().getOr() != null) readyTasks.add(task); else if (previousTaskReadyCnt == parCnt) readyTasks.add(task); } } return readyTasks; } private boolean checkTaskCompletion(String jobID, String taskID){ JobInterface job = getJobInfo(jobID); try { if(job.getTask(taskID).isFinished()) return true; } catch (NoSuchFieldException e) { //e.printStackTrace(); } return false; } }