package simulator; import eduni.simjava.Sim_event; import eduni.simjava.Sim_system; import gridsim.GridSimTags; import gridsim.gssim.AbstractAdvanceReservation; import gridsim.gssim.GssimConstants; import gridsim.gssim.GssimTags; import schedframe.scheduling.events.SchedulingEventType; import schedframe.scheduling.plan.impl.SchedulingPlan; import schedframe.scheduling.plugin.SchedulingPluginConfiguration; import schedframe.scheduling.plugin.grid.GlobalSchedulingPlugin; import schedframe.scheduling.plugin.grid.ModuleList; import schedframe.scheduling.plugin.grid.ModuleListImpl; import simulator.lists.ExecutablesList; import simulator.utils.InstanceFactory; import test.rewolucja.CommunicationInterface; import gssim.schedframe.scheduling.AbstractExecutable; import gssim.schedframe.scheduling.plugin.grid.GridResourceDiscovery; import gridsim.net.Link; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * This class models an interface to the grid scheduler (the plug-in). It is an entity that: * * * @author Stanislaw Szczepanowski * */ public abstract class AbstractGridBroker extends AbstractAdvanceReservation implements CommunicationInterface{ protected Set otherGridSchedulersIDs; protected ModuleList moduleList; /** * The plug-in that has been passed to this interface entity */ protected GlobalSchedulingPlugin gridSchedulerPlugin; private static Log log = LogFactory.getLog(AbstractGridBroker.class); /** * @param name the name of this entity * @param gridSchedulerPluginName the name and package prefix of the plug-in that is to be used (this should be given in the main method of the simulator) * @throws Exception if the plug-in name is wrong */ public AbstractGridBroker(String name, String gridSchedulerPluginName) throws Exception { super(name, GssimConstants.DEFAULT_BAUD_RATE); //make use of plug-in interface gridSchedulerPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance( gridSchedulerPluginName, GlobalSchedulingPlugin.class); if(gridSchedulerPlugin == null){ throw new Exception("Can not create grid scheduling plugin instance"); } Properties prop = new Properties(); prop.put("plugin.name", name); prop.put("plugin.utils.timeoperations", "gssim.scheduling.plugin.local.GssimTimeOperations"); gridSchedulerPlugin.init(prop); this.otherGridSchedulersIDs = new HashSet(); this.moduleList = new ModuleListImpl(2); this.moduleList.add(new GridResourceDiscovery(this)); if(log.isDebugEnabled()) log.debug(name + ": Creating a broker interface object"); } /** * @param name the name of this entity * @param gridSchedulerPluginName the name and package prefix of the plug-in that is to be used (this should be given in the main method of the simulator) * @param link the connection between entity and router * @throws Exception if the plug-in name is wrong */ public AbstractGridBroker(String name, Link link, String gridSchedulerPluginName) throws Exception { super(name, link); //make use of plug-in interface gridSchedulerPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance( gridSchedulerPluginName, GlobalSchedulingPlugin.class); if(gridSchedulerPlugin == null){ throw new Exception("Can not create grid scheduling plugin instance"); } Properties prop = new Properties(); prop.put("plugin.name", name); prop.put("plugin.utils.timeoperations", "gssim.scheduling.plugin.local.GssimTimeOperations"); gridSchedulerPlugin.init(prop); this.otherGridSchedulersIDs = new HashSet(); this.moduleList = new ModuleListImpl(2); this.moduleList.add(new GridResourceDiscovery(this)); if(log.isDebugEnabled()) log.debug(name + ": Creating a broker interface object"); } public void body() { if(log.isDebugEnabled()) log.debug("Grid scheduler interface "+name+" entity has started"); SchedulingPluginConfiguration config = gridSchedulerPlugin.getConfiguration(); if(config != null){ Map events = config.getServedEvents(); Object obj = events.get(SchedulingEventType.TIMER); if(obj != null){ int delay = (Integer) obj; send(this.me, delay, GssimTags.TIMER); } } Sim_event ev = new Sim_event(); boolean run = true; /* List arResources = GridSim.getAdvancedReservationList(); Iterator itr = arResources.iterator(); while(itr.hasNext()){ Integer dest = (Integer) itr.next(); send(dest, GridSimTags.SCHEDULE_NOW, GssimTags.RETURN_RESERVATIONS, new Integer(this.me)); } */ sim_get_next(ev); while(Sim_system.running() && run) { switch (ev.get_tag()) { case GridSimTags.END_OF_SIMULATION: run = false; break; case GridSimTags.GRIDLET_SUBMIT: if(!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)){ log.error("Plugin " + this.gridSchedulerPlugin.getClass() + " does not provide support for TASK_ARRIVED event.\n" + "Check plugin configuration or use default one."); break; } List jobsList = prepareJobDescription(ev); //all the tasks are supposed to turn back to this entity (broker interface) int submittedJobsCount = jobsList.size(); //give the task to the broker scheduleJob(jobsList, null, null, moduleList, null, null, null, null, null); //informs the Sim_system statistics about the completion of an event (multiplied by the number of jobs //sent to this broker) - this influences the "Throughput" metric in statistics. for (int i = 0; i < submittedJobsCount; ++i) { sim_completed(ev); } break; case GridSimTags.INFOPKT_RETURN: //do nothing break; case GridSimTags.GRIDLET_RETURN: AbstractExecutable simpleGridlet = (AbstractExecutable) ev.get_data(); notifyReturnedGridlet(simpleGridlet); break; case GridSimTags.GRIDLET_CANCEL: AbstractExecutable taskGridlet = (AbstractExecutable) ev.get_data(); if(pluginSupportsEvent(GridSimTags.GRIDLET_CANCEL)) notifyCanceledGridlet(taskGridlet); break; case GssimTags.TIMER: if(pluginSupportsEvent(GssimTags.TIMER)) scheduleCyclic(); config = gridSchedulerPlugin.getConfiguration(); if(config != null){ Map events = config.getServedEvents(); Object obj = events.get(SchedulingEventType.TIMER); if(obj != null){ int delay = (Integer) obj; send(this.me, delay, GssimTags.TIMER); } } break; } //sim_get_next(notGRMSTags, ev); sim_get_next(ev); } //shut down all the entities, including GridStatistics entity since // we used it to record certain events. shutdownGridStatisticsEntity(); shutdownUserEntity(); terminateIOEntities(); if(log.isDebugEnabled()) log.debug("Grid scheduler "+name+" interface ends"); } public void addOtherGridSchedulerID(int id){ this.otherGridSchedulersIDs.add(id); } /** * This method is used to retrieve the the list of identifiers of grid resources, that this broker instance can contact to. * This method should be overwritten in subclasses, if a broker instance sees only a subset of all available grid resources. * * @return the list of identifiers of grid resources, that this broker instance can contact to */ public List getMyGridResources() { return getGridResourceList(); } protected boolean pluginSupportsEvent(int eventType){ SchedulingPluginConfiguration config = this.gridSchedulerPlugin.getConfiguration(); if(config == null) return false; Map servedEvent = config.getServedEvents(); if(servedEvent == null) return false; switch(eventType){ case GssimTags.TIMER: return servedEvent.containsKey(SchedulingEventType.TIMER); case GssimTags.GRIDLET_SUBMIT: return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED); case GssimTags.GRIDLET_CANCEL: return servedEvent.containsKey(SchedulingEventType.TASK_CANCELED); case GssimTags.GRIDLET_RESUME: return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED); case GssimTags.GRIDRESOURCE_FAILURE: return servedEvent.containsKey(SchedulingEventType.RESOURCE_FAILED); default: return false; } } /** * The entry point to the plug-in.
* Invokes the plug-in with the provided information. * * @param gridletList the list of jobs that have come from users * @param jobDescription (JD) Job description (resource requirements, information needed to run a job-e.g. environmental variables, data needed) * @param userPreferences (UP) User preferences * @param reservationRequest (RR) Reservation request * @param resources (RP) Resource parameters (static and dynamic) * @param reservationOffers (RO) Reservation offers * @param predictedTimes (PT) Predicted times * @param jobSet (JQ) Job set(queue) * @param jobRegistry (JR) Job registry (information about already executed (and being executed) jobs) * @param brokerConfiguration (BC) Broker configuration */ public abstract void scheduleJob(List gridletList, String userPreferences, String reservationRequest, ModuleList moduleList, String reservationOffers, String predictedTimes, String jobSet, String jobRegistry, String brokerConfiguration); public abstract void scheduleCyclic(); public abstract void notifyReturnedGridlet(AbstractExecutable simpleGridlet); public abstract void notifyCanceledGridlet(AbstractExecutable taskGridlet); public abstract ExecutablesList getExecutables(); protected abstract List prepareJobDescription(Sim_event ev); }