source: DCWoRMS/trunk/src/schedframe/scheduling/policy/global/GlobalManagementSystem.java @ 481

Revision 481, 10.2 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.global;
2
3import java.util.ArrayList;
4import java.util.List;
5
6import org.apache.commons.logging.Log;
7import org.apache.commons.logging.LogFactory;
8import org.exolab.castor.types.Duration;
9import org.joda.time.DateTime;
10import org.joda.time.DateTimeUtilsExt;
11import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
12
13import qcg.shared.constants.BrokerConstants;
14import schedframe.events.scheduling.SchedulingEvent;
15import schedframe.events.scheduling.TaskArrivedEvent;
16import schedframe.events.scheduling.TimerEvent;
17import schedframe.scheduling.WorkloadUnitHandler;
18import schedframe.scheduling.TaskListImpl;
19import schedframe.scheduling.plan.AllocationInterface;
20import schedframe.scheduling.plan.ScheduledTaskInterface;
21import schedframe.scheduling.plan.SchedulingPlanInterface;
22import schedframe.scheduling.plugin.SchedulingPlugin;
23import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
24import schedframe.scheduling.policy.AbstractManagementSystem;
25import schedframe.scheduling.queue.TaskQueueList;
26import schedframe.scheduling.tasks.AbstractProcesses;
27import schedframe.scheduling.tasks.Job;
28import schedframe.scheduling.tasks.JobInterface;
29import schedframe.scheduling.tasks.Task;
30import schedframe.scheduling.tasks.TaskInterface;
31import schedframe.scheduling.tasks.WorkloadUnit;
32import eduni.simjava.Sim_event;
33import gridsim.GridSim;
34import gridsim.GridSimTags;
35import gridsim.IO_data;
36import gridsim.gssim.DCWormsTags;
37import gssim.schedframe.scheduling.ExecTask;
38import gssim.schedframe.scheduling.Executable;
39
40public class GlobalManagementSystem extends AbstractManagementSystem {
41
42        private static Log log = LogFactory.getLog(GlobalManagementSystem.class);
43
44        public GlobalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
45                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
46                        throws Exception {
47                super(providerId, entityName,  execTimeEstimationPlugin, queues);
48
49                if(schedPlugin == null){
50                        throw new Exception("Can not create global scheduling plugin instance");
51                }
52               
53                this.schedulingPlugin =  schedPlugin;
54        }
55
56        public void processEvent(Sim_event ev) {
57
58                int tag = ev.get_tag();
59                switch (tag) {
60
61                case DCWormsTags.TIMER:
62                        if (pluginSupportsEvent(DCWormsTags.TIMER)) {
63                                TimerEvent event = new  TimerEvent();
64                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
65                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
66                                executeSchedulingPlan(decision);
67                        }
68                        sendTimerEvent();
69                        break;
70                }
71        }
72       
73        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) {
74                if (!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)) {
75                        log.error("Plugin " + schedulingPlugin.getClass()
76                                        + " does not provide support for TASK_ARRIVED event.\n"
77                                        + "Check plugin configuration or use default one.");
78                        return;
79                }
80               
81                registerWorkloadUnit(wu);
82                /*Job job = (Job) wu;
83                jobRegistry.addJob(job);
84
85                if (log.isInfoEnabled())
86                        log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
87
88                List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
89                jobsList.add(job);
90                WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList();
91                readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList));
92                schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList);
93
94                schedule(new TaskArrivedEvent());*/
95
96        }
97       
98        private void registerWorkloadUnit(WorkloadUnit wu){
99                if(!wu.isRegistered()){
100                        wu.register(jobRegistry);
101                }
102                wu.accept(getWorkloadUnitHandler());
103        }
104       
105
106        protected void scheduleAvaialbleTasks(Job job){
107                List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
108                jobsList.add(job);
109                TaskListImpl readyTasks = new TaskListImpl();
110                readyTasks.addAll(jobRegistry.getAvailableTasks(jobsList));
111               
112                schedulingPlugin.placeTasksInQueues(readyTasks, queues, getResourceManager(), moduleList);
113                schedule(new TaskArrivedEvent());
114        }
115       
116        protected void schedule(SchedulingEvent schedulingEvent) {
117
118                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(
119                                schedulingEvent, queues, getJobRegistry(),  getResourceManager(), moduleList);
120                if (decision != null)
121                        executeSchedulingPlan(decision);
122        }
123       
124        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
125                Executable exec = (Executable) wu;
126
127                long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue();
128                log.debug("Executable " + exec.getJobId() + "_" + exec.getId() + "\nstart time:  " +
129                                new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000)
130                + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000)
131                + "\nduration: " + new Duration(duration * 1000));
132               
133                try {
134                        Job job = jobRegistry.getJob(exec.getJobId());
135                        /*Task task = job.getTask(exec.getTaskId());
136                        if(exec.getProcessesId() == null){
137                                try {
138                                        task.setStatus(exec.getStatus());
139                                } catch (Exception e) {
140                                        // TODO Auto-generated catch block
141                                        e.printStackTrace();
142                                }
143                        } else {
144                                List<AbstractProcesses> processesList = task.getProcesses();
145                                for(int i = 0; i < processesList.size(); i++){
146                                        AbstractProcesses processes = processesList.get(i);
147                                        if(processes.getId().equals(exec.getProcessesId())){
148                                                processes.setStatus(exec.getStatus());
149                                                break;
150                                        }
151                                }
152                        }*/
153                       
154                        if(job.isFinished()){
155                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
156                        }
157                        else {
158                                scheduleAvaialbleTasks(job);
159                                /*List<JobInterface<?>> jobs = new ArrayList<JobInterface<?>>();
160                                jobs.add(jobRegistry.getJobInfo(job.getId()));
161                                WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList();
162                                readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobs));
163                                schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues,
164                                                getResourceManager(), moduleList);
165                                schedule(new TaskArrivedEvent());*/
166                        }
167                       
168                } catch (Exception e) {
169                        e.printStackTrace();
170                }
171
172        }
173
174        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
175
176                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
177                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
178
179                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
180
181                        //log.info(decision.getDocument());
182
183                        String jobID = taskDecision.getJobId();
184                        String taskID = taskDecision.getTaskId();
185                       
186                        // Task allocations that were rejected because of lack of resources or which were canceled and
187                        // not scheduled again are returned to the user.
188                        if(taskDecision.getStatus() == AllocationStatus.REJECTED){
189                                Job job = jobRegistry.getJob(jobID);
190                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
191                                continue;
192                        }
193                       
194                        Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID);
195                       
196                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
197                        for (int j = 0; j < allocations.size(); j++) {
198
199                                AllocationInterface<?> allocation = allocations.get(j);
200                                Executable exec = createExecutable(task, allocation);
201                                submitTask(exec, allocation);
202                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
203                        }                                                               
204                }
205        }
206
207        private Executable createExecutable(Task task, AllocationInterface<?> allocation) {
208
209                String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo();
210                if(refersTo == null)
211                        refersTo = task.getId();
212                       
213                Executable exec = null;
214
215                if(refersTo.equals(task.getId())){
216                        exec = new Executable(task);
217                } else {
218                        List<AbstractProcesses> processes = task.getProcesses();
219                        if(processes == null) {
220                                try {
221                                        log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." +
222                                                        " Set correct value (task id or prcesses set id) for allocation refersTo attribute.");
223                                } catch (Exception e) {
224                                        e.printStackTrace();
225                                }
226                        }
227                        boolean found = false;
228                        for(int j = 0; j < processes.size() && !found; j++){
229                                AbstractProcesses procesesSet = processes.get(j);
230                                if(refersTo.equals(procesesSet.getId())){
231                                        exec = new Executable(task, procesesSet);
232                                        found = true;
233                                }
234                        }
235                        if(!found){
236                                log.error("Allocation refers to unknown proceses set.");
237                        }
238                }
239
240                exec.setReservationId(allocation.getReservationId());
241                       
242                /*HostInterface<?> host = allocation.getHost();
243                ComputingResourceTypeInterface<?> crt = host.getMachineParameters();
244                if(crt != null){
245                        ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0);
246                        if(crti != null){
247                                ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty();
248                                for(int p = 0; p < properties.length; p++){
249                                        ParameterPropertyInterface<?> property = properties[p];
250                                        if("chosenCPUs".equals(property.getName())){
251                                                Object cpuNames = property.getValue();
252                                                exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames);
253                                        }
254                                }
255                        }
256                }*/
257                return exec;
258        }
259       
260        protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) {
261
262                String providerName = allocation.getProviderName();
263                if (providerName == null) {
264                        return;
265                }
266                removeFromQueue(task);
267               
268                int resID = GridSim.getEntityId(providerName);
269                IO_data data = new IO_data(task, 0, resID);
270                scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); 
271               
272                //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job);                     
273                if(log.isDebugEnabled())
274                        log.debug("Submitted job " + task.getId() + " to " + providerName);
275
276        }
277
278        class GlobalWorkloadUnitHandler implements  WorkloadUnitHandler{
279
280                public void handleJob(Job job){
281
282                        if (log.isInfoEnabled())
283                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
284
285                        jobRegistry.addJob(job);
286                        scheduleAvaialbleTasks(job);
287                }
288               
289                public void handleTask(TaskInterface<?> task) {
290                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
291                }
292
293                public void handleExecutable(ExecTask task) {
294                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
295                }
296
297        }
298
299        public WorkloadUnitHandler getWorkloadUnitHandler() {
300                return new GlobalWorkloadUnitHandler();
301        }
302
303
304}
Note: See TracBrowser for help on using the repository browser.