source: xssim/trunk/src/simulator/GSSimUsers.java @ 104

Revision 104, 10.0 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package simulator;
2
3import eduni.simjava.Sim_event;
4import eduni.simjava.Sim_system;
5import gridsim.GridSim;
6import gridsim.GridSimTags;
7import gridsim.Gridlet;
8import gridsim.IO_data;
9import gridsim.gssim.GssimConstants;
10import gridsim.gssim.network.ExtendedGridSim;
11import gridsim.net.InfoPacket;
12import grms.shared.constants.BrokerConstants;
13import org.qcg.broker.schemas.resreqs.ResourceRequirements;
14import gssim.schedframe.scheduling.AbstractExecutable;
15import gssim.schedframe.scheduling.utils.JobDescription;
16import gssim.schedframe.scheduling.utils.TaskDescription;
17
18import java.util.ArrayList;
19import java.util.HashSet;
20import java.util.Iterator;
21import java.util.List;
22import java.util.Map;
23import java.util.Set;
24import java.util.TreeMap;
25
26import org.apache.commons.logging.Log;
27import org.apache.commons.logging.LogFactory;
28
29import schedframe.scheduling.JobInterface;
30import schedframe.scheduling.TaskInterface;
31import simulator.workload.WorkloadLoader;
32
33import gridsim.net.Link;
34
35/**
36 * @author Stanislaw Szczepanowski
37 * @param <SubJob> a generic class that extends the functionality of {@link Job} interface
38 */
39public class GSSimUsers extends ExtendedGridSim implements GenericUser {
40
41        /**A job generator, which produces jobs and tasks. These jobs are then sent by this entity */
42        protected WorkloadLoader workloadLoader;
43       
44        /** The name of the entity, to which the created tasks will be sent */
45        protected String destName;
46       
47        /** Indicates, that all tasks that returned to this object are finished */
48        protected boolean allTasksAreFinished;
49       
50        /** Stores the list of jobs, that have been returned to this object */
51        protected List<JobInterface<?>> returnedJobs;
52        protected Set<String> sendJobsIds;
53        protected Set<String> returnedJobsIds;
54       
55        /**
56         * Indicates, that an error has occurred - it is used for debug purposes
57         */
58        protected boolean error;
59       
60        private static Log log = LogFactory.getLog(GSSimUsers.class);
61       
62        /**
63         * Constructs the users object with the given parameters
64         * @param name the name of the users entity (must be unique across all entities in the whole simulation)
65         * @param destinationName the name of the entity, to which the created tasks will be sent
66         * @param jobGenerator the job generator, which produces jobs and tasks, that will be sent by this class
67         * @throws Exception if any occurs (see {@link GridSim#GridSim(String, double)})
68         */
69        public GSSimUsers(String name, String destinationName, WorkloadLoader workload) throws Exception {
70                super(name, GssimConstants.DEFAULT_BAUD_RATE);
71                this.workloadLoader = workload;
72                destName = destinationName;
73                allTasksAreFinished = true;
74                error = false;
75               
76                sendJobsIds = new HashSet<String>();
77                returnedJobsIds = new HashSet<String>();
78        }
79
80        /**
81         * Constructs the users object with the given parameters
82         * @param name the name of the users entity (must be unique across all entities in the whole simulation)
83         * @param link the connection between users and broker
84         * @param destinationName the name of the entity, to which the created tasks will be sent
85         * @param jobGenerator the job generator, which produces jobs and tasks, that will be sent by this class
86         * @throws Exception if any occurs (see {@link GridSim#GridSim(String, double)})
87         */
88        public GSSimUsers(String name, Link link, String destinationName, WorkloadLoader workload) throws Exception {
89                super(name, link);
90                this.workloadLoader = workload;
91                destName = destinationName;
92                allTasksAreFinished = true;
93                error = false;
94               
95                sendJobsIds = new HashSet<String>();
96                returnedJobsIds = new HashSet<String>();
97        }
98        @Override
99        public void body() {
100                sendJobs();
101                collectJobs();
102
103                //GridSim dependent code for shutting down the simulation
104                shutdownGridStatisticsEntity();
105                terminateIOEntities();
106                shutdownUserEntity();
107        }
108
109        /**
110         * Collects the jobs sent to broker(s)
111         */
112        protected void collectJobs() {
113                final int FACTOR = Math.min(10, workloadLoader.getTaskCount()); //the refresh rate of the gauge: at most 10 times
114                final int denominator = workloadLoader.getTaskCount() / FACTOR;
115                allTasksAreFinished = true;
116                returnedJobs = new ArrayList<JobInterface<?>>();
117                int counter = 0;
118                int oldRemeinder = 0;
119                boolean hundredPercent = false;
120               
121                Sim_event ev = new Sim_event();
122                sim_get_next(ev);
123                while (Sim_system.running()) {
124                        switch (ev.get_tag()) {
125                        case GridSimTags.END_OF_SIMULATION:
126                                //no action
127                                break;
128                       
129                        case GridSimTags.INFOPKT_SUBMIT:
130                                processPingRequest(ev);
131                                break;
132                               
133                        case GridSimTags.GRIDLET_RETURN:
134                                JobInterface<?> returnedJob = (JobInterface<?>) ev.get_data();
135                               
136                                String jobId = null;
137                                        try {
138                                                jobId = returnedJob.getId();
139                                        } catch (NoSuchFieldException e) {
140                                                // TODO Auto-generated catch block
141                                                e.printStackTrace();
142                                        }
143                               
144                                if (returnedJobs.contains(returnedJob)) {
145                                        if(log.isErrorEnabled())
146                                                log.error("Received the same job twice (job ID " + jobId + ")");
147                                        error = true;
148                                        break;
149                                }
150                               
151                                returnedJobs.add(returnedJob);
152                                returnedJobsIds.add(jobId);
153                               
154                                if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_FINISHED) {
155                                        if(log.isDebugEnabled())
156                                                log.debug("Received finished job " + jobId);
157                                       
158                                } else {
159                                        if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_CANCELED) {
160                                                if(log.isWarnEnabled()){
161                                                        String str = "Warning! An uncomplished job (Job ID: "+jobId+") has returned to users. Job was canceled.";
162                                                        log.warn(str);
163                                                }
164                                        }
165                                       
166                                        allTasksAreFinished = false;
167                                }
168                               
169                                counter += returnedJob.getTaskCount();
170                                int remainder = (counter / denominator);
171                                if (remainder != oldRemeinder) {
172                                        int gauge = ((counter * 100) / (workloadLoader.getTaskCount()));
173                                        if(log.isInfoEnabled())
174                                                log.info(gauge + "% ");
175                                       
176                                        oldRemeinder = remainder;
177                                        if (gauge == 100)
178                                                hundredPercent = true;
179                                }
180                       
181                                break;
182                        }
183                       
184                        //if all the Gridlets have been collected
185                        if (counter == workloadLoader.getTaskCount()) {
186                                break;
187                        }
188                       
189                        sim_get_next(ev);
190                }
191               
192                //if all the Gridlets have been collected
193                if (counter == workloadLoader.getTaskCount()) {
194                        if (! hundredPercent) {
195                                if(log.isInfoEnabled())
196                                        log.info("100%");
197                        }
198                        if(log.isInfoEnabled())
199                                log.info(get_name() + ": Received all "+workloadLoader.getJobCount()+" jobs and " + counter + " tasks");                       
200                } else {
201                        if(log.isErrorEnabled())
202                                log.error(get_name() + ": ERROR DID NOT RECEIVED all tasks - some tasks were not finished! (received "+counter+" of "+workloadLoader.getTaskCount()+")");
203                }
204               
205                Iterator <String> itr = sendJobsIds.iterator();
206                String jobId;
207                if(log.isInfoEnabled()){
208                        log.info("Missing tasks: ");
209                        while(itr.hasNext()){
210                                jobId = itr.next();
211                                if(!returnedJobsIds.contains(jobId)){
212                                        log.info(jobId + ", ");
213                                }
214                        }
215                }
216               
217        }
218
219        /**
220         * Sends jobs to broker entities
221         */
222        protected void sendJobs() {
223                Map<Double, List<JobDescription>> jobTimes = new TreeMap<Double, List<JobDescription>>();
224               
225                List<JobDescription> jobs = workloadLoader.getJobs();
226                //TaskRequirements taskReq = new TaskRequirementsImpl();
227                //double values[] = null;
228               
229                for (JobDescription job : jobs) {
230                        long l_submissionTime = Long.MAX_VALUE;
231                        //pick the lowest submission time
232                        for (TaskDescription task : job) {
233                                if (task.getSubmissionTime() < l_submissionTime)
234                                        l_submissionTime = task.getSubmissionTime();
235                        }
236
237                        //store the submission time expressed in seconds after the simulation start time
238                        double submissionTime = l_submissionTime;
239                       
240               
241                /*      JobGridlet jg = (JobGridlet) job;
242                        jg.setUserID(get_id());
243                        jg.setSourceID(get_id());
244                        jg.setBrokerSubmissionTime(submissionTime);
245                        //TODO is it necessary?
246                        org.qcg.broker.schemas.jobdesc.Task t = jg.getTaskGridlet(0).getTaskDescription().getDescription();
247                        taskReq.wrap(t);
248                        try {
249                                values = taskReq.getCpucount();
250                                if(values != null && values.length != 0)
251                                        jg.setNumPE(Double.valueOf(values[0]).intValue()); // set the requested number of processors
252                        } catch (NoSuchParamException e) {
253                                if(log.isErrorEnabled())
254                                        log.error(e);
255                        }
256                       
257
258                        try {
259                                values = taskReq.getMemory();
260                                if(values != null && values.length != 0)
261                                        jg.setMemoryRequest(Double.valueOf(values[0]).intValue());
262                        } catch (NoSuchParamException e) {
263                                if(log.isErrorEnabled())
264                                        log.error(e);
265                        }
266                        */
267                       
268                        List<JobDescription> list = jobTimes.get(submissionTime);
269                        if (list == null) {
270                                list = new ArrayList<JobDescription>();
271                                jobTimes.put(submissionTime, list);
272                        }
273                        list.add(job);
274                }
275               
276                int destID = GridSim.getEntityId(destName);
277                for (Double submissionTime : jobTimes.keySet()) {
278                        List<JobDescription> list = jobTimes.get(submissionTime);
279                        for(int i = 0; i < list.size(); i++){
280                                this.sendJobsIds.add(list.get(i).getJobId());
281                        }
282                       
283                        //send(output, submissionTime, GridSimTags.GRIDLET_SUBMIT, new IO_data(list, GssConstants.DEFAULT_GRIDLET_SIZE, destID));
284                        send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list);
285                       
286                }
287        }
288       
289        public List<JobDescription> getAllSentJobs() {
290                return (List<JobDescription>) workloadLoader.getJobs();
291        }
292       
293        public List<TaskDescription> getAllSentTasks() {
294                List<TaskDescription> result = new ArrayList<TaskDescription>();
295                List<JobDescription> sentJobs = getAllSentJobs();
296                for (JobDescription job : sentJobs) {
297                        result.addAll(job);
298                }
299                return result;
300        }
301       
302        public List<JobInterface<?>> getAllReceivedJobs() {
303                return returnedJobs;
304        }
305
306        public int getFinishedTasksCount() {
307                int result = 0;
308                for (JobInterface<?> job : returnedJobs) {
309                        for (TaskInterface<?> task: job.getTask()) {
310                                if(task.getStatus() == Gridlet.SUCCESS)
311                                        result++;
312                        }
313                }
314                return result;
315        }
316       
317        public String getUserName() {
318                return get_name();
319        }
320       
321        public boolean isError() {
322                return error;
323        }
324       
325        /**
326         * Performs action concerning a ping request to this entity
327         * @param ev the event object
328         */
329        protected void processPingRequest(Sim_event ev) {
330        InfoPacket pkt = (InfoPacket) ev.get_data();
331        pkt.setTag(GridSimTags.INFOPKT_RETURN);
332        pkt.setDestID(pkt.getSrcID());
333
334                // sends back to the sender
335                send(output, GridSimTags.SCHEDULE_NOW, GridSimTags.INFOPKT_RETURN,
336                                new IO_data(pkt, pkt.getSize(), pkt.getSrcID()));
337    }
338       
339}
Note: See TracBrowser for help on using the repository browser.