131%macro mv_jobflow(inds=0,outds=work.mv_jobflow
133 ,access_token_var=ACCESS_TOKEN
134 ,grant_type=sas_services
140%
if &mdebug=1 %then %
do;
141 %put &sysmacroname entry vars:;
152%
if &grant_type=detect %then %
do;
153 %
if %symexist(&access_token_var) %then %let grant_type=authorization_code;
154 %
else %let grant_type=sas_services;
156%
if &grant_type=sas_services %then %
do;
157 %let oauth_bearer=oauth_bearer=sas_services;
158 %let &access_token_var=;
161%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
162 and &grant_type ne sas_services
165 ,msg=%str(Invalid value
for grant_type: &grant_type)
168%mp_abort(iftrue=(
"&inds"=
"0")
170 ,msg=%str(Input dataset was not provided)
172%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
174 ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
176%mp_abort(iftrue=(&maxconcurrency<1)
178 ,msg=%str(The maxconcurrency variable should be a positive integer)
182%
if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %
do;
184 %
if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %
do;
185 length _CONTEXTNAME $128;
186 retain _CONTEXTNAME
"SAS Job Execution compute context";
188 %
if %mf_existvarList(&inds,FLOW_ID)=0 %then %
do;
192 &dbg. putlog (_all_)(=);
198select count(*) into: missings
200 where flow_id is null or _program is null;
201%mp_abort(iftrue=(&missings>0)
203 ,msg=%str(input dataset has &missings missing values for FLOW_ID or _PROGRAM)
206%if %mf_nobs(&inds)=0 %then %do;
207 %put No observations in &inds! Leaving macro &sysmacroname;
216options noquotelenmax;
218%let base_uri=%mf_getplatform(VIYARESTAPI);
226 set &inds (keep=flow_id) end=last;
228 if last.flow_id then do;
230 call symputx(cats('flow',cnt),flow_id,'l');
232 if last then call symputx('flowcnt',cnt,'l');
236%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
237data;run;%let jds=&syslast;
238data;run;%let jjson=&syslast;
239data;run;%let jdsapp=&syslast;
240data;run;%let jdsrunning=&syslast;
241data;run;%let jdswaitfor=&syslast;
242%let jfref=%mf_getuniquefileref();
245%do fid=1 %to &flowcnt;
247 %if not ( &raise_err and &syscc ) %then %do;
249 %put preparing job attributes for flow &&flow&fid;
251 data &jds(drop=_contextName _program);
252 set &inds(where=(flow_id=&&flow&fid));
253 if _contextName='' then _contextName="SAS Job Execution compute context";
254 call symputx(cats('job',_n_),_program,'l');
255 call symputx(cats('context',_n_),_contextName,'l');
256 call symputx('jcnt',_n_,'l');
257 &dbg. if _n_= 1 then putlog "Loop &fid";
258 &dbg. putlog (_all_)(=);
260 %put exporting job variables in json format;
269 proc json out=&jfref;
270 export &jjson / nosastags fmtnumeric;
273 infile &jfref lrecl=32767;
275 jparams=cats('jparams',symget('jid'));
276 call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
281 %local concurrency completed;
284 proc sql; drop table &jdsrunning;
289 %if "&&job&jid" ne "0" %then %do;
292 %if %sysfunc(exist(&outds))=1 %then %do;
293 %local jobcheck; %let jobcheck=0;
295 select count(*) into: jobcheck
296 from &outds where uuid="&&jobuid&jid";
297 %if &jobcheck>0 %then %do;
298 %put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
305 %if ("&&jobuid&jid"="0") and (&concurrency<&maxconcurrency) %then %do;
308 %if not ( &raise_err and &syscc ) %then %do;
310 %local jobname jobpath;
311 %let jobname=%scan(&&job&jid,-1,/);
313 %substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
315 %put executing &jobpath/&jobname with paramstring &&jparams&jid;
316 %mv_jobexecute(path=&jobpath
318 ,paramstring=%superq(jparams&jid)
320 ,contextname=&&context&jid
323 format jobparams $32767.;
324 set &jdsapp(where=(method='GET' and rel='state'));
325 jobparams=symget("jparams&jid");
327 uuid=scan(uri,-2,'/');
328 call symputx("jobuid&jid",uuid,'l');
330 proc append base=&jdsrunning data=&jdsapp;
332 %let concurrency=%eval(&concurrency+1);
341 %put jobid &&job&jid in flow &fid skipped due to SYSCC (&syscc);
342 %let completed = %eval(&completed+1);
349 %if &jid=&jcnt %then %do;
351 %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref
352 ,raise_err=&raise_err,mdebug=&mdebug)
354 %let done=%mf_nobs(&jdswaitfor);
355 %if &done>0 %then %do;
356 %let completed=%eval(&completed+&done);
357 %let concurrency=%eval(&concurrency-&done);
361 uuid=scan(uri,-1,'/');
363 proc append base=&outds data=&jdsapp;
367 delete from &jdsrunning
368 where uuid in (select uuid from &outds
369 where state in ('canceled','completed','failed')
373 %if &completed < &jcnt %then %do;
375 %put looping flow &fid again;
376 %put &completed of &jcnt jobs completed, &concurrency jobs running;
384 %put Flow &&flow&fid skipped due to SYSCC (&syscc);
390%if &mdebug=1 %then %do;
391 %put &sysmacroname exit vars:;