Ignore:
Timestamp:
10/08/12 10:23:45 (13 years ago)
Author:
wojtekp
Message:
 
Location:
DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks/AbstractJobRegistry.java

    r480 r481  
    1313 
    1414 
    15 public abstract class AbstractJobRegistry /*extends ConcurrentHashMap<String, Job> */implements JobRegistry, Cloneable{ 
     15public abstract class AbstractJobRegistry /*extends ConcurrentHashMap<String, Job>*/ implements JobRegistry, Cloneable{ 
    1616         
    1717        private static final long serialVersionUID = 8409060063583755824L; 
     18 
    1819         
    19         private static Log log = LogFactory.getLog(AbstractJobRegistry.class); 
    20          
    21         protected static final ConcurrentHashMap<String, Job> jobs = new ConcurrentHashMap<String, Job>(); 
     20        protected static final ConcurrentHashMap<String, JobInterface<?>> jobs = new ConcurrentHashMap<String, JobInterface<?>>(); 
    2221         
    2322        protected AbstractJobRegistry(){ 
    24                 //log.warn("Methods from JobRegistry interface are not implemented."); 
    2523        } 
    2624         
    2725        public boolean addJob(JobInterface<?> job) { 
    28                 jobs.put(job.getId(), (Job) job); 
     26                jobs.put(job.getId(), job); 
    2927                return true; 
    3028        } 
     
    3230        public boolean addTask(TaskInterface<?> task) { 
    3331                if(jobs.containsKey(task.getJobId())){ 
    34                         jobs.get(task.getJobId()).add((Task)task); 
     32                        getJob(task.getJobId()).add((Task)task); 
    3533                        return true; 
    3634                } else { 
     
    3937        } 
    4038 
    41         public Job getJob(String jobId){ 
     39        public JobInterface<?> getJobInfo(String jobId) { 
    4240                return jobs.get(jobId); 
    4341        } 
    44          
    45         public JobInterface<?> getJobInfo(String jobID) { 
    46                 return jobs.get(jobID); 
    47         } 
    4842 
    49         public TaskInterface<?> getTaskInfo(String jobID, String taskId) { 
     43        public TaskInterface<?> getTaskInfo(String jobId, String taskId) { 
    5044                Task task = null; 
    51                 Job job = jobs.get(jobID); 
     45                Job job = getJob(jobId); 
    5246                 
    5347                if(job == null) 
     
    5751                        task = job.getTask(taskId); 
    5852                } catch (NoSuchFieldException e) { 
    59                         log.error(e.getMessage()); 
    6053                } 
    6154                return task; 
    6255        } 
    6356         
    64         /*public List<JobInterface<?>> getActiveJobs() { 
    65                 log.error("getActiveJobs() not implemented."); 
    66                 return null; 
     57        public Job getJob(String jobId){ 
     58                return (Job)jobs.get(jobId); 
    6759        } 
    6860 
    69         public List<TaskInterface<?>> getActiveTasks() { 
    70                 log.error("getActiveTasks() not implemented."); 
    71                 return null; 
    72         }*/ 
    73  
    7461} 
  • DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks/JobRegistry.java

    r477 r481  
    55import java.util.List; 
    66 
     7import schedframe.ExecutablesList; 
    78import schedframe.scheduling.tasks.JobInterface; 
    89import schedframe.scheduling.tasks.TaskInterface; 
     
    1112public interface JobRegistry { 
    1213 
    13         //public List<JobInterface<?>> getActiveJobs(); 
    14  
    15         //public List<TaskInterface<?>> getActiveTasks(); 
    16          
    1714        public JobInterface<?> getJobInfo(String jobId); 
    1815 
     
    3128 
    3229 
    33         //public List<SubmittedTask> getSubmittedTasks(); 
     30        public ExecutablesList getExecutableTasks();  
    3431         
    35         public ExecTask getSubmittedTask(String jobId, String taskId); 
     32        public ExecTask getExecutable(String jobId, String taskId); 
    3633 
    3734         
    38         public List<? extends TaskInterface<?>> getReadyTasks(List<JobInterface<?>> jobsList); 
     35        public List<? extends TaskInterface<?>> getAvailableTasks(List<JobInterface<?>> jobsList); 
    3936 
    4037} 
  • DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java

    r477 r481  
    11package schedframe.scheduling.manager.tasks; 
    22 
    3 import gridsim.Gridlet; 
     3import gridsim.gssim.DCWormsTags; 
    44import gssim.schedframe.scheduling.ExecTask; 
    5 import gssim.schedframe.scheduling.Executable; 
    65 
    76import java.util.ArrayList; 
    8 import java.util.Collections; 
    9 import java.util.HashMap; 
    107import java.util.List; 
    11 import java.util.Map; 
    128 
    139import org.apache.commons.lang.ArrayUtils; 
    1410import org.apache.commons.logging.Log; 
    1511import org.apache.commons.logging.LogFactory; 
    16 import org.joda.time.DateTime; 
    1712import org.qcg.broker.schemas.resreqs.ParentType; 
    1813import org.qcg.broker.schemas.resreqs.types.TaskStatesName; 
    1914 
    2015import qcg.shared.constants.BrokerConstants; 
    21 import schedframe.resources.units.ResourceUnit; 
    22 import schedframe.resources.units.ResourceUnitName; 
    23 import schedframe.scheduling.ResourceHistoryItem; 
    24 import schedframe.scheduling.plan.AllocationInterface; 
    25 import schedframe.scheduling.tasks.AbstractProcesses; 
     16import schedframe.ExecutablesList; 
    2617import schedframe.scheduling.tasks.JobInterface; 
    27 import schedframe.scheduling.tasks.SubmittedTask; 
    2818import schedframe.scheduling.tasks.Task; 
    29 import simulator.WormsConstants; 
    3019 
    3120public class JobRegistryImpl extends AbstractJobRegistry { 
     
    3726        private String context; 
    3827 
    39         //TO DO - change data structure 
    40         protected static final List<ExecTask> submittedTasks = Collections.synchronizedList(new ArrayList<ExecTask>());; 
    41         //protected static final List<ExecTaskInterface> submittedTasks = new CopyOnWriteArrayList<ExecTaskInterface>(); 
     28        //TO DO - consider data structure 
     29        protected static final ExecutablesList executables = new ExecutablesList(); 
     30        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());; 
     31        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>(); 
    4232 
    43         public JobRegistryImpl(String context_) { 
    44                 context = context_; 
     33        public JobRegistryImpl(String context) { 
     34                this.context = context; 
    4535        } 
    4636 
    47         /*protected void setContext(String context_) { 
    48                 context = context_; 
    49         }*/ 
    50  
    51         public boolean addTask(ExecTask newTask) { 
    52                 if(getSubmittedTask(newTask.getJobId(), newTask.getId()) == null) 
    53                 { 
    54                         synchronized (submittedTasks)  { 
    55                                 submittedTasks.add(newTask); 
     37        public boolean addExecTask(ExecTask newTask) { 
     38                if(getExecutable(newTask.getJobId(), newTask.getId()) == null) { 
     39                        synchronized (executables)  { 
     40                                executables.add(newTask); 
    5641                        } 
    5742                        return true; 
     
    6045        } 
    6146 
     47        public ExecutablesList getExecutableTasks() { 
     48                return executables; 
     49        } 
    6250        public List<ExecTask> getTasks(int status) { 
    6351                List<ExecTask> taskList = new ArrayList<ExecTask>(); 
    64                 synchronized (submittedTasks)  { 
    65                         for (ExecTask task: submittedTasks) { 
     52                synchronized (executables)  { 
     53                        for (ExecTask task: executables) { 
    6654                                if (task.getStatus() == status) { 
    67                                         //SubmittedTask subTask = (SubmittedTask) task; 
    6855                                        List<String> visitedResource = task.getVisitedResources(); 
    69                                         if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)){ 
     56                                        if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) { 
    7057                                                taskList.add(task); 
    7158                                        } 
    72                                         /*if(subTask.getVisitedResources().contains(context)){ 
    73                                                 taskList.add(subTask); 
    74                                         }*/ 
    7559                                } 
    7660                        } 
     
    8064 
    8165        public List<ExecTask> getQueuedTasks() { 
    82                 return getTasks(Gridlet.QUEUED); 
     66                return getTasks(DCWormsTags.QUEUED); 
    8367        } 
    8468         
    8569        public List<ExecTask> getRunningTasks() { 
    86                 return getTasks(Gridlet.INEXEC); 
     70                return getTasks(DCWormsTags.INEXEC); 
    8771        } 
    8872         
    8973        public List<ExecTask> getReadyTasks() { 
    90                 return getTasks(Gridlet.READY); 
     74                return getTasks(DCWormsTags.READY); 
    9175        } 
    9276         
    9377        public List<ExecTask> getFinishedTasks() { 
    94                 return getTasks(Gridlet.SUCCESS); 
     78                return getTasks(DCWormsTags.SUCCESS); 
    9579        } 
    9680         
    97          
    98         public List<ExecTask> getAllSubmittedTasks() { 
    99                 List<ExecTask> taskList; 
    100                 synchronized (submittedTasks)  { 
    101                         taskList = new ArrayList<ExecTask>(submittedTasks); 
    102                 } 
    103                 return taskList; 
    104         } 
    105  
    106         public List<SubmittedTask> getSubmittedTasks() { 
    107                 List<SubmittedTask> taskList = new ArrayList<SubmittedTask>(); 
    108                 synchronized (submittedTasks)  { 
    109                         for (ExecTask task : submittedTasks) { 
    110                                 SubmittedTask subTask = (SubmittedTask) task; 
    111                                 List<String> visitedResource = subTask.getVisitedResources(); 
    112                                 if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)){ 
    113                                         taskList.add(subTask); 
    114                                 } 
    115                                 /*if(subTask.getVisitedResources().contains(context)){ 
    116                                         taskList.add(subTask); 
    117                                 }*/ 
    118                         } 
    119                 } 
    120                 return taskList; 
    121         } 
    122          
    123         public ExecTask getSubmittedTask(String jobId, String taskId){ 
    124                 synchronized (submittedTasks)  { 
    125                         for (ExecTask task : submittedTasks) { 
     81        public ExecTask getExecutable(String jobId, String taskId){ 
     82                synchronized (executables)  { 
     83                        for (ExecTask task : executables) { 
    12684                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId)==0) { 
    12785                                        return task; 
     
    13189                return null; 
    13290        } 
    133          
    13491 
    13592        @SuppressWarnings("unchecked") 
    136         public List<Task> getReadyTasks(List<JobInterface<?>> wuList) { 
    137                 List<Task> readyTasks = new ArrayList<Task>(); 
     93        public List<Task> getAvailableTasks(List<JobInterface<?>> wuList) { 
     94                List<Task> availableTasks = new ArrayList<Task>(); 
    13895                List<Task> waitingTasks = new ArrayList<Task>(); 
    13996                 
     
    143100                } 
    144101 
    145                 readyTasks.addAll(getPrecedenceConstrainedReadyTasks(waitingTasks)); 
    146                 return readyTasks; 
    147         } 
    148          
    149         public Executable getTaskExecutable(Integer executableId){ 
    150                 synchronized (submittedTasks)  { 
    151                         for (ExecTask task : submittedTasks) { 
    152                                 SubmittedTask subTask = (SubmittedTask) task; 
    153                                 Executable exec = (Executable)subTask.getGridlet(); 
    154                                 if (exec.getGridletID() == executableId) { 
    155                                         return exec; 
    156                                 } 
    157                         } 
    158                 } 
    159                 return null; 
    160         } 
    161          
    162         public List<Executable> getJobExecutables(String jobId){ 
    163                 List<Executable> list = new ArrayList<Executable>(); 
    164                 synchronized (submittedTasks)  { 
    165                         for(int i = 0; i < submittedTasks.size(); i++){ 
    166                                 SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i); 
    167                                 Executable exec = (Executable)subTask.getGridlet(); 
    168                                  
    169                                 if(exec.getJobId().equals(jobId)) 
    170                                         list.add(exec); 
    171                         } 
    172                 } 
    173                 return list; 
    174         } 
    175          
    176         public JobRegistryImpl clone() { 
    177                 JobRegistryImpl jr = null; 
    178                 try { 
    179                         jr = (JobRegistryImpl) super.clone(); 
    180                 } catch (CloneNotSupportedException e) { 
    181                         // TODO Auto-generated catch block 
    182                         e.printStackTrace(); 
    183                 } 
    184  
    185                 return jr; 
    186         } 
    187  
    188         /*public AbstractExecutable getTaskExecutabls(String jobId, String taskId){ 
    189                 List<AbstractExecutable> list = new ArrayList<AbstractExecutable>(); 
    190                 synchronized (submittedTasks)  { 
    191                         for(int i = 0; i < size(); i++){ 
    192                                 SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i); 
    193                                 AbstractExecutable exec = (AbstractExecutable)subTask.getGridlet(); 
    194                                  
    195                                 if(exec.getJobId().equals(jobId) && exec.getId().equals(taskId)) 
    196                                         return exec; 
    197                         } 
    198                 } 
    199                 return null; 
    200         }*/ 
    201          
    202          
    203         public Executable createExecutable(Task task, AllocationInterface allocation) { 
    204  
    205                 String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); 
    206                 if(refersTo == null) 
    207                         refersTo = task.getId(); 
    208                          
    209                 Executable exec = null; 
    210  
    211                 if(refersTo.equals(task.getId())){ 
    212                         exec = new Executable(task); 
    213                 } else { 
    214                         List<AbstractProcesses> processes = task.getProcesses(); 
    215                         if(processes == null) { 
    216                                 try { 
    217                                         log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + 
    218                                                         " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); 
    219                                 } catch (Exception e) { 
    220                                         e.printStackTrace(); 
    221                                 } 
    222                         } 
    223                         boolean found = false; 
    224                         for(int j = 0; j < processes.size() && !found; j++){ 
    225                                 AbstractProcesses procesesSet = processes.get(j); 
    226                                 if(refersTo.equals(procesesSet.getId())){ 
    227                                         exec = new Executable(task, procesesSet); 
    228                                         found = true; 
    229                                 } 
    230                         } 
    231                         if(!found){ 
    232                                 log.error("Allocation refers to unknown proceses set."); 
    233                         } 
    234                 } 
    235                  
    236         //      exec.setUserID(task.getSenderId()); 
    237                 exec.setLength(task.getLength()); 
    238                 exec.setReservationId(allocation.getReservationId()); 
    239                          
    240                 /*HostInterface<?> host = allocation.getHost(); 
    241                 ComputingResourceTypeInterface<?> crt = host.getMachineParameters(); 
    242                 if(crt != null){ 
    243                         ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0); 
    244                         if(crti != null){ 
    245                                 ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty(); 
    246                                 for(int p = 0; p < properties.length; p++){ 
    247                                         ParameterPropertyInterface<?> property = properties[p]; 
    248                                         if("chosenCPUs".equals(property.getName())){ 
    249                                                 Object cpuNames = property.getValue(); 
    250                                                 exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); 
    251                                         } 
    252                                 } 
    253                         } 
    254                 }*/ 
    255                 return exec; 
    256         } 
    257          
    258  
    259         public List<Executable> createExecutables(Task task) { 
    260  
    261                 List<AbstractProcesses> processes = task.getProcesses(); 
    262                  
    263                 List<Executable> executables = new ArrayList<Executable>(); 
    264  
    265                 if(processes == null || processes.size()==0){ 
    266                         Executable exec = new Executable(task); 
    267                         exec.setUserID(task.getSenderId()); 
    268                         exec.setLength(task.getLength()); 
    269                         executables.add(exec); 
    270                 } else { 
    271  
    272                         boolean found = false; 
    273                         for(int j = 0; j < processes.size() && !found; j++){ 
    274                                 AbstractProcesses procesesSet = processes.get(j); 
    275                                 Executable exec = new Executable(task, procesesSet); 
    276                                 exec.setUserID(task.getSenderId()); 
    277                                 exec.setLength(task.getLength()); 
    278                                 executables.add(exec); 
    279                         } 
    280                 } 
    281  
    282                 return  executables; 
     102                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks)); 
     103                return availableTasks; 
    283104        } 
    284105 
    285106 
    286         /**************************************/ 
    287         protected static Map<Integer, Map<String, Object>> history = new HashMap<Integer, Map<String,Object>>(); 
    288          
    289         public static Map<Integer, Map<String, Object>> getAllocationHistory(){ 
    290                 return history; 
    291         } 
    292          
    293         public void saveHistory (SubmittedTask submittedTask, int estimatedTime, Map<ResourceUnitName, ResourceUnit> choosenResources){ 
     107        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){ 
    294108                 
    295                 /*submittedTask.setEstimatedDuration(estimatedTime); 
    296  
    297                 DateTime currentTime = new DateTime(); 
    298                 ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); 
    299                 submittedTask.addUsedResources(resHistItem);*/ 
    300  
    301                 ResourceHistoryItem resHistItem = submittedTask.getUsedResources().getLast(); 
    302                 DateTime currentTime = new DateTime(); 
    303                 Map<String, Object> historyItem = new HashMap<String, Object>(); 
    304                 List<ResourceHistoryItem> list = new ArrayList<ResourceHistoryItem>(1); 
    305                 list.add(resHistItem); 
    306                 historyItem.put(WormsConstants.RESOURCES, list); 
    307                 historyItem.put(WormsConstants.START_TIME, currentTime); 
    308                 currentTime = currentTime.plusSeconds(estimatedTime); 
    309                 historyItem.put(WormsConstants.END_TIME, currentTime); 
    310  
    311                 history.put(Integer.valueOf(submittedTask.getGridletID()), historyItem); 
    312                 /*ProcessingElements pes = (ProcessingElements) choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS); 
    313                 for (ComputingResource resource : pes) { 
    314                         //submittedTask.addToResPath(resource.getName()); 
    315                         submittedTask.visitResource(resource.getName()); 
    316                         ComputingResource parent = resource.getParent(); 
    317                         while (parent != null && !submittedTask.getResPath().contains(parent.getName() + "_")) { 
    318                                 submittedTask.addToResPath(parent.getName()); 
    319                                 parent = parent.getParent(); 
    320                         } 
    321                         while (parent != null && !submittedTask.getVisitedResources().contains(parent.getName() + "_")) { 
    322                                 submittedTask.visitResource(parent.getName()); 
    323                                 parent = parent.getParent(); 
    324                         } 
    325                 }*/ 
    326         } 
    327  
    328         private List<Task> getPrecedenceConstrainedReadyTasks(List<Task> tasks){ 
     109                List<Task> availableTasks = new ArrayList<Task>(); 
     110                int size = tasks.size(); 
    329111                 
    330                 List<Task> readyTasks = new ArrayList<Task>(); 
    331  
    332                 int size = tasks.size(); 
    333112                for(int i = 0; i < size; i++){ 
    334113                        int parCnt; 
    335                         int previousTaskReadyCnt = 0; 
     114                        int previousTaskSucceedCnt = 0; 
    336115                        Task task = tasks.get(i); 
    337116                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED) 
     
    341120                        } catch(Exception e){ 
    342121                                parCnt = 0; 
    343                                 //e.printStackTrace(); 
    344122                        } 
    345                         if(parCnt == 0) 
    346                         { 
    347                                 readyTasks.add(task); 
     123                        if(parCnt == 0){ 
     124                                availableTasks.add(task); 
    348125                        } 
    349                         else 
    350                         { 
     126                        else { 
    351127                                for(int j = 0; j < parCnt; j++){ 
    352128                                        ParentType par = task.getDescription().getWorkflow().getParent(j); 
     
    356132                                                } 
    357133                                        } 
    358                                         previousTaskReadyCnt++; 
     134                                        previousTaskSucceedCnt++; 
    359135                                } 
    360136                                 
    361                                 if(previousTaskReadyCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null) 
    362                                         readyTasks.add(task); 
    363                                 else if(previousTaskReadyCnt > 0 && task.getDescription().getWorkflow().getOr() != null) 
    364                                         readyTasks.add(task); 
    365                                 else if (previousTaskReadyCnt == parCnt) 
    366                                         readyTasks.add(task); 
     137                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null) 
     138                                        availableTasks.add(task); 
     139                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null) 
     140                                        availableTasks.add(task); 
     141                                else if (previousTaskSucceedCnt == parCnt) 
     142                                        availableTasks.add(task); 
    367143                        } 
    368144                }                
    369                 return readyTasks; 
     145                return availableTasks; 
    370146        } 
    371147         
     
    380156                return false; 
    381157        } 
     158 
    382159} 
Note: See TracChangeset for help on using the changeset viewer.