source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/global/GlobalManagementSystem.java @ 1415

Revision 1415, 9.1 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.global;
2
3import java.util.ArrayList;
4import java.util.List;
5
6import org.apache.commons.logging.Log;
7import org.apache.commons.logging.LogFactory;
8import org.exolab.castor.types.Duration;
9import org.joda.time.DateTime;
10import org.joda.time.DateTimeUtilsExt;
11import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
12
13import dcworms.schedframe.scheduling.ExecTask;
14import dcworms.schedframe.scheduling.Executable;
15
16import qcg.shared.constants.BrokerConstants;
17import schedframe.events.scheduling.TaskArrivedEvent;
18import schedframe.events.scheduling.TimerEvent;
19import schedframe.scheduling.TaskListImpl;
20import schedframe.scheduling.WorkloadUnitHandler;
21import schedframe.scheduling.plan.AllocationInterface;
22import schedframe.scheduling.plan.ScheduledTaskInterface;
23import schedframe.scheduling.plan.SchedulingPlanInterface;
24import schedframe.scheduling.plugin.SchedulingPlugin;
25import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
26import schedframe.scheduling.policy.AbstractManagementSystem;
27import schedframe.scheduling.queue.TaskQueueList;
28import schedframe.scheduling.tasks.AbstractProcesses;
29import schedframe.scheduling.tasks.Job;
30import schedframe.scheduling.tasks.JobInterface;
31import schedframe.scheduling.tasks.Task;
32import schedframe.scheduling.tasks.TaskInterface;
33import schedframe.scheduling.tasks.WorkloadUnit;
34import eduni.simjava.Sim_event;
35import gridsim.GridSim;
36import gridsim.GridSimTags;
37import gridsim.IO_data;
38import gridsim.dcworms.DCWormsTags;
39
40public class GlobalManagementSystem extends AbstractManagementSystem {
41
42        private static Log log = LogFactory.getLog(GlobalManagementSystem.class);
43
44        public GlobalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
45                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
46                        throws Exception {
47                super(providerId, entityName,  execTimeEstimationPlugin, queues);
48
49                if(schedPlugin == null){
50                        throw new Exception("Can not create global scheduling plugin instance");
51                }
52               
53                this.schedulingPlugin =  schedPlugin;
54        }
55
56        public void processEvent(Sim_event ev) {
57
58                int tag = ev.get_tag();
59                switch (tag) {
60
61                case DCWormsTags.TIMER:
62                        if (pluginSupportsEvent(DCWormsTags.TIMER)) {
63                                TimerEvent event = new  TimerEvent();
64                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
65                                                queues,  jobRegistry, getResourceManager(), moduleList);
66                                executeSchedulingPlan(decision);
67                        }
68                        sendTimerEvent();
69                        break;
70                }
71        }
72       
73        public void notifySubmittedWorkloadUnit(WorkloadUnit wu) {
74                if (!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)) {
75                        log.error("Plugin " + schedulingPlugin.getClass()
76                                        + " does not provide support for TASK_ARRIVED event.\n"
77                                        + "Check plugin configuration or use default one.");
78                        return;
79                }
80                registerWorkloadUnit(wu);
81
82        }
83       
84        private void registerWorkloadUnit(WorkloadUnit wu){
85                if(!wu.isRegistered()){
86                        wu.register(jobRegistry);
87                }
88                wu.accept(getWorkloadUnitHandler());
89        }
90       
91
92        protected void schedule(JobInterface<?> job){
93                List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
94                jobsList.add(job);
95                TaskListImpl readyTasks = new TaskListImpl();
96                readyTasks.addAll(jobRegistry.getAvailableTasks(jobsList));
97               
98                schedulingPlugin.placeTasksInQueues(readyTasks, queues, getResourceManager(), moduleList);
99                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(
100                                new TaskArrivedEvent(), queues, jobRegistry,  getResourceManager(), moduleList);
101                if (decision != null)
102                        executeSchedulingPlan(decision);
103        }
104
105        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
106                Executable exec = (Executable) wu;
107
108                long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue();
109                log.debug("Executable " + exec.getJobId() + "_" + exec.getId() + "\nstart time:  " +
110                                new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000)
111                + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000)
112                + "\nduration: " + new Duration(duration * 1000));
113               
114                try {
115                        Job job = jobRegistry.getJob(exec.getJobId());
116                        Task task = job.getTask(exec.getTaskId());
117                        if(exec.getProcessesId() == null){
118                                try {
119                                        task.setStatus(exec.getStatus());
120                                } catch (Exception e) {
121
122                                }
123                        } else {
124                                List<AbstractProcesses> processesList = task.getProcesses();
125                                for(int i = 0; i < processesList.size(); i++){
126                                        AbstractProcesses processes = processesList.get(i);
127                                        if(processes.getId().equals(exec.getProcessesId())){
128                                                processes.setStatus(exec.getStatus());
129                                                break;
130                                        }
131                                }
132                        }
133                       
134                        if(job.isFinished()){
135                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
136                        }
137                        else {
138                                schedule(job);
139                        }
140                       
141                } catch (Exception e) {
142                        e.printStackTrace();
143                }
144
145        }
146
147        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
148
149                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
150                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
151
152                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
153
154                        //log.info(decision.getDocument());
155
156                        String jobID = taskDecision.getJobId();
157                        String taskID = taskDecision.getTaskId();
158                       
159                        // Task allocations that were rejected because of lack of resources or which were canceled and
160                        // not scheduled again are returned to the user.
161                        if(taskDecision.getStatus() == AllocationStatus.REJECTED){
162                                Job job = jobRegistry.getJob(jobID);
163                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
164                                continue;
165                        }
166                       
167                        Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID);
168                       
169                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
170                        for (int j = 0; j < allocations.size(); j++) {
171
172                                AllocationInterface<?> allocation = allocations.get(j);
173                                Executable exec = createExecutable(task, allocation);
174                                submitTask(exec, allocation);
175                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
176                        }                                                               
177                }
178        }
179
180        private Executable createExecutable(Task task, AllocationInterface<?> allocation) {
181
182                String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo();
183                if(refersTo == null)
184                        refersTo = task.getId();
185                       
186                Executable exec = null;
187
188                if(refersTo.equals(task.getId())){
189                        exec = new Executable(task);
190                } else {
191                        List<AbstractProcesses> processes = task.getProcesses();
192                        if(processes == null) {
193                                try {
194                                        log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." +
195                                                        " Set correct value (task id or prcesses set id) for allocation refersTo attribute.");
196                                } catch (Exception e) {
197                                        e.printStackTrace();
198                                }
199                        }
200                        boolean found = false;
201                        for(int j = 0; j < processes.size() && !found; j++){
202                                AbstractProcesses procesesSet = processes.get(j);
203                                if(refersTo.equals(procesesSet.getId())){
204                                        exec = new Executable(task, procesesSet);
205                                        found = true;
206                                }
207                        }
208                        if(!found){
209                                log.error("Allocation refers to unknown proceses set.");
210                        }
211                }
212
213                exec.setReservationId(allocation.getReservationId());
214                       
215                /*HostInterface<?> host = allocation.getHost();
216                ComputingResourceTypeInterface<?> crt = host.getMachineParameters();
217                if(crt != null){
218                        ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0);
219                        if(crti != null){
220                                ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty();
221                                for(int p = 0; p < properties.length; p++){
222                                        ParameterPropertyInterface<?> property = properties[p];
223                                        if("chosenCPUs".equals(property.getName())){
224                                                Object cpuNames = property.getValue();
225                                                exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames);
226                                        }
227                                }
228                        }
229                }*/
230                return exec;
231        }
232       
233        protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) {
234
235                String providerName = allocation.getProviderName();
236                if (providerName == null) {
237                        return;
238                }
239                removeFromQueue(task);
240               
241                int resID = GridSim.getEntityId(providerName);
242                IO_data data = new IO_data(task, 0, resID);
243                scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); 
244                //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job);     
245               
246                if(log.isDebugEnabled())
247                        log.debug("Submitted job " + task.getId() + " to " + providerName);
248
249        }
250
251        class GlobalWorkloadUnitHandler implements WorkloadUnitHandler{
252
253                public void handleJob(JobInterface<?> job){
254                        if (log.isInfoEnabled())
255                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
256
257                        jobRegistry.addJob(job);
258                        schedule(job);
259                }
260               
261                public void handleTask(TaskInterface<?> task) {
262                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
263                }
264
265                public void handleExecutable(ExecTask task) {
266                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
267                }
268        }
269
270        public WorkloadUnitHandler getWorkloadUnitHandler() {
271                return new GlobalWorkloadUnitHandler();
272        }
273
274
275}
Note: See TracBrowser for help on using the repository browser.