source: DCWoRMS/trunk/src/simulator/workload/WorkloadLoader.java @ 801

Revision 801, 7.4 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package simulator.workload;
2
3import org.qcg.broker.schemas.jobdesc.QcgJob;
4
5import java.io.IOException;
6import java.text.DateFormat;
7import java.text.ParseException;
8import java.text.SimpleDateFormat;
9import java.util.ArrayList;
10import java.util.Date;
11import java.util.List;
12import java.util.Locale;
13import java.util.Map;
14import java.util.TreeMap;
15
16import javax.xml.parsers.ParserConfigurationException;
17import javax.xml.xpath.XPathExpressionException;
18
19
20import org.apache.commons.logging.Log;
21import org.apache.commons.logging.LogFactory;
22import org.exolab.castor.xml.MarshalException;
23import org.exolab.castor.xml.ValidationException;
24
25import dcworms.schedframe.scheduling.utils.JobDescription;
26import dcworms.schedframe.scheduling.utils.TaskDescription;
27
28
29import simulator.utils.XsltTransformations;
30import simulator.workload.exceptons.NoSuchCommentException;
31import simulator.workload.reader.archive.AbstractWAParser;
32import simulator.workload.reader.archive.WAFields;
33import simulator.workload.reader.archive.WAReader;
34import simulator.workload.reader.archive.WAParser;
35import simulator.workload.reader.archive.gwf.GWFFields;
36import simulator.workload.reader.archive.swf.SWFFields;
37import simulator.workload.reader.xmlJob.XMLJobReader;
38
39/**
40 *
41 * @author Marcin Krystek
42 *
43 */
44public class WorkloadLoader {
45       
46        private static Log log = LogFactory.getLog(WorkloadLoader.class);
47       
48        protected XMLJobReader<QcgJob> xmlJobReader;
49        protected WAReader<QcgJob> waReader;
50        protected WAParser localWAParser;
51        protected int generatedJobsCnt;
52        protected int generatedTasksCnt;
53       
54        protected Date simulationStartTime;
55        protected boolean simStartTimeDefined = true;
56       
57        protected XsltTransformations xsltTransformation = null;
58       
59        protected Map<String, JobDescription> jobGridletsMap;
60       
61        public WorkloadLoader(XMLJobReader<QcgJob> xmlReader, WAReader<QcgJob> swfReader){
62                if(swfReader == null){
63                        throw new RuntimeException("Swf reader is required to build proper gridlerts.");
64                }
65                this.xmlJobReader = xmlReader;
66                this.waReader = swfReader;
67                this.generatedJobsCnt = 0;
68                this.generatedTasksCnt = 0;
69                this.jobGridletsMap = new TreeMap<String, JobDescription>();
70                try {
71                        this.xsltTransformation = new XsltTransformations();
72                } catch (XPathExpressionException e) {
73                        e.printStackTrace();
74                } catch (ParserConfigurationException e) {
75                        e.printStackTrace();
76                }
77        }
78       
79        protected JobDescription createJobDescription(String jobDesc, long puSpeed) throws IOException{
80               
81                JobDescription jobDescription = null;
82                try {
83                        jobDescription = this.xsltTransformation.splitJobToTasks(jobDesc);
84                } catch (Exception e) {
85                        throw new IOException(e.getMessage());
86                }
87               
88                // look for any
89               
90                if(jobDescription.size() == 0){
91                        if(log.isWarnEnabled())
92                                log.warn("Omiting job gridlet creation for job "+jobDescription.getDescription().getAppId()+". This job contains no tasks.");
93                        return null;
94                }
95               
96                // needed for Gridlet class
97               
98                String jobId = jobDescription.getJobId();
99                int parserType = localWAParser.getType();
100               
101                for(int i = 0; i < jobDescription.size(); i++){
102                        TaskDescription taskDesc = jobDescription.get(i);
103                       
104                        String waTaskDesc[] = localWAParser.readTask(jobId, taskDesc.getTaskId());
105                        if(waTaskDesc != null) {
106                               
107                                if(parserType == 0){
108                               
109                                        taskDesc.setUserDn(waTaskDesc[SWFFields.DATA_USER_ID]);
110                                       
111                                        long timeValue = Long.parseLong(waTaskDesc[SWFFields.DATA_SUBMIT_TIME]);
112                                        taskDesc.setSubmissionTime(timeValue);
113                                       
114                                        long waitTime = Long.parseLong(waTaskDesc[SWFFields.DATA_WAIT_TIME]);
115                                        taskDesc.setWorkloadLogWaitTime(waitTime);
116                                       
117                                        timeValue = Long.parseLong(waTaskDesc[SWFFields.DATA_RUN_TIME]);
118                                        if(timeValue <= 0)
119                                                return null;
120                                       
121                                        long allocProcesors = Long.parseLong(waTaskDesc[SWFFields.DATA_NUMBER_OF_ALLOCATED_PROCESSORS]);
122                                        if(allocProcesors <= 0)
123                                                return null;
124                                       
125                                        timeValue = timeValue * puSpeed * allocProcesors;
126                                        taskDesc.setTaskLength(timeValue);
127
128                                } else if(parserType == 1){
129                               
130                                        taskDesc.setUserDn(waTaskDesc[GWFFields.DATA_USER_ID]);
131                                       
132                                        long timeValue = Long.parseLong(waTaskDesc[GWFFields.DATA_SUBMIT_TIME]);
133                                        taskDesc.setSubmissionTime(timeValue);
134                                       
135                                        long waitTime = Long.parseLong(waTaskDesc[GWFFields.DATA_WAIT_TIME]);
136                                        taskDesc.setWorkloadLogWaitTime(waitTime);
137                                       
138                                        timeValue = Long.parseLong(waTaskDesc[GWFFields.DATA_RUN_TIME]);
139                                        if(timeValue <= 0)
140                                                return null;
141                                       
142                                        long allocProcesors = Long.parseLong(waTaskDesc[GWFFields.DATA_NPROCS]);
143                                        if(allocProcesors <= 0)
144                                                return null;
145                                       
146                                        timeValue = timeValue * puSpeed * allocProcesors;
147                                        taskDesc.setTaskLength(timeValue);
148                                }
149                        }
150                       
151                        this.generatedTasksCnt++;
152                }
153               
154                this.generatedJobsCnt++;
155               
156                return jobDescription;
157        }
158       
159       
160        public boolean load() throws IOException, MarshalException, ValidationException{
161               
162                // prepare local swf parser
163                this.localWAParser = AbstractWAParser.getInstance(this.waReader.getParser().getFileName());
164                this.localWAParser.loadHeader();
165
166                long puSpeed = getPUSpeed();
167                this.simulationStartTime = getSimulationStartTimeValue();
168               
169                this.localWAParser.reset();
170
171
172                // determine which reader should be used
173                String jobDesc = null;
174                JobDescription job = null;
175                       
176                if(this.xmlJobReader != null){ // use xml job reader. Xml job require xslt transformation
177                        while((jobDesc = this.xmlJobReader.readRaw()) != null){
178                                jobDesc = this.xsltTransformation.extendJobDescription(jobDesc);
179                                job = createJobDescription(jobDesc, puSpeed);
180                                if(job != null)
181                                        this.jobGridletsMap.put(job.getJobId(), job);
182                        }
183                } else {                                                // use swf job reader. Job created by this reader does not require xslt transformation
184                        while((jobDesc = this.waReader.readRaw()) != null){
185                                job = createJobDescription(jobDesc, puSpeed);
186                                if(job != null)
187                                        this.jobGridletsMap.put(job.getJobId(), job);
188                        }
189                }
190
191                this.localWAParser.close();
192                if(this.xmlJobReader != null)
193                        this.xmlJobReader.close();
194               
195                return true;
196        }
197       
198        protected long getPUSpeed(){
199                long puSpeed = 1;
200                try {
201                        puSpeed = Long.parseLong(this.localWAParser.getCommentValue(WAFields.COMMENT_PUSPEED));
202                } catch (NoSuchCommentException e1) {
203                        if(log.isWarnEnabled())
204                                log.warn("Assuming default processing unit speed = 1");
205                }
206                return puSpeed;
207        }
208       
209       
210        protected Date getSimulationStartTimeValue(){
211                Date simulationStartTime = null;
212                try {
213                       
214                        DateFormat df = new SimpleDateFormat("EEE MMM dd HH:mm:ss ZZZZ yyyy", Locale.ENGLISH);
215                        simulationStartTime = df.parse(this.localWAParser.getCommentValue(WAFields.COMMENT_STARTTIME));
216                } catch (NoSuchCommentException e){
217                        simStartTimeDefined = false;
218                        simulationStartTime = new Date(0);
219                        if(log.isWarnEnabled())
220                                log.warn("Assuming default simulation start time - "+simulationStartTime);
221                } catch (ParseException e) {
222                        simStartTimeDefined = false;
223                        simulationStartTime = new Date(0);
224                        if(log.isWarnEnabled())
225                                log.warn("Wrong format of simulation start time comment.\nAssuming default simulation start time - "+simulationStartTime);
226                }
227                return simulationStartTime;
228        }
229       
230        public int getJobCount(){
231                return generatedJobsCnt;
232        }
233       
234        public int getTaskCount(){
235                return generatedTasksCnt;
236        }
237       
238        public Date getSimulationStartTime(){
239                return this.simulationStartTime;
240        }
241       
242        public List<JobDescription> getJobs(){
243                ArrayList<JobDescription> list =
244                        new ArrayList<JobDescription>(this.jobGridletsMap.values());
245                return list;
246        }
247
248        public boolean isSimStartTimeDefined() {
249                return simStartTimeDefined;
250        }
251       
252}
Note: See TracBrowser for help on using the repository browser.