source: DCWoRMS/trunk/src/example/localplugin/FCFSCPUFreqScalingClusterLocalPlugin.java @ 493

Revision 493, 6.8 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package example.localplugin;
2
3import gridsim.dcworms.DCWormsTags;
4
5import java.util.ArrayList;
6import java.util.HashMap;
7import java.util.List;
8import java.util.Map;
9
10import schedframe.events.scheduling.SchedulingEvent;
11import schedframe.events.scheduling.TaskFinishedEvent;
12import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
13import schedframe.resources.ResourceStatus;
14import schedframe.resources.computing.ComputingResource;
15import schedframe.resources.computing.Processor;
16import schedframe.resources.units.ProcessingElements;
17import schedframe.resources.units.ResourceUnit;
18import schedframe.resources.units.ResourceUnitName;
19import schedframe.resources.units.StandardResourceUnitName;
20import schedframe.scheduling.UsedResourcesList;
21import schedframe.scheduling.manager.resources.ClusterResourceManager;
22import schedframe.scheduling.manager.resources.ResourceManager;
23import schedframe.scheduling.manager.tasks.JobRegistry;
24import schedframe.scheduling.plan.SchedulingPlanInterface;
25import schedframe.scheduling.plan.impl.SchedulingPlan;
26import schedframe.scheduling.plugin.grid.ModuleList;
27import schedframe.scheduling.queue.TaskQueue;
28import schedframe.scheduling.queue.TaskQueueList;
29import schedframe.scheduling.tasks.TaskInterface;
30import schedframe.scheduling.tasks.WorkloadUnit;
31import dcworms.schedframe.scheduling.Executable;
32
33public class FCFSCPUFreqScalingClusterLocalPlugin extends BaseLocalSchedulingPlugin {
34
35        List<Processor> allocatedCPUs;
36        public FCFSCPUFreqScalingClusterLocalPlugin () {
37                allocatedCPUs = new ArrayList<Processor>();
38        }
39
40        public SchedulingPlanInterface<?> schedule(SchedulingEvent event, TaskQueueList queues, JobRegistry jobRegistry,
41                        ResourceManager resManager, ModuleList modules) {
42
43                ClusterResourceManager resourceManager = (ClusterResourceManager) resManager;
44                SchedulingPlan plan = new SchedulingPlan();
45                // our tasks are placed only in first queue (see
46                // BaseLocalPlugin.placeJobsInQueues() method)
47                TaskQueue q = queues.get(0);
48                // chose the events types to serve.
49                // Different actions for different events are possible.
50                switch (event.getType()) {
51               
52                case START_TASK_EXECUTION:
53
54                        // check all tasks in queue
55                        for (int i = 0; i < q.size(); i++) {
56                                WorkloadUnit job = q.get(i);
57                                TaskInterface<?> task = (TaskInterface<?>) job;
58                                // if status of the tasks in READY
59                                if (task.getStatus() == DCWormsTags.READY) {
60
61                                        Map<ResourceUnitName, ResourceUnit> choosenResources = chooseResourcesForExecution(resourceManager, task);
62                                        if (choosenResources  != null) {
63                                                addToSchedulingPlan(plan, task, choosenResources);
64                                                ProcessingElements pes = (ProcessingElements)choosenResources.get(StandardResourceUnitName.PE);
65                                                List<Processor> processors =  new ArrayList<Processor>();
66                                                for(ComputingResource res : pes){
67                                                        processors.add((Processor) res);
68                                                }
69                                                adjustFrequency(ResourceStatus.BUSY,processors);
70                                        }
71                                }
72                        }
73                        break;
74                       
75                case TASK_FINISHED:
76                        TaskFinishedEvent finEvent = (TaskFinishedEvent) event;
77                        Executable exec = (Executable) jobRegistry.getExecutable(finEvent.getJobId(), finEvent.getTaskId());
78                        UsedResourcesList usedResourcesList = exec.getUsedResources();
79                        ProcessingElements pes = (ProcessingElements)usedResourcesList.getLast().getResourceUnits().get(StandardResourceUnitName.PE);
80                        List<Processor> processors =  new ArrayList<Processor>();
81                        for(ComputingResource res : pes){
82                                processors.add((Processor) res);
83                                allocatedCPUs.add((Processor) res);
84                        }
85                        adjustFrequency(ResourceStatus.FREE, processors);
86                        break;
87                       
88                case TASK_REQUESTED_TIME_EXPIRED:
89                        TaskRequestedTimeExpiredEvent timExpEvent = (TaskRequestedTimeExpiredEvent) event;
90                        exec = (Executable) jobRegistry.getExecutable(timExpEvent.getJobId(), timExpEvent.getTaskId());
91                        usedResourcesList = exec.getUsedResources();
92                        pes = (ProcessingElements)usedResourcesList.getLast().getResourceUnits().get(StandardResourceUnitName.PE);
93                        processors =  new ArrayList<Processor>();
94                        for(ComputingResource res : pes){
95                                allocatedCPUs.remove((Processor) res);
96                        }
97                        // check all tasks in queue
98                        for (int i = 0; i < q.size(); i++) {
99                                WorkloadUnit job = q.get(i);
100                                TaskInterface<?> task = (TaskInterface<?>) job;
101                                // if status of the tasks in READY
102                                if (task.getStatus() == DCWormsTags.READY) {
103
104                                        Map<ResourceUnitName, ResourceUnit> choosenResources = chooseResourcesForExecution(resourceManager, task);
105                                        if (choosenResources  != null) {
106                                                addToSchedulingPlan(plan, task, choosenResources);
107                                                pes = (ProcessingElements)choosenResources.get(StandardResourceUnitName.PE);
108                                                processors =  new ArrayList<Processor>();
109                                                for(ComputingResource res : pes){
110                                                        processors.add((Processor) res);
111                                                }
112                                                adjustFrequency(ResourceStatus.BUSY, processors);
113                                        }
114                                }
115                        }
116                        break;
117                }
118                return plan;
119        }
120       
121        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(
122                        ClusterResourceManager resourceManager, TaskInterface<?> task) {
123
124                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
125
126                int cpuRequest;
127                try {
128                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
129                } catch (NoSuchFieldException e) {
130                        cpuRequest = 1;
131                }
132
133                if (cpuRequest != 0) {
134                        List<ComputingResource> choosenResources = null;
135                        List<Processor> processors = resourceManager.getProcessors();
136                        processors.removeAll(allocatedCPUs);
137                        if (processors.size() < cpuRequest) {
138                                // log.warn("Task requires more cpus than is availiable in this resource.");
139                                return null;
140                        }
141
142                        choosenResources = new ArrayList<ComputingResource>();
143
144                        for (int i = 0; i < processors.size() && cpuRequest > 0; i++) {
145                                if (processors.get(i).getStatus() == ResourceStatus.FREE) {
146                                        choosenResources.add(processors.get(i));
147                                        cpuRequest--;
148                                }
149                        }
150                        if (cpuRequest > 0) {
151                                // log.info("Task " + task.getJobId() + "_" + task.getId() +
152                                // " requires more cpus than is availiable in this moment.");
153                                return null;
154                        }
155
156                        ProcessingElements result = new ProcessingElements();
157                        result.addAll(choosenResources);
158                        map.put(StandardResourceUnitName.PE, result);
159                }
160                return map;
161        }
162
163        private void adjustFrequency(ResourceStatus status, List<Processor> processors){
164                switch(status){
165                case BUSY:
166                        for(Processor cpu: processors){
167                                try {
168                                        if(cpu.getPowerInterface().getSupportedPStates().containsKey("P0"))
169                                                cpu.getPowerInterface().setPState("P0");
170                                } catch (NoSuchFieldException e) {
171                                        // TODO Auto-generated catch block
172                                        e.printStackTrace();
173                                }
174                        }
175                        break;
176                case FREE:
177                        for(Processor cpu: processors){
178                                try {
179                                        if(cpu.getPowerInterface().getSupportedPStates().containsKey("P3"))
180                                                cpu.getPowerInterface().setPState("P3");
181                                } catch (NoSuchFieldException e) {
182                                        // TODO Auto-generated catch block
183                                        e.printStackTrace();
184                                }
185                        }
186                        break;
187                }
188        }
189       
190        public String getName() {
191                return getClass().getName();
192        }
193
194}
Note: See TracBrowser for help on using the repository browser.