source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/global/GlobalManagementSystem.java @ 1440

Revision 1440, 8.8 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.global;
2
3import java.util.ArrayList;
4import java.util.List;
5
6import org.apache.commons.logging.Log;
7import org.apache.commons.logging.LogFactory;
8import org.exolab.castor.types.Duration;
9import org.joda.time.DateTime;
10import org.joda.time.DateTimeUtilsExt;
11import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
12
13import dcworms.schedframe.scheduling.ExecTask;
14import dcworms.schedframe.scheduling.Executable;
15
16import qcg.shared.constants.BrokerConstants;
17import schedframe.events.scheduling.TaskArrivedEvent;
18import schedframe.events.scheduling.TimerEvent;
19import schedframe.scheduling.TaskListImpl;
20import schedframe.scheduling.WorkloadUnitHandler;
21import schedframe.scheduling.plan.AllocationInterface;
22import schedframe.scheduling.plan.ScheduledTaskInterface;
23import schedframe.scheduling.plan.SchedulingPlanInterface;
24import schedframe.scheduling.plugin.SchedulingPlugin;
25import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
26import schedframe.scheduling.policy.AbstractManagementSystem;
27import schedframe.scheduling.queue.TaskQueueList;
28import schedframe.scheduling.tasks.AbstractProcesses;
29import schedframe.scheduling.tasks.Job;
30import schedframe.scheduling.tasks.JobInterface;
31import schedframe.scheduling.tasks.Task;
32import schedframe.scheduling.tasks.TaskInterface;
33import schedframe.scheduling.tasks.WorkloadUnit;
34import eduni.simjava.Sim_event;
35import gridsim.GridSim;
36import gridsim.GridSimTags;
37import gridsim.IO_data;
38import gridsim.dcworms.DCWormsTags;
39
40public class GlobalManagementSystem extends AbstractManagementSystem {
41
42        private static Log log = LogFactory.getLog(GlobalManagementSystem.class);
43
44        public GlobalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
45                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
46                        throws Exception {
47                super(providerId, entityName,  execTimeEstimationPlugin, queues);
48
49                if(schedPlugin == null){
50                        throw new Exception("Can not create global scheduling plugin instance");
51                }
52               
53                this.schedulingPlugin =  schedPlugin;
54        }
55
56        public void processEvent(Sim_event ev) {
57
58                int tag = ev.get_tag();
59                switch (tag) {
60
61                case DCWormsTags.TIMER:
62                        TimerEvent event = new  TimerEvent();
63                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
64                                        queues,  jobRegistry, getResourceManager(), moduleList);
65                        executeSchedulingPlan(decision);
66
67                        sendTimerEvent();
68                        break;
69                }
70        }
71       
72        public void notifySubmittedWorkloadUnit(WorkloadUnit wu) {
73                registerWorkloadUnit(wu);
74        }
75       
76        private void registerWorkloadUnit(WorkloadUnit wu){
77                if(!wu.isRegistered()){
78                        wu.register(jobRegistry);
79                }
80                wu.accept(getWorkloadUnitHandler());
81        }
82       
83
84        protected void schedule(JobInterface<?> job){
85                List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
86                jobsList.add(job);
87                TaskListImpl readyTasks = new TaskListImpl();
88                readyTasks.addAll(jobRegistry.getAvailableTasks(jobsList));
89               
90                schedulingPlugin.placeTasksInQueues(readyTasks, queues, getResourceManager(), moduleList);
91                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(
92                                new TaskArrivedEvent(), queues, jobRegistry,  getResourceManager(), moduleList);
93                if (decision != null)
94                        executeSchedulingPlan(decision);
95        }
96
97        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
98                Executable exec = (Executable) wu;
99
100                long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue();
101                log.debug("Executable " + exec.getJobId() + "_" + exec.getId() + "\nstart time:  " +
102                                new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000)
103                + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000)
104                + "\nduration: " + new Duration(duration * 1000));
105               
106                try {
107                        Job job = jobRegistry.getJob(exec.getJobId());
108                        Task task = job.getTask(exec.getTaskId());
109                        if(exec.getProcessesId() == null){
110                                try {
111                                        task.setStatus(exec.getStatus());
112                                } catch (Exception e) {
113
114                                }
115                        } else {
116                                List<AbstractProcesses> processesList = task.getProcesses();
117                                for(int i = 0; i < processesList.size(); i++){
118                                        AbstractProcesses processes = processesList.get(i);
119                                        if(processes.getId().equals(exec.getProcessesId())){
120                                                processes.setStatus(exec.getStatus());
121                                                break;
122                                        }
123                                }
124                        }
125                       
126                        if(job.isFinished()){
127                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
128                        }
129                        else {
130                                schedule(job);
131                        }
132                       
133                } catch (Exception e) {
134                        e.printStackTrace();
135                }
136
137        }
138
139        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
140
141                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
142                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
143
144                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
145
146                        //log.info(decision.getDocument());
147
148                        String jobID = taskDecision.getJobId();
149                        String taskID = taskDecision.getTaskId();
150                       
151                        // Task allocations that were rejected because of lack of resources or which were canceled and
152                        // not scheduled again are returned to the user.
153                        if(taskDecision.getStatus() == AllocationStatus.REJECTED){
154                                Job job = jobRegistry.getJob(jobID);
155                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
156                                continue;
157                        }
158                       
159                        Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID);
160                       
161                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
162                        for (int j = 0; j < allocations.size(); j++) {
163
164                                AllocationInterface<?> allocation = allocations.get(j);
165                                Executable exec = createExecutable(task, allocation);
166                                submitTask(exec, allocation);
167                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
168                        }                                                               
169                }
170        }
171
172        private Executable createExecutable(Task task, AllocationInterface<?> allocation) {
173
174                String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo();
175                if(refersTo == null)
176                        refersTo = task.getId();
177                       
178                Executable exec = null;
179
180                if(refersTo.equals(task.getId())){
181                        exec = new Executable(task);
182                } else {
183                        List<AbstractProcesses> processes = task.getProcesses();
184                        if(processes == null) {
185                                try {
186                                        log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." +
187                                                        " Set correct value (task id or prcesses set id) for allocation refersTo attribute.");
188                                } catch (Exception e) {
189                                        e.printStackTrace();
190                                }
191                        }
192                        boolean found = false;
193                        for(int j = 0; j < processes.size() && !found; j++){
194                                AbstractProcesses procesesSet = processes.get(j);
195                                if(refersTo.equals(procesesSet.getId())){
196                                        exec = new Executable(task, procesesSet);
197                                        found = true;
198                                }
199                        }
200                        if(!found){
201                                log.error("Allocation refers to unknown proceses set.");
202                        }
203                }
204
205                exec.setReservationId(allocation.getReservationId());
206                       
207                /*HostInterface<?> host = allocation.getHost();
208                ComputingResourceTypeInterface<?> crt = host.getMachineParameters();
209                if(crt != null){
210                        ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0);
211                        if(crti != null){
212                                ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty();
213                                for(int p = 0; p < properties.length; p++){
214                                        ParameterPropertyInterface<?> property = properties[p];
215                                        if("chosenCPUs".equals(property.getName())){
216                                                Object cpuNames = property.getValue();
217                                                exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames);
218                                        }
219                                }
220                        }
221                }*/
222                return exec;
223        }
224       
225        protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) {
226
227                String providerName = allocation.getProviderName();
228                if (providerName == null) {
229                        return;
230                }
231                removeFromQueue(task);
232               
233                int resID = GridSim.getEntityId(providerName);
234                IO_data data = new IO_data(task, 0, resID);
235                scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); 
236                //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job);     
237               
238                if(log.isDebugEnabled())
239                        log.debug("Submitted job " + task.getId() + " to " + providerName);
240
241        }
242
243        class GlobalWorkloadUnitHandler implements WorkloadUnitHandler{
244
245                public void handleJob(JobInterface<?> job){
246                        if (log.isInfoEnabled())
247                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
248
249                        jobRegistry.addJob(job);
250                        schedule(job);
251                }
252               
253                public void handleTask(TaskInterface<?> task) {
254                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
255                }
256
257                public void handleExecutable(ExecTask task) {
258                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
259                }
260        }
261
262        public WorkloadUnitHandler getWorkloadUnitHandler() {
263                return new GlobalWorkloadUnitHandler();
264        }
265
266
267}
Note: See TracBrowser for help on using the repository browser.