source: DCWoRMS/trunk/src/schedframe/scheduling/policy/AbstractManagementSystem.java @ 477

Revision 477, 8.0 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
RevLine 
[477]1package schedframe.scheduling.policy;
2
3import java.util.HashMap;
4import java.util.Map;
5
6import org.apache.commons.logging.Log;
7import org.apache.commons.logging.LogFactory;
8import org.joda.time.DateTimeUtilsExt;
9
10import schedframe.PluginConfiguration;
11import schedframe.events.scheduling.SchedulingEventType;
12import schedframe.resources.units.StandardResourceUnitName;
13import schedframe.scheduling.Scheduler;
14import schedframe.scheduling.WorkloadUnitHandler;
15import schedframe.scheduling.manager.resources.ManagedResources;
16import schedframe.scheduling.manager.resources.ResourceManager;
17import schedframe.scheduling.manager.resources.ResourceManagerFactory;
18import schedframe.scheduling.manager.tasks.JobRegistryImpl;
19import schedframe.scheduling.manager.tasks.JobRegistry;
20import schedframe.scheduling.plan.AllocationInterface;
21import schedframe.scheduling.plan.SchedulingPlanInterface;
22import schedframe.scheduling.plugin.SchedulingPlugin;
23import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
24import schedframe.scheduling.plugin.grid.ModuleList;
25import schedframe.scheduling.plugin.local.ResourceAllocationInterface;
26import schedframe.scheduling.queue.TaskQueue;
27import schedframe.scheduling.queue.TaskQueueList;
28import schedframe.scheduling.tasks.Job;
29import schedframe.scheduling.tasks.WorkloadUnit;
30import simulator.WormsConstants;
31import eduni.simjava.Sim_event;
32import gridsim.GridSim;
33import gridsim.GridSimTags;
34import gridsim.Gridlet;
35import gridsim.IO_data;
36import gridsim.gssim.WormsTags;
37import gssim.schedframe.scheduling.ExecTask;
38import gssim.schedframe.scheduling.Executable;
39import gssim.schedframe.scheduling.queues.AbstractStatsSupportingQueue;
40
41public abstract class AbstractManagementSystem {
42
43        private Log log = LogFactory.getLog(AbstractManagementSystem.class);
44       
45        protected String name;
46
47        protected ResourceManager resourceManager;
48
49        protected TaskQueueList queues;
50
51        protected SchedulingPlugin schedulingPlugin;
52
53        protected ExecutionTimeEstimationPlugin execTimeEstimationPlugin;
54
55        protected ModuleList moduleList;
56       
57        protected JobRegistryImpl jobRegistry;
58       
59
60        public AbstractManagementSystem(String providerId, String entityName,
61                        ExecutionTimeEstimationPlugin execTimeEstPlugin, TaskQueueList queues) {
62               
63                this.name = entityName + "@" + providerId;
64                this.queues = queues;
65                this.jobRegistry = new JobRegistryImpl(name);
66                this.execTimeEstimationPlugin = execTimeEstPlugin;
67        }
68       
69        public void processEvent(Sim_event ev) {
70                processOtherEvent(ev);
71        }
72
73        protected void processOtherEvent(Sim_event ev) {
74                if (ev == null) {
75                        System.out.println(name + ".processOtherEvent(): " + "Error - an event is null.");
76                        return;
77                }
78
79                log.error(name + ".processOtherEvent(): Unable to "
80                                + "handle request from an event with a tag number " + ev.get_tag());
81        }
82       
83        public String getName() {
84                return name;
85        }
86
87        public PluginConfiguration getSchedulingPluginConfiguration() {
88                return schedulingPlugin.getConfiguration();
89        }
90       
91        public ResourceManager getResourceManager() {
92                if (resourceManager instanceof ResourceManager)
93                        return (ResourceManager) resourceManager;
94                else
95                        return null;
96        }
97
98        public ResourceAllocationInterface getAllocationManager() {
99                if (resourceManager instanceof ResourceAllocationInterface)
100                        return (ResourceAllocationInterface) resourceManager;
101                else
102                        return null;
103        }
104
105        protected JobRegistry getJobRegistry(){
106                return jobRegistry;
107        }
108
109        public boolean pluginSupportsEvent(int eventType){
110                return true;
111        }
112
113        public abstract void notifySubmittedWorkloadUnit(WorkloadUnit<?> wu, boolean ack);
114
115        public abstract void notifyCanceledWorkloadUnit(WorkloadUnit<?> wu);
116
117        public abstract void notifyReturnedWorkloadUnit(WorkloadUnit<?> wu);
118
119        protected abstract void executeSchedulingPlan(SchedulingPlanInterface decision);
120
121
122
123        //POPRAWIC  (ale co? bo teraz chyba jest ok)
124        protected void submitWorkloadUnit(WorkloadUnit<?> wu, AllocationInterface allocation) {
125                String providerName = allocation.getProviderName();
126                if (providerName == null) {
127                        return;
128                }
129                //Executable exec = (Executable) wu;
130                removeFromQueue(wu);
131                scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, wu);
132        }
133       
134        protected boolean sendCanceledWorkloadUnit(int tag, Executable task, int executableId, int destId) {
135               
136                if (tag != GridSimTags.GRIDLET_CANCEL) {
137                        return false;
138                }
139
140                long taskSize = 0;
141                if (task != null) {
142                        taskSize = task.getGridletOutputSize();
143                }
144
145                // if no Gridlet found, then create a new Gridlet but set its status
146                // to FAILED. Then, most importantly, set the resource parameters
147                // because the user will search/filter based on a resource ID.
148                else if (task == null) {
149                        try {
150                                taskSize = 100;
151                                task = jobRegistry.getTaskExecutable(executableId);
152                                task.setGridletStatus(Gridlet.FAILED);
153                                int cost = resourceManager.getSharedResourceUnits().get(StandardResourceUnitName.COST) != null ? resourceManager
154                                                .getSharedResourceUnits().get(StandardResourceUnitName.COST).get(0).getAmount()
155                                                : 1;
156                                task.setResourceParameter(scheduler.get_id(), cost);
157                        } catch (Exception e) {
158                                // empty ...
159                        }
160                }
161                scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, tag,  new IO_data(task, taskSize, destId));
162
163                return true;
164        }
165
166        protected boolean sendFinishedWorkloadUnit(WorkloadUnit<?> wu) {
167               
168                Executable exec = (Executable) wu;
169                if(scheduler.getParent() == null)
170                {
171                        Job job = jobRegistry.get(exec.getJobId());
172
173                        if(job.isFinished()){
174                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job);
175                                return true;
176                        }
177                        else return true;
178                }
179       
180        IO_data obj = new IO_data(exec, 0, /*task.getGridletOutputSize(),*/ GridSim.getEntityId(scheduler.getParent().get_name()));
181        scheduler.send(scheduler.getOutputPort(), 0.0, GridSimTags.GRIDLET_RETURN, obj);
182                return true;
183        }
184
185        protected void sendExecutableReadyEvent(ExecTask exec) {
186
187                /*if (wu instanceof JobInterface) {
188                        scheduler.sendInternal(Long.valueOf(0).doubleValue(), GssimTags.TASK_READY_FOR_EXECUTION,
189                                        wu);
190                        return;
191                }*/
192
193                long delay = 0;
194                try {
195                        long expectedStartTime = exec.getExecutionStartTime().getMillis() / 1000;
196                        long currentTime = DateTimeUtilsExt.currentTimeMillis() / 1000;
197                        delay = expectedStartTime - currentTime;
198                        if (delay < 0)
199                                delay = 0;
200                } catch (NoSuchFieldException e) {
201                        delay = 0;
202                }
203
204                scheduler.sendInternal(Long.valueOf(delay).doubleValue(), WormsTags.TASK_READY_FOR_EXECUTION,
205                                exec);
206        }
207       
208        protected void sendTimerEvent() {
209                PluginConfiguration pluginConfig = schedulingPlugin.getConfiguration();
210                if (pluginConfig != null) {
211                        Map<SchedulingEventType, Object> events = pluginConfig.getServedEvents();
212                        if (events != null) {
213                                Object obj = events.get(SchedulingEventType.TIMER);
214                                if (obj != null) {
215                                        int delay = (Integer) obj;
216                                        scheduler.sendInternal(delay, WormsTags.TIMER, null);
217                                }
218                        }
219                }
220        }
221       
222        protected boolean removeFromQueue(WorkloadUnit<?> wu) {
223                for(TaskQueue queue : queues){
224                        if(queue.contains(wu)){
225                                queue.remove(wu);
226                                return true;
227                        }
228                }
229                return false;
230        }
231
232        public TaskQueueList getAccessQueues(){
233                return queues;
234        }
235       
236        public Map<String, Integer> getQueuesSize() {
237                Map<String, Integer> queue_size = new HashMap<String, Integer>();
238                for (TaskQueue queue : queues) {
239                        queue_size.put(queue.getName(), queue.size());
240                }
241                return queue_size;
242        }
243
244        public void init(Scheduler sched, ManagedResources managedResources) {
245                scheduler = sched;
246                resourceManager = ResourceManagerFactory.createResourceManager(scheduler, managedResources);
247                scheduler.set_stat(WormsConstants.getResourcesStatisticsObject(queues.size()));
248                for(int i = 0; i < queues.size(); i++){
249                        TaskQueue q = queues.get(i);
250                        if(q instanceof AbstractStatsSupportingQueue<?>){
251                                AbstractStatsSupportingQueue<?> queue = (AbstractStatsSupportingQueue<?>) q;
252                                queue.setStats(scheduler.get_stat(), WormsConstants.TASKS_QUEUE_LENGTH_MEASURE_NAME + "_" + Integer.toString(i));
253                        }
254                }
255        }
256
257        protected Scheduler scheduler;
258
259        public Scheduler getScheduler() {
260                return scheduler;
261        }
262       
263        public abstract WorkloadUnitHandler getWorkloadUnitHandler();
264
265}
Note: See TracBrowser for help on using the repository browser.