source: xssim/trunk/src/test/rewolucja/GSSimUsersNew.java @ 104

Revision 104, 9.8 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package test.rewolucja;
2
3import eduni.simjava.Sim_event;
4import eduni.simjava.Sim_system;
5import gridsim.GridSim;
6import gridsim.GridSimTags;
7import gridsim.Gridlet;
8import gridsim.IO_data;
9import gridsim.gssim.GssimConstants;
10import gridsim.gssim.network.ExtendedGridSim;
11import gridsim.net.InfoPacket;
12import grms.shared.constants.BrokerConstants;
13import gssim.schedframe.scheduling.utils.JobDescription;
14import gssim.schedframe.scheduling.utils.TaskDescription;
15
16import java.util.ArrayList;
17import java.util.HashSet;
18import java.util.Iterator;
19import java.util.List;
20import java.util.Map;
21import java.util.Set;
22import java.util.TreeMap;
23
24import org.apache.commons.logging.Log;
25import org.apache.commons.logging.LogFactory;
26import org.joda.time.DateTime;
27
28import schedframe.scheduling.Job;
29import schedframe.scheduling.JobInterface;
30import schedframe.scheduling.Task;
31import schedframe.scheduling.TaskInterface;
32import simulator.GenericUser;
33import simulator.utils.XsltTransformations;
34import simulator.workload.WorkloadLoader;
35
36public class GSSimUsersNew extends ExtendedGridSim implements GenericUser {
37
38        /**A job generator, which produces jobs and tasks. These jobs are then sent by this entity */
39        protected WorkloadLoader workloadLoader;
40       
41        /** The name of the entity, to which the created tasks will be sent */
42        protected String destName;
43       
44        /** Indicates, that all tasks that returned to this object are finished */
45        protected boolean allTasksAreFinished;
46       
47        /** Stores the list of jobs, that have been returned to this object */
48        protected List<JobInterface<?>> returnedJobs;
49        protected Set<String> sendJobsIds;
50        protected Set<String> returnedJobsIds;
51       
52        protected XsltTransformations xsltTransformer;
53       
54        /**
55         * Indicates, that an error has occurred - it is used for debug purposes
56         */
57        protected boolean error;
58       
59        private static Log log = LogFactory.getLog(GSSimUsersNew.class);
60       
61        /**
62         * Constructs the users object with the given parameters
63         * @param name the name of the users entity (must be unique across all entities in the whole simulation)
64         * @param destinationName the name of the entity, to which the created tasks will be sent
65         * @param jobGenerator the job generator, which produces jobs and tasks, that will be sent by this class
66         * @throws Exception if any occurs (see {@link GridSim#GridSim(String, double)})
67         */
68        public GSSimUsersNew(String name, String destinationName, WorkloadLoader workload) throws Exception {
69                super(name, GssimConstants.DEFAULT_BAUD_RATE);
70                this.workloadLoader = workload;
71                destName = destinationName;
72                allTasksAreFinished = true;
73                error = false;
74               
75                sendJobsIds = new HashSet<String>();
76                returnedJobsIds = new HashSet<String>();
77               
78                this.xsltTransformer = new XsltTransformations();
79        }
80
81        @Override
82        public void body() {
83                sendJobs();
84                collectJobs();
85
86                //GridSim dependent code for shutting down the simulation
87                shutdownGridStatisticsEntity();
88                terminateIOEntities();
89                shutdownUserEntity();
90        }
91
92        /**
93         * Collects the jobs sent to broker(s)
94         */
95        protected void collectJobs() {
96                final int FACTOR = Math.min(10, workloadLoader.getTaskCount()); //the refresh rate of the gauge: at most 10 times
97                final int denominator = workloadLoader.getTaskCount() / FACTOR;
98                allTasksAreFinished = true;
99                returnedJobs = new ArrayList<JobInterface<?>>();
100                int counter = 0;
101                int oldRemeinder = 0;
102                boolean hundredPercent = false;
103               
104                Sim_event ev = new Sim_event();
105                sim_get_next(ev);
106                while (Sim_system.running()) {
107                        switch (ev.get_tag()) {
108                        case GridSimTags.END_OF_SIMULATION:
109                                //no action
110                                break;
111                       
112                        case GridSimTags.INFOPKT_SUBMIT:
113                                processPingRequest(ev);
114                                break;
115                               
116                        case GridSimTags.GRIDLET_RETURN:
117                                JobInterface<?> returnedJob = (JobInterface<?>) ev.get_data();
118                               
119                                String jobId = null;
120                                        try {
121                                                jobId = returnedJob.getId();
122                                        } catch (NoSuchFieldException e) {
123                                                // TODO Auto-generated catch block
124                                                e.printStackTrace();
125                                        }
126                               
127                                if (returnedJobs.contains(returnedJob)) {
128                                        if(log.isErrorEnabled())
129                                                log.error("Received the same job twice (job ID " + jobId + ")");
130                                        error = true;
131                                        break;
132                                }
133                               
134                                returnedJobs.add(returnedJob);
135                                returnedJobsIds.add(jobId);
136                               
137                                if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_FINISHED) {
138                                        if(log.isDebugEnabled())
139                                                log.debug("Received finished job " + jobId);
140                                       
141                                } else {
142                                        if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_CANCELED) {
143                                                if(log.isWarnEnabled()){
144                                                        String str = "Warning! An uncomplished job (Job ID: "+jobId+") has returned to users. Job was canceled.";
145                                                        log.warn(str);
146                                                }
147                                        }
148                                       
149                                        allTasksAreFinished = false;
150                                }
151                               
152                                counter += returnedJob.getTaskCount();
153                                int remainder = (counter / denominator);
154                                if (remainder != oldRemeinder) {
155                                        int gauge = ((counter * 100) / (workloadLoader.getTaskCount()));
156                                        if(log.isInfoEnabled())
157                                                log.info(gauge + "% ");
158                                       
159                                        oldRemeinder = remainder;
160                                        if (gauge == 100)
161                                                hundredPercent = true;
162                                }
163                       
164                                break;
165                        }
166                       
167                        //if all the Gridlets have been collected
168                        if (counter == workloadLoader.getTaskCount()) {
169                                break;
170                        }
171                       
172                        sim_get_next(ev);
173                }
174               
175                //if all the Gridlets have been collected
176                if (counter == workloadLoader.getTaskCount()) {
177                        if (! hundredPercent) {
178                                if(log.isInfoEnabled())
179                                        log.info("100%");
180                        }
181                        if(log.isInfoEnabled())
182                                log.info(get_name() + ": Received all "+workloadLoader.getJobCount()+" jobs and " + counter + " tasks");                       
183                } else {
184                        if(log.isErrorEnabled())
185                                log.error(get_name() + ": ERROR DID NOT RECEIVED all tasks - some tasks were not finished! (received "+counter+" of "+workloadLoader.getTaskCount()+")");
186                }
187               
188                Iterator <String> itr = sendJobsIds.iterator();
189                String jobId;
190                if(log.isInfoEnabled()){
191                        log.info("Missing tasks: ");
192                        while(itr.hasNext()){
193                                jobId = itr.next();
194                                if(!returnedJobsIds.contains(jobId)){
195                                        log.info(jobId + ", ");
196                                }
197                        }
198                }
199               
200        }
201
202        /**
203         * Sends jobs to broker entities
204         */
205        protected void sendJobs() {
206               
207                Map<Long, List<Job>> jobTimes = new TreeMap<Long, List<Job>>();
208                int destID = GridSim.getEntityId(destName);
209                destID=GridSim.getEntityId("BROKER@COMPUTING_GRID_0");
210                List<JobDescription> jobs = workloadLoader.getJobs();
211                //TaskRequirements taskReq = new TaskRequirementsImpl();
212                //double values[] = null;
213
214                for (JobDescription job : jobs) {
215                        long l_submissionTime = Long.MAX_VALUE;
216                        //pick the lowest submission time
217                        for (TaskDescription task : job) {
218                                if (task.getSubmissionTime() < l_submissionTime)
219                                        l_submissionTime = task.getSubmissionTime();
220                        }
221
222                        //store the submission time expressed in seconds after the simulation start time
223                        long submissionTime = l_submissionTime;
224                       
225                        Job newJob = prepareJob(job, submissionTime);
226                       
227                        List<Job> list = jobTimes.get(submissionTime);
228                        if (list == null) {
229                                list = new ArrayList<Job>();
230                                jobTimes.put(submissionTime, list);
231                        }
232                        list.add(newJob);
233                }
234                for (Long submissionTime : jobTimes.keySet()) {
235                        List<Job> list = jobTimes.get(submissionTime);
236                        for(int i = 0; i < list.size(); i++){
237                                this.sendJobsIds.add(list.get(i).getId());
238                                send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list.get(i));
239                        }
240                       
241                        //send(output, submissionTime, GridSimTags.GRIDLET_SUBMIT, new IO_data(list, GssConstants.DEFAULT_GRIDLET_SIZE, destID));
242                        //send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list);
243                       
244                }
245                System.out.println("finished sending jobs");
246        }
247       
248        public List<JobDescription> getAllSentJobs() {
249                return (List<JobDescription>) workloadLoader.getJobs();
250        }
251       
252        public List<TaskDescription> getAllSentTasks() {
253                List<TaskDescription> result = new ArrayList<TaskDescription>();
254                List<JobDescription> sentJobs = getAllSentJobs();
255                for (JobDescription job : sentJobs) {
256                        result.addAll(job);
257                }
258                return result;
259        }
260       
261        public List<JobInterface<?>> getAllReceivedJobs() {
262                return returnedJobs;
263        }
264
265        public int getFinishedTasksCount() {
266                int result = 0;
267                for (JobInterface<?> job : returnedJobs) {
268                        for (TaskInterface<?> task: job.getTask()) {
269                                if(task.getStatus() == Gridlet.SUCCESS)
270                                        result++;
271                        }
272                }
273                return result;
274        }
275       
276        public String getUserName() {
277                return get_name();
278        }
279       
280        public boolean isError() {
281                return error;
282        }
283       
284        /**
285         * Performs action concerning a ping request to this entity
286         * @param ev the event object
287         */
288        protected void processPingRequest(Sim_event ev) {
289        InfoPacket pkt = (InfoPacket) ev.get_data();
290        pkt.setTag(GridSimTags.INFOPKT_RETURN);
291        pkt.setDestID(pkt.getSrcID());
292
293                // sends back to the sender
294                send(output, GridSimTags.SCHEDULE_NOW, GridSimTags.INFOPKT_RETURN,
295                                new IO_data(pkt, pkt.getSize(), pkt.getSrcID()));
296    }
297               
298        protected Job prepareJob(JobDescription jobDescription, long submissionTime){
299
300                Job newJob = new Job(jobDescription.getJobId());
301                DateTime submitionTime = new DateTime();
302        submitionTime = submitionTime.plusMillis((int)submissionTime*1000);
303                        try {
304                               
305                                // transform job description to resource requirements
306
307                                newJob.setSenderId(this.get_id());
308                               
309                                for (TaskDescription taskDescription : jobDescription) {
310                                       
311                                        String xmlResReq = this.xsltTransformer.taskToResourceRequirements(
312                                                                                taskDescription.getDocument(),
313                                                                                jobDescription.getJobId(),
314                                                                                taskDescription.getUserDn(),
315                                                                                submitionTime);
316
317                                        Task newTask = new Task(xmlResReq);
318                                        newTask.setSenderId(this.get_id());
319                                        newTask.setStatus((int)BrokerConstants.TASK_STATUS_UNSUBMITTED);
320                                        newTask.setLength(taskDescription.getTaskLength());
321                                        newTask.setWorkloadLogWaitTime(taskDescription.getWorkloadLogWaitTime());
322                        //              newTask.setSubmissionTime(taskDescription.getSubmissionTime());
323                                       
324                                        newJob.add(newTask);
325                                }
326
327                                jobDescription.discardUnused();
328                               
329                        } catch (Exception e){
330                                log.error(e.getMessage());
331                                e.printStackTrace();
332                        }
333               
334                return newJob;
335        }
336}
Note: See TracBrowser for help on using the repository browser.