source: DCWoRMS/branches/coolemall/src/simulator/workload/WorkloadLoader.java @ 1357

Revision 1357, 9.8 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package simulator.workload;
2
3import java.io.BufferedReader;
4import java.io.FileReader;
5import java.io.IOException;
6import java.text.DateFormat;
7import java.text.ParseException;
8import java.text.SimpleDateFormat;
9import java.util.ArrayList;
10import java.util.Date;
11import java.util.LinkedHashMap;
12import java.util.List;
13import java.util.Locale;
14import java.util.Map;
15import java.util.TreeMap;
16
17import javax.xml.parsers.ParserConfigurationException;
18import javax.xml.xpath.XPathExpressionException;
19
20import org.apache.commons.io.FilenameUtils;
21import org.apache.commons.logging.Log;
22import org.apache.commons.logging.LogFactory;
23import org.exolab.castor.xml.MarshalException;
24import org.exolab.castor.xml.ValidationException;
25
26import simulator.utils.XsltTransformations;
27import simulator.workload.exceptons.NoSuchCommentException;
28import simulator.workload.reader.archive.AbstractWAParser;
29import simulator.workload.reader.archive.QcgWAJobReader;
30import simulator.workload.reader.archive.WAFields;
31import simulator.workload.reader.archive.WAParser;
32import simulator.workload.reader.archive.WAReader;
33import simulator.workload.reader.archive.gwf.GWFFields;
34import simulator.workload.reader.archive.swf.SWFFields;
35import simulator.workload.reader.xmlJob.XMLJobReader;
36import dcworms.schedframe.scheduling.utils.ApplicationProfileDescription;
37import dcworms.schedframe.scheduling.utils.JobDescription;
38import dcworms.schedframe.scheduling.utils.TaskDescription;
39
40/**
41 *
42 * @author Marcin Krystek
43 *
44 */
45public class WorkloadLoader {
46       
47        private static Log log = LogFactory.getLog(WorkloadLoader.class);
48       
49        protected XMLJobReader<org.qcg.broker.schemas.jobdesc.Job> xmlJobReader;
50        protected WAReader<org.qcg.broker.schemas.jobdesc.Job> waReader;
51        protected WAParser localWAParser;
52        protected int generatedJobsCnt;
53        protected int generatedTasksCnt;
54       
55        protected Date simulationStartTime;
56        protected boolean simStartTimeDefined = true;
57       
58        protected XsltTransformations xsltTransformation = null;
59       
60        protected Map<String, JobDescription> jobsMap;
61       
62        protected String appProfilesFolder;
63        protected Map<String, ApplicationProfileDescription> applicationProfiles;
64
65        public WorkloadLoader(XMLJobReader<org.qcg.broker.schemas.jobdesc.Job> xmlReader, WAReader<org.qcg.broker.schemas.jobdesc.Job> swfReader, String appProfilesFolder){
66                if(swfReader == null){
67                        throw new RuntimeException("Swf reader is required to build proper tasks.");
68                }
69                this.xmlJobReader = xmlReader;
70                this.waReader = swfReader;
71                this.generatedJobsCnt = 0;
72                this.generatedTasksCnt = 0;
73                this.jobsMap = new LinkedHashMap<String, JobDescription>();
74                this.appProfilesFolder = appProfilesFolder;
75                this.applicationProfiles = new TreeMap<String, ApplicationProfileDescription>();
76                try {
77                        this.xsltTransformation = new XsltTransformations();
78                } catch (XPathExpressionException e) {
79                        e.printStackTrace();
80                } catch (ParserConfigurationException e) {
81                        e.printStackTrace();
82                }
83        }
84       
85        protected JobDescription createJobDescription(String jobDesc, long puSpeed) throws IOException{
86               
87                JobDescription jobDescription = null;
88                try {
89                        jobDescription = this.xsltTransformation.splitJobToTasks(jobDesc);
90                } catch (Exception e) {
91                        throw new IOException(e.getMessage());
92                }
93               
94                // look for any
95               
96                if(jobDescription.size() == 0){
97                        if(log.isWarnEnabled())
98                                log.warn("Omitting tasks creation for job "+jobDescription.getDescription().getId()+". This job contains no tasks.");
99                        return null;
100                }
101               
102                // needed for Executable class
103               
104                String jobId = jobDescription.getJobId();
105                int parserType = localWAParser.getType();
106               
107                for(int i = 0; i < jobDescription.size(); i++){
108                        TaskDescription taskDesc = jobDescription.get(i);
109                       
110                        String waTaskDesc[] = localWAParser.readTask(jobId, taskDesc.getTaskId());
111                       
112                        if(waTaskDesc != null) {
113                               
114                                if(parserType == 0){
115                               
116                                        taskDesc.setUserDn(waTaskDesc[SWFFields.DATA_USER_ID]);
117                                       
118                                        long timeValue = Long.parseLong(waTaskDesc[SWFFields.DATA_SUBMIT_TIME]);
119                                        taskDesc.setSubmissionTime(timeValue);
120                                       
121                                        long waitTime = Long.parseLong(waTaskDesc[SWFFields.DATA_WAIT_TIME]);
122                                        taskDesc.setWorkloadLogWaitTime(waitTime);
123                                       
124                                        timeValue = Long.parseLong(waTaskDesc[SWFFields.DATA_RUN_TIME]);
125                                        if(timeValue <= 0)
126                                                return null;
127                                       
128                                        long allocProcesors = Long.parseLong(waTaskDesc[SWFFields.DATA_NUMBER_OF_ALLOCATED_PROCESSORS]);
129                                        if(allocProcesors <= 0)
130                                                return null;
131                                       
132                                        timeValue = timeValue * puSpeed * allocProcesors;
133                                        taskDesc.setTaskLength(timeValue);
134                                       
135                                       
136                                        //ENABLES MERGING PRECEDING CONSTRAINST COMING BOTH FROM SWF AND XML FILES
137                                        /*String precTaskId = waTaskDesc[SWFFields.DATA_PRECEDING_JOB_NUMBER];
138                                        String [] xmlJobId = localWAParser.getIDMapping(precTaskId);
139                                       
140                                        Workflow workflow = taskDesc.getDescription().getWorkflow();
141                                        if(workflow == null){
142                                                workflow = new Workflow();
143                                                ParentType parent = new ParentType();
144                                                parent.setTriggerState(TaskStatesName.FINISHED);
145                                                parent.setContent(xmlJobId[0] + "_" + xmlJobId[1]);
146                                                workflow.addParent(parent);
147                                                taskDesc.getDescription().setWorkflow(workflow);
148                                        }*/
149
150                                } else if(parserType == 1){
151                               
152                                        taskDesc.setUserDn(waTaskDesc[GWFFields.DATA_USER_ID]);
153                                       
154                                        long timeValue = Long.parseLong(waTaskDesc[GWFFields.DATA_SUBMIT_TIME]);
155                                        taskDesc.setSubmissionTime(timeValue);
156                                       
157                                        long waitTime = Long.parseLong(waTaskDesc[GWFFields.DATA_WAIT_TIME]);
158                                        taskDesc.setWorkloadLogWaitTime(waitTime);
159                                       
160                                        timeValue = Long.parseLong(waTaskDesc[GWFFields.DATA_RUN_TIME]);
161                                        if(timeValue <= 0)
162                                                return null;
163                                       
164                                        long allocProcesors = Long.parseLong(waTaskDesc[GWFFields.DATA_NPROCS]);
165                                        if(allocProcesors <= 0)
166                                                return null;
167                                       
168                                        timeValue = timeValue * puSpeed * allocProcesors;
169                                        taskDesc.setTaskLength(timeValue);
170                                }
171                        }
172                       
173                        this.generatedTasksCnt++;
174                }
175               
176                this.generatedJobsCnt++;
177               
178                return jobDescription;
179        }
180       
181       
182        public boolean load() throws IOException, MarshalException, ValidationException{
183               
184                // prepare local swf parser
185                this.localWAParser = AbstractWAParser.getInstance(this.waReader.getParser().getFileName());
186                this.localWAParser.loadHeader();
187
188                long puSpeed = getPUSpeed();
189                this.simulationStartTime = getSimulationStartTimeValue();
190               
191                this.localWAParser.reset();
192
193
194                // determine which reader should be used
195                String xmlJob = null;
196                JobDescription jobDesc = null;
197                       
198                for(String key: waReader.getParser().getAppProfilesLocation().keySet()){
199                        String xmlApp = loadApplicationProfile(waReader.getParser().getAppProfilesLocation().get(key));
200                        xmlApp = this.xsltTransformation.extendJobDescription(xmlApp);
201                        ApplicationProfileDescription app = this.xsltTransformation.getApplicationProfileDescription(xmlApp);
202                        this.applicationProfiles.put(key, app);
203                }
204               
205                if(this.xmlJobReader != null){ // use xml job reader. Xml job require xslt transformation
206                        while((xmlJob = this.xmlJobReader.readRaw()) != null){
207                                xmlJob = this.xsltTransformation.extendJobDescription(xmlJob);
208                                jobDesc = createJobDescription(xmlJob, puSpeed);
209                                if(jobDesc != null)
210                                        this.jobsMap.put(jobDesc.getJobId(), jobDesc);
211                        }
212                } else {                                                // use swf job reader. Job created by this reader does not require xslt transformation
213                       
214                        while((xmlJob = this.waReader.readRaw()) != null){
215                                QcgWAJobReader qcgReader = (QcgWAJobReader)waReader;
216                                if(!applicationProfiles.isEmpty())
217                                        xmlJob = qcgReader.mergeSwfAndAppProfile(applicationProfiles, xmlJob);
218                                jobDesc = createJobDescription(xmlJob, puSpeed);
219                                if(jobDesc != null)
220                                        this.jobsMap.put(jobDesc.getJobId(), jobDesc);
221                        }
222                }
223
224                this.localWAParser.close();
225                if(this.xmlJobReader != null)
226                        this.xmlJobReader.close();
227               
228                return true;
229        }
230       
231        protected long getPUSpeed(){
232                long puSpeed = 1;
233                try {
234                        puSpeed = Long.parseLong(this.localWAParser.getCommentValue(WAFields.COMMENT_PUSPEED));
235                } catch (NoSuchCommentException e1) {
236                        if(log.isWarnEnabled())
237                                log.warn("Assuming default processing unit speed = 1");
238                }
239                return puSpeed;
240        }
241       
242       
243        protected Date getSimulationStartTimeValue(){
244                Date simulationStartTime = null;
245                try {
246                       
247                        DateFormat df = new SimpleDateFormat("EEE MMM dd HH:mm:ss ZZZZ yyyy", Locale.ENGLISH);
248                        simulationStartTime = df.parse(this.localWAParser.getCommentValue(WAFields.COMMENT_STARTTIME));
249                } catch (NoSuchCommentException e){
250                        simStartTimeDefined = false;
251                        simulationStartTime = new Date(0);
252                        if(log.isWarnEnabled())
253                                log.warn("Assuming default simulation start time - "+simulationStartTime);
254                } catch (ParseException e) {
255                        simStartTimeDefined = false;
256                        simulationStartTime = new Date(0);
257                        if(log.isWarnEnabled())
258                                log.warn("Wrong format of simulation start time comment.\nAssuming default simulation start time - "+simulationStartTime);
259                }
260                return simulationStartTime;
261        }
262       
263        public int getJobCount(){
264                return generatedJobsCnt;
265        }
266       
267        public int getTaskCount(){
268                return generatedTasksCnt;
269        }
270       
271        public Date getSimulationStartTime(){
272                return this.simulationStartTime;
273        }
274       
275        public List<JobDescription> getJobs(){
276                ArrayList<JobDescription> list =
277                        new ArrayList<JobDescription>(this.jobsMap.values());
278                return list;
279        }
280
281        public boolean isSimStartTimeDefined() {
282                return simStartTimeDefined;
283        }
284
285        public String loadApplicationProfile(String pathToAppProfile) throws IOException{
286               
287                BufferedReader reader = null;
288                StringBuffer buffer = new StringBuffer();
289
290                String localPathToAppProfile;
291               
292                if(appProfilesFolder != null) {
293                        String folderName = FilenameUtils.getFullPath(appProfilesFolder);
294                        String fileName = FilenameUtils.getName(pathToAppProfile);
295                        localPathToAppProfile = folderName + fileName;
296                } else {
297                        localPathToAppProfile = pathToAppProfile;
298                }
299                try {
300                        reader = new BufferedReader(new FileReader(localPathToAppProfile ));
301                        String line = null;
302                       
303                        while((line = reader.readLine()) != null){
304                                buffer.append(line);
305                        }
306                       
307                } finally{
308                        reader.close();
309                }
310               
311                return buffer.toString();
312        }
313}
Note: See TracBrowser for help on using the repository browser.