Ignore:
Timestamp:
10/09/12 13:58:06 (13 years ago)
Author:
wojtekp
Message:
 
File:
1 edited

Legend:

Unmodified
Added
Removed
  • DCWoRMS/trunk/src/schedframe/scheduling/policy/local/LocalManagementSystem.java

    r481 r490  
    11package schedframe.scheduling.policy.local; 
    22 
     3import dcworms.schedframe.scheduling.ExecTask; 
     4import dcworms.schedframe.scheduling.Executable; 
    35import eduni.simjava.Sim_event; 
    46import eduni.simjava.Sim_system; 
     
    79import gridsim.gssim.DCWormsTags; 
    810import gridsim.gssim.filter.ExecTaskFilter; 
    9 import gssim.schedframe.scheduling.ExecTask; 
    10 import gssim.schedframe.scheduling.Executable; 
    1111 
    1212import java.util.ArrayList; 
     
    4141import schedframe.scheduling.ResourceHistoryItem; 
    4242import schedframe.scheduling.Scheduler; 
     43import schedframe.scheduling.TaskList; 
    4344import schedframe.scheduling.TaskListImpl; 
    44 import schedframe.scheduling.UsedResourceList; 
     45import schedframe.scheduling.UsedResourcesList; 
    4546import schedframe.scheduling.WorkloadUnitHandler; 
    4647import schedframe.scheduling.manager.resources.LocalResourceManager; 
     
    7879 
    7980                super(providerId, entityName, execTimeEstimationPlugin, queues); 
    80  
    81                 //schedulingPlugin = (LocalSchedulingPlugin) InstanceFactory.createInstance(schedulingPluginClassName, LocalSchedulingPlugin.class); 
    8281                 
    8382                if (schedPlugin == null) { 
     
    8584                } 
    8685                this.schedulingPlugin =  schedPlugin; 
    87                 accTotalLoad = new Accumulator(); 
    88                 moduleList = new ModuleListImpl(1); 
    89  
     86                this.moduleList = new ModuleListImpl(1); 
     87                 
     88                this.accTotalLoad = new Accumulator(); 
    9089        } 
    9190 
    9291        public void init(Scheduler sched, ManagedResources managedResources) { 
    9392                super.init(sched, managedResources); 
    94                 //scheduler = sched; 
    95                 //resourceManager = ResourceManagerFactory.createResourceManager(scheduler); 
    9693                double load = 0; 
    9794                accTotalLoad.add(load); 
     
    115112                        } 
    116113                        sendTimerEvent(); 
    117  
    118114                        break; 
    119115 
    120116                case DCWormsTags.TASK_READY_FOR_EXECUTION: 
    121117                         
    122                         ExecTask data = (ExecTask) ev.get_data(); 
    123                         try { 
    124                                 data.setStatus(DCWormsTags.READY); 
     118                        ExecTask execTask = (ExecTask) ev.get_data(); 
     119                        try { 
     120                                execTask.setStatus(DCWormsTags.READY); 
    125121                                if (pluginSupportsEvent(tag)) { 
    126                                         SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId()); 
     122                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId()); 
    127123                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event, 
    128124                                                        queues,  getJobRegistry(), getResourceManager(), moduleList); 
     
    136132                case DCWormsTags.TASK_EXECUTION_FINISHED: 
    137133                        obj = ev.get_data(); 
    138                         ExecTask exec = (ExecTask) obj; 
    139                         if (exec.getStatus() == DCWormsTags.INEXEC) { 
    140                                 finalizeExecutable(exec); 
    141  
    142                                 sendFinishedWorkloadUnit(exec); 
    143                                 //task.setGridletStatus(Gridlet.SUCCESS); 
    144                                 //task.finalizeGridlet(); 
    145                                 log.debug(exec.getJobId() + "_" + exec.getId() + " finished execution on " + new DateTime()); 
     134                        execTask = (ExecTask) obj; 
     135                        if (execTask.getStatus() == DCWormsTags.INEXEC) { 
     136                                 
     137                                finalizeExecutable(execTask); 
     138                                sendFinishedWorkloadUnit(execTask); 
     139                                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime()); 
    146140                                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 
    147                                 /*UsedResourceList<ResourceHistoryItem> lastUsedList = task.getUsedResources(); 
    148                                 Map<ResourceUnitName, AbstractResourceUnit> lastUsed = lastUsedList.getLast() 
    149                                                 .getResourceUnits(); 
    150                                 getAllocationManager().freeResources(lastUsed); 
    151                                 ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PROCESSINGELEMENTS); 
    152                                 for (ComputingResource resource : pes) { 
    153                                         resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, task)); 
     141                                if (pluginSupportsEvent(tag)) { 
     142                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId()); 
     143                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 
     144                                                        queues, getJobRegistry(), getResourceManager(), moduleList); 
     145                                        executeSchedulingPlan(decision); 
    154146                                } 
    155                                 SubTaskFilter filter = new SubTaskFilter(task.getGridletID(), GssimTags.TASK_REQUESTED_TIME_EXPIRED); 
    156                                 scheduler.sim_cancel(filter, null); 
    157                                 super.sendFinishJob((Executable) task.getGridlet());*/ 
    158                         } 
     147                        } 
     148 
     149                        Job job = jobRegistry.getJob(execTask.getJobId()); 
     150                        if(!job.isFinished()){ 
     151                                getWorkloadUnitHandler().handleJob(job); 
     152                        } 
     153                        break; 
     154                         
     155                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED: 
     156                        obj = ev.get_data(); 
     157                        execTask = (Executable) obj; 
    159158                        if (pluginSupportsEvent(tag)) { 
    160                                 SchedulingEvent event = new TaskFinishedEvent(exec.getJobId(), exec.getId()); 
     159                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId()); 
    161160                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 
    162161                                                queues, getJobRegistry(), getResourceManager(), moduleList); 
    163162                                executeSchedulingPlan(decision); 
    164163                        } 
    165                         Job job = jobRegistry.getJob(exec.getJobId()); 
    166                         if(!job.isFinished()){ 
    167                                 getWorkloadUnitHandler().handleJob(job); 
    168                         } 
    169  
    170164                        break; 
    171                 case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED: 
    172                         obj = ev.get_data(); 
    173                         exec = (Executable) obj; 
    174                         if (pluginSupportsEvent(tag)) { 
    175                                 SchedulingEvent event = new TaskRequestedTimeExpiredEvent(exec.getJobId(), exec.getId()); 
    176                                 SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 
    177                                                 queues, getJobRegistry(), getResourceManager(), moduleList); 
    178                                 executeSchedulingPlan(decision); 
    179                         } 
    180  
    181                         break; 
     165                         
    182166                case DCWormsTags.UPDATE: 
    183167                        updateProcessingTimes(ev); 
     
    185169                } 
    186170        } 
    187          
    188171 
    189172        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) { 
     
    223206                                } else { 
    224207                                        ExecTask exec = (ExecTask) task; 
    225                                         executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), (ExecTask)task)); 
     208                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec)); 
    226209                                } 
    227210                        } 
     
    237220                        return; 
    238221                removeFromQueue(task); 
    239                 //double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength(); 
     222 
    240223                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION); 
    241224                int time = Double.valueOf( 
     
    251234                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); 
    252235                exec.addUsedResources(resHistItem); 
    253  
    254                 scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, 
    255                                 exec); 
     236                try { 
     237                        exec.setStatus(DCWormsTags.INEXEC); 
     238                } catch (Exception e) { 
     239                        // TODO Auto-generated catch block 
     240                        e.printStackTrace(); 
     241                } 
     242                scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, exec); 
    256243 
    257244                try { 
     
    262249                        scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec); 
    263250                } 
    264                  
    265                 try { 
    266                         exec.setStatus(DCWormsTags.INEXEC); 
    267                 } catch (Exception e1) { 
    268                         // TODO Auto-generated catch block 
    269                         e1.printStackTrace(); 
    270                 } 
     251 
    271252                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 
    272253                 
    273254                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE); 
    274                 if(peUnit instanceof ProcessingElements){ 
     255                 
     256                notifyComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec); 
     257                 
     258                /*if(peUnit instanceof ProcessingElements){ 
    275259                        ProcessingElements pes = (ProcessingElements) peUnit; 
    276260                        for (ComputingResource resource : pes) { 
    277                                 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 
     261                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec)); 
    278262                        } 
    279263                } else { 
     
    282266                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 
    283267                        } catch (ResourceException e) { 
    284                                  
    285                         } 
    286                         resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 
    287                 } 
    288                 /*ProcessingElements pes = (ProcessingElements) choosenResources.get(StandardResourceUnitName.PE); 
    289                 for (ComputingResource resource : pes) { 
    290                         resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, submittedTask)); 
    291                 }*/ 
     268                                return; 
     269                        } 
     270                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec)); 
     271                } 
     272*/ 
    292273 
    293274                /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){ 
     
    301282                 
    302283                Executable exec = (Executable)execTask; 
    303                 try { 
    304                         exec.setStatus(DCWormsTags.SUCCESS); 
    305                 } catch (Exception e1) { 
    306                         // TODO Auto-generated catch block 
    307                         e1.printStackTrace(); 
    308                 } 
    309284                exec.finalizeExecutable(); 
    310                 UsedResourceList<ResourceHistoryItem> lastUsedList = exec.getUsedResources(); 
    311                 Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast() 
    312                                 .getResourceUnits(); 
    313                 getAllocationManager().freeResources(lastUsed); 
    314                  
    315                 PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE); 
    316                 if(peUnit instanceof ProcessingElements){ 
    317                         ProcessingElements pes = (ProcessingElements) peUnit; 
    318                         for (ComputingResource resource : pes) { 
    319                                 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 
    320                         } 
    321                 } else { 
    322                         ComputingResource resource = null; 
    323                         try { 
    324                                 resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 
    325                         } catch (ResourceException e) { 
    326                                  
    327                         } 
    328                         resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 
    329                 } 
    330                 /*ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PE); 
    331                 for (ComputingResource resource : pes) { 
    332                         resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask)); 
    333                 }*/ 
     285                 
    334286                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED); 
    335287                scheduler.sim_cancel(filter, null); 
    336288                 
    337  
     289                Task task; 
    338290                Job job = jobRegistry.getJob(exec.getJobId()); 
    339  
    340                 Task task = null; 
    341291                try { 
    342292                        task = job.getTask(exec.getTaskId()); 
    343293                } catch (NoSuchFieldException e) { 
    344                         e.printStackTrace(); 
     294                        return; 
    345295                } 
    346296                if(exec.getProcessesId() == null){ 
     
    360310                        } 
    361311                } 
     312                 
     313                UsedResourcesList lastUsedList = exec.getUsedResources(); 
     314                Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast() 
     315                                .getResourceUnits(); 
     316                getAllocationManager().freeResources(lastUsed); 
     317                 
     318                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE); 
     319                notifyComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec); 
     320                /*if(peUnit instanceof ProcessingElements){ 
     321                        ProcessingElements pes = (ProcessingElements) peUnit; 
     322                        for (ComputingResource resource : pes) { 
     323                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 
     324                        } 
     325                } else { 
     326                        ComputingResource resource = null; 
     327                        try { 
     328                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 
     329                        } catch (ResourceException e) { 
     330                                return; 
     331                        } 
     332                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 
     333                }*/ 
     334 
    362335                //sendFinishedWorkloadUnit(executable); 
    363336        } 
     
    374347                        ExecTask task = iter.next(); 
    375348                        Executable exec = (Executable)task; 
    376                         UsedResourceList<ResourceHistoryItem> usedResourcesList = exec.getUsedResources(); 
    377                         ResourceUnit unit = usedResourcesList.getLast().getResourceUnits() 
     349                        UsedResourcesList usedResourcesList = exec.getUsedResources(); 
     350                        PEUnit peUnit = (PEUnit)usedResourcesList.getLast().getResourceUnits() 
    378351                                        .get(StandardResourceUnitName.PE); 
    379352 
    380                         double load = getMIShare(timeSpan, (PEUnit) unit); 
     353                        double load = getMIShare(timeSpan, peUnit); 
    381354                        exec.setCompletionPercentage(100 * timeSpan/exec.getEstimatedDuration()); 
    382355                        addTotalLoad(load); 
    383356                } 
    384357        } 
    385  
     358        private void notifyComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){ 
     359 
     360                if(peUnit instanceof ProcessingElements){ 
     361                        ProcessingElements pes = (ProcessingElements) peUnit; 
     362                        for (ComputingResource resource : pes) { 
     363                                resource.handleEvent(new EnergyEvent(eventType, obj)); 
     364                        } 
     365                } else { 
     366                        ComputingResource resource = null; 
     367                        try { 
     368                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId()); 
     369                        } catch (ResourceException e) { 
     370                                return; 
     371                        } 
     372                        resource.handleEvent(new EnergyEvent(eventType, obj)); 
     373                } 
     374        } 
     375         
    386376        private double getMIShare(double timeSpan, PEUnit pes) { 
    387377                double localLoad; 
     
    402392        protected void updateProcessingTimes(Sim_event ev) { 
    403393                updateProcessingProgress(); 
    404                 for (ExecTask task : jobRegistry.getRunningTasks()) { 
    405                         Executable exec = (Executable)task; 
     394                for (ExecTask execTask : jobRegistry.getRunningTasks()) { 
     395                        Executable exec = (Executable)execTask; 
    406396                        List<String> visitedResource = exec.getVisitedResources(); 
    407397                        String originResource = ev.get_data().toString(); 
     
    411401                         
    412402                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits(); 
    413                         //double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength(); 
    414403                        double time = execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),  
    415                                         task, choosenResources, exec.getCompletionPercentage()); 
    416  
    417                         /*if(!subTask.getVisitedResources().contains(ev.get_data().toString())) { 
    418                                 continue; 
    419                         }*/ 
     404                                        execTask, choosenResources, exec.getCompletionPercentage()); 
     405 
    420406                        //check if the new estimated end time is equal to the previous one; if yes the continue without update 
    421407                        if( DoubleMath.subtract((exec.getExecStartTime() + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){ 
     
    424410                        ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED); 
    425411                        scheduler.sim_cancel(filter, null); 
    426                         scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, task); 
    427  
     412                        scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, execTask); 
    428413                } 
    429414        }        
     
    443428                                numberOfPE = numberOfPE + resUnit.getAmount(); 
    444429                        } 
    445                         //numberOfPE = getResourceManager().getPE().size(); 
    446430                } catch (Exception e) { 
    447431                        numberOfPE = 1; 
     
    464448                        ExecTask task) { 
    465449 
     450                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>(); 
    466451                ResourceManager resourceManager = this.resourceManager; 
    467452                if(resourceName != null){ 
     
    472457                                return null; 
    473458                        } 
    474  
    475459                        resourceManager = new LocalResourceManager(resource); 
    476460                } 
    477                 Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>(); 
    478  
    479461 
    480462                int cpuRequest; 
     
    485467                } 
    486468 
    487                 //PEUnit processingUnits = null; 
    488469                if (cpuRequest != 0) { 
    489470                         
     
    494475                                return null; 
    495476                        } 
     477                         
    496478                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>(); 
    497  
    498479                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) { 
    499480                                PEUnit peUnit = (PEUnit) availableUnits .get(i); 
     
    508489                                return null; 
    509490                        } 
    510  
    511491                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0)); 
    512492                } 
     
    529509        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{ 
    530510                 
    531                 public void handleJob(Job job){ 
     511                public void handleJob(JobInterface<?> job){ 
    532512 
    533513                        if (log.isInfoEnabled()) 
     
    536516                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 
    537517                        jobsList.add(job); 
    538                         TaskListImpl readyTasks = new TaskListImpl(); 
     518                        TaskListImpl availableTasks = new TaskListImpl(); 
    539519                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){ 
    540520                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 
    541                                 readyTasks.add(task); 
    542                         } 
    543  
    544                         for(WorkloadUnit e:readyTasks){  
    545                                 registerWorkloadUnit(e); 
    546                         } 
    547                 } 
    548                  
    549                 public void handleTask(TaskInterface<?> ti){ 
    550                         Task task = (Task)ti; 
     521                                availableTasks.add(task); 
     522                        } 
     523 
     524                        for(TaskInterface<?> task: availableTasks){      
     525                                registerWorkloadUnit(task); 
     526                        } 
     527                } 
     528                 
     529                public void handleTask(TaskInterface<?> t){ 
     530                        Task task = (Task)t; 
    551531                        List<AbstractProcesses> processes = task.getProcesses(); 
    552532 
     
    564544                 
    565545                public void handleExecutable(ExecTask task){ 
     546                         
    566547                        Executable exec = (Executable) task; 
    567  
    568                         // int cost = 
    569                         // this.resourceManager.getResourceCharacteristic().getResUnits() != 
    570                         // null ? 
    571                         // this.resourceManager.getResourceCharacteristic().getResUnits().get(ResourceParameterName.COST).getAmount() 
    572                         // : 1; 
    573  
    574                         exec.visitResource(scheduler.get_name()); 
     548                        jobRegistry.addExecTask(exec); 
     549                         
     550                        exec.trackResource(scheduler.get_name()); 
    575551                        Scheduler parentScheduler = scheduler.getParent(); 
    576                         while (parentScheduler != null && !exec.getVisitedResources().contains(parentScheduler.get_name())) { 
    577                                 exec.visitResource(parentScheduler.get_name()); 
     552                        List<String> visitedResource = exec.getVisitedResources(); 
     553                        String [] visitedResourcesArray = visitedResource.toArray(new String[visitedResource.size()]); 
     554                        while (parentScheduler != null && !ArrayUtils.contains(visitedResourcesArray, parentScheduler.get_name())) { 
     555                                exec.trackResource(parentScheduler.get_name()); 
    578556                                parentScheduler = parentScheduler.getParent(); 
    579557                        } 
    580                          
    581558                        exec.setSchedulerName(scheduler.get_id()); 
    582                         jobRegistry.addExecTask(exec); 
    583                         TaskListImpl newTasks = new TaskListImpl(); 
     559                         
     560                        TaskList newTasks = new TaskListImpl(); 
    584561                        newTasks.add(exec); 
    585562                 
Note: See TracChangeset for help on using the changeset viewer.