source: DCWoRMS/trunk/src/simulator/workload/WorkloadLoader.java @ 490

Revision 490, 7.2 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package simulator.workload;
2
3import org.qcg.broker.schemas.jobdesc.QcgJob;
4
5import java.io.IOException;
6import java.text.DateFormat;
7import java.text.ParseException;
8import java.text.SimpleDateFormat;
9import java.util.ArrayList;
10import java.util.Date;
11import java.util.List;
12import java.util.Locale;
13import java.util.Map;
14import java.util.TreeMap;
15
16import javax.xml.parsers.ParserConfigurationException;
17import javax.xml.xpath.XPathExpressionException;
18
19
20import org.apache.commons.logging.Log;
21import org.apache.commons.logging.LogFactory;
22import org.exolab.castor.xml.MarshalException;
23import org.exolab.castor.xml.ValidationException;
24
25import dcworms.schedframe.scheduling.utils.JobDescription;
26import dcworms.schedframe.scheduling.utils.TaskDescription;
27
28
29import simulator.utils.XsltTransformations;
30import simulator.workload.exceptons.NoSuchCommentException;
31import simulator.workload.reader.archive.AbstractWAParser;
32import simulator.workload.reader.archive.WAFields;
33import simulator.workload.reader.archive.WAReader;
34import simulator.workload.reader.archive.WAParser;
35import simulator.workload.reader.archive.gwf.GWFFields;
36import simulator.workload.reader.archive.swf.SWFFields;
37import simulator.workload.reader.xmlJob.XMLJobReader;
38
39/**
40 *
41 * @author Marcin Krystek
42 *
43 */
44public class WorkloadLoader {
45       
46        private static Log log = LogFactory.getLog(WorkloadLoader.class);
47       
48        protected XMLJobReader<QcgJob> xmlJobReader;
49        protected WAReader<QcgJob> waReader;
50        protected WAParser localWAParser;
51        protected int generatedJobsCnt;
52        protected int generatedTasksCnt;
53       
54        protected Date simulationStartTime;
55       
56        protected XsltTransformations xsltTransformation = null;
57       
58        protected Map<String, JobDescription> jobGridletsMap;
59       
60        public WorkloadLoader(XMLJobReader<QcgJob> xmlReader, WAReader<QcgJob> swfReader){
61                if(swfReader == null){
62                        throw new RuntimeException("Swf reader is required to build proper gridlerts.");
63                }
64                this.xmlJobReader = xmlReader;
65                this.waReader = swfReader;
66                this.generatedJobsCnt = 0;
67                this.generatedTasksCnt = 0;
68                this.jobGridletsMap = new TreeMap<String, JobDescription>();
69                try {
70                        this.xsltTransformation = new XsltTransformations();
71                } catch (XPathExpressionException e) {
72                        e.printStackTrace();
73                } catch (ParserConfigurationException e) {
74                        e.printStackTrace();
75                }
76        }
77       
78        protected JobDescription createJobDescription(String jobDesc, long puSpeed) throws IOException{
79               
80                JobDescription jobDescription = null;
81                try {
82                        jobDescription = this.xsltTransformation.splitJobToTasks(jobDesc);
83                } catch (Exception e) {
84                        throw new IOException(e.getMessage());
85                }
86               
87                // look for any
88               
89                if(jobDescription.size() == 0){
90                        if(log.isWarnEnabled())
91                                log.warn("Omiting job gridlet creation for job "+jobDescription.getDescription().getAppId()+". This job contains no tasks.");
92                        return null;
93                }
94               
95                // needed for Gridlet class
96               
97                String jobId = jobDescription.getJobId();
98                int parserType = localWAParser.getType();
99               
100                for(int i = 0; i < jobDescription.size(); i++){
101                        TaskDescription taskDesc = jobDescription.get(i);
102                       
103                        String waTaskDesc[] = localWAParser.readTask(jobId, taskDesc.getTaskId());
104                        if(waTaskDesc != null) {
105                               
106                                if(parserType == 0){
107                               
108                                        taskDesc.setUserDn(waTaskDesc[SWFFields.DATA_USER_ID]);
109                                       
110                                        long timeValue = Long.parseLong(waTaskDesc[SWFFields.DATA_SUBMIT_TIME]);
111                                        taskDesc.setSubmissionTime(timeValue);
112                                       
113                                        long waitTime = Long.parseLong(waTaskDesc[GWFFields.DATA_WAIT_TIME]);
114                                        taskDesc.setWorkloadLogWaitTime(waitTime);
115                                       
116                                        timeValue = Long.parseLong(waTaskDesc[SWFFields.DATA_RUN_TIME]);
117                                        if(timeValue <= 0)
118                                                return null;
119                                       
120                                        long allocProcesors = Long.parseLong(waTaskDesc[SWFFields.DATA_NUMBER_OF_ALLOCATED_PROCESSORS]);
121                                        if(allocProcesors <= 0)
122                                                return null;
123                                       
124                                        timeValue = timeValue * puSpeed * allocProcesors;
125                                        taskDesc.setTaskLength(timeValue);
126
127                                } else if(parserType == 1){
128                               
129                                        taskDesc.setUserDn(waTaskDesc[GWFFields.DATA_USER_ID]);
130                                       
131                                        long timeValue = Long.parseLong(waTaskDesc[GWFFields.DATA_SUBMIT_TIME]);
132                                        taskDesc.setSubmissionTime(timeValue);
133                                       
134                                        long waitTime = Long.parseLong(waTaskDesc[GWFFields.DATA_WAIT_TIME]);
135                                        taskDesc.setWorkloadLogWaitTime(waitTime);
136                                       
137                                        timeValue = Long.parseLong(waTaskDesc[GWFFields.DATA_RUN_TIME]);
138                                        if(timeValue <= 0)
139                                                return null;
140                                       
141                                        long allocProcesors = Long.parseLong(waTaskDesc[GWFFields.DATA_NPROCS]);
142                                        if(allocProcesors <= 0)
143                                                return null;
144                                       
145                                        timeValue = timeValue * puSpeed * allocProcesors;
146                                        taskDesc.setTaskLength(timeValue);
147                                }
148                        }
149                       
150                        this.generatedTasksCnt++;
151                }
152               
153                this.generatedJobsCnt++;
154               
155                return jobDescription;
156        }
157       
158       
159        public boolean load() throws IOException, MarshalException, ValidationException{
160               
161                // prepare local swf parser
162                this.localWAParser = AbstractWAParser.getInstance(this.waReader.getParser().getFileName());
163                this.localWAParser.loadHeader();
164
165                long puSpeed = getPUSpeed();
166                this.simulationStartTime = getSimulationStartTimeValue();
167               
168                this.localWAParser.reset();
169
170
171                // determine which reader should be used
172                String jobDesc = null;
173                JobDescription job = null;
174                       
175                if(this.xmlJobReader != null){ // use xml job reader. Xml job require xslt transformation
176                        while((jobDesc = this.xmlJobReader.readRaw()) != null){
177                                jobDesc = this.xsltTransformation.extendJobDescription(jobDesc);
178                                job = createJobDescription(jobDesc, puSpeed);
179                                if(job != null)
180                                        this.jobGridletsMap.put(job.getJobId(), job);
181                        }
182                } else {                                                // use swf job reader. Job created by this reader does not require xslt transformation
183                        while((jobDesc = this.waReader.readRaw()) != null){
184                                job = createJobDescription(jobDesc, puSpeed);
185                                if(job != null)
186                                        this.jobGridletsMap.put(job.getJobId(), job);
187                        }
188                }
189
190                this.localWAParser.close();
191                if(this.xmlJobReader != null)
192                        this.xmlJobReader.close();
193               
194                return true;
195        }
196       
197        protected long getPUSpeed(){
198                long puSpeed = 1;
199                try {
200                        puSpeed = Long.parseLong(this.localWAParser.getCommentValue(WAFields.COMMENT_PUSPEED));
201                } catch (NoSuchCommentException e1) {
202                        if(log.isWarnEnabled())
203                                log.warn("Assuming default processing unit speed = 1");
204                }
205                return puSpeed;
206        }
207       
208       
209        protected Date getSimulationStartTimeValue(){
210                Date simulationStartTime = null;
211                try {
212                       
213                        DateFormat df = new SimpleDateFormat("EEE MMM dd HH:mm:ss ZZZZ yyyy", Locale.ENGLISH);
214                        simulationStartTime = df.parse(this.localWAParser.getCommentValue(WAFields.COMMENT_STARTTIME));
215                } catch (NoSuchCommentException e){
216                        simulationStartTime = new Date(0);
217                        if(log.isWarnEnabled())
218                                log.warn("Assuming default simulation start time - "+simulationStartTime);
219                } catch (ParseException e) {
220                        simulationStartTime = new Date(0);
221                        if(log.isWarnEnabled())
222                                log.warn("Wrong format of simulation start time comment.\nAssuming default simulation start time - "+simulationStartTime);
223                }
224                return simulationStartTime;
225        }
226       
227        public int getJobCount(){
228                return generatedJobsCnt;
229        }
230       
231        public int getTaskCount(){
232                return generatedTasksCnt;
233        }
234       
235        public Date getSimulationStartTime(){
236                return this.simulationStartTime;
237        }
238       
239        public List<JobDescription> getJobs(){
240                ArrayList<JobDescription> list =
241                        new ArrayList<JobDescription>(this.jobGridletsMap.values());
242                return list;
243        }
244       
245}
Note: See TracBrowser for help on using the repository browser.