131%macro mv_jobflow(inds=0,outds=work.mv_jobflow
133 ,access_token_var=ACCESS_TOKEN
134 ,grant_type=sas_services
140%
if &mdebug=1 %then %
do;
141 %put &sysmacroname entry vars:;
152%
if &grant_type=detect %then %
do;
153 %
if %symexist(&access_token_var) %then %let grant_type=authorization_code;
154 %
else %let grant_type=sas_services;
156%
if &grant_type=sas_services %then %
do;
157 %let oauth_bearer=oauth_bearer=sas_services;
158 %let &access_token_var=;
161%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
162 and &grant_type ne sas_services
165 ,msg=%str(Invalid value
for grant_type: &grant_type)
168%mp_abort(iftrue=(
"&inds"=
"0")
170 ,msg=%str(Input dataset was not provided)
172%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
174 ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
176%mp_abort(iftrue=(&maxconcurrency<1)
178 ,msg=%str(The maxconcurrency variable should be a positive integer)
182%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
184 %
if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %
do;
185 length _CONTEXTNAME $128;
186 retain _CONTEXTNAME
"SAS Job Execution compute context";
188 %
if %mf_existvarList(&inds,FLOW_ID)=0 %then %
do;
192 retain _omitSessionResults
"false";
194 &dbg. putlog (_all_)(=);
200select count(*) into: missings
202 where flow_id is null or _program is null;
203%mp_abort(iftrue=(&missings>0)
205 ,msg=%str(input dataset has &missings missing values for FLOW_ID or _PROGRAM)
208%if %mf_nobs(&inds)=0 %then %do;
209 %put No observations in &inds! Leaving macro &sysmacroname;
218options noquotelenmax;
220%let base_uri=%mf_getplatform(VIYARESTAPI);
228 set &inds (keep=flow_id) end=last;
230 if last.flow_id then do;
232 call symputx(cats('flow',cnt),flow_id,'l');
234 if last then call symputx('flowcnt',cnt,'l');
238%local fid jid jds jdsapp jdsrunning jdswaitfor;
239data;run;%let jds=&syslast;
240data;run;%let jdsapp=&syslast;
241data;run;%let jdsrunning=&syslast;
242data;run;%let jdswaitfor=&syslast;
245%do fid=1 %to &flowcnt;
247 %if not ( &raise_err and &syscc ) %then %do;
249 %put preparing job attributes for flow &&flow&fid;
251 data &jds(drop=_contextName _program);
252 set &inds(where=(flow_id=&&flow&fid));
253 if _contextName='' then _contextName="SAS Job Execution compute context";
254 call symputx(cats('job',_n_),_program,'l');
255 call symputx(cats('context',_n_),_contextName,'l');
256 call symputx('jcnt',_n_,'l');
257 &dbg. if _n_= 1 then putlog "Loop &fid";
258 &dbg. putlog (_all_)(=);
262 %local nvars cvars ii _vnm;
263 %let cvars=%mf_getvarlist(&jds,typefilter=C);
264 %let nvars=%mf_getvarlist(&jds,typefilter=N);
265 %put exporting job variables in json format;
270 length _param $32767;
272 %if %length(&cvars)>0 %then %do ii=1 %to %sysfunc(countw(&cvars,%str( )));
273 %let _vnm=%scan(&cvars,&ii,%str( ));
274 if _param ne '' then _param=cats(_param,',');
275 _param=cats(_param,'"'
281 %if %length(&nvars)>0 %then %do ii=1 %to %sysfunc(countw(&nvars,%str( )));
282 %let _vnm=%scan(&nvars,&ii,%str( ));
283 if _param ne '' then _param=cats(_param,',');
284 _param=cats(_param,'"'
287 ,strip(put(&_vnm,best32.))
291 call symputx(cats('jparams',&jid),_param,'l');
297 %local concurrency completed;
300 proc sql; drop table &jdsrunning;
305 %if "&&job&jid" ne "0" %then %do;
308 %if %sysfunc(exist(&outds))=1 %then %do;
309 %local jobcheck; %let jobcheck=0;
311 select count(*) into: jobcheck
312 from &outds where uuid="&&jobuid&jid";
313 %if &jobcheck>0 %then %do;
314 %put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
321 %if ("&&jobuid&jid"="0") and (&concurrency<&maxconcurrency) %then %do;
324 %if not ( &raise_err and &syscc ) %then %do;
326 %local jobname jobpath;
327 %let jobname=%scan(&&job&jid,-1,/);
329 %substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
331 %put executing &jobpath/&jobname with paramstring &&jparams&jid;
332 %mv_jobexecute(path=&jobpath
334 ,paramstring=%superq(jparams&jid)
336 ,contextname=&&context&jid
340 format jobparams $32767.;
341 set &jdsapp(where=(method='GET' and rel='state'));
342 jobparams=symget("jparams&jid");
344 uuid=scan(uri,-2,'/');
345 call symputx("jobuid&jid",uuid,'l');
347 proc append base=&jdsrunning data=&jdsapp;
349 %let concurrency=%eval(&concurrency+1);
358 %put jobid &&job&jid in flow &fid skipped due to SYSCC (&syscc);
359 %let completed = %eval(&completed+1);
366 %if &jid=&jcnt %then %do;
368 %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref
369 ,raise_err=&raise_err,mdebug=&mdebug)
371 %let done=%mf_nobs(&jdswaitfor);
372 %if &done>0 %then %do;
373 %let completed=%eval(&completed+&done);
374 %let concurrency=%eval(&concurrency-&done);
378 uuid=scan(uri,-1,'/');
380 proc append base=&outds data=&jdsapp;
384 delete from &jdsrunning
385 where uuid in (select uuid from &outds
386 where state in ('canceled','completed','failed')
390 %if &completed < &jcnt %then %do;
392 %put looping flow &fid again;
393 %put &completed of &jcnt jobs completed, &concurrency jobs running;
401 %put Flow &&flow&fid skipped due to SYSCC (&syscc);
407%if &mdebug=1 %then %do;
408 %put &sysmacroname exit vars:;