source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/global/GlobalManagementSystem.java @ 1575

Revision 1575, 8.8 KB checked in by wojtekp, 9 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.global;
2
3import java.util.ArrayList;
4import java.util.List;
5
6import org.apache.commons.logging.Log;
7import org.apache.commons.logging.LogFactory;
8import org.exolab.castor.types.Duration;
9import org.joda.time.DateTime;
10import org.joda.time.DateTimeUtilsExt;
11import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
12
13import dcworms.schedframe.scheduling.ExecTask;
14import dcworms.schedframe.scheduling.Executable;
15
16import qcg.shared.constants.BrokerConstants;
17import schedframe.events.scheduling.TaskArrivedEvent;
18import schedframe.events.scheduling.TimerEvent;
19import schedframe.scheduling.TaskListImpl;
20import schedframe.scheduling.WorkloadUnitHandler;
21import schedframe.scheduling.plan.AllocationInterface;
22import schedframe.scheduling.plan.ScheduledTaskInterface;
23import schedframe.scheduling.plan.SchedulingPlanInterface;
24import schedframe.scheduling.plugin.SchedulingPlugin;
25import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
26import schedframe.scheduling.policy.AbstractManagementSystem;
27import schedframe.scheduling.queue.TaskQueueList;
28import schedframe.scheduling.tasks.AbstractProcesses;
29import schedframe.scheduling.tasks.Job;
30import schedframe.scheduling.tasks.JobInterface;
31import schedframe.scheduling.tasks.Task;
32import schedframe.scheduling.tasks.TaskInterface;
33import schedframe.scheduling.tasks.WorkloadUnit;
34import eduni.simjava.Sim_event;
35import gridsim.GridSim;
36import gridsim.GridSimTags;
37import gridsim.IO_data;
38import gridsim.dcworms.DCWormsTags;
39
40public class GlobalManagementSystem extends AbstractManagementSystem {
41
42        private static Log log = LogFactory.getLog(GlobalManagementSystem.class);
43
44        public GlobalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
45                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
46                        throws Exception {
47                super(providerId, entityName,  execTimeEstimationPlugin, queues);
48
49                if(schedPlugin == null){
50                        throw new Exception("Can not create global scheduling plugin instance");
51                }
52               
53                this.schedulingPlugin =  schedPlugin;
54        }
55
56        public void processEvent(Sim_event ev) {
57
58                int tag = ev.get_tag();
59                switch (tag) {
60
61                case DCWormsTags.TIMER:
62                        TimerEvent event = new  TimerEvent();
63                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
64                                        queues,  jobRegistry, getResourceManager(), moduleList);
65                        executeSchedulingPlan(decision);
66
67                        sendTimerEvent();
68                        break;
69                }
70        }
71       
72        public void notifySubmittedWorkloadUnit(WorkloadUnit wu) {
73                registerWorkloadUnit(wu);
74        }
75       
76        private void registerWorkloadUnit(WorkloadUnit wu){
77                if(!wu.isRegistered()){
78                        wu.register(jobRegistry);
79                }
80                wu.accept(getWorkloadUnitHandler());
81        }
82       
83
84        protected void schedule(JobInterface<?> job){
85                List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
86                jobsList.add(job);
87                TaskListImpl readyTasks = new TaskListImpl();
88                readyTasks.addAll(jobRegistry.getAvailableTasks(jobsList));
89               
90                schedulingPlugin.placeTasksInQueues(readyTasks, queues, getResourceManager(), moduleList);
91                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(
92                                new TaskArrivedEvent(), queues, jobRegistry,  getResourceManager(), moduleList);
93                if (decision != null)
94                        executeSchedulingPlan(decision);
95        }
96
97        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
98                Executable exec = (Executable) wu;
99
100                long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue();
101                log.debug("Executable " + exec.getJobId() + "_" + exec.getId() + "\nstart time:  " +
102                                new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000)
103                + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000)
104                + "\nduration: " + new Duration(duration * 1000));
105               
106                try {
107                        Job job = jobRegistry.getJob(exec.getJobId());
108                        Task task = job.getTask(exec.getTaskId());
109                        if(exec.getProcessesId() == null){
110                                try {
111                                        task.setStatus(exec.getStatus());
112                                } catch (Exception e) {
113
114                                }
115                        } else {
116                                List<AbstractProcesses> processesList = task.getProcesses();
117                                for(int i = 0; i < processesList.size(); i++){
118                                        AbstractProcesses processes = processesList.get(i);
119                                        if(processes.getId().equals(exec.getProcessesId())){
120                                                processes.setStatus(exec.getStatus());
121                                                break;
122                                        }
123                                }
124                        }
125                       
126                        if(job.isFinished()){
127                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
128                        }
129                        else {
130                                schedule(job);
131                        }
132                       
133                } catch (Exception e) {
134                        e.printStackTrace();
135                }
136        }
137
138        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
139
140                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
141                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
142
143                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
144
145                        //log.info(decision.getDocument());
146
147                        String jobID = taskDecision.getJobId();
148                        String taskID = taskDecision.getTaskId();
149                       
150                        // Task allocations that were rejected because of lack of resources or which were canceled and
151                        // not scheduled again are returned to the user.
152                        if(taskDecision.getStatus() == AllocationStatus.REJECTED){
153                                Job job = jobRegistry.getJob(jobID);
154                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
155                                continue;
156                        }
157                       
158                        Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID);
159                       
160                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
161                        for (int j = 0; j < allocations.size(); j++) {
162
163                                AllocationInterface<?> allocation = allocations.get(j);
164                                Executable exec = createExecutable(task, allocation);
165                                submitTask(exec, allocation);
166                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
167                        }                                                               
168                }
169        }
170
171        private Executable createExecutable(Task task, AllocationInterface<?> allocation) {
172
173                String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo();
174                if(refersTo == null)
175                        refersTo = task.getId();
176                       
177                Executable exec = null;
178
179                if(refersTo.equals(task.getId())){
180                        exec = new Executable(task);
181                } else {
182                        List<AbstractProcesses> processes = task.getProcesses();
183                        if(processes == null) {
184                                try {
185                                        log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." +
186                                                        " Set correct value (task id or prcesses set id) for allocation refersTo attribute.");
187                                } catch (Exception e) {
188                                        e.printStackTrace();
189                                }
190                        }
191                        boolean found = false;
192                        for(int j = 0; j < processes.size() && !found; j++){
193                                AbstractProcesses procesesSet = processes.get(j);
194                                if(refersTo.equals(procesesSet.getId())){
195                                        exec = new Executable(task, procesesSet);
196                                        found = true;
197                                }
198                        }
199                        if(!found){
200                                log.error("Allocation refers to unknown proceses set.");
201                        }
202                }
203
204                exec.setReservationId(allocation.getReservationId());
205                       
206                /*HostInterface<?> host = allocation.getHost();
207                ComputingResourceTypeInterface<?> crt = host.getMachineParameters();
208                if(crt != null){
209                        ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0);
210                        if(crti != null){
211                                ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty();
212                                for(int p = 0; p < properties.length; p++){
213                                        ParameterPropertyInterface<?> property = properties[p];
214                                        if("chosenCPUs".equals(property.getName())){
215                                                Object cpuNames = property.getValue();
216                                                exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames);
217                                        }
218                                }
219                        }
220                }*/
221                return exec;
222        }
223       
224        protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) {
225
226                String providerName = allocation.getProviderName();
227                if (providerName == null) {
228                        return;
229                }
230                removeFromQueue(task);
231               
232                int resID = GridSim.getEntityId(providerName);
233                IO_data data = new IO_data(task, 0, resID);
234                scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data); 
235                //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job);     
236               
237                if(log.isDebugEnabled())
238                        log.debug("Submitted job " + task.getId() + " to " + providerName);
239
240        }
241
242        class GlobalWorkloadUnitHandler implements WorkloadUnitHandler{
243
244                public void handleJob(JobInterface<?> job){
245                        if (log.isInfoEnabled())
246                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
247
248                        jobRegistry.addJob(job);
249                        schedule(job);
250                }
251               
252                public void handleTask(TaskInterface<?> task) {
253                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
254                }
255
256                public void handleExecutable(ExecTask task) {
257                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler.");
258                }
259        }
260
261        public WorkloadUnitHandler getWorkloadUnitHandler() {
262                return new GlobalWorkloadUnitHandler();
263        }
264
265
266}
Note: See TracBrowser for help on using the repository browser.