package simulator; import eduni.simjava.Sim_event; import eduni.simjava.Sim_system; import gridsim.GridSim; import gridsim.GridSimTags; import gridsim.Gridlet; import gridsim.IO_data; import gridsim.gssim.GssimConstants; import gridsim.gssim.network.ExtendedGridSim; import gridsim.net.InfoPacket; import grms.shared.constants.BrokerConstants; import org.qcg.broker.schemas.resreqs.ResourceRequirements; import gssim.schedframe.scheduling.AbstractExecutable; import gssim.schedframe.scheduling.utils.JobDescription; import gssim.schedframe.scheduling.utils.TaskDescription; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import schedframe.scheduling.JobInterface; import schedframe.scheduling.TaskInterface; import simulator.workload.WorkloadLoader; import gridsim.net.Link; /** * @author Stanislaw Szczepanowski * @param a generic class that extends the functionality of {@link Job} interface */ public class GSSimUsers extends ExtendedGridSim implements GenericUser { /**A job generator, which produces jobs and tasks. These jobs are then sent by this entity */ protected WorkloadLoader workloadLoader; /** The name of the entity, to which the created tasks will be sent */ protected String destName; /** Indicates, that all tasks that returned to this object are finished */ protected boolean allTasksAreFinished; /** Stores the list of jobs, that have been returned to this object */ protected List> returnedJobs; protected Set sendJobsIds; protected Set returnedJobsIds; /** * Indicates, that an error has occurred - it is used for debug purposes */ protected boolean error; private static Log log = LogFactory.getLog(GSSimUsers.class); /** * Constructs the users object with the given parameters * @param name the name of the users entity (must be unique across all entities in the whole simulation) * @param destinationName the name of the entity, to which the created tasks will be sent * @param jobGenerator the job generator, which produces jobs and tasks, that will be sent by this class * @throws Exception if any occurs (see {@link GridSim#GridSim(String, double)}) */ public GSSimUsers(String name, String destinationName, WorkloadLoader workload) throws Exception { super(name, GssimConstants.DEFAULT_BAUD_RATE); this.workloadLoader = workload; destName = destinationName; allTasksAreFinished = true; error = false; sendJobsIds = new HashSet(); returnedJobsIds = new HashSet(); } /** * Constructs the users object with the given parameters * @param name the name of the users entity (must be unique across all entities in the whole simulation) * @param link the connection between users and broker * @param destinationName the name of the entity, to which the created tasks will be sent * @param jobGenerator the job generator, which produces jobs and tasks, that will be sent by this class * @throws Exception if any occurs (see {@link GridSim#GridSim(String, double)}) */ public GSSimUsers(String name, Link link, String destinationName, WorkloadLoader workload) throws Exception { super(name, link); this.workloadLoader = workload; destName = destinationName; allTasksAreFinished = true; error = false; sendJobsIds = new HashSet(); returnedJobsIds = new HashSet(); } @Override public void body() { sendJobs(); collectJobs(); //GridSim dependent code for shutting down the simulation shutdownGridStatisticsEntity(); terminateIOEntities(); shutdownUserEntity(); } /** * Collects the jobs sent to broker(s) */ protected void collectJobs() { final int FACTOR = Math.min(10, workloadLoader.getTaskCount()); //the refresh rate of the gauge: at most 10 times final int denominator = workloadLoader.getTaskCount() / FACTOR; allTasksAreFinished = true; returnedJobs = new ArrayList>(); int counter = 0; int oldRemeinder = 0; boolean hundredPercent = false; Sim_event ev = new Sim_event(); sim_get_next(ev); while (Sim_system.running()) { switch (ev.get_tag()) { case GridSimTags.END_OF_SIMULATION: //no action break; case GridSimTags.INFOPKT_SUBMIT: processPingRequest(ev); break; case GridSimTags.GRIDLET_RETURN: JobInterface returnedJob = (JobInterface) ev.get_data(); String jobId = null; try { jobId = returnedJob.getId(); } catch (NoSuchFieldException e) { // TODO Auto-generated catch block e.printStackTrace(); } if (returnedJobs.contains(returnedJob)) { if(log.isErrorEnabled()) log.error("Received the same job twice (job ID " + jobId + ")"); error = true; break; } returnedJobs.add(returnedJob); returnedJobsIds.add(jobId); if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_FINISHED) { if(log.isDebugEnabled()) log.debug("Received finished job " + jobId); } else { if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_CANCELED) { if(log.isWarnEnabled()){ String str = "Warning! An uncomplished job (Job ID: "+jobId+") has returned to users. Job was canceled."; log.warn(str); } } allTasksAreFinished = false; } counter += returnedJob.getTaskCount(); int remainder = (counter / denominator); if (remainder != oldRemeinder) { int gauge = ((counter * 100) / (workloadLoader.getTaskCount())); if(log.isInfoEnabled()) log.info(gauge + "% "); oldRemeinder = remainder; if (gauge == 100) hundredPercent = true; } break; } //if all the Gridlets have been collected if (counter == workloadLoader.getTaskCount()) { break; } sim_get_next(ev); } //if all the Gridlets have been collected if (counter == workloadLoader.getTaskCount()) { if (! hundredPercent) { if(log.isInfoEnabled()) log.info("100%"); } if(log.isInfoEnabled()) log.info(get_name() + ": Received all "+workloadLoader.getJobCount()+" jobs and " + counter + " tasks"); } else { if(log.isErrorEnabled()) log.error(get_name() + ": ERROR DID NOT RECEIVED all tasks - some tasks were not finished! (received "+counter+" of "+workloadLoader.getTaskCount()+")"); } Iterator itr = sendJobsIds.iterator(); String jobId; if(log.isInfoEnabled()){ log.info("Missing tasks: "); while(itr.hasNext()){ jobId = itr.next(); if(!returnedJobsIds.contains(jobId)){ log.info(jobId + ", "); } } } } /** * Sends jobs to broker entities */ protected void sendJobs() { Map> jobTimes = new TreeMap>(); List jobs = workloadLoader.getJobs(); //TaskRequirements taskReq = new TaskRequirementsImpl(); //double values[] = null; for (JobDescription job : jobs) { long l_submissionTime = Long.MAX_VALUE; //pick the lowest submission time for (TaskDescription task : job) { if (task.getSubmissionTime() < l_submissionTime) l_submissionTime = task.getSubmissionTime(); } //store the submission time expressed in seconds after the simulation start time double submissionTime = l_submissionTime; /* JobGridlet jg = (JobGridlet) job; jg.setUserID(get_id()); jg.setSourceID(get_id()); jg.setBrokerSubmissionTime(submissionTime); //TODO is it necessary? org.qcg.broker.schemas.jobdesc.Task t = jg.getTaskGridlet(0).getTaskDescription().getDescription(); taskReq.wrap(t); try { values = taskReq.getCpucount(); if(values != null && values.length != 0) jg.setNumPE(Double.valueOf(values[0]).intValue()); // set the requested number of processors } catch (NoSuchParamException e) { if(log.isErrorEnabled()) log.error(e); } try { values = taskReq.getMemory(); if(values != null && values.length != 0) jg.setMemoryRequest(Double.valueOf(values[0]).intValue()); } catch (NoSuchParamException e) { if(log.isErrorEnabled()) log.error(e); } */ List list = jobTimes.get(submissionTime); if (list == null) { list = new ArrayList(); jobTimes.put(submissionTime, list); } list.add(job); } int destID = GridSim.getEntityId(destName); for (Double submissionTime : jobTimes.keySet()) { List list = jobTimes.get(submissionTime); for(int i = 0; i < list.size(); i++){ this.sendJobsIds.add(list.get(i).getJobId()); } //send(output, submissionTime, GridSimTags.GRIDLET_SUBMIT, new IO_data(list, GssConstants.DEFAULT_GRIDLET_SIZE, destID)); send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list); } } public List getAllSentJobs() { return (List) workloadLoader.getJobs(); } public List getAllSentTasks() { List result = new ArrayList(); List sentJobs = getAllSentJobs(); for (JobDescription job : sentJobs) { result.addAll(job); } return result; } public List> getAllReceivedJobs() { return returnedJobs; } public int getFinishedTasksCount() { int result = 0; for (JobInterface job : returnedJobs) { for (TaskInterface task: job.getTask()) { if(task.getStatus() == Gridlet.SUCCESS) result++; } } return result; } public String getUserName() { return get_name(); } public boolean isError() { return error; } /** * Performs action concerning a ping request to this entity * @param ev the event object */ protected void processPingRequest(Sim_event ev) { InfoPacket pkt = (InfoPacket) ev.get_data(); pkt.setTag(GridSimTags.INFOPKT_RETURN); pkt.setDestID(pkt.getSrcID()); // sends back to the sender send(output, GridSimTags.SCHEDULE_NOW, GridSimTags.INFOPKT_RETURN, new IO_data(pkt, pkt.getSize(), pkt.getSrcID())); } }