source: DCWoRMS/trunk/src/schedframe/scheduling/policy/global/GlobalManagementSystem.java @ 493

Revision 493, 9.1 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.global;
2
3import java.util.ArrayList;
4import java.util.List;
5
6import org.apache.commons.logging.Log;
7import org.apache.commons.logging.LogFactory;
8import org.exolab.castor.types.Duration;
9import org.joda.time.DateTime;
10import org.joda.time.DateTimeUtilsExt;
11import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
12
13import dcworms.schedframe.scheduling.ExecTask;
14import dcworms.schedframe.scheduling.Executable;
15
16import qcg.shared.constants.BrokerConstants;
17import schedframe.events.scheduling.TaskArrivedEvent;
18import schedframe.events.scheduling.TimerEvent;
19import schedframe.scheduling.TaskListImpl;
20import schedframe.scheduling.WorkloadUnitHandler;
21import schedframe.scheduling.plan.AllocationInterface;
22import schedframe.scheduling.plan.ScheduledTaskInterface;
23import schedframe.scheduling.plan.SchedulingPlanInterface;
24import schedframe.scheduling.plugin.SchedulingPlugin;
25import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
26import schedframe.scheduling.policy.AbstractManagementSystem;
27import schedframe.scheduling.queue.TaskQueueList;
28import schedframe.scheduling.tasks.AbstractProcesses;
29import schedframe.scheduling.tasks.Job;
30import schedframe.scheduling.tasks.JobInterface;
31import schedframe.scheduling.tasks.Task;
32import schedframe.scheduling.tasks.TaskInterface;
33import schedframe.scheduling.tasks.WorkloadUnit;
34import eduni.simjava.Sim_event;
35import gridsim.GridSim;
36import gridsim.GridSimTags;
37import gridsim.IO_data;
38import gridsim.dcworms.DCWormsTags;
39
40public class GlobalManagementSystem extends AbstractManagementSystem {
41
42        private static Log log = LogFactory.getLog(GlobalManagementSystem.class);
43
44        public GlobalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
45                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
46                        throws Exception {
47                super(providerId, entityName,  execTimeEstimationPlugin, queues);
48
49                if(schedPlugin == null){
50                        throw new Exception("Can not create global scheduling plugin instance");
51                }
52               
53                this.schedulingPlugin =  schedPlugin;
54        }
55
56        public void processEvent(Sim_event ev) {
57
58                int tag = ev.get_tag();
59                switch (tag) {
60
61                case DCWormsTags.TIMER:
62                        if (pluginSupportsEvent(DCWormsTags.TIMER)) {
63                                TimerEvent event = new  TimerEvent();
64                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
65                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
66                                executeSchedulingPlan(decision);
67                        }
68                        sendTimerEvent();
69                        break;
70                }
71        }
72       
73        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) {
74                if (!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)) {
75                        log.error("Plugin " + schedulingPlugin.getClass()
76                                        + " does not provide support for TASK_ARRIVED event.\n"
77                                        + "Check plugin configuration or use default one.");
78                        return;
79                }
80               
81                registerWorkloadUnit(wu);
82
83        }
84       
85        private void registerWorkloadUnit(WorkloadUnit wu){
86                if(!wu.isRegistered()){
87                        wu.register(jobRegistry);
88                }
89                wu.accept(getWorkloadUnitHandler());
90        }
91       
92
93        protected void schedule(JobInterface<?> job){
94                List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
95                jobsList.add(job);
96                TaskListImpl readyTasks = new TaskListImpl();
97                readyTasks.addAll(jobRegistry.getAvailableTasks(jobsList));
98               
99                schedulingPlugin.placeTasksInQueues(readyTasks, queues, getResourceManager(), moduleList);
100                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(
101                                new TaskArrivedEvent(), queues, getJobRegistry(),  getResourceManager(), moduleList);
102                if (decision != null)
103                        executeSchedulingPlan(decision);
104        }
105
106        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
107                Executable exec = (Executable) wu;
108
109                long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue();
110                log.debug("Executable " + exec.getJobId() + "_" + exec.getId() + "\nstart time:  " +
111                                new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000)
112                + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000)
113                + "\nduration: " + new Duration(duration * 1000));
114               
115                try {
116                        Job job = jobRegistry.getJob(exec.getJobId());
117                        Task task = job.getTask(exec.getTaskId());
118                        if(exec.getProcessesId() == null){
119                                try {
120                                        task.setStatus(exec.getStatus());
121                                } catch (Exception e) {
122
123                                }
124                        } else {
125                                List<AbstractProcesses> processesList = task.getProcesses();
126                                for(int i = 0; i < processesList.size(); i++){
127                                        AbstractProcesses processes = processesList.get(i);
128                                        if(processes.getId().equals(exec.getProcessesId())){
129                                                processes.setStatus(exec.getStatus());
130                                                break;
131                                        }
132                                }
133                        }
134                       
135                        if(job.isFinished()){
136                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
137                        }
138                        else {
139                                schedule(job);
140                        }
141                       
142                } catch (Exception e) {
143                        e.printStackTrace();
144                }
145
146        }
147
148        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
149
150                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
151                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
152
153                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
154
155                        //log.info(decision.getDocument());
156
157                        String jobID = taskDecision.getJobId();
158                        String taskID = taskDecision.getTaskId();
159                       
160                        // Task allocations that were rejected because of lack of resources or which were canceled and
161                        // not scheduled again are returned to the user.
162                        if(taskDecision.getStatus() == AllocationStatus.REJECTED){
163                                Job job = jobRegistry.getJob(jobID);
164                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
165                                continue;
166                        }
167                       
168                        Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID);
169                       
170                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
171                        for (int j = 0; j < allocations.size(); j++) {
172
173                                AllocationInterface<?> allocation = allocations.get(j);
174                                Executable exec = createExecutable(task, allocation);
175                                submitTask(exec, allocation);
176                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
177                        }                                                               
178                }
179        }
180
181        private Executable createExecutable(Task task, AllocationInterface<?> allocation) {
182
183                String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo();
184                if(refersTo == null)
185                        refersTo = task.getId();
186                       
187                Executable exec = null;
188
189                if(refersTo.equals(task.getId())){
190                        exec = new Executable(task);
191                } else {
192                        List<AbstractProcesses> processes = task.getProcesses();
193                        if(processes == null) {
194                                try {
195                                        log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." +
196                                                        " Set correct value (task id or prcesses set id) for allocation refersTo attribute.");
197                                } catch (Exception e) {
198                                        e.printStackTrace();
199                                }
200                        }
201                        boolean found = false;
202                        for(int j = 0; j < processes.size() && !found; j++){
203                                AbstractProcesses procesesSet = processes.get(j);
204                                if(refersTo.equals(procesesSet.getId())){
205                                        exec = new Executable(task, procesesSet);
206                                        found = true;
207                                }
208                        }
209                        if(!found){
210                                log.error("Allocation refers to unknown proceses set.");
211                        }
212                }
213
214                exec.setReservationId(allocation.getReservationId());
215                       
216                /*HostInterface<?> host = allocation.getHost();
217                ComputingResourceTypeInterface<?> crt = host.getMachineParameters();
218                if(crt != null){
219                        ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0);
220                        if(crti != null){
221                                ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty();
222                                for(int p = 0; p < properties.length; p++){
223                                        ParameterPropertyInterface<?> property = properties[p];
224                                        if("chosenCPUs".equals(property.getName())){
225                                                Object cpuNames = property.getValue();
226                                                exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames);
227                                        }
228                                }
229                        }
230                }*/
231                return exec;
232        }
233       
234        protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) {
235
236                String providerName = allocation.getProviderName();
237                if (providerName == null) {
238                        return;
239                }
240                removeFromQueue(task);
241               
242                int resID = GridSim.getEntityId(providerName);
243                IO_data data = new IO_data(task, 0, resID);
244                scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); 
245                //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job);     
246               
247                if(log.isDebugEnabled())
248                        log.debug("Submitted job " + task.getId() + " to " + providerName);
249
250        }
251
252        class GlobalWorkloadUnitHandler implements  WorkloadUnitHandler{
253
254                public void handleJob(JobInterface<?> job){
255                        if (log.isInfoEnabled())
256                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
257
258                        jobRegistry.addJob(job);
259                        schedule(job);
260                }
261               
262                public void handleTask(TaskInterface<?> task) {
263                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
264                }
265
266                public void handleExecutable(ExecTask task) {
267                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
268                }
269        }
270
271        public WorkloadUnitHandler getWorkloadUnitHandler() {
272                return new GlobalWorkloadUnitHandler();
273        }
274
275
276}
Note: See TracBrowser for help on using the repository browser.