1 | package example.gridplugin; |
---|
2 | |
---|
3 | import org.qcg.broker.schemas.resreqs.ExecutionTimeType; |
---|
4 | import org.qcg.broker.schemas.resreqs.TimePeriod; |
---|
5 | import org.qcg.broker.schemas.resreqs.TimePeriodChoice; |
---|
6 | import gssim.schedframe.scheduling.plugin.local.GssimTimeOperations; |
---|
7 | |
---|
8 | import java.util.List; |
---|
9 | import java.util.Properties; |
---|
10 | |
---|
11 | import org.apache.commons.logging.Log; |
---|
12 | import org.apache.commons.logging.LogFactory; |
---|
13 | import org.joda.time.DateTime; |
---|
14 | |
---|
15 | import schedframe.exceptions.ModuleException; |
---|
16 | import schedframe.exceptions.NotAuthorizedException; |
---|
17 | import schedframe.exceptions.ReservationException; |
---|
18 | import schedframe.resources.ResourceProvider; |
---|
19 | import schedframe.resources.ResourceStateDescription; |
---|
20 | import schedframe.resources.units.Processors; |
---|
21 | import schedframe.scheduling.JobInterface; |
---|
22 | import schedframe.scheduling.Offer; |
---|
23 | import schedframe.scheduling.Queue; |
---|
24 | import schedframe.scheduling.Reservation; |
---|
25 | import schedframe.scheduling.ResourceUsage; |
---|
26 | import schedframe.scheduling.TaskInterface; |
---|
27 | import schedframe.scheduling.TimeRequirements; |
---|
28 | import schedframe.scheduling.TimeResourceAllocation; |
---|
29 | import schedframe.scheduling.events.SchedulingEvent; |
---|
30 | import schedframe.scheduling.plan.SchedulingPlanInterface; |
---|
31 | import schedframe.scheduling.plan.impl.Allocation; |
---|
32 | import schedframe.scheduling.plan.impl.Host; |
---|
33 | import schedframe.scheduling.plan.impl.ScheduledTask; |
---|
34 | import schedframe.scheduling.plan.impl.ScheduledTime; |
---|
35 | import schedframe.scheduling.plan.impl.SchedulingPlan; |
---|
36 | import schedframe.scheduling.plugin.SchedulingPluginConfiguration; |
---|
37 | import schedframe.scheduling.plugin.configuration.DefaultConfiguration; |
---|
38 | import schedframe.scheduling.plugin.grid.GridSchedulingPlugin; |
---|
39 | import schedframe.scheduling.plugin.grid.JobRegistry; |
---|
40 | import schedframe.scheduling.plugin.grid.Module; |
---|
41 | import schedframe.scheduling.plugin.grid.ModuleList; |
---|
42 | import schedframe.scheduling.plugin.grid.Prediction; |
---|
43 | import schedframe.scheduling.plugin.grid.ReservationManager; |
---|
44 | import schedframe.scheduling.plugin.local.TimeOperations; |
---|
45 | import schedframe.scheduling.utils.ResourceParameterName; |
---|
46 | import simulator.lists.slotList.SlotsGantt; |
---|
47 | |
---|
48 | /** |
---|
49 | * |
---|
50 | * @author Ariel |
---|
51 | * |
---|
52 | * This Plugin schedules jobs using the earliest due date (EDD) policy to sort them in a queue |
---|
53 | * and then applies the greedy list scheduling Graham algorithm. |
---|
54 | * |
---|
55 | * Jobs $J_i$ from the queue are assigned to one of |
---|
56 | * clusters $\mathcal{M}$ using a greedy list-scheduling |
---|
57 | * algorithm based on ~\cite{graham1969bmt,mounie07}. |
---|
58 | * To this end, the Grid scheduler queries each organization $O_k$ about |
---|
59 | * a list of free slots $\psi_{ki} \in \Psi_k, \psi_{ki}=(t',t'',m_{ki}) , i=1..|\Psi|$. |
---|
60 | * Parameters $t'$ and $t''$ denote start and time of a slot, respectively. |
---|
61 | * $m_{ki}$ is a number of processors available within the slot $i$ at organization $O_k$. |
---|
62 | * Slots are time periods within which a number of available processors is constant. |
---|
63 | * The Grid scheduler sorts collected slots by increasing start time. The schedule is constructed |
---|
64 | * by assigning jobs in the Grid scheduler's queue to processors in given slots in a greedy manner. |
---|
65 | * For each slot $\psi_{kI}$ (starting from the earliest one) the scheduler chooses from its queue the |
---|
66 | * first job $J_j$ requiring no more than $m_{ki}$ processors in all subsequent |
---|
67 | * slots $i \geq I$ such as $t''_{ki} \geq t'_{kI} + p_j$. If such a job was find the scheduler schedules it to be started at |
---|
68 | * $t'_{kI}$, and removes it from the queue. If there is no such a job, the scheduler apply the same procedure to the next free slot |
---|
69 | *whose number of available processors is larger than the current one. |
---|
70 | */ |
---|
71 | public class ARGraham implements GridSchedulingPlugin<org.qcg.broker.schemas.schedulingplan.SchedulingPlan>{ |
---|
72 | |
---|
73 | private Log log = LogFactory.getLog(ARGraham.class); |
---|
74 | protected TimeOperations timeOpers; |
---|
75 | |
---|
76 | public SchedulingPlanInterface<org.qcg.broker.schemas.schedulingplan.SchedulingPlan> schedule( |
---|
77 | SchedulingEvent event, Queue<? extends JobInterface<?>> jobQueue, |
---|
78 | Queue<? extends TaskInterface<?>> taskQueue, |
---|
79 | JobRegistry jobRegistry, ModuleList moduleList, Prediction prediction) |
---|
80 | throws Exception { |
---|
81 | |
---|
82 | ReservationManager reservManager = null; |
---|
83 | for(int i = 0; i < moduleList.size() && reservManager == null; i++){ |
---|
84 | Module m = moduleList.get(i); |
---|
85 | switch(m.getType()){ |
---|
86 | case RESERVATION_MANAGER: reservManager = (ReservationManager) m; |
---|
87 | break; |
---|
88 | } |
---|
89 | } |
---|
90 | |
---|
91 | // empty scheduling plan |
---|
92 | SchedulingPlan sd = new SchedulingPlan(); |
---|
93 | |
---|
94 | // getting offers |
---|
95 | DateTime startPeriod = timeOpers.getTime(); |
---|
96 | DateTime endPeriod = new DateTime(Long.MAX_VALUE); |
---|
97 | |
---|
98 | ExecutionTimeType ett = new ExecutionTimeType(); |
---|
99 | TimePeriod tp = new TimePeriod(); |
---|
100 | tp.setPeriodStart(startPeriod.toDate()); |
---|
101 | TimePeriodChoice tpc = new TimePeriodChoice(); |
---|
102 | tpc.setPeriodEnd(endPeriod.toDate()); |
---|
103 | tp.setTimePeriodChoice(tpc); |
---|
104 | ett.setTimePeriod(tp); |
---|
105 | |
---|
106 | TimeRequirements timeRequirements = new TimeRequirements(ett); |
---|
107 | |
---|
108 | List<Offer> offers = reservManager.getOffer(timeRequirements, null, null); |
---|
109 | |
---|
110 | System.out.print("Number of offers " + offers.size()); |
---|
111 | |
---|
112 | if (offers.size() < 1) |
---|
113 | return sd; |
---|
114 | |
---|
115 | // in this example, there is only one computing resource, |
---|
116 | // therefore only one offer is expected |
---|
117 | Offer offer = offers.get(0); |
---|
118 | ResourceProvider provider = offer.getProvider(); |
---|
119 | int maxCpuCnt = offer.get(0).getAllocatedResource(). |
---|
120 | getResourceUnit(ResourceParameterName.CPUCOUNT). |
---|
121 | getAmount(); |
---|
122 | |
---|
123 | SlotsGantt slotGantt = new SlotsGantt(offer); |
---|
124 | |
---|
125 | int i = 0; |
---|
126 | while (taskQueue.size() > 0 && i < slotGantt.size()) { |
---|
127 | DateTime startTime = slotGantt.get(i).getStart(); |
---|
128 | int j = 0; |
---|
129 | while (j < taskQueue.size()) { |
---|
130 | TaskInterface<?> task = taskQueue.get(j); |
---|
131 | |
---|
132 | // resource requirements |
---|
133 | int taskSize = Double.valueOf(task.getCpuCntRequest()).intValue(); |
---|
134 | Processors ruReq = new Processors("compRes1", maxCpuCnt, taskSize); |
---|
135 | ResourceStateDescription resourceState = new ResourceStateDescription(provider); |
---|
136 | resourceState.addResourceUnit(ruReq); |
---|
137 | |
---|
138 | // end of task |
---|
139 | DateTime endTask = new DateTime(startTime.plus(task.getExpectedDuration())); |
---|
140 | |
---|
141 | // check slot |
---|
142 | TimeResourceAllocation taskSlot = new TimeResourceAllocation(resourceState, startTime, endTask); |
---|
143 | if (slotGantt.checkSlot(taskSlot)) { |
---|
144 | slotGantt.addSlot(taskSlot); |
---|
145 | taskQueue.remove(j); // pull out of the queue |
---|
146 | |
---|
147 | Reservation reservation = reserveResources(reservManager, provider, |
---|
148 | taskSlot); |
---|
149 | |
---|
150 | // submit task to created reservation |
---|
151 | // add to scheduling plan |
---|
152 | ScheduledTask taskSD = new ScheduledTask(); |
---|
153 | taskSD.setJobId(task.getJobId()); |
---|
154 | taskSD.setTaskId(task.getId()); |
---|
155 | |
---|
156 | Allocation taskAllocation = new Allocation(); |
---|
157 | taskAllocation.setProcessesCount(taskSize); |
---|
158 | |
---|
159 | Host host = new Host(); |
---|
160 | host.setHostname(provider.getProviderId()); |
---|
161 | taskAllocation.setHost(host); |
---|
162 | taskAllocation.setReservation(reservation); |
---|
163 | |
---|
164 | taskSD.addAllocation(taskAllocation); |
---|
165 | |
---|
166 | ScheduledTime scheduledTime = new ScheduledTime(); |
---|
167 | scheduledTime.setStart(startTime.toDate()); |
---|
168 | scheduledTime.setEnd(endTask.toDate()); |
---|
169 | taskSD.setScheduledTime(scheduledTime); |
---|
170 | |
---|
171 | sd.addTask(taskSD); |
---|
172 | |
---|
173 | } else |
---|
174 | j++; |
---|
175 | } |
---|
176 | i++; |
---|
177 | } |
---|
178 | |
---|
179 | return sd; |
---|
180 | } |
---|
181 | |
---|
182 | protected Reservation reserveResources(ReservationManager reservManager, |
---|
183 | ResourceProvider provider, |
---|
184 | TimeResourceAllocation taskSlot) { |
---|
185 | |
---|
186 | ResourceUsage resourceUsage = new ResourceUsage(); |
---|
187 | resourceUsage.setProvider(provider); |
---|
188 | resourceUsage.add(taskSlot); |
---|
189 | |
---|
190 | // set initial reservation |
---|
191 | log.debug("GLOBAL: creating reservation for offer: " + taskSlot + " provider: " + provider); |
---|
192 | List<Reservation> reservations = null; |
---|
193 | try { |
---|
194 | reservations = reservManager.createReservation(resourceUsage, null); |
---|
195 | } catch (ModuleException e) { |
---|
196 | log.error("GLOBAL: crerating reservation fails"); |
---|
197 | return null; |
---|
198 | } |
---|
199 | |
---|
200 | // only one reservation is expected |
---|
201 | Reservation reservation = reservations.get(0); |
---|
202 | |
---|
203 | if(reservation == null) |
---|
204 | return null; |
---|
205 | |
---|
206 | String reservationId = reservation.getId(); |
---|
207 | log.debug("GLOBAL: reservationID: "+reservationId); |
---|
208 | |
---|
209 | |
---|
210 | // commit initial reservation |
---|
211 | try { |
---|
212 | reservation = reservManager.commitReservation(reservation, null); |
---|
213 | } catch (ModuleException e) { |
---|
214 | log.error("GLOBAL: reservation " + reservationId + " was not commited successfully"); |
---|
215 | return null; |
---|
216 | } |
---|
217 | |
---|
218 | log.debug("GLOBAL: reservation "+ reservation.getId() + " commited"); |
---|
219 | |
---|
220 | return reservation; |
---|
221 | } |
---|
222 | |
---|
223 | public SchedulingPluginConfiguration getConfiguration() { |
---|
224 | return DefaultConfiguration.forGridPlugin(); |
---|
225 | } |
---|
226 | |
---|
227 | public String getPluginName() { |
---|
228 | return getClass().getName(); |
---|
229 | } |
---|
230 | |
---|
231 | public void initPlugin(Properties properties) { |
---|
232 | timeOpers = new GssimTimeOperations(); |
---|
233 | } |
---|
234 | |
---|
235 | } |
---|