source: DCWoRMS/branches/coolemall/src/test/testSOP/algorithms/multicloud/SOPVP.java @ 1606

Revision 1606, 26.3 KB checked in by wojtekp, 8 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package test.testSOP.algorithms.multicloud;
2
3import java.io.BufferedReader;
4import java.io.FileNotFoundException;
5import java.io.FileWriter;
6import java.io.IOException;
7import java.io.InputStreamReader;
8import java.io.PrintWriter;
9import java.util.ArrayList;
10import java.util.Collections;
11import java.util.HashMap;
12import java.util.List;
13import java.util.Map;
14
15import org.apache.commons.logging.Log;
16import org.apache.commons.logging.LogFactory;
17
18import qcg.shared.constants.BrokerConstants;
19
20import dcworms.schedframe.scheduling.ExecTask;
21
22import schedframe.events.scheduling.SchedulingEvent;
23import schedframe.resources.ResourceStatus;
24import schedframe.resources.StandardResourceType;
25import schedframe.resources.computing.ComputingResource;
26import schedframe.resources.computing.Node;
27import schedframe.resources.computing.Processor;
28import schedframe.resources.computing.profiles.energy.power.StandardPowerStateName;
29import schedframe.resources.units.Memory;
30import schedframe.resources.units.ProcessingElements;
31import schedframe.resources.units.ResourceUnit;
32import schedframe.resources.units.ResourceUnitName;
33import schedframe.resources.units.StandardResourceUnitName;
34import schedframe.scheduling.manager.resources.ClusterResourceManager;
35import schedframe.scheduling.manager.resources.ResourceManager;
36import schedframe.scheduling.manager.tasks.JobRegistry;
37import schedframe.scheduling.manager.tasks.JobRegistryImpl;
38import schedframe.scheduling.plan.SchedulingPlanInterface;
39import schedframe.scheduling.plan.impl.SchedulingPlan;
40import schedframe.scheduling.plugin.ModuleList;
41import schedframe.scheduling.queue.TaskQueue;
42import schedframe.scheduling.queue.TaskQueueList;
43import schedframe.scheduling.tasks.TaskInterface;
44import simulator.DataCenterWorkloadSimulator;
45import simulator.stats.implementation.DCWormsStatistics;
46import test.testSOP.ComparatorHostHighestLoadFirst;
47import test.testSOP.ComparatorHostLowestLoadFirst;
48import test.testSOP.Utility;
49import example.localplugin.BaseLocalSchedulingPlugin;
50import gridsim.GridSimTags;
51import gridsim.dcworms.DCWormsTags;
52
53public class SOPVP extends BaseLocalSchedulingPlugin {
54        Log log = LogFactory.getLog(SOPVP.class);
55       
56        public static String MIGRATION_PATH = "src/test/testSOP_results/data/multicloud/migrations.txt";
57        public static String ON_VS_EXPECTED = "src/test/testSOP_results/data/multicloud/on_vs_expected.txt";
58        public static String COMPUTATION_TIME = "src/test/testSOP_results/data/multicloud/computation_time.txt";
59        public static String TASK_REJECTIONS = "src/test/testSOP_results/data/multicloud/task_rejections.txt";
60
61       
62        public SchedulingPlanInterface<?> schedule(SchedulingEvent event, TaskQueueList queues, JobRegistry jobRegistry,
63                        ResourceManager resManager, ModuleList modules) {
64
65                ClusterResourceManager resourceManager = (ClusterResourceManager) resManager;
66//              List<Processor> processors = resourceManager.getProcessors();
67//              List<Node> nodes = resourceManager.getNodes();
68                List<Node> nodes = Utility.getNodesByPowerState(resourceManager.getNodes(), StandardPowerStateName.ON);
69                SchedulingPlan plan = new SchedulingPlan();
70                // choose the events types to serve.
71                // Different actions for different events are possible.
72                switch (event.getType()) {
73                case START_TASK_EXECUTION:
74                case TASK_FINISHED:
75                        // our tasks are placed only in first queue (see BaseLocalSchedulingPlugin.placeJobsInQueues() method)
76                        TaskQueue q = queues.get(0);
77                        // check all tasks in queue
78
79                        for (int i = 0; i < q.size(); i++) {
80                                TaskInterface<?> task = q.get(i);
81                                // if status of the tasks in READY
82                                if (task.getStatus() == DCWormsTags.READY) {
83                                        Map<ResourceUnitName, ResourceUnit> choosenResources = chooseResourcesForExecution(nodes, task);
84                                        if (choosenResources != null) {
85                                                addToSchedulingPlan(plan, task, choosenResources);
86                                        }
87                                        else {
88                                                sendBackTask(task, jobRegistry, q, resourceManager.getNodes().get(0).getFullName().split("/")[0]);
89                                        }
90                                        //FOR TESTING PURPOSE
91//                                      if (resourceManager.getNodes().get(0).getFullName().contains("ResDataCenter_2")) {
92//                                             
93//                                              sendBackTask(task, jobRegistry, q, resourceManager.getNodes().get(0).getFullName().split("/")[0]);
94//                                      }
95//                                      else {
96//                                              Map<ResourceUnitName, ResourceUnit> choosenResources = chooseResourcesForExecution(nodes, task);
97//                                              if (choosenResources != null) {
98//                                                      addToSchedulingPlan(plan, task, choosenResources);
99//                                              }
100//                                      }
101                                }
102                        }
103                        break;
104//              case POWER_LIMIT_EXCEEDED:
105////                    String src = event.getSource();
106////                    optimizeEnergyUsage(jobRegistry, resourceManager , src);
107//                      break;
108                       
109                case TIMER:
110                       
111                        log.debug("Scheduler Invoked by timer");
112                        long startTime = System.nanoTime();
113                        consolidate(jobRegistry, resourceManager);
114                        long endTime = System.nanoTime();
115                        long duration = (endTime - startTime) / 1000000;  //divide by 1000000 to get milliseconds.
116                        try {
117                                PrintWriter writer = new PrintWriter(new FileWriter(COMPUTATION_TIME, true));
118                                writer.write(String.format("%d\t%d\n", resourceManager.getNodes().size(), duration));
119                                writer.close();
120                        } catch (IOException e) {
121                                // TODO Auto-generated catch block
122                                e.printStackTrace();
123                        }
124                       
125                        Utility.manageHosts(jobRegistry, resourceManager, log);
126                case JOB_ARRIVED:
127                        break;
128                case JOB_CANCELED:
129                        break;
130                case JOB_FAILED:
131                        break;
132                case JOB_FINISHED:
133                        break;
134                case PROVIDER_FAILED:
135                        break;
136                case RESOURCE_FAILED:
137                        break;
138                case RESOURCE_STATE_CHANGED:
139                        break;
140                case TASK_ARRIVED:
141                        break;
142                case TASK_CANCELED:
143                        break;
144                case TASK_FAILED:
145                        break;
146                case TASK_MOVED:
147                        break;
148                case TASK_PAUSED:
149                        break;
150                case TASK_REQUESTED_TIME_EXPIRED:
151                        break;
152                case TASK_RESUMED:
153                        break;
154                default:
155                        break;
156                }
157               
158                return plan;
159        }
160
161
162       
163
164       
165
166
167       
168
169
170        /**
171         * Consolidate
172         * @param jobRegistry
173         * @param resourceManager
174         */
175        @SuppressWarnings("unchecked")
176        private void consolidate(JobRegistry jobRegistry, ClusterResourceManager resourceManager) {
177                boolean success = true;
178                List<Node> nodeList = (List<Node>) resourceManager.getResourcesByTypeWithStatus(StandardResourceType.Node, ResourceStatus.FREE);
179                Node leastLoadedNode = this.getLeastLoadedHost(nodeList);
180               
181                //all nodes are empty
182                if (leastLoadedNode == null) {
183                        log.debug("All nodes are empty");
184                        return;
185                }
186                log.debug("Least loaded node is "+leastLoadedNode.getFullName());
187       
188                //Sort host lowest load first
189                Collections.sort(nodeList, new ComparatorHostLowestLoadFirst());
190               
191                //Print the nodelist
192//              this.printNodes(nodeList);
193               
194                ArrayList<Node> hostListMostCPU = new ArrayList<Node>();
195                ArrayList<Node> hostListMostMEM = new ArrayList<Node>();
196                ArrayList<String> migrateVM= new ArrayList<String>(); //VM to migrate
197                ArrayList<String> migrateHost= new ArrayList<String>(); //Host to migrate to
198               
199                //Sort in 2 lists
200                for (Node h : nodeList) {
201                        if (h == leastLoadedNode)
202                                continue;
203                        if (getCPULoad(h, null, null, null) > getMEMLoad(h, null, null, null)){
204                                hostListMostCPU.add(h);
205                        }
206                        else {
207                                hostListMostMEM.add(h);
208                        }
209                }
210               
211                JobRegistry leastLoadedNodeJR = new JobRegistryImpl(leastLoadedNode.getFullName());
212                List<ExecTask> runningTasks = leastLoadedNodeJR.getRunningTasks();
213                //For each VM
214                for (int i = 0 ;  i<runningTasks.size() && success==true ; i++) {
215                        ExecTask t = runningTasks.get(i);
216//                      VM vm = getVMFromId(runningTasks.get(i), vmList);
217                        boolean found = false;
218//                      log.debug("There are "+hostListMostCPU.size()+" hosts with More CPU used than MEM");
219//                      log.debug("There are "+hostListMostMEM.size()+" hosts with More MEM used than CPU");
220                        //if VM is CPU bound
221                        if (this.taskIsCPUBound(t)) {
222//                              log.debug("Task "+t.getJobId()+" is CPU bound");
223                                //Try MOSTMEMLIST first
224                                for (int h = 0 ; h<hostListMostMEM.size() && !found ; h++) {
225                                        Node otherhost = hostListMostMEM.get(h);
226                                        if (taskCanFitOnNode(t, otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost) && otherhost != leastLoadedNode && !Utility.nodeIsEmpty(otherhost, log)) {
227//                                              log.debug("Task "+t.getJobId()+" can fit on host in MOSTMEMLIST "+otherhost.getFullName());
228                                                //register migration
229                                                migrateVM.add(t.getJobId());
230                                                migrateHost.add(otherhost.getFullName());
231                                                //update usage
232//                                              otherhost.setCpuUsage(otherhost.getCpuUsage()+vm.getCpuNumber()*100);
233//                                              otherhost.setMemUsage(otherhost.getMemUsage()+vm.getMem());
234                                                //If needed change list of h
235                                                if (getCPULoad(otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost) < getMEMLoad(otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost)) {
236                                                        hostListMostMEM.remove(otherhost);
237                                                        hostListMostCPU.add(otherhost);
238                                                }
239                                                found = true;
240                                        }
241                                }
242                                //Then try MOSTCPULIST
243                                for (int h = 0 ; h<hostListMostCPU.size() && !found ; h++) {
244                                        Node otherhost = hostListMostCPU.get(h);
245                                        if (taskCanFitOnNode(t, otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost) && otherhost != leastLoadedNode && !Utility.nodeIsEmpty(otherhost, log)) {
246//                                              log.debug("Task "+t.getJobId()+" can fit on host in MOSTCPULIST "+otherhost.getFullName());
247                                                //register migration
248                                                migrateVM.add(t.getJobId());
249                                                migrateHost.add(otherhost.getFullName());
250                                                //update usage
251//                                              otherhost.setCpuUsage(otherhost.getCpuUsage()+vm.getCpuNumber()*100);
252//                                              otherhost.setMemUsage(otherhost.getMemUsage()+vm.getMem());
253                                                //If needed change list of h
254                                                if (getCPULoad(otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost) < getMEMLoad(otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost)) {
255                                                        hostListMostCPU.remove(otherhost);
256                                                        hostListMostMEM.add(otherhost);
257                                                }
258                                                found = true;
259                                        }
260                                }
261                        }
262                        //If VM is MEM bound
263                        else{
264//                              log.debug("Task "+t.getJobId()+" is MEM bound");
265                                //try MOSTCPULIST first
266                                for (int h = 0 ; h<hostListMostCPU.size() && !found ; h++) {
267                                        Node otherhost = hostListMostCPU.get(h);
268                                        if (taskCanFitOnNode(t, otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost) && otherhost != leastLoadedNode && !Utility.nodeIsEmpty(otherhost, log)) {
269//                                              log.debug("VM "+t.getJobId()+" can fit on host in MOSTCPULIST "+otherhost.getFullName());
270                                                //register migration
271                                                migrateVM.add(t.getJobId());
272                                                migrateHost.add(otherhost.getFullName());
273                                                //update usage
274//                                              otherhost.setCpuUsage(otherhost.getCpuUsage()+vm.getCpuNumber()*100);
275//                                              otherhost.setMemUsage(otherhost.getMemUsage()+vm.getMem());
276                                                //If needed change list of h
277                                                if (getCPULoad(otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost) < getMEMLoad(otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost)) {
278                                                        hostListMostCPU.remove(otherhost);
279                                                        hostListMostMEM.add(otherhost);
280                                                }
281                                                found = true;
282                                        }
283                                }
284                                //then MOSTMEMLIST
285                                for (int h = 0 ; h<hostListMostMEM.size() && !found ; h++) {
286                                        Node otherhost = hostListMostMEM.get(h);
287                                        if (taskCanFitOnNode(t, otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost) && otherhost != leastLoadedNode && !Utility.nodeIsEmpty(otherhost, log)) {
288//                                              log.debug("Task "+t.getJobId()+" can fit on host in MOSTMEMLIST "+otherhost.getFullName());
289                                                //register migration
290                                                migrateVM.add(t.getJobId());
291                                                migrateHost.add(otherhost.getFullName());
292                                                //update usage
293//                                              otherhost.setCpuUsage(otherhost.getCpuUsage()+vm.getCpuNumber()*100);
294//                                              otherhost.setMemUsage(otherhost.getMemUsage()+vm.getMem());
295                                                //If needed change list of h
296                                                if (getCPULoad(otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost) < getMEMLoad(otherhost, jobRegistry.getRunningTasks(), migrateVM, migrateHost)) {
297                                                        hostListMostMEM.remove(otherhost);
298                                                        hostListMostCPU.add(otherhost);
299                                                }
300                                                found = true;
301                                        }
302                                }
303                        }
304                        if (!found) {
305//                              log.debug("Could not find a suitable host for task : "+t.getJobId());
306                                success = false;
307                        }
308                }
309                //least loaded host is empty -> success = false
310                if (migrateVM.isEmpty()) {
311                        success=false;
312                }
313               
314                //if success, enforce realloc
315                if (success) {
316                        for (int i = 0 ; i<migrateVM.size() ; i++) {
317//                              log.info("-- Migration -- : Task "+migrateVM.get(i)+" goes to node "+migrateHost.get(i));
318                        }
319                        log.info("Enforcing reallocation");
320                       
321                        //Write fmigrations into file (might be slow)
322                        PrintWriter writer = null;
323                        PrintWriter writer2 = null;
324                        try {
325//                              log.debug(System.getProperty("user.dir"));
326                                writer = new PrintWriter(new FileWriter(MIGRATION_PATH, true));
327                                writer2 = new PrintWriter(new FileWriter(ON_VS_EXPECTED, true));
328                               
329//                              log.debug("MigrateVM "+migrateVM.size()+" = "+migrateVM.get(0));
330                                for (int i = 0 ; i<migrateVM.size() ; i++) {
331                                        ExecTask t = jobRegistry.getTask(migrateVM.get(i), "0");
332                                       
333//                                      log.debug("TaskID: "+t.getId()+"/"+t.getJobId());
334                                        //should give the ok for migration
335                                        ArrayList<Node> nlist = new ArrayList<Node>();
336                                        nlist.add((Node) resourceManager.getResourceByName(migrateHost.get(i)));
337                                       
338                                        log.debug("Destination host: "+nlist.get(0).getFullName());
339                                        log.debug("Task : "+t.getJobId());
340                                        Map<ResourceUnitName, ResourceUnit> destinationResources = chooseResourcesForExecution(nlist , t);
341                                        jobRegistry.migrateTask(t.getJobId(), t.getId(), destinationResources);
342                                        writer.println(String.format("%s\t%s", t.getJobId(), nlist.get(0).getName()));
343                                        int onNodes=-1;
344                                        for (Node n : resourceManager.getNodes()) {
345                                                if (n.getPowerInterface().getPowerState().equals(StandardPowerStateName.ON)
346                                                                || n.getPowerInterface().getPowerState().equals(StandardPowerStateName.BOOT))
347                                                        onNodes++;
348                                        }
349                                        //SI la migration peut amener une extinction
350                                        if (Utility.getExpectedHostNumber(jobRegistry, resourceManager, log) < onNodes)
351                                                writer2.println("TRUE");
352                                        else
353                                                writer2.println("FALSE");
354//                                      writer2.println(String.format("%d\t%d", Utility.getExpectedHostNumber(jobRegistry, resourceManager, log), onNodes));
355                                }
356                                writer.close();
357                                writer2.close();
358                        } catch (IOException e) {
359                                e.printStackTrace();
360                        } finally {
361                                writer.close();
362                        }
363                }
364        }
365
366//      private void optimizeEnergyUsage(JobRegistry jobRegistry, ClusterResourceManager resourceManager, String resName) {
367//              Node overLoadedNode = (Node)resourceManager .getResourceByName(resName);
368//              Node leastLoadedNode = findLeastLoadedResource(resourceManager.getNodes());
369//             
370//              if(leastLoadedNode.getLoadInterface().getRecentUtilization().getValue() >= overLoadedNode.getLoadInterface().getRecentUtilization().getValue()){
371//                      return;
372//              }
373//
374//              JobRegistry srcHostJr = new JobRegistryImpl(overLoadedNode.getFullName());
375//              int runningTasksNumber = srcHostJr.getRunningTasks().size();
376//              for(int i = srcHostJr.getRunningTasks().size() - 1; i >= 0 && runningTasksNumber > 2; i--) {
377//                      ExecTask execTask = srcHostJr.getRunningTasks().get(i);
378//                      Map<ResourceUnitName, ResourceUnit> destinationResources = chooseResourcesForExecution(findLeastLoadedResource(resourceManager.getNodes()).getProcessors(), execTask);
379//                      jobRegistry.migrateTask(execTask.getJobId(), execTask.getId(), destinationResources);
380//                      runningTasksNumber--;
381//              }
382//      }
383       
384       
385
386
387
388       
389
390        /**
391         * Is task CPUbound compared to reference
392         * @param t
393         * @return
394         */
395        private boolean taskIsCPUBound(ExecTask t) {
396                //ATTENTION ICI NB PROC EN DUR
397                try {
398                        return ((float) t.getCpuCntRequest() / (float)400) > ((float)t.getMemoryRequest() / (float)16384.0);
399                } catch (NoSuchFieldException e) {
400                        // TODO Auto-generated catch block
401                        e.printStackTrace();
402                }
403                log.debug("Could not determine if task "+t.getJobId()+" was cpubound");
404                return true;
405        }
406
407        private float getMEMLoad(Node h, List<ExecTask> tasklist, ArrayList<String> migrateVM, ArrayList<String> migrateHost) {
408                try {
409                        int additional = 0;
410                        if (tasklist != null && migrateVM != null && migrateHost != null) {
411                                //Get the migrating toward node h
412                                for (int i = 0 ; i < migrateHost.size() ; i++) {
413                                        if (h.getFullName().equals(migrateHost.get(i))) {
414                                                for(int j=0 ; j<tasklist.size() ; j++) {
415                                                        if (migrateVM.get(i).equals(tasklist.get(j))) {
416                                                                additional+=tasklist.get(j).getMemoryRequest();
417                                                        }
418                                                }
419                                        }
420                                }
421                        }
422                       
423                        return 1 - (((float)h.getFreeMemory()-(float)additional) / (float)h.getTotalMemory());
424                } catch (NoSuchFieldException e) {
425                        // TODO Auto-generated catch block
426                        e.printStackTrace();
427                }
428                return 0;
429        }
430
431        private float getCPULoad(Node h, List<ExecTask> tasklist, ArrayList<String> migrateVM, ArrayList<String> migrateHost) {
432                int additional = 0;
433                if (tasklist != null && migrateVM != null && migrateHost != null) {
434                        //Get the migrating toward node h
435                        for (int i = 0 ; i < migrateHost.size() ; i++) {
436                                if (h.getFullName().equals(migrateHost.get(i))) {
437                                        for(int j=0 ; j<tasklist.size() ; j++) {
438                                                if (migrateVM.get(i).equals(tasklist.get(j))) {
439                                                        try {
440                                                                additional+=tasklist.get(j).getCpuCntRequest();
441                                                        } catch (NoSuchFieldException e) {
442                                                                // TODO Auto-generated catch block
443                                                                e.printStackTrace();
444                                                        }
445                                                }
446                                        }
447                                }
448                        }
449                }
450                return 1 - (((float) h.getFreeProcessorsNumber()-(float)additional)  / (float)h.getProcessorsNumber());
451        }
452
453        private void printNodes(List<Node> nodeList) {
454                for (Node n : nodeList){
455                        try {
456                                log.debug(n.getName() + "("+n.getPowerInterface().getPowerState().getLabel()+"): " +
457                                                "CPU="+n.getFreeProcessorsNumber()+"/"+n.getProcessorsNumber() +"(" + (float)n.getFreeProcessorsNumber() / (float)n.getProcessorsNumber()+ ")" +
458                                                " / MEM: " +n.getFreeMemory()+"/"+n.getTotalMemory()+ "("+(float)n.getFreeMemory()/(float)n.getTotalMemory()+")");
459                        } catch (NoSuchFieldException e) {
460                                e.printStackTrace();
461                        }
462                }
463        }
464
465        /**
466         * Gets the most loaded resource of the node n
467         * @param n
468         * @return
469         */
470        private float getMostLoadedResource(Node n) {
471                float memLoad = 0, cpuLoad;
472                try {
473                        memLoad = (float)1 - (float)n.getFreeMemory() / (float)n.getTotalMemory();
474                } catch (NoSuchFieldException e) {
475                        // TODO Auto-generated catch block
476                        e.printStackTrace();
477                }
478                cpuLoad = (float)1 - (float)n.getFreeProcessorsNumber() / (float)n.getProcessorsNumber();
479//              log.debug("GetMostLoadedResource node "+n.getName()+": "+cpuLoad+" -- "+memLoad);
480                if (cpuLoad > memLoad) {
481                        return cpuLoad;
482                }
483                else {
484                        return memLoad;
485                }
486        }
487       
488        /**
489         * Gets the least loaded host from the list
490         * Host powered on is not tested
491         * @param nodeList
492         * @return
493         */
494        private Node getLeastLoadedHost(List<Node> nodeList) {
495                Node n = null;
496                float min = 2;
497                for (int i = 0 ; i < nodeList.size() ; i++) {
498                        float mostLoadedResource = getMostLoadedResource(nodeList.get(i));
499                        if (min >= mostLoadedResource  && mostLoadedResource > 0) {
500                                n = nodeList.get(i);
501                                min = mostLoadedResource;
502                        }
503                }
504                if (n == null) {
505                        log.error("Least loaded node is null !");
506                        return null;
507                }
508//              logger.debug("Least loaded host is : "+h.getId() +" (CPU="+h.getCpuUsage()+"/MEM="+h.getMemUsage()+")");
509                return n;
510        }
511
512        /**
513         * Dummy function to test full allocation to 1 proc
514         * @param processors
515         * @param task
516         * @return
517         */
518        private Map<ResourceUnitName, ResourceUnit> chooseNode1(
519                        List<Processor> processors, TaskInterface<?> task) {
520                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>(1);
521                int cpuRequest = 0;
522                try {
523                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
524                } catch (NoSuchFieldException e) {
525                        // TODO Auto-generated catch block
526                        e.printStackTrace();
527                }
528                List<ComputingResource> choosenResources = new ArrayList<ComputingResource>(cpuRequest);
529                choosenResources.add(processors.get(0));
530                ProcessingElements pe = new ProcessingElements();
531                pe.addAll(choosenResources);
532                map.put(StandardResourceUnitName.PE, pe);
533                return map;
534        }
535       
536        /**
537         *
538         * @param processors
539         * @param task
540         * @return
541         */
542        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(
543                        List<Node> nodes, TaskInterface<?> task) {
544
545                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>(1);
546               
547                List<ComputingResource> chosenResources = new ArrayList<ComputingResource>();   
548                boolean found = false;
549                Memory memory = null;
550                int cpuRequest=0;
551               
552                Collections.sort(nodes, new ComparatorHostHighestLoadFirst());
553               
554                for (int h=0 ; h<nodes.size() && !found ; h++){
555                        Node n = nodes.get(h);
556//                      log.debug("Task "+task.getId()+"/"+task.getJobId()+" can fit?");
557                        if (taskCanFitOnNode(task, n, null, null, null)){
558                                try {
559                                        log.debug("Task "+task.getId()+"/"+task.getJobId()+" (CPU="+task.getCpuCntRequest()+" ; MEM="+task.getMemoryRequest()+") can fit on node "+n.getFullName());
560                                } catch (NoSuchFieldException e) {
561                                        // TODO Auto-generated catch block
562                                        e.printStackTrace();
563                                }
564                                found=true;
565                               
566                                try {
567                                        cpuRequest = (int)task.getCpuCntRequest();
568                                        memory = new Memory(n.getMemory(), (int)task.getMemoryRequest(), (int)task.getMemoryRequest());
569                                } catch (NoSuchFieldException e1) {
570                                        e1.printStackTrace();
571                                }
572                                //Adding chosen processors
573                                for (int i = 0; i < n.getProcessors().size() && cpuRequest > 0; i++) {
574                                        if (n.getProcessors().get(i).getStatus() == ResourceStatus.FREE) {
575                                                chosenResources.add(n.getProcessors().get(i));
576                                                cpuRequest--;
577                                        }
578                                }
579                                if (cpuRequest > 0) {
580                                        continue;
581                                }
582                        }
583                }
584                if (!found) {
585                        log.info("Allocation not found for task "+task.getJobId());
586                        return null;
587                }
588               
589                ProcessingElements pe = new ProcessingElements();
590                pe.addAll(chosenResources);
591                map.put(StandardResourceUnitName.PE, pe);
592                if (memory != null)
593                        map.put(StandardResourceUnitName.MEMORY, memory);
594                else
595                        return null;
596                return map;
597        }
598       
599//      /**
600//       * FCFS CPU
601//       * @param processors
602//       * @param task
603//       * @return
604//       */
605//      private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(
606//                      List<Processor> processors, TaskInterface<?> task) {
607//
608//              Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>(1);
609//
610//              int cpuRequest;
611//              try {
612//                      cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
613//              } catch (NoSuchFieldException e) {
614//                      cpuRequest = 0;
615//              }
616//
617//              if (cpuRequest != 0) {
618//
619//                      if (processors.size() < cpuRequest) {
620//                              return null;
621//                      }
622//
623//                      List<ComputingResource> choosenResources = new ArrayList<ComputingResource>(cpuRequest);                               
624//                      for (int i = 0; i < processors.size() && cpuRequest > 0; i++) {
625//                              if (processors.get(i).getStatus() == ResourceStatus.FREE) {
626//                                      choosenResources.add(processors.get(i));
627//                                      cpuRequest--;
628//                              }
629//                      }
630//                      if (cpuRequest > 0) {
631//                              return null;
632//                      }
633//
634//                      ProcessingElements pe = new ProcessingElements();
635//                      pe.addAll(choosenResources);
636//                      map.put(StandardResourceUnitName.PE, pe);
637//                      return map;
638//              }
639//
640//              return null;
641//      }
642
643        /**
644         * Checks whether a task can fit on a node
645         * @param task
646         * @param n
647         * @param migrateVM
648         * @param runningTasks
649         * @param migrateHost
650         * @return
651         */
652        private boolean taskCanFitOnNode(TaskInterface<?> task, Node n, List<ExecTask> runningTasks, ArrayList<String> migrateVM, ArrayList<String> migrateHost) {
653                int cpuRequest;
654                try {
655                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
656                } catch (NoSuchFieldException e) {
657                        log.warn("Task : "+task.getId()+" does not require CPU !");
658                        cpuRequest = 0;
659                }
660               
661                int memRequest;
662                try {
663                        memRequest = (int) task.getMemoryRequest();
664                } catch (NoSuchFieldException e) {
665                        log.warn("Task : "+task.getJobId()+" does not require MEMORY !");
666                        memRequest = 0;
667                }
668//              log.debug("Task "+task.getId()+" => CPU="+cpuRequest+"/MEM="+memRequest);
669
670                if (cpuRequest == 0 && memRequest == 0){
671                        log.error("Error: task is empty !");
672                        return false;
673                }
674               
675                try {
676                       
677                        //To for expected migrations
678                        int additionalProc = 0;
679                        int additionalMem = 0;
680                        if (runningTasks != null && migrateVM != null && migrateHost != null) {
681                                //check node is destination
682                                for (int i = 0 ; i < migrateHost.size() ; i++) {
683                                        String nodeName = migrateHost.get(i);
684                                        if (n.getFullName().equals(nodeName)) {
685                                                String migrateTaskId = migrateVM.get(i);
686                                                //get corresponding task
687                                                for (int j = 0 ; j<runningTasks.size() ; j++) {
688                                                        if (runningTasks.get(j).getJobId().equals(migrateTaskId)) {
689                                                                additionalProc+=runningTasks.get(j).getCpuCntRequest();
690                                                                additionalMem+=runningTasks.get(j).getMemoryRequest();
691                                                        }
692                                                }
693                                        }
694                                }
695                               
696                        }
697                        //For each processor check whether it can fit
698                        if (n.getFreeMemory() >= memRequest+additionalMem && n.getFreeProcessorsNumber() >= cpuRequest+additionalProc) {
699                                return true;
700                        }
701                       
702                } catch (NoSuchFieldException e) {
703                        // TODO Auto-generated catch block
704                        e.printStackTrace();
705                }
706                return false;
707        }
708
709        private Node findLeastLoadedResource(List<Node> nodes ){
710                Node leastLoadedNode = null;
711                double minLoad = Double.MAX_VALUE;
712               
713                for(int i = 0; i < nodes.size(); i++){
714                        Node node = nodes.get(i);
715                        double totalLoad = node.getLoadInterface().getRecentUtilization().getValue();
716                        if(totalLoad < minLoad){
717                                leastLoadedNode= node;
718                                minLoad = totalLoad;
719                        }
720                }
721                return leastLoadedNode;
722        }
723
724        private void sendBackTask(TaskInterface<?> task, JobRegistry jobRegistry, TaskQueue q, String clusterName) {
725                try {
726                        PrintWriter writer = new PrintWriter(new FileWriter(TASK_REJECTIONS, true));
727                        //Task id / Cluster name
728                        writer.write(String.format("%s\t%s\n", task.getJobId(), clusterName));
729                        writer.close();
730                } catch (IOException e) {
731                        // TODO Auto-generated catch block
732                        e.printStackTrace();
733                }
734                log.debug("Task: "+task.getJobId() + " sent back to GridBroker");
735                try {
736                        //Unsubmit task
737                        jobRegistry.getJobInfo(task.getJobId()).getTask(task.getId()).setStatus((int)BrokerConstants.TASK_STATUS_UNSUBMITTED);
738
739                } catch (Exception e) {
740                        // TODO Auto-generated catch block
741                        e.printStackTrace();
742                }
743                q.remove(task);
744                synchronized(jobRegistry) {
745                        jobRegistry.getTasks().remove(task);
746                }
747                DataCenterWorkloadSimulator.getEventManager().send("grid", 0, GridSimTags.GRIDLET_SUBMIT, jobRegistry.getJobInfo(task.getJobId()));
748               
749        }
750}
Note: See TracBrowser for help on using the repository browser.