Macros for SAS Application Developers
https://github.com/sasjs/core
Loading...
Searching...
No Matches
mv_jobflow.sas
Go to the documentation of this file.
1/**
2 @file
3 @brief Execute a series of job flows
4 @details Very (very) simple flow manager. Jobs execute in sequential waves,
5 all previous waves must finish successfully.
6
7 The input table is formed as per below. Each observation represents one job.
8 Each variable is converted into a macro variable with the same name.
9
10 ## Input table (minimum variables needed)
11
12 @li _PROGRAM - Provides the path to the job itself
13 @li FLOW_ID - Numeric value, provides sequential ordering capability. Is
14 optional, will default to 0 if not provided.
15 @li _CONTEXTNAME - Dictates which context should be used to run the job. If
16 blank, or not provided, will default to `SAS Job Execution compute context`.
17
18 Any additional variables provided in this table are converted into macro
19 variables and passed into the relevant job.
20
21 |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
22 |---|---|---|
23 |/Public/jobs/somejob1|0|SAS Job Execution compute context|
24 |/Public/jobs/somejob2|0|SAS Job Execution compute context|
25
26 ## Output table (minimum variables produced)
27
28 @li _PROGRAM - the SAS Drive path of the job
29 @li URI - the URI of the executed job
30 @li STATE - the completed state of the job
31 @li TIMESTAMP - the datetime that the job completed
32 @li JOBPARAMS - the parameters that were passed to the job
33 @li FLOW_ID - the id of the flow in which the job was executed
34
35 ![https://i.imgur.com/nZE9PvT.png](https://i.imgur.com/nZE9PvT.png)
36
37 To avoid hammering the box with many hits in rapid succession, a one
38 second pause is made between every request.
39
40
41 ## Example
42
43 First, compile the macros:
44
45 filename mc url
46 "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
47 %inc mc;
48
49 Next, create some jobs (in this case, as web services):
50
51 filename ft15f001 temp;
52 parmcards4;
53 %put this is job: &_program;
54 %put this was run in flow &flow_id;
55 data ;
56 rand=ranuni(0)*&macrovar1;
57 do x=1 to rand;
58 y=rand*&macrovar2;
59 if y=100 then abort;
60 output;
61 end;
62 run;
63 ;;;;
64 %mv_createwebservice(path=/Public/temp,name=demo1)
65 %mv_createwebservice(path=/Public/temp,name=demo2)
66
67 Prepare an input table with 60 executions:
68
69 data work.inputjobs;
70 _contextName='SAS Job Execution compute context';
71 do flow_id=1 to 3;
72 do i=1 to 20;
73 _program='/Public/temp/demo1';
74 macrovar1=10*i;
75 macrovar2=4*i;
76 output;
77 i+1;
78 _program='/Public/temp/demo2';
79 macrovar1=40*i;
80 macrovar2=44*i;
81 output;
82 end;
83 end;
84 run;
85
86 Trigger the flow
87
88 %mv_jobflow(inds=work.inputjobs
89 ,maxconcurrency=4
90 ,outds=work.results
91 ,outref=myjoblog
92 )
93
94 data _null_;
95 infile myjoblog;
96 input; put _infile_;
97 run;
98
99
100 @param [in] access_token_var= The global macro variable to contain the
101 access token
102 @param [in] grant_type= valid values:
103 @li password
104 @li authorization_code
105 @li detect - will check if access_token exists, if not will use
106 sas_services if a SASStudioV session else authorization_code. Default
107 option.
108 @li sas_services - will use oauth_bearer=sas_services
109 @param [in] inds= The input dataset containing a list of jobs and parameters
110 @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
111 @param [in] raise_err=0 Set to 1 to raise SYSCC when a job does not complete
112 succcessfully
113 @param [in] mdebug= set to 1 to enable DEBUG messages
114 @param [out] outds= The output dataset containing the results
115 @param [out] outref= The output fileref to which to append the log file(s).
116
117 @version VIYA V.03.05
118 @author Allan Bowe, source: https://github.com/sasjs/core
119
120 <h4> SAS Macros </h4>
121 @li mf_nobs.sas
122 @li mp_abort.sas
123 @li mf_getplatform.sas
124 @li mf_getvarlist.sas
125 @li mf_existvarlist.sas
126 @li mv_jobwaitfor.sas
127 @li mv_jobexecute.sas
128
129**/
130
131%macro mv_jobflow(inds=0,outds=work.mv_jobflow
132 ,maxconcurrency=8
133 ,access_token_var=ACCESS_TOKEN
134 ,grant_type=sas_services
135 ,outref=0
136 ,raise_err=0
137 ,mdebug=0
138 );
139%local dbg;
140%if &mdebug=1 %then %do;
141 %put &sysmacroname entry vars:;
142 %put _local_;
143 %put inds vars:;
144 data _null_;
145 set &inds;
146 putlog (_all_)(=);
147 run;
148%end;
149%else %let dbg=*;
150
151%local oauth_bearer;
152%if &grant_type=detect %then %do;
153 %if %symexist(&access_token_var) %then %let grant_type=authorization_code;
154 %else %let grant_type=sas_services;
155%end;
156%if &grant_type=sas_services %then %do;
157 %let oauth_bearer=oauth_bearer=sas_services;
158 %let &access_token_var=;
159%end;
160
161%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
162 and &grant_type ne sas_services
163 )
164 ,mac=&sysmacroname
165 ,msg=%str(Invalid value for grant_type: &grant_type)
166)
167
168%mp_abort(iftrue=("&inds"="0")
169 ,mac=&sysmacroname
170 ,msg=%str(Input dataset was not provided)
171)
172%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
173 ,mac=&sysmacroname
174 ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
175)
176%mp_abort(iftrue=(&maxconcurrency<1)
177 ,mac=&sysmacroname
178 ,msg=%str(The maxconcurrency variable should be a positive integer)
179)
180
181/* set defaults if not provided */
182%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
183 data &inds;
184 %if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
185 length _CONTEXTNAME $128;
186 retain _CONTEXTNAME "SAS Job Execution compute context";
187 %end;
188 %if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
189 retain FLOW_ID 0;
190 %end;
191 /* https://github.com/sasjs/adapter/pull/845#issuecomment-2956589644 */
192 retain _omitSessionResults "false";
193 set &inds;
194 &dbg. putlog (_all_)(=);
195 run;
196%end;
197
198%local missings;
199proc sql noprint;
200select count(*) into: missings
201 from &inds
202 where flow_id is null or _program is null;
203%mp_abort(iftrue=(&missings>0)
204 ,mac=&sysmacroname
205 ,msg=%str(input dataset has &missings missing values for FLOW_ID or _PROGRAM)
206)
207
208%if %mf_nobs(&inds)=0 %then %do;
209 %put No observations in &inds! Leaving macro &sysmacroname;
210 %return;
211%end;
212
213/* ensure output table is available */
214data &outds;run;
215proc sql;
216drop table &outds;
217
218options noquotelenmax;
219%local base_uri; /* location of rest apis */
220%let base_uri=%mf_getplatform(VIYARESTAPI);
221
222
223/* get flows */
224proc sort data=&inds;
225 by flow_id;
226run;
227data _null_;
228 set &inds (keep=flow_id) end=last;
229 by flow_id;
230 if last.flow_id then do;
231 cnt+1;
232 call symputx(cats('flow',cnt),flow_id,'l');
233 end;
234 if last then call symputx('flowcnt',cnt,'l');
235run;
236
237/* prepare temporary datasets */
238%local fid jid jds jdsapp jdsrunning jdswaitfor;
239data;run;%let jds=&syslast;
240data;run;%let jdsapp=&syslast;
241data;run;%let jdsrunning=&syslast;
242data;run;%let jdswaitfor=&syslast;
243
244/* start loop */
245%do fid=1 %to &flowcnt;
246
247 %if not ( &raise_err and &syscc ) %then %do;
248
249 %put preparing job attributes for flow &&flow&fid;
250 %local jds jcnt;
251 data &jds(drop=_contextName _program);
252 set &inds(where=(flow_id=&&flow&fid));
253 if _contextName='' then _contextName="SAS Job Execution compute context";
254 call symputx(cats('job',_n_),_program,'l');
255 call symputx(cats('context',_n_),_contextName,'l');
256 call symputx('jcnt',_n_,'l');
257 &dbg. if _n_= 1 then putlog "Loop &fid";
258 &dbg. putlog (_all_)(=);
259 run;
260 /* build list of char and num vars in json format */
261 /* Viya 2026 expects all values to be strings */
262 %local nvars cvars ii _vnm;
263 %let cvars=%mf_getvarlist(&jds,typefilter=C);
264 %let nvars=%mf_getvarlist(&jds,typefilter=N);
265 %put exporting job variables in json format;
266 %do jid=1 %to &jcnt;
267 data _null_;
268 set &jds;
269 if _n_=&jid;
270 length _param $32767;
271 _param='';
272 %if %length(&cvars)>0 %then %do ii=1 %to %sysfunc(countw(&cvars,%str( )));
273 %let _vnm=%scan(&cvars,&ii,%str( ));
274 if _param ne '' then _param=cats(_param,',');
275 _param=cats(_param,'"'
276 ,"%lowcase(&_vnm)"
277 ,'":'
278 ,quote(trim(&_vnm))
279 );
280 %end;
281 %if %length(&nvars)>0 %then %do ii=1 %to %sysfunc(countw(&nvars,%str( )));
282 %let _vnm=%scan(&nvars,&ii,%str( ));
283 if _param ne '' then _param=cats(_param,',');
284 _param=cats(_param,'"'
285 ,"%lowcase(&_vnm)"
286 ,'":"'
287 ,strip(put(&_vnm,best32.))
288 ,'"'
289 );
290 %end;
291 call symputx(cats('jparams',&jid),_param,'l');
292 stop;
293 run;
294 %local jobuid&jid;
295 %let jobuid&jid=0; /* used in next loop */
296 %end;
297 %local concurrency completed;
298 %let concurrency=0;
299 %let completed=0;
300 proc sql; drop table &jdsrunning;
301 %do jid=1 %to &jcnt;
302 /**
303 * now we can execute the jobs up to the maxconcurrency setting
304 */
305 %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
306
307 /* check to see if the job finished in the previous round */
308 %if %sysfunc(exist(&outds))=1 %then %do;
309 %local jobcheck; %let jobcheck=0;
310 proc sql noprint;
311 select count(*) into: jobcheck
312 from &outds where uuid="&&jobuid&jid";
313 %if &jobcheck>0 %then %do;
314 %put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
315 %let job&jid=0;
316 %end;
317 %end;
318
319 /* check if job was triggered and, if
320 so, if we have enough slots to run? */
321 %if ("&&jobuid&jid"="0") and (&concurrency<&maxconcurrency) %then %do;
322
323 /* But only start if no issues detected so far */
324 %if not ( &raise_err and &syscc ) %then %do;
325
326 %local jobname jobpath;
327 %let jobname=%scan(&&job&jid,-1,/);
328 %let jobpath=
329 %substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
330
331 %put executing &jobpath/&jobname with paramstring &&jparams&jid;
332 %mv_jobexecute(path=&jobpath
333 ,name=&jobname
334 ,paramstring=%superq(jparams&jid)
335 ,outds=&jdsapp
336 ,contextname=&&context&jid
337 ,mdebug=&mdebug
338 )
339 data &jdsapp;
340 format jobparams $32767.;
341 set &jdsapp(where=(method='GET' and rel='state'));
342 jobparams=symget("jparams&jid");
343 /* uri here has the /state suffix */
344 uuid=scan(uri,-2,'/');
345 call symputx("jobuid&jid",uuid,'l');
346 run;
347 proc append base=&jdsrunning data=&jdsapp;
348 run;
349 %let concurrency=%eval(&concurrency+1);
350 /* sleep one second after every request to smooth the impact */
351 data _null_;
352 call sleep(1,1);
353 run;
354
355 %end;
356 %else %do; /* Job was skipped due to problems */
357
358 %put jobid &&job&jid in flow &fid skipped due to SYSCC (&syscc);
359 %let completed = %eval(&completed+1);
360 %let job&jid=0; /* Indicate job has finished */
361
362 %end;
363
364 %end;
365 %end;
366 %if &jid=&jcnt %then %do;
367 /* we are at the end of the loop - check which jobs have finished */
368 %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref
369 ,raise_err=&raise_err,mdebug=&mdebug)
370 %local done;
371 %let done=%mf_nobs(&jdswaitfor);
372 %if &done>0 %then %do;
373 %let completed=%eval(&completed+&done);
374 %let concurrency=%eval(&concurrency-&done);
375 data &jdsapp;
376 set &jdswaitfor;
377 flow_id=&&flow&fid;
378 uuid=scan(uri,-1,'/');
379 run;
380 proc append base=&outds data=&jdsapp;
381 run;
382 %end;
383 proc sql;
384 delete from &jdsrunning
385 where uuid in (select uuid from &outds
386 where state in ('canceled','completed','failed')
387 );
388
389 /* loop again if jobs are left */
390 %if &completed < &jcnt %then %do;
391 %let jid=0;
392 %put looping flow &fid again;
393 %put &completed of &jcnt jobs completed, &concurrency jobs running;
394 %end;
395 %end;
396 %end;
397
398 %end;
399 %else %do;
400
401 %put Flow &&flow&fid skipped due to SYSCC (&syscc);
402
403 %end;
404 /* back up and execute the next flow */
405%end;
406
407%if &mdebug=1 %then %do;
408 %put &sysmacroname exit vars:;
409 %put _local_;
410%end;
411
412%mend mv_jobflow;