source: DCWoRMS/trunk/src/simulator/DCWormsUsers.java @ 686

Revision 686, 9.8 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package simulator;
2
3import dcworms.schedframe.scheduling.utils.JobDescription;
4import dcworms.schedframe.scheduling.utils.TaskDescription;
5import eduni.simjava.Sim_event;
6import eduni.simjava.Sim_system;
7import gridsim.GridSim;
8import gridsim.GridSimTags;
9import gridsim.IO_data;
10import gridsim.dcworms.DCWormsTags;
11import gridsim.net.InfoPacket;
12
13import java.util.ArrayList;
14import java.util.HashSet;
15import java.util.Iterator;
16import java.util.List;
17import java.util.Map;
18import java.util.Set;
19import java.util.TreeMap;
20
21import org.apache.commons.logging.Log;
22import org.apache.commons.logging.LogFactory;
23import org.joda.time.DateTime;
24
25import qcg.shared.constants.BrokerConstants;
26import schedframe.scheduling.tasks.Job;
27import schedframe.scheduling.tasks.JobInterface;
28import schedframe.scheduling.tasks.Task;
29import schedframe.scheduling.tasks.TaskInterface;
30import simulator.utils.XsltTransformations;
31import simulator.workload.WorkloadLoader;
32
33public class DCWormsUsers extends GridSim implements GenericUser {
34
35        /**A job generator, which produces jobs and tasks. These jobs are then sent by this entity */
36        protected WorkloadLoader workloadLoader;
37       
38        /** The name of the entity, to which the created tasks will be sent */
39        protected String destName;
40       
41        /** Indicates, that all tasks that returned to this object are finished */
42        protected boolean allTasksAreFinished;
43       
44        /** Stores the list of jobs, that have been returned to this object */
45        protected List<JobInterface<?>> returnedJobs;
46        protected Set<String> sendJobsIds;
47        protected Set<String> returnedJobsIds;
48       
49        protected long submissionStartTime = Long.MAX_VALUE;
50       
51        protected XsltTransformations xsltTransformer;
52       
53        /**
54         * Indicates, that an error has occurred - it is used for debug purposes
55         */
56        protected boolean error;
57       
58        private static Log log = LogFactory.getLog(DCWormsUsers.class);
59       
60        /**
61         * Constructs the users object with the given parameters
62         * @param name the name of the users entity (must be unique across all entities in the whole simulation)
63         * @param destinationName the name of the entity, to which the created tasks will be sent
64         * @param jobGenerator the job generator, which produces jobs and tasks, that will be sent by this class
65         * @throws Exception if any occurs (see {@link GridSim#GridSim(String, double)})
66         */
67        public DCWormsUsers(String name, String destinationName, WorkloadLoader workload) throws Exception {
68                super(name, DCWormsConstants.DEFAULT_BAUD_RATE);
69                this.workloadLoader = workload;
70                destName = destinationName;
71                allTasksAreFinished = true;
72                error = false;
73               
74                sendJobsIds = new HashSet<String>();
75                returnedJobsIds = new HashSet<String>();
76               
77                this.xsltTransformer = new XsltTransformations();
78        }
79
80        @Override
81        public void body() {
82                sendJobs();
83                collectJobs();
84
85                //GridSim dependent code for shutting down the simulation
86                shutdownGridStatisticsEntity();
87                terminateIOEntities();
88                shutdownUserEntity();
89        }
90
91        /**
92         * Collects the jobs sent to broker(s)
93         */
94        protected void collectJobs() {
95                final int FACTOR = Math.min(10, workloadLoader.getTaskCount()); //the refresh rate of the gauge: at most 10 times
96                final int denominator = workloadLoader.getTaskCount() / FACTOR;
97                allTasksAreFinished = true;
98                returnedJobs = new ArrayList<JobInterface<?>>();
99                int counter = 0;
100                int oldRemeinder = 0;
101                boolean hundredPercent = false;
102               
103                Sim_event ev = new Sim_event();
104                sim_get_next(ev);
105                while (Sim_system.running()) {
106                        switch (ev.get_tag()) {
107                        case GridSimTags.END_OF_SIMULATION:
108                                //no action
109                                break;
110                       
111                        case GridSimTags.INFOPKT_SUBMIT:
112                                processPingRequest(ev);
113                                break;
114                               
115                        case GridSimTags.GRIDLET_RETURN:
116                                JobInterface<?> returnedJob = (JobInterface<?>) ev.get_data();
117                               
118                                String jobId = returnedJob.getId();
119
120                               
121                                if (returnedJobs.contains(returnedJob)) {
122                                        if(log.isErrorEnabled())
123                                                log.error("Received the same job twice (job ID " + jobId + ")");
124                                        error = true;
125                                        break;
126                                }
127                               
128                                returnedJobs.add(returnedJob);
129                                returnedJobsIds.add(jobId);
130                               
131                                if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_FINISHED) {
132                                        if(log.isDebugEnabled())
133                                                log.debug("Received finished job " + jobId);
134                                       
135                                } else {
136                                        if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_CANCELED) {
137                                                if(log.isWarnEnabled()){
138                                                        String str = "Warning! An uncomplished job (Job ID: "+jobId+") has returned to users. Job was canceled.";
139                                                        log.warn(str);
140                                                }
141                                        }
142                                       
143                                        allTasksAreFinished = false;
144                                }
145                               
146                                counter += returnedJob.getTaskCount();
147                                int remainder = (counter / denominator);
148                                if (remainder != oldRemeinder) {
149                                        int gauge = ((counter * 100) / (workloadLoader.getTaskCount()));
150                                        if(log.isInfoEnabled())
151                                                log.info(gauge + "% ");
152                                       
153                                        oldRemeinder = remainder;
154                                        if (gauge == 100)
155                                                hundredPercent = true;
156                                }
157                       
158                                break;
159                        }
160                       
161                        //if all the Gridlets have been collected
162                        if (counter == workloadLoader.getTaskCount()) {
163                                break;
164                        }
165                       
166                        sim_get_next(ev);
167                }
168               
169                //if all the Gridlets have been collected
170                if (counter == workloadLoader.getTaskCount()) {
171                        if (! hundredPercent) {
172                                if(log.isInfoEnabled())
173                                        log.info("100%");
174                        }
175                        if(log.isInfoEnabled())
176                                log.info(get_name() + ": Received all "+workloadLoader.getJobCount()+" jobs and " + counter + " tasks");                       
177                } else {
178                        if(log.isErrorEnabled())
179                                log.error(get_name() + ": ERROR DID NOT RECEIVED all tasks - some tasks were not finished! (received "+counter+" of "+workloadLoader.getTaskCount()+")");
180                }
181               
182                Iterator <String> itr = sendJobsIds.iterator();
183                String jobId;
184                if(log.isInfoEnabled()){
185                        log.info("Missing tasks: ");
186                        while(itr.hasNext()){
187                                jobId = itr.next();
188                                if(!returnedJobsIds.contains(jobId)){
189                                        log.info(jobId + ", ");
190                                }
191                        }
192                }
193               
194        }
195
196        /**
197         * Sends jobs to broker entities
198         */
199        protected void sendJobs() {
200               
201                Map<Long, List<Job>> jobTimes = new TreeMap<Long, List<Job>>();
202                int destID = GridSim.getEntityId(destName);
203                //destID=GridSim.getEntityId("BROKER@COMPUTING_GRID_0");
204                List<JobDescription> jobs = workloadLoader.getJobs();
205                //TaskRequirements taskReq = new TaskRequirementsImpl();
206                //double values[] = null;
207
208                for (JobDescription job : jobs) {
209                        long l_submissionTime = Long.MAX_VALUE;
210                        //pick the lowest submission time
211                        for (TaskDescription task : job) {
212                                if (task.getSubmissionTime() < l_submissionTime)
213                                        l_submissionTime = task.getSubmissionTime();
214                        }
215
216                        //store the submission time expressed in seconds after the simulation start time
217                        long submissionTime = l_submissionTime;
218                       
219                        Job newJob = prepareJob(job, submissionTime);
220                       
221                        List<Job> list = jobTimes.get(submissionTime);
222                        if (list == null) {
223                                list = new ArrayList<Job>();
224                                jobTimes.put(submissionTime, list);
225                        }
226                        list.add(newJob);
227                }
228                for (Long submissionTime : jobTimes.keySet()) {
229                       
230                        if(submissionTime < submissionStartTime)
231                                submissionStartTime = submissionTime;
232
233                        List<Job> list = jobTimes.get(submissionTime);
234                        for(int i = 0; i < list.size(); i++){
235                                this.sendJobsIds.add(list.get(i).getId());
236                                send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list.get(i));
237                        }
238                       
239                        //send(output, submissionTime, GridSimTags.GRIDLET_SUBMIT, new IO_data(list, GssConstants.DEFAULT_GRIDLET_SIZE, destID));
240                        //send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list);
241                       
242                }
243                System.out.println("finished sending jobs");
244        }
245       
246        public List<JobDescription> getAllSentJobs() {
247                return (List<JobDescription>) workloadLoader.getJobs();
248        }
249       
250        public List<TaskDescription> getAllSentTasks() {
251                List<TaskDescription> result = new ArrayList<TaskDescription>();
252                List<JobDescription> sentJobs = getAllSentJobs();
253                for (JobDescription job : sentJobs) {
254                        result.addAll(job);
255                }
256                return result;
257        }
258       
259        public List<JobInterface<?>> getAllReceivedJobs() {
260                return returnedJobs;
261        }
262
263        public int getFinishedTasksCount() {
264                int result = 0;
265                for (JobInterface<?> job : returnedJobs) {
266                        for (TaskInterface<?> task: job.getTask()) {
267                                if(task.getStatus() == DCWormsTags.SUCCESS)
268                                        result++;
269                        }
270                }
271                return result;
272        }
273       
274        public String getUserName() {
275                return get_name();
276        }
277       
278        public boolean isError() {
279                return error;
280        }
281       
282        /**
283         * Performs action concerning a ping request to this entity
284         * @param ev the event object
285         */
286        protected void processPingRequest(Sim_event ev) {
287        InfoPacket pkt = (InfoPacket) ev.get_data();
288        pkt.setTag(GridSimTags.INFOPKT_RETURN);
289        pkt.setDestID(pkt.getSrcID());
290
291                // sends back to the sender
292                send(output, GridSimTags.SCHEDULE_NOW, GridSimTags.INFOPKT_RETURN,
293                                new IO_data(pkt, pkt.getSize(), pkt.getSrcID()));
294    }
295               
296        protected Job prepareJob(JobDescription jobDescription, long submissionTime){
297
298                Job newJob = new Job(jobDescription.getJobId());
299                DateTime submitionTime = new DateTime();
300        submitionTime = submitionTime.plusMillis((int)submissionTime*1000);
301                        try {
302                               
303                                // transform job description to resource requirements
304
305                                newJob.setSenderId(this.get_id());
306                               
307                                for (TaskDescription taskDescription : jobDescription) {
308                                       
309                                        String xmlResReq = this.xsltTransformer.taskToResourceRequirements(
310                                                                                taskDescription.getDocument(),
311                                                                                jobDescription.getJobId(),
312                                                                                taskDescription.getUserDn(),
313                                                                                submitionTime);
314
315                                        Task newTask = new Task(xmlResReq);
316                                        newTask.setSenderId(this.get_id());
317                                        newTask.setStatus((int)BrokerConstants.TASK_STATUS_UNSUBMITTED);
318                                        newTask.setLength(taskDescription.getTaskLength());
319                                        newTask.setWorkloadLogWaitTime(taskDescription.getWorkloadLogWaitTime());
320                        //              newTask.setSubmissionTime(taskDescription.getSubmissionTime());
321                                       
322                                        newJob.add(newTask);
323                                }
324
325                                jobDescription.discardUnused();
326                               
327                        } catch (Exception e){
328                                log.error(e.getMessage());
329                                e.printStackTrace();
330                        }
331               
332                return newJob;
333        }
334
335        public long getSubmissionStartTime() {
336                return submissionStartTime;
337        }
338       
339        public boolean isSimStartTimeDefined(){
340                return workloadLoader.isSimStartTimeDefined();
341        }
342}
Note: See TracBrowser for help on using the repository browser.