source: xssim/trunk/src/simulator/GridBroker.java @ 104

Revision 104, 15.6 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package simulator;
2
3import eduni.simjava.Sim_event;
4import gridsim.GridSimTags;
5import gridsim.IO_data;
6import gridsim.gssim.GssimConstants;
7import gridsim.net.Link;
8import grms.shared.constants.BrokerConstants;
9import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
10import gssim.schedframe.scheduling.AbstractExecutable;
11import gssim.schedframe.scheduling.Executable;
12import gssim.schedframe.scheduling.plugin.grid.GridReservationManager;
13import gssim.schedframe.scheduling.plugin.grid.GssimJobRegistry;
14import gssim.schedframe.scheduling.queues.JobQueue;
15import gssim.schedframe.scheduling.queues.TaskQueue;
16import gssim.schedframe.scheduling.utils.JobDescription;
17import gssim.schedframe.scheduling.utils.TaskDescription;
18
19import java.util.ArrayList;
20import java.util.List;
21
22import org.apache.commons.logging.Log;
23import org.apache.commons.logging.LogFactory;
24import org.exolab.castor.types.Duration;
25import org.joda.time.DateTime;
26
27import schedframe.scheduling.AbstractProcesses;
28import schedframe.scheduling.Job;
29import schedframe.scheduling.Task;
30import schedframe.scheduling.TaskInterface;
31import schedframe.scheduling.events.SchedulingEventReason;
32import schedframe.scheduling.events.TaskArrivedEvent;
33import schedframe.scheduling.events.TaskCanceledEvent;
34import schedframe.scheduling.events.TimerEvent;
35import schedframe.scheduling.plan.AllocationInterface;
36import schedframe.scheduling.plan.ComputingResourceTypeInterface;
37import schedframe.scheduling.plan.ComputingResourceTypeItemInterface;
38import schedframe.scheduling.plan.HostInterface;
39import schedframe.scheduling.plan.ParameterPropertyInterface;
40import schedframe.scheduling.plan.ScheduledTaskInterface;
41import schedframe.scheduling.plan.SchedulingPlanInterface;
42import schedframe.scheduling.plan.impl.Host;
43import schedframe.scheduling.plugin.grid.ModuleList;
44import schedframe.scheduling.utils.ResourceParameterName;
45import simulator.lists.ExecutablesList;
46import simulator.utils.XsltTransformations;
47
48/**
49 * This class implements the functionality of a grid scheduler interface for the
50 * task and host descriptions used in PSNC (that is: QcgJobDescriptionSchema.xsd and HostParamSchema.xsd)
51 *
52 * This class implements also the functionality of the reservation manager
53 * used across multiple calls to {@link #scheduleJob(JobGridletList, String, String, String, ResourceDiscovery, String, String, String, String, String)}
54 * @author Stanislaw Szczepanowski
55 *
56 */
57public class GridBroker extends AbstractGridBroker {
58
59       
60//      private static Log log = LogFactory.getLog(GridBroker.class);
61       
62        /** The persistent job registry object */
63        protected GssimJobRegistry jobRegistry;
64
65        /** Unfinished jobs */
66        protected JobQueue unfinishedJobs;
67       
68        protected TaskQueue unfinishedTasks;
69       
70        protected XsltTransformations xsltTransformer;
71
72        protected ExecutablesList executables;
73       
74        /**
75         * Creates an interface for a given plug-in.
76         *
77         * @param name the name of this entity
78         * @param options the options object of the simulator
79         * @param expectedJobNumber the number of expected jobs, that will be sent to the scheduler (for optimization purposes)
80         * @param expectedTaskNumber the number of expected tasks, that will be sent to the scheduler (for optimization purposes)
81         * @throws Exception if any error occurs
82         */
83        public GridBroker(String name, ConfigurationOptions options, int expectedJobNumber, int expectedTaskNumber) throws Exception {
84                super(name, options.gridSchedulingPluginName);
85                set_stat(GssimConstants.getBrokersStatisticsObject());
86               
87//              this.allTasks = new HashMap<String, Map<String, Task>>(expectedJobNumber);
88//              this.jobsMap = new HashMap<String, Job>(expectedJobNumber);
89               
90                this.jobRegistry = new GssimJobRegistry();
91
92                this.unfinishedJobs = new JobQueue();
93                this.unfinishedTasks = new TaskQueue();
94               
95                this.moduleList.add(new GridReservationManager(this));
96                this.xsltTransformer = new XsltTransformations();
97                this.executables = new ExecutablesList();
98        }
99       
100        /**
101         * Creates an interface for a given plug-in.
102         *
103         * @param name the name of this entity
104         * @param link the connection between entity and router
105         * @param options the options object of the simulator
106         * @param expectedJobNumber the number of expected jobs, that will be sent to the scheduler (for optimization purposes)
107         * @param expectedTaskNumber the number of expected tasks, that will be sent to the scheduler (for optimization purposes)
108         * @throws Exception if any error occurs
109         */
110        public GridBroker(String name, Link link, ConfigurationOptions options, int expectedJobNumber, int expectedTaskNumber) throws Exception {
111                super(name, link, options.gridSchedulingPluginName);
112
113                set_stat(GssimConstants.getBrokersStatisticsObject());
114               
115//              this.allTasks = new HashMap<String, Map<String, Task>>(expectedJobNumber);
116//              this.jobsMap = new HashMap<String, Job>(expectedJobNumber);
117               
118                this.jobRegistry = new GssimJobRegistry();
119
120                this.unfinishedJobs = new JobQueue();
121                this.unfinishedTasks = new TaskQueue();
122               
123                this.moduleList.add(new GridReservationManager(this));
124                this.xsltTransformer = new XsltTransformations();
125                this.executables = new ExecutablesList();
126        }
127       
128        public void scheduleCyclic(){
129                SchedulingPlanInterface<?> decision = null;
130               
131                try {
132                       
133                        decision = gridSchedulerPlugin.schedule(new TimerEvent(), unfinishedJobs,
134                                                                                                        unfinishedTasks, jobRegistry,
135                                                                                                        moduleList, null);
136                        if (decision == null)
137                                return;
138                       
139                        log.info(decision.getDocument());
140                       
141                        execute(decision);
142                       
143                } catch (Exception e) {
144                        log.error("Exception during scheduling. " + e.getMessage());
145                        e.printStackTrace();
146                }
147        }
148       
149        /**
150         * @see GridSchedulerInterface#scheduleJob
151         */
152        public void scheduleJob(List<?> jobsList,
153                        String userPreferences,
154                        String reservationRequest,
155                        ModuleList moduleList,
156                        String reservationOffers,
157                        String predictedTimes,
158                        String jobSet,
159                        String jobRegistryString,
160                        String brokerConfiguration) {
161               
162                //List<Job> jobs = (List<Job>) jobsList;
163                TaskQueue waitingTasks = new TaskQueue();
164               
165                for(int i = 0; i < jobsList.size(); i++){
166                        Job newJob = (Job)jobsList.get(i);
167                        //jobsMap.put(newJob.getId(), newJob);
168                        this.jobRegistry.addJob(newJob);
169                        unfinishedJobs.add(newJob);
170                        List<Task> tasks = newJob.getTask();
171                        //unfinishedTasks.addAll(tasks);
172                       
173                        waitingTasks.addAll(tasks);
174                }
175               
176                if(jobRegistryString != null){
177                        waitingTasks.addAll(jobRegistry.get(jobRegistryString).getTask());
178                }
179               
180                unfinishedTasks.addAll(waitingTasks.getReadyTasks(jobRegistry));
181               
182                try {
183                        SchedulingPlanInterface<?> decision = gridSchedulerPlugin.schedule(
184                                                                                                                new TaskArrivedEvent(),
185                                                                                                                unfinishedJobs,
186                                                                                                                unfinishedTasks,
187                                                                                                                jobRegistry,
188                                                                                                                moduleList,
189                                                                                                                null);
190                        if (decision == null)
191                                return;
192                       
193                        log.info(decision.getDocument());
194                       
195                        execute(decision);
196                       
197                } catch (Exception e) {
198                        log.error("Exception during scheduling. " + e.getMessage());
199                        e.printStackTrace();
200                }
201               
202        }
203       
204        protected void execute(SchedulingPlanInterface<?> decision){
205                ScheduledTaskInterface<?> taskSchedulingDecisions[] = decision.getTask();
206               
207                for (int i = 0; i < taskSchedulingDecisions.length; i++) {
208                       
209                        try {
210                       
211                                ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions[i];
212       
213       
214                                String jobID = taskDecision.getJobId();
215                                String taskID = taskDecision.getTaskId();
216                               
217                                // Task allocations that were rejected because of lack of resources or which were canceled and
218                                // not scheduled again are returned to the user.
219                                if(taskDecision.getStatus() == AllocationStatus.REJECTED){
220                                        Job jobGridlet = this.jobRegistry.get(jobID);
221                                        send(output, 0, GridSimTags.GRIDLET_RETURN, new IO_data(jobGridlet, 0, jobGridlet.getSenderId()));
222                                        continue;
223                                }
224                               
225                                AllocationInterface<?> allocations[] = taskDecision.getAllocation();
226                               
227                                //indicates, whether the scheduling decision concerns a new task or a previously scheduled task
228                                boolean schedulingNewTask = true;
229       
230                /*              Map<String, Task> tasksMap = allTasks.get(jobID);
231                                if (tasksMap == null) {
232                                        tasksMap = new HashMap<String, Task>();
233                                        allTasks.put(jobID, tasksMap);
234                                }
235                                Task taskGridlet = tasksMap.get(taskID);
236                                if (taskGridlet == null) {
237                                        Job jg = jobsMap.get(jobID);
238                                        taskGridlet = jg.getTask(taskID);
239                                        tasksMap.put(taskID, taskGridlet);
240                                } else {
241                                        schedulingNewTask = false;
242                                }
243        */
244                                if (schedulingNewTask) {
245                                        // create new sub tasks
246                                        //sendSubTasks(taskGridlet, processDecisions);
247                                        Task taskGridlet = (Task) this.jobRegistry.getTaskInfo(jobID, taskID);
248                                        sendSubTasks(taskGridlet, allocations);
249                                        taskGridlet.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
250                                } else {
251                                        // move task
252                                //      SubTaskGridlet[] simpleGridlets = taskGridlet.getGridlets();
253                                //      moveSubTasks(simpleGridlets, allocations);
254                                }
255                               
256                        }catch (Exception e){
257                                e.printStackTrace();
258                        }
259                }
260        }
261
262        /**
263         * Sends (spreads) the given task gridlet to resources
264         * @param taskGridlet the task gridlet to be sent
265         * @param resourceIDs the IDs of resources to which the gridlet is to be sent
266         * @param reservationID the IDs of reservations
267         * @param procCounts the number of processor to be used on each resource
268         * @param peRatings the processing elements rating to be used on each resource
269         * @pre resourceIDs.length == reservationID.length == procCounts.length == peRatings.length == memRequests.length 
270         */
271        protected void sendSubTasks(Task task, AllocationInterface<?> allocations[] ) {
272
273                for (int i = 0; i < allocations.length; i++) {
274
275                        AllocationInterface<?> allocation = allocations[i];
276
277                        String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo();
278                        if(refersTo == null)
279                                refersTo = task.getId();
280                       
281                        Executable exec = null;
282                       
283                        if(refersTo.equals(task.getId())){
284                                exec = new Executable(task);
285                        } else {
286                                List<AbstractProcesses> processes = task.getProcesses();
287                                if(processes == null) {
288                                        try {
289                                                log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." +
290                                                                " Set correct value (task id or prcesses set id) for allocation refersTo attribute.");
291                                        } catch (Exception e) {
292                                                e.printStackTrace();
293                                        }
294                                }
295                                boolean found = false;
296                                for(int j = 0; j < processes.size() && !found; j++){
297                                        AbstractProcesses procesesSet = processes.get(j);
298                                        if(refersTo.equals(procesesSet.getId())){
299                                                exec = new Executable(task, procesesSet);
300                                                found = true;
301                                        }
302                                }
303                                if(!found){
304                                        log.error("Allocation refers to unknown proceses set.");
305                                }
306                               
307                        }
308                       
309               
310                        exec.setUserID(this.get_id());
311                        exec.setLength(task.getLength());
312                       
313                        exec.setReservationId(allocation.getReservationId());
314
315                        executables.add(exec);
316                       
317                        HostInterface<?> host = allocation.getHost();
318                        ComputingResourceTypeInterface<?> crt = host.getMachineParameters();
319                        if(crt != null){
320                                ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0);
321                                if(crti != null){
322                                        ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty();
323                                        for(int p = 0; p < properties.length; p++){
324                                                ParameterPropertyInterface<?> property = properties[p];
325                                                if("chosenCPUs".equals(property.getName())){
326                                                        Object cpuNames = property.getValue();
327                                                        exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames);
328                                                }
329                                        }
330                                }
331                        }
332                       
333                        String resourceID = allocation.getHost().getHostname();
334                        int resID = getEntityId(resourceID);
335                        IO_data data = new IO_data(exec, 0, resID);
336                        send(output, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data);
337
338                        if(log.isDebugEnabled())
339                                log.debug("Submitted task " + exec.getJobId() + "_" + exec.getId() + " to resource " + resourceID);
340                }
341        }
342       
343       
344        protected List<?> prepareJobDescription(Sim_event ev){
345                List<JobDescription> jobsList = (List<JobDescription>) ev.get_data();
346                int senderId = ev.get_src();
347                ArrayList<Job> ret = new ArrayList<Job>(jobsList.size());
348
349                DateTime submitionTime = new DateTime();
350               
351                for (JobDescription jobDescription : jobsList) {
352                        try {
353                               
354                                // transform job description to resource requirements
355                               
356                                if(log.isInfoEnabled())
357                                        log.info("Received job  " + jobDescription.getJobId() + " at " + submitionTime);
358                               
359                                Job newJob = new Job(jobDescription.getJobId());
360                                newJob.setSenderId(senderId);
361                               
362                                for (TaskDescription taskDescription : jobDescription) {
363                                       
364                                        String xmlResReq = this.xsltTransformer.taskToResourceRequirements(
365                                                                                taskDescription.getDocument(),
366                                                                                jobDescription.getJobId(),
367                                                                                taskDescription.getUserDn(),
368                                                                                submitionTime);
369
370                                        Task newTask = new Task(xmlResReq);
371                                        newTask.setSenderId(senderId);
372                                        newTask.setStatus((int)BrokerConstants.TASK_STATUS_UNSUBMITTED);
373                                        newTask.setLength(taskDescription.getTaskLength());
374                                        newTask.setWorkloadLogWaitTime(taskDescription.getWorkloadLogWaitTime());
375                        //              newTask.setSubmissionTime(taskDescription.getSubmissionTime());
376                                       
377                                        if(log.isInfoEnabled())
378                                                log.info("Received task " + newTask.getId() + " at " + newTask.getSubmissionTimeToBroker());
379                                       
380                                        newJob.add(newTask);
381                                }
382                                ret.add(newJob);
383                               
384                                jobDescription.discardUnused();
385                               
386                        } catch (Exception e){
387                                log.error(e.getMessage());
388                                e.printStackTrace();
389                        }
390                }
391               
392                return ret;
393        }
394       
395        public void notifyCanceledGridlet(AbstractExecutable taskGridlet){
396                if(log.isDebugEnabled())
397                        log.debug("przyszedl scancelowany gridlet.");
398               
399               
400                Executable task = (Executable) taskGridlet;
401                String jobID = task.getJobId();
402               
403                SchedulingPlanInterface<?> decision = null;
404               
405                TaskInterface<?> tii = jobRegistry.getTaskInfo(jobID, task.getId());
406                try {
407
408                        tii.setStatus((int)BrokerConstants.JOB_STATUS_CANCELED);
409                       
410                        TaskCanceledEvent event = new TaskCanceledEvent(task.getJobId(), task.getTaskId());
411                        event.setReason(SchedulingEventReason.RESERVATION_EXCEEDED);
412                        decision = gridSchedulerPlugin.schedule(event, unfinishedJobs, unfinishedTasks, jobRegistry,
413                                                                                                        moduleList,  null);
414                       
415                        if(decision == null)
416                                return;
417                       
418                        execute(decision);
419                       
420                } catch (Exception e) {
421                        log.error("Exception during scheduling. " + e.getMessage());
422                        e.printStackTrace();
423                }
424        }
425       
426        public void notifyReturnedGridlet(AbstractExecutable simpleGridlet) {
427               
428                Executable exec = (Executable) simpleGridlet;
429               
430                long duration = Double.valueOf(exec.getFinishTime() - exec.getExecStartTime()).longValue();
431                log.debug("Executable " + exec.getId() + "\nstart time:  " +
432                                new java.util.Date(Double.valueOf(exec.getExecStartTime()).longValue() * 1000)
433                + "\nfinish time: " + new java.util.Date(Double.valueOf(exec.getFinishTime()).longValue() * 1000)
434                + "\nduration: " + new Duration(duration * 1000));
435               
436               
437                try {
438                        Job job = this.jobRegistry.get(exec.getJobId());
439                        Task task = job.getTask(exec.getTaskId());
440                        if(exec.getProcessesId() == null){
441                                task.setStatus(exec.getStatus());
442                        } else {
443                                List<AbstractProcesses> processesList = task.getProcesses();
444                                for(int i = 0; i < processesList.size(); i++){
445                                        AbstractProcesses processes = processesList.get(i);
446                                        if(processes.getId().equals(exec.getProcessesId())){
447                                                processes.setStatus(exec.getStatus());
448                                                break;
449                                        }
450                                }
451                        }
452                       
453                        if(job.isFinished()){
454                                send(output, 0, GridSimTags.GRIDLET_RETURN, new IO_data(job, 0, job.getSenderId()));
455                        }
456                        else {
457                                scheduleJob(new ArrayList<Job>(), null, null, moduleList, null, null, null, job.getId(), null);
458                        }
459                       
460                } catch (NoSuchFieldException e) {
461                        e.printStackTrace();
462                }
463        }
464       
465        public ExecutablesList getExecutables(){
466                return this.executables;
467        }
468
469        @Override
470        public void processRequest(Sim_event ev) {
471                // TODO Auto-generated method stub
472               
473        }
474
475        @Override
476        public void send(int entityID, double delay, int gridSimTag, Object data) {
477                // TODO Auto-generated method stub
478               
479        }
480       
481}
Note: See TracBrowser for help on using the repository browser.