package test.rewolucja.scheduling; import gridsim.Gridlet; import gridsim.gssim.GssimConstants; import gridsim.gssim.ResourceHistoryItem; import gridsim.gssim.SubmittedTask; import gssim.schedframe.scheduling.AbstractExecutable; import gssim.schedframe.scheduling.ExecTaskInterface; import gssim.schedframe.scheduling.Executable; import gssim.schedframe.scheduling.queues.TaskQueue; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.joda.time.DateTime; import schedframe.resources.units.ResourceUnit; import schedframe.scheduling.AbstractProcesses; import schedframe.scheduling.Job; import schedframe.scheduling.JobInterface; import schedframe.scheduling.Task; import schedframe.scheduling.TaskInterface; import schedframe.scheduling.utils.ResourceParameterName; import test.rewolucja.resources.ProcessingElements; import test.rewolucja.resources.physical.base.ComputingResource; import test.rewolucja.scheduling.plan.AllocationInterfaceNew; public class JobRegistry extends AbstractJobRegistry /*implements Cloneable*/ { private static final long serialVersionUID = 8030555906990767342L; private static Log log = LogFactory.getLog(JobRegistry.class); private String context; protected static final List submittedTasks = Collections.synchronizedList(new ArrayList());; //protected static final List submittedTasks = new CopyOnWriteArrayList(); public JobRegistry(String context_) { context = context_; } /*protected void setContext(String context_) { context = context_; }*/ public boolean addTask(ExecTaskInterface newTask) { if(getSubmittedTask(newTask.getJobId(), newTask.getId()) == null) { synchronized (submittedTasks) { submittedTasks.add(newTask); } return true; } return false; } public List getTasks(int status) { List taskList = new ArrayList(); synchronized (submittedTasks) { for (ExecTaskInterface task : submittedTasks) { if (task.getStatus() == status) { SubmittedTask subTask = (SubmittedTask) task; if (subTask.getResPath().contains(context)) { taskList.add(subTask); } } } } return taskList; } public List getQueuedTasks() { return getTasks(Gridlet.QUEUED); } public List getRunningTasks() { return getTasks(Gridlet.INEXEC); } public List getReadyTasks() { return getTasks(Gridlet.READY); } public List getFinishedTasks() { return getTasks(Gridlet.SUCCESS); } public List getAllSubmittedTasks() { List taskList;; synchronized (submittedTasks) { taskList = new ArrayList(submittedTasks); } return taskList; } public List getSubmittedTasks() { List taskList = new ArrayList(); synchronized (submittedTasks) { for (ExecTaskInterface task : submittedTasks) { SubmittedTask subTask = (SubmittedTask) task; if (subTask.getResPath().contains(context)) { taskList.add(subTask); } } } return taskList; } public SubmittedTask getSubmittedTask(String jobId, String taskId){ synchronized (submittedTasks) { for (ExecTaskInterface task : submittedTasks) { if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId)==0) { return (SubmittedTask)task; } } } return null; } public List> getReadyTasks(List> jobsList) { List> tasks = new ArrayList>(); TaskQueue waitingTasks = new TaskQueue(); for(int i = 0; i < jobsList.size(); i++){ Job newJob = (Job)jobsList.get(i); waitingTasks.addAll(newJob.getTask()); } tasks.addAll(waitingTasks.getReadyTasks(this)); return tasks; } public AbstractExecutable getTaskExecutable(Integer executableId){ synchronized (submittedTasks) { for (ExecTaskInterface task : submittedTasks) { SubmittedTask subTask = (SubmittedTask) task; AbstractExecutable exec = (AbstractExecutable)subTask.getGridlet(); if (exec.getGridletID() == executableId) { return exec; } } } return null; } public List getJobExecutables(String jobId){ List list = new ArrayList(); synchronized (submittedTasks) { for(int i = 0; i < submittedTasks.size(); i++){ SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i); AbstractExecutable exec = (AbstractExecutable)subTask.getGridlet(); if(exec.getJobId().equals(jobId)) list.add(exec); } } return list; } /*public AbstractExecutable getTaskExecutabls(String jobId, String taskId){ List list = new ArrayList(); synchronized (submittedTasks) { for(int i = 0; i < size(); i++){ SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i); AbstractExecutable exec = (AbstractExecutable)subTask.getGridlet(); if(exec.getJobId().equals(jobId) && exec.getId().equals(taskId)) return exec; } } return null; }*/ public Executable createExecutable(Task task, AllocationInterfaceNew allocation) { String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); if(refersTo == null) refersTo = task.getId(); Executable exec = null; if(refersTo.equals(task.getId())){ exec = new Executable(task); } else { List processes = task.getProcesses(); if(processes == null) { try { log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); } catch (Exception e) { e.printStackTrace(); } } boolean found = false; for(int j = 0; j < processes.size() && !found; j++){ AbstractProcesses procesesSet = processes.get(j); if(refersTo.equals(procesesSet.getId())){ exec = new Executable(task, procesesSet); found = true; } } if(!found){ log.error("Allocation refers to unknown proceses set."); } } exec.setUserID(task.getSenderId()); exec.setLength(task.getLength()); exec.setReservationId(allocation.getReservationId()); /*HostInterface host = allocation.getHost(); ComputingResourceTypeInterface crt = host.getMachineParameters(); if(crt != null){ ComputingResourceTypeItemInterface crti = crt.getComputingResourceTypeItem(0); if(crti != null){ ParameterPropertyInterface properties[] = crti.getHostParameter().getProperty(); for(int p = 0; p < properties.length; p++){ ParameterPropertyInterface property = properties[p]; if("chosenCPUs".equals(property.getName())){ Object cpuNames = property.getValue(); exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); } } } }*/ return exec; } /**************************************/ protected static Map> history = new HashMap>(); public static Map> getAllocationHistory(){ return history; } public void saveHistory (SubmittedTask submittedTask, int estimatedTime, Map choosenResources){ submittedTask.setEstimatedDuration(estimatedTime); DateTime currentTime = new DateTime(); ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); submittedTask.addUsedResources(resHistItem); Map historyItem = new HashMap(); List list = new ArrayList(1); list.add(resHistItem); historyItem.put(GssimConstants.RESOURCES, list); historyItem.put(GssimConstants.START_TIME, currentTime); currentTime = currentTime.plusSeconds(estimatedTime); historyItem.put(GssimConstants.END_TIME, currentTime); submittedTask.setFinishTime(currentTime.getMillis() / 1000); history.put(Integer.valueOf(submittedTask.getGridletID()), historyItem); ProcessingElements pes = (ProcessingElements) choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS); for (ComputingResource resource : pes) { submittedTask.addToResPath(resource.getName()); ComputingResource parent = resource.getParent(); while (parent != null && !submittedTask.getResPath().contains(parent.getName() + "_")) { submittedTask.addToResPath(parent.getName()); parent = parent.getParent(); } } } }