source: xssim/branches/tpiontek/src/test/rewolucja/scheduling/implementation/GlobalManagementSystem.java @ 104

Revision 104, 8.8 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package test.rewolucja.scheduling.implementation;
2
3
4
5import java.util.ArrayList;
6import java.util.List;
7import java.util.Map;
8
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11import org.exolab.castor.types.Duration;
12import org.joda.time.DateTime;
13import org.joda.time.DateTimeUtilsExt;
14import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
15
16import schedframe.scheduling.AbstractProcesses;
17import schedframe.scheduling.Job;
18import schedframe.scheduling.JobInterface;
19import schedframe.scheduling.Task;
20import schedframe.scheduling.TaskInterface;
21import schedframe.scheduling.events.SchedulingEventReason;
22import schedframe.scheduling.events.SchedulingEventType;
23import schedframe.scheduling.events.TaskArrivedEvent;
24import schedframe.scheduling.events.TaskCanceledEvent;
25import schedframe.scheduling.events.TimerEvent;
26import schedframe.scheduling.plugin.SchedulingPluginConfiguration;
27import schedframe.scheduling.plugin.estimation.ExecTimeEstimationPlugin;
28import schedframe.scheduling.plugin.grid.GlobalSchedulingPlugin;
29import simulator.utils.InstanceFactory;
30import test.rewolucja.GSSIMJobInterface;
31import test.rewolucja.resources.description.ExecResourceDescription;
32import test.rewolucja.scheduling.plan.AllocationInterfaceNew;
33import test.rewolucja.scheduling.plan.ScheduledTaskInterfaceNew;
34import test.rewolucja.scheduling.plan.SchedulingPlanInterfaceNew;
35import test.rewolucja.task.JobList;
36import eduni.simjava.Sim_event;
37import gridsim.GridSim;
38import gridsim.GridSimTags;
39import gridsim.IO_data;
40import gridsim.gssim.GssimTags;
41import grms.shared.constants.BrokerConstants;
42import gssim.schedframe.scheduling.ExecTaskInterface;
43import gssim.schedframe.scheduling.Executable;
44
45public class GlobalManagementSystem extends ManagementSystem {
46
47        private static Log log = LogFactory.getLog(GlobalManagementSystem.class);
48
49        public GlobalManagementSystem(String providerId, String entityName, String schedulingPluginClassName,
50                        ExecTimeEstimationPlugin execTimeEstimationPlugin, ExecResourceDescription resourceDescription)
51                        throws Exception {
52                super(providerId, entityName, schedulingPluginClassName, execTimeEstimationPlugin, resourceDescription);
53
54                schedulingPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance(
55                                schedulingPluginClassName,
56                                GlobalSchedulingPlugin.class);
57                if(schedulingPlugin == null){
58                        throw new Exception("Can not create grid scheduling plugin instance");
59                }
60        }
61
62        public void processEvent(Sim_event ev) {
63
64                int tag = ev.get_tag();
65                switch (tag) {
66
67                case GssimTags.TIMER:
68                        if (pluginSupportsEvent(GssimTags.TIMER)) {
69                                TimerEvent event = new  TimerEvent();
70                                SchedulingPlanInterfaceNew decision =  schedulingPlugin.schedule(event,
71                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
72                                executeSchedulingPlan(decision);
73                        }
74                        sendTimerEvent();
75
76                        break;
77                }
78        }
79       
80        public void notifySubmittedJob(GSSIMJobInterface<?> gssimJob, boolean ack) {
81                if (!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)) {
82                        log.error("Plugin " + schedulingPlugin.getClass()
83                                        + " does not provide support for TASK_ARRIVED event.\n"
84                                        + "Check plugin configuration or use default one.");
85                        return;
86                }
87                Job job = (Job) gssimJob;
88                jobRegistry.addJob(job);
89
90                if (log.isInfoEnabled())
91                        log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
92
93                List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
94                jobsList.add(job);
95                JobList tasks = new JobList();
96                tasks.addAll(jobRegistry.getReadyTasks(jobsList));
97                schedulingPlugin.placeJobsInQueues(tasks, queues, getResourceManager(), moduleList);
98
99                schedule(new TaskArrivedEvent());
100
101        }
102
103        public void notifyReturnedJob(GSSIMJobInterface<?> gssimJob) {
104                Executable exec = (Executable) gssimJob;
105
106                long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue();
107                log.debug("Executable " + exec.getJobId() + "_" + exec.getId() + "\nstart time:  " +
108                                new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000)
109                + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000)
110                + "\nduration: " + new Duration(duration * 1000));
111               
112                try {
113                        Job job = jobRegistry.get(exec.getJobId());
114                        Task task = job.getTask(exec.getTaskId());
115                        if(exec.getProcessesId() == null){
116                                try {
117                                        task.setStatus(exec.getStatus());
118                                } catch (Exception e) {
119                                        // TODO Auto-generated catch block
120                                        e.printStackTrace();
121                                }
122                        } else {
123                                List<AbstractProcesses> processesList = task.getProcesses();
124                                for(int i = 0; i < processesList.size(); i++){
125                                        AbstractProcesses processes = processesList.get(i);
126                                        if(processes.getId().equals(exec.getProcessesId())){
127                                                processes.setStatus(exec.getStatus());
128                                                break;
129                                        }
130                                }
131                        }
132                       
133                        if(job.isFinished()){
134                                logicalResource.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
135                        }
136                        else {
137                                //prepareTaskQueue(new ArrayList<Job>(), job.getId());
138                                List<JobInterface<?>> jobs = new ArrayList<JobInterface<?>>();
139                                jobs.add(jobRegistry.getJobInfo(job.getId()));
140                                JobList tasks = new JobList();
141                                tasks.addAll(jobRegistry.getReadyTasks(jobs));
142                                schedulingPlugin.placeJobsInQueues(tasks, queues,
143                                                getResourceManager(), moduleList);
144                                schedule(new TaskArrivedEvent());
145                        }
146                       
147                } catch (NoSuchFieldException e) {
148                        e.printStackTrace();
149                }
150
151        }
152
153        public void notifyCanceledJob(GSSIMJobInterface<?> job){;
154
155                Executable task = (Executable) job;
156                String jobID = task.getJobId();
157                String taskID = task.getId();
158               
159                if(log.isDebugEnabled())
160                        log.debug("Received canceled job" + jobID + "_" + taskID);
161               
162                TaskInterface<?> tii = jobRegistry.getTaskInfo(jobID, taskID) ;
163                try {
164
165                        tii.setStatus((int)BrokerConstants.JOB_STATUS_CANCELED);
166                       
167                        TaskCanceledEvent event = new TaskCanceledEvent(jobID, taskID);
168                        event.setReason(SchedulingEventReason.RESERVATION_EXCEEDED);
169                        schedule(event);
170                       
171                } catch (Exception e) {
172                        log.error("Exception during scheduling. " + e.getMessage());
173                        e.printStackTrace();
174                }
175        }
176       
177        protected void executeSchedulingPlan(SchedulingPlanInterfaceNew decision) {
178
179                ArrayList<ScheduledTaskInterfaceNew> taskSchedulingDecisions = decision.getTasks();
180                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
181
182                        try {
183                                ScheduledTaskInterfaceNew taskDecision = taskSchedulingDecisions.get(i);
184       
185                                //log.info(decision.getDocument());
186       
187                                String jobID = taskDecision.getJobId();
188                                String taskID = taskDecision.getTaskId();
189                               
190                                // Task allocations that were rejected because of lack of resources or which were canceled and
191                                // not scheduled again are returned to the user.
192                                if(taskDecision.getStatus() == AllocationStatus.REJECTED){
193                                        Job job = jobRegistry.get(jobID);
194                                        logicalResource.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
195                                        continue;
196                                }
197                               
198                                ArrayList<AllocationInterfaceNew> allocations = taskDecision.getAllocations();
199
200                                Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID);
201                                for (int j = 0; j < allocations.size(); j++) {
202
203                                        AllocationInterfaceNew allocation = allocations.get(j);
204                                        Executable exec = jobRegistry.createExecutable(task, allocation);
205                                        submitJob(exec, allocation);
206                                }                                                               
207
208
209                        }catch (Exception e){
210                                e.printStackTrace();
211                        }
212                }
213        }
214
215        protected void submitJob(GSSIMJobInterface<?> job, AllocationInterfaceNew allocation) {
216
217                String providerName = allocation.getProviderName();
218                if (providerName == null) {
219                        return;
220                }
221                ExecTaskInterface task = (ExecTaskInterface) job;
222                removeFromQueue(task);
223               
224                int resID = GridSim.getEntityId(providerName);
225                IO_data data = new IO_data(job, 0, resID);
226                logicalResource.send(logicalResource.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data);     
227               
228                //logicalResource.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job);                       
229                if(log.isDebugEnabled())
230                        try {
231                                log.debug("Submitted job " + job.getId() + " to " + providerName);
232                        } catch (NoSuchFieldException e) {
233                                // TODO Auto-generated catch block
234                                e.printStackTrace();
235                        }
236        }
237
238
239        public boolean pluginSupportsEvent(int eventType) {
240                SchedulingPluginConfiguration config = (SchedulingPluginConfiguration) schedulingPlugin.getConfiguration();
241                if (config == null)
242                        return false;
243
244                Map<SchedulingEventType, Object> servedEvent = config.getServedEvents();
245                if (servedEvent == null)
246                        return false;
247
248                switch (eventType) {
249
250                case GssimTags.TIMER:
251                        return servedEvent.containsKey(SchedulingEventType.TIMER);
252                case GssimTags.GRIDLET_SUBMIT:
253                        return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
254                case GssimTags.GRIDLET_CANCEL:
255                        return servedEvent.containsKey(SchedulingEventType.TASK_CANCELED);
256                case GssimTags.GRIDLET_RESUME:
257                        return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
258
259                default:
260                        return false;
261                }
262        }
263
264       
265}
Note: See TracBrowser for help on using the repository browser.