package test.rewolucja.scheduling.implementation; import eduni.simjava.Sim_event; import gridsim.GridSimTags; import gridsim.Gridlet; import gridsim.gssim.GssimConstants; import gridsim.gssim.GssimTags; import gridsim.gssim.ResourceHistoryItem; import gridsim.gssim.SubmittedTask; import gridsim.gssim.policy.ARAllocationPolicy; import gssim.schedframe.scheduling.AbstractExecutable; import gssim.schedframe.scheduling.ExecTaskInterface; import gssim.schedframe.scheduling.Executable; import gssim.schedframe.scheduling.plugin.local.GssimTimeOperations; import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.joda.time.DateTime; import org.joda.time.DateTimeUtilsExt; import schedframe.exceptions.ReservationException; import schedframe.resources.units.ResourceUnit; import schedframe.scheduling.AbstractResourceRequirements; import schedframe.scheduling.AbstractTimeRequirements; import schedframe.scheduling.Offer; import schedframe.scheduling.Queue; import schedframe.scheduling.Reservation; import schedframe.scheduling.ResourceUsage; import schedframe.scheduling.TaskInterface; import schedframe.scheduling.TimeResourceAllocation; import schedframe.scheduling.events.ReservationActiveEvent; import schedframe.scheduling.events.SchedulingEvent; import schedframe.scheduling.events.SchedulingEventType; import schedframe.scheduling.events.SchedulingResponseType; import schedframe.scheduling.events.StartTaskExecutionEvent; import schedframe.scheduling.events.TaskRequestedTimeExpiredEvent; import schedframe.scheduling.plugin.SchedulingPluginConfiguration; import schedframe.scheduling.plugin.estimation.ExecTimeEstimationPlugin; import schedframe.scheduling.plugin.grid.Module; import schedframe.scheduling.plugin.local.LocalSchedulingARPlugin; import schedframe.scheduling.utils.ResourceParameterName; import test.rewolucja.reservation.LocalReservationManagerNew; import test.rewolucja.reservation.ReservationNew; import test.rewolucja.resources.description.ExecResourceDescription; import test.rewolucja.scheduling.plan.SchedulingPlanInterfaceNew; import test.rewolucja.scheduling.queue.GSSIMQueue; public class LocalARManagementSystem extends LocalManagementSystem { private Log log = LogFactory.getLog(ARAllocationPolicy.class); protected LocalSchedulingARPlugin arSchedulingPlugin; protected LocalReservationManagerNew reservationManager; public LocalARManagementSystem(String providerID, String entityName, String schedulingPluginClassName, ExecTimeEstimationPlugin execTimeEstimationPlugin, ExecResourceDescription resourceDescription) throws Exception { super(providerID, entityName, schedulingPluginClassName, execTimeEstimationPlugin, resourceDescription); this.arSchedulingPlugin = (LocalSchedulingARPlugin) this.schedulingPlugin; this.moduleList.add( new LocalReservationManagerNew(0, new GssimTimeOperations(), getName())); // this.reservationManager = new LocalReservationManagerNew(0, new GssimTimeOperations(), this.resName); for(int i = 0; i < moduleList.size();i++){ Module m = moduleList.get(i); switch(m.getType()){ case LOCAL_RESERVATION_MANAGER: reservationManager = (LocalReservationManagerNew) m; break; } } } public List getOffer(AbstractTimeRequirements timeRequirements, AbstractResourceRequirements resourceRequirements) throws ReservationException { updateProcessingProgress(); List result = null; LocalReservationManagerNew reservationManager = null; for(int i = 0; i < moduleList.size();i++){ Module m = moduleList.get(i); switch(m.getType()){ case LOCAL_RESERVATION_MANAGER: reservationManager = (LocalReservationManagerNew) m; break; } } result = arSchedulingPlugin.getOffers(timeRequirements, resourceRequirements, jobRegistry, queues, getResourceManager(), reservationManager); return result; } public ReservationNew createReservation( AbstractTimeRequirements timeRequirements, AbstractResourceRequirements resourceRequirements) throws ReservationException { updateProcessingProgress(); ReservationNew result = null; result = arSchedulingPlugin.createReservation(timeRequirements, resourceRequirements, jobRegistry, queues, getResourceManager(), reservationManager); return result; } public List createReservation(ResourceUsage resourceUsage) throws ReservationException { updateProcessingProgress(); List result = null; result = arSchedulingPlugin.createReservation(resourceUsage, jobRegistry, queues, getResourceManager(), reservationManager); return result; } public ReservationNew commitReservation(ReservationNew reservation) throws ReservationException { updateProcessingProgress(); ReservationNew result = null; result = arSchedulingPlugin.commitReservation(reservation, jobRegistry, queues, getResourceManager(), reservationManager); /* * if("23".equals(reservation.getJobId())){ int a = 1; } Processors * processors; try { processors = (Processors) * reservation.getAllocatedResource(). * getResourceUnit(ResourceParameterName.CPUCOUNT); if(processors != * null){ Processors choosenResources = * getResourceUnitsManager().chooseProcessors(processors); * getResourceUnitsManager().createResourceReservation(reservation, * choosenResources); } else { log.error("SOMETHING IS WRONG HERE. " + * "THERE SHOULD BE SOME RESOURCE REQUIREMENT IN THIS RESERVATION.\n" + * reservation); } * * } catch (NoSuchFieldException e) { e.printStackTrace(); } */ return result; } public ReservationNew commitReservation(ReservationNew reservation, TimeResourceAllocation resourceUsage) throws ReservationException { updateProcessingProgress(); ReservationNew result = null; result = arSchedulingPlugin.commitReservation(reservation, resourceUsage, jobRegistry, queues, getResourceManager(), reservationManager); return result; } public void modifyReservation(ReservationNew reservation, TimeResourceAllocation resourceUsage) throws ReservationException { updateProcessingProgress(); arSchedulingPlugin.modifyReservation(reservation, resourceUsage, jobRegistry, queues, getResourceManager(), reservationManager); } public void cancelReservation(ReservationNew reservation) throws ReservationException { updateProcessingProgress(); arSchedulingPlugin.cancelReservation(reservation, jobRegistry, queues, reservationManager); } public ReservationNew.Status checkStatus(ReservationNew reservation) throws ReservationException { updateProcessingProgress(); ReservationNew.Status status = arSchedulingPlugin.getStatus(reservation, reservationManager); return status; } public Collection getReservations(){ return this.reservationManager.getReservations(); } protected ExecTaskInterface findTask(Reservation r, List>> queues){ ExecTaskInterface task = null; for(int i = 0; i < queues.size() && task == null; i++){ Queue> queue = queues.get(i); for(int j = 0; j < queue.size() && task == null; j++){ TaskInterface t = queue.get(j); if(t.getJobId().equals(r.getJobId()) && t.getId().equals(r.getTaskId())){ task = (ExecTaskInterface)t; } } } return task; } private boolean checkFreeSlot(TaskInterface task){ SubmittedTask submittedTask = (SubmittedTask) task; if(submittedTask.getReservationID() != -1) return true; long expectedRuntime = getExpectedRuntime(task); if(expectedRuntime < 0) return false; int cpuCnt = 0; try { cpuCnt = Double.valueOf(submittedTask.getCpuCntRequest()).intValue(); } catch (NoSuchFieldException e) { cpuCnt = 1; } //local reservation manager sprawdza tylko zalozoene rezrwacje wiec potrzebuje poki co tylko czasow a nie ilosci cpu ResourceComponents processors = new ResourceComponents(this.getName()); //Processors processors = new Processors(this.resName, this.getResourceUnitsManager().getResourcesOfType(ResourceType.CPU), cpuCnt); DateTime startTime = new DateTime(); DateTime endTime = startTime.plusSeconds(Long.valueOf(expectedRuntime).intValue()); List usageList = null; try { usageList = reservationManager.resourceUsageList(processors, jobRegistry.getRunningTasks(), startTime, endTime); long reqDuration = (long)expectedRuntime * 1000; long duration = reqDuration; int reqCpu = cpuCnt; DateTime start = null; for(int i = 0; i < usageList.size() && duration > 0; i++){ TimeResourceAllocation allocation = usageList.get(i); ResourceUnit unit = allocation.getAllocatedResource(). getResourceUnit(ResourceParameterName.CPUCOUNT); if(unit.getFreeAmount() >= reqCpu){ duration = duration - (allocation.getEndMillis() - allocation.getStartMillis()); if(start == null) start = allocation.getStart(); } else { duration = reqDuration; start = null; } } if(duration > 0){ return false; } } catch (NoSuchFieldException e) { return false; } return true; } public long getExpectedRuntime(TaskInterface task){ SubmittedTask submittedTask = (SubmittedTask) task; long expectedDuration; try { expectedDuration = submittedTask.getExpectedDuration().getMillis()/1000; } catch (NoSuchFieldException e) { //log.warn(e.getMessage()); // assume that task without defined requested time will not be performed; return -1; //expectedDuration = execTimeEstimation(task); //if executing time for a given task is not defined then assume that it //can be performed anyway ant set expected duration to 0 //expectedDuration = 0; } long expectedRuntime = -1; if(task.getStatus() == Gridlet.READY) expectedRuntime = expectedDuration; else if(task.getStatus() == Gridlet.INEXEC) { double execStartTime = submittedTask.getExecStartTime(); long currentTimeMillis = new DateTime().getMillis()/1000; expectedRuntime = (Double.valueOf(execStartTime).longValue() + expectedDuration - currentTimeMillis); } return expectedRuntime; } public void processOtherEvent(Sim_event ev){ updateProcessingProgress(); int tag = ev.get_tag(); Object obj; switch(tag){ case GssimTags.TIMER: if(pluginSupportsEvent(tag)){ SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER); arSchedulingPlugin.schedule(event, jobRegistry, queues, getResourceManager(), this.moduleList); arSchedulingPlugin.schedule(event, jobRegistry, queues, getResourceManager(), this.moduleList); } SchedulingPluginConfiguration config = (SchedulingPluginConfiguration)schedulingPlugin.getConfiguration(); if(config != null){ Map events = config.getServedEvents(); if(events != null){ int delay = (Integer) events.get(SchedulingEventType.TIMER); getLogicalResource().sendInternal( delay, GssimTags.TIMER, null); //this.timerHandler.notify(delay, null); } } break; case GssimTags.TASK_READY_FOR_EXECUTION: Executable data = (Executable) ev.get_data(); System.out.println("oooooooooooo"+data.getJobId()); try { data.setGridletStatus(Gridlet.READY); /* Reservation.Status r_stat = this.reservationManager.getReservStatus(data.getReservationId()); if(r_stat == Reservation.Status.ACTIVE){ ResourceUnitsManagerImpl manager = getResourceUnitsManager(); if(!manager.areResourcesReserved(data)){ Map resources = manager.chooseResourcesFor(data); if(resources != null) manager.createResourceReservation(data, resources); } } */ if(pluginSupportsEvent(tag)){ SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId()); SchedulingPlanInterfaceNew decision= arSchedulingPlugin.schedule(event, jobRegistry, queues, getResourceManager(), this.moduleList); arSchedulingPlugin.schedule(event, jobRegistry, queues, getResourceManager(), this.moduleList); executeSchedulingPlan(decision); } } catch (Exception e) { e.printStackTrace(); } break; case GssimTags.TASK_EXECUTION_FINISHED: obj = ev.get_data(); SubmittedTask t = (SubmittedTask) obj; if(t.getStatus() == Gridlet.INEXEC){ t.setGridletStatus(Gridlet.SUCCESS); t.finalizeGridlet(); System.out.println("finished"+t.getJobId()); // this.finishedTasks.put(t.getJobId()+"_"+t.getId(), t); super.sendFinishJob((AbstractExecutable)t.getGridlet()); if(pluginSupportsEvent(tag)) { SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED); SchedulingPlanInterfaceNew decision=arSchedulingPlugin.schedule(event, jobRegistry, queues, getResourceManager(), this.moduleList); executeSchedulingPlan(decision); } } break; case GridSimTags.AR_STATUS_ACTIVE: obj = ev.get_data(); String resId = (String) obj; List list = reservationManager.getReservations(); for(int i = 0; i < list.size(); i++){ ReservationNew r = list.get(i); if(r.getId().equals(resId)){ r.setStatus(ReservationNew.Status.ACTIVE); long delay = (r.getEndMillis() - DateTimeUtilsExt.currentTimeMillis()) / 1000; log.info("reservation for " + r.getJobId() + " ends after: " + delay); // ExecTaskInterface task = findTask(r, queues); // task may not be submitted to the resource yet. This may happen when task submit time // and reservation start time are equal // if(task != null){ // Map choosenResources = getResourceUnitsManager().chooseResourcesFor(task); // getResourceUnitsManager().createResourceReservation(task, choosenResources); // } getLogicalResource().sendInternal(Long.valueOf(delay).doubleValue(), GssimTags.AR_STATUS_COMPLETED, r); if(pluginSupportsEvent(tag)){ SchedulingPlanInterfaceNew decision= arSchedulingPlugin.schedule(new ReservationActiveEvent(r), jobRegistry, queues, getResourceManager(), this.moduleList); executeSchedulingPlan(decision); } break; } } break; case GridSimTags.AR_STATUS_EXPIRED: log.error("Implement reservation expiration handler"); break; case GridSimTags.AR_STATUS_COMPLETED: ReservationNew reservation = (ReservationNew) ev.get_data(); reservation.setStatus(ReservationNew.Status.FINISHED); log.debug("finishing reservation: " + reservation); String jt = reservation.getJobId() + "_" + reservation.getTaskId(); /* SubmittedTask task = this.finishedTasks.get(jt); if(task != null){ List> lastUsed = task.getUsedResources(); getResourceUnitsManager().removeResourceReservation(lastUsed.get(lastUsed.size() - 1)); this.finishedTasks.remove(jt); return; } */ int index; for(index = 0; index < jobRegistry.getRunningTasks().size(); index++){ SubmittedTask st = (SubmittedTask) jobRegistry.getRunningTasks().get(index); if(st.getReservationID() == Integer.valueOf(reservation.getId()).intValue()){ Integer gridletId = Integer.valueOf(st.getGridletID()); Map gridletHistory = history.get(gridletId); DateTime expectedEndTime = (DateTime) gridletHistory.get(GssimConstants.END_TIME); DateTime currentTime = new DateTime(); if(expectedEndTime.isAfter(currentTime)){ st.setGridletStatus(Gridlet.CANCELED); st.finalizeGridlet(); gridletHistory.put(GssimConstants.END_TIME, currentTime); log.debug("stop task execution: " + jt); super.sendCancelJob(GridSimTags.GRIDLET_CANCEL, (AbstractExecutable)st.getGridlet(), gridletId, st.getUserID()); List lastUsed = st.getUsedResources(); //getResourceUnitsManager().removeResourceReservation(lastUsed.get(lastUsed.size() - 1).getResourceUnits()); return; } } } GSSIMQueue queue = queues.get(0); String jobId = reservation.getJobId(); String taskId = reservation.getTaskId(); for(int i = 0; i < queue.size(); i++){ ExecTaskInterface queuedTask = (ExecTaskInterface) queue.get(i); System.out.println(jobId+";"+taskId); if(jobId.equals(queuedTask.getJobId()) && taskId.equals(queuedTask.getId())){ System.out.println(jobId+";"+taskId+ "failed"); SubmittedTask st = (SubmittedTask) queuedTask; st.setGridletStatus(Gridlet.FAILED); st.finalizeGridlet(); queue.remove(i); log.debug("task didnt event start. remove from queue"); super.sendCancelJob(GridSimTags.GRIDLET_CANCEL, (AbstractExecutable)st.getGridlet(), st.getGridletID(), st.getUserID()); break; } } break; case GssimTags.TASK_REQUESTED_TIME_EXPIRED: obj = ev.get_data(); t = (SubmittedTask) obj; if(pluginSupportsEvent(tag)) { SchedulingEvent event = new TaskRequestedTimeExpiredEvent(t.getJobId(), t.getId()); SchedulingResponseType responseEvent = arSchedulingPlugin.handleResourceAllocationViolation(event, jobRegistry, queues, getResourceManager(), this.moduleList); if(responseEvent == SchedulingResponseType.TERMINATE_TASK){ if(t.getStatus() == Gridlet.INEXEC){ t.setGridletStatus(Gridlet.SUCCESS); t.finalizeGridlet(); Integer gridletId = Integer.valueOf(t.getGridletID()); Map gridletHistory = history.get(gridletId); DateTime currentTime = new DateTime(); gridletHistory.put(GssimConstants.END_TIME, currentTime); super.sendFinishJob((AbstractExecutable)t.getGridlet()); } } else if(responseEvent == SchedulingResponseType.KILL_TASK){ if(t.getStatus() == Gridlet.INEXEC){ t.setGridletStatus(Gridlet.CANCELED); t.finalizeGridlet(); Integer gridletId = Integer.valueOf(t.getGridletID()); Map gridletHistory = history.get(gridletId); DateTime currentTime = new DateTime(); gridletHistory.put(GssimConstants.END_TIME, currentTime); log.debug("stop task execution: " + t.getJobId() + "_" + t.getId()); super.sendCancelJob(GridSimTags.GRIDLET_CANCEL, (AbstractExecutable)t.getGridlet(), gridletId, t.getUserID()); } } else if(responseEvent == SchedulingResponseType.STOP_AND_RESUME_FROM_CHECKPOINT){ if(t.getStatus() == Gridlet.INEXEC){ jobPause(t.getGridletID(), t.getUserID(), false); jobResume(t.getGridletID(), t.getUserID(), false); this.communicationInterface.cancel(GssimTags.TASK_REQUESTED_TIME_EXPIRED, t); } } else if(responseEvent == SchedulingResponseType.ONE_HOUR_GRACE_PERIOD){ if(t.getStatus() == Gridlet.INEXEC){ this.getLogicalResource().sendInternal(Long.valueOf(3600).doubleValue(), GssimTags.TASK_REQUESTED_TIME_EXPIRED, t); return; } } else if(responseEvent == SchedulingResponseType.EXECUTE_ANYWAY){ if(t.getStatus() == Gridlet.INEXEC){ return; } } if(pluginSupportsEvent(GssimTags.TASK_EXECUTION_FINISHED)){ SchedulingEvent newEvent = new SchedulingEvent(SchedulingEventType.TASK_FINISHED); arSchedulingPlugin.schedule(newEvent, jobRegistry, queues, getResourceManager(), this.moduleList); arSchedulingPlugin.schedule(newEvent, jobRegistry, queues, getResourceManager(), this.moduleList); } } break; } } }