Production Ready Macros for SAS Application Developers
https://github.com/sasjs/core
mv_jobflow.sas
Go to the documentation of this file.
/**
  @file
  @brief Execute a series of job flows
  @details Very (very) simple flow manager. Jobs execute in sequential waves,
  one wave per distinct FLOW_ID value (processed in ascending order). Every job
  in a wave must reach a final state (completed, failed or canceled) before the
  next wave is started. Note that this macro does not inspect the final state
  of a job - a failed job does not prevent subsequent waves from running.

  The input table is formed as per below. Each observation represents one job.
  Each variable is converted into a macro variable with the same name.

  Be aware that the input table is modified in place: it is sorted by FLOW_ID,
  and the _CONTEXTNAME / FLOW_ID columns are added when not present.

  ## Input table (minimum variables needed)

  @li _PROGRAM - Provides the path to the job itself
  @li FLOW_ID - Numeric value, provides sequential ordering capability. Is
    optional, will default to 0 if the column is not provided. If the column
    IS provided, it may not contain missing values (the macro will abort).
  @li _CONTEXTNAME - Dictates which context should be used to run the job. If
    blank (or not provided), will default to `SAS Job Execution compute context`.

  Any additional variables provided in this table are converted into macro
  variables and passed into the relevant job.

  |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
  |---|---|---|
  |/Public/jobs/somejob1|0|SAS Job Execution compute context|
  |/Public/jobs/somejob2|0|SAS Job Execution compute context|

  ## Output table (minimum variables produced)

  @li _PROGRAM - the SAS Drive path of the job
  @li URI - the URI of the executed job
  @li UUID - the unique identifier of the executed job (last part of the URI)
  @li STATE - the completed state of the job
  @li TIMESTAMP - the datetime that the job completed
  @li JOBPARAMS - the parameters that were passed to the job
  @li FLOW_ID - the id of the flow in which the job was executed

  ![https://i.imgur.com/nZE9PvT.png](https://i.imgur.com/nZE9PvT.png)

  To avoid hammering the box with many hits in rapid succession, a one
  second pause is made after every job submission.


  ## Example

  First, compile the macros:

      filename mc url
        "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
      %inc mc;

  Next, create some jobs (in this case, as web services):

      filename ft15f001 temp;
      parmcards4;
      %put this is job: &_program;
      %put this was run in flow &flow_id;
      data ;
        rand=ranuni(0)*&macrovar1;
        do x=1 to rand;
          y=rand*&macrovar2;
          if y=100 then abort;
          output;
        end;
      run;
      ;;;;
      %mv_createwebservice(path=/Public/temp,name=demo1)
      %mv_createwebservice(path=/Public/temp,name=demo2)

  Prepare an input table with 60 executions:

      data work.inputjobs;
        _contextName='SAS Job Execution compute context';
        do flow_id=1 to 3;
          do i=1 to 20;
            _program='/Public/temp/demo1';
            macrovar1=10*i;
            macrovar2=4*i;
            output;
            i+1;
            _program='/Public/temp/demo2';
            macrovar1=40*i;
            macrovar2=44*i;
            output;
          end;
        end;
      run;

  Trigger the flow

      %mv_jobflow(inds=work.inputjobs
        ,maxconcurrency=4
        ,outds=work.results
        ,outref=myjoblog
      )

      data _null_;
        infile myjoblog;
        input; put _infile_;
      run;


  @param [in] access_token_var= The global macro variable to contain the access
    token. Passed through to mv_jobexecute and mv_jobwaitfor.
  @param [in] grant_type= valid values:
    @li password
    @li authorization_code
    @li detect - if the macro variable named in access_token_var exists, then
      authorization_code is used, otherwise sas_services.
    @li sas_services - will use oauth_bearer=sas_services. Default option.
    The resolved value is passed through to mv_jobexecute and mv_jobwaitfor.
  @param [in] inds= The input dataset containing a list of jobs and parameters
    (mandatory). Note - this dataset is sorted / updated in place.
  @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
  @param [in] mdebug= set to 1 to enable DEBUG messages
  @param [out] outds= The output dataset containing the results.
    Default=work.mv_jobflow. Any existing table of this name is dropped.
  @param [out] outref= The output fileref to which to append the log file(s).

  @version VIYA V.03.05
  @author Allan Bowe, source: https://github.com/sasjs/core

  <h4> SAS Macros </h4>
  @li mf_nobs.sas
  @li mp_abort.sas
  @li mf_getplatform.sas
  @li mf_getuniquefileref.sas
  @li mf_existvarlist.sas
  @li mv_jobwaitfor.sas
  @li mv_jobexecute.sas

**/

%macro mv_jobflow(inds=0,outds=work.mv_jobflow
    ,maxconcurrency=8
    ,access_token_var=ACCESS_TOKEN
    ,grant_type=sas_services
    ,outref=0
    ,mdebug=0
  );
%local oauth_bearer;
%if &grant_type=detect %then %do;
  %if %symexist(&access_token_var) %then %let grant_type=authorization_code;
  %else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
  %let oauth_bearer=oauth_bearer=sas_services;
  %let &access_token_var=;
%end;

%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
    and &grant_type ne sas_services
  )
  ,mac=&sysmacroname
  ,msg=%str(Invalid value for grant_type: &grant_type)
)

%mp_abort(iftrue=("&inds"="0")
  ,mac=&sysmacroname
  ,msg=%str(Input dataset was not provided)
)
%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
  ,mac=&sysmacroname
  ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
)
%mp_abort(iftrue=(&maxconcurrency<1)
  ,mac=&sysmacroname
  ,msg=%str(The maxconcurrency variable should be a positive integer)
)

/* set defaults if not provided (this rewrites &inds in place) */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
  data &inds;
    %if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
      length _CONTEXTNAME $128;
      retain _CONTEXTNAME "SAS Job Execution compute context";
    %end;
    %if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
      retain FLOW_ID 0;
    %end;
    set &inds;
  run;
%end;

%local missings;
proc sql noprint;
select count(*) into: missings
  from &inds
  where flow_id is null or _program is null;
%mp_abort(iftrue=(&missings>0)
  ,mac=&sysmacroname
  ,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
)

%if %mf_nobs(&inds)=0 %then %do;
  %put No observations in &inds! Leaving macro &sysmacroname;
  %return;
%end;

/* ensure output table is available (create then drop, so it never pre-exists) */
data &outds;run;
proc sql;
drop table &outds;

options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);


/* get flows - one macro variable (flow1..flowN) per distinct FLOW_ID */
proc sort data=&inds;
  by flow_id;
run;
data _null_;
  set &inds (keep=flow_id) end=last;
  by flow_id;
  if last.flow_id then do;
    cnt+1;
    call symputx(cats('flow',cnt),flow_id,'l');
  end;
  if last then call symputx('flowcnt',cnt,'l');
run;

/* prepare temporary datasets and frefs */
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
data;run;%let jds=&syslast;
data;run;%let jjson=&syslast;
data;run;%let jdsapp=&syslast;
data;run;%let jdsrunning=&syslast;
data;run;%let jdswaitfor=&syslast;
%let jfref=%mf_getuniquefileref();

/* start loop */
%do fid=1 %to &flowcnt;
  %put preparing job attributes for flow &&flow&fid;
  /* jds is already local (declared above) - only jcnt needs declaring here */
  %local jcnt;
  data &jds(drop=_contextName _program);
    set &inds(where=(flow_id=&&flow&fid));
    if _contextName='' then _contextName="SAS Job Execution compute context";
    call symputx(cats('job',_n_),_program,'l');
    call symputx(cats('context',_n_),_contextName,'l');
    call symputx('jcnt',_n_,'l');
  run;
  %put exporting job variables in json format;
  %do jid=1 %to &jcnt;
    /* isolate the current job's row */
    data &jjson;
      set &jds;
      if _n_=&jid then do;
        output;
        stop;
      end;
    run;
    proc json out=&jfref;
      export &jjson / nosastags fmtnumeric;
    run;
    /* strip the leading [{ and trailing }] to leave the key/value pairs */
    data _null_;
      infile &jfref lrecl=32767;
      input;
      jparams='jparams'!!left(symget('jid'));
      call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
    run;
    %local jobuid&jid;
    %let jobuid&jid=0; /* used in next loop */
  %end;
  %local concurrency completed;
  %let concurrency=0;
  %let completed=0;
  proc sql; drop table &jdsrunning;
  %do jid=1 %to &jcnt;
    /**
      * now we can execute the jobs up to the maxconcurrency setting
      */
    %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */

      /* check to see if the job finished in the previous round */
      %if %sysfunc(exist(&outds))=1 %then %do;
        %local jobcheck; %let jobcheck=0;
        proc sql noprint;
        select count(*) into: jobcheck
          from &outds where uuid="&&jobuid&jid";
        %if &jobcheck>0 %then %do;
          %put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
          %let job&jid=0;
        %end;
      %end;

      /* check if job was triggered and if so, if we have enough slots to run */
      %if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do;
        %local jobname jobpath;
        %let jobname=%scan(&&job&jid,-1,/);
        %let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
        %put executing &jobpath/&jobname with paramstring &&jparams&jid;
        /* pass the per-job context and the resolved auth settings through */
        %mv_jobexecute(path=&jobpath
          ,name=&jobname
          ,contextname=&&context&jid
          ,access_token_var=&access_token_var
          ,grant_type=&grant_type
          ,paramstring=%superq(jparams&jid)
          ,outds=&jdsapp
        )
        data &jdsapp;
          format jobparams $32767.;
          set &jdsapp(where=(method='GET' and rel='state'));
          jobparams=symget("jparams&jid");
          /* uri here has the /state suffix */
          uuid=scan(uri,-2,'/');
          call symputx("jobuid&jid",uuid,'l');
        run;
        proc append base=&jdsrunning data=&jdsapp;
        run;
        %let concurrency=%eval(&concurrency+1);
        /* sleep one second after every request to smooth the impact */
        data _null_;
          call sleep(1,1);
        run;
      %end;
    %end;
    %if &jid=&jcnt %then %do;
      /* we are at the end of the loop - time to see which jobs have finished */
      %mv_jobwaitfor(ANY
        ,access_token_var=&access_token_var
        ,grant_type=&grant_type
        ,inds=&jdsrunning
        ,outds=&jdswaitfor
        ,outref=&outref
      )
      %local done;
      %let done=%mf_nobs(&jdswaitfor);
      %if &done>0 %then %do;
        %let completed=%eval(&completed+&done);
        %let concurrency=%eval(&concurrency-&done);
        data &jdsapp;
          set &jdswaitfor;
          flow_id=&&flow&fid;
          uuid=scan(uri,-1,'/');
        run;
        proc append base=&outds data=&jdsapp;
        run;
      %end;
      /* remove finished jobs from the running list (outds may not exist yet) */
      %if %sysfunc(exist(&outds))=1 %then %do;
        proc sql;
        delete from &jdsrunning
          where uuid in (select uuid from &outds
            where state in ('canceled','completed','failed')
          );
      %end;

      /* loop again if jobs are left */
      %if &completed < &jcnt %then %do;
        %let jid=0;
        %put looping flow &fid again - &completed of &jcnt jobs completed, &concurrency jobs running;
      %end;
    %end;
  %end;
  /* back up and execute the next flow */
%end;

%if &mdebug=1 %then %do;
  %put _local_;
%end;

%mend;