source: DCWoRMS/trunk/src/simulator/WormsUsers.java @ 477

Revision 477, 9.5 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package simulator;
2
3import eduni.simjava.Sim_event;
4import eduni.simjava.Sim_system;
5import gridsim.GridSim;
6import gridsim.GridSimTags;
7import gridsim.Gridlet;
8import gridsim.IO_data;
9import gridsim.net.InfoPacket;
10import gssim.schedframe.scheduling.utils.JobDescription;
11import gssim.schedframe.scheduling.utils.TaskDescription;
12
13import java.util.ArrayList;
14import java.util.HashSet;
15import java.util.Iterator;
16import java.util.List;
17import java.util.Map;
18import java.util.Set;
19import java.util.TreeMap;
20
21import org.apache.commons.logging.Log;
22import org.apache.commons.logging.LogFactory;
23import org.joda.time.DateTime;
24
25import qcg.shared.constants.BrokerConstants;
26import schedframe.scheduling.tasks.Job;
27import schedframe.scheduling.tasks.JobInterface;
28import schedframe.scheduling.tasks.Task;
29import schedframe.scheduling.tasks.TaskInterface;
30import simulator.utils.XsltTransformations;
31import simulator.workload.WorkloadLoader;
32
33public class WormsUsers extends GridSim implements GenericUser {
34
35        /**A job generator, which produces jobs and tasks. These jobs are then sent by this entity */
36        protected WorkloadLoader workloadLoader;
37       
38        /** The name of the entity, to which the created tasks will be sent */
39        protected String destName;
40       
41        /** Indicates, that all tasks that returned to this object are finished */
42        protected boolean allTasksAreFinished;
43       
44        /** Stores the list of jobs, that have been returned to this object */
45        protected List<JobInterface<?>> returnedJobs;
46        protected Set<String> sendJobsIds;
47        protected Set<String> returnedJobsIds;
48       
49        protected XsltTransformations xsltTransformer;
50       
51        /**
52         * Indicates, that an error has occurred - it is used for debug purposes
53         */
54        protected boolean error;
55       
56        private static Log log = LogFactory.getLog(WormsUsers.class);
57       
58        /**
59         * Constructs the users object with the given parameters
60         * @param name the name of the users entity (must be unique across all entities in the whole simulation)
61         * @param destinationName the name of the entity, to which the created tasks will be sent
62         * @param jobGenerator the job generator, which produces jobs and tasks, that will be sent by this class
63         * @throws Exception if any occurs (see {@link GridSim#GridSim(String, double)})
64         */
65        public WormsUsers(String name, String destinationName, WorkloadLoader workload) throws Exception {
66                super(name, WormsConstants.DEFAULT_BAUD_RATE);
67                this.workloadLoader = workload;
68                destName = destinationName;
69                allTasksAreFinished = true;
70                error = false;
71               
72                sendJobsIds = new HashSet<String>();
73                returnedJobsIds = new HashSet<String>();
74               
75                this.xsltTransformer = new XsltTransformations();
76        }
77
78        @Override
79        public void body() {
80                sendJobs();
81                collectJobs();
82
83                //GridSim dependent code for shutting down the simulation
84                shutdownGridStatisticsEntity();
85                terminateIOEntities();
86                shutdownUserEntity();
87        }
88
89        /**
90         * Collects the jobs sent to broker(s)
91         */
92        protected void collectJobs() {
93                final int FACTOR = Math.min(10, workloadLoader.getTaskCount()); //the refresh rate of the gauge: at most 10 times
94                final int denominator = workloadLoader.getTaskCount() / FACTOR;
95                allTasksAreFinished = true;
96                returnedJobs = new ArrayList<JobInterface<?>>();
97                int counter = 0;
98                int oldRemeinder = 0;
99                boolean hundredPercent = false;
100               
101                Sim_event ev = new Sim_event();
102                sim_get_next(ev);
103                while (Sim_system.running()) {
104                        switch (ev.get_tag()) {
105                        case GridSimTags.END_OF_SIMULATION:
106                                //no action
107                                break;
108                       
109                        case GridSimTags.INFOPKT_SUBMIT:
110                                processPingRequest(ev);
111                                break;
112                               
113                        case GridSimTags.GRIDLET_RETURN:
114                                JobInterface<?> returnedJob = (JobInterface<?>) ev.get_data();
115                               
116                                String jobId = returnedJob.getId();
117
118                               
119                                if (returnedJobs.contains(returnedJob)) {
120                                        if(log.isErrorEnabled())
121                                                log.error("Received the same job twice (job ID " + jobId + ")");
122                                        error = true;
123                                        break;
124                                }
125                               
126                                returnedJobs.add(returnedJob);
127                                returnedJobsIds.add(jobId);
128                               
129                                if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_FINISHED) {
130                                        if(log.isDebugEnabled())
131                                                log.debug("Received finished job " + jobId);
132                                       
133                                } else {
134                                        if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_CANCELED) {
135                                                if(log.isWarnEnabled()){
136                                                        String str = "Warning! An uncomplished job (Job ID: "+jobId+") has returned to users. Job was canceled.";
137                                                        log.warn(str);
138                                                }
139                                        }
140                                       
141                                        allTasksAreFinished = false;
142                                }
143                               
144                                counter += returnedJob.getTaskCount();
145                                int remainder = (counter / denominator);
146                                if (remainder != oldRemeinder) {
147                                        int gauge = ((counter * 100) / (workloadLoader.getTaskCount()));
148                                        if(log.isInfoEnabled())
149                                                log.info(gauge + "% ");
150                                       
151                                        oldRemeinder = remainder;
152                                        if (gauge == 100)
153                                                hundredPercent = true;
154                                }
155                       
156                                break;
157                        }
158                       
159                        //if all the Gridlets have been collected
160                        if (counter == workloadLoader.getTaskCount()) {
161                                break;
162                        }
163                       
164                        sim_get_next(ev);
165                }
166               
167                //if all the Gridlets have been collected
168                if (counter == workloadLoader.getTaskCount()) {
169                        if (! hundredPercent) {
170                                if(log.isInfoEnabled())
171                                        log.info("100%");
172                        }
173                        if(log.isInfoEnabled())
174                                log.info(get_name() + ": Received all "+workloadLoader.getJobCount()+" jobs and " + counter + " tasks");                       
175                } else {
176                        if(log.isErrorEnabled())
177                                log.error(get_name() + ": ERROR DID NOT RECEIVED all tasks - some tasks were not finished! (received "+counter+" of "+workloadLoader.getTaskCount()+")");
178                }
179               
180                Iterator <String> itr = sendJobsIds.iterator();
181                String jobId;
182                if(log.isInfoEnabled()){
183                        log.info("Missing tasks: ");
184                        while(itr.hasNext()){
185                                jobId = itr.next();
186                                if(!returnedJobsIds.contains(jobId)){
187                                        log.info(jobId + ", ");
188                                }
189                        }
190                }
191               
192        }
193
194        /**
195         * Sends jobs to broker entities
196         */
197        protected void sendJobs() {
198               
199                Map<Long, List<Job>> jobTimes = new TreeMap<Long, List<Job>>();
200                int destID = GridSim.getEntityId(destName);
201                //destID=GridSim.getEntityId("BROKER@COMPUTING_GRID_0");
202                List<JobDescription> jobs = workloadLoader.getJobs();
203                //TaskRequirements taskReq = new TaskRequirementsImpl();
204                //double values[] = null;
205
206                for (JobDescription job : jobs) {
207                        long l_submissionTime = Long.MAX_VALUE;
208                        //pick the lowest submission time
209                        for (TaskDescription task : job) {
210                                if (task.getSubmissionTime() < l_submissionTime)
211                                        l_submissionTime = task.getSubmissionTime();
212                        }
213
214                        //store the submission time expressed in seconds after the simulation start time
215                        long submissionTime = l_submissionTime;
216                       
217                        Job newJob = prepareJob(job, submissionTime);
218                       
219                        List<Job> list = jobTimes.get(submissionTime);
220                        if (list == null) {
221                                list = new ArrayList<Job>();
222                                jobTimes.put(submissionTime, list);
223                        }
224                        list.add(newJob);
225                }
226                for (Long submissionTime : jobTimes.keySet()) {
227                        List<Job> list = jobTimes.get(submissionTime);
228                        for(int i = 0; i < list.size(); i++){
229                                this.sendJobsIds.add(list.get(i).getId());
230                                send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list.get(i));
231                        }
232                       
233                        //send(output, submissionTime, GridSimTags.GRIDLET_SUBMIT, new IO_data(list, GssConstants.DEFAULT_GRIDLET_SIZE, destID));
234                        //send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list);
235                       
236                }
237                System.out.println("finished sending jobs");
238        }
239       
240        public List<JobDescription> getAllSentJobs() {
241                return (List<JobDescription>) workloadLoader.getJobs();
242        }
243       
244        public List<TaskDescription> getAllSentTasks() {
245                List<TaskDescription> result = new ArrayList<TaskDescription>();
246                List<JobDescription> sentJobs = getAllSentJobs();
247                for (JobDescription job : sentJobs) {
248                        result.addAll(job);
249                }
250                return result;
251        }
252       
253        public List<JobInterface<?>> getAllReceivedJobs() {
254                return returnedJobs;
255        }
256
257        public int getFinishedTasksCount() {
258                int result = 0;
259                for (JobInterface<?> job : returnedJobs) {
260                        for (TaskInterface<?> task: job.getTask()) {
261                                if(task.getStatus() == Gridlet.SUCCESS)
262                                        result++;
263                        }
264                }
265                return result;
266        }
267       
268        public String getUserName() {
269                return get_name();
270        }
271       
272        public boolean isError() {
273                return error;
274        }
275       
276        /**
277         * Performs action concerning a ping request to this entity
278         * @param ev the event object
279         */
280        protected void processPingRequest(Sim_event ev) {
281        InfoPacket pkt = (InfoPacket) ev.get_data();
282        pkt.setTag(GridSimTags.INFOPKT_RETURN);
283        pkt.setDestID(pkt.getSrcID());
284
285                // sends back to the sender
286                send(output, GridSimTags.SCHEDULE_NOW, GridSimTags.INFOPKT_RETURN,
287                                new IO_data(pkt, pkt.getSize(), pkt.getSrcID()));
288    }
289               
290        protected Job prepareJob(JobDescription jobDescription, long submissionTime){
291
292                Job newJob = new Job(jobDescription.getJobId());
293                DateTime submitionTime = new DateTime();
294        submitionTime = submitionTime.plusMillis((int)submissionTime*1000);
295                        try {
296                               
297                                // transform job description to resource requirements
298
299                                newJob.setSenderId(this.get_id());
300                               
301                                for (TaskDescription taskDescription : jobDescription) {
302                                       
303                                        String xmlResReq = this.xsltTransformer.taskToResourceRequirements(
304                                                                                taskDescription.getDocument(),
305                                                                                jobDescription.getJobId(),
306                                                                                taskDescription.getUserDn(),
307                                                                                submitionTime);
308
309                                        Task newTask = new Task(xmlResReq);
310                                        newTask.setSenderId(this.get_id());
311                                        newTask.setStatus((int)BrokerConstants.TASK_STATUS_UNSUBMITTED);
312                                        newTask.setLength(taskDescription.getTaskLength());
313                                        newTask.setWorkloadLogWaitTime(taskDescription.getWorkloadLogWaitTime());
314                        //              newTask.setSubmissionTime(taskDescription.getSubmissionTime());
315                                       
316                                        newJob.add(newTask);
317                                }
318
319                                jobDescription.discardUnused();
320                               
321                        } catch (Exception e){
322                                log.error(e.getMessage());
323                                e.printStackTrace();
324                        }
325               
326                return newJob;
327        }
328}
Note: See TracBrowser for help on using the repository browser.