source: DCWoRMS/branches/coolemall/src/test/testSOP/algorithms/multicloud/MetaSched.java @ 1606

Revision 1606, 10.1 KB checked in by wojtekp, 8 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package test.testSOP.algorithms.multicloud;
2
3import java.util.ArrayList;
4import java.util.Collections;
5import java.util.LinkedList;
6import java.util.List;
7import java.util.Random;
8
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11
12import schedframe.events.scheduling.SchedulingEvent;
13import schedframe.resources.StandardResourceType;
14import schedframe.resources.computing.ComputingResource;
15import schedframe.resources.computing.Node;
16import schedframe.resources.computing.Processor;
17import schedframe.scheduling.Scheduler;
18import schedframe.scheduling.SchedulerDescription;
19import schedframe.scheduling.manager.resources.ClusterResourceManager;
20import schedframe.scheduling.manager.resources.ManagedComputingResources;
21import schedframe.scheduling.manager.resources.ResourceManager;
22import schedframe.scheduling.manager.tasks.JobRegistry;
23import schedframe.scheduling.plan.SchedulingPlanInterface;
24import schedframe.scheduling.plan.impl.Allocation;
25import schedframe.scheduling.plan.impl.ScheduledTask;
26import schedframe.scheduling.plan.impl.SchedulingPlan;
27import schedframe.scheduling.plugin.Module;
28import schedframe.scheduling.plugin.ModuleList;
29import schedframe.scheduling.plugin.grid.ResourceDiscovery;
30import schedframe.scheduling.queue.TaskQueue;
31import schedframe.scheduling.queue.TaskQueueList;
32import schedframe.scheduling.tasks.TaskInterface;
33import schedframe.scheduling.tasks.WorkloadUnit;
34import example.globalplugin.BaseGlobalPlugin;
35
36
37public class MetaSched  extends BaseGlobalPlugin {
38        Log log = LogFactory.getLog(MetaSched.class);
39        private LinkedList<String> lastUsedResources = new LinkedList<String>();
40       
41        private static ArrayList<String> cacheTaskId = new ArrayList<String>();
42        private static ArrayList<String> cacheChosenCluster = new ArrayList<String>();
43        private static ArrayList<Integer> cacheNbTries = new ArrayList<Integer>();
44       
45        private static int MAX_TRIES = 3;
46        public SchedulingPlanInterface<?> schedule(SchedulingEvent event,
47                        TaskQueueList queues,
48                        JobRegistry jobRegistry,
49                        ResourceManager resourceManager, ModuleList modules)  {
50       
51               
52                ResourceDiscovery resources = null;
53                for(int i = 0; i < modules.size(); i++){
54                        Module m = modules.get(i);
55                        switch(m.getType()){
56                                case RESOURCE_DISCOVERY: resources = (ResourceDiscovery) m;
57                                        break;
58                        }
59                }
60               
61//              switch(event.getType()) {
62//                      case TIMER:
63//                              log.debug("Metasched called by timer");
64//                              incrementNbTriesCache();
65//                              removeOldCacheEntries();
66//                              break;
67//                      default:
68//                              log.debug("Defau");
69//                              break;
70//              }
71                SchedulingPlan plan = new SchedulingPlan();
72               
73                TaskQueue q = queues.get(0);
74//              log.debug("Queues size:"+queues.size());
75                int size = q.size();
76               
77                // order of the resources on this list is not determined
78                List<SchedulerDescription> availableResources = resources.getResources();
79               
80//              log.debug("Available resource: "+availableResources.size());
81//              log.debug(availableResources.get(0).getId());
82//              log.debug(resourceManager.getSchedulers().get(0).getFullName());
83//              log.debug(resourceManager.getSchedulers().get(0).getCompResources().get(0).getLoadInterface().getRecentUtilization().getValue());
84//              log.debug(resourceManager.getSchedulers().get(0).getCompResources().get(0).getFullName());
85//              log.debug(availableResources.get(0).getProvider().getPropertyKeys());
86//              getY(resourceManager.getSchedulers().get(0).getCompResources());
87               
88//              log.debug("0 : "+availableResources.get(0).getProvider().getProviderId());
89//              log.debug("1 : "+availableResources.get(1).getProvider().getProviderId());
90               
91               
92                //For each task in the queue
93                for(int i = 0; i < size; i++) {
94                        WorkloadUnit job = q.remove(0);
95                        TaskInterface<?> task = (TaskInterface<?>)job;
96                        Scheduler sched = chooseCloud(resourceManager.getSchedulers(), task);
97                        incrementNbTriesCache();
98                        removeOldCacheEntries();
99                        //Skip task if we dont have a cloud available
100                        if (sched == null)
101                                continue;
102                        cacheTaskId.add(task.getJobId());
103                        cacheChosenCluster.add(sched.getFullName());
104                        cacheNbTries.add(1);
105//                      log.debug("Choosen cloud : "+sched.getFullName());
106                        //Round robin
107                        SchedulerDescription sd = availableResources.get(availableResources.size() - 1);
108                        for(SchedulerDescription schedDesc: availableResources){
109                                if(!lastUsedResources.contains(schedDesc.getProvider().getProviderId())){
110                                        if(lastUsedResources.size() + 1 >= availableResources.size()){
111                                                lastUsedResources.poll();
112                                        }
113                                        lastUsedResources.add(schedDesc.getProvider().getProviderId());
114                                        sd = schedDesc;
115                                        break;
116                                }
117                        }
118                       
119                        Allocation allocation = new Allocation();
120                        allocation.setProcessesCount(1);
121                        allocation.setProviderName(sd.getProvider().getProviderId());
122                        ScheduledTask scheduledTask = new ScheduledTask(task);
123                        scheduledTask.setTaskId(task.getId());
124                        scheduledTask.setJobId(task.getJobId());
125                        scheduledTask.addAllocation(allocation);               
126                       
127                        plan.addTask(scheduledTask);
128                       
129                        //Metasched
130//                      Allocation allocation = new Allocation();
131//                      allocation.setProcessesCount(1);
132//                      allocation.setProviderName(sched.get_name());
133//                      ScheduledTask scheduledTask = new ScheduledTask(task);
134//                      scheduledTask.setTaskId(task.getId());
135//                      scheduledTask.setJobId(task.getJobId());
136//                      scheduledTask.addAllocation(allocation);               
137//                     
138//                      plan.addTask(scheduledTask);
139                }
140                return plan;
141        }
142
143        private void incrementNbTriesCache() {
144                for (int i = 0 ; i < cacheNbTries.size(); i++) {
145                        cacheNbTries.set(i, cacheNbTries.get(i)+1);
146                }
147        }
148       
149        private void removeOldCacheEntries() {
150                for (int i = 0 ; i < cacheNbTries.size(); i++) {
151                        if (cacheNbTries.get(i) > MAX_TRIES) {
152                                log.debug("Cache entry :"+cacheTaskId.get(i)+"/"+cacheChosenCluster.get(i)+"/"+cacheNbTries.get(i)+" is too old");
153                                cacheNbTries.remove(i);
154                                cacheTaskId.remove(i);
155                                cacheChosenCluster.remove(i);
156                        }
157                }
158        }
159
160        // return std::max((float)c.cpu_number_hosts_le_50p/(float)c.host_number, (float)c.mem_number_hosts_le_50p/(float)c.host_number);
161        private float getX(ManagedComputingResources compRes) {
162                float hostNumber = compRes.size();
163//              log.debug("Host number: "+hostNumber);
164                float cpu_number_hosts_le_50p=0;
165                float mem_number_hosts_le_50p=0;
166               
167                for (ComputingResource cr : compRes) {
168                        Node node = (Node)cr;
169//                      log.debug(node.getFullName());
170//                      log.debug("Free Processors = "+node.getFreeProcessorsNumber()+"/"+node.getProcessorsNumber());
171//                      log.debug(((Processor)node.getChildren().get(0)).);
172                        if ((float)node.getFreeProcessorsNumber() >= 0.5 * (float)node.getProcessorsNumber()) {
173                                cpu_number_hosts_le_50p++;
174                        }
175                        try {
176                                if ((float)node.getFreeMemory() >= 0.5 *(float)node.getTotalMemory()) {
177                                        mem_number_hosts_le_50p++;
178                                }
179                        } catch (NoSuchFieldException e) {
180                                // TODO Auto-generated catch block
181                                e.printStackTrace();
182                        }
183                }
184                float ret = Math.max(cpu_number_hosts_le_50p/hostNumber,mem_number_hosts_le_50p/hostNumber);
185//              log.debug("X=max("+cpu_number_hosts_le_50p+"/"+hostNumber+","+mem_number_hosts_le_50p+"/"+hostNumber+")="+ret);
186                return ret;
187        }
188       
189        //return (float)(c.highest_mean_resource_le_50_p / 100.0);
190        private float getY(ManagedComputingResources compRes) {
191                float mean_cpu_le_50p = 0;
192                float nbCPU=0;
193                float mean_mem_le_50p = 0;
194                float nbMEM=0;
195                for (ComputingResource cr : compRes) {
196                        Node node = (Node)cr;
197                        //node is loaded < 50%
198                        if ((float)node.getFreeProcessorsNumber() >= 0.5 * (float)node.getProcessorsNumber()) {
199                                mean_cpu_le_50p+=((float)node.getProcessorsNumber()-(float)node.getFreeProcessorsNumber())/(float)node.getProcessorsNumber();
200                                nbCPU++;
201//                              log.debug("CPU Inf 50p");
202//                              log.debug("(float)node.getProcessorsNumber() = "+(float)node.getProcessorsNumber());
203//                              log.debug("(float)node.getFreeProcessorsNumber() = "+(float)node.getFreeProcessorsNumber());
204                        }
205                        try {
206                                if ((float)node.getFreeMemory() >= 0.5 *(float)node.getTotalMemory()) {
207                                        mean_mem_le_50p+=((float)node.getTotalMemory() - (float)node.getFreeMemory()) / (float)node.getTotalMemory();
208                                        nbMEM++;
209//                                      log.debug(((float)node.getTotalMemory() - (float)node.getFreeMemory()));
210//                                      log.debug((float)node.getFreeMemory());
211//                                      log.debug("mem Inf 50p");
212                                }
213                        } catch (NoSuchFieldException e) {
214                                // TODO Auto-generated catch block
215                                e.printStackTrace();
216                        }
217                }
218                if (nbCPU != 0) {
219//                      log.debug("cpu: "+mean_cpu_le_50p+"/"+nbCPU);
220                        mean_cpu_le_50p = mean_cpu_le_50p / nbCPU;                             
221                }
222                else {
223                        mean_cpu_le_50p = (float) 0.5;
224                }
225                if (nbMEM != 0) {
226//                      log.debug("mem: "+mean_mem_le_50p+"/"+nbMEM);
227                        mean_mem_le_50p = mean_mem_le_50p / nbMEM;
228                }
229                else {
230                        mean_mem_le_50p = (float) 0.5;
231                }
232//              log.debug("Y = max("+mean_cpu_le_50p+","+mean_mem_le_50p+")");
233                return Math.max(mean_cpu_le_50p, mean_mem_le_50p);
234        }
235       
236        private Scheduler chooseCloud(List<Scheduler> schedulers, TaskInterface<?> task) {
237                Scheduler chosenCloud = null;
238                float value = Float.NEGATIVE_INFINITY;
239                Collections.sort(schedulers, new ComparatorRankingMetasched());
240                for (int i = 0 ; i < schedulers.size() ; i++) {
241                        //If scheduler has already been chosen, continue
242                        if (cloudAlreadyChosen(schedulers.get(i), task)) {
243                                log.debug("Task "+task.getJobId()+" already chose cloud "+schedulers.get(i).getFullName());
244                                continue;
245                        }
246                               
247//                      log.debug("Cloud : "+schedulers.get(i).get_name());
248                        float x = getX(schedulers.get(i).getCompResources());
249                        float y = getY(schedulers.get(i).getCompResources());
250                        float z = i;
251                        float newValue = x * ((float)schedulers.get(i).getCompResources().size() / 2)
252                                                - y * (float)schedulers.get(i).getCompResources().size()
253                                                - z;
254                        if (newValue > value) {
255                                chosenCloud = schedulers.get(i);
256                                value = newValue;
257                        }
258                }
259                if (chosenCloud == null) {
260                        log.debug("No cloud was found !");
261                }
262                return chosenCloud;
263        }
264
265
266        private boolean cloudAlreadyChosen(Scheduler sched, TaskInterface<?> task) {
267                for (int i = 0 ; i < cacheTaskId.size() ; i++) {
268                        if (task.getJobId().equals(cacheTaskId.get(i)) && sched.getFullName().equals(cacheChosenCluster.get(i))) {
269                                return true;
270                        }
271                }
272                return false;
273        }
274}
Note: See TracBrowser for help on using the repository browser.