source: DCWoRMS/trunk/build/classes/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 477

Revision 477, 11.5 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.manager.tasks;
2
3import gridsim.Gridlet;
4import gssim.schedframe.scheduling.ExecTask;
5import gssim.schedframe.scheduling.Executable;
6
7import java.util.ArrayList;
8import java.util.Collections;
9import java.util.HashMap;
10import java.util.List;
11import java.util.Map;
12
13import org.apache.commons.lang.ArrayUtils;
14import org.apache.commons.logging.Log;
15import org.apache.commons.logging.LogFactory;
16import org.joda.time.DateTime;
17import org.qcg.broker.schemas.resreqs.ParentType;
18import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
19
20import qcg.shared.constants.BrokerConstants;
21import schedframe.resources.units.ResourceUnit;
22import schedframe.resources.units.ResourceUnitName;
23import schedframe.scheduling.ResourceHistoryItem;
24import schedframe.scheduling.plan.AllocationInterface;
25import schedframe.scheduling.tasks.AbstractProcesses;
26import schedframe.scheduling.tasks.JobInterface;
27import schedframe.scheduling.tasks.SubmittedTask;
28import schedframe.scheduling.tasks.Task;
29import simulator.WormsConstants;
30
31public class JobRegistryImpl extends AbstractJobRegistry {
32       
33        private static final long serialVersionUID = 8030555906990767342L;
34
35        private static Log log = LogFactory.getLog(JobRegistryImpl.class);
36
37        private String context;
38
39        //TO DO - change data structure
40        protected static final List<ExecTask> submittedTasks = Collections.synchronizedList(new ArrayList<ExecTask>());;
41        //protected static final List<ExecTaskInterface> submittedTasks = new CopyOnWriteArrayList<ExecTaskInterface>();
42
43        public JobRegistryImpl(String context_) {
44                context = context_;
45        }
46
47        /*protected void setContext(String context_) {
48                context = context_;
49        }*/
50
51        public boolean addTask(ExecTask newTask) {
52                if(getSubmittedTask(newTask.getJobId(), newTask.getId()) == null)
53                {
54                        synchronized (submittedTasks)  {
55                                submittedTasks.add(newTask);
56                        }
57                        return true;
58                }
59                return false;
60        }
61
62        public List<ExecTask> getTasks(int status) {
63                List<ExecTask> taskList = new ArrayList<ExecTask>();
64                synchronized (submittedTasks)  {
65                        for (ExecTask task: submittedTasks) {
66                                if (task.getStatus() == status) {
67                                        //SubmittedTask subTask = (SubmittedTask) task;
68                                        List<String> visitedResource = task.getVisitedResources();
69                                        if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)){
70                                                taskList.add(task);
71                                        }
72                                        /*if(subTask.getVisitedResources().contains(context)){
73                                                taskList.add(subTask);
74                                        }*/
75                                }
76                        }
77                }
78                return taskList;
79        }
80
81        public List<ExecTask> getQueuedTasks() {
82                return getTasks(Gridlet.QUEUED);
83        }
84       
85        public List<ExecTask> getRunningTasks() {
86                return getTasks(Gridlet.INEXEC);
87        }
88       
89        public List<ExecTask> getReadyTasks() {
90                return getTasks(Gridlet.READY);
91        }
92       
93        public List<ExecTask> getFinishedTasks() {
94                return getTasks(Gridlet.SUCCESS);
95        }
96       
97       
98        public List<ExecTask> getAllSubmittedTasks() {
99                List<ExecTask> taskList;
100                synchronized (submittedTasks)  {
101                        taskList = new ArrayList<ExecTask>(submittedTasks);
102                }
103                return taskList;
104        }
105
106        public List<SubmittedTask> getSubmittedTasks() {
107                List<SubmittedTask> taskList = new ArrayList<SubmittedTask>();
108                synchronized (submittedTasks)  {
109                        for (ExecTask task : submittedTasks) {
110                                SubmittedTask subTask = (SubmittedTask) task;
111                                List<String> visitedResource = subTask.getVisitedResources();
112                                if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)){
113                                        taskList.add(subTask);
114                                }
115                                /*if(subTask.getVisitedResources().contains(context)){
116                                        taskList.add(subTask);
117                                }*/
118                        }
119                }
120                return taskList;
121        }
122       
123        public ExecTask getSubmittedTask(String jobId, String taskId){
124                synchronized (submittedTasks)  {
125                        for (ExecTask task : submittedTasks) {
126                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId)==0) {
127                                        return task;
128                                }
129                        }
130                }
131                return null;
132        }
133       
134
135        @SuppressWarnings("unchecked")
136        public List<Task> getReadyTasks(List<JobInterface<?>> wuList) {
137                List<Task> readyTasks = new ArrayList<Task>();
138                List<Task> waitingTasks = new ArrayList<Task>();
139               
140                for(int i = 0; i < wuList.size(); i++){
141                        JobInterface<?> wu  = (JobInterface<?>)wuList.get(i);
142                        waitingTasks.addAll((List<Task>)wu.getTask());
143                }
144
145                readyTasks.addAll(getPrecedenceConstrainedReadyTasks(waitingTasks));
146                return readyTasks;
147        }
148       
149        public Executable getTaskExecutable(Integer executableId){
150                synchronized (submittedTasks)  {
151                        for (ExecTask task : submittedTasks) {
152                                SubmittedTask subTask = (SubmittedTask) task;
153                                Executable exec = (Executable)subTask.getGridlet();
154                                if (exec.getGridletID() == executableId) {
155                                        return exec;
156                                }
157                        }
158                }
159                return null;
160        }
161       
162        public List<Executable> getJobExecutables(String jobId){
163                List<Executable> list = new ArrayList<Executable>();
164                synchronized (submittedTasks)  {
165                        for(int i = 0; i < submittedTasks.size(); i++){
166                                SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i);
167                                Executable exec = (Executable)subTask.getGridlet();
168                               
169                                if(exec.getJobId().equals(jobId))
170                                        list.add(exec);
171                        }
172                }
173                return list;
174        }
175       
176        public JobRegistryImpl clone() {
177                JobRegistryImpl jr = null;
178                try {
179                        jr = (JobRegistryImpl) super.clone();
180                } catch (CloneNotSupportedException e) {
181                        // TODO Auto-generated catch block
182                        e.printStackTrace();
183                }
184
185                return jr;
186        }
187
188        /*public AbstractExecutable getTaskExecutabls(String jobId, String taskId){
189                List<AbstractExecutable> list = new ArrayList<AbstractExecutable>();
190                synchronized (submittedTasks)  {
191                        for(int i = 0; i < size(); i++){
192                                SubmittedTask subTask = (SubmittedTask) submittedTasks.get(i);
193                                AbstractExecutable exec = (AbstractExecutable)subTask.getGridlet();
194                               
195                                if(exec.getJobId().equals(jobId) && exec.getId().equals(taskId))
196                                        return exec;
197                        }
198                }
199                return null;
200        }*/
201       
202       
203        public Executable createExecutable(Task task, AllocationInterface allocation) {
204
205                String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo();
206                if(refersTo == null)
207                        refersTo = task.getId();
208                       
209                Executable exec = null;
210
211                if(refersTo.equals(task.getId())){
212                        exec = new Executable(task);
213                } else {
214                        List<AbstractProcesses> processes = task.getProcesses();
215                        if(processes == null) {
216                                try {
217                                        log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." +
218                                                        " Set correct value (task id or prcesses set id) for allocation refersTo attribute.");
219                                } catch (Exception e) {
220                                        e.printStackTrace();
221                                }
222                        }
223                        boolean found = false;
224                        for(int j = 0; j < processes.size() && !found; j++){
225                                AbstractProcesses procesesSet = processes.get(j);
226                                if(refersTo.equals(procesesSet.getId())){
227                                        exec = new Executable(task, procesesSet);
228                                        found = true;
229                                }
230                        }
231                        if(!found){
232                                log.error("Allocation refers to unknown proceses set.");
233                        }
234                }
235               
236        //      exec.setUserID(task.getSenderId());
237                exec.setLength(task.getLength());
238                exec.setReservationId(allocation.getReservationId());
239                       
240                /*HostInterface<?> host = allocation.getHost();
241                ComputingResourceTypeInterface<?> crt = host.getMachineParameters();
242                if(crt != null){
243                        ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0);
244                        if(crti != null){
245                                ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty();
246                                for(int p = 0; p < properties.length; p++){
247                                        ParameterPropertyInterface<?> property = properties[p];
248                                        if("chosenCPUs".equals(property.getName())){
249                                                Object cpuNames = property.getValue();
250                                                exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames);
251                                        }
252                                }
253                        }
254                }*/
255                return exec;
256        }
257       
258
259        public List<Executable> createExecutables(Task task) {
260
261                List<AbstractProcesses> processes = task.getProcesses();
262               
263                List<Executable> executables = new ArrayList<Executable>();
264
265                if(processes == null || processes.size()==0){
266                        Executable exec = new Executable(task);
267                        exec.setUserID(task.getSenderId());
268                        exec.setLength(task.getLength());
269                        executables.add(exec);
270                } else {
271
272                        boolean found = false;
273                        for(int j = 0; j < processes.size() && !found; j++){
274                                AbstractProcesses procesesSet = processes.get(j);
275                                Executable exec = new Executable(task, procesesSet);
276                                exec.setUserID(task.getSenderId());
277                                exec.setLength(task.getLength());
278                                executables.add(exec);
279                        }
280                }
281
282                return  executables;
283        }
284
285
286        /**************************************/
287        protected static Map<Integer, Map<String, Object>> history = new HashMap<Integer, Map<String,Object>>();
288       
289        public static Map<Integer, Map<String, Object>> getAllocationHistory(){
290                return history;
291        }
292       
293        public void saveHistory (SubmittedTask submittedTask, int estimatedTime, Map<ResourceUnitName, ResourceUnit> choosenResources){
294               
295                /*submittedTask.setEstimatedDuration(estimatedTime);
296
297                DateTime currentTime = new DateTime();
298                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
299                submittedTask.addUsedResources(resHistItem);*/
300
301                ResourceHistoryItem resHistItem = submittedTask.getUsedResources().getLast();
302                DateTime currentTime = new DateTime();
303                Map<String, Object> historyItem = new HashMap<String, Object>();
304                List<ResourceHistoryItem> list = new ArrayList<ResourceHistoryItem>(1);
305                list.add(resHistItem);
306                historyItem.put(WormsConstants.RESOURCES, list);
307                historyItem.put(WormsConstants.START_TIME, currentTime);
308                currentTime = currentTime.plusSeconds(estimatedTime);
309                historyItem.put(WormsConstants.END_TIME, currentTime);
310
311                history.put(Integer.valueOf(submittedTask.getGridletID()), historyItem);
312                /*ProcessingElements pes = (ProcessingElements) choosenResources.get(ResourceParameterName.PROCESSINGELEMENTS);
313                for (ComputingResource resource : pes) {
314                        //submittedTask.addToResPath(resource.getName());
315                        submittedTask.visitResource(resource.getName());
316                        ComputingResource parent = resource.getParent();
317                        while (parent != null && !submittedTask.getResPath().contains(parent.getName() + "_")) {
318                                submittedTask.addToResPath(parent.getName());
319                                parent = parent.getParent();
320                        }
321                        while (parent != null && !submittedTask.getVisitedResources().contains(parent.getName() + "_")) {
322                                submittedTask.visitResource(parent.getName());
323                                parent = parent.getParent();
324                        }
325                }*/
326        }
327
328        private List<Task> getPrecedenceConstrainedReadyTasks(List<Task> tasks){
329               
330                List<Task> readyTasks = new ArrayList<Task>();
331
332                int size = tasks.size();
333                for(int i = 0; i < size; i++){
334                        int parCnt;
335                        int previousTaskReadyCnt = 0;
336                        Task task = tasks.get(i);
337                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
338                                continue;
339                        try{           
340                                parCnt = task.getDescription().getWorkflow().getParentCount();
341                        } catch(Exception e){
342                                parCnt = 0;
343                                //e.printStackTrace();
344                        }
345                        if(parCnt == 0)
346                        {
347                                readyTasks.add(task);
348                        }
349                        else
350                        {
351                                for(int j = 0; j < parCnt; j++){
352                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
353                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
354                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
355                                                        break;
356                                                }
357                                        }
358                                        previousTaskReadyCnt++;
359                                }
360                               
361                                if(previousTaskReadyCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
362                                        readyTasks.add(task);
363                                else if(previousTaskReadyCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
364                                        readyTasks.add(task);
365                                else if (previousTaskReadyCnt == parCnt)
366                                        readyTasks.add(task);
367                        }
368                }               
369                return readyTasks;
370        }
371       
372        private boolean checkTaskCompletion(String jobID, String taskID){
373                JobInterface<?> job = getJobInfo(jobID);
374                try {
375                        if(job.getTask(taskID).isFinished())
376                                return true;
377                } catch (NoSuchFieldException e) {
378                        //e.printStackTrace();
379                }
380                return false;
381        }
382}
Note: See TracBrowser for help on using the repository browser.