source: DCWoRMS/trunk/src/simulator/DCWormsUsers.java @ 481

Revision 481, 9.6 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package simulator;
2
3import eduni.simjava.Sim_event;
4import eduni.simjava.Sim_system;
5import gridsim.GridSim;
6import gridsim.GridSimTags;
7import gridsim.Gridlet;
8import gridsim.IO_data;
9import gridsim.gssim.DCWormsTags;
10import gridsim.net.InfoPacket;
11import gssim.schedframe.scheduling.utils.JobDescription;
12import gssim.schedframe.scheduling.utils.TaskDescription;
13
14import java.util.ArrayList;
15import java.util.HashSet;
16import java.util.Iterator;
17import java.util.List;
18import java.util.Map;
19import java.util.Set;
20import java.util.TreeMap;
21
22import org.apache.commons.logging.Log;
23import org.apache.commons.logging.LogFactory;
24import org.joda.time.DateTime;
25
26import qcg.shared.constants.BrokerConstants;
27import schedframe.scheduling.tasks.Job;
28import schedframe.scheduling.tasks.JobInterface;
29import schedframe.scheduling.tasks.Task;
30import schedframe.scheduling.tasks.TaskInterface;
31import simulator.utils.XsltTransformations;
32import simulator.workload.WorkloadLoader;
33
34public class DCWormsUsers extends GridSim implements GenericUser {
35
36        /**A job generator, which produces jobs and tasks. These jobs are then sent by this entity */
37        protected WorkloadLoader workloadLoader;
38       
39        /** The name of the entity, to which the created tasks will be sent */
40        protected String destName;
41       
42        /** Indicates, that all tasks that returned to this object are finished */
43        protected boolean allTasksAreFinished;
44       
45        /** Stores the list of jobs, that have been returned to this object */
46        protected List<JobInterface<?>> returnedJobs;
47        protected Set<String> sendJobsIds;
48        protected Set<String> returnedJobsIds;
49       
50        protected XsltTransformations xsltTransformer;
51       
52        /**
53         * Indicates, that an error has occurred - it is used for debug purposes
54         */
55        protected boolean error;
56       
57        private static Log log = LogFactory.getLog(DCWormsUsers.class);
58       
59        /**
60         * Constructs the users object with the given parameters
61         * @param name the name of the users entity (must be unique across all entities in the whole simulation)
62         * @param destinationName the name of the entity, to which the created tasks will be sent
63         * @param jobGenerator the job generator, which produces jobs and tasks, that will be sent by this class
64         * @throws Exception if any occurs (see {@link GridSim#GridSim(String, double)})
65         */
66        public DCWormsUsers(String name, String destinationName, WorkloadLoader workload) throws Exception {
67                super(name, DCWormsConstants.DEFAULT_BAUD_RATE);
68                this.workloadLoader = workload;
69                destName = destinationName;
70                allTasksAreFinished = true;
71                error = false;
72               
73                sendJobsIds = new HashSet<String>();
74                returnedJobsIds = new HashSet<String>();
75               
76                this.xsltTransformer = new XsltTransformations();
77        }
78
79        @Override
80        public void body() {
81                sendJobs();
82                collectJobs();
83
84                //GridSim dependent code for shutting down the simulation
85                shutdownGridStatisticsEntity();
86                terminateIOEntities();
87                shutdownUserEntity();
88        }
89
90        /**
91         * Collects the jobs sent to broker(s)
92         */
93        protected void collectJobs() {
94                final int FACTOR = Math.min(10, workloadLoader.getTaskCount()); //the refresh rate of the gauge: at most 10 times
95                final int denominator = workloadLoader.getTaskCount() / FACTOR;
96                allTasksAreFinished = true;
97                returnedJobs = new ArrayList<JobInterface<?>>();
98                int counter = 0;
99                int oldRemeinder = 0;
100                boolean hundredPercent = false;
101               
102                Sim_event ev = new Sim_event();
103                sim_get_next(ev);
104                while (Sim_system.running()) {
105                        switch (ev.get_tag()) {
106                        case GridSimTags.END_OF_SIMULATION:
107                                //no action
108                                break;
109                       
110                        case GridSimTags.INFOPKT_SUBMIT:
111                                processPingRequest(ev);
112                                break;
113                               
114                        case GridSimTags.GRIDLET_RETURN:
115                                JobInterface<?> returnedJob = (JobInterface<?>) ev.get_data();
116                               
117                                String jobId = returnedJob.getId();
118
119                               
120                                if (returnedJobs.contains(returnedJob)) {
121                                        if(log.isErrorEnabled())
122                                                log.error("Received the same job twice (job ID " + jobId + ")");
123                                        error = true;
124                                        break;
125                                }
126                               
127                                returnedJobs.add(returnedJob);
128                                returnedJobsIds.add(jobId);
129                               
130                                if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_FINISHED) {
131                                        if(log.isDebugEnabled())
132                                                log.debug("Received finished job " + jobId);
133                                       
134                                } else {
135                                        if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_CANCELED) {
136                                                if(log.isWarnEnabled()){
137                                                        String str = "Warning! An uncomplished job (Job ID: "+jobId+") has returned to users. Job was canceled.";
138                                                        log.warn(str);
139                                                }
140                                        }
141                                       
142                                        allTasksAreFinished = false;
143                                }
144                               
145                                counter += returnedJob.getTaskCount();
146                                int remainder = (counter / denominator);
147                                if (remainder != oldRemeinder) {
148                                        int gauge = ((counter * 100) / (workloadLoader.getTaskCount()));
149                                        if(log.isInfoEnabled())
150                                                log.info(gauge + "% ");
151                                       
152                                        oldRemeinder = remainder;
153                                        if (gauge == 100)
154                                                hundredPercent = true;
155                                }
156                       
157                                break;
158                        }
159                       
160                        //if all the Gridlets have been collected
161                        if (counter == workloadLoader.getTaskCount()) {
162                                break;
163                        }
164                       
165                        sim_get_next(ev);
166                }
167               
168                //if all the Gridlets have been collected
169                if (counter == workloadLoader.getTaskCount()) {
170                        if (! hundredPercent) {
171                                if(log.isInfoEnabled())
172                                        log.info("100%");
173                        }
174                        if(log.isInfoEnabled())
175                                log.info(get_name() + ": Received all "+workloadLoader.getJobCount()+" jobs and " + counter + " tasks");                       
176                } else {
177                        if(log.isErrorEnabled())
178                                log.error(get_name() + ": ERROR DID NOT RECEIVED all tasks - some tasks were not finished! (received "+counter+" of "+workloadLoader.getTaskCount()+")");
179                }
180               
181                Iterator <String> itr = sendJobsIds.iterator();
182                String jobId;
183                if(log.isInfoEnabled()){
184                        log.info("Missing tasks: ");
185                        while(itr.hasNext()){
186                                jobId = itr.next();
187                                if(!returnedJobsIds.contains(jobId)){
188                                        log.info(jobId + ", ");
189                                }
190                        }
191                }
192               
193        }
194
195        /**
196         * Sends jobs to broker entities
197         */
198        protected void sendJobs() {
199               
200                Map<Long, List<Job>> jobTimes = new TreeMap<Long, List<Job>>();
201                int destID = GridSim.getEntityId(destName);
202                //destID=GridSim.getEntityId("BROKER@COMPUTING_GRID_0");
203                List<JobDescription> jobs = workloadLoader.getJobs();
204                //TaskRequirements taskReq = new TaskRequirementsImpl();
205                //double values[] = null;
206
207                for (JobDescription job : jobs) {
208                        long l_submissionTime = Long.MAX_VALUE;
209                        //pick the lowest submission time
210                        for (TaskDescription task : job) {
211                                if (task.getSubmissionTime() < l_submissionTime)
212                                        l_submissionTime = task.getSubmissionTime();
213                        }
214
215                        //store the submission time expressed in seconds after the simulation start time
216                        long submissionTime = l_submissionTime;
217                       
218                        Job newJob = prepareJob(job, submissionTime);
219                       
220                        List<Job> list = jobTimes.get(submissionTime);
221                        if (list == null) {
222                                list = new ArrayList<Job>();
223                                jobTimes.put(submissionTime, list);
224                        }
225                        list.add(newJob);
226                }
227                for (Long submissionTime : jobTimes.keySet()) {
228                        List<Job> list = jobTimes.get(submissionTime);
229                        for(int i = 0; i < list.size(); i++){
230                                this.sendJobsIds.add(list.get(i).getId());
231                                send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list.get(i));
232                        }
233                       
234                        //send(output, submissionTime, GridSimTags.GRIDLET_SUBMIT, new IO_data(list, GssConstants.DEFAULT_GRIDLET_SIZE, destID));
235                        //send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list);
236                       
237                }
238                System.out.println("finished sending jobs");
239        }
240       
241        public List<JobDescription> getAllSentJobs() {
242                return (List<JobDescription>) workloadLoader.getJobs();
243        }
244       
245        public List<TaskDescription> getAllSentTasks() {
246                List<TaskDescription> result = new ArrayList<TaskDescription>();
247                List<JobDescription> sentJobs = getAllSentJobs();
248                for (JobDescription job : sentJobs) {
249                        result.addAll(job);
250                }
251                return result;
252        }
253       
254        public List<JobInterface<?>> getAllReceivedJobs() {
255                return returnedJobs;
256        }
257
258        public int getFinishedTasksCount() {
259                int result = 0;
260                for (JobInterface<?> job : returnedJobs) {
261                        for (TaskInterface<?> task: job.getTask()) {
262                                if(task.getStatus() == DCWormsTags.SUCCESS)
263                                        result++;
264                        }
265                }
266                return result;
267        }
268       
269        public String getUserName() {
270                return get_name();
271        }
272       
273        public boolean isError() {
274                return error;
275        }
276       
277        /**
278         * Performs action concerning a ping request to this entity
279         * @param ev the event object
280         */
281        protected void processPingRequest(Sim_event ev) {
282        InfoPacket pkt = (InfoPacket) ev.get_data();
283        pkt.setTag(GridSimTags.INFOPKT_RETURN);
284        pkt.setDestID(pkt.getSrcID());
285
286                // sends back to the sender
287                send(output, GridSimTags.SCHEDULE_NOW, GridSimTags.INFOPKT_RETURN,
288                                new IO_data(pkt, pkt.getSize(), pkt.getSrcID()));
289    }
290               
291        protected Job prepareJob(JobDescription jobDescription, long submissionTime){
292
293                Job newJob = new Job(jobDescription.getJobId());
294                DateTime submitionTime = new DateTime();
295        submitionTime = submitionTime.plusMillis((int)submissionTime*1000);
296                        try {
297                               
298                                // transform job description to resource requirements
299
300                                newJob.setSenderId(this.get_id());
301                               
302                                for (TaskDescription taskDescription : jobDescription) {
303                                       
304                                        String xmlResReq = this.xsltTransformer.taskToResourceRequirements(
305                                                                                taskDescription.getDocument(),
306                                                                                jobDescription.getJobId(),
307                                                                                taskDescription.getUserDn(),
308                                                                                submitionTime);
309
310                                        Task newTask = new Task(xmlResReq);
311                                        newTask.setSenderId(this.get_id());
312                                        newTask.setStatus((int)BrokerConstants.TASK_STATUS_UNSUBMITTED);
313                                        newTask.setLength(taskDescription.getTaskLength());
314                                        newTask.setWorkloadLogWaitTime(taskDescription.getWorkloadLogWaitTime());
315                        //              newTask.setSubmissionTime(taskDescription.getSubmissionTime());
316                                       
317                                        newJob.add(newTask);
318                                }
319
320                                jobDescription.discardUnused();
321                               
322                        } catch (Exception e){
323                                log.error(e.getMessage());
324                                e.printStackTrace();
325                        }
326               
327                return newJob;
328        }
329}
Note: See TracBrowser for help on using the repository browser.