source: DCWoRMS/trunk/src/schedframe/scheduling/policy/global/GlobalManagementSystem.java @ 478

Revision 478, 9.9 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.global;
2
3import java.util.ArrayList;
4import java.util.List;
5
6import org.apache.commons.logging.Log;
7import org.apache.commons.logging.LogFactory;
8import org.exolab.castor.types.Duration;
9import org.joda.time.DateTime;
10import org.joda.time.DateTimeUtilsExt;
11import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
12
13import qcg.shared.constants.BrokerConstants;
14
15import schedframe.events.scheduling.EventReason;
16import schedframe.events.scheduling.SchedulingEvent;
17import schedframe.events.scheduling.TaskArrivedEvent;
18import schedframe.events.scheduling.TaskCanceledEvent;
19import schedframe.events.scheduling.TimerEvent;
20import schedframe.scheduling.Scheduler;
21import schedframe.scheduling.WorkloadUnitHandler;
22import schedframe.scheduling.WorkloadUnitListImpl;
23import schedframe.scheduling.plan.AllocationInterface;
24import schedframe.scheduling.plan.ScheduledTaskInterface;
25import schedframe.scheduling.plan.SchedulingPlanInterface;
26import schedframe.scheduling.plugin.SchedulingPlugin;
27import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
28import schedframe.scheduling.policy.AbstractManagementSystem;
29import schedframe.scheduling.queue.TaskQueueList;
30import schedframe.scheduling.tasks.AbstractProcesses;
31import schedframe.scheduling.tasks.Job;
32import schedframe.scheduling.tasks.JobInterface;
33import schedframe.scheduling.tasks.SubmittedTask;
34import schedframe.scheduling.tasks.Task;
35import schedframe.scheduling.tasks.TaskInterface;
36import schedframe.scheduling.tasks.WorkloadUnit;
37
38import eduni.simjava.Sim_event;
39import gridsim.GridSim;
40import gridsim.GridSimTags;
41import gridsim.Gridlet;
42import gridsim.IO_data;
43import gridsim.gssim.WormsTags;
44import gssim.schedframe.scheduling.ExecTask;
45import gssim.schedframe.scheduling.Executable;
46
47public class GlobalManagementSystem extends AbstractManagementSystem {
48
49        private static Log log = LogFactory.getLog(GlobalManagementSystem.class);
50
51        public GlobalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
52                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
53                        throws Exception {
54                super(providerId, entityName,  execTimeEstimationPlugin, queues);
55
56                /*schedulingPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance(
57                                schedulingPluginClassName,
58                                GlobalSchedulingPlugin.class);*/
59                if(schedPlugin == null){
60                        throw new Exception("Can not create global scheduling plugin instance");
61                }
62               
63                this.schedulingPlugin =  schedPlugin;
64        }
65
66        public void processEvent(Sim_event ev) {
67
68                int tag = ev.get_tag();
69                switch (tag) {
70
71                case WormsTags.TIMER:
72                        if (pluginSupportsEvent(WormsTags.TIMER)) {
73                                TimerEvent event = new  TimerEvent();
74                                SchedulingPlanInterface decision =  schedulingPlugin.schedule(event,
75                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
76                                executeSchedulingPlan(decision);
77                        }
78                        sendTimerEvent();
79                        break;
80                }
81        }
82       
83        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) {
84                if (!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)) {
85                        log.error("Plugin " + schedulingPlugin.getClass()
86                                        + " does not provide support for TASK_ARRIVED event.\n"
87                                        + "Check plugin configuration or use default one.");
88                        return;
89                }
90               
91                registerWorkloadUnit(wu);
92                /*Job job = (Job) wu;
93                jobRegistry.addJob(job);
94
95                if (log.isInfoEnabled())
96                        log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
97
98                List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
99                jobsList.add(job);
100                WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList();
101                readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList));
102                schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList);
103
104                schedule(new TaskArrivedEvent());*/
105
106        }
107       
108        private void registerWorkloadUnit(WorkloadUnit wu){
109                if(!wu.isRegistered()){
110                        wu.register(jobRegistry);
111                }
112                wu.accept(getWorkloadUnitHandler());
113        }
114       
115
116        protected void scheduleReadyTasks(Job job){
117                List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
118                jobsList.add(job);
119                WorkloadUnitListImpl readyWorkloadUnits = new WorkloadUnitListImpl();
120                readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList));
121                schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList);
122
123                schedule(new TaskArrivedEvent());
124        }
125       
126        protected void schedule(SchedulingEvent schedulingEvent) {
127
128                try {
129                        SchedulingPlanInterface decision = schedulingPlugin.schedule(
130                                        schedulingEvent, queues, getJobRegistry(),  getResourceManager(), moduleList);
131                        if (decision == null)
132                                return;
133
134                        executeSchedulingPlan(decision);
135
136                } catch (Exception e) {
137                        e.printStackTrace();
138                }
139        }
140       
141        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
142                Executable exec = (Executable) wu;
143
144                long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue();
145                log.debug("Executable " + exec.getJobId() + "_" + exec.getId() + "\nstart time:  " +
146                                new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000)
147                + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000)
148                + "\nduration: " + new Duration(duration * 1000));
149               
150                try {
151                        Job job = jobRegistry.get(exec.getJobId());
152                        /*Task task = job.getTask(exec.getTaskId());
153                        if(exec.getProcessesId() == null){
154                                try {
155                                        task.setStatus(exec.getStatus());
156                                } catch (Exception e) {
157                                        // TODO Auto-generated catch block
158                                        e.printStackTrace();
159                                }
160                        } else {
161                                List<AbstractProcesses> processesList = task.getProcesses();
162                                for(int i = 0; i < processesList.size(); i++){
163                                        AbstractProcesses processes = processesList.get(i);
164                                        if(processes.getId().equals(exec.getProcessesId())){
165                                                processes.setStatus(exec.getStatus());
166                                                break;
167                                        }
168                                }
169                        }*/
170                       
171                        if(job.isFinished()){
172                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
173                        }
174                        else {
175                                scheduleReadyTasks(job);
176                                /*List<JobInterface<?>> jobs = new ArrayList<JobInterface<?>>();
177                                jobs.add(jobRegistry.getJobInfo(job.getId()));
178                                WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList();
179                                readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobs));
180                                schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues,
181                                                getResourceManager(), moduleList);
182                                schedule(new TaskArrivedEvent());*/
183                        }
184                       
185                } catch (Exception e) {
186                        e.printStackTrace();
187                }
188
189        }
190
191        public void notifyCanceledWorkloadUnit(WorkloadUnit wu){;
192
193                Executable task = (Executable) wu;
194                String jobID = task.getJobId();
195                String taskID = task.getId();
196               
197                if(log.isDebugEnabled())
198                        log.debug("Received canceled job" + jobID + "_" + taskID);
199               
200                TaskInterface<?> ti = jobRegistry.getTaskInfo(jobID, taskID) ;
201                try {
202
203                        ti.setStatus((int)BrokerConstants.JOB_STATUS_CANCELED);
204                       
205                        TaskCanceledEvent event = new TaskCanceledEvent(jobID, taskID);
206                        event.setReason(EventReason.RESERVATION_EXCEEDED);
207                        schedule(event);
208                       
209                } catch (Exception e) {
210                        log.error("Exception during scheduling. " + e.getMessage());
211                        e.printStackTrace();
212                }
213        }
214       
215        protected void executeSchedulingPlan(SchedulingPlanInterface decision) {
216
217                ArrayList<ScheduledTaskInterface> taskSchedulingDecisions = decision.getTasks();
218                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
219
220                        try {
221                                ScheduledTaskInterface taskDecision = taskSchedulingDecisions.get(i);
222       
223                                //log.info(decision.getDocument());
224       
225                                String jobID = taskDecision.getJobId();
226                                String taskID = taskDecision.getTaskId();
227                               
228                                // Task allocations that were rejected because of lack of resources or which were canceled and
229                                // not scheduled again are returned to the user.
230                                if(taskDecision.getStatus() == AllocationStatus.REJECTED){
231                                        Job job = jobRegistry.get(jobID);
232                                        scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
233                                        continue;
234                                }
235                               
236                                ArrayList<AllocationInterface> allocations = taskDecision.getAllocations();
237
238                                Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID);
239                                for (int j = 0; j < allocations.size(); j++) {
240
241                                        AllocationInterface allocation = allocations.get(j);
242                                        Executable exec = jobRegistry.createExecutable(task, allocation);
243                                        submitWorkloadUnit(exec, allocation);
244                                        task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
245                                }                                                               
246
247                        }catch (Exception e){
248                                e.printStackTrace();
249                        }
250                }
251        }
252
253        protected void submitWorkloadUnit(WorkloadUnit job, AllocationInterface allocation) {
254
255                String providerName = allocation.getProviderName();
256                if (providerName == null) {
257                        return;
258                }
259                TaskInterface<?> task = (TaskInterface<?>) job;
260                removeFromQueue(task);
261               
262                int resID = GridSim.getEntityId(providerName);
263                IO_data data = new IO_data(job, 0, resID);
264                scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); 
265               
266                //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job);                     
267                if(log.isDebugEnabled())
268                        log.debug("Submitted job " + job.getId() + " to " + providerName);
269
270        }
271
272        class GlobalWorkloadUnitHandler implements  WorkloadUnitHandler{
273
274                public void handleJob(Job job){
275
276                        jobRegistry.addJob(job);
277                        if (log.isInfoEnabled())
278                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
279
280                        scheduleReadyTasks(job);
281                }
282               
283                public void handleTask(TaskInterface<?> task) {
284                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
285                }
286
287                public void handleExecutable(ExecTask task) {
288                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
289                }
290
291                public void handleSubmittedTask(SubmittedTask task) {
292                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
293                }
294        }
295
296        public WorkloadUnitHandler getWorkloadUnitHandler() {
297                return new GlobalWorkloadUnitHandler();
298        }
299       
300
301
302}
Note: See TracBrowser for help on using the repository browser.