package test.rewolucja.modules; import eduni.simjava.Sim_event; import gridsim.GridSim; import gridsim.Gridlet; import gridsim.IO_data; import gridsim.gssim.GssimTags; import gridsim.gssim.SubmittedTask; import gridsim.gssim.network.ExtendedGridSimTags; import gssim.schedframe.scheduling.plugin.grid.network.manager.NetworkData; import gssim.schedframe.scheduling.plugin.grid.network.manager.SendOrder; import gssim.schedframe.scheduling.plugin.grid.network.manager.TaskInputFile; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Properties; import schedframe.exceptions.ModuleException; import schedframe.scheduling.plugin.grid.ModuleType; import test.rewolucja.extensions.Extension; import test.rewolucja.extensions.ExtensionException; import test.rewolucja.extensions.ExtensionType; import test.rewolucja.resources.test.Event; import test.rewolucja.scheduling.JobRegistry; import test.rewolucja.scheduling.implementation.LocalManagementSystem; public class DataManager implements Extension{ protected HashMap task_inputFiles; protected List receivedNetData; LocalManagementSystem managementSystem; String resName; public DataManager(LocalManagementSystem managementSystem, String resName) { super(); this.managementSystem = managementSystem; task_inputFiles = new HashMap(); receivedNetData = new ArrayList(); this.resName = resName; } protected void processFileReceive(Sim_event ev) { NetworkData t = (NetworkData)ev.get_data(); TaskInputFile tif = (TaskInputFile)t.getData(); //log.info(get_name() + " received file"); System.out.println("---------------------FILE RECEIVED---------------- "); if(task_inputFiles.containsKey(tif.getJobID() + "_" + tif.getTaskID())) { ((HashSet)task_inputFiles.get(tif.getJobID() + "_" + tif.getTaskID())).add(tif.getFileName()); }else{ HashSet receivedFiles = new HashSet(); receivedFiles.add(tif.getFileName()); task_inputFiles.put(tif.getJobID() + "_" + tif.getTaskID(), receivedFiles); } SubmittedTask subTask = new JobRegistry(resName).getSubmittedTask(tif.getJobID(), tif.getTaskID()); if(subTask != null && checkInputFiles(subTask) && subTask.getGridletStatus() == Gridlet.QUEUED){ this.managementSystem.getLogicalResource().sendInternal( Long.valueOf(0).doubleValue(), GssimTags.TASK_READY_FOR_EXECUTION, subTask.getGridlet()); } receivedNetData.add(t); } protected boolean checkInputFiles(SubmittedTask subTask){ int nrOfRequestedFiles = 0; try{ nrOfRequestedFiles = subTask.getDescription().getExecution().getStageInOut().getStageInOutItem().length; } catch(Exception e){ e.printStackTrace(); } HashSet requestedFiles = new HashSet(); for(int i = 0; i < nrOfRequestedFiles; i++){ String fileName = subTask.getDescription().getExecution().getStageInOut().getStageInOutItem(i).getFile().getName(); requestedFiles.add(fileName); } HashSet tempSet = (HashSet)task_inputFiles.get(subTask.getDescription().getJobId() + "_" + subTask.getDescription().getTaskId()); //log.info("Task " + subTask.getJobId() + "_"+ subTask.getId() + " - received input files " + tempSet); System.out.println("task input files " + tempSet); //log.info("Task " + subTask.getJobId() + "_"+ subTask.getId() + " - requested files " + requestedFiles); System.out.println("requested files " + requestedFiles); if(tempSet == null || !tempSet.containsAll(requestedFiles)) { return false; } //log.info(get_name() + " received all requested files for task " + subTask.getJobId() + "_"+ subTask.getId()); System.out.println(resName+"---------------------ALL FILES RECEIVED---------------- "+subTask.getJobId()); //System.out.println(GridSim.clock()+";"+VirtualClock.getStaticCurrentDateTimeMillis()+";"+ new DateTime(VirtualClock.getStaticCurrentDateTimeMillis())); return true; } protected void sendFile(Sim_event ev) { SendOrder sendOrder = (SendOrder)ev.get_data(); //log.info(get_name() + " sending file to " + sendOrder.getRecipientName()); System.out.println(resName + " SENDING FILE TO " + sendOrder.getRecipientName()); //System.out.println(GridSim.clock()+";"+VirtualClock.getStaticCurrentDateTimeMillis()+";"+ new DateTime(VirtualClock.getStaticCurrentDateTimeMillis())); //NetworkData resData = new NetworkData(sendOrder.getTaskInputFile(), sendOrder.getRoute(), sendOrder.getReservationID()); NetworkData resData = sendOrder; IO_data data = new IO_data(resData, sendOrder.getFileSize(), GridSim.getEntityId(sendOrder.getRecipientName())); if(sendOrder.getRecipientName().compareTo(resName) == 0) managementSystem.getLogicalResource().send(GridSim.getEntityId(resName), ExtendedGridSimTags.SCHEDULE_NOW, ExtendedGridSimTags.FILE, resData); else { String recipientName = sendOrder.getRecipientName(); int recipientId = GridSim.getEntityId(recipientName); if(sendOrder.getReservationID() > 0 && recipientId != -1) { managementSystem.getLogicalResource().send(recipientId, ExtendedGridSimTags.SCHEDULE_NOW, ExtendedGridSimTags.FLOW_RESERVATION, resData); } else { managementSystem.getLogicalResource().send(recipientId, ExtendedGridSimTags.SCHEDULE_NOW, ExtendedGridSimTags.FLOW_SUBMIT, resData); } } } public List getReceivedNetData() { return receivedNetData; } @Override public void init(Properties properties) throws ExtensionException { // TODO Auto-generated method stub } @Override public void dispose() throws ExtensionException { // TODO Auto-generated method stub } @Override public ExtensionType getType() { return ExtensionType.DATA_MANAGER; } @Override public boolean supportsEvent(Event event) { switch (event.getTag()) { case ExtendedGridSimTags.SEND_ORDER: return true; case ExtendedGridSimTags.FILE: return true; case ExtendedGridSimTags.FLOW_SUBMIT: return true; case ExtendedGridSimTags.FLOW_RESERVATION: return true; default: return false; } } @Override public void handleEvent(Event event) { // TODO Auto-generated method stub switch (event.getTag()) { case ExtendedGridSimTags.SEND_ORDER: //sendFile(event.getTag()); break; case ExtendedGridSimTags.FILE: case ExtendedGridSimTags.FLOW_SUBMIT: case ExtendedGridSimTags.FLOW_RESERVATION: //processFileReceive(event.getTag()); break; } } }