Ignore:
Timestamp:
10/31/12 13:52:06 (12 years ago)
Author:
wojtekp
Message:
 
Location:
DCWoRMS/trunk/build/classes/schedframe/scheduling/policy/global
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • DCWoRMS/trunk/build/classes/schedframe/scheduling/policy/global/GlobalManagementSystem.java

    r477 r539  
    1111import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus; 
    1212 
     13import dcworms.schedframe.scheduling.ExecTask; 
     14import dcworms.schedframe.scheduling.Executable; 
     15 
    1316import qcg.shared.constants.BrokerConstants; 
    14  
    15 import schedframe.events.scheduling.EventReason; 
    16 import schedframe.events.scheduling.SchedulingEvent; 
    1717import schedframe.events.scheduling.TaskArrivedEvent; 
    18 import schedframe.events.scheduling.TaskCanceledEvent; 
    1918import schedframe.events.scheduling.TimerEvent; 
    20 import schedframe.scheduling.Scheduler; 
     19import schedframe.scheduling.TaskListImpl; 
    2120import schedframe.scheduling.WorkloadUnitHandler; 
    22 import schedframe.scheduling.WorkloadUnitListImpl; 
    2321import schedframe.scheduling.plan.AllocationInterface; 
    2422import schedframe.scheduling.plan.ScheduledTaskInterface; 
     
    3129import schedframe.scheduling.tasks.Job; 
    3230import schedframe.scheduling.tasks.JobInterface; 
    33 import schedframe.scheduling.tasks.SubmittedTask; 
    3431import schedframe.scheduling.tasks.Task; 
    3532import schedframe.scheduling.tasks.TaskInterface; 
    3633import schedframe.scheduling.tasks.WorkloadUnit; 
    37  
    3834import eduni.simjava.Sim_event; 
    3935import gridsim.GridSim; 
    4036import gridsim.GridSimTags; 
    41 import gridsim.Gridlet; 
    4237import gridsim.IO_data; 
    43 import gridsim.gssim.WormsTags; 
    44 import gssim.schedframe.scheduling.ExecTask; 
    45 import gssim.schedframe.scheduling.Executable; 
     38import gridsim.dcworms.DCWormsTags; 
    4639 
    4740public class GlobalManagementSystem extends AbstractManagementSystem { 
     
    5447                super(providerId, entityName,  execTimeEstimationPlugin, queues); 
    5548 
    56                 /*schedulingPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance( 
    57                                 schedulingPluginClassName, 
    58                                 GlobalSchedulingPlugin.class);*/ 
    5949                if(schedPlugin == null){ 
    6050                        throw new Exception("Can not create global scheduling plugin instance"); 
     
    6959                switch (tag) { 
    7060 
    71                 case WormsTags.TIMER: 
    72                         if (pluginSupportsEvent(WormsTags.TIMER)) { 
     61                case DCWormsTags.TIMER: 
     62                        if (pluginSupportsEvent(DCWormsTags.TIMER)) { 
    7363                                TimerEvent event = new  TimerEvent(); 
    74                                 SchedulingPlanInterface decision =  schedulingPlugin.schedule(event,  
     64                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,  
    7565                                                queues,  getJobRegistry(), getResourceManager(), moduleList); 
    7666                                executeSchedulingPlan(decision); 
     
    8171        } 
    8272         
    83         public void notifySubmittedWorkloadUnit(WorkloadUnit<?> wu, boolean ack) { 
     73        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) { 
    8474                if (!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)) { 
    8575                        log.error("Plugin " + schedulingPlugin.getClass() 
     
    9080                 
    9181                registerWorkloadUnit(wu); 
    92                 /*Job job = (Job) wu; 
    93                 jobRegistry.addJob(job); 
    94  
    95                 if (log.isInfoEnabled()) 
    96                         log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); 
    97  
     82 
     83        } 
     84         
     85        private void registerWorkloadUnit(WorkloadUnit wu){ 
     86                if(!wu.isRegistered()){ 
     87                        wu.register(jobRegistry); 
     88                } 
     89                wu.accept(getWorkloadUnitHandler()); 
     90        } 
     91         
     92 
     93        protected void schedule(JobInterface<?> job){ 
    9894                List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 
    9995                jobsList.add(job); 
    100                 WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList(); 
    101                 readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList)); 
    102                 schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList); 
    103  
    104                 schedule(new TaskArrivedEvent());*/ 
    105  
    106         } 
    107          
    108         private void registerWorkloadUnit(WorkloadUnit<?> wu){ 
    109                 if(!wu.isRegistered()){ 
    110                         wu.register(jobRegistry); 
    111                 } 
    112                 wu.accept(getWorkloadUnitHandler()); 
    113         } 
    114          
    115  
    116         protected void scheduleReadyTasks(Job job){ 
    117                 List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>(); 
    118                 jobsList.add(job); 
    119                 WorkloadUnitListImpl readyWorkloadUnits = new WorkloadUnitListImpl(); 
    120                 readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobsList)); 
    121                 schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, getResourceManager(), moduleList); 
    122  
    123                 schedule(new TaskArrivedEvent()); 
    124         } 
    125          
    126         protected void schedule(SchedulingEvent schedulingEvent) { 
    127  
    128                 try { 
    129                         SchedulingPlanInterface decision = schedulingPlugin.schedule( 
    130                                         schedulingEvent, queues, getJobRegistry(),  getResourceManager(), moduleList); 
    131                         if (decision == null) 
    132                                 return; 
    133  
     96                TaskListImpl readyTasks = new TaskListImpl(); 
     97                readyTasks.addAll(jobRegistry.getAvailableTasks(jobsList)); 
     98                 
     99                schedulingPlugin.placeTasksInQueues(readyTasks, queues, getResourceManager(), moduleList); 
     100                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule( 
     101                                new TaskArrivedEvent(), queues, getJobRegistry(),  getResourceManager(), moduleList); 
     102                if (decision != null) 
    134103                        executeSchedulingPlan(decision); 
    135  
    136                 } catch (Exception e) { 
    137                         e.printStackTrace(); 
    138                 } 
    139         } 
    140          
    141         public void notifyReturnedWorkloadUnit(WorkloadUnit<?> wu) { 
     104        } 
     105 
     106        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) { 
    142107                Executable exec = (Executable) wu; 
    143108 
     
    149114                 
    150115                try { 
    151                         Job job = jobRegistry.get(exec.getJobId()); 
    152                         /*Task task = job.getTask(exec.getTaskId()); 
     116                        Job job = jobRegistry.getJob(exec.getJobId()); 
     117                        Task task = job.getTask(exec.getTaskId()); 
    153118                        if(exec.getProcessesId() == null){ 
    154119                                try { 
    155120                                        task.setStatus(exec.getStatus()); 
    156121                                } catch (Exception e) { 
    157                                         // TODO Auto-generated catch block 
    158                                         e.printStackTrace(); 
     122 
    159123                                } 
    160124                        } else { 
     
    167131                                        } 
    168132                                } 
    169                         }*/ 
     133                        } 
    170134                         
    171135                        if(job.isFinished()){ 
     
    173137                        } 
    174138                        else { 
    175                                 scheduleReadyTasks(job); 
    176                                 /*List<JobInterface<?>> jobs = new ArrayList<JobInterface<?>>(); 
    177                                 jobs.add(jobRegistry.getJobInfo(job.getId())); 
    178                                 WorkloadUnitList readyWorkloadUnits = new WorkloadUnitList(); 
    179                                 readyWorkloadUnits.addAll(jobRegistry.getReadyTasks(jobs)); 
    180                                 schedulingPlugin.placeJobsInQueues(readyWorkloadUnits, queues, 
    181                                                 getResourceManager(), moduleList); 
    182                                 schedule(new TaskArrivedEvent());*/ 
     139                                schedule(job); 
    183140                        } 
    184141                         
     
    189146        } 
    190147 
    191         public void notifyCanceledWorkloadUnit(WorkloadUnit<?> wu){; 
    192  
    193                 Executable task = (Executable) wu; 
    194                 String jobID = task.getJobId(); 
    195                 String taskID = task.getId(); 
    196                  
    197                 if(log.isDebugEnabled()) 
    198                         log.debug("Received canceled job" + jobID + "_" + taskID); 
    199                  
    200                 TaskInterface<?> ti = jobRegistry.getTaskInfo(jobID, taskID) ; 
    201                 try { 
    202  
    203                         ti.setStatus((int)BrokerConstants.JOB_STATUS_CANCELED); 
    204                          
    205                         TaskCanceledEvent event = new TaskCanceledEvent(jobID, taskID); 
    206                         event.setReason(EventReason.RESERVATION_EXCEEDED); 
    207                         schedule(event); 
    208                          
    209                 } catch (Exception e) { 
    210                         log.error("Exception during scheduling. " + e.getMessage()); 
    211                         e.printStackTrace(); 
    212                 } 
    213         } 
    214          
    215         protected void executeSchedulingPlan(SchedulingPlanInterface decision) { 
    216  
    217                 ArrayList<ScheduledTaskInterface> taskSchedulingDecisions = decision.getTasks(); 
     148        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) { 
     149 
     150                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks(); 
    218151                for (int i = 0; i < taskSchedulingDecisions.size(); i++) { 
    219152 
    220                         try { 
    221                                 ScheduledTaskInterface taskDecision = taskSchedulingDecisions.get(i); 
    222          
    223                                 //log.info(decision.getDocument()); 
    224          
    225                                 String jobID = taskDecision.getJobId(); 
    226                                 String taskID = taskDecision.getTaskId(); 
    227                                  
    228                                 // Task allocations that were rejected because of lack of resources or which were canceled and  
    229                                 // not scheduled again are returned to the user. 
    230                                 if(taskDecision.getStatus() == AllocationStatus.REJECTED){ 
    231                                         Job job = jobRegistry.get(jobID); 
    232                                         scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); 
    233                                         continue; 
    234                                 } 
    235                                  
    236                                 ArrayList<AllocationInterface> allocations = taskDecision.getAllocations(); 
    237  
    238                                 Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); 
    239                                 for (int j = 0; j < allocations.size(); j++) { 
    240  
    241                                         AllocationInterface allocation = allocations.get(j); 
    242                                         Executable exec = jobRegistry.createExecutable(task, allocation); 
    243                                         submitWorkloadUnit(exec, allocation); 
    244                                         task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 
    245                                 }                                                                
    246  
    247                         }catch (Exception e){ 
    248                                 e.printStackTrace(); 
    249                         } 
    250                 } 
    251         } 
    252  
    253         protected void submitWorkloadUnit(WorkloadUnit<?> job, AllocationInterface allocation) { 
     153                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i); 
     154 
     155                        //log.info(decision.getDocument()); 
     156 
     157                        String jobID = taskDecision.getJobId(); 
     158                        String taskID = taskDecision.getTaskId(); 
     159                         
     160                        // Task allocations that were rejected because of lack of resources or which were canceled and  
     161                        // not scheduled again are returned to the user. 
     162                        if(taskDecision.getStatus() == AllocationStatus.REJECTED){ 
     163                                Job job = jobRegistry.getJob(jobID); 
     164                                scheduler.send(job.getSenderId(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_RETURN, job); 
     165                                continue; 
     166                        } 
     167                         
     168                        Task task = (Task) jobRegistry.getTaskInfo(jobID, taskID); 
     169                         
     170                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations(); 
     171                        for (int j = 0; j < allocations.size(); j++) { 
     172 
     173                                AllocationInterface<?> allocation = allocations.get(j); 
     174                                Executable exec = createExecutable(task, allocation); 
     175                                submitTask(exec, allocation); 
     176                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED); 
     177                        }                                                                
     178                } 
     179        } 
     180 
     181        private Executable createExecutable(Task task, AllocationInterface<?> allocation) { 
     182 
     183                String refersTo = allocation.getProcessGroupId(); // null;//allocation.getRefersTo(); 
     184                if(refersTo == null) 
     185                        refersTo = task.getId(); 
     186                         
     187                Executable exec = null; 
     188 
     189                if(refersTo.equals(task.getId())){ 
     190                        exec = new Executable(task); 
     191                } else { 
     192                        List<AbstractProcesses> processes = task.getProcesses(); 
     193                        if(processes == null) { 
     194                                try { 
     195                                        log.error("Allocation: " + allocation.getDocument() + "\nrefers to unknown task or processes set." + 
     196                                                        " Set correct value (task id or prcesses set id) for allocation refersTo attribute."); 
     197                                } catch (Exception e) { 
     198                                        e.printStackTrace(); 
     199                                } 
     200                        } 
     201                        boolean found = false; 
     202                        for(int j = 0; j < processes.size() && !found; j++){ 
     203                                AbstractProcesses procesesSet = processes.get(j); 
     204                                if(refersTo.equals(procesesSet.getId())){ 
     205                                        exec = new Executable(task, procesesSet); 
     206                                        found = true; 
     207                                } 
     208                        } 
     209                        if(!found){ 
     210                                log.error("Allocation refers to unknown proceses set."); 
     211                        } 
     212                } 
     213 
     214                exec.setReservationId(allocation.getReservationId()); 
     215                         
     216                /*HostInterface<?> host = allocation.getHost(); 
     217                ComputingResourceTypeInterface<?> crt = host.getMachineParameters(); 
     218                if(crt != null){ 
     219                        ComputingResourceTypeItemInterface<?> crti = crt.getComputingResourceTypeItem(0); 
     220                        if(crti != null){ 
     221                                ParameterPropertyInterface<?> properties[] = crti.getHostParameter().getProperty(); 
     222                                for(int p = 0; p < properties.length; p++){ 
     223                                        ParameterPropertyInterface<?> property = properties[p]; 
     224                                        if("chosenCPUs".equals(property.getName())){ 
     225                                                Object cpuNames = property.getValue(); 
     226                                                exec.addSpecificResource(ResourceParameterName.FREECPUS, cpuNames); 
     227                                        } 
     228                                } 
     229                        } 
     230                }*/ 
     231                return exec; 
     232        } 
     233         
     234        protected void submitTask(TaskInterface<?> task, AllocationInterface<?> allocation) { 
    254235 
    255236                String providerName = allocation.getProviderName(); 
     
    257238                        return; 
    258239                } 
    259                 TaskInterface<?> task = (TaskInterface<?>) job; 
    260240                removeFromQueue(task); 
    261241                 
    262242                int resID = GridSim.getEntityId(providerName); 
    263                 IO_data data = new IO_data(job, 0, resID); 
     243                IO_data data = new IO_data(task, 0, resID); 
    264244                scheduler.send(scheduler.getOutputPort(), GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, data);   
    265                  
    266                 //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job);                       
     245                //scheduler.send(providerName, GridSimTags.SCHEDULE_NOW, GridSimTags.GRIDLET_SUBMIT, job);       
     246                 
    267247                if(log.isDebugEnabled()) 
    268                         try { 
    269                                 log.debug("Submitted job " + job.getId() + " to " + providerName); 
    270                         } catch (NoSuchFieldException e) { 
    271                                 // TODO Auto-generated catch block 
    272                                 e.printStackTrace(); 
    273                         } 
     248                        log.debug("Submitted job " + task.getId() + " to " + providerName); 
     249 
    274250        } 
    275251 
    276252        class GlobalWorkloadUnitHandler implements  WorkloadUnitHandler{ 
    277253 
    278                 public void handleJob(Job job){ 
    279  
    280                         jobRegistry.addJob(job); 
     254                public void handleJob(JobInterface<?> job){ 
    281255                        if (log.isInfoEnabled()) 
    282256                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis())); 
    283257 
    284                         scheduleReadyTasks(job); 
     258                        jobRegistry.addJob(job); 
     259                        schedule(job); 
    285260                } 
    286261                 
     
    292267                        throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler."); 
    293268                } 
    294  
    295                 public void handleSubmittedTask(SubmittedTask task) { 
    296                         throw new RuntimeException("Not implemented since it isn't expected that tasks are send directly to the global scheduler."); 
    297                 } 
    298269        } 
    299270 
     
    301272                return new GlobalWorkloadUnitHandler(); 
    302273        } 
    303          
    304274 
    305275 
  • DCWoRMS/trunk/build/classes/schedframe/scheduling/policy/global/GridBroker.java

    r477 r539  
    1919public class GridBroker extends GlobalManagementSystem {  
    2020 
    21          
    2221        private static Log log = LogFactory.getLog(GridBroker.class); 
    2322 
     
    2726        public GridBroker(String name, SchedulingPlugin schedulingPlugin, ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues) throws Exception { 
    2827                super(name, "BROKER",  schedulingPlugin, execTimeEstimationPlugin, queues); 
    29                  
    30                 //make use of plug-in interface 
    31                  
    32                 //Properties prop = new Properties(); 
    33                 //prop.put("plugin.name", name); 
    34                 //prop.put("plugin.utils.timeoperations", "gssim.scheduling.plugin.local.GssimTimeOperations"); 
    35                 //schedulingPlugin.init(prop); 
     28 
    3629                otherGridSchedulersIds = new HashSet<Integer>(); 
    37                  
    3830                moduleList = new ModuleListImpl(2); 
    39                 //this.moduleList.add(new GridResourceDiscovery(this.getScheduler())); 
    40                 //moduleList.add(new GridReservationManagerNew(this)); 
    41                  
    42                 if(log.isDebugEnabled()) 
    43                         log.debug(name + ": Creating a broker interface object"); 
    4431        } 
    4532 
    4633        public void init(Scheduler scheduler, ManagedResources managedResources) { 
    4734                super.init(scheduler, managedResources); 
    48                 //this.scheduler = scheduler; 
    49                 //this.resourceManager = ResourceManagerFactory.createResourceManager(scheduler, managedResources); 
    5035                this.moduleList.add((GridResourceDiscovery)resourceManager); 
    5136        } 
     
    5742                } 
    5843                return providerIds; 
    59                 //return GridSim.getGridResourceList(); 
    6044        } 
    6145 
Note: See TracChangeset for help on using the changeset viewer.