source: gssim/trunk/src/example/gridplugin/GridARAllSlots.java @ 5

Revision 5, 10.2 KB checked in by wojtekp, 14 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package example.gridplugin;
2
3import java.util.List;
4import java.util.Properties;
5
6import org.apache.commons.logging.Log;
7import org.apache.commons.logging.LogFactory;
8import org.joda.time.DateTime;
9
10import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
11import schedframe.resources.ResourceStateDescription;
12import schedframe.resources.units.Processors;
13import schedframe.resources.units.ResourceUnit;
14import schedframe.scheduling.JobInterface;
15import schedframe.scheduling.Offer;
16import schedframe.scheduling.Queue;
17import schedframe.scheduling.Reservation;
18import schedframe.scheduling.ResourceRequirements;
19import schedframe.scheduling.TaskInterface;
20import schedframe.scheduling.TimeRequirements;
21import schedframe.scheduling.TimeResourceAllocation;
22import schedframe.scheduling.events.SchedulingEvent;
23import schedframe.scheduling.events.TaskCanceledEvent;
24import schedframe.scheduling.plan.SchedulingPlanInterface;
25import schedframe.scheduling.plan.impl.Allocation;
26import schedframe.scheduling.plan.impl.Host;
27import schedframe.scheduling.plan.impl.ScheduledTask;
28import schedframe.scheduling.plan.impl.SchedulingPlan;
29import schedframe.scheduling.plugin.SchedulingPluginConfiguration;
30import schedframe.scheduling.plugin.configuration.DefaultConfiguration;
31import schedframe.scheduling.plugin.grid.GridSchedulingPlugin;
32import schedframe.scheduling.plugin.grid.JobRegistry;
33import schedframe.scheduling.plugin.grid.Module;
34import schedframe.scheduling.plugin.grid.ModuleList;
35import schedframe.scheduling.plugin.grid.NetworkManager;
36import schedframe.scheduling.plugin.grid.Prediction;
37import schedframe.scheduling.plugin.grid.ReservationManager;
38import schedframe.scheduling.plugin.grid.ResourceDiscovery;
39import schedframe.scheduling.utils.ResourceParameterName;
40
41/**
42 *
43 * @author Marcin Krystek
44 *
45 */
46public class GridARAllSlots implements GridSchedulingPlugin<org.qcg.broker.schemas.schedulingplan.SchedulingPlan> {
47
48        private static Log log = LogFactory.getLog(GridARAllSlots.class);
49       
50        public GridARAllSlots(){
51        }
52       
53        public SchedulingPlanInterface<org.qcg.broker.schemas.schedulingplan.SchedulingPlan> schedule(
54                        SchedulingEvent event,
55                        Queue<? extends JobInterface<?>> jobQueue,
56                        Queue<? extends TaskInterface<?>> taskQueue,
57                        JobRegistry jobRegistry,
58                        ModuleList moduleList,
59                        Prediction prediction)
60                        throws Exception {
61               
62                SchedulingPlan plan = null;
63               
64                ResourceDiscovery resources = null;
65                ReservationManager reservManager = null;
66                for(int i = 0; i < moduleList.size(); i++){
67                        Module m = moduleList.get(i);
68                        switch(m.getType()){
69                                case RESERVATION_MANAGER: reservManager = (ReservationManager) m;
70                                        break;
71                                case RESOURCE_DISCOVERY: resources = (ResourceDiscovery) m;
72                                        break;
73                        }
74                }
75               
76                // this method is called any time new event arrived.
77                // Events types which are expected to appear here are defined in plugin configuration.
78                // See getConfiguration() method
79               
80                // choose correct method to serve the event
81                switch(event.getType()){
82               
83                        case TASK_ARRIVED: plan = scheduleNewTask(jobQueue, taskQueue,
84                                                                                                        jobRegistry, resources,
85                                                                                                        reservManager, null,
86                                                                                                        prediction);
87                                break;
88                       
89                        case TASK_CANCELED: TaskCanceledEvent e = (TaskCanceledEvent) event;
90                                                                plan = rejectTask(e.getJobId(), e.getTaskId());
91                                break;
92                               
93                        default: log.info("Scheduling event " + event.getType().name() + " is not" +
94                                        " supportd by the plugin ");
95                                break;
96                }
97               
98                return plan;
99               
100        }
101
102       
103        protected SchedulingPlan scheduleNewTask(Queue<? extends JobInterface<?>> jobQueue,
104                        Queue<? extends TaskInterface<?>> taskQueue,
105                                        JobRegistry jobRegistry,
106                                        ResourceDiscovery resources,
107                                        ReservationManager reservManager,
108                                        NetworkManager networkManager,
109                                        Prediction prediction)
110                                        throws Exception{
111
112                // prepare plan with decision of allocating resources to tasks
113                SchedulingPlan plan = new SchedulingPlan();
114                int size = taskQueue.size();
115               
116                // iterate over task queue
117                for(int i = 0; i < size; i++) {
118                        TaskInterface<?> task = taskQueue.remove(0);
119                       
120                        // prepare description of task time and resource requirements
121                        TimeRequirements timeRequirements = new TimeRequirements(task);
122                        ResourceRequirements resourceRequirements = new ResourceRequirements(task);
123                       
124                        // get offers from all local resources. Grid plugin MUST be aware of the format
125                        // and the way in which local resource (plugin) returns offers.
126                        List<Offer> offers = reservManager.getOffer(timeRequirements, resourceRequirements, null);
127                       
128                        // choose the offer which provides enough free resources. Order of the list "offers"
129                        // is not determined, therefore results my differ between executions.
130                        Offer offer = null;
131                        for(int offerIdx = 0; offerIdx < offers.size() && offer == null; offerIdx++){
132                                offer = offers.get(offerIdx);
133                       
134                                // local resource manager (plugin) returns all time slots between start and end time
135                                // declared in timeRequirements. Each time slot describes different resource usage.
136                                // Check if there is enough free resources between start and end time, and create
137                                // new offer - argument/request for create reservation method.
138                                offer = createReservationRequest(offer, timeRequirements, resourceRequirements);
139                        }
140
141                        if(offer == null){
142                                log.error("skipping task: " + task.getJobId() + "_" + task.getId());
143                                continue;
144                        }
145                        // create reservation. Offer should have only these time resource allocations
146                        // which should be reserved - all unnecessary allocations must be removed. Allocations
147                        // may be also brand new objects, but remember to set correct provider info.
148                        // There is different reservation for different time allocation created.
149                        List<Reservation> reservations = reservManager.createReservation(offer, null);
150                       
151                        // get the first reservation. In this example, there is only one created
152                        Reservation r = reservations.get(0);
153                       
154                        // set information about task (job and task id) for which this reservation
155                        // was created
156                        r.setJobId(task.getJobId());
157                        r.setTaskId(task.getId());
158                       
159                        // commit reservation. Form now reservation can not expire.
160                        r = reservManager.commitReservation(r, null);
161       
162                        // prepare allocation description in scheduling plan
163                       
164                        // information about destination host
165                        Host host = new Host();
166                        host.setHostname(r.getAllocatedResource().getProvider().getProviderId());
167                       
168                        // information about created reservation
169                        Allocation allocation = new Allocation();
170                        allocation.setProcessesCount(1);
171                        allocation.setReservation(r);
172                       
173                        allocation.setHost(host);
174                       
175                        // information about the task itself
176                        ScheduledTask scheduledTask = new ScheduledTask();
177                        scheduledTask.setTaskId(r.getTaskId());
178                        scheduledTask.setJobId(r.getJobId());
179                        scheduledTask.addAllocation(allocation);               
180                       
181                        plan.addTask(scheduledTask);
182                }
183                return plan;
184        }
185       
186        protected SchedulingPlan rejectTask(String jobId, String taskId){
187                // prepare plan with decision about task rejection.
188                SchedulingPlan plan = new SchedulingPlan();
189                        // create task
190                        ScheduledTask task = new ScheduledTask();
191                                task.setJobId(jobId);
192                                task.setTaskId(taskId);
193                                // set its status as rejected
194                                task.setStatus(AllocationStatus.REJECTED);
195                plan.addTask(task);
196                return plan;
197        }
198       
199       
200        private Offer createReservationRequest(Offer offer, TimeRequirements timeRequirements, ResourceRequirements resourceRequirements){
201                // This method assumes, that time allocations in offer argument covers continuous time interval.
202               
203                try {
204                        // determine time duration for which resource reservation will be created
205                        long reqDuration = timeRequirements.getEndMillis() - timeRequirements.getStartMillis();
206                        // this is helper variable. Its value will change during processing
207                        long duration = reqDuration;
208                       
209                        // get number of required processors
210                        int reqCpu = Double.valueOf(resourceRequirements.getCpuCntRequest()).intValue();
211                        DateTime start = null;
212                       
213                        // iterate through all time allocation.
214                        // This loop stops when long enough time interval is found
215                        for(int i = 0; i < offer.size() && duration > 0; i++){
216                                TimeResourceAllocation allocation = offer.get(i);
217                                ResourceUnit unit = allocation.getAllocatedResource().
218                                                                                                getResourceUnit(ResourceParameterName.CPUCOUNT);
219
220                                // check if free amount of the resource is enough to satisfy required amount
221                                if(unit.getFreeAmount() >= reqCpu){
222                                        // if so, then shorten required duration, because this allocation satisfies resource requirements and
223                                        // it can be "joined" with previous one in single, continuous time interval.
224                                        duration = duration - (allocation.getEndMillis() - allocation.getStartMillis());
225                                        if(start == null)
226                                                start = allocation.getStart();
227                                } else {
228                                        // otherwise, continue searching from the beginning
229                                        duration = reqDuration;
230                                        start = null;
231                                }
232                        }
233
234                        // if duration is grater then 0, it means that there is no single and continuous time interval
235                        // which can be reserved the task
236                        if(duration > 0){
237                                log.error("There is not enough free resources between " +
238                                                        timeRequirements.getStart() + " " + timeRequirements.getStart().getMillis() +
239                                                        " and " +
240                                                        timeRequirements.getEnd() + " " + timeRequirements.getEnd().getMillis() +
241                                                        " in offer " + offer.getId());
242                                return null;
243                        }
244                       
245                        // create new offer, which will be used as an argument/request by reservation manager to create
246                        // reservation for task.
247                        int totalAmount = offer.get(0).getAllocatedResource().
248                                                                                        getResourceUnit(ResourceParameterName.CPUCOUNT).
249                                                                                        getAmount();
250                       
251                        ResourceStateDescription resDesc = new ResourceStateDescription(offer.getProvider());
252                        resDesc.addResourceUnit(new Processors(totalAmount, reqCpu, 1));
253                       
254                        TimeResourceAllocation allocation = new TimeResourceAllocation(start, start.plus(reqDuration));
255                        allocation.setAllocatedResource(resDesc);
256                       
257                        Offer retOffer = new Offer();
258                        retOffer.setProvider(offer.getProvider());
259                        retOffer.add(allocation);
260                       
261                        return retOffer;
262
263                } catch (NoSuchFieldException e) {
264                        e.printStackTrace();
265                }
266               
267                return null;
268        }
269       
270        public SchedulingPluginConfiguration getConfiguration() {
271                return DefaultConfiguration.forGridPlugin();
272        }
273
274        public String getPluginName() {
275                return getClass().getName();
276        }
277
278        public void initPlugin(Properties properties) {
279        }
280
281}
Note: See TracBrowser for help on using the repository browser.