source: DCWoRMS/branches/coolemall/src/simulator/DCWormsUsers.java @ 1160

Revision 1160, 10.8 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package simulator;
2
3import dcworms.schedframe.scheduling.utils.JobDescription;
4import dcworms.schedframe.scheduling.utils.TaskDescription;
5import eduni.simjava.Sim_event;
6import eduni.simjava.Sim_system;
7import gridsim.GridSim;
8import gridsim.GridSimTags;
9import gridsim.IO_data;
10import gridsim.dcworms.DCWormsTags;
11import gridsim.net.InfoPacket;
12
13import java.util.ArrayList;
14import java.util.HashSet;
15import java.util.Iterator;
16import java.util.List;
17import java.util.Map;
18import java.util.Set;
19import java.util.TreeMap;
20
21import org.apache.commons.logging.Log;
22import org.apache.commons.logging.LogFactory;
23import org.joda.time.DateTime;
24import org.qcg.broker.schemas.jobdesc.ParentType;
25import org.qcg.broker.schemas.jobdesc.Workflow;
26import org.qcg.broker.schemas.jobdesc.types.TaskStatesName;
27
28import qcg.shared.constants.BrokerConstants;
29import schedframe.scheduling.tasks.Job;
30import schedframe.scheduling.tasks.JobInterface;
31import schedframe.scheduling.tasks.Task;
32import schedframe.scheduling.tasks.TaskInterface;
33import simulator.utils.XsltTransformations;
34import simulator.workload.WorkloadLoader;
35
36public class DCWormsUsers extends GridSim implements GenericUser {
37
38        /**A job generator, which produces jobs and tasks. These jobs are then sent by this entity */
39        protected WorkloadLoader workloadLoader;
40       
41        /** The name of the entity, to which the created tasks will be sent */
42        protected String destName;
43       
44        /** Indicates, that all tasks that returned to this object are finished */
45        protected boolean allTasksAreFinished;
46       
47        /** Stores the list of jobs, that have been returned to this object */
48        protected List<JobInterface<?>> returnedJobs;
49        protected Set<String> sendJobsIds;
50        protected Set<String> returnedJobsIds;
51       
52        protected long submissionStartTime = Long.MAX_VALUE;
53       
54        protected XsltTransformations xsltTransformer;
55       
56        /**
57         * Indicates, that an error has occurred - it is used for debug purposes
58         */
59        protected boolean error;
60       
61        private static Log log = LogFactory.getLog(DCWormsUsers.class);
62       
63        /**
64         * Constructs the users object with the given parameters
65         * @param name the name of the users entity (must be unique across all entities in the whole simulation)
66         * @param destinationName the name of the entity, to which the created tasks will be sent
67         * @param jobGenerator the job generator, which produces jobs and tasks, that will be sent by this class
68         * @throws Exception if any occurs (see {@link GridSim#GridSim(String, double)})
69         */
70        public DCWormsUsers(String name, String destinationName, WorkloadLoader workload) throws Exception {
71                super(name, DCWormsConstants.DEFAULT_BAUD_RATE);
72                this.workloadLoader = workload;
73                destName = destinationName;
74                allTasksAreFinished = true;
75                error = false;
76               
77                sendJobsIds = new HashSet<String>();
78                returnedJobsIds = new HashSet<String>();
79               
80                this.xsltTransformer = new XsltTransformations();
81        }
82
83        @Override
84        public void body() {
85                sendJobs();
86                collectJobs();
87
88                //GridSim dependent code for shutting down the simulation
89                shutdownGridStatisticsEntity();
90                terminateIOEntities();
91                shutdownUserEntity();
92        }
93
94        /**
95         * Collects the jobs sent to broker(s)
96         */
97        protected void collectJobs() {
98                final int FACTOR = Math.min(10, workloadLoader.getTaskCount()); //the refresh rate of the gauge: at most 10 times
99                final int denominator = workloadLoader.getTaskCount() / FACTOR;
100                allTasksAreFinished = true;
101                returnedJobs = new ArrayList<JobInterface<?>>();
102                int counter = 0;
103                int oldRemeinder = 0;
104                boolean hundredPercent = false;
105               
106                Sim_event ev = new Sim_event();
107                sim_get_next(ev);
108                while (Sim_system.running()) {
109                        switch (ev.get_tag()) {
110                        case GridSimTags.END_OF_SIMULATION:
111                                //no action
112                                break;
113                       
114                        case GridSimTags.INFOPKT_SUBMIT:
115                                processPingRequest(ev);
116                                break;
117                               
118                        case GridSimTags.GRIDLET_RETURN:
119                                JobInterface<?> returnedJob = (JobInterface<?>) ev.get_data();
120                               
121                                String jobId = returnedJob.getId();
122
123                               
124                                if (returnedJobs.contains(returnedJob)) {
125                                        if(log.isErrorEnabled())
126                                                log.error("Received the same job twice (job ID " + jobId + ")");
127                                        error = true;
128                                        break;
129                                }
130                               
131                                returnedJobs.add(returnedJob);
132                                returnedJobsIds.add(jobId);
133                               
134                                if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_FINISHED) {
135                                        if(log.isDebugEnabled())
136                                                log.debug("Received finished job " + jobId);
137                                       
138                                } else {
139                                        if(returnedJob.getStatus() == BrokerConstants.JOB_STATUS_CANCELED) {
140                                                if(log.isWarnEnabled()){
141                                                        String str = "Warning! An uncomplished job (Job ID: "+jobId+") has returned to users. Job was canceled.";
142                                                        log.warn(str);
143                                                }
144                                        }
145                                       
146                                        allTasksAreFinished = false;
147                                }
148                               
149                                counter += returnedJob.getTaskCount();
150                                int remainder = (counter / denominator);
151                                if (remainder != oldRemeinder) {
152                                        int gauge = ((counter * 100) / (workloadLoader.getTaskCount()));
153                                        if(log.isInfoEnabled())
154                                                log.info(gauge + "% ");
155                                       
156                                        oldRemeinder = remainder;
157                                        if (gauge == 100)
158                                                hundredPercent = true;
159                                }
160                       
161                                break;
162                        }
163                       
164                        //if all the Gridlets have been collected
165                        if (counter == workloadLoader.getTaskCount()) {
166                                break;
167                        }
168                       
169                        sim_get_next(ev);
170                }
171               
172                //if all the Gridlets have been collected
173                if (counter == workloadLoader.getTaskCount()) {
174                        if (! hundredPercent) {
175                                if(log.isInfoEnabled())
176                                        log.info("100%");
177                        }
178                        if(log.isInfoEnabled())
179                                log.info(get_name() + ": Received all "+workloadLoader.getJobCount()+" jobs and " + counter + " tasks");                       
180                } else {
181                        if(log.isErrorEnabled())
182                                log.error(get_name() + ": ERROR DID NOT RECEIVED all tasks - some tasks were not finished! (received "+counter+" of "+workloadLoader.getTaskCount()+")");
183                }
184               
185                Iterator <String> itr = sendJobsIds.iterator();
186                String jobId;
187                if(log.isInfoEnabled()){
188                        log.info("Missing tasks: ");
189                        while(itr.hasNext()){
190                                jobId = itr.next();
191                                if(!returnedJobsIds.contains(jobId)){
192                                        log.info(jobId + ", ");
193                                }
194                        }
195                }
196               
197        }
198
199        /**
200         * Sends jobs to broker entities
201         */
202        protected void sendJobs() {
203               
204                Map<Long, List<Job>> jobTimes = new TreeMap<Long, List<Job>>();
205                int destID = GridSim.getEntityId(destName);
206                //destID=GridSim.getEntityId("BROKER@COMPUTING_GRID_0");
207                List<JobDescription> jobs = workloadLoader.getJobs();
208                //TaskRequirements taskReq = new TaskRequirementsImpl();
209                //double values[] = null;
210
211                for (JobDescription job : jobs) {
212                        long l_submissionTime = Long.MAX_VALUE;
213                        //pick the lowest submission time
214                        for (TaskDescription task : job) {
215                                if (task.getSubmissionTime() < l_submissionTime)
216                                        l_submissionTime = task.getSubmissionTime();
217                        }
218
219                        //store the submission time expressed in seconds after the simulation start time
220                        long submissionTime = l_submissionTime;
221                       
222                        Job newJob = prepareJob(job, submissionTime);
223                       
224                        List<Job> list = jobTimes.get(submissionTime);
225                        if (list == null) {
226                                list = new ArrayList<Job>();
227                                jobTimes.put(submissionTime, list);
228                        }
229                        list.add(newJob);
230                }
231                for (Long submissionTime : jobTimes.keySet()) {
232                       
233                        if(submissionTime < submissionStartTime)
234                                submissionStartTime = submissionTime;
235
236                        List<Job> list = jobTimes.get(submissionTime);
237                        for(int i = 0; i < list.size(); i++){
238                                this.sendJobsIds.add(list.get(i).getId());
239                                send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list.get(i));
240                        }
241                       
242                        //send(output, submissionTime, GridSimTags.GRIDLET_SUBMIT, new IO_data(list, GssConstants.DEFAULT_GRIDLET_SIZE, destID));
243                        //send(destID, submissionTime, GridSimTags.GRIDLET_SUBMIT, list);
244                       
245                }
246                System.out.println("finished sending jobs");
247        }
248       
249        public List<JobDescription> getAllSentJobs() {
250                return (List<JobDescription>) workloadLoader.getJobs();
251        }
252       
253        public List<TaskDescription> getAllSentTasks() {
254                List<TaskDescription> result = new ArrayList<TaskDescription>();
255                List<JobDescription> sentJobs = getAllSentJobs();
256                for (JobDescription job : sentJobs) {
257                        result.addAll(job);
258                }
259                return result;
260        }
261       
262        public List<JobInterface<?>> getAllReceivedJobs() {
263                return returnedJobs;
264        }
265
266        public int getFinishedTasksCount() {
267                int result = 0;
268                for (JobInterface<?> job : returnedJobs) {
269                        for (TaskInterface<?> task: job.getTask()) {
270                                if(task.getStatus() == DCWormsTags.SUCCESS)
271                                        result++;
272                        }
273                }
274                return result;
275        }
276       
277        public String getUserName() {
278                return get_name();
279        }
280       
281        public boolean isError() {
282                return error;
283        }
284       
285        /**
286         * Performs action concerning a ping request to this entity
287         * @param ev the event object
288         */
289        protected void processPingRequest(Sim_event ev) {
290        InfoPacket pkt = (InfoPacket) ev.get_data();
291        pkt.setTag(GridSimTags.INFOPKT_RETURN);
292        pkt.setDestID(pkt.getSrcID());
293
294                // sends back to the sender
295                send(output, GridSimTags.SCHEDULE_NOW, GridSimTags.INFOPKT_RETURN,
296                                new IO_data(pkt, pkt.getSize(), pkt.getSrcID()));
297    }
298               
299        protected Job prepareJob(JobDescription jobDescription, long submissionTime){
300
301                Job newJob = new Job(jobDescription.getJobId());
302                DateTime submitionTime = new DateTime();
303        submitionTime = submitionTime.plusMillis((int)submissionTime*1000);
304                        try {
305                               
306                                // transform job description to resource requirements
307
308                                newJob.setSenderId(this.get_id());
309                               
310                                for (TaskDescription taskDescription : jobDescription) {
311                                       
312                                        String xmlResReq = this.xsltTransformer.taskToResourceRequirements(
313                                                                                taskDescription.getDocument(),
314                                                                                jobDescription.getJobId(),
315                                                                                taskDescription.getUserDn(),
316                                                                                submitionTime);
317
318                                        Task newTask = new Task(xmlResReq);
319                                        newTask.setSenderId(this.get_id());
320                                        newTask.setStatus((int)BrokerConstants.TASK_STATUS_UNSUBMITTED);
321                                        newTask.setLength(taskDescription.getTaskLength());
322                                        newTask.setWorkloadLogWaitTime(taskDescription.getWorkloadLogWaitTime());
323                        //              newTask.setSubmissionTime(taskDescription.getSubmissionTime());
324                                       
325                                        //ENABLES MERGING PRECEDING CONSTRAINST COMING BOTH FROM SWF AND XML FILES
326                                        /*if(taskDescription.getDescription().getWorkflow() != null){
327                                                String precTaskId = taskDescription.getDescription().getWorkflow().getParent(0).getContent();
328                                                if(     newTask.getDescription().getWorkflow() == null){
329                                                        org.qcg.broker.schemas.resreqs.Workflow workflow = new org.qcg.broker.schemas.resreqs.Workflow();
330                                                        org.qcg.broker.schemas.resreqs.ParentType parent = new org.qcg.broker.schemas.resreqs.ParentType();
331                                                        parent.setTriggerState(org.qcg.broker.schemas.resreqs.types.TaskStatesName.FINISHED);
332                                                        parent.setContent(precTaskId);
333                                                        workflow.addParent(parent);
334                                                        newTask.getDescription().setWorkflow(workflow);
335                                                }
336                                        }*/
337                                        newJob.add(newTask);
338                                }
339                                newJob.setStatus((int)BrokerConstants.JOB_STATUS_UNSUBMITTED);
340                                jobDescription.discardUnused();
341                               
342                        } catch (Exception e){
343                                log.error(e.getMessage());
344                                e.printStackTrace();
345                        }
346               
347                return newJob;
348        }
349
350        public long getSubmissionStartTime() {
351                return submissionStartTime;
352        }
353       
354        public boolean isSimStartTimeDefined(){
355                return workloadLoader.isSimStartTimeDefined();
356        }
357}
Note: See TracBrowser for help on using the repository browser.