Ignore:
Timestamp:
10/08/12 10:23:45 (13 years ago)
Author:
wojtekp
Message:
 
File:
1 edited

Legend:

Unmodified
Added
Removed
  • DCWoRMS/trunk/src/schedframe/scheduling/policy/local/LocalManagementSystem.java

    r480 r481  
    44import eduni.simjava.Sim_system; 
    55import gridsim.Accumulator; 
    6 import gridsim.GridSimTags; 
    7 import gridsim.Gridlet; 
    86import gridsim.ResourceCalendar; 
    9 import gridsim.gssim.WormsTags; 
    10 import gridsim.gssim.filter.SubTaskFilter; 
     7import gridsim.gssim.DCWormsTags; 
     8import gridsim.gssim.filter.ExecTaskFilter; 
    119import gssim.schedframe.scheduling.ExecTask; 
    1210import gssim.schedframe.scheduling.Executable; 
     
    2725import qcg.shared.constants.BrokerConstants; 
    2826import schedframe.ResourceController; 
    29 import schedframe.events.scheduling.EventReason; 
    3027import schedframe.events.scheduling.SchedulingEvent; 
    3128import schedframe.events.scheduling.SchedulingEventType; 
    3229import schedframe.events.scheduling.StartTaskExecutionEvent; 
    33 import schedframe.events.scheduling.TaskCanceledEvent; 
    3430import schedframe.events.scheduling.TaskFinishedEvent; 
    3531import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent; 
     
    4541import schedframe.scheduling.ResourceHistoryItem; 
    4642import schedframe.scheduling.Scheduler; 
     43import schedframe.scheduling.TaskListImpl; 
    4744import schedframe.scheduling.UsedResourceList; 
    4845import schedframe.scheduling.WorkloadUnitHandler; 
    49 import schedframe.scheduling.WorkloadUnitListImpl; 
    5046import schedframe.scheduling.manager.resources.LocalResourceManager; 
    5147import schedframe.scheduling.manager.resources.ManagedResources; 
     
    6359import schedframe.scheduling.tasks.Job; 
    6460import schedframe.scheduling.tasks.JobInterface; 
    65 import schedframe.scheduling.tasks.SubmittedTask; 
    6661import schedframe.scheduling.tasks.Task; 
    6762import schedframe.scheduling.tasks.TaskInterface; 
    6863import schedframe.scheduling.tasks.WorkloadUnit; 
    69 import simulator.WormsConstants; 
     64import simulator.DCWormsConstants; 
    7065import simulator.utils.DoubleMath; 
    7166 
     
    112107                switch (tag) { 
    113108 
    114                 case WormsTags.TIMER: 
     109                case DCWormsTags.TIMER: 
    115110                        if (pluginSupportsEvent(tag)) { 
    116111                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER); 
    117                                 SchedulingPlanInterface decision =  schedulingPlugin.schedule(event,  
     112                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,  
    118113                                                queues,  getJobRegistry(), getResourceManager(), moduleList); 
    119114                                executeSchedulingPlan(decision); 
     
    123118                        break; 
    124119 
    125                 case WormsTags.TASK_READY_FOR_EXECUTION: 
     120                case DCWormsTags.TASK_READY_FOR_EXECUTION: 
    126121                         
    127122                        ExecTask data = (ExecTask) ev.get_data(); 
    128123                        try { 
    129                                 data.setStatus(Gridlet.READY); 
     124                                data.setStatus(DCWormsTags.READY); 
    130125                                if (pluginSupportsEvent(tag)) { 
    131126                                        SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId()); 
    132                                         SchedulingPlanInterface decision =  schedulingPlugin.schedule(event, 
     127                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event, 
    133128                                                        queues,  getJobRegistry(), getResourceManager(), moduleList); 
    134129                                        executeSchedulingPlan(decision); 
     
    139134                        break; 
    140135 
    141                 case WormsTags.TASK_EXECUTION_FINISHED: 
     136                case DCWormsTags.TASK_EXECUTION_FINISHED: 
    142137                        obj = ev.get_data(); 
    143                         ExecTask task = (ExecTask) obj; 
    144                         if (task.getStatus() == Gridlet.INEXEC) { 
    145                                 finalizeExecutable(task); 
    146                                 SubmittedTask subTask = (SubmittedTask)task; 
    147                                 sendFinishedWorkloadUnit((ExecTask)subTask.getGridlet()); 
     138                        ExecTask exec = (ExecTask) obj; 
     139                        if (exec.getStatus() == DCWormsTags.INEXEC) { 
     140                                finalizeExecutable(exec); 
     141 
     142                                sendFinishedWorkloadUnit(exec); 
    148143                                //task.setGridletStatus(Gridlet.SUCCESS); 
    149144                                //task.finalizeGridlet(); 
    150                                 log.debug(task.getJobId() + "_" + task.getId() + " finished execution on " + new DateTime()); 
    151                                 log.info(WormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 
     145                                log.debug(exec.getJobId() + "_" + exec.getId() + " finished execution on " + new DateTime()); 
     146                                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 
    152147                                /*UsedResourceList<ResourceHistoryItem> lastUsedList = task.getUsedResources(); 
    153148                                Map<ResourceUnitName, AbstractResourceUnit> lastUsed = lastUsedList.getLast() 
     
    163158                        } 
    164159                        if (pluginSupportsEvent(tag)) { 
    165                                 SchedulingEvent event = new TaskFinishedEvent(task.getJobId(), task.getId()); 
    166                                 SchedulingPlanInterface decision = schedulingPlugin.schedule(event, 
     160                                SchedulingEvent event = new TaskFinishedEvent(exec.getJobId(), exec.getId()); 
     161                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 
    167162                                                queues, getJobRegistry(), getResourceManager(), moduleList); 
    168163                                executeSchedulingPlan(decision); 
    169164                        } 
    170                         Job job = jobRegistry.getJob(task.getJobId()); 
     165                        Job job = jobRegistry.getJob(exec.getJobId()); 
    171166                        if(!job.isFinished()){ 
    172167                                getWorkloadUnitHandler().handleJob(job); 
     
    174169 
    175170                        break; 
    176                 case WormsTags.TASK_REQUESTED_TIME_EXPIRED: 
     171                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED: 
    177172                        obj = ev.get_data(); 
    178                         task = (SubmittedTask) obj; 
     173                        exec = (Executable) obj; 
    179174                        if (pluginSupportsEvent(tag)) { 
    180                                 SchedulingEvent event = new TaskRequestedTimeExpiredEvent(task.getJobId(), task.getId()); 
    181                                 SchedulingPlanInterface decision = schedulingPlugin.schedule(event, 
     175                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(exec.getJobId(), exec.getId()); 
     176                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event, 
    182177                                                queues, getJobRegistry(), getResourceManager(), moduleList); 
    183178                                executeSchedulingPlan(decision); 
     
    185180 
    186181                        break; 
    187                 case WormsTags.UPDATE: 
     182                case DCWormsTags.UPDATE: 
    188183                        updateProcessingTimes(ev); 
    189184                        break; 
     
    193188 
    194189        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) { 
    195                 if (pluginSupportsEvent(WormsTags.TASK_EXECUTION_FINISHED)) { 
     190                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) { 
    196191                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED); 
    197                         SchedulingPlanInterface decision =  schedulingPlugin.schedule(event, 
     192                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event, 
    198193                                        queues, getJobRegistry(), getResourceManager(), moduleList); 
    199194                        executeSchedulingPlan(decision); 
     
    203198                //} 
    204199        } 
    205  
    206         public void notifyCanceledWorkloadUnit(WorkloadUnit job) { 
    207  
    208                 if (!pluginSupportsEvent(GridSimTags.GRIDLET_CANCEL)) 
    209                         return; 
    210  
    211                 Executable executable = (Executable) job; 
    212                 String jobID = executable.getJobId(); 
    213  
    214                 SchedulingPlanInterface decision = null; 
    215  
    216                 try { 
    217                         executable.setStatus((int) BrokerConstants.JOB_STATUS_CANCELED); 
    218  
    219                         TaskCanceledEvent event = new TaskCanceledEvent(executable.getJobId(), executable.getTaskId()); 
    220                         event.setReason(EventReason.RESERVATION_EXCEEDED); 
    221                         decision = schedulingPlugin 
    222                                         .schedule(event, queues, getJobRegistry(), getResourceManager(), moduleList); 
    223  
    224                         if (decision == null) 
    225                                 return; 
    226  
    227                         executeSchedulingPlan(decision); 
    228  
    229                 } catch (Exception e) { 
    230                         log.error("Exception during scheduling. " + e.getMessage()); 
    231                         e.printStackTrace(); 
    232                 } 
    233         } 
    234          
    235         protected void executeSchedulingPlan(SchedulingPlanInterface decision) { 
    236  
    237                 ArrayList<ScheduledTaskInterface> taskSchedulingDecisions = decision.getTasks(); 
     200         
     201        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) { 
     202 
     203                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks(); 
    238204                for (int i = 0; i < taskSchedulingDecisions.size(); i++) { 
    239                         try { 
    240                                 ScheduledTaskInterface taskDecision = taskSchedulingDecisions.get(i); 
    241  
    242                                 // not scheduled again are returned to the user. 
    243                                 if (taskDecision.getStatus() == AllocationStatus.REJECTED) { 
    244                                         continue; 
     205                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i); 
     206 
     207                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) { 
     208                                continue; 
     209                        } 
     210 
     211                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations(); 
     212 
     213                        TaskInterface<?> task = taskDecision.getTask(); 
     214                        for (int j = 0; j < allocations.size(); j++) { 
     215 
     216                                AllocationInterface<?> allocation = allocations.get(j); 
     217                                if (allocation.isProcessing()) { 
     218                                        ExecTask exec = (ExecTask) task;                                         
     219                                        executeTask(exec, allocation.getRequestedResources()); 
     220                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){ 
     221                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName())); 
     222                                        submitTask(task, allocation); 
     223                                } else { 
     224                                        ExecTask exec = (ExecTask) task; 
     225                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), (ExecTask)task)); 
    245226                                } 
    246  
    247                                 ArrayList<AllocationInterface> allocations = taskDecision.getAllocations(); 
    248  
    249                                 WorkloadUnit task = taskDecision.getTask(); 
    250                                 for (int j = 0; j < allocations.size(); j++) { 
    251  
    252                                         AllocationInterface allocation = allocations.get(j); 
    253                                         if (allocation.isProcessing()) { 
    254                                                  
    255                                                 ExecTask exec = (ExecTask) task; 
    256                                                  
    257  
    258                                                 //Executable e = (Executable)task; 
    259                                                 /*SubmittedTask submittedTask = jobRegistry.getSubmittedTask(e.getJobId(), e.getId()); 
    260                                                 if(submittedTask == null) 
    261                                                 {       submittedTask = new SubmittedTask(e); 
    262                                                         jobRegistry.addTask(submittedTask); 
    263                                                 }*/ 
    264  
    265                                                 /*e.visitResource(scheduler.get_name()); 
    266                                                 Scheduler parentScheduler = scheduler.getParent(); 
    267                                                 while (parentScheduler != null && !e.getVisitedResources().contains(parentScheduler.get_name())) { 
    268                                                         e.visitResource(parentScheduler.get_name()); 
    269                                                         parentScheduler = parentScheduler.getParent(); 
    270                                                 }*/ 
    271                                                  
    272                                                                                                  
    273                                                 executeTask(exec, allocation.getRequestedResources()); 
    274                                         //} else if(GridSim.getEntityId(allocation.getProviderName()) != -1 || scheduler.getScheduler(allocation.getProviderName())!=null){ 
    275                                         } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){ 
    276                                                 allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName())); 
    277                                                 submitWorkloadUnit(task, allocation); 
    278                                         } else { 
    279  
    280                                                 ExecTask exec = (ExecTask) task; 
    281                                                  
    282                                                         //Executable exec = jobRegistry.createExecutable(t, allocation); 
    283                                                         //exec.setResourceParameter(scheduler.get_id(), 1); 
    284                                                 /*e.visitResource(scheduler.get_name()); 
    285                                                 Scheduler parentScheduler = scheduler.getParent(); 
    286                                                 while (parentScheduler != null && !e.getVisitedResources().contains(parentScheduler.get_name())) { 
    287                                                         e.visitResource(parentScheduler.get_name()); 
    288                                                         parentScheduler = parentScheduler.getParent(); 
    289                                                 }*/ 
    290                                                 executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), (ExecTask)task)); 
    291                                         } 
    292                                 } 
    293  
    294                         } catch (Exception e) { 
    295                                 e.printStackTrace(); 
    296                         } 
     227                        } 
     228 
    297229                } 
    298230        } 
    299231 
    300232        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) { 
    301         //      Executable exec = (Executable) task; 
    302          
    303                 SubmittedTask submittedTask = (SubmittedTask)task; 
     233 
     234                Executable exec = (Executable)task; 
    304235                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources); 
    305236                if(allocationStatus == false) 
    306237                        return; 
    307238                removeFromQueue(task); 
    308                 //SubmittedTask submittedTask = (SubmittedTask)task; 
    309                 /* submittedTask = jobRegistry.getSubmittedTask(exec.getJobId(), exec.getId()); 
    310                 if(submittedTask == null) 
    311                 {       submittedTask = new SubmittedTask(exec); 
    312                         jobRegistry.addTask(submittedTask); 
    313                 }*/ 
    314                 double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength(); 
     239                //double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength(); 
    315240                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION); 
    316241                int time = Double.valueOf( 
    317                                 execTimeEstimationPlugin.execTimeEstimation(event, task, choosenResources, completionPercentage)).intValue(); 
     242                                execTimeEstimationPlugin.execTimeEstimation(event, task, choosenResources, exec.getCompletionPercentage())).intValue(); 
    318243                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime() 
    319244                                + " will finish after " + time); 
     
    322247                        return; 
    323248 
    324                 submittedTask.setEstimatedDuration(time); 
     249                exec.setEstimatedDuration(time); 
    325250                DateTime currentTime = new DateTime(); 
    326251                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime); 
    327                 submittedTask.addUsedResources(resHistItem); 
    328                 submittedTask.setFinishTime(currentTime.getMillis() / 1000); 
    329                  
    330                 jobRegistry.saveHistory(submittedTask, time, choosenResources); 
    331                  
    332                 scheduler.sendInternal(time, WormsTags.TASK_EXECUTION_FINISHED, 
    333                                 submittedTask); 
     252                exec.addUsedResources(resHistItem); 
     253 
     254                scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, 
     255                                exec); 
    334256 
    335257                try { 
    336                         long expectedDuration = submittedTask.getExpectedDuration().getMillis() / 1000; 
    337                         scheduler.sendInternal(expectedDuration, WormsTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask); 
     258                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000; 
     259                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec); 
    338260                } catch (NoSuchFieldException e) { 
    339                         double t = submittedTask.getEstimatedDuration(); 
    340                         scheduler.sendInternal(t, WormsTags.TASK_REQUESTED_TIME_EXPIRED, submittedTask); 
    341                 } 
    342                  
    343                 submittedTask.setGridletStatus(Gridlet.INEXEC); 
    344                 log.info(WormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 
     261                        double t = exec.getEstimatedDuration(); 
     262                        scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec); 
     263                } 
     264                 
     265                try { 
     266                        exec.setStatus(DCWormsTags.INEXEC); 
     267                } catch (Exception e1) { 
     268                        // TODO Auto-generated catch block 
     269                        e1.printStackTrace(); 
     270                } 
     271                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size())); 
    345272                 
    346273                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE); 
     
    348275                        ProcessingElements pes = (ProcessingElements) peUnit; 
    349276                        for (ComputingResource resource : pes) { 
    350                                 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, submittedTask)); 
     277                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 
    351278                        } 
    352279                } else { 
     
    357284                                 
    358285                        } 
    359                         resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, submittedTask)); 
     286                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 
    360287                } 
    361288                /*ProcessingElements pes = (ProcessingElements) choosenResources.get(StandardResourceUnitName.PE); 
     
    371298        } 
    372299         
    373         public void finalizeExecutable(ExecTask exec){ 
    374                  
    375                 SubmittedTask subTask = (SubmittedTask)exec; 
    376                 subTask.setGridletStatus(Gridlet.SUCCESS); 
    377                 subTask.finalizeGridlet(); 
    378                 UsedResourceList<ResourceHistoryItem> lastUsedList = subTask.getUsedResources(); 
     300        public void finalizeExecutable(ExecTask execTask){ 
     301                 
     302                Executable exec = (Executable)execTask; 
     303                try { 
     304                        exec.setStatus(DCWormsTags.SUCCESS); 
     305                } catch (Exception e1) { 
     306                        // TODO Auto-generated catch block 
     307                        e1.printStackTrace(); 
     308                } 
     309                exec.finalizeExecutable(); 
     310                UsedResourceList<ResourceHistoryItem> lastUsedList = exec.getUsedResources(); 
    379311                Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast() 
    380312                                .getResourceUnits(); 
     
    385317                        ProcessingElements pes = (ProcessingElements) peUnit; 
    386318                        for (ComputingResource resource : pes) { 
    387                                 resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask)); 
     319                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 
    388320                        } 
    389321                } else { 
     
    394326                                 
    395327                        } 
    396                         resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask)); 
     328                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec)); 
    397329                } 
    398330                /*ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PE); 
     
    400332                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask)); 
    401333                }*/ 
    402                 SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(), WormsTags.TASK_REQUESTED_TIME_EXPIRED); 
     334                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED); 
    403335                scheduler.sim_cancel(filter, null); 
    404336                 
    405                 Executable executable = (Executable) subTask.getGridlet(); 
    406                 Job job = jobRegistry.getJob(executable.getJobId()); 
     337 
     338                Job job = jobRegistry.getJob(exec.getJobId()); 
    407339 
    408340                Task task = null; 
    409341                try { 
    410                         task = job.getTask(executable.getTaskId()); 
     342                        task = job.getTask(exec.getTaskId()); 
    411343                } catch (NoSuchFieldException e) { 
    412344                        e.printStackTrace(); 
    413345                } 
    414                 if(executable.getProcessesId() == null){ 
     346                if(exec.getProcessesId() == null){ 
    415347                        try { 
    416                                 task.setStatus(executable.getStatus()); 
     348                                task.setStatus(exec.getStatus()); 
    417349                        } catch (Exception e) { 
    418350                                e.printStackTrace(); 
     
    422354                        for(int i = 0; i < processesList.size(); i++){ 
    423355                                AbstractProcesses processes = processesList.get(i); 
    424                                 if(processes.getId().equals(executable.getProcessesId())){ 
    425                                         processes.setStatus(executable.getStatus()); 
     356                                if(processes.getId().equals(exec.getProcessesId())){ 
     357                                        processes.setStatus(exec.getStatus()); 
    426358                                        break; 
    427359                                } 
     
    441373                while (iter.hasNext()) { 
    442374                        ExecTask task = iter.next(); 
    443                         SubmittedTask subTask = (SubmittedTask)task; 
    444                         UsedResourceList<ResourceHistoryItem> usedResourcesList = subTask.getUsedResources(); 
     375                        Executable exec = (Executable)task; 
     376                        UsedResourceList<ResourceHistoryItem> usedResourcesList = exec.getUsedResources(); 
    445377                        ResourceUnit unit = usedResourcesList.getLast().getResourceUnits() 
    446378                                        .get(StandardResourceUnitName.PE); 
    447379 
    448380                        double load = getMIShare(timeSpan, (PEUnit) unit); 
    449                         subTask.updateGridletFinishedSoFar(load); 
     381                        exec.setCompletionPercentage(100 * timeSpan/exec.getEstimatedDuration()); 
    450382                        addTotalLoad(load); 
    451383                } 
     
    471403                updateProcessingProgress(); 
    472404                for (ExecTask task : jobRegistry.getRunningTasks()) { 
    473                         SubmittedTask subTask = (SubmittedTask)task; 
    474                         List<String> visitedResource = subTask.getVisitedResources(); 
     405                        Executable exec = (Executable)task; 
     406                        List<String> visitedResource = exec.getVisitedResources(); 
    475407                        String originResource = ev.get_data().toString(); 
    476408                        if(!ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), originResource)){ 
     
    478410                        } 
    479411                         
    480                         Map<ResourceUnitName, ResourceUnit> choosenResources = subTask.getUsedResources().getLast().getResourceUnits(); 
    481                         double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength(); 
     412                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits(); 
     413                        //double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength(); 
    482414                        double time = execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),  
    483                                         task, choosenResources, completionPercentage); 
     415                                        task, choosenResources, exec.getCompletionPercentage()); 
    484416 
    485417                        /*if(!subTask.getVisitedResources().contains(ev.get_data().toString())) { 
     
    487419                        }*/ 
    488420                        //check if the new estimated end time is equal to the previous one; if yes the continue without update 
    489                         if( DoubleMath.subtract((subTask.getExecStartTime() + subTask.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){ 
     421                        if( DoubleMath.subtract((exec.getExecStartTime() + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){ 
    490422                                continue; 
    491423                        } 
    492                         SubTaskFilter filter = new SubTaskFilter(subTask.getGridletID(), WormsTags.TASK_EXECUTION_FINISHED); 
     424                        ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED); 
    493425                        scheduler.sim_cancel(filter, null); 
    494                         scheduler.sendInternal(time, WormsTags.TASK_EXECUTION_FINISHED, task); 
     426                        scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, task); 
    495427 
    496428                } 
     
    577509                        } 
    578510 
    579                         /*try { 
    580                                 List<? extends ComputingResource> processingElements = resourceManager.getResourcesOfType(StandardResourceType.Processor); 
    581                                 List<ComputingResource> choosenResources = new ArrayList<ComputingResource>(); 
    582                                 int peSize = processingElements.size(); 
    583                                 for (int i = 0; i < peSize && cpuRequest > 0; i++) { 
    584                                         if (processingElements.get(i).getStatus() == ResourceStatus.FREE) { 
    585                                                 choosenResources.add(processingElements.get(i)); 
    586                                                 cpuRequest--; 
    587                                         } 
    588                                 } 
    589                                 if (cpuRequest > 0) 
    590                                 {        
    591                                         return null; 
    592                                 } 
    593                                 processingUnits = new ProcessingElements(choosenResources); 
    594                         } catch (Exception e) { 
    595          
    596                                 List<ResourceUnit> procResUnit = resourceManager.getDistributedResourceUnits(StandardResourceUnitName.PE); 
    597  
    598                                 for(ResourceUnit resUnit: procResUnit){ 
    599                                         if (resUnit.getFreeAmount() >= cpuRequest) 
    600                                         {        
    601                                                 processingUnits = new PEUnit(resUnit.getResourceId(), cpuRequest, cpuRequest); 
    602                                                 break; 
    603                                         }  
    604                                 } 
    605                         }*/ 
    606511                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0)); 
    607512                } 
    608                 /*int memoryRequest; 
    609                 try { 
    610                         memoryRequest = Double.valueOf(task.getMemoryRequest()).intValue(); 
    611                 } catch (NoSuchFieldException e) { 
    612                         memoryRequest = 0; 
    613                 } 
    614                 if (memoryRequest != 0) { 
    615                         List<ResourceUnit> resUnit = resourceManager.getSharedResourceUnits().get(StandardResourceUnitName.MEMORY); 
    616  
    617                         Memory memory = null; 
    618                         for (ResourceUnit memUnit : resUnit) { 
    619                                 Memory m = (Memory) memUnit; 
    620  
    621                                 if (m.getFreeAmount() >= memoryRequest) {        
    622                                         System.out.println(m.getResourceId()+ ";"+m.getAmount()+";"+m.getFreeAmount()); 
    623                                         memory = new Memory(m, memoryRequest, memoryRequest); 
    624                                 } 
    625                         } 
    626                         if(memory == null) 
    627                                 return null; 
    628                         map.put(StandardResourceUnitName.MEMORY, memory); 
    629                 }*/ 
     513 
    630514                return  map; 
    631515        } 
    632  
    633  
    634          
    635         public void notifySubmittedWorkloadUnit(WorkloadUnit job, boolean ack) { 
     516         
     517        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) { 
    636518                updateProcessingProgress(); 
    637                 registerWorkloadUnit(job); 
     519                registerWorkloadUnit(wu); 
    638520        } 
    639521 
     
    654536                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 
    655537                        jobsList.add(job); 
    656                         WorkloadUnitListImpl readyTasks = new WorkloadUnitListImpl(); 
    657                         for(Task task: jobRegistry.getReadyTasks(jobsList)){ 
     538                        TaskListImpl readyTasks = new TaskListImpl(); 
     539                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){ 
    658540                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 
    659541                                readyTasks.add(task); 
     
    667549                public void handleTask(TaskInterface<?> ti){ 
    668550                        Task task = (Task)ti; 
    669                          
    670                         if(task.getProcesses() == null){ 
     551                        List<AbstractProcesses> processes = task.getProcesses(); 
     552 
     553                        if(processes == null || processes.size() == 0){ 
    671554                                Executable exec = new Executable(task); 
    672                                 exec.setUserID(task.getSenderId()); 
    673                                 exec.setLength(task.getLength()); 
    674555                                registerWorkloadUnit(exec); 
    675556                        } else { 
    676                                 List<AbstractProcesses> processesList = task.getProcesses(); 
    677                                 for(int i = 0; i < processesList.size(); i++){   
    678                                         AbstractProcesses processes = processesList.get(i); 
    679                                         Executable exec = new Executable(task, processes); 
    680                                         exec.setUserID(task.getSenderId()); 
    681                                         exec.setLength(task.getLength()); 
     557                                for(int j = 0; j < processes.size(); j++){ 
     558                                        AbstractProcesses procesesSet = processes.get(j); 
     559                                        Executable exec = new Executable(task, procesesSet); 
    682560                                        registerWorkloadUnit(exec); 
    683561                                } 
     
    701579                        } 
    702580                         
    703                         exec.setResourceParameter(scheduler.get_id(), 1); 
    704                         SubmittedTask subTask = new SubmittedTask(exec); 
    705                         jobRegistry.addTask(subTask); 
    706                         WorkloadUnitListImpl newTasks = new WorkloadUnitListImpl(); 
    707                         newTasks.add(subTask); 
    708                  
    709                         schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList); 
    710  
    711                         if (subTask.getStatus() == Gridlet.QUEUED) { 
     581                        exec.setSchedulerName(scheduler.get_id()); 
     582                        jobRegistry.addExecTask(exec); 
     583                        TaskListImpl newTasks = new TaskListImpl(); 
     584                        newTasks.add(exec); 
     585                 
     586                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList); 
     587 
     588                        if (exec.getStatus() == DCWormsTags.QUEUED) { 
    712589                                sendExecutableReadyEvent(exec); 
    713                         } 
    714                 } 
    715                  
    716                 public void handleSubmittedTask(SubmittedTask task){ 
    717  
    718                         task.visitResource(scheduler.get_name()); 
    719                         Scheduler parentScheduler = scheduler.getParent(); 
    720                         while (parentScheduler != null && !task.getVisitedResources().contains(parentScheduler.get_name())) { 
    721                                 task.visitResource(parentScheduler.get_name()); 
    722                                 parentScheduler = parentScheduler.getParent(); 
    723                         } 
    724  
    725                         jobRegistry.addTask(task); 
    726                         WorkloadUnitListImpl newTasks = new WorkloadUnitListImpl(); 
    727                         newTasks.add(task); 
    728                  
    729                         schedulingPlugin.placeJobsInQueues(newTasks, queues, getResourceManager(), moduleList); 
    730  
    731                         if (task.getStatus() == Gridlet.QUEUED) { 
    732                                 sendExecutableReadyEvent(task); 
    733590                        } 
    734591                } 
Note: See TracChangeset for help on using the changeset viewer.