package simulator;
import eduni.simjava.Sim_event;
import eduni.simjava.Sim_system;
import gridsim.GridSimTags;
import gridsim.gssim.AbstractAdvanceReservation;
import gridsim.gssim.GssimConstants;
import gridsim.gssim.GssimTags;
import schedframe.scheduling.events.SchedulingEventType;
import schedframe.scheduling.plan.impl.SchedulingPlan;
import schedframe.scheduling.plugin.SchedulingPluginConfiguration;
import schedframe.scheduling.plugin.grid.GlobalSchedulingPlugin;
import schedframe.scheduling.plugin.grid.ModuleList;
import schedframe.scheduling.plugin.grid.ModuleListImpl;
import simulator.lists.ExecutablesList;
import simulator.utils.InstanceFactory;
import test.rewolucja.CommunicationInterface;
import gssim.schedframe.scheduling.AbstractExecutable;
import gssim.schedframe.scheduling.plugin.grid.GridResourceDiscovery;
import gridsim.net.Link;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* This class models an interface to the grid scheduler (the plug-in). It is an entity that:
*
* - passes the received jobs (created by users) to the plug-in,
* - collects the current resources' descriptions and gives it to the plug-in,
* - sends the scheduled tasks to the resources,
* - gives back the finished tasks to the plug-in,
* - sends back the return information to the users.
*
*
* @author Stanislaw Szczepanowski
*
*/
public abstract class AbstractGridBroker extends AbstractAdvanceReservation implements CommunicationInterface{
protected Set otherGridSchedulersIDs;
protected ModuleList moduleList;
/**
* The plug-in that has been passed to this interface entity
*/
protected GlobalSchedulingPlugin gridSchedulerPlugin;
private static Log log = LogFactory.getLog(AbstractGridBroker.class);
/**
* @param name the name of this entity
* @param gridSchedulerPluginName the name and package prefix of the plug-in that is to be used (this should be given in the main method of the simulator)
* @throws Exception if the plug-in name is wrong
*/
public AbstractGridBroker(String name, String gridSchedulerPluginName) throws Exception {
super(name, GssimConstants.DEFAULT_BAUD_RATE);
//make use of plug-in interface
gridSchedulerPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance(
gridSchedulerPluginName,
GlobalSchedulingPlugin.class);
if(gridSchedulerPlugin == null){
throw new Exception("Can not create grid scheduling plugin instance");
}
Properties prop = new Properties();
prop.put("plugin.name", name);
prop.put("plugin.utils.timeoperations", "gssim.scheduling.plugin.local.GssimTimeOperations");
gridSchedulerPlugin.init(prop);
this.otherGridSchedulersIDs = new HashSet();
this.moduleList = new ModuleListImpl(2);
this.moduleList.add(new GridResourceDiscovery(this));
if(log.isDebugEnabled())
log.debug(name + ": Creating a broker interface object");
}
/**
* @param name the name of this entity
* @param gridSchedulerPluginName the name and package prefix of the plug-in that is to be used (this should be given in the main method of the simulator)
* @param link the connection between entity and router
* @throws Exception if the plug-in name is wrong
*/
public AbstractGridBroker(String name, Link link, String gridSchedulerPluginName) throws Exception {
super(name, link);
//make use of plug-in interface
gridSchedulerPlugin = (GlobalSchedulingPlugin) InstanceFactory.createInstance(
gridSchedulerPluginName,
GlobalSchedulingPlugin.class);
if(gridSchedulerPlugin == null){
throw new Exception("Can not create grid scheduling plugin instance");
}
Properties prop = new Properties();
prop.put("plugin.name", name);
prop.put("plugin.utils.timeoperations", "gssim.scheduling.plugin.local.GssimTimeOperations");
gridSchedulerPlugin.init(prop);
this.otherGridSchedulersIDs = new HashSet();
this.moduleList = new ModuleListImpl(2);
this.moduleList.add(new GridResourceDiscovery(this));
if(log.isDebugEnabled())
log.debug(name + ": Creating a broker interface object");
}
public void body() {
if(log.isDebugEnabled())
log.debug("Grid scheduler interface "+name+" entity has started");
SchedulingPluginConfiguration config = gridSchedulerPlugin.getConfiguration();
if(config != null){
Map events = config.getServedEvents();
Object obj = events.get(SchedulingEventType.TIMER);
if(obj != null){
int delay = (Integer) obj;
send(this.me, delay, GssimTags.TIMER);
}
}
Sim_event ev = new Sim_event();
boolean run = true;
/* List> arResources = GridSim.getAdvancedReservationList();
Iterator> itr = arResources.iterator();
while(itr.hasNext()){
Integer dest = (Integer) itr.next();
send(dest, GridSimTags.SCHEDULE_NOW, GssimTags.RETURN_RESERVATIONS, new Integer(this.me));
}
*/
sim_get_next(ev);
while(Sim_system.running() && run) {
switch (ev.get_tag()) {
case GridSimTags.END_OF_SIMULATION:
run = false;
break;
case GridSimTags.GRIDLET_SUBMIT:
if(!pluginSupportsEvent(GridSimTags.GRIDLET_SUBMIT)){
log.error("Plugin " + this.gridSchedulerPlugin.getClass() + " does not provide support for TASK_ARRIVED event.\n" +
"Check plugin configuration or use default one.");
break;
}
List> jobsList = prepareJobDescription(ev);
//all the tasks are supposed to turn back to this entity (broker interface)
int submittedJobsCount = jobsList.size();
//give the task to the broker
scheduleJob(jobsList, null, null, moduleList, null, null, null, null, null);
//informs the Sim_system statistics about the completion of an event (multiplied by the number of jobs
//sent to this broker) - this influences the "Throughput" metric in statistics.
for (int i = 0; i < submittedJobsCount; ++i) {
sim_completed(ev);
}
break;
case GridSimTags.INFOPKT_RETURN:
//do nothing
break;
case GridSimTags.GRIDLET_RETURN:
AbstractExecutable simpleGridlet = (AbstractExecutable) ev.get_data();
notifyReturnedGridlet(simpleGridlet);
break;
case GridSimTags.GRIDLET_CANCEL:
AbstractExecutable taskGridlet = (AbstractExecutable) ev.get_data();
if(pluginSupportsEvent(GridSimTags.GRIDLET_CANCEL))
notifyCanceledGridlet(taskGridlet);
break;
case GssimTags.TIMER:
if(pluginSupportsEvent(GssimTags.TIMER))
scheduleCyclic();
config = gridSchedulerPlugin.getConfiguration();
if(config != null){
Map events = config.getServedEvents();
Object obj = events.get(SchedulingEventType.TIMER);
if(obj != null){
int delay = (Integer) obj;
send(this.me, delay, GssimTags.TIMER);
}
}
break;
}
//sim_get_next(notGRMSTags, ev);
sim_get_next(ev);
}
//shut down all the entities, including GridStatistics entity since
// we used it to record certain events.
shutdownGridStatisticsEntity();
shutdownUserEntity();
terminateIOEntities();
if(log.isDebugEnabled())
log.debug("Grid scheduler "+name+" interface ends");
}
public void addOtherGridSchedulerID(int id){
this.otherGridSchedulersIDs.add(id);
}
/**
* This method is used to retrieve the the list of identifiers of grid resources, that this broker instance can contact to.
* This method should be overwritten in subclasses, if a broker instance sees only a subset of all available grid resources.
*
* @return the list of identifiers of grid resources, that this broker instance can contact to
*/
public List getMyGridResources() {
return getGridResourceList();
}
protected boolean pluginSupportsEvent(int eventType){
SchedulingPluginConfiguration config = this.gridSchedulerPlugin.getConfiguration();
if(config == null)
return false;
Map servedEvent = config.getServedEvents();
if(servedEvent == null)
return false;
switch(eventType){
case GssimTags.TIMER:
return servedEvent.containsKey(SchedulingEventType.TIMER);
case GssimTags.GRIDLET_SUBMIT:
return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
case GssimTags.GRIDLET_CANCEL:
return servedEvent.containsKey(SchedulingEventType.TASK_CANCELED);
case GssimTags.GRIDLET_RESUME:
return servedEvent.containsKey(SchedulingEventType.TASK_ARRIVED);
case GssimTags.GRIDRESOURCE_FAILURE:
return servedEvent.containsKey(SchedulingEventType.RESOURCE_FAILED);
default: return false;
}
}
/**
* The entry point to the plug-in.
* Invokes the plug-in with the provided information.
*
* @param gridletList the list of jobs that have come from users
* @param jobDescription (JD) Job description (resource requirements, information needed to run a job-e.g. environmental variables, data needed)
* @param userPreferences (UP) User preferences
* @param reservationRequest (RR) Reservation request
* @param resources (RP) Resource parameters (static and dynamic)
* @param reservationOffers (RO) Reservation offers
* @param predictedTimes (PT) Predicted times
* @param jobSet (JQ) Job set(queue)
* @param jobRegistry (JR) Job registry (information about already executed (and being executed) jobs)
* @param brokerConfiguration (BC) Broker configuration
*/
public abstract void scheduleJob(List> gridletList,
String userPreferences,
String reservationRequest,
ModuleList moduleList,
String reservationOffers,
String predictedTimes,
String jobSet,
String jobRegistry,
String brokerConfiguration);
public abstract void scheduleCyclic();
public abstract void notifyReturnedGridlet(AbstractExecutable simpleGridlet);
public abstract void notifyCanceledGridlet(AbstractExecutable taskGridlet);
public abstract ExecutablesList getExecutables();
protected abstract List> prepareJobDescription(Sim_event ev);
}