FORM v5.0.0-35-g6318119
parallel.c
Go to the documentation of this file.
1
11/* #[ License : */
12/*
13 * Copyright (C) 1984-2026 J.A.M. Vermaseren
14 * When using this file you are requested to refer to the publication
15 * J.A.M.Vermaseren "New features of FORM" math-ph/0010025
16 * This is considered a matter of courtesy as the development was paid
17 * for by FOM the Dutch physics granting agency and we would like to
18 * be able to track its scientific use to convince FOM of its value
19 * for the community.
20 *
21 * This file is part of FORM.
22 *
23 * FORM is free software: you can redistribute it and/or modify it under the
24 * terms of the GNU General Public License as published by the Free Software
25 * Foundation, either version 3 of the License, or (at your option) any later
26 * version.
27 *
28 * FORM is distributed in the hope that it will be useful, but WITHOUT ANY
29 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
30 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
31 * details.
32 *
33 * You should have received a copy of the GNU General Public License along
34 * with FORM. If not, see <http://www.gnu.org/licenses/>.
35 */
36/* #] License : */
37/*
38 #[ includes :
39*/
40#include "form3.h"
41#include "vector.h"
42
43#include <assert.h> // must come after form3.h
44
45/*
46#define PF_DEBUG_BCAST_LONG
47#define PF_DEBUG_BCAST_BUF
48#define PF_DEBUG_BCAST_PREDOLLAR
49#define PF_DEBUG_BCAST_RHSEXPR
50#define PF_DEBUG_BCAST_DOLLAR
51#define PF_DEBUG_BCAST_PREVAR
52#define PF_DEBUG_BCAST_CBUF
53#define PF_DEBUG_BCAST_EXPRFLAGS
54#define PF_DEBUG_REDUCE_DOLLAR
55*/
56
57/* mpi.c */
58LONG PF_RealTime(int);
59int PF_LibInit(int*, char***);
60int PF_LibTerminate(int);
61int PF_Probe(int*);
62int PF_RecvWbuf(WORD*,LONG*,int*);
63int PF_IRecvRbuf(PF_BUFFER*,int,int);
64int PF_WaitRbuf(PF_BUFFER *,int,LONG *);
65int PF_RawSend(int dest, void *buf, LONG l, int tag);
66LONG PF_RawRecv(int *src,void *buf,LONG thesize,int *tag);
67int PF_RawProbe(int *src, int *tag, int *bytesize);
68int PF_RawIsend(int dest, const void *buf, int count, MPI_Datatype type, int tag, MPI_Request *request);
69int PF_RawWaitAll(int count, MPI_Request *request, MPI_Status *status);
70int PF_Discard(int *src, int *tag);
71
72/* Private functions */
73
74static int PF_WaitAllSlaves(void);
75
76static void PF_PackRedefinedPreVars(void);
77static void PF_UnpackRedefinedPreVars(void);
78
79static int PF_Wait4MasterIP(int tag);
80static int PF_DoOneExpr(void);
81static int PF_ReadMaster(void);/*reads directly to its scratch!*/
82static int PF_Slave2MasterIP(int src);/*both master and slave*/
83static int PF_Master2SlaveIP(int dest, EXPRESSIONS e);
84static int PF_WalkThrough(WORD *t, LONG l, LONG chunk, LONG *count);
85static int PF_SendChunkIP(FILEHANDLE *curfile, POSITION *position, int to, LONG thesize);
86static int PF_RecvChunkIP(FILEHANDLE *curfile, int from, LONG thesize);
87
88static void PF_ReceiveErrorMessage(int src, int tag);
89static void PF_CatchErrorMessages(int *src, int *tag);
90static void PF_CatchErrorMessagesForAll(void);
91static int PF_ProbeWithCatchingErrorMessages(int *src);
92
93static void PF_RaiseRuntimeError(void);
94static void PF_BroadcastRuntimeError(void);
95static void PF_PostEndSortBarrier(void);
96
97/* Variables */
98
100
101static int PF_processing; /* Flag indicating that parallel processing of terms is in progress */
102static LONG PF_goutterms; /* (master) Total out terms at PF_EndSort(), used in PF_Statistics(). */
103static POSITION PF_exprsize; /* (master) The size of the expression at PF_EndSort(), used in PF_Processor(). */
104
/*
	PF_WITH_SCHED_YIELD is only expected to work well under Linux;
	see its use in PF_WaitAllSlaves() below.
*/
#ifdef PF_WITH_SCHED_YIELD
	#include <sched.h>
#endif

/*
	PRINTFBUF(TEXT,TERM,SIZE): when compiled with PF_WITHLOG and PF.log is set,
	writes TEXT to stderr, prefixed with the process number and current module.
	It then writes the first word of TERM (if TERM is non-NULL) and, when
	0 < SIZE < 500, the words TERM[1..SIZE-1].
	Without PF_WITHLOG it expands to an empty block.
*/
#ifdef PF_WITHLOG
	#define PRINTFBUF(TEXT,TERM,SIZE) { \
		UBYTE lbuf[24]; \
		if ( PF.log ) { \
			WORD iii; \
			NumToStr(lbuf,AC.CModule); \
			fprintf(stderr,"[%d|%s] %s : ",PF.me,lbuf,(char*)TEXT); \
			if ( TERM ) { \
				fprintf(stderr,"[%d] ",(int)(*TERM)); \
				if ( (SIZE) < 500 && (SIZE) > 0 ) { \
					for ( iii = 1; iii < (SIZE); iii++ ) \
						fprintf(stderr,"%d ",TERM[iii]); \
				} \
			} \
			fprintf(stderr,"\n"); \
			fflush(stderr); \
		} \
	}
#else
	#define PRINTFBUF(TEXT,TERM,SIZE) {}
#endif
126
/*
	SWAP(x, y): exchanges the contents of two objects of equal size.
	A size mismatch gives a negative array length, i.e. a compile-time error.
*/
#define SWAP(x, y) \
	do { \
		char pf_swap_buf_[sizeof(x) == sizeof(y) ? (int)sizeof(x) : -1]; \
		memcpy(pf_swap_buf_, &(x), sizeof(x)); \
		memcpy(&(x), &(y), sizeof(x)); \
		memcpy(&(y), pf_swap_buf_, sizeof(x)); \
	} while (0)
138
/*
	PACK_LONG(p, n): stores the LONG n into two consecutive words at p,
	low half first, and advances p by 2.
*/
#define PACK_LONG(p, n) \
	do { \
		(p)[0] = (UWORD)((ULONG)(n) & (ULONG)WORDMASK); \
		(p)[1] = (UWORD)(((ULONG)(n) >> BITSINWORD) & (ULONG)WORDMASK); \
		(p) += 2; \
	} while (0)

/*
	UNPACK_LONG(p, n): inverse of PACK_LONG; reads the two words at p into n
	and advances p by 2.
*/
#define UNPACK_LONG(p, n) \
	do { \
		ULONG unpack_long_hi_ = ((ULONG)(p)[1] & (ULONG)WORDMASK) << BITSINWORD; \
		(n) = (LONG)(unpack_long_hi_ | ((ULONG)(p)[0] & (ULONG)WORDMASK)); \
		(p) += 2; \
	} while (0)
156
/*
	CHECK(condition): if condition is false, reports "Fatal error at FILE:LINE"
	through Error0() and calls Terminate(-1).

	The extra expansion level makes __LINE__ expand to its number before it
	is stringified. The helper names deliberately avoid a leading underscore:
	identifiers starting with "_" plus an uppercase letter, or with "__", are
	reserved for the implementation (C11 7.1.3).
*/
#define CHECK(condition) CHECK_EXPAND_(condition, __FILE__, __LINE__)
#define CHECK_EXPAND_(condition, file, line) CHECK_IMPL_(condition, file, line)
#define CHECK_IMPL_(condition, file, line) \
	do { \
		if ( !(condition) ) { \
			Error0("Fatal error at " file ":" #line); \
			Terminate(-1); \
		} \
	} while (0)
169
/*
	For debugging: DBGOUT(lv1, lv2, a) executes "printf a" (a is a
	parenthesised printf argument list) and flushes stdout when lv1 >= lv2.
	Both levels are parenthesised so that expression arguments are compared
	as a whole rather than being split by operator precedence.
*/
#define DBGOUT(lv1, lv2, a) do { if ( (lv1) >= (lv2) ) { printf a; fflush(stdout); } } while (0)

/* (AN.ninterms of master) == max(AN.ninterms of slaves) == sum(PF_linterms of slaves) at EndSort(). */
#define DBGOUT_NINTERMS(lv, a)
/* #define DBGOUT_NINTERMS(lv, a) DBGOUT(1, lv, a) */
178
179/*
180 #] includes :
181 #[ statistics :
182 #[ variables : (should be part of a struct?)
183*/
184static LONG PF_linterms; /* local interms on this proces: PF_Proces */
185#define PF_STATS_SIZE 5
186static LONG **PF_stats = NULL;/* space for collecting statistics of all procs */
187static LONG PF_laststat; /* last realtime when statistics were printed */
188static LONG PF_statsinterval;/* timeinterval for printing statistics */
189/*
190 #] variables :
191 #[ PF_Statistics :
192*/
193
202static int PF_Statistics(LONG **stats, int proc)
203{
204 GETIDENTITY
205 LONG real, cpu;
206 WORD rpart, cpart;
207 int i, j;
208
209 if ( AT.SS == AM.S0 && PF.me == MASTER ) {
210 real = PF_RealTime(PF_TIME); rpart = (WORD)(real%100); real /= 100;
211
212 if ( PF_stats == NULL ) {
213 PF_stats = (LONG**)Malloc1(PF.numtasks*sizeof(LONG*),"PF_stats 1");
214 for ( i = 0; i < PF.numtasks; i++ ) {
215 PF_stats[i] = (LONG*)Malloc1(PF_STATS_SIZE*sizeof(LONG),"PF_stats 2");
216 for ( j = 0; j < PF_STATS_SIZE; j++ ) PF_stats[i][j] = 0;
217 }
218 }
219 if ( proc > 0 ) for ( i = 0; i < PF_STATS_SIZE; i++ ) PF_stats[proc][i] = stats[0][i];
220
221 if ( real >= PF_laststat + PF_statsinterval || proc == 0 ) {
222 LONG sum[PF_STATS_SIZE];
223
224 for ( i = 0; i < PF_STATS_SIZE; i++ ) sum[i] = 0;
225 sum[0] = cpu = TimeCPU(1);
226 cpart = (WORD)(cpu%1000);
227 cpu /= 1000;
228 cpart /= 10;
229 if ( AC.OldParallelStats ) MesPrint("");
230 if ( proc > 0 && AC.StatsFlag && AC.OldParallelStats ) {
231 MesPrint("proc CPU in gen left byte");
232 MesPrint("%3d : %7l.%2i %10l",0,cpu,cpart,AN.ninterms);
233 }
234 else if ( AC.StatsFlag && AC.OldParallelStats ) {
235 MesPrint("proc CPU in gen out byte");
236 MesPrint("%3d : %7l.%2i %10l %10l %10l",0,cpu,cpart,AN.ninterms,0,PF_goutterms);
237 }
238
239 for ( i = 1; i < PF.numtasks; i++ ) {
240 cpart = (WORD)(PF_stats[i][0]%1000);
241 cpu = PF_stats[i][0] / 1000;
242 cpart /= 10;
243 if ( AC.StatsFlag && AC.OldParallelStats )
244 MesPrint("%3d : %7l.%2i %10l %10l %10l",i,cpu,cpart,
245 PF_stats[i][2],PF_stats[i][3],PF_stats[i][4]);
246 for ( j = 0; j < PF_STATS_SIZE; j++ ) sum[j] += PF_stats[i][j];
247 }
248 cpart = (WORD)(sum[0]%1000);
249 cpu = sum[0] / 1000;
250 cpart /= 10;
251 if ( AC.StatsFlag && AC.OldParallelStats ) {
252 MesPrint("Sum = %7l.%2i %10l %10l %10l",cpu,cpart,sum[2],sum[3],sum[4]);
253 MesPrint("Real = %7l.%2i %20s (%l) %16s",
254 real,rpart,AC.Commercial,AC.CModule,EXPRNAME(AR.CurExpr));
255 MesPrint("");
256 }
257 PF_laststat = real;
258 }
259 }
260 return(0);
261}
262/*
263 #] PF_Statistics :
264 #] statistics :
265 #[ sort.c :
266 #[ sort variables :
267*/
268
/*
	A node of the master's tree of losers, used to merge the sorted term
	streams of the slaves. A NULL child pointer means that this input is fed
	directly by the slave numbered lsrc/rsrc (0 if there is none).
	lloser/rloser hold the source of the current candidate term on each
	side; 0 means that side currently has no term.
*/
typedef struct NoDe {
	struct NoDe *left;   /* left subtree, or NULL when the left input is slave lsrc */
	struct NoDe *rght;   /* right subtree, or NULL when the right input is slave rsrc */
	int lloser;          /* source of the current left candidate (0: none) */
	int rloser;          /* source of the current right candidate (0: none) */
	int lsrc;            /* slave feeding the left input of a leaf node */
	int rsrc;            /* slave feeding the right input of a leaf node */
} NODE;
281/*
282 should/could be put in one struct
283*/
284static NODE *PF_root; /* root of tree of losers */
285static WORD PF_loser; /* this is the last loser */
286static WORD **PF_term; /* these point to the active terms */
287static WORD **PF_newcpos; /* new coefficients of merged terms */
288static WORD *PF_newclen; /* length of new coefficients */
289
290/*
291 preliminary: could also write somewhere else?
292*/
293
294static WORD *PF_WorkSpace; /* used in PF_EndSort() */
295static UWORD *PF_ScratchSpace; /* used in PF_GetLoser() */
296
297/*
298 #] sort variables :
299 #[ PF_AllocBuf :
300*/
301
318static PF_BUFFER *PF_AllocBuf(int nbufs, LONG bsize, WORD free)
319{
320 PF_BUFFER *buf;
321 UBYTE *p, *stop;
322 LONG allocsize;
323 int i;
324
325 allocsize =
326 (LONG)(sizeof(PF_BUFFER) + 4*nbufs*sizeof(WORD*) + (nbufs-free)*bsize);
327
328 allocsize +=
329 (LONG)( nbufs * ( 2 * sizeof(MPI_Status)
330 + sizeof(MPI_Request)
331 + sizeof(MPI_Datatype)
332 ) );
333 allocsize += (LONG)( nbufs * 3 * sizeof(int) );
334
335 if ( ( buf = (PF_BUFFER*)Malloc1(allocsize,"PF_AllocBuf") ) == NULL ) return(NULL);
336
337 p = ((UBYTE *)buf) + sizeof(PF_BUFFER);
338 stop = ((UBYTE *)buf) + allocsize;
339
340 buf->numbufs = nbufs;
341 buf->active = 0;
342
343 buf->buff = (WORD**)p; p += buf->numbufs*sizeof(WORD*);
344 buf->fill = (WORD**)p; p += buf->numbufs*sizeof(WORD*);
345 buf->full = (WORD**)p; p += buf->numbufs*sizeof(WORD*);
346 buf->stop = (WORD**)p; p += buf->numbufs*sizeof(WORD*);
347 buf->status = (MPI_Status *)p; p += buf->numbufs*sizeof(MPI_Status);
348 buf->retstat = (MPI_Status *)p; p += buf->numbufs*sizeof(MPI_Status);
349 buf->request = (MPI_Request *)p; p += buf->numbufs*sizeof(MPI_Request);
350 buf->type = (MPI_Datatype *)p; p += buf->numbufs*sizeof(MPI_Datatype);
351 buf->index = (int *)p; p += buf->numbufs*sizeof(int);
352
353 for ( i = 0; i < buf->numbufs; i++ ) buf->request[i] = MPI_REQUEST_NULL;
354 buf->tag = (int *)p; p += buf->numbufs*sizeof(int);
355 buf->from = (int *)p; p += buf->numbufs*sizeof(int);
356/*
357 and finally the real bufferspace
358*/
359 for ( i = free; i < buf->numbufs; i++ ) {
360 buf->buff[i] = (WORD*)p; p += bsize;
361 buf->stop[i] = (WORD*)p;
362 buf->fill[i] = buf->full[i] = buf->buff[i];
363 }
364 if ( p != stop ) {
365 MesPrint("Error in PF_AllocBuf p = %x stop = %x\n",p,stop);
366 return(NULL);
367 }
368 return(buf);
369}
370
371/*
372 #] PF_AllocBuf :
373 #[ PF_InitTree :
374*/
375
387static int PF_InitTree(void)
388{
389 GETIDENTITY
390 PF_BUFFER **rbuf = PF.rbufs;
391 UBYTE *p, *stop;
392 int numrbufs,numtasks = PF.numtasks;
393 int i, j, src, numnodes;
394 int numslaves = numtasks - 1;
395 LONG size;
396/*
397 #[ the buffers : for the new coefficients and the terms
398 we need one for each slave
399*/
400 if ( PF_term == NULL ) {
401 size = 2*numtasks*sizeof(WORD*) + sizeof(WORD)*
402 ( numtasks*(1 + AM.MaxTal) + (AM.MaxTer/sizeof(WORD)+1) + 2*(AM.MaxTal+2));
403
404 PF_term = (WORD **)Malloc1(size,"PF_term");
405 stop = ((UBYTE*)PF_term) + size;
406 p = ((UBYTE*)PF_term) + numtasks*sizeof(WORD*);
407
408 PF_newcpos = (WORD **)p; p += sizeof(WORD*) * numtasks;
409 PF_newclen = (WORD *)p; p += sizeof(WORD) * numtasks;
410 for ( i = 0; i < numtasks; i++ ) {
411 PF_newcpos[i] = (WORD *)p; p += sizeof(WORD)*AM.MaxTal;
412 PF_newclen[i] = 0;
413 }
414 PF_WorkSpace = (WORD *)p; p += AM.MaxTer+sizeof(WORD);
415 PF_ScratchSpace = (UWORD*)p; p += 2*(AM.MaxTal+2)*sizeof(UWORD);
416
417 if ( p != stop ) { MesPrint("error in PF_InitTree"); return(-1); }
418 }
419/*
420 #] the buffers :
421 #[ the receive buffers :
422*/
423 numrbufs = PF.numrbufs;
424/*
425 this is the size we have in the combined sortbufs for one slave
426*/
427 size = (AT.SS->sTop2 - AT.SS->lBuffer - 1)/(PF.numtasks - 1);
428
429 if ( rbuf == NULL ) {
430 if ( ( rbuf = (PF_BUFFER**)Malloc1(numtasks*sizeof(PF_BUFFER*), "Master: rbufs") ) == NULL ) return(-1);
431 if ( (rbuf[0] = PF_AllocBuf(1,0,1) ) == NULL ) return(-1);
432 for ( i = 1; i < numtasks; i++ ) {
433 if (!(rbuf[i] = PF_AllocBuf(numrbufs,sizeof(WORD)*size,1))) return(-1);
434 }
435 }
436 rbuf[0]->buff[0] = AT.SS->lBuffer;
437 rbuf[0]->full[0] = rbuf[0]->fill[0] = rbuf[0]->buff[0];
438 rbuf[0]->stop[0] = rbuf[1]->buff[0] = rbuf[0]->buff[0] + 1;
439 rbuf[1]->full[0] = rbuf[1]->fill[0] = rbuf[1]->buff[0];
440 for ( i = 2; i < numtasks; i++ ) {
441 rbuf[i-1]->stop[0] = rbuf[i]->buff[0] = rbuf[i-1]->buff[0] + size;
442 rbuf[i]->full[0] = rbuf[i]->fill[0] = rbuf[i]->buff[0];
443 }
444 rbuf[numtasks-1]->stop[0] = rbuf[numtasks-1]->buff[0] + size;
445
446 for ( i = 1; i < numtasks; i++ ) {
447 for ( j = 0; j < rbuf[i]->numbufs; j++ ) {
448 rbuf[i]->full[j] = rbuf[i]->fill[j] = rbuf[i]->buff[j] + AM.MaxTer/sizeof(WORD) + 2;
449 }
450 PF_term[i] = rbuf[i]->fill[rbuf[i]->active];
451 *PF_term[i] = 0;
452 PF_IRecvRbuf(rbuf[i],rbuf[i]->active,i);
453 }
454 rbuf[0]->active = 0;
455 PF_term[0] = rbuf[0]->buff[0];
456 PF_term[0][0] = 0; /* PF_term[0] is used for a zero term. */
457 PF.rbufs = rbuf;
458/*
459 #] the receive buffers :
460 #[ the actual tree :
461
462 calculate number of nodes in mergetree and allocate space for them
463*/
464 if ( numslaves < 3 ) numnodes = 1;
465 else {
466 numnodes = 2;
467 while ( numnodes < numslaves ) numnodes *= 2;
468 numnodes -= 1;
469 }
470
471 if ( PF_root == NULL )
472 if ( ( PF_root = (NODE*)Malloc1(sizeof(NODE)*numnodes,"nodes in mergetree") ) == NULL )
473 return(-1);
474/*
475 then initialize all the nodes
476*/
477 src = 1;
478 for ( i = 0; i < numnodes; i++ ) {
479 if ( 2*(i+1) <= numnodes ) {
480 PF_root[i].left = &(PF_root[2*(i+1)-1]);
481 PF_root[i].lsrc = 0;
482 }
483 else {
484 PF_root[i].left = 0;
485 if ( src < numtasks ) PF_root[i].lsrc = src++;
486 else PF_root[i].lsrc = 0;
487 }
488 PF_root[i].lloser = 0;
489 }
490 for ( i = 0; i < numnodes; i++ ) {
491 if ( 2*(i+1)+1 <= numnodes ) {
492 PF_root[i].rght = &(PF_root[2*(i+1)]);
493 PF_root[i].rsrc = 0;
494 }
495 else {
496 PF_root[i].rght = 0;
497 if (src<numtasks) PF_root[i].rsrc = src++;
498 else PF_root[i].rsrc = 0;
499 }
500 PF_root[i].rloser = 0;
501 }
502/*
503 #] the actual tree :
504*/
505 return(numnodes);
506}
507
508/*
509 #] PF_InitTree :
510 #[ PF_PutIn :
511*/
512
/*
	PF_PutIn (master): returns a pointer to the next term received from
	slave src, i.e. the new value for PF_term[src].

	Returns PF_term[0] (the zero term) when src <= 0, or when the data of
	src has reached its terminating zero word. That word is appended after
	a message tagged PF_ENDBUFFER_MSGTAG.

	Every receive buffer keeps AM.MaxTer/sizeof(WORD)+2 words of headroom in
	front of the received data. When a term (and, for a compressed term, also
	the previous term lastterm) runs past the data of the active buffer, the
	piece is copied into the headroom of the next buffer so that it ends
	right before the new data. The next buffer is then waited for and becomes
	active.

	A term whose first word is negative (-k) is compressed: the first k
	words after its length word are restored from lastterm.

	NOTE(review): PF.rbufs[src] and rbuf->active are read before the
	src <= 0 test, so this assumes src is never negative.
*/
531static WORD *PF_PutIn(int src)
532{
533 int tag;
534 WORD im, r;
535 WORD *m1, *m2;
536 LONG size;
537 PF_BUFFER *rbuf = PF.rbufs[src];
538 int a = rbuf->active;
539 int next = a+1 >= rbuf->numbufs ? 0 : a+1 ;
540 WORD *lastterm = PF_term[src];
541 WORD *term = rbuf->fill[a];
542
543 if ( src <= 0 ) return(PF_term[0]);
544
/* Nothing has been received in the active buffer yet: wait for its data. */
545 if ( rbuf->full[a] == rbuf->buff[a] + AM.MaxTer/sizeof(WORD) + 2 ) {
546/*
547 very first term from this src
548*/
549 tag = PF_WaitRbuf(rbuf,a,&size);
/*
	NOTE(review): in this listing the branch below contains no statement
	(the listing skips a source line between its braces), so a runtime-error
	tag falls through and its size is still added. Confirm against upstream.
*/
550 if ( tag == PF_RUNTIME_ERROR_MSGTAG ) {
552 }
/* size words have arrived; extend the valid data of buffer a. */
553 rbuf->full[a] += size;
/* Last message from this slave: append the terminating zero word. */
554 if ( tag == PF_ENDBUFFER_MSGTAG ) *rbuf->full[a]++ = 0;
555 else if ( rbuf->numbufs > 1 ) {
556/*
557 post a nonblock. recv. for the next buffer
558*/
559 rbuf->full[next] = rbuf->buff[next] + AM.MaxTer/sizeof(WORD) + 2;
560 size = (LONG)(rbuf->stop[next] - rbuf->full[next]);
561 PF_IRecvRbuf(rbuf,next,src);
562 }
563 }
/* A zero word inside the data marks the end of the terms of this slave. */
564 if ( *term == 0 && term != rbuf->full[a] ) return(PF_term[0]);
565/*
566 exception is for rare cases when the terms fitted exactly into buffer
567*/
/* The term is not completely inside the received data (or nothing is left). */
568 if ( term + *term > rbuf->full[a] || term + 1 >= rbuf->full[a] ) {
569newterms:
/* m1: last headroom word, directly in front of the data of the next buffer. */
570 m1 = rbuf->buff[next] + AM.MaxTer/sizeof(WORD) + 1;
571 if ( *term < 0 || term == rbuf->full[a] ) {
572/*
573 copy term and lastterm to the new buffer, so that they end at m1
574*/
575 m2 = rbuf->full[a] - 1;
576 while ( m2 >= term ) *m1-- = *m2--;
577 rbuf->fill[next] = term = m1 + 1;
578 m2 = lastterm + *lastterm - 1;
579 while ( m2 >= lastterm ) *m1-- = *m2--;
580 lastterm = m1 + 1;
581 }
582 else {
583/*
584 copy beginning of term to the next buffer so that it ends at m1
585*/
586 m2 = rbuf->full[a] - 1;
587 while ( m2 >= term ) *m1-- = *m2--;
588 rbuf->fill[next] = term = m1 + 1;
589 }
/* With a single buffer, next == a: re-post the receive on that same buffer. */
590 if ( rbuf->numbufs == 1 ) {
591 rbuf->full[a] = rbuf->buff[a] + AM.MaxTer/sizeof(WORD) + 2;
592 size = (LONG)(rbuf->stop[a] - rbuf->full[a]);
593 PF_IRecvRbuf(rbuf,a,src);
594 }
595/*
596 wait for new terms in the next buffer
597*/
598 rbuf->full[next] = rbuf->buff[next] + AM.MaxTer/sizeof(WORD) + 2;
599 tag = PF_WaitRbuf(rbuf,next,&size);
600 rbuf->full[next] += size;
601 if ( tag == PF_ENDBUFFER_MSGTAG ) {
602 *rbuf->full[next]++ = 0;
603 }
604 else if ( rbuf->numbufs > 1 ) {
605/*
606 post a nonblock. recv. for active buffer, it is not needed anymore
607*/
608 rbuf->full[a] = rbuf->buff[a] + AM.MaxTer/sizeof(WORD) + 2;
609 size = (LONG)(rbuf->stop[a] - rbuf->full[a]);
610 PF_IRecvRbuf(rbuf,a,src);
611 }
612/*
613 now safely make next buffer active
614*/
615 a = rbuf->active = next;
616 }
617
618 if ( *term < 0 ) {
619/*
620 We need to decompress the term
621*/
/*
	im = -k. The k words lastterm[1..k] are copied in front of term[2], then
	the new length r = term[1] + k + 1 is written in front of them. The
	decompressed term therefore starts k-1 words before the old term.
*/
622 im = *term;
623 r = term[1] - im + 1;
624 m1 = term + 2;
625 m2 = lastterm - im + 1;
626 while ( ++im <= 0 ) *--m1 = *--m2;
627 *--m1 = r;
628 rbuf->fill[a] = term = m1;
/* The decompressed term may still run past the data: fetch the rest. */
629 if ( term + *term > rbuf->full[a] ) goto newterms;
630 }
/* Consume the term: fill now points at the following one. */
631 rbuf->fill[a] += *term;
632 return(term);
633}
634
635/*
636 #] PF_PutIn :
637 #[ PF_GetLoser :
638*/
639
/*
	PF_GetLoser: one step of the master's tree-of-losers merge at node n.

	Returns the source number (> 0) whose current term PF_term[src] is the
	next term to be written, 0 when both inputs of n have no term left, or
	-1 when AddRat fails. The two candidates are compared with CompareTerms:
	a positive result selects the left one, a negative result the right one.

	The global PF_loser tells the node which input must be refilled:
	  - 0 means that the whole subtree is initialised;
	  - otherwise only the side whose loser equals PF_loser fetches a new
	    term, from its child node or, for a leaf input, from PF_PutIn of
	    lsrc/rsrc.

	Equal terms are combined. Normally their coefficients are added with
	AddRat (adjusted for AN.ncmod when it is nonzero) and the sum is stored
	for the left source in PF_newcpos/PF_newclen. With S->PolyWise the
	PolyFun arguments are added instead and the left term is rewritten in
	its buffer. If the sum vanishes both sides are refilled; otherwise only
	the right side is refilled.

	NOTE(review): a -1 returned by a recursive call is stored in
	lloser/rloser, and the "> 0" tests below then treat it like an empty side.
*/
658static int PF_GetLoser(NODE *n)
659{
660 GETIDENTITY
661 WORD comp;
662
663 if ( PF_loser == 0 ) {
664/*
665 this is for the right initialization of the tree only
666*/
667 if ( n->left ) n->lloser = PF_GetLoser(n->left);
668 else {
669 n->lloser = n->lsrc;
670 if ( *(PF_term[n->lsrc] = PF_PutIn(n->lsrc)) == 0) n->lloser = 0;
671 }
/* The recursion changed PF_loser; the right subtree must initialise as well. */
672 PF_loser = 0;
673 if ( n->rght ) n->rloser = PF_GetLoser(n->rght);
674 else{
675 n->rloser = n->rsrc;
676 if ( *(PF_term[n->rsrc] = PF_PutIn(n->rsrc)) == 0 ) n->rloser = 0;
677 }
678 PF_loser = 0;
679 }
/* The last output term came from the left side: refill it. */
680 else if ( PF_loser == n->lloser ) {
681 if ( n->left ) n->lloser = PF_GetLoser(n->left);
682 else {
683 n->lloser = n->lsrc;
684 if ( *(PF_term[n->lsrc] = PF_PutIn(n->lsrc)) == 0 ) n->lloser = 0;
685 }
686 }
/* The last output term came from the right side: refill it (also the target of goto newright). */
687 else if ( PF_loser == n->rloser ) {
688newright:
689 if ( n->rght ) n->rloser = PF_GetLoser(n->rght);
690 else {
691 n->rloser = n->rsrc;
692 if ( *(PF_term[n->rsrc] = PF_PutIn(n->rsrc)) == 0 ) n->rloser = 0;
693 }
694 }
695 if ( n->lloser > 0 && n->rloser > 0 ) {
696 comp = CompareTerms(BHEAD PF_term[n->lloser],PF_term[n->rloser],(WORD)0);
697 if ( comp > 0 ) return(n->lloser);
698 else if (comp < 0 ) return(n->rloser);
699 else {
700/*
701 #[ terms are equal :
702*/
703 WORD *lcpos, *rcpos;
704 UWORD *newcpos;
705 WORD lclen, rclen, newclen, newnlen;
706 SORTING *S = AT.SS;
707
708 if ( S->PolyWise ) {
709/*
710 #[ Here we work with PolyFun :
711*/
712 WORD *tt1, *w;
713 WORD r1,r2;
714 WORD *ml = PF_term[n->lloser];
715 WORD *mr = PF_term[n->rloser];
716
/* NOTE(review): the values stored in r1 and r2 here are overwritten below before they are read. */
717 if ( ( r1 = (int)*PF_term[n->lloser] ) <= 0 ) r1 = 20;
718 if ( ( r2 = (int)*PF_term[n->rloser] ) <= 0 ) r2 = 20;
719 tt1 = ml;
720 ml += S->PolyWise;
721 mr += S->PolyWise;
/* w receives the sum of the two PolyFun arguments. */
722 if ( S->PolyFlag == 2 ) {
723 w = poly_ratfun_add(BHEAD ml,mr);
724 if ( *tt1 + w[1] - ml[1] > AM.MaxTer/((LONG)sizeof(WORD)) ) {
725 MesPrint("Term too complex in PolyRatFun addition. MaxTermSize of %10l is too small",AM.MaxTer);
726 Terminate(-1);
727 }
728 AT.WorkPointer = w;
729 }
730 else {
731 w = AT.WorkPointer;
732 if ( w + ml[1] + mr[1] > AT.WorkTop ) {
733 MesPrint("A WorkSpace of %10l is too small",AM.WorkSize);
734 Terminate(-1);
735 }
736 AddArgs(BHEAD ml,mr,w);
737 }
738 r1 = w[1];
/* An empty sum, or the number 0, means that the terms cancel. */
739 if ( r1 <= FUNHEAD || ( w[FUNHEAD] == -SNUMBER && w[FUNHEAD+1] == 0 ) ) {
740 goto cancelled;
741 }
/*
	Put the new function into the left term. If its size is unchanged it is
	copied over the old one. If it is shorter, the term is shifted towards
	its end. If it is longer, the term is moved down into the room before it.
*/
742 if ( r1 == ml[1] ) {
743 NCOPY(ml,w,r1);
744 }
745 else if ( r1 < ml[1] ) {
746 r2 = ml[1] - r1;
747 mr = w + r1;
748 ml += ml[1];
749 while ( --r1 >= 0 ) *--ml = *--mr;
750 mr = ml - r2;
751 r1 = S->PolyWise;
752 while ( --r1 >= 0 ) *--ml = *--mr;
753 *ml -= r2;
754 PF_term[n->lloser] = ml;
755 }
756 else {
757 r2 = r1 - ml[1];
758 if ( r2 > 2*AM.MaxTal )
759 MesPrint("warning: new term in polyfun is large");
760 mr = tt1 - r2;
761 r1 = S->PolyWise;
762 ml = tt1;
763 *ml += r2;
764 PF_term[n->lloser] = mr;
765 NCOPY(mr,ml,r1);
766 r1 = w[1];
767 NCOPY(mr,w,r1);
768 }
/* Keep the left term and refill the right side. */
769 PF_newclen[n->rloser] = 0;
770 PF_loser = n->rloser;
771 goto newright;
772/*
773 #] Here we work with PolyFun :
774*/
775 }
/*
	Locate both coefficients: a previously merged one in PF_newcpos, or
	otherwise the one at the end of the term (its last word is the signed
	coefficient length).
*/
776 if ( ( lclen = PF_newclen[n->lloser] ) != 0 ) lcpos = PF_newcpos[n->lloser];
777 else {
778 lcpos = PF_term[n->lloser];
779 lclen = *(lcpos += *lcpos - 1);
780 lcpos -= ABS(lclen) - 1;
781 }
782 if ( ( rclen = PF_newclen[n->rloser] ) != 0 ) rcpos = PF_newcpos[n->rloser];
783 else {
784 rcpos = PF_term[n->rloser];
785 rclen = *(rcpos += *rcpos - 1);
786 rcpos -= ABS(rclen) -1;
787 }
/* Convert the coefficient lengths into the signed half-lengths that AddRat takes. */
788 lclen = ( (lclen > 0) ? (lclen-1) : (lclen+1) ) >> 1;
789 rclen = ( (rclen > 0) ? (rclen-1) : (rclen+1) ) >> 1;
790 newcpos = PF_ScratchSpace;
791 if ( AddRat(BHEAD (UWORD *)lcpos,lclen,(UWORD *)rcpos,rclen,newcpos,&newnlen) ) return(-1);
792 if ( AN.ncmod != 0 ) {
793 if ( ( AC.modmode & POSNEG ) != 0 ) {
794 NormalModulus(newcpos,&newnlen);
795 }
796 if ( BigLong(newcpos,newnlen,(UWORD *)AC.cmod,ABS(AN.ncmod)) >=0 ) {
797 WORD ii;
798 SubPLon(newcpos,newnlen,(UWORD *)AC.cmod,ABS(AN.ncmod),newcpos,&newnlen);
799 newcpos[newnlen] = 1;
800 for ( ii = 1; ii < newnlen; ii++ ) newcpos[newnlen+ii] = 0;
801 }
802 }
803 if ( newnlen == 0 ) {
804/*
805 terms cancel, get loser of left subtree and then of right subtree
806*/
807cancelled:
808 PF_loser = n->lloser;
809 PF_newclen[n->lloser] = 0;
810 if ( n->left ) n->lloser = PF_GetLoser(n->left);
811 else {
812 n->lloser = n->lsrc;
813 if ( *(PF_term[n->lsrc] = PF_PutIn(n->lsrc)) == 0 ) n->lloser = 0;
814 }
815 PF_loser = n->rloser;
816 PF_newclen[n->rloser] = 0;
817 goto newright;
818 }
819 else {
820/*
821 keep the left term and get the loser of right subtree
822*/
/* Store the sum as a full coefficient (numerator, denominator, signed length) for the left source. */
823 newnlen *= 2;
824 newclen = ( newnlen > 0 ) ? ( newnlen + 1 ) : ( newnlen - 1 );
825 if ( newnlen < 0 ) newnlen = -newnlen;
826 PF_newclen[n->lloser] = newclen;
827 lcpos = PF_newcpos[n->lloser];
828 if ( newclen < 0 ) newclen = -newclen;
829 while ( newclen-- ) *lcpos++ = *newcpos++;
830 PF_loser = n->rloser;
831 PF_newclen[n->rloser] = 0;
832 goto newright;
833 }
834/*
835 #] terms are equal :
836*/
837 }
838 }
/* At most one side still has a term. */
839 if (n->lloser > 0) return(n->lloser);
840 if (n->rloser > 0) return(n->rloser);
841 return(0);
842}
843/*
844 #] PF_GetLoser :
845 #[ PF_EndSort :
846*/
847
874int PF_EndSort(void)
875{
876 GETIDENTITY
877 FILEHANDLE *fout = AR.outfile;
878 PF_BUFFER *sbuf=PF.sbuf;
879 SORTING *S = AT.SS;
880 WORD *outterm,*pp;
881 LONG size, noutterms;
882 POSITION position, oldposition;
883 WORD i,cc;
884 int oldgzipCompress;
885
886 if ( AT.SS != AT.S0 || !PF.parallel ) return 0;
887
888 if ( PF.me != MASTER ) {
889/*
890 #[ the slaves have to initialize their sendbuffer :
891
892 this is a slave and it's PObuffer should be the minimum of the
893 sortiosize on the master and the POsize of our file.
894 First save the original PObuffer and POstop of the outfile
895*/
896 size = (S->sTop2 - S->lBuffer - 1)/(PF.numtasks - 1);
897 size -= (AM.MaxTer/sizeof(WORD) + 2);
898 if ( fout->POsize < (LONG)(size*sizeof(WORD)) ) size = fout->POsize/sizeof(WORD);
899 if ( sbuf == NULL ) {
900 if ( (sbuf = PF_AllocBuf(PF.numsbufs, size*sizeof(WORD), 1)) == NULL ) return -1;
901 sbuf->active = 0;
902 PF.sbuf = sbuf;
903 }
904 sbuf->buff[0] = fout->PObuffer;
905 sbuf->stop[0] = fout->PObuffer+size;
906 if ( sbuf->stop[0] > fout->POstop ) return -1;
907 for ( i = 0; i < PF.numsbufs; i++ )
908 sbuf->fill[i] = sbuf->full[i] = sbuf->buff[i];
909
910 fout->PObuffer = sbuf->buff[sbuf->active];
911 fout->POstop = sbuf->stop[sbuf->active];
912 fout->POsize = size*sizeof(WORD);
913 fout->POfill = fout->POfull = fout->PObuffer;
914/*
915 #] the slaves have to initialize their sendbuffer :
916*/
917 return(0);
918 }
919/*
920 this waits for all slaves to be ready to send terms back
921*/
922 PF_WaitAllSlaves(); /* Note, the returned value should be 0 on success. */
923/*
924 Now collect the terms of all slaves and merge them.
925 PF_GetLoser gives the position of the smallest term, which is the real
926 work. The smallest term needs to be copied to the outbuf: use PutOut.
927*/
928 PF_InitTree();
929 if ( AR.PolyFun == 0 ) { S->PolyFlag = 0; }
930 else if ( AR.PolyFunType == 1 ) { S->PolyFlag = 1; }
931 else if ( AR.PolyFunType == 2 ) {
932 if ( AR.PolyFunExp == 2
933 || AR.PolyFunExp == 3 ) S->PolyFlag = 1;
934 else S->PolyFlag = 2;
935 }
936 *AR.CompressPointer = 0;
937 SeekScratch(fout, &position);
938 oldposition = position;
939 oldgzipCompress = AR.gzipCompress;
940 AR.gzipCompress = 0;
941
942 noutterms = 0;
943
944 while ( PF_loser >= 0 ) {
945 if ( (PF_loser = PF_GetLoser(PF_root)) == 0 ) break;
946 outterm = PF_term[PF_loser];
947 noutterms++;
948
949 if ( PF_newclen[PF_loser] != 0 ) {
950/*
951 #[ this is only when new coeff was too long :
952*/
953 outterm = PF_WorkSpace;
954 pp = PF_term[PF_loser];
955 cc = *pp;
956 while ( cc-- ) *outterm++ = *pp++;
957 outterm = (outterm[-1] > 0) ? outterm-outterm[-1] : outterm+outterm[-1];
958 if ( PF_newclen[PF_loser] > 0 ) cc = (WORD)PF_newclen[PF_loser] - 1;
959 else cc = -(WORD)PF_newclen[PF_loser] - 1;
960 pp = PF_newcpos[PF_loser];
961 while ( cc-- ) *outterm++ = *pp++;
962 *outterm++ = PF_newclen[PF_loser];
963 *PF_WorkSpace = outterm - PF_WorkSpace;
964 outterm = PF_WorkSpace;
965 *PF_newcpos[PF_loser] = 0;
966 PF_newclen[PF_loser] = 0;
967/*
968 #] this is only when new coeff was too long :
969*/
970 }
971 PRINTFBUF("PF_EndSort to PutOut: ",outterm,*outterm);
972 PutOut(BHEAD outterm,&position,fout,1);
973 }
974 if ( FlushOut(&position,fout,0) ) {
975 AR.gzipCompress = oldgzipCompress;
976 return(-1);
977 }
978 S->TermsLeft = PF_goutterms = noutterms;
979 DIFPOS(PF_exprsize, position, oldposition);
980 AR.gzipCompress = oldgzipCompress;
981 return(1);
982}
983
984/*
985 #] PF_EndSort :
986 #] sort.c :
987 #[ proces.c :
988 #[ variables :
989*/
990
991static WORD *PF_CurrentBracket;
992
993/*
994 #] variables :
995 #[ PF_GetTerm :
996*/
997
1016static WORD PF_GetTerm(WORD *term)
1017{
1018 assert(PF.me != MASTER);
1019 GETIDENTITY
1020 FILEHANDLE *fi = AC.RhsExprInModuleFlag && PF.rhsInParallel ? &PF.slavebuf : AR.infile;
1021 WORD i;
1022 WORD *next, *np, *last, *lp = 0, *nextstop, *tp=term;
1023
1024 AN.deferskipped = 0;
1025 if ( fi->POfill >= fi->POfull || fi->POfull == fi->PObuffer ) {
1026ReceiveNew:
1027 {
1028/*
1029 #[ receive new terms from master :
1030*/
1031 int src = MASTER, tag;
1032 int follow = 0;
1033 LONG size,cpu,space = 0;
1034
1035 if ( PF.log ) {
1036 fprintf(stderr,"[%d] Starting to send to Master\n",PF.me);
1037 fflush(stderr);
1038 }
1039
1040 cpu = TimeCPU(1);
1042 PF_Pack(&cpu ,1,PF_LONG);
1043 PF_Pack(&space ,1,PF_LONG);
1044 PF_Pack(&PF_linterms ,1,PF_LONG);
1045 PF_Pack(&(AM.S0->GenTerms) ,1,PF_LONG);
1046 PF_Pack(&(AM.S0->TermsLeft),1,PF_LONG);
1047 PF_Pack(&follow ,1,PF_INT );
1048
1049 if ( PF.log ) {
1050 fprintf(stderr,"[%d] Now sending with tag = %d\n",PF.me,PF_READY_MSGTAG);
1051 fflush(stderr);
1052 }
1053
1054 PF_Send(MASTER, PF_READY_MSGTAG);
1055
1056 if ( PF.log ) {
1057 fprintf(stderr,"[%d] returning from send\n",PF.me);
1058 fflush(stderr);
1059 }
1060
1061 size = fi->POstop - fi->PObuffer - 1;
1062 tag=PF_RecvWbuf(fi->PObuffer,&size,&src);
1063
1064 fi->POfill = fi->PObuffer;
1065 /* Get AN.ninterms which sits in the first 2 WORDs. */
1066 {
1067 LONG ninterms;
1068 UNPACK_LONG(fi->POfill, ninterms);
1069 if ( fi->POfill < fi->POfull ) {
1070 DBGOUT_NINTERMS(2, ("PF.me=%d AN.ninterms=%d PF_linterms=%d ninterms=%d GET\n", (int)PF.me, (int)AN.ninterms, (int)PF_linterms, (int)ninterms));
1071 AN.ninterms = ninterms - 1;
1072 } else {
1073 DBGOUT_NINTERMS(2, ("PF.me=%d AN.ninterms=%d PF_linterms=%d ninterms=%d GETEND\n", (int)PF.me, (int)AN.ninterms, (int)PF_linterms, (int)ninterms));
1074 }
1075 }
1076 fi->POfull = fi->PObuffer + size;
1077 if ( tag == PF_ENDSORT_MSGTAG ) *fi->POfull++ = 0;
1078/*
1079 #] receive new terms from master :
1080*/
1081 }
1082 if ( PF_CurrentBracket ) *PF_CurrentBracket = 0;
1083 }
1084 if ( *fi->POfill == 0 ) {
1085 fi->POfill = fi->POfull = fi->PObuffer;
1086 *term = 0;
1087 goto RegRet;
1088 }
1089 if ( AR.DeferFlag ) {
1090 if ( !PF_CurrentBracket ) {
1091/*
1092 #[ alloc space :
1093*/
1094 PF_CurrentBracket =
1095 (WORD*)Malloc1(AM.MaxTer,"PF_CurrentBracket");
1096 *PF_CurrentBracket = 0;
1097/*
1098 #] alloc space :
1099*/
1100 }
1101 while ( *PF_CurrentBracket ) { /* "for each term in the buffer" */
1102/*
1103 #[ test : bracket & skip if it's equal to the last in PF_CurrentBracket
1104*/
1105 next = fi->POfill;
1106 nextstop = next + *next; nextstop -= ABS(nextstop[-1]);
1107 next++;
1108 last = PF_CurrentBracket+1;
1109 while ( next < nextstop ) {
1110/*
1111 scan the next term and PF_CurrentBracket
1112*/
1113 if ( *last == HAAKJE && *next == HAAKJE ) {
1114/*
1115 the part outside brackets is equal => skip this term
1116*/
1117 PRINTFBUF("PF_GetTerm skips",fi->POfill,*fi->POfill);
1118 break;
1119 }
1120/*
1121 check if the current subterms are equal
1122*/
1123 np = next; next += next[1];
1124 lp = last; last += last[1];
1125 while ( np < next ) if ( *lp++ != *np++ ) goto strip;
1126 }
1127/*
1128 go on to next term
1129*/
1130 fi->POfill += *fi->POfill;
1131 AN.deferskipped++;
1132/*
1133 the usual checks
1134*/
1135 if ( fi->POfill >= fi->POfull || fi->POfull == fi->PObuffer )
1136 goto ReceiveNew;
1137 if ( *fi->POfill == 0 ) {
1138 fi->POfill = fi->POfull = fi->PObuffer;
1139 *term = 0;
1140 goto RegRet;
1141 }
1142/*
1143 #] test :
1144*/
1145 }
1146/*
1147 #[ copy :
1148
1149 this term to CurrentBracket and the part outside of bracket
1150 to WorkSpace at term
1151*/
1152strip:
1153 next = fi->POfill;
1154 nextstop = next + *next; nextstop -= ABS(nextstop[-1]);
1155 next++;
1156 tp++;
1157 lp = PF_CurrentBracket + 1;
1158 while ( next < nextstop ) {
1159 if ( *next == HAAKJE ) {
1160 fi->POfill += *fi->POfill;
1161 while ( next < fi->POfill ) *lp++ = *next++;
1162 *PF_CurrentBracket = lp - PF_CurrentBracket;
1163 *lp = 0;
1164 *tp++ = 1;
1165 *tp++ = 1;
1166 *tp++ = 3;
1167 *term = WORDDIF(tp,term);
1168 PRINTFBUF("PF_GetTerm new brack",PF_CurrentBracket,*PF_CurrentBracket);
1169 PRINTFBUF("PF_GetTerm POfill",fi->POfill,*fi->POfill);
1170 goto RegRet;
1171 }
1172 np = next; next += next[1];
1173 while ( np < next ) *tp++ = *lp++ = *np++;
1174 }
1175 tp = term;
1176/*
1177 #] copy :
1178*/
1179 }
1180
1181 i = *fi->POfill;
1182 while ( i-- ) *tp++ = *fi->POfill++;
1183RegRet:
1184 PRINTFBUF("PF_GetTerm returns",term,*term);
1185 return(*term);
1186}
1187
1188/*
1189 #] PF_GetTerm :
1190 #[ PF_Deferred :
1191*/
1192
1201WORD PF_Deferred(WORD *term, WORD level)
1202{
1203 GETIDENTITY
1204 WORD *bra, *bstop;
1205 WORD *tstart;
1206 FILEHANDLE *fi = AC.RhsExprInModuleFlag && PF.rhsInParallel ? &PF.slavebuf : AR.infile;
1207 WORD *next = fi->POfill;
1208 WORD *termout = AT.WorkPointer;
1209 WORD *oldwork = AT.WorkPointer;
1210
1211 AT.WorkPointer = (WORD *)((UBYTE *)(AT.WorkPointer) + AM.MaxTer);
1212 AR.DeferFlag = 0;
1213
1214 PRINTFBUF("PF_Deferred (Term) ",term,*term);
1215 PRINTFBUF("PF_Deferred (Bracket)",PF_CurrentBracket,*PF_CurrentBracket);
1216
1217 bra = bstop = PF_CurrentBracket;
1218 if ( *bstop > 0 ) {
1219 bstop += *bstop;
1220 bstop -= ABS(bstop[-1]);
1221 }
1222 bra++;
1223 while ( *bra != HAAKJE && bra < bstop ) bra += bra[1];
1224 if ( bra >= bstop ) { /* No deferred action! */
1225 AT.WorkPointer = term + *term;
1226 if ( Generator(BHEAD term,level) ) goto DefCall;
1227 AR.DeferFlag = 1;
1228 AT.WorkPointer = oldwork;
1229 return(0);
1230 }
1231 bstop = bra;
1232 tstart = bra + bra[1];
1233 bra = PF_CurrentBracket;
1234 tstart--;
1235 *tstart = bra + *bra - tstart;
1236 bra++;
1237/*
1238 Status of affairs:
1239 First bracket content starts at tstart.
1240 Next term starts at next.
1241 The outside of the bracket runs from bra = PF_CurrentBracket to bstop.
1242*/
1243 for(;;) {
1244 if ( InsertTerm(BHEAD term,0,AM.rbufnum,tstart,termout,0) < 0 ) {
1245 goto DefCall;
1246 }
1247/*
1248 call Generator with new composed term
1249*/
1250 AT.WorkPointer = termout + *termout;
1251 if ( Generator(BHEAD termout,level) ) goto DefCall;
1252 AT.WorkPointer = termout;
1253 tstart = next + 1;
1254 if ( tstart >= fi->POfull ) goto ThatsIt;
1255 next += *next;
1256/*
1257 compare with current bracket
1258*/
1259 while ( bra <= bstop ) {
1260 if ( *bra != *tstart ) goto ThatsIt;
1261 bra++; tstart++;
1262 }
1263/*
1264 now bra and tstart should both be a HAAKJE
1265*/
1266 bra--; tstart--;
1267 if ( *bra != HAAKJE || *tstart != HAAKJE ) goto ThatsIt;
1268 tstart += tstart[1];
1269 tstart--;
1270 *tstart = next - tstart;
1271 bra = PF_CurrentBracket + 1;
1272 }
1273
1274ThatsIt:
1275/*
1276 AT.WorkPointer = oldwork;
1277*/
1278 AR.DeferFlag = 1;
1279 return(0);
1280DefCall:
1281 MesCall("PF_Deferred");
1282 SETERROR(-1);
1283}
1284
1285/*
1286 #] PF_Deferred :
1287 #[ PF_Wait4Slave :
1288*/
1289
/*
	Statistics table shared by PF_Wait4Slave() and PF_Wait4SlaveIP().
	It is allocated on first use as a single row of PF_STATS_SIZE LONGs.
	The statistics packed by a slave are unpacked into it before they are
	handed to PF_Statistics().
*/
static LONG **PF_W4Sstats = 0;
1291
1298static int PF_Wait4Slave(int src)
1299{
1300 int j, tag, next;
1301
1302 tag = PF_ANY_MSGTAG;
1303 PF_CatchErrorMessages(&src, &tag);
1304 PF_Receive(src, tag, &next, &tag);
1305
1306 if ( tag != PF_READY_MSGTAG ) {
1307 MesPrint("[%d] PF_Wait4Slave: received MSGTAG %d",(WORD)PF.me,(WORD)tag);
1308 return(-1);
1309 }
1310 if ( PF_W4Sstats == 0 ) {
1311 PF_W4Sstats = (LONG**)Malloc1(sizeof(LONG*),"");
1312 PF_W4Sstats[0] = (LONG*)Malloc1(PF_STATS_SIZE*sizeof(LONG),"");
1313 }
1314 PF_Unpack(PF_W4Sstats[0],PF_STATS_SIZE,PF_LONG);
1315 PF_Statistics(PF_W4Sstats,next);
1316
1317 PF_Unpack(&j,1,PF_INT);
1318
1319 if ( j ) {
1320/*
1321 actions depending on rest of information in last message
1322*/
1323 }
1324 return(next);
1325}
1326
1327/*
1328 #] PF_Wait4Slave :
1329 #[ PF_Wait4SlaveIP :
1330*/
/*
	Array of expression numbers for the PF_InParallel processor.
	Each time the master sends expression "i" to the slave "next",
	it sets partodoexr[next]=i.  PF_Wait4SlaveIP() reads the entry back
	to restore AR.CurExpr when that slave answers with PF_DATA_MSGTAG.
*/
static WORD *partodoexr=NULL;
1337
1345static int PF_Wait4SlaveIP(int *src)
1346{
1347 int j,tag,next;
1348
1349 tag = PF_ANY_MSGTAG;
1350 PF_CatchErrorMessages(src, &tag);
1351 PF_Receive(*src, tag, &next, &tag);
1352 *src=tag;
1353 if ( PF_W4Sstats == 0 ) {
1354 PF_W4Sstats = (LONG**)Malloc1(sizeof(LONG*),"");
1355 PF_W4Sstats[0] = (LONG*)Malloc1(PF_STATS_SIZE*sizeof(LONG),"");
1356 }
1357
1358 PF_Unpack(PF_W4Sstats[0],PF_STATS_SIZE,PF_LONG);
1359 if ( tag == PF_DATA_MSGTAG )
1360 AR.CurExpr = partodoexr[next];
1361 PF_Statistics(PF_W4Sstats,next);
1362
1363 PF_Unpack(&j,1,PF_INT);
1364
1365 if ( j ) {
1366 /* actions depending on rest of information in last message */
1367 }
1368
1369 return(next);
1370}
1371/*
1372 #] PF_Wait4SlaveIP :
1373 #[ PF_WaitAllSlaves :
1374*/
1375
1384static int PF_WaitAllSlaves(void)
1385{
1386 int i, readySlaves, tag, next = PF_ANY_SOURCE;
1387 UBYTE *has_sent = 0;
1388
1389 has_sent = (UBYTE*)Malloc1(sizeof(UBYTE)*(PF.numtasks + 1),"PF_WaitAllSlaves");
1390 for ( i = 0; i < PF.numtasks; i++ ) has_sent[i] = 0;
1391
1392 for ( readySlaves = 1; readySlaves < PF.numtasks; ) {
1393 if ( next != PF_ANY_SOURCE) { /*Go to the next slave:*/
1394 do{ /*Note, here readySlaves<PF.numtasks, so this loop can't be infinite*/
1395 if ( ++next >= PF.numtasks ) next = 1;
1396 } while ( has_sent[next] == 1 );
1397 }
1398/*
1399 Here PF_ProbeWithCatchingErrorMessages() is BLOCKING function if next = PF_ANY_SOURCE:
1400*/
1401 tag = PF_ProbeWithCatchingErrorMessages(&next);
1402/*
1403 Here next != PF_ANY_SOURCE
1404*/
1405 switch ( tag ) {
1406 case PF_BUFFER_MSGTAG:
1407 case PF_ENDBUFFER_MSGTAG:
1408/*
1409 Slaves are ready to send their results back
1410*/
1411 if ( has_sent[next] == 0 ) {
1412 has_sent[next] = 1;
1413 readySlaves++;
1414 }
1415 else { /*error?*/
1416 fprintf(stderr,"ERROR next=%d tag=%d\n",next,tag);
1417 }
1418/*
1419 Note, we do NOT read results here! Messages from these slaves will be read
1420 only after all slaves are ready, further in caller function
1421*/
1422 break;
1423 case 0:
1424/*
1425 The slave is not ready. Just go to the next slave.
1426 It may appear that there are no more ready slaves, and the master
1427 will wait them in infinite loop. Stupid situation - the master can
1428 receive buffers from ready slaves!
1429*/
1430#ifdef PF_WITH_SCHED_YIELD
1431/*
1432 Relinquish the processor:
1433*/
1434 sched_yield();
1435#endif
1436 break;
1437 case PF_DATA_MSGTAG:
1438 tag=next;
1439 next=PF_Wait4SlaveIP(&tag);
1440/*
1441 tag must be == PF_DATA_MSGTAG!
1442*/
1443 PF_Statistics(PF_stats,0);
1444 PF_Slave2MasterIP(next);
1445 PF_Master2SlaveIP(next,NULL);
1446 if ( has_sent[next] == 0 ) {
1447 has_sent[next]=1;
1448 readySlaves++;
1449 }else{
1450 /*error?*/
1451 fprintf(stderr,"ERROR next=%d tag=%d\n",next,tag);
1452 }/*if ( has_sent[next] == 0 )*/
1453 break;
1454 case PF_EMPTY_MSGTAG:
1455 tag=next;
1456 next=PF_Wait4SlaveIP(&tag);
1457/*
1458 tag must be == PF_EMPTY_MSGTAG!
1459*/
1460 PF_Master2SlaveIP(next,NULL);
1461 if ( has_sent[next] == 0 ) {
1462 has_sent[next]=1;
1463 readySlaves++;
1464 }else{
1465 /*error?*/
1466 fprintf(stderr,"ERROR next=%d tag=%d\n",next,tag);
1467 }/*if ( has_sent[next] == 0 )*/
1468 break;
1469 case PF_READY_MSGTAG:
1470/*
1471 idle slave
1472 May be only PF_READY_MSGTAG:
1473*/
1474 next = PF_Wait4Slave(next);
1475 if ( next == -1 ) return(next); /*Cannot be!*/
1476 if ( has_sent[0] == 0 ) { /*Send the last chunk to the slave*/
1477 PF.sbuf->active = 0;
1478 has_sent[0] = 1;
1479 }
1480 else {
1481/*
1482 Last chunk was sent, so just send to slave ENDSORT
1483 AN.ninterms must be sent because the slave expects it:
1484*/
1485 PACK_LONG(PF.sbuf->fill[next], AN.ninterms);
1486/*
1487 This will tell to the slave that there are no more terms:
1488*/
1489 *(PF.sbuf->fill[next])++ = 0;
1490 PF.sbuf->active = next;
1491 }
1492/*
1493 Send ENDSORT
1494*/
1495 PF_ISendSbuf(next,PF_ENDSORT_MSGTAG);
1496 break;
1497 default:
1498/*
1499 Error?
1500 Indicates the error. This will force exit from the main loop:
1501*/
1502 MesPrint("!!!Unexpected MPI message src=%d tag=%d.", next, tag);
1503 readySlaves = PF.numtasks+1;
1504 break;
1505 }
1506 }
1507
1508 if ( has_sent ) M_free(has_sent,"PF_WaitAllSlaves");
1509/*
1510 0 on success (exit from the main loop by loop condition), or -1 if fails
1511 (exit from the main loop since readySlaves=PF.numtasks+1):
1512*/
1513 return(PF.numtasks-readySlaves);
1514}
1515
1516/*
1517 #] PF_WaitAllSlaves :
1518 #[ PF_Processor :
1519*/
1520
/*
	Processes expression e (expression number i) in parallel.

	Master and slaves both reset AC.inputnumbers (used for redefine
	statements).  Both return 0 at once when the module is not executed in
	parallel (AC.mparallelflag != PARALLELFLAG).

	Master: writes the prototype (the first term, with term[3] set to i) to
	the output file, or to the hide file when AR.outtohide is set.  It then
	copies the remaining input terms into buckets.  Each full bucket goes to
	whichever slave reports ready (PF_Wait4Slave) with PF_TERM_MSGTAG.  The
	master then calls EndSort() with PF.parallel set.  Next it receives a
	PF_ENDSORT_MSGTAG message from every slave, carrying statistics,
	numdummies, expchanged and the redefined preprocessor variables.
	Finally it updates the expression flags.  When LastExpression is
	nonzero, the input scratch file is closed and removed.

	Slave: receives terms with PF_GetTerm(), runs Generator() on each of
	them, calls EndSort(), and sends its statistics, numdummies, expchanged
	and the redefined preprocessor variables to the master.

	Returns 0 on success.  On failure it returns -1, or the nonzero code of
	PF_BroadcastRedefinedPreVars().
*/
int PF_Processor(EXPRESSIONS e, WORD i, WORD LastExpression)
{
	GETIDENTITY
	WORD *term = AT.WorkPointer;
	LONG dd = 0;
	PF_BUFFER *sb = PF.sbuf;
	WORD j, *s, next;
	LONG size, cpu;
	POSITION position;
	int k, src, tag;
	FILEHANDLE *oldoutfile = AR.outfile;

	if ( ( (WORD *)(((UBYTE *)(AT.WorkPointer)) + AM.MaxTer ) ) > AT.WorkTop ) {
		MesWork();
	}

	/* For redefine statements. */
	if ( AC.numpfirstnum > 0 ) {
		for ( j = 0; j < AC.numpfirstnum; j++ ) {
			AC.inputnumbers[j] = -1;
		}
	}

	if ( AC.mparallelflag != PARALLELFLAG ) return(0);

	PF_processing = 1;

	if ( PF.me == MASTER ) {
/*
		#[ Master:
			#[ write prototype to outfile:
*/
		WORD oldBracketOn = AR.BracketOn;
		WORD *oldBrackBuf = AT.BrackBuf;
		WORD oldbracketindexflag = AT.bracketindexflag;

		LONG maxinterms; /* the maximum number of terms in the bucket */
		int cmaxinterms; /* a variable controlling the transition of maxinterms */
		LONG termsinbucket; /* the number of filled terms in the bucket */
		LONG ProcessBucketSize = AC.mProcessBucketSize;

		if ( PF.log && AC.CModule >= PF.log )
			MesPrint("[%d] working on expression %s in module %l",PF.me,EXPRNAME(i),AC.CModule);
		if ( GetTerm(BHEAD term) <= 0 ) {
			MesPrint("[%d] Expression %d has problems in scratchfile",PF.me,i);
			return(-1);
		}
		term[3] = i;
		if ( AR.outtohide ) {
			SeekScratch(AR.hidefile,&position);
			e->onfile = position;
			if ( PutOut(BHEAD term,&position,AR.hidefile,0) < 0 ) return(-1);
		}
		else {
			SeekScratch(AR.outfile,&position);
			e->onfile = position;
			if ( PutOut(BHEAD term,&position,AR.outfile,0) < 0 ) return(-1);
		}
		AR.DeferFlag = 0; /* The master leaves the brackets!!! */
		AR.Eside = RHSIDE;
		if ( ( e->vflags & ISFACTORIZED ) != 0 ) {
			AR.BracketOn = 1;
			AT.BrackBuf = AM.BracketFactors;
			AT.bracketindexflag = 1;
		}
		if ( AT.bracketindexflag > 0 ) OpenBracketIndex(i);
/*
			#] write prototype to outfile:
			#[ initialize sendbuffer if necessary:

			the size of the sendbufs is:
			MIN(1/PF.numtasks*(AT.SS->sBufsize+AT.SS->lBufsize),AR.infile->POsize)
			No allocation for extra buffers necessary, just make sb->buf... point
			to the right places in the sortbuffers.
*/
		NewSort(BHEAD0); /* we need AT.SS to be set for this!!! */
		if ( sb == 0 || sb->buff[0] != AT.SS->lBuffer ) {
			size = (LONG)((AT.SS->sTop2 - AT.SS->lBuffer)/(PF.numtasks));
			if ( size > (LONG)(AR.infile->POsize/sizeof(WORD) - 1) )
				size = AR.infile->POsize/sizeof(WORD) - 1;
			if ( sb == 0 ) {
				if ( ( sb = PF_AllocBuf(PF.numtasks,size*sizeof(WORD),PF.numtasks) ) == NULL )
					return(-1);
			}
			/* Carve the large sort buffer into PF.numtasks send buffers of `size` WORDs. */
			sb->buff[0] = AT.SS->lBuffer;
			sb->full[0] = sb->fill[0] = sb->buff[0];
			for ( j = 1; j < PF.numtasks; j++ ) {
				sb->stop[j-1] = sb->buff[j] = sb->buff[j-1] + size;
			}
			sb->stop[PF.numtasks-1] = sb->buff[PF.numtasks-1] + size;
			PF.sbuf = sb;
		}
		for ( j = 0; j < PF.numtasks; j++ ) {
			sb->full[j] = sb->fill[j] = sb->buff[j];
		}
/*
			#] initialize sendbuffer if necessary:
			#[ loop for all terms in infile:
*/
		/*
		 * The initial value of maxinterms is determined by the user given
		 * ProcessBucketSize and the number of terms in the current expression.
		 * We make the initial maxinterms smaller, so that we get all the
		 * workers busy as soon as possible.
		 * NOTE(review): this divides by PF.numtasks - 1, i.e. it assumes at
		 * least one slave; PF.numtasks == 1 would divide by zero.
		 */
		maxinterms = ProcessBucketSize / 100;
		if ( maxinterms > e->counter / (PF.numtasks - 1) / 4 )
			maxinterms = e->counter / (PF.numtasks - 1) / 4;
		if ( maxinterms < 1 ) maxinterms = 1;
		cmaxinterms = 0;
		/*
		 * Copy them always to sb->buff[0]. When that is full, wait for
		 * the next slave to accept terms, exchange sb->buff[0] and
		 * sb->buff[next], send sb->buff[next] to next slave and go on
		 * filling the now empty sb->buff[0].
		 */
		AN.ninterms = 0;
		termsinbucket = 0;
		/* Every bucket starts with the (LONG) number of the first term it holds. */
		PACK_LONG(sb->fill[0], 1);
		while ( GetTerm(BHEAD term) ) {
			AN.ninterms++; dd = AN.deferskipped;
			if ( AC.CollectFun && *term <= (LONG)(AM.MaxTer/(2*sizeof(WORD))) ) {
				if ( GetMoreTerms(term) < 0 ) {
					LowerSortLevel(); return(-1);
				}
			}
			PRINTFBUF("PF_Processor gets",term,*term);
			if ( termsinbucket >= maxinterms || sb->fill[0] + *term >= sb->stop[0] ) {
				next = PF_Wait4Slave(PF_ANY_SOURCE);

				sb->fill[next] = sb->fill[0];
				sb->full[next] = sb->full[0];
				SWAP(sb->stop[next], sb->stop[0]);
				SWAP(sb->buff[next], sb->buff[0]);
				sb->fill[0] = sb->full[0] = sb->buff[0];
				sb->active = next;

				PF_ISendSbuf(next,PF_TERM_MSGTAG);

				/* Initialize the next bucket. */
				termsinbucket = 0;
				PACK_LONG(sb->fill[0], AN.ninterms);
				/*
				 * For the "slow startup". We double maxinterms up to ProcessBucketSize
				 * after (hopefully) all the workers got some terms.
				 */
				if ( cmaxinterms >= PF.numtasks - 2 ) {
					maxinterms *= 2;
					if ( maxinterms >= ProcessBucketSize ) {
						cmaxinterms = -1;
						maxinterms = ProcessBucketSize;
					}
				}
				else if ( cmaxinterms >= 0 ) {
					cmaxinterms++;
				}
			}
			j = *(s = term);
			NCOPY(sb->fill[0], s, j);
			termsinbucket++;
		}
		/* NOTE: The last chunk will be sent to a slave at EndSort() => PF_EndSort()
		 * => PF_WaitAllSlaves(). */
		AN.ninterms += dd;
/*
			#] loop for all terms in infile:
			#[ Clean up & EndSort:
*/
		if ( LastExpression ) {
			UpdateMaxSize();
			if ( AR.infile->handle >= 0 ) {
				CloseFile(AR.infile->handle);
				AR.infile->handle = -1;
				remove(AR.infile->name);
				PUTZERO(AR.infile->POposition);
			}
			AR.infile->POfill = AR.infile->POfull = AR.infile->PObuffer;
		}
		if ( AR.outtohide ) AR.outfile = AR.hidefile;
		PF.parallel = 1;
		if ( EndSort(BHEAD AM.S0->sBuffer,0) < 0 ) return(-1);
		PF.parallel = 0;
		if ( AR.outtohide ) {
			AR.outfile = oldoutfile;
			AR.hidefile->POfull = AR.hidefile->POfill;
		}
		UpdateMaxSize();
		AR.BracketOn = oldBracketOn;
		AT.BrackBuf = oldBrackBuf;
		/*
		 * NOTE(review): in this copy of the file the statements controlled by
		 * the two conditions below appear to be missing (as written the if/else
		 * is incomplete). Restore them from upstream FORM before compiling.
		 */
		if ( ( e->vflags & TOBEFACTORED ) != 0 )
		else if ( ( ( e->vflags & TOBEUNFACTORED ) != 0 )
		 && ( ( e->vflags & ISFACTORIZED ) != 0 ) )
		AT.bracketindexflag = oldbracketindexflag;
		AR.GetFile = 0;
		AR.outtohide = 0;
		/*
		 * NOTE: e->numdummies, e->vflags and AR.exprflags will be updated
		 * after gathering the information from all slaves.
		 */
/*
			#] Clean up & EndSort:
			#[ Collect (stats,prepro,...):
*/
		PF_PostEndSortBarrier();

		DBGOUT_NINTERMS(1, ("PF.me=%d AN.ninterms=%d ENDSORT\n", (int)PF.me, (int)AN.ninterms));
		PF_CatchErrorMessagesForAll();
		e->numdummies = 0;
		/* One PF_ENDSORT_MSGTAG message per slave, in arrival order. */
		for ( k = 1; k < PF.numtasks; k++ ) {
			PF_LongSingleReceive(PF_ANY_SOURCE, PF_ENDSORT_MSGTAG, &src, &tag);
			PF_LongSingleUnpack(PF_stats[src], PF_STATS_SIZE, PF_LONG);
			{
				WORD numdummies, expchanged;
				PF_LongSingleUnpack(&numdummies, 1, PF_WORD);
				PF_LongSingleUnpack(&expchanged, 1, PF_WORD);
				if ( e->numdummies < numdummies ) e->numdummies = numdummies;
				AR.expchanged |= expchanged;
			}
			/* Now handle redefined preprocessor variables. */
			if ( AC.numpfirstnum > 0 ) PF_UnpackRedefinedPreVars();
		}
		/* Broadcast redefined preprocessor variables. */
		if ( AC.numpfirstnum > 0 ) {
			int RetCode = PF_BroadcastRedefinedPreVars();
			if ( RetCode ) return RetCode;
		}
		if ( ! AC.OldParallelStats ) {
			/* Now we can calculate AT.SS->GenTerms from the statistics of the slaves. */
			LONG genterms = 0;
			for ( k = 1; k < PF.numtasks; k++ ) {
				genterms += PF_stats[k][3];
			}
			AT.SS->GenTerms = genterms;
			WriteStats(&PF_exprsize, STATSPOSTSORT, NOCHECKLOGTYPE);
			Expressions[AR.CurExpr].size = PF_exprsize;
		}
		PF_Statistics(PF_stats,0);
/*
			#] Collect (stats,prepro,...):
			#[ Update flags :
*/
		if ( AM.S0->TermsLeft ) e->vflags &= ~ISZERO;
		else e->vflags |= ISZERO;
		if ( AR.expchanged == 0 ) e->vflags |= ISUNMODIFIED;
		/*
		 * NOTE(review): the two AR.expflags updates below test the opposite
		 * conditions from the e->vflags updates above (TermsLeft != 0 sets
		 * ISZERO, expchanged != 0 sets ISUNMODIFIED); confirm this is intended.
		 */
		if ( AM.S0->TermsLeft ) AR.expflags |= ISZERO;
		if ( AR.expchanged ) AR.expflags |= ISUNMODIFIED;
/*
			#] Update flags :
		#] Master:
*/
	}
	else {
/*
		#[ Slave :
*/
/*
			#[ Generator Loop & EndSort :

			loop for all terms to get from master, call Generator for each of them
			then call EndSort and do cleanup (to be implemented)
*/
		WORD oldBracketOn = AR.BracketOn;
		WORD *oldBrackBuf = AT.BrackBuf;
		WORD oldbracketindexflag = AT.bracketindexflag;

		/* For redefine statements. */
		if ( AC.numpfirstnum > 0 ) {
			for ( j = 0; j < AC.numpfirstnum; j++ ) {
				AC.inputnumbers[j] = -1;
			}
		}

		SeekScratch(AR.outfile,&position);
		e->onfile = position;
		AR.DeferFlag = AC.ComDefer;
		AR.Eside = RHSIDE;
		if ( ( e->vflags & ISFACTORIZED ) != 0 ) {
			AR.BracketOn = 1;
			AT.BrackBuf = AM.BracketFactors;
			AT.bracketindexflag = 1;
		}
		NewSort(BHEAD0);
		AR.MaxDum = AM.IndDum;
		AN.ninterms = 0;
		PF_linterms = 0;
		PF.parallel = 1;
		{
			/* Start with an empty receive buffer (slavebuf for RHS-in-parallel, else infile). */
			FILEHANDLE *fi = AC.RhsExprInModuleFlag && PF.rhsInParallel ? &PF.slavebuf : AR.infile;
			fi->POfull = fi->POfill = fi->PObuffer;
		}
		/* FIXME: AN.ninterms is still broken when AN.deferskipped is non-zero.
		 * It still needs some work, also in PF_GetTerm(). (TU 30 Aug 2011) */
		while ( PF_GetTerm(term) ) {
			PF_linterms++; AN.ninterms++; dd = AN.deferskipped;
			AT.WorkPointer = term + *term;
			AN.RepPoint = AT.RepCount + 1;
			/* Factorized expressions: bracket (HAAKJE) terms go straight to the sort. */
			if ( ( e->vflags & ISFACTORIZED ) != 0 && term[1] == HAAKJE ) {
				StoreTerm(BHEAD term);
				continue;
			}
			if ( AR.DeferFlag ) {
				AR.CurDum = AN.IndDum = Expressions[AR.CurExpr].numdummies + AM.IndDum;
			}
			else {
				AN.IndDum = AM.IndDum;
				AR.CurDum = ReNumber(BHEAD term);
			}
			if ( AC.SymChangeFlag ) MarkDirty(term,DIRTYSYMFLAG);
			if ( AN.ncmod ) {
				if ( ( AC.modmode & ALSOFUNARGS ) != 0 ) MarkDirty(term,DIRTYFLAG);
				else if ( AR.PolyFun ) PolyFunDirty(BHEAD term);
			}
			else if ( AC.PolyRatFunChanged ) PolyFunDirty(BHEAD term);
			if ( ( AR.PolyFunType == 2 ) && ( AC.PolyRatFunChanged == 0 )
				&& ( e->status == LOCALEXPRESSION || e->status == GLOBALEXPRESSION ) ) {
				PolyFunClean(BHEAD term);
			}
			if ( Generator(BHEAD term,0) ) {
				Terminate(-1);
			}
			PF_linterms += dd; AN.ninterms += dd;
		}
		PF_linterms += dd; AN.ninterms += dd;
		{
			/*
			 * EndSort() overrides AR.outfile->PObuffer etc. (See also PF_EndSort()),
			 * but it causes a problem because
			 * (1) PF_EndSort() sets AR.outfile->PObuffer to a send-buffer.
			 * (2) RevertScratch() clears AR.infile, but then swaps buffers of AR.infile
			 *     and AR.outfile.
			 * (3) RHS expressions are stored to AR.infile->PObuffer.
			 * (4) Again, PF_EndSort() sets AR.outfile->PObuffer, but now AR.outfile->PObuffer
			 *     == AR.infile->PObuffer because of (1) and (2).
			 * (5) The result goes to AR.outfile. This breaks the RHS expressions,
			 *     which may be needed for the next expression.
			 * Solution: backup & restore AR.outfile->PObuffer etc. (TU 14 Sep 2011)
			 */
			FILEHANDLE *fout = AR.outfile;
			WORD *oldbuff = fout->PObuffer;
			WORD *oldstop = fout->POstop;
			LONG oldsize = fout->POsize;
			if ( EndSort(BHEAD AM.S0->sBuffer, 0) < 0 ) return -1;
			fout->PObuffer = oldbuff;
			fout->POstop = oldstop;
			fout->POsize = oldsize;
			fout->POfill = fout->POfull = fout->PObuffer;
		}
		AR.BracketOn = oldBracketOn;
		AT.BrackBuf = oldBrackBuf;
		AT.bracketindexflag = oldbracketindexflag;
/*
			#] Generator Loop & EndSort :
			#[ Collect (stats,prepro...) :
*/
		PF_PostEndSortBarrier();

		DBGOUT_NINTERMS(1, ("PF.me=%d AN.ninterms=%d PF_linterms=%d ENDSORT\n", (int)PF.me, (int)AN.ninterms, (int)PF_linterms));
		/* Pack order must match the unpack order on the master (PF_ENDSORT_MSGTAG). */
		cpu = TimeCPU(1);
		size = 0;
		PF_LongSinglePack(&cpu, 1, PF_LONG);
		PF_LongSinglePack(&size, 1, PF_LONG);
		PF_LongSinglePack(&PF_linterms, 1, PF_LONG);
		PF_LongSinglePack(&AM.S0->GenTerms, 1, PF_LONG);
		PF_LongSinglePack(&AM.S0->TermsLeft, 1, PF_LONG);
		{
			WORD numdummies = AR.MaxDum - AM.IndDum;
			PF_LongSinglePack(&numdummies, 1, PF_WORD);
			PF_LongSinglePack(&AR.expchanged, 1, PF_WORD);
		}
		/* Now handle redefined preprocessor variables. */
		if ( AC.numpfirstnum > 0 ) PF_PackRedefinedPreVars();
		PF_LongSingleSend(MASTER, PF_ENDSORT_MSGTAG);
		/* Broadcast redefined preprocessor variables. */
		if ( AC.numpfirstnum > 0 ) {
			int RetCode = PF_BroadcastRedefinedPreVars();
			if ( RetCode ) return RetCode;
		}
/*
			#] Collect (stats,prepro...) :

			This operation is moved to the beginning of each block, see PreProcessor
			in pre.c.

		#] Slave :
*/
		if ( PF.log ) {
			UBYTE lbuf[24];
			NumToStr(lbuf,AC.CModule);
			fprintf(stderr,"[%d|%s] Endsort,Collect,Broadcast done\n",PF.me,lbuf);
			fflush(stderr);
		}
	}
	return(0);
}
1931
1932/*
1933 #] PF_Processor :
1934 #] proces.c :
1935 #[ startup :, prepro & compile
1936 #[ PF_Init :
1937*/
1938
/*
	Initializes the parallel (ParFORM) state.

	Sets the PF defaults and calls PF_LibInit().  If PF_LibInit() returns a
	nonzero code, that code is returned unchanged.  When compiled with
	PF_WITHGETENV, the master reads PF_LOG, PF_RBUFS, PF_SBUFS and PF_STATS
	from the environment.  numsbufs is clamped to 1..10 and numrbufs to 1..2.
	PF.log, PF.numrbufs and PF.numsbufs are then broadcast from the master
	to the slaves.  Returns 0 on success.
*/
int PF_Init(int *argc, char ***argv)
{
/*
		this should definitely be somewhere else ...
*/
	PF_CurrentBracket = 0;

	PF.numtasks = 0; /* number of tasks, is determined in PF_LibInit ! */
	PF.numsbufs = 2; /* might be changed by the environment variable on the master ! */
	PF.numrbufs = 2; /* might be changed by the environment variable on the master ! */

	int ret = PF_LibInit(argc,argv);
	if (ret) { return ret; }
	PF_RealTime(PF_RESET);

	PF.log = 0;
	PF.parallel = 0;
	PF_statsinterval = 10;
	PF.rhsInParallel=1;
	PF.exprbufsize=4096;/*in WORDs*/

#ifdef PF_WITHGETENV
	if ( PF.me == MASTER ) {
		char *c;
/*
		get these from the environment at the moment should be in setfile/tail
*/
		if ( ( c = getenv("PF_LOG") ) != 0 ) {
			/* An empty PF_LOG value means "log level 1". */
			if ( *c ) PF.log = (int)atoi(c);
			else PF.log = 1;
			fprintf(stderr,"[%d] changing PF.log to %d\n",PF.me,PF.log);
			fflush(stderr);
		}
		if ( ( c = (char*)getenv("PF_RBUFS") ) != 0 ) {
			PF.numrbufs = (int)atoi(c);
			fprintf(stderr,"[%d] changing numrbufs to: %d\n",PF.me,PF.numrbufs);
			fflush(stderr);
		}
		if ( ( c = (char*)getenv("PF_SBUFS") ) != 0 ) {
			PF.numsbufs = (int)atoi(c);
			fprintf(stderr,"[%d] changing numsbufs to: %d\n",PF.me,PF.numsbufs);
			fflush(stderr);
		}
		/* The messages above print the requested values, before clamping. */
		if ( PF.numsbufs > 10 ) PF.numsbufs = 10;
		if ( PF.numsbufs < 1 ) PF.numsbufs = 1;
		if ( PF.numrbufs > 2 ) PF.numrbufs = 2;
		if ( PF.numrbufs < 1 ) PF.numrbufs = 1;

		if ( ( c = getenv("PF_STATS") ) ) {
			UBYTE lbuf[24];
			PF_statsinterval = (int)atoi(c);
			NumToStr(lbuf,PF_statsinterval);
			fprintf(stderr,"[%d] changing PF_statsinterval to %s\n",PF.me,lbuf);
			fflush(stderr);
			/* Values < 1 fall back to the default of 10 (after being printed). */
			if ( PF_statsinterval < 1 ) PF_statsinterval = 10;
		}
	}
#endif
/*
	#[ Broadcast settings from getenv: could also be done in PF_DoSetup
*/
	if ( PF.me == MASTER ) {
		/*
		 * NOTE(review): no PF_PreparePack() is visible before these PF_Pack()
		 * calls in this copy (other broadcasters, e.g. PF_BroadcastString(),
		 * call it first); confirm the pack buffer is initialized here.
		 * numrbufs/numsbufs are packed as PF_WORD; confirm that matches their type.
		 */
		PF_Pack(&PF.log,1,PF_INT);
		PF_Pack(&PF.numrbufs,1,PF_WORD);
		PF_Pack(&PF.numsbufs,1,PF_WORD);
	}
	PF_Broadcast();
	if ( PF.me != MASTER ) {
		/* Unpack order must match the pack order on the master. */
		PF_Unpack(&PF.log,1,PF_INT);
		PF_Unpack(&PF.numrbufs,1,PF_WORD);
		PF_Unpack(&PF.numsbufs,1,PF_WORD);
		if ( PF.log ) {
			fprintf(stderr, "[%d] log=%d rbufs=%d sbufs=%d\n",
					PF.me, PF.log, PF.numrbufs, PF.numsbufs);
			fflush(stderr);
		}
	}
/*
	#] Broadcast settings from getenv:
*/
	return(0);
}
2030/*
2031 #] PF_Init :
2032 #[ PF_PreTerminate :
2033*/
2034
2041void PF_PreTerminate(int errorcode)
2042{
2043 if ( errorcode != 0 && PF_processing ) {
2044 PF_processing = 0;
2045 PF_RaiseRuntimeError();
2046 }
2047}
2048
2049/*
2050 #] PF_PreTerminate :
2051 #[ PF_Terminate :
2052*/
2053
/*
	Shuts down the message-passing layer.  errorcode is forwarded to
	PF_LibTerminate() (mpi.c), and its result is returned unchanged.
*/
int PF_Terminate(int errorcode)
{
	int status;

	status = PF_LibTerminate(errorcode);
	return(status);
}
2065
2066/*
2067 #] PF_Terminate :
2068 #[ PF_GetSlaveTimes :
2069*/
2070
{
	/*
	 * Sums the CPU times of all processes into slavetimes on MASTER
	 * (reduction with MPI_SUM rooted at MASTER).  The master contributes 0,
	 * and each slave contributes AM.SumTime + TimeCPU(1).  The value returned
	 * on the slaves is presumably the initial 0, since they are not the
	 * reduction root; confirm with PF_Reduce().
	 */
	LONG slavetimes = 0;
	LONG t = PF.me == MASTER ? 0 : AM.SumTime + TimeCPU(1);
	int ret = PF_Reduce(&t, &slavetimes, 1, PF_LONG, MPI_SUM, MASTER);
	CHECK(ret == 0);
	return slavetimes;
}
2085
2086/*
2087 #] PF_GetSlaveTimes :
2088 #] startup :
2089 #[ PF_BroadcastNumber :
2090*/
2091
{
	/*
	 * Broadcasts the LONG value x with PF_Bcast() and returns the value held
	 * after the broadcast.  The broadcast is presumably rooted at MASTER: the
	 * master's value is what PF_BroadcastBuffer() relies on the slaves
	 * receiving.
	 */
#ifdef PF_DEBUG_BCAST_LONG
	if ( PF.me == MASTER ) {
		MesPrint(">> Broadcast LONG: %l", x);
	}
#endif
	PF_Bcast(&x, sizeof(LONG));
	return x;
}
2108
2109/*
2110 #] PF_BroadcastNumber :
2111 #[ PF_BroadcastBuffer :
2112*/
2113
2125void PF_BroadcastBuffer(WORD **buffer, LONG *length)
2126{
2127 WORD *p;
2128 LONG rest;
2129#ifdef PF_DEBUG_BCAST_BUF
2130 if ( PF.me == MASTER ) {
2131 MesPrint(">> Broadcast Buffer: length=%l", *length);
2132 }
2133#endif
2134 /* Initialize the buffer on the slaves. */
2135 if ( PF.me != MASTER ) {
2136 *buffer = NULL;
2137 }
2138 /* Broadcast the length of the buffer. */
2139 *length = PF_BroadcastNumber(*length);
2140 if ( *length <= 0 ) return;
2141 /* Allocate the buffer on the slaves. */
2142 if ( PF.me != MASTER ) {
2143 *buffer = (WORD *)Malloc1(*length * sizeof(WORD), "PF_BroadcastBuffer");
2144 }
2145 /* Broadcast the data in the buffer. */
2146 p = *buffer;
2147 rest = *length;
2148 while ( rest > 0 ) {
2149 int l = rest < (LONG)PF.exprbufsize ? (int)rest : PF.exprbufsize;
2150 PF_Bcast(p, l * sizeof(WORD));
2151 p += l;
2152 rest -= l;
2153 }
2154}
2155
2156/*
2157 #] PF_BroadcastBuffer :
2158 #[ PF_BroadcastString :
2159*/
2160
2167int PF_BroadcastString(UBYTE *str)
2168{
2169 int clength = 0;
2170/*
2171 If string does not fit to the PF_buffer, it
2172 will be split into chunks. Next chunk is started at str+clength
2173*/
2174 UBYTE *cstr=str;
2175/*
2176 Note, compilation is performed INDEPENDENTLY on AC.mparallelflag!
2177 No if ( AC.mparallelflag == PARALLELFLAG ) !!
2178*/
2179 do {
2180 cstr += clength; /*at each step for all slaves and master */
2181
2182 if ( MASTER == PF.me ) { /*Pack str*/
2183/*
2184 initialize buffers
2185*/
2186 if ( PF_PreparePack() != 0 ) Terminate(-1);
2187 if ( ( clength = PF_PackString(cstr) ) <0 ) Terminate(-1);
2188 }
2189 PF_Broadcast();
2190
2191 if ( MASTER != PF.me ) {
2192/*
2193 Slave - unpack received string
2194 For slaves buffers are initialised automatically.
2195*/
2196 if ( ( clength = PF_UnpackString(cstr) ) < 0 ) Terminate(-1);
2197 }
2198 } while ( cstr[clength-1] != '\0' );
2199 return (0);
2200}
2201
2202/*
2203 #] PF_BroadcastString :
2204 #[ PF_BroadcastPreDollar :
2205*/
2206
/*
	Broadcasts the content of a dollar variable from the master to the
	slaves during compilation.

	The master packs *numterms, *newsize and then (*newsize)+1 WORDs from
	*dbuffer, in chunks of PF_maxDollarChunkSize WORDs.  Each chunk gets its
	own PF_Broadcast().  A slave receives *numterms and *newsize, allocates
	*dbuffer with (*newsize)+1 WORDs via Malloc1(), and unpacks the chunks
	in the same order.

	Returns the bitwise OR of the error codes of the pack/unpack/broadcast
	calls.  On a slave whose allocation yields NULL it returns err|4.
*/
int PF_BroadcastPreDollar(WORD **dbuffer, LONG *newsize, int *numterms)
{
	int err = 0;
	LONG i;
/*
		Note, compilation is performed INDEPENDENTLY on AC.mparallelflag!
		No if(AC.mparallelflag==PARALLELFLAG) !!
*/
	if ( MASTER == PF.me ) {
/*
		The problem is that sometimes dollar variables are longer
		than PF_packbuf! So we split long expression into chunks.
		There are n filled chunks and one partially filled chunk:
*/
		LONG n = ((*newsize)+1)/PF_maxDollarChunkSize;
/*
		...and one more chunk for the rest; if the expression fits to
		the buffer without splitting, the latter will be the only one.

		PF_maxDollarChunkSize is the maximal number of items fitted to
		the buffer. It is calculated in PF_LibInit() in mpi.c.
		PF_maxDollarChunkSize is calculated for the first step, when
		two fields (numterms and newsize, see below) are already packed.
		For simplicity, this value is used also for all steps, in
		despite of it is a bit less than maximally available space.
*/
		WORD *thechunk = *dbuffer;

		err = PF_PreparePack(); /* initialize buffers */
		err |= PF_Pack(numterms,1,PF_INT);
		err |= PF_Pack(newsize,1,PF_LONG); /* pack the size */
/*
		Pack and broadcast completely filled chunks.
		It may happen, this loop is not entered at all:
*/
		for ( i = 0; i < n; i++ ) {
			err |= PF_Pack(thechunk,PF_maxDollarChunkSize,PF_WORD);
			err |= PF_Broadcast();
			thechunk +=PF_maxDollarChunkSize;
			/*
			 * NOTE(review): in this copy no PF_PreparePack() is visible
			 * between two broadcasts; confirm the pack buffer is reset before
			 * the next chunk is packed.
			 */
		}
/*
		Pack and broadcast the rest:
*/
		if ( ( n = ( (*newsize)+1)%PF_maxDollarChunkSize ) != 0 ) {
			err |= PF_Pack(thechunk,n,PF_WORD);
			err |= PF_Broadcast();
		}
#ifdef PF_DEBUG_BCAST_PREDOLLAR
		MesPrint(">> Broadcast PreDollar: newsize=%d numterms=%d", (int)*newsize, *numterms);
#endif
	}
	if ( MASTER != PF.me ) { /* Slave - unpack received buffer */
		WORD *thechunk;
		LONG n, therest, thesize;
		err |= PF_Broadcast();
		err |=PF_Unpack(numterms,1,PF_INT);
		err |=PF_Unpack(newsize,1,PF_LONG);
/*
		Now we know the buffer size.
*/
		thesize = (*newsize)+1;
/*
		Evaluate the number of completely filled chunks. The last step must be
		treated separately, so -1:
*/
		n = (thesize/PF_maxDollarChunkSize) - 1;
/*
		Note, here n can be <0, this is ok.
*/
		therest = thesize % PF_maxDollarChunkSize;
		thechunk = *dbuffer =
			(WORD*)Malloc1( thesize * sizeof(WORD),"$-buffer slave");
		if ( thechunk == NULL ) return(err|4);
/*
		Unpack completely filled chunks and receive the next portion.
		It may happen, this loop is not entered at all:
*/
		for ( i = 0; i < n; i++ ) {
			err |= PF_Unpack(thechunk,PF_maxDollarChunkSize,PF_WORD);
			thechunk += PF_maxDollarChunkSize;
			err |= PF_Broadcast();
		}
/*
		Now the last completely filled chunk:
*/
		if ( n >= 0 ) {
			err |= PF_Unpack(thechunk,PF_maxDollarChunkSize,PF_WORD);
			thechunk += PF_maxDollarChunkSize;
			/* The master broadcasts once more only when there is a remainder. */
			if ( therest != 0 ) err |= PF_Broadcast();
		}
/*
		Unpack the rest (it is already received!):
*/
		if ( therest != 0 ) err |= PF_Unpack(thechunk,therest,PF_WORD);
	}
	return (err);
}
2320
2321/*
2322 #] PF_BroadcastPreDollar :
2323 #[ Synchronization of modified dollar variables :
2324 #[ Helper functions :
2325 #[ dollarlen :
2326*/
2327
2331static inline LONG dollarlen(const WORD *terms)
2332{
2333 const WORD *p = terms;
2334 while ( *p ) p += *p;
2335 return p - terms; /* Not including the null terminator. */
2336}
2337
2338/*
2339 #] dollarlen :
2340 #[ dollar_mod_type :
2341*/
2342
2347static inline WORD dollar_mod_type(WORD index)
2348{
2349 int i;
2350 for ( i = 0; i < NumModOptdollars; i++ )
2351 if ( ModOptdollars[i].number == index ) break;
2352 if ( i >= NumModOptdollars ) return -1;
2353 return ModOptdollars[i].type;
2354}
2355
2356
2357/*
2358 #] dollar_mod_type :
2359 #] Helper functions :
2360 #[ PF_CollectModifiedDollars :
2361*/
2362
2363/*
2364 #[ dollar_to_be_collected :
2365*/
2366
2371static inline int dollar_to_be_collected(WORD index)
2372{
2373 switch ( dollar_mod_type(index) ) {
2374 case MODSUM:
2375 case MODMAX:
2376 case MODMIN:
2377 return 1;
2378 default:
2379 return 0;
2380 }
2381}
2382
2383/*
2384 #] dollar_to_be_collected :
2385 #[ copy_dollar :
2386*/
2387
2392static inline void copy_dollar(WORD index, WORD type, const WORD *where, LONG size)
2393{
2394 DOLLARS d = Dollars + index;
2395
2396 CleanDollarFactors(d);
2397
2398 if ( type != DOLZERO && where != NULL && where != &AM.dollarzero && where[0] != 0 && size > 0 ) {
2399 if ( size > d->size || size < d->size / 4 ) { /* Reallocate if not enough or too much. */
2400 if ( d->where && d->where != &AM.dollarzero )
2401 M_free(d->where, "old content of dollar");
2402 d->where = Malloc1(sizeof(WORD) * size, "copy buffer to dollar");
2403 d->size = size;
2404 }
2405 d->type = type;
2406 WCOPY(d->where, where, size);
2407 }
2408 else {
2409 if ( d->where && d->where != &AM.dollarzero )
2410 M_free(d->where, "old content of dollar");
2411 d->type = DOLZERO;
2412 d->where = &AM.dollarzero;
2413 d->size = 0;
2414 }
2415}
2416
2417/*
2418 #] copy_dollar :
2419 #[ compare_two_expressions :
2420*/
2421
2426static inline int compare_two_expressions(const WORD *e1, const WORD *e2)
2427{
2428 GETIDENTITY
2429 /*
2430 * We consider the cases that
2431 * (1) the expression has no term,
2432 * (2) the expression has only one term and it is a number,
2433 * (3) otherwise.
2434 * Assume that the expressions are sorted and all terms are normalized.
2435 * The numerators of the coefficients must never be zero.
2436 *
2437 * Note that TwoExprCompare() is not adequate for our purpose
2438 * (as of 6 Aug. 2013), e.g., TwoExprCompare({0}, {4, 1, 1, -1}, LESS)
2439 * returns TRUE.
2440 */
2441 if ( e1[0] == 0 ) {
2442 if ( e2[0] == 0 ) {
2443 return(0);
2444 }
2445 else if ( e2[e2[0]] == 0 && e2[0] == ABS(e2[e2[0] - 1]) + 1 ) {
2446 if ( e2[e2[0] - 1] > 0 )
2447 return(-1);
2448 else
2449 return(+1);
2450 }
2451 }
2452 else if ( e1[e1[0]] == 0 && e1[0] == ABS(e1[e1[0] - 1]) + 1 ) {
2453 if ( e2[0] == 0 ) {
2454 if ( e1[e1[0] - 1] > 0 )
2455 return(+1);
2456 else
2457 return(-1);
2458 }
2459 else if ( e2[e2[0]] == 0 && e2[0] == ABS(e2[e2[0] - 1]) + 1 ) {
2460 return(CompCoef((WORD *)e1, (WORD *)e2));
2461 }
2462 }
2463 /* The expressions are not so simple. Define the order by each term. */
2464 while ( e1[0] && e2[0] ) {
2465 int c = CompareTerms(BHEAD (WORD *)e1, (WORD *)e2, 1);
2466 if ( c < 0 )
2467 return(-1);
2468 else if ( c > 0 )
2469 return(+1);
2470 e1 += e1[0];
2471 e2 += e2[0];
2472 }
2473 if ( e1[0] ) return(+1);
2474 if ( e2[0] ) return(-1);
2475 return(0);
2476}
2477
2478/*
2479 #] compare_two_expressions :
2480 #[ Variables :
2481*/
2482
2483typedef struct {
2484 VectorStruct(WORD) buf;
2485 LONG size;
2486 WORD type;
2487} dollar_buf;
2488
2489/* Buffers used to store data for each variable from each slave. */
2490static Vector(dollar_buf, dollar_slave_bufs);
2491
2492/*
2493 #] Variables :
2494*/
2495
2510{
2511 int i, j, ndollars;
2512 /*
2513 * If the current module was executed in the sequential mode,
2514 * there are no modified module on the slaves.
2515 */
2516 if ( AC.mparallelflag != PARALLELFLAG && !AC.partodoflag ) return 0;
2517 /*
2518 * Count the number of (potentially) modified dollar variables, which we need to collect.
2519 * Here we need to collect all max/min/sum variables.
2520 */
2521 ndollars = 0;
2522 for ( i = 0; i < NumPotModdollars; i++ ) {
2523 WORD index = PotModdollars[i];
2524 if ( dollar_to_be_collected(index) ) ndollars++;
2525 }
2526 if ( ndollars == 0 ) return 0; /* No dollars to be collected. */
2527
2528 if ( PF.me == MASTER ) {
2529/*
2530 #[ Master :
2531*/
2532 int nslaves, nvars;
2533 /* Prepare receive buffers. We need ndollars*(PF.numtasks-1) buffers. */
2534 int nbufs = ndollars * (PF.numtasks - 1);
2535 VectorReserve(dollar_slave_bufs, nbufs);
2536 for ( i = VectorSize(dollar_slave_bufs); i < nbufs; i++ ) {
2537 VectorInit(VectorPtr(dollar_slave_bufs)[i].buf);
2538 }
2539 VectorSize(dollar_slave_bufs) = nbufs;
2540 /* Receive data from each slave. */
2541 for ( nslaves = 1; nslaves < PF.numtasks; nslaves++ ) {
2542 int src;
2543 PF_LongSingleReceive(PF_ANY_SOURCE, PF_DOLLAR_MSGTAG, &src, NULL);
2544 nvars = 0;
2545 for ( i = 0; i < NumPotModdollars; i++ ) {
2546 WORD index = PotModdollars[i];
2547 dollar_buf *b;
2548 if ( !dollar_to_be_collected(index) ) continue;
2549 b = &VectorPtr(dollar_slave_bufs)[(PF.numtasks - 1) * nvars + (src - 1)];
2550 PF_LongSingleUnpack(&b->type, 1, PF_WORD);
2551 if ( b->type != DOLZERO ) {
2552 LONG size;
2553 WORD *where;
2554 PF_LongSingleUnpack(&size, 1, PF_LONG);
2555 VectorReserve(b->buf, size + 1);
2556 where = VectorPtr(b->buf);
2557 PF_LongSingleUnpack(where, size, PF_WORD);
2558 where[size] = 0; /* The null terminator is needed. */
2559 b->size = size + 1; /* Including the null terminator. */
2560 /* Note that we don't collect factored stuff for max/min/sum variables. */
2561 }
2562 else {
2563 VectorReserve(b->buf, 1);
2564 VectorPtr(b->buf)[0] = 0;
2565 b->size = 0;
2566 }
2567 nvars++;
2568 }
2569 }
2570 /*
2571 * Combine received dollars. The FORM reference manual says maximum/minimum/sum
2572 * $-variables must have a numerical value, however, this routine should work also
2573 * for non-numerical cases, although the maximum/minimum value for non-numerical
2574 * terms has ambiguity.
2575 */
2576 nvars = 0;
2577 for ( i = 0; i < NumPotModdollars; i++ ) {
2578 WORD index = PotModdollars[i];
2579 WORD dtype;
2580 DOLLARS d;
2581 dollar_buf *b;
2582 if ( !dollar_to_be_collected(index) ) continue;
2583 d = Dollars + index;
2584 b = &VectorPtr(dollar_slave_bufs)[(PF.numtasks - 1) * nvars];
2585 dtype = dollar_mod_type(index);
2586 switch ( dtype ) {
2587 case MODMAX:
2588 case MODMIN: {
2589/*
2590 #[ MODMAX & MODMIN :
2591*/
2592 int selected = 0;
2593 for ( j = 1; j < PF.numtasks - 1; j++ ) {
2594 int c = compare_two_expressions(VectorPtr(b[j].buf), VectorPtr(b[selected].buf));
2595 if ( (dtype == MODMAX && c > 0) || (dtype == MODMIN && c < 0) )
2596 selected = j;
2597 }
2598 b = b + selected;
2599 copy_dollar(index, b->type, VectorPtr(b->buf), b->size);
2600/*
2601 #] MODMAX & MODMIN :
2602*/
2603 break;
2604 }
2605 case MODSUM: {
2606/*
2607 #[ MODSUM :
2608*/
2609 GETIDENTITY
2610 int err = 0;
2611
2612 CBUF *C = cbuf + AM.rbufnum;
2613 WORD *oldwork = AT.WorkPointer, *oldcterm = AN.cTerm;
2614 WORD olddefer = AR.DeferFlag, oldnumlhs = AR.Cnumlhs, oldnumrhs = C->numrhs;
2615
2616 LONG size;
2617 WORD type, *dbuf;
2618
2619 AN.cTerm = 0;
2620 AR.DeferFlag = 0;
2621
2622 if ( ((WORD *)((UBYTE *)AT.WorkPointer + AM.MaxTer)) > AT.WorkTop ) {
2623 err = -1;
2624 goto cleanup;
2625 MesWork();
2626 }
2627
2628 if ( NewSort(BHEAD0) ) {
2629 err = -1;
2630 goto cleanup;
2631 }
2632 if ( NewSort(BHEAD0) ) {
2634 err = -1;
2635 goto cleanup;
2636 }
2637
2638 /*
2639 * Sum up the original $-variable in the master and $-variables on all slaves.
2640 * Note that $-variables on the slaves are set to zero at the beginning of
2641 * the module (See also DoExecute()).
2642 */
2643 for ( j = 0; j < PF.numtasks; j++ ) {
2644 const WORD *r;
2645 for ( r = j == 0 ? Dollars[index].where : VectorPtr(b[j - 1].buf); *r; r += *r ) {
2646 WCOPY(AT.WorkPointer, r, *r);
2647 AT.WorkPointer += *r;
2648 AR.Cnumlhs = 0;
2649 if ( Generator(BHEAD oldwork, 0) ) {
2651 err = -1;
2652 goto cleanup;
2653 }
2654 AT.WorkPointer = oldwork;
2655 }
2656 }
2657
2658 size = EndSort(BHEAD (WORD *)&dbuf, 2);
2659 if ( size < 0 ) {
2661 err = -1;
2662 goto cleanup;
2663 }
2665
2666 /* Find special cases. */
2667 type = DOLTERMS;
2668 if ( dbuf[0] == 0 ) {
2669 type = DOLZERO;
2670 }
2671 else if ( dbuf[dbuf[0]] == 0 ) {
2672 const WORD *t = dbuf, *w;
2673 WORD n, nsize;
2674 n = *t;
2675 nsize = t[n - 1];
2676 if ( nsize < 0 ) nsize = -nsize;
2677 if ( nsize == n - 1 ) {
2678 nsize = (nsize - 1) / 2;
2679 w = t + 1 + nsize;
2680 if ( *w == 1 ) {
2681 w++; while ( w < t + n - 1 ) { if ( *w ) break; w++; }
2682 if ( w >= t + n - 1 ) type = DOLNUMBER;
2683 }
2684 else if ( n == 7 && t[6] == 3 && t[5] == 1 && t[4] == 1 && t[1] == INDEX && t[2] == 3 ) {
2685 type = DOLINDEX;
2686 d->index = t[3];
2687 }
2688 }
2689 }
2690 copy_dollar(index, type, dbuf, dollarlen(dbuf) + 1);
2691 M_free(dbuf, "temporary dollar buffer");
2692cleanup:
2693 AR.Cnumlhs = oldnumlhs;
2694 C->numrhs = oldnumrhs;
2695 AR.DeferFlag = olddefer;
2696 AN.cTerm = oldcterm;
2697 AT.WorkPointer = oldwork;
2698
2699 if ( err ) return err;
2700/*
2701 #] MODSUM :
2702*/
2703 break;
2704 }
2705 }
2706 if ( d->type == DOLTERMS )
2707 cbuf[AM.dbufnum].CanCommu[index] = numcommute(d->where, &cbuf[AM.dbufnum].NumTerms[index]);
2708 cbuf[AM.dbufnum].rhs[index] = d->where;
2709 nvars++;
2710#ifdef PF_DEBUG_REDUCE_DOLLAR
2711 MesPrint("<< Reduce $-var: %s", AC.dollarnames->namebuffer + d->name);
2712#endif
2713 }
2714/*
2715 #] Master :
2716*/
2717 }
2718 else {
2719/*
2720 #[ Slave :
2721*/
2723 /* Pack each variable. */
2724 for ( i = 0; i < NumPotModdollars; i++ ) {
2725 WORD index = PotModdollars[i];
2726 DOLLARS d;
2727 if ( !dollar_to_be_collected(index) ) continue;
2728 d = Dollars + index;
2729 PF_LongSinglePack(&d->type, 1, PF_WORD);
2730 if ( d->type != DOLZERO ) {
2731 /*
2732 * NOTE: d->size is the allocated buffer size for d->where in WORDs.
2733 * So dollarlen(d->where) can be < d->size-1. (TU 15 Dec 2011)
2734 */
2735 LONG size = dollarlen(d->where);
2736 PF_LongSinglePack(&size, 1, PF_LONG);
2737 PF_LongSinglePack(d->where, size, PF_WORD);
2738 /* Note that we don't collect factored stuff for max/min/sum variables. */
2739 }
2740 }
2741 PF_LongSingleSend(MASTER, PF_DOLLAR_MSGTAG);
2742/*
2743 #] Slave :
2744*/
2745 }
2746 return 0;
2747}
2748
2749/*
2750 #] PF_CollectModifiedDollars :
2751 #[ PF_BroadcastModifiedDollars :
2752*/
2753
2754/*
2755 #[ dollar_to_be_broadcast :
2756*/
2757
2762static inline int dollar_to_be_broadcast(WORD index)
2763{
2764 switch ( dollar_mod_type(index) ) {
2765 case MODLOCAL:
2766 return 0;
2767 default:
2768 return 1;
2769 }
2770}
2771
2772/*
2773 #] dollar_to_be_broadcast :
2774*/
2775
2789{
2790 int i, j, ndollars;
2791 /*
2792 * Count the number of (potentially) modified dollar variables, which we need to broadcast.
2793 * Here we need to broadcast all non-local variables.
2794 */
2795 ndollars = 0;
2796 for ( i = 0; i < NumPotModdollars; i++ ) {
2797 WORD index = PotModdollars[i];
2798 if ( dollar_to_be_broadcast(index) ) ndollars++;
2799 }
2800 if ( ndollars == 0 ) return 0; /* No dollars to be broadcast. */
2801
2802 if ( PF.me == MASTER ) {
2803/*
2804 #[ Master :
2805*/
2807 /* Pack each variable. */
2808 for ( i = 0; i < NumPotModdollars; i++ ) {
2809 WORD index = PotModdollars[i];
2810 DOLLARS d;
2811 if ( !dollar_to_be_broadcast(index) ) continue;
2812 d = Dollars + index;
2813 PF_LongMultiPack(&d->type, 1, PF_WORD);
2814 if ( d->type != DOLZERO ) {
2815 /*
2816 * NOTE: d->size is the allocated buffer size for d->where in WORDs.
2817 * So dollarlen(d->where) can be < d->size-1. (TU 15 Dec 2011)
2818 */
2819 LONG size = dollarlen(d->where);
2820 PF_LongMultiPack(&size, 1, PF_LONG);
2821 PF_LongMultiPack(d->where, size, PF_WORD);
2822 /* ...and the factored stuff. */
2823 PF_LongMultiPack(&d->nfactors, 1, PF_WORD);
2824 if ( d->nfactors > 1 ) {
2825 for ( j = 0; j < d->nfactors; j++ ) {
2826 FACDOLLAR *f = &d->factors[j];
2827 PF_LongMultiPack(&f->type, 1, PF_WORD);
2828 PF_LongMultiPack(&f->size, 1, PF_LONG);
2829 if ( f->size > 0 )
2830 PF_LongMultiPack(f->where, f->size, PF_WORD);
2831 else
2832 PF_LongMultiPack(&f->value, 1, PF_WORD);
2833 }
2834 }
2835 }
2836#ifdef PF_DEBUG_BCAST_DOLLAR
2837 MesPrint(">> Broadcast $-var: %s", AC.dollarnames->namebuffer + d->name);
2838#endif
2839 }
2840/*
2841 #] Master :
2842*/
2843 }
2844 if ( PF_LongMultiBroadcast() ) return -1;
2845 if ( PF.me != MASTER ) {
2846/*
2847 #[ Slave :
2848*/
2849 for ( i = 0; i < NumPotModdollars; i++ ) {
2850 WORD index = PotModdollars[i];
2851 DOLLARS d;
2852 if ( !dollar_to_be_broadcast(index) ) continue;
2853 d = Dollars + index;
2854 /* Clear the contents of the dollar variable. */
2855 if ( d->where && d->where != &AM.dollarzero )
2856 M_free(d->where, "old content of dollar");
2857 d->where = &AM.dollarzero;
2858 d->size = 0;
2859 CleanDollarFactors(d);
2860 /* Unpack and store the contents. */
2861 PF_LongMultiUnpack(&d->type, 1, PF_WORD);
2862 if ( d->type != DOLZERO ) {
2863 LONG size;
2864 PF_LongMultiUnpack(&size, 1, PF_LONG);
2865 d->size = size + 1;
2866 d->where = (WORD *)Malloc1(sizeof(WORD) * d->size, "dollar content");
2867 PF_LongMultiUnpack(d->where, size, PF_WORD);
2868 d->where[size] = 0; /* The null terminator is needed. */
2869 /* ...and the factored stuff. */
2870 PF_LongMultiUnpack(&d->nfactors, 1, PF_WORD);
2871 if ( d->nfactors > 1 ) {
2872 d->factors = (FACDOLLAR *)Malloc1(sizeof(FACDOLLAR) * d->nfactors, "dollar factored stuff");
2873 for ( j = 0; j < d->nfactors; j++ ) {
2874 FACDOLLAR *f = &d->factors[j];
2875 PF_LongMultiUnpack(&f->type, 1, PF_WORD);
2876 PF_LongMultiUnpack(&f->size, 1, PF_LONG);
2877 if ( f->size > 0 ) {
2878 f->where = (WORD *)Malloc1(sizeof(WORD) * (f->size + 1), "dollar factor content");
2879 PF_LongMultiUnpack(f->where, f->size, PF_WORD);
2880 f->where[f->size] = 0; /* The null terminator is needed. */
2881 f->value = 0;
2882 }
2883 else {
2884 f->where = NULL;
2885 PF_LongMultiUnpack(&f->value, 1, PF_WORD);
2886 }
2887 }
2888 }
2889 }
2890 if ( d->type == DOLTERMS )
2891 cbuf[AM.dbufnum].CanCommu[index] = numcommute(d->where, &cbuf[AM.dbufnum].NumTerms[index]);
2892 cbuf[AM.dbufnum].rhs[index] = d->where;
2893 }
2894/*
2895 #] Slave :
2896*/
2897 }
2898 return 0;
2899}
2900
2901/*
2902 #] PF_BroadcastModifiedDollars :
2903 #] Synchronization of modified dollar variables :
2904 #[ Synchronization of redefined preprocessor variables :
2905 #[ Variables :
2906*/
2907
/*
 * Receiver-side scratch buffer: holds the value of one preprocessor
 * variable while it is unpacked (null-terminated before use).
 */
static Vector(UBYTE, prevarbuf);
2910
2911/*
2912 #] Variables :
2913 #[ PF_PackRedefinedPreVars :
2914*/
2915
2924static void PF_PackRedefinedPreVars(void)
2925{
2926 int i;
2927 /* First, pack the number of redefined preprocessor variables. */
2928 int nredefs = 0;
2929 for ( i = 0; i < AC.numpfirstnum; i++ )
2930 if ( AC.inputnumbers[i] >= 0 ) nredefs++;
2931 PF_LongSinglePack(&nredefs, 1, PF_INT);
2932 /* Then, pack each variable. */
2933 for ( i = 0; i < AC.numpfirstnum; i++ )
2934 if ( AC.inputnumbers[i] >= 0) {
2935 WORD index = AC.pfirstnum[i];
2936 UBYTE *value = PreVar[index].value;
2937 int bytes = strlen((char *)value);
2938 PF_LongSinglePack(&index, 1, PF_WORD);
2939 PF_LongSinglePack(&bytes, 1, PF_INT);
2940 PF_LongSinglePack(value, bytes, PF_BYTE);
2941 PF_LongSinglePack(&AC.inputnumbers[i], 1, PF_LONG);
2942 }
2943}
2944
2945/*
2946 #] PF_PackRedefinedPreVars :
2947 #[ PF_UnpackRedefinedPreVars :
2948*/
2949
2959static void PF_UnpackRedefinedPreVars(void)
2960{
2961 int i, j;
2962 /* Unpack the number of redefined preprocessor variables. */
2963 int nredefs;
2964 PF_LongSingleUnpack(&nredefs, 1, PF_INT);
2965 if ( nredefs > 0 ) {
2966 /* Then unpack each variable. */
2967 for ( i = 0; i < nredefs; i++ ) {
2968 WORD index;
2969 int bytes;
2970 UBYTE *value;
2971 LONG inputnumber;
2972 PF_LongSingleUnpack(&index, 1, PF_WORD);
2973 PF_LongSingleUnpack(&bytes, 1, PF_INT);
2974 VectorReserve(prevarbuf, bytes + 1);
2975 value = VectorPtr(prevarbuf);
2976 PF_LongSingleUnpack(value, bytes, PF_BYTE);
2977 value[bytes] = '\0'; /* The null terminator is needed. */
2978 PF_LongSingleUnpack(&inputnumber, 1, PF_LONG);
2979 /* Put this variable if it must be updated. */
2980 for ( j = 0; j < AC.numpfirstnum; j++ )
2981 if ( AC.pfirstnum[j] == index ) break;
2982 if ( AC.inputnumbers[j] < inputnumber ) {
2983 AC.inputnumbers[j] = inputnumber;
2984 PutPreVar(PreVar[index].name, value, NULL, 1);
2985 }
2986 }
2987 }
2988}
2989
2990/*
2991 #] PF_UnpackRedefinedPreVars :
2992 #[ PF_BroadcastRedefinedPreVars :
2993*/
2994
3006{
3007 /*
3008 * NOTE: Because the compilation is performed on the all processes
3009 * independently on AC.mparallelflag, we always have to broadcast redefined
3010 * preprocessor variables from the master to the all slaves.
3011 */
3012 if ( PF.me == MASTER ) {
3013/*
3014 #[ Master :
3015*/
3016 int i, nredefs;
3018 /* First, pack the number of redefined preprocessor variables. */
3019 nredefs = 0;
3020 for ( i = 0; i < AC.numpfirstnum; i++ )
3021 if ( AC.inputnumbers[i] >= 0 ) nredefs++;
3022 PF_LongMultiPack(&nredefs, 1, PF_INT);
3023 /* Then, pack each variable. */
3024 for ( i = 0; i < AC.numpfirstnum; i++ )
3025 if ( AC.inputnumbers[i] >= 0) {
3026 WORD index = AC.pfirstnum[i];
3027 UBYTE *value = PreVar[index].value;
3028 int bytes = strlen((char *)value);
3029 PF_LongMultiPack(&index, 1, PF_WORD);
3030 PF_LongMultiPack(&bytes, 1, PF_INT);
3031 PF_LongMultiPack(value, bytes, PF_BYTE);
3032#ifdef PF_DEBUG_BCAST_PREVAR
3033 MesPrint(">> Broadcast PreVar: %s = \"%s\"", PreVar[index].name, value);
3034#endif
3035 }
3036/*
3037 #] Master :
3038*/
3039 }
3040 if ( PF_LongMultiBroadcast() ) return -1;
3041 if ( PF.me != MASTER ) {
3042/*
3043 #[ Slave :
3044*/
3045 int i, nredefs;
3046 /* Unpack the number of redefined preprocessor variables. */
3047 PF_LongMultiUnpack(&nredefs, 1, PF_INT);
3048 if ( nredefs > 0 ) {
3049 /* Then unpack each variable and put it. */
3050 for ( i = 0; i < nredefs; i++ ) {
3051 WORD index;
3052 int bytes;
3053 UBYTE *value;
3054 PF_LongMultiUnpack(&index, 1, PF_WORD);
3055 PF_LongMultiUnpack(&bytes, 1, PF_INT);
3056 VectorReserve(prevarbuf, bytes + 1);
3057 value = VectorPtr(prevarbuf);
3058 PF_LongMultiUnpack(value, bytes, PF_BYTE);
3059 value[bytes] = '\0'; /* The null terminator is needed. */
3060 PutPreVar(PreVar[index].name, value, NULL, 1);
3061 }
3062 }
3063/*
3064 #] Slave :
3065*/
3066 }
3067 return 0;
3068}
3069
3070/*
3071 #] PF_BroadcastRedefinedPreVars :
3072 #] Synchronization of redefined preprocessor variables :
3073 #[ Preprocessor Inside instruction :
3074 #[ Variables :
3075*/
3076
3077/* Saved values of AC.RhsExprInModuleFlag, PotModdollars and AC.pfirstnum. */
3078static WORD oldRhsExprInModuleFlag;
3079static Vector(WORD, oldPotModdollars);
3080static Vector(WORD, oldpfirstnum);
3081
3082/*
3083 #] Variables :
3084 #[ PF_StoreInsideInfo :
3085*/
3086
3087/*
3088 * Saves the current values of AC.RhsExprInModuleFlag, PotModdollars
3089 * and AC.pfirstnum.
3090 *
3091 * Called by DoInside().
3092 *
3093 * @return 0 if OK, nonzero on error.
3094 */
3095int PF_StoreInsideInfo(void)
3096{
3097 int i;
3098 oldRhsExprInModuleFlag = AC.RhsExprInModuleFlag;
3099 VectorClear(oldPotModdollars);
3100 for ( i = 0; i < NumPotModdollars; i++ )
3101 VectorPushBack(oldPotModdollars, PotModdollars[i]);
3102 VectorClear(oldpfirstnum);
3103 for ( i = 0; i < AC.numpfirstnum; i++ )
3104 VectorPushBack(oldpfirstnum, AC.pfirstnum[i]);
3105 return 0;
3106}
3107
3108/*
3109 #] PF_StoreInsideInfo :
3110 #[ PF_RestoreInsideInfo :
3111*/
3112
3113/*
3114 * Restores the saved values of AC.RhsExprInModuleFlag, PotModdollars
3115 * and AC.pfirstnum.
3116 *
3117 * Called by DoEndInside().
3118 *
3119 * @return 0 if OK, nonzero on error.
3120 */
3121int PF_RestoreInsideInfo(void)
3122{
3123 int i;
3124 AC.RhsExprInModuleFlag = oldRhsExprInModuleFlag;
3125 NumPotModdollars = VectorSize(oldPotModdollars);
3126 for ( i = 0; i < NumPotModdollars; i++ )
3127 PotModdollars[i] = VectorPtr(oldPotModdollars)[i];
3128 AC.numpfirstnum = VectorSize(oldpfirstnum);
3129 for ( i = 0; i < AC.numpfirstnum; i++ )
3130 AC.pfirstnum[i] = VectorPtr(oldpfirstnum)[i];
3131 return 0;
3132}
3133
3134/*
3135 #] PF_RestoreInsideInfo :
3136 #] Preprocessor Inside instruction :
3137 #[ PF_BroadcastCBuf :
3138*/
3139
3147int PF_BroadcastCBuf(int bufnum)
3148{
3149 CBUF *C = cbuf + bufnum;
3150 int i;
3151 LONG l;
3152 if ( PF.me == MASTER ) {
3153/*
3154 #[ Master :
3155*/
3157 /* Pack CBUF struct except pointers. */
3158 PF_LongMultiPack(&C->BufferSize, 1, PF_LONG);
3159 PF_LongMultiPack(&C->numlhs, 1, PF_INT);
3160 PF_LongMultiPack(&C->numrhs, 1, PF_INT);
3161 PF_LongMultiPack(&C->maxlhs, 1, PF_INT);
3162 PF_LongMultiPack(&C->maxrhs, 1, PF_INT);
3163 PF_LongMultiPack(&C->mnumlhs, 1, PF_INT);
3164 PF_LongMultiPack(&C->mnumrhs, 1, PF_INT);
3165 PF_LongMultiPack(&C->numtree, 1, PF_INT);
3166 PF_LongMultiPack(&C->rootnum, 1, PF_INT);
3167 PF_LongMultiPack(&C->MaxTreeSize, 1, PF_INT);
3168 /* Now pointers. Pointer, lhs and rhs are packed as offsets. We don't pack Top. */
3169 l = C->Pointer - C->Buffer;
3170 PF_LongMultiPack(&l, 1, PF_LONG);
3171 PF_LongMultiPack(C->Buffer, l, PF_WORD);
3172 for ( i = 0; i < C->numlhs + 1; i++ ) {
3173 l = C->lhs[i] - C->Buffer;
3174 PF_LongMultiPack(&l, 1, PF_LONG);
3175 }
3176 for ( i = 0; i < C->numrhs + 1; i++ ) {
3177 l = C->rhs[i] - C->Buffer;
3178 PF_LongMultiPack(&l, 1, PF_LONG);
3179 }
3180 PF_LongMultiPack(C->CanCommu, C->numrhs + 1, PF_LONG);
3181 PF_LongMultiPack(C->NumTerms, C->numrhs + 1, PF_LONG);
3182 PF_LongMultiPack(C->numdum, C->numrhs + 1, PF_WORD);
3183 PF_LongMultiPack(C->dimension, C->numrhs + 1, PF_WORD);
3184 if ( C->MaxTreeSize > 0 )
3185 PF_LongMultiPack(C->boomlijst, (C->numtree + 1) * (sizeof(COMPTREE) / sizeof(int)), PF_INT);
3186#ifdef PF_DEBUG_BCAST_CBUF
3187 MesPrint(">> Broadcast CBuf %d", bufnum);
3188#endif
3189/*
3190 #] Master :
3191*/
3192 }
3193 if ( PF_LongMultiBroadcast() ) return -1;
3194 if ( PF.me != MASTER ) {
3195/*
3196 #[ Slave :
3197*/
3198 /* First, free already allocated buffers. */
3199 finishcbuf(bufnum);
3200 /* Unpack CBUF struct except pointers. */
3201 PF_LongMultiUnpack(&C->BufferSize, 1, PF_LONG);
3202 PF_LongMultiUnpack(&C->numlhs, 1, PF_INT);
3203 PF_LongMultiUnpack(&C->numrhs, 1, PF_INT);
3204 PF_LongMultiUnpack(&C->maxlhs, 1, PF_INT);
3205 PF_LongMultiUnpack(&C->maxrhs, 1, PF_INT);
3206 PF_LongMultiUnpack(&C->mnumlhs, 1, PF_INT);
3207 PF_LongMultiUnpack(&C->mnumrhs, 1, PF_INT);
3208 PF_LongMultiUnpack(&C->numtree, 1, PF_INT);
3209 PF_LongMultiUnpack(&C->rootnum, 1, PF_INT);
3210 PF_LongMultiUnpack(&C->MaxTreeSize, 1, PF_INT);
3211 /* Allocate new buffers. */
3212 C->Buffer = (WORD *)Malloc1(C->BufferSize * sizeof(WORD), "compiler buffer");
3213 C->Top = C->Buffer + C->BufferSize;
3214 C->lhs = (WORD **)Malloc1(C->maxlhs * sizeof(WORD *), "compiler buffer");
3215 C->rhs = (WORD **)Malloc1(C->maxrhs * (sizeof(WORD *) + 2 * sizeof(LONG) + 2 * sizeof(WORD)), "compiler buffer");
3216 C->CanCommu = (LONG *)(C->rhs + C->maxrhs);
3217 C->NumTerms = C->CanCommu + C->maxrhs;
3218 C->numdum = (WORD *)(C->NumTerms + C->maxrhs);
3219 C->dimension = C->numdum + C->maxrhs;
3220 if ( C->MaxTreeSize > 0 )
3221 C->boomlijst = (COMPTREE *)Malloc1(C->MaxTreeSize * sizeof(COMPTREE), "compiler buffer");
3222 /* Unpack buffers. */
3223 PF_LongMultiUnpack(&l, 1, PF_LONG);
3224 PF_LongMultiUnpack(C->Buffer, l, PF_WORD);
3225 C->Pointer = C->Buffer + l;
3226 for ( i = 0; i < C->numlhs + 1; i++ ) {
3227 PF_LongMultiUnpack(&l, 1, PF_LONG);
3228 C->lhs[i] = C->Buffer + l;
3229 }
3230 for ( i = 0; i < C->numrhs + 1; i++ ) {
3231 PF_LongMultiUnpack(&l, 1, PF_LONG);
3232 C->rhs[i] = C->Buffer + l;
3233 }
3234 PF_LongMultiUnpack(C->CanCommu, C->numrhs + 1, PF_LONG);
3235 PF_LongMultiUnpack(C->NumTerms, C->numrhs + 1, PF_LONG);
3236 PF_LongMultiUnpack(C->numdum, C->numrhs + 1, PF_WORD);
3237 PF_LongMultiUnpack(C->dimension, C->numrhs + 1, PF_WORD);
3238 if ( C->MaxTreeSize > 0 )
3239 PF_LongMultiUnpack(C->boomlijst, (C->numtree + 1) * (sizeof(COMPTREE) / sizeof(int)), PF_INT);
3240/*
3241 #] Slave :
3242*/
3243 }
3244 return 0;
3245}
3246
3247/*
3248 #] PF_BroadcastCBuf :
3249 #[ PF_BroadcastExpFlags :
3250*/
3251
3259{
3260 WORD i;
3261 EXPRESSIONS e;
3262 if ( PF.me == MASTER ) {
3263/*
3264 #[ Master :
3265*/
3267 PF_LongMultiPack(&AR.expflags, 1, PF_WORD);
3268 for ( i = 0; i < NumExpressions; i++ ) {
3269 e = &Expressions[i];
3270 PF_LongMultiPack(&e->counter, 1, PF_WORD);
3271 PF_LongMultiPack(&e->vflags, 1, PF_WORD);
3272 PF_LongMultiPack(&e->uflags, 1, PF_WORD);
3273 PF_LongMultiPack(&e->numdummies, 1, PF_WORD);
3274 PF_LongMultiPack(&e->numfactors, 1, PF_WORD);
3275#ifdef PF_DEBUG_BCAST_EXPRFLAGS
3276 MesPrint(">> Broadcast ExprFlags: %s", AC.exprnames->namebuffer + e->name);
3277#endif
3278 }
3279/*
3280 #] Master :
3281*/
3282 }
3283 if ( PF_LongMultiBroadcast() ) return -1;
3284 if ( PF.me != MASTER ) {
3285/*
3286 #[ Slave :
3287*/
3288 PF_LongMultiUnpack(&AR.expflags, 1, PF_WORD);
3289 for ( i = 0; i < NumExpressions; i++ ) {
3290 e = &Expressions[i];
3291 PF_LongMultiUnpack(&e->counter, 1, PF_WORD);
3292 PF_LongMultiUnpack(&e->vflags, 1, PF_WORD);
3293 PF_LongMultiUnpack(&e->uflags, 1, PF_WORD);
3294 PF_LongMultiUnpack(&e->numdummies, 1, PF_WORD);
3295 PF_LongMultiUnpack(&e->numfactors, 1, PF_WORD);
3296 }
3297/*
3298 #] Slave :
3299*/
3300 }
3301 return 0;
3302}
3303
3304/*
3305 #] PF_BroadcastExpFlags :
3306 #[ PF_SetScratch :
3307*/
3308
3315static void PF_SetScratch(FILEHANDLE *f,POSITION *position)
3316{
3317 if(
3318 ( f->handle >= 0) && ISGEPOS(*position,f->POposition) &&
3319 ( ISGEPOSINC(*position,f->POposition,(f->POfull-f->PObuffer)*sizeof(WORD)) ==0 )
3320 )/*position is inside the buffer! SetScratch() will do nothing.*/
3321 f->POfull=f->PObuffer;/*force SetScratch() to re-read the position from the beginning:*/
3322 SetScratch(f,position);
3323}
3324
3325/*
3326 #] PF_SetScratch :
3327 #[ PF_pushScratch :
3328*/
3329
3336static int PF_pushScratch(FILEHANDLE *f)
3337{
3338 LONG size,RetCode;
3339 if ( f->handle < 0){
3340 /*Create the file*/
3341 if ( ( RetCode = CreateFile(f->name) ) >= 0 ) {
3342 f->handle = (WORD)RetCode;
3343 PUTZERO(f->filesize);
3344 PUTZERO(f->POposition);
3345 }
3346 else{
3347 MesPrint("Cannot create scratch file %s",f->name);
3348 return(-1);
3349 }
3350 }/*if ( f->handle < 0)*/
3351 size = (f->POfill-f->PObuffer)*sizeof(WORD);
3352 if( size > 0 ){
3353 SeekFile(f->handle,&(f->POposition),SEEK_SET);
3354 if ( WriteFile(f->handle,(UBYTE *)(f->PObuffer),size) != size ){
3355 MesPrint("Error while writing to disk. Disk full?");
3356 return(-1);
3357 }
3358 ADDPOS(f->filesize,size);
3359 ADDPOS(f->POposition,size);
3360 f->POfill = f->POfull=f->PObuffer;
3361 }/*if( size > 0 )*/
3362 return(0);
3363}
3364
3365/*
3366 #] PF_pushScratch :
3367 #[ Broadcasting RHS expressions :
3368 #[ PF_WalkThroughExprMaster :
3369 Returns <=0 if the expression is ready, or dl+1;
3370*/
3371
3372static int PF_WalkThroughExprMaster(FILEHANDLE *curfile, int dl)
3373{
3374 LONG l=0;
3375 for(;;){
3376 if(curfile->POfull-curfile->POfill < dl){
3377 POSITION pos;
3378 SeekScratch(curfile,&pos);
3379 PF_SetScratch(curfile,&pos);
3380 }/*if(curfile->POfull-curfile->POfill < dl)*/
3381 curfile->POfill+=dl;
3382 l+=dl;
3383 if( l >= PF.exprbufsize){
3384 if( l == PF.exprbufsize){
3385 if( *(curfile->POfill) == 0)/*expression is ready*/
3386 return(0);
3387 }
3388 l-=PF.exprbufsize;
3389 curfile->POfill-=l;
3390 return l+1;
3391 }
3392
3393 dl=*(curfile->POfill);
3394 if(dl == 0)
3395 return l-PF.exprbufsize;
3396
3397 if(dl<0){/*compressed term*/
3398 if(curfile->POfull-curfile->POfill < 1){
3399 POSITION pos;
3400 SeekScratch(curfile,&pos);
3401 PF_SetScratch(curfile,&pos);
3402 }/*if(curfile->POfull-curfile->POfill < 1)*/
3403 dl=*(curfile->POfill+1)+2;
3404 }/*if(*(curfile->POfill)<0)*/
3405 }/*for(;;)*/
3406}
3407
3408/*
3409 #] PF_WalkThroughExprMaster :
3410 #[ PF_WalkThroughExprSlave :
3411 Returns <=0 if the expression is ready, or dl+1;
3412*/
3413
3414static int PF_WalkThroughExprSlave(FILEHANDLE *curfile, LONG *counter, int dl)
3415{
3416 LONG l=0;
3417 for(;;){
3418 if(curfile->POstop-curfile->POfill < dl){
3419 if(PF_pushScratch(curfile))
3420 return(-PF.exprbufsize-1);
3421 }
3422 curfile->POfill+=dl;
3423 curfile->POfull=curfile->POfill;
3424 l+=dl;
3425 if( l >= PF.exprbufsize){
3426 if( l == PF.exprbufsize){
3427 /*
3428 * This access is valid because PF.exprbufsize+1 WORDs are
3429 * broadcasted, this shortcut is not mandatory though. (TU 15 Sep 2011)
3430 */
3431 if( *(curfile->POfill) == 0)/*expression is ready*/
3432 return(0);
3433 }
3434 l-=PF.exprbufsize;
3435 curfile->POfill-=l;
3436 curfile->POfull=curfile->POfill;
3437 return l+1;
3438 }
3439
3440 dl=*(curfile->POfill);
3441 if(dl == 0)
3442 return l-PF.exprbufsize;
3443 (*counter)++;
3444 if(dl<0){/*compressed term*/
3445 if(curfile->POstop-curfile->POfill < 1){
3446 if(PF_pushScratch(curfile))
3447 return(-PF.exprbufsize-1);
3448 }
3449 /*
3450 * This access is always valid because PF.exprbufsize+1 WORDs are
3451 * broadcasted. (TU 15 Sep 2011)
3452 */
3453 dl=*(curfile->POfill+1)+2;
3454 }/*if(*(curfile->POfill)<0)*/
3455 }/*for(;;)*/
3456}
3457
3458/*
3459 #] PF_WalkThroughExprSlave :
3460 #[ PF_rhsBCastMaster :
3461*/
3462
3470static int PF_rhsBCastMaster(FILEHANDLE *curfile, EXPRESSIONS e)
3471{
3472 LONG l=1;/*PF_WalkThroughExpr returns length + 1*/
3473 SetScratch(curfile,&(e->onfile));
3474 do{
3475 /*
3476 * We need to broadcast PF.exprbufsize+1 WORDs because PF_WalkThroughExprSlave
3477 * may access to an additional 1 WORD. It is better to rewrite the routines
3478 * in such a way as to broadcast only PF.exprbufsize WORDs. (TU 15 Sep 2011)
3479 */
3480 if ( curfile->POfull - curfile->POfill < PF.exprbufsize + 1 ) {
3481 POSITION pos;
3482 SeekScratch(curfile,&pos);
3483 PF_SetScratch(curfile,&pos);
3484 }
3485 if ( PF_Bcast(curfile->POfill, (PF.exprbufsize + 1) * sizeof(WORD)) )
3486 return -1;
3487 l=PF_WalkThroughExprMaster(curfile,l-1);
3488 }while(l>0);
3489 if(l<0)/*The tail is extra, decrease POfill*/
3490 curfile->POfill-=l;
3491 return(0);
3492}
3493
3494/*
3495 #] PF_rhsBCastMaster :
3496 #[ PF_rhsBCastSlave :
3497*/
3498
3507static int PF_rhsBCastSlave(FILEHANDLE *curfile, EXPRESSIONS e)
3508{
3509 LONG l=1;/*PF_WalkThroughExpr returns length + 1*/
3510 LONG counter = 0;
3511 do{
3512 /*
3513 * We need to broadcast PF.exprbufsize+1 WORDs because PF_WalkThroughExprSlave
3514 * may access to an additional 1 WORD. It is better to rewrite the routines
3515 * in such a way as to broadcast only PF.exprbufsize WORDs. (TU 15 Sep 2011)
3516 */
3517 if ( curfile->POstop - curfile->POfill < PF.exprbufsize + 1 ) {
3518 if(PF_pushScratch(curfile))
3519 return(-1);
3520 }
3521 if ( PF_Bcast(curfile->POfill, (PF.exprbufsize + 1) * sizeof(WORD)) )
3522 return(-1);
3523 l = PF_WalkThroughExprSlave(curfile, &counter, l - 1);
3524 }while(l>0);
3525 if(l<0){/*The tail is extra, decrease POfill*/
3526 if(l<-PF.exprbufsize)/*error due to a PF_pushScratch() failure */
3527 return(-1);
3528 curfile->POfill-=l;
3529 }
3530 if ( curfile->handle >= 0 ) {
3531 if ( PF_pushScratch(curfile) ) return -1;
3532 }
3533 curfile->POfull=curfile->POfill;
3534 if ( curfile != AR.hidefile ) AR.InInBuf = curfile->POfull-curfile->PObuffer;
3535 else AR.InHiBuf = curfile->POfull-curfile->PObuffer;
3536 CHECK(counter == e->counter + 1); /* The first term is the prototype. */
3537 return(0);
3538}
3539
3540/*
3541 #] PF_rhsBCastSlave :
3542 #[ PF_BroadcastExpr :
3543*/
3544
3553{
3554 if ( PF.me == MASTER ) {
3555 if ( PF_rhsBCastMaster(file, e) ) return -1;
3556#ifdef PF_DEBUG_BCAST_RHSEXPR
3557 MesPrint(">> Broadcast RhsExpr: %s", AC.exprnames->namebuffer + e->name);
3558#endif
3559 }
3560 else {
3561 POSITION pos;
3562 SetEndHScratch(file, &pos);
3563 e->onfile = pos;
3564 if ( PF_rhsBCastSlave(file, e) ) return -1;
3565 }
3566 return 0;
3567}
3568
3569/*
3570 #] PF_BroadcastExpr :
3571 #[ PF_BroadcastRHS :
3572*/
3573
3581{
3582 int i;
3583 for ( i = 0; i < NumExpressions; i++ ) {
3584 EXPRESSIONS e = &Expressions[i];
3585 if ( !(e->vflags & ISINRHS) ) continue;
3586 switch ( e->status ) {
3587 case LOCALEXPRESSION:
3588 case SKIPLEXPRESSION:
3589 case DROPLEXPRESSION:
3590 case GLOBALEXPRESSION:
3591 case SKIPGEXPRESSION:
3592 case DROPGEXPRESSION:
3593 case HIDELEXPRESSION:
3594 case HIDEGEXPRESSION:
3595 case INTOHIDELEXPRESSION:
3596 case INTOHIDEGEXPRESSION:
3597 if ( PF_BroadcastExpr(e, AR.infile) ) return -1;
3598 break;
3599 case HIDDENLEXPRESSION:
3600 case HIDDENGEXPRESSION:
3601 case DROPHLEXPRESSION:
3602 case DROPHGEXPRESSION:
3603 case UNHIDELEXPRESSION:
3604 case UNHIDEGEXPRESSION:
3605 if ( PF_BroadcastExpr(e, AR.hidefile) ) return -1;
3606 break;
3607 }
3608 }
3609 if ( PF.me != MASTER )
3610 UpdatePositions();
3611 return 0;
3612}
3613
3614/*
3615 #] PF_BroadcastRHS :
3616 #] Broadcasting RHS expressions :
3617 #[ InParallel mode :
3618 #[ PF_InParallelProcessor :
3619*/
3620
3628{
3629 GETIDENTITY
3630 int i, next,tag;
3631 EXPRESSIONS e;
3632 /*
3633 * Skip expressions with zero terms. All the master and slaves need to
3634 * change the "partodo" flag.
3635 */
3636 if ( PF.numtasks >= 3 ) {
3637 for ( i = 0; i < NumExpressions; i++ ) {
3638 e = Expressions + i;
3639 if ( e->partodo > 0 && e->counter == 0 ) {
3640 e->partodo = 0;
3641 }
3642 }
3643 }
3644 PF_processing = 1;
3645 if(PF.me == MASTER){
3646 if ( PF.numtasks >= 3 ) {
3647 partodoexr = (WORD*)Malloc1(sizeof(WORD)*(PF.numtasks+1),"PF_InParallelProcessor");
3648 for ( i = 0; i < NumExpressions; i++ ) {
3649 e = Expressions+i;
3650 if ( e->partodo <= 0 ) continue;
3651 switch(e->status){
3652 case LOCALEXPRESSION:
3653 case GLOBALEXPRESSION:
3654 case UNHIDELEXPRESSION:
3655 case UNHIDEGEXPRESSION:
3656 case INTOHIDELEXPRESSION:
3657 case INTOHIDEGEXPRESSION:
3658 tag=PF_ANY_SOURCE;
3659 next=PF_Wait4SlaveIP(&tag);
3660 if(next<0)
3661 return(-1);
3662 if(tag == PF_DATA_MSGTAG){
3663 PF_Statistics(PF_stats,0);
3664 if(PF_Slave2MasterIP(next))
3665 return(-1);
3666 }
3667 if(PF_Master2SlaveIP(next,e))
3668 return(-1);
3669 partodoexr[next]=i;
3670 break;
3671 default:
3672 e->partodo = 0;
3673 continue;
3674 }/*switch(e->status)*/
3675 }/*for ( i = 0; i < NumExpressions; i++ )*/
3676 /*Here some slaves are working, other are waiting on PF_Send.
3677 Wait all of them.*/
3678 /*At this point no new slaves may be launched so PF_WaitAllSlaves()
3679 does not modify partodoexr[].*/
3680 if(PF_WaitAllSlaves())
3681 return(-1);
3682
3683 if ( AC.CollectFun ) AR.DeferFlag = 0;
3684 if(partodoexr){
3685 M_free(partodoexr,"PF_InParallelProcessor");
3686 partodoexr=NULL;
3687 }/*if(partodoexr)*/
3688 }/*if ( PF.numtasks >= 3 ) */
3689 else {
3690 for ( i = 0; i < NumExpressions; i++ ) {
3691 Expressions[i].partodo = 0;
3692 }
3693 }
3694 PF_PostEndSortBarrier();
3695 return(0);
3696 }/*if(PF.me == MASTER)*/
3697 /*Slave:*/
3698 if(PF_Wait4MasterIP(PF_EMPTY_MSGTAG))
3699 return(-1);
3700 /*master is ready to listen to me*/
3701 do{
3702 WORD *oldwork= AT.WorkPointer;
3703 tag=PF_ReadMaster();/*reads directly to its scratch!*/
3704 if(tag<0)
3705 return(-1);
3706 if(tag == PF_DATA_MSGTAG){
3707 oldwork = AT.WorkPointer;
3708
3709 /* For redefine statements. */
3710 if ( AC.numpfirstnum > 0 ) {
3711 int j;
3712 for ( j = 0; j < AC.numpfirstnum; j++ ) {
3713 AC.inputnumbers[j] = -1;
3714 }
3715 }
3716
3717 if(PF_DoOneExpr())/*the processor*/
3718 return(-1);
3719 if(PF_Wait4MasterIP(PF_DATA_MSGTAG))
3720 return(-1);
3721 if(PF_Slave2MasterIP(PF.me))/*both master and slave*/
3722 return(-1);
3723 AT.WorkPointer=oldwork;
3724 }/*if(tag == PF_DATA_MSGTAG)*/
3725 }while(tag!=PF_EMPTY_MSGTAG);
3726 PF.exprtodo=-1;
3727 PF_PostEndSortBarrier();
3728 return(0);
3729}/*PF_InParallelProcessor*/
3730
3731/*
3732 #] PF_InParallelProcessor :
3733 #[ PF_Wait4MasterIP :
3734*/
3735
3736static int PF_Wait4MasterIP(int tag)
3737{
3738 int follow = 0;
3739 LONG cpu,space = 0;
3740
3741 if(PF.log){
3742 fprintf(stderr,"[%d] Starting to send to Master\n",PF.me);
3743 fflush(stderr);
3744 }
3745
3747 cpu = TimeCPU(1);
3748 PF_Pack(&cpu ,1,PF_LONG);
3749 PF_Pack(&space ,1,PF_LONG);
3750 PF_Pack(&PF_linterms ,1,PF_LONG);
3751 PF_Pack(&(AM.S0->GenTerms) ,1,PF_LONG);
3752 PF_Pack(&(AM.S0->TermsLeft),1,PF_LONG);
3753 PF_Pack(&follow ,1,PF_INT );
3754
3755 if(PF.log){
3756 fprintf(stderr,"[%d] Now sending with tag = %d\n",PF.me,tag);
3757 fflush(stderr);
3758 }
3759
3760 PF_Send(MASTER, tag);
3761
3762 if(PF.log){
3763 fprintf(stderr,"[%d] returning from send\n",PF.me);
3764 fflush(stderr);
3765 }
3766 return(0);
3767}
3768/*
3769 #] PF_Wait4MasterIP :
3770 #[ PF_DoOneExpr :
3771*/
3772
3780static int PF_DoOneExpr(void)/*the processor*/
3781{
3782 GETIDENTITY
3783 EXPRESSIONS e;
3784 int i;
3785 WORD *term;
3786 POSITION position, outposition;
3787 FILEHANDLE *fi, *fout;
3788 LONG dd = 0;
3789 WORD oldBracketOn = AR.BracketOn;
3790 WORD *oldBrackBuf = AT.BrackBuf;
3791 WORD oldbracketindexflag = AT.bracketindexflag;
3792 e = Expressions + PF.exprtodo;
3793 i = PF.exprtodo;
3794 AR.CurExpr = i;
3795 AR.SortType = AC.SortType;
3796 AR.expchanged = 0;
3797 if ( ( e->vflags & ISFACTORIZED ) != 0 ) {
3798 AR.BracketOn = 1;
3799 AT.BrackBuf = AM.BracketFactors;
3800 AT.bracketindexflag = 1;
3801 }
3802
3803 position = AS.OldOnFile[i];
3804 if ( e->status == HIDDENLEXPRESSION || e->status == HIDDENGEXPRESSION
3805 || e->status == UNHIDELEXPRESSION || e->status == UNHIDEGEXPRESSION ) {
3806 AR.GetFile = 2; fi = AR.hidefile;
3807 }
3808 else {
3809 AR.GetFile = 0; fi = AR.infile;
3810 }
3811/*
3812 PUTZERO(fi->POposition);
3813 if ( fi->handle >= 0 ) {
3814 fi->POfill = fi->POfull = fi->PObuffer;
3815 }
3816*/
3817 SetScratch(fi,&position);
3818 term = AT.WorkPointer;
3819 AR.CompressPointer = AR.CompressBuffer;
3820 AR.CompressPointer[0] = 0;
3821 AR.KeptInHold = 0;
3822 if ( GetTerm(BHEAD term) <= 0 ) {
3823 MesPrint("Expression %d has problems in scratchfile",i);
3824 Terminate(-1);
3825 }
3826 if ( AT.bracketindexflag > 0 ) OpenBracketIndex(i);
3827 term[3] = i;
3828 PUTZERO(outposition);
3829 fout = AR.outfile;
3830 fout->POfill = fout->POfull = fout->PObuffer;
3831 fout->POposition = outposition;
3832 if ( fout->handle >= 0 ) {
3833 fout->POposition = outposition;
3834 }
3835/*
3836 The next statement is needed because we need the system
3837 to believe that the expression is at position zero for
3838 the moment. In this worker, with no memory of other expressions,
3839 it is. This is needed for when a bracket index is made
3840 because there e->onfile is an offset. Afterwards, when the
3841 expression is written to its final location in the masters
3842 output e->onfile will get its real value.
3843*/
3844 PUTZERO(e->onfile);
3845 if ( PutOut(BHEAD term,&outposition,fout,0) < 0 ) return -1;
3846
3847 AR.DeferFlag = AC.ComDefer;
3848
3849/* AR.sLevel = AB[0]->R.sLevel;*/
3850 term = AT.WorkPointer;
3851 NewSort(BHEAD0);
3852 AR.MaxDum = AM.IndDum;
3853 AN.ninterms = 0;
3854 while ( GetTerm(BHEAD term) ) {
3855 SeekScratch(fi,&position);
3856 AN.ninterms++; dd = AN.deferskipped;
3857 if ( ( e->vflags & ISFACTORIZED ) != 0 && term[1] == HAAKJE ) {
3858 StoreTerm(BHEAD term);
3859 }
3860 else {
3861 if ( AC.CollectFun && *term <= (AM.MaxTer/(2*(LONG)sizeof(WORD))) ) {
3862 if ( GetMoreTerms(term) < 0 ) {
3863 LowerSortLevel(); return(-1);
3864 }
3865 SeekScratch(fi,&position);
3866 }
3867 AT.WorkPointer = term + *term;
3868 AN.RepPoint = AT.RepCount + 1;
3869 if ( AR.DeferFlag ) {
3870 AR.CurDum = AN.IndDum = Expressions[PF.exprtodo].numdummies;
3871 }
3872 else {
3873 AN.IndDum = AM.IndDum;
3874 AR.CurDum = ReNumber(BHEAD term);
3875 }
3876 if ( AC.SymChangeFlag ) MarkDirty(term,DIRTYSYMFLAG);
3877 if ( AN.ncmod ) {
3878 if ( ( AC.modmode & ALSOFUNARGS ) != 0 ) MarkDirty(term,DIRTYFLAG);
3879 else if ( AR.PolyFun ) PolyFunDirty(BHEAD term);
3880 }
3881 else if ( AC.PolyRatFunChanged ) PolyFunDirty(BHEAD term);
3882 if ( ( AR.PolyFunType == 2 ) && ( AC.PolyRatFunChanged == 0 )
3883 && ( e->status == LOCALEXPRESSION || e->status == GLOBALEXPRESSION ) ) {
3884 PolyFunClean(BHEAD term);
3885 }
3886 if ( Generator(BHEAD term,0) ) {
3887 LowerSortLevel(); return(-1);
3888 }
3889 AN.ninterms += dd;
3890 }
3891 SetScratch(fi,&position);
3892 if ( fi == AR.hidefile ) {
3893 AR.InHiBuf = (fi->POfull-fi->PObuffer)
3894 -DIFBASE(position,fi->POposition)/sizeof(WORD);
3895 }
3896 else {
3897 AR.InInBuf = (fi->POfull-fi->PObuffer)
3898 -DIFBASE(position,fi->POposition)/sizeof(WORD);
3899 }
3900 }
3901 AN.ninterms += dd;
3902 if ( EndSort(BHEAD AM.S0->sBuffer,0) < 0 ) return(-1);
3903 e->numdummies = AR.MaxDum - AM.IndDum;
3904 AR.BracketOn = oldBracketOn;
3905 AT.BrackBuf = oldBrackBuf;
3906 if ( ( e->vflags & TOBEFACTORED ) != 0 )
3908 else if ( ( ( e->vflags & TOBEUNFACTORED ) != 0 )
3909 && ( ( e->vflags & ISFACTORIZED ) != 0 ) )
3911 if ( AM.S0->TermsLeft ) e->vflags &= ~ISZERO;
3912 else e->vflags |= ISZERO;
3913 if ( AR.expchanged == 0 ) e->vflags |= ISUNMODIFIED;
3914/* if ( AM.S0->TermsLeft ) AR.expflags |= ISZERO;
3915 if ( AR.expchanged ) AR.expflags |= ISUNMODIFIED;*/
3916 AR.GetFile = 0;
3917 AT.bracketindexflag = oldbracketindexflag;
3918
3919 fout->POfull = fout->POfill;
3920 return(0);
3921}
3922
3923/*
3924 #] PF_DoOneExpr :
3925 #[ PF_Slave2MasterIP :
3926*/
3927
3928typedef struct bufIPstruct {
3929 LONG i;
3930 struct ExPrEsSiOn e;
3932
3933static int PF_Slave2MasterIP(int src)/*both master and slave*/
3934{
3935 EXPRESSIONS e;
3936 bufIPstruct_t exprData;
3937 int i,l;
3938 FILEHANDLE *fout=AR.outfile;
3939 POSITION pos;
3940 /*Here we know the length of data to send in advance:
3941 slave has the only one expression in its scratch file, and it sends
3942 this information to the master.*/
3943 if(PF.me != MASTER){/*slave*/
3944 e = Expressions + PF.exprtodo;
3945 /*Fill in the expression data:*/
3946 memcpy(&(exprData.e), e, sizeof(struct ExPrEsSiOn));
3947 SeekScratch(fout,&pos);
3948 exprData.i=BASEPOSITION(pos);
3949 /*Send the metadata:*/
3950 if(PF_RawSend(MASTER,&exprData,sizeof(bufIPstruct_t),0))
3951 return(-1);
3952 i=exprData.i;
3953 SETBASEPOSITION(pos,0);
3954 do{
3955 int blen=PF.exprbufsize*sizeof(WORD);
3956 if(i<blen)
3957 blen=i;
3958 l=PF_SendChunkIP(fout,&pos, MASTER, blen);
3959 /*Here always l == blen!*/
3960 if(l<0)
3961 return(-1);
3962 ADDPOS(pos,l);
3963 i-=l;
3964 }while(i>0);
3965 if ( fout->handle >= 0 ) { /* Now get rid of the file */
3966 CloseFile(fout->handle);
3967 fout->handle = -1;
3968 remove(fout->name);
3969 PUTZERO(fout->POposition);
3970 PUTZERO(fout->filesize);
3971 fout->POfill = fout->POfull = fout->PObuffer;
3972 }
3973 /* Now handle redefined preprocessor variables. */
3974 if ( AC.numpfirstnum > 0 ) {
3976 PF_PackRedefinedPreVars();
3977 PF_LongSingleSend(MASTER, PF_MISC_MSGTAG);
3978 }
3979 return(0);
3980 }/*if(PF.me != MASTER)*/
3981 /*Master*/
3982 /*partodoexr[src] is the number of expression.*/
3983 e = Expressions +partodoexr[src];
3984 /*Get metadata:*/
3985 i = PF_ANY_MSGTAG;
3986 PF_CatchErrorMessages(&src, &i);
3987 if (PF_RawRecv(&src, &exprData,sizeof(bufIPstruct_t),&i)!= sizeof(bufIPstruct_t))
3988 return(-1);
3989 /*Fill in the expression data:*/
3990/* memcpy(e, &(exprData.e), sizeof(struct ExPrEsSiOn)); */
3991 e->counter = exprData.e.counter;
3992 e->vflags = exprData.e.vflags;
3993 e->uflags = exprData.e.uflags;
3994 e->numdummies = exprData.e.numdummies;
3995 e->numfactors = exprData.e.numfactors;
3996 if ( !(e->vflags & ISZERO) ) AR.expflags |= ISZERO;
3997 if ( !(e->vflags & ISUNMODIFIED) ) AR.expflags |= ISUNMODIFIED;
3998 SeekScratch(fout,&pos);
3999 e->onfile = pos;
4000 i=exprData.i;
4001 while(i>0){
4002 int blen=PF.exprbufsize*sizeof(WORD);
4003 if(i<blen)
4004 blen=i;
4005 l=PF_RecvChunkIP(fout,src,blen);
4006 /*Here always l == blen!*/
4007 if(l<0)
4008 return(-1);
4009 i-=l;
4010 }
4011 /* Now handle redefined preprocessor variables. */
4012 if ( AC.numpfirstnum > 0 ) {
4013 PF_LongSingleReceive(src, PF_MISC_MSGTAG, NULL, NULL);
4014 PF_UnpackRedefinedPreVars();
4015 }
4016 return(0);
4017}
4018
4019/*
4020 #] PF_Slave2MasterIP :
4021 #[ PF_Master2SlaveIP :
4022*/
4023
4024static int PF_Master2SlaveIP(int dest, EXPRESSIONS e)
4025{
4026 bufIPstruct_t exprData;
4027 FILEHANDLE *fi;
4028 POSITION pos;
4029 int l;
4030 LONG ll=0,count=0;
4031 WORD *t;
4032 if(e==NULL){/*Say to the slave that no more job:*/
4033 if(PF_RawSend(dest,&exprData,sizeof(bufIPstruct_t),PF_EMPTY_MSGTAG))
4034 return(-1);
4035 return(0);
4036 }
4037 memcpy(&(exprData.e), e, sizeof(struct ExPrEsSiOn));
4038 exprData.i=e-Expressions;
4039 if ( AC.StatsFlag && AC.OldParallelStats ) {
4040 MesPrint("");
4041 MesPrint(" Sending expression %s to slave %d",EXPRNAME(exprData.i),dest);
4042 }
4043 if(PF_RawSend(dest,&exprData,sizeof(bufIPstruct_t),PF_DATA_MSGTAG))
4044 return(-1);
4045 if ( e->status == HIDDENLEXPRESSION || e->status == HIDDENGEXPRESSION
4046 || e->status == UNHIDELEXPRESSION || e->status == UNHIDEGEXPRESSION )
4047 fi = AR.hidefile;
4048 else
4049 fi = AR.infile;
4050 pos=e->onfile;
4051 SetScratch(fi,&pos);
4052 do{
4053 l=PF_SendChunkIP(fi, &pos, dest, PF.exprbufsize*sizeof(WORD));
4054 if(l<0)
4055 return(-1);
4056 t=fi->PObuffer+ (DIFBASE(pos,fi->POposition))/sizeof(WORD);
4057 ll=PF_WalkThrough(t,ll,l/sizeof(WORD),&count);
4058 ADDPOS(pos,l);
4059 }while(ll>-2);
4060 return(0);
4061}
4062
4063/*
4064 #] PF_Master2SlaveIP :
4065 #[ PF_ReadMaster :
4066*/
4067
4068static int PF_ReadMaster(void)/*reads directly to its scratch!*/
4069{
4070 bufIPstruct_t exprData;
4071 int tag,m=MASTER;
4072 EXPRESSIONS e;
4073 FILEHANDLE *fi;
4074 POSITION pos;
4075 LONG count=0;
4076 WORD *t;
4077 LONG ll=0;
4078 int l;
4079 /*Get metadata:*/
4080 tag = PF_ANY_MSGTAG;
4081 PF_CatchErrorMessages(&m, &tag);
4082 if (PF_RawRecv(&m, &exprData,sizeof(bufIPstruct_t),&tag)!= sizeof(bufIPstruct_t))
4083 return(-1);
4084
4085 if(tag == PF_EMPTY_MSGTAG)/*No data, no job*/
4086 return(tag);
4087
4088 /*data expected, tag must be == PF_DATA_MSGTAG!*/
4089 PF.exprtodo=exprData.i;
4090 e=Expressions + PF.exprtodo;
4091 /*Fill in the expression data:*/
4092/* memcpy(e, &(exprData.e), sizeof(struct ExPrEsSiOn)); */
4093 if ( e->status == HIDDENLEXPRESSION || e->status == HIDDENGEXPRESSION
4094 || e->status == UNHIDELEXPRESSION || e->status == UNHIDEGEXPRESSION )
4095 fi = AR.hidefile;
4096 else
4097 fi = AR.infile;
4098 SetEndHScratch(fi,&pos);
4099 e->onfile=AS.OldOnFile[PF.exprtodo]=pos;
4100
4101 do{
4102 l=PF_RecvChunkIP(fi,MASTER,PF.exprbufsize*sizeof(WORD));
4103 if(l<0)
4104 return(-1);
4105 t=fi->POfull-l/sizeof(WORD);
4106 ll=PF_WalkThrough(t,ll,l/sizeof(WORD),&count);
4107 }while(ll>-2);
4108 /*Now -ll-2 is the number of "extra" elements transferred from the master.*/
4109 fi->POfull-=-ll-2;
4110 fi->POfill=fi->POfull;
4111 return(PF_DATA_MSGTAG);
4112}
4113
4114/*
4115 #] PF_ReadMaster :
4116 #[ PF_SendChunkIP :
4117 thesize is in bytes. Returns the number of sent bytes or <0 on error:
4118*/
4119
/*
 * Sends up to thesize bytes of curfile, starting at *position, to process
 * `to` with message tag 0.
 *
 * If the requested range is not entirely inside the current buffer:
 *   - in-memory case (handle < 0): the amount is clipped to what is left in
 *     the buffer;
 *   - file-backed case: the buffer is first refilled around *position with
 *     PF_SetScratch(), and the amount is clipped again if the range still
 *     runs past the buffered data.
 * Callers advance their position with ADDPOS() using the returned count.
 *
 * NOTE(review): the clipped length subtracts position->p1, i.e. it treats
 * curfile->POposition as zero. That holds when the data never left memory.
 * In the file-backed branch, after PF_SetScratch(),
 * DIFBASE(*position,curfile->POposition) may be what is intended -- confirm
 * against PF_SetScratch().
 *
 * @return the number of bytes sent, or -1 on error.
 */
static int PF_SendChunkIP(FILEHANDLE *curfile, POSITION *position, int to, LONG thesize)
{
	LONG l=thesize;
	if(
		ISLESSPOS(*position,curfile->POposition) ||
		ISGEPOSINC(*position,curfile->POposition,
			((curfile->POfull-curfile->PObuffer)*sizeof(WORD)-thesize) )
	){
		if(curfile->handle< 0)
			/* No file behind the buffer: send only what is left in it. */
			l=(curfile->POfull-curfile->PObuffer)*sizeof(WORD) - (LONG)(position->p1);
		else{
			/* Reload the buffer around *position, then clip if still short. */
			PF_SetScratch(curfile,position);
			if(
				ISGEPOSINC(*position,curfile->POposition,
					((curfile->POfull-curfile->PObuffer)*sizeof(WORD)-thesize) )
			)
				l=(curfile->POfull-curfile->PObuffer)*sizeof(WORD) - (LONG)position->p1;
		}
	}
	/*Now we are able to send l bytes from
	  curfile->PObuffer[position-curfile->POposition]*/
	if(PF_RawSend(to,curfile->PObuffer+ (DIFBASE(*position,curfile->POposition))/sizeof(WORD),l,0))
		return(-1);
	return(l);
}
4145
4146/*
4147 #] PF_SendChunkIP :
4148 #[ PF_RecvChunkIP :
	thesize is in bytes. Returns the number of received bytes or <0 on error:
4150*/
4151
4152static int PF_RecvChunkIP(FILEHANDLE *curfile, int from, LONG thesize)
4153{
4154 LONG receivedBytes;
4155
4156 if( (LONG)((curfile->POstop - curfile->POfull)*sizeof(WORD)) < thesize )
4157 if(PF_pushScratch(curfile))
4158 return(-1);
4159 /*Now there is enough space from curfile->POfill to curfile->POstop*/
4160 {/*Block:*/
4161 int tag=0;
4162 receivedBytes=PF_RawRecv(&from,curfile->POfull,thesize,&tag);
4163 }/*:Block*/
4164 if(receivedBytes >= 0 ){
4165 curfile->POfull+=receivedBytes/sizeof(WORD);
4166 curfile->POfill=curfile->POfull;
4167 }/*if(receivedBytes >= 0 )*/
4168 return(receivedBytes);
4169}
4170
4171/*
4172 #] PF_RecvChunkIP :
4173 #[ PF_WalkThrough :
4174 Returns:
4175 >= 0 -- initial offset,
4176 -1 -- the first element of t contains the length of the tail of compressed term,
4177 <= -2 -- -(d+2), where d is the number of extra transferred elements.
4178 Expects:
4179 l -- initial offset or -1,
4180 chunk -- number of transferred elements (not bytes!)
4181 *count -- incremented each time a new term is found
4182*/
4183
/*
 * Walks over the WORDs just transferred (t[0 .. chunk-1]), hopping from term
 * to term via their length fields. The transfer loops in PF_Master2SlaveIP()
 * and PF_ReadMaster() use it to detect when the zero word terminating the
 * expression has been transferred. The state between chunks is carried in l
 * and the return value.
 *
 * Arguments:
 *   t     -- first WORD of the chunk;
 *   l     -- state from the previous call:
 *            >= 0: offset (in WORDs, counted from t) of the next term's
 *                  length field; callers pass 0 for the first chunk;
 *            -1:   t[0] is the tail length of a compressed term whose
 *                  negative length word ended the previous chunk;
 *   chunk -- number of WORDs in the chunk (not bytes!);
 *   count -- incremented for each term boundary crossed inside the loop.
 *
 * Returns:
 *   >= 0  -- offset of the next term relative to the start of the next chunk;
 *   -1    -- the first WORD of the next chunk is the tail length of a
 *            compressed term;
 *   <= -2 -- -(d+2), where d is the number of WORDs transferred after the
 *            terminating zero.
 *
 * NOTE(review): the result is computed in LONG and narrowed to int. This
 * assumes the offsets stay within the size of one term -- confirm.
 */
static int PF_WalkThrough(WORD *t, LONG l, LONG chunk, LONG *count)
{
	if(l<0) /*==-1!*/
		l=(*t)+1;/*the first element of t contains the length of
		           the tail of compressed term*/
	else{
		if(l>=chunk)/*next term is out of the chunk*/
			return(l-chunk);
		t+=l;
		chunk-=l;/*note, l was less than chunk so chunk >0!*/
		l=*t;
	}
	/*Main loop: l is the length field of the term at t (0 = end of expression).*/
	while(l!=0){
		if(l>0){/*an offset to the next term*/
			if(l<chunk){
				t+=l;
				chunk-=l;/*note, l was less than chunk so chunk >0!*/
				l=*t;
				(*count)++;
			}/*if(l<chunk)*/
			else
				return(l-chunk);
		}/*if(l>0)*/
		else{ /* l<0: compressed term, the next WORD holds the tail length */
			if(chunk < 2)/*i.e., chunk == 1*/
				return(-1);/*the first WORD in the next chunk is length of the tail of the compressed term*/
			l=*(t+1)+2;/*+2 since
				1. t points to the length field -1,
				2. the size of a tail of compressed term is equal to the number of WORDs in this tail*/
		}
	}/*while(l!=0)*/
	return(-1-chunk);/* -(2+(chunk-1)), chunk>0 ! */
}
4218
4219/*
4220 #] PF_WalkThrough :
4221 #] InParallel mode :
4222 #[ PF_SendFile :
4223*/
4224
4225#define PF_SNDFILEBUFSIZE 4096
4226
4234int PF_SendFile(int to, FILE *fd)
4235{
4236 size_t len=0;
4237 if(fd == NULL){
4238 if(PF_RawSend(to,&to,sizeof(int),PF_EMPTY_MSGTAG))
4239 return(-1);
4240 return(0);
4241 }
4242 for(;;){
4243 char buf[PF_SNDFILEBUFSIZE];
4244 size_t l;
4245 l=fread(buf, 1, PF_SNDFILEBUFSIZE, fd);
4246 len+=l;
4247 if(l==PF_SNDFILEBUFSIZE){
4248 if(PF_RawSend(to,buf,PF_SNDFILEBUFSIZE,PF_BUFFER_MSGTAG))
4249 return(-1);
4250 }
4251 else{
4252 if(PF_RawSend(to,buf,l,PF_ENDBUFFER_MSGTAG))
4253 return(-1);
4254 break;
4255 }
4256 }/*for(;;)*/
4257 return(len);
4258}
4259
4260/*
4261 #] PF_SendFile :
4262 #[ PF_RecvFile :
4263*/
4264
4272int PF_RecvFile(int from, FILE *fd)
4273{
4274 size_t len=0;
4275 int tag;
4276 do{
4277 char buf[PF_SNDFILEBUFSIZE];
4278 int l;
4279 l=PF_RawRecv(&from,buf,PF_SNDFILEBUFSIZE,&tag);
4280 if(l<0)
4281 return(-1);
4282 if(tag == PF_EMPTY_MSGTAG)
4283 return(-1);
4284
4285 if( fwrite(buf,l,1,fd)!=1 )
4286 return(-1);
4287 len+=l;
4288 }while(tag!=PF_ENDBUFFER_MSGTAG);
4289 return(len);
4290}
4291
4292/*
4293 #] PF_RecvFile :
4294 #[ Synchronised output :
4295 #[ Explanations :
4296*/
4297
4298/*
4299 * If the master and slaves output statistics or error messages to the same stream
4300 * or file (e.g., the standard output or the log file) simultaneously, then
4301 * a mixing of their outputs can occur. To avoid this, TFORM uses a lock of
4302 * ErrorMessageLock, but there is no locking functionality in the original MPI
4303 * specification. We need to synchronise the output from the master and slaves.
4304 *
4305 * The idea of the synchronised output (by, e.g., MesPrint()) implemented here is
4306 * Slaves:
4307 * 1. Save the output by WriteFile() (set to PF_WriteFileToFile())
4308 * into some buffers between MLOCK(ErrorMessageLock) and
4309 * MUNLOCK(ErrorMessageLock), which call PF_MLock() and PF_MUnlock(),
4310 * respectively. The output for AM.StdOut and AC.LogHandle are saved to
4311 * the buffers.
4312 * 2. At MUNLOCK(ErrorMessageLock), send the output in the buffer to the master,
4313 * with PF_STDOUT_MSGTAG or PF_LOG_MSGTAG.
4314 * Master:
4315 * 1. Receive the buffered output from slaves, and write them by
4316 * WriteFileToFile().
4317 * The main problem is how and where the master receives messages from
4318 * the slaves (PF_ReceiveErrorMessage()). For this purpose there are three
4319 * helper functions: PF_CatchErrorMessages() and PF_CatchErrorMessagesForAll()
4320 * which remove messages with PF_STDOUT_MSGTAG or PF_LOG_MSGTAG from the top
 * of the message queue, and PF_ProbeWithCatchingErrorMessages(), which is the same
 * as PF_Probe() except that it also removes these messages.
4323 */
4324
4325/*
4326 #] Explanations :
4327 #[ Variables :
4328*/
4329
static int errorMessageLock = 0; /* (slaves) Nesting depth of MLOCK(ErrorMessageLock). See PF_MLock() and PF_MUnlock(). */
static Vector(UBYTE, stdoutBuffer); /* (slaves) Captured output for AM.StdOut; (master) line buffer used by PF_WriteFileToFile(). */
static Vector(UBYTE, logBuffer); /* (slaves) Captured output for AC.LogHandle. */
#define recvBuffer logBuffer /* (master) Buffer for receiving messages; reuses logBuffer, which only slaves use for capturing. */

/*
 * If PF_ENABLE_STDOUT_BUFFERING is defined, the master performs the line buffering
 * (using stdoutBuffer) at PF_WriteFileToFile(). It is enabled by default on UNIX.
 */
#ifndef PF_ENABLE_STDOUT_BUFFERING
#ifdef UNIX
#define PF_ENABLE_STDOUT_BUFFERING
#endif
#endif
4344
4345/*
4346 #] Variables :
4347 #[ PF_MLock :
4348*/
4349
4353void PF_MLock(void)
4354{
4355 assert(PF.me != MASTER);
4356 if ( errorMessageLock++ > 0 ) return;
4357 VectorClear(stdoutBuffer);
4358 VectorClear(logBuffer);
4359}
4360
4361/*
4362 #] PF_MLock :
4363 #[ PF_MUnlock :
4364*/
4365
4369void PF_MUnlock(void)
4370{
4371 assert(PF.me != MASTER);
4372 if ( --errorMessageLock > 0 ) return;
4373 if ( !VectorEmpty(stdoutBuffer) ) {
4374 PF_RawSend(MASTER, VectorPtr(stdoutBuffer), VectorSize(stdoutBuffer), PF_STDOUT_MSGTAG);
4375 }
4376 if ( !VectorEmpty(logBuffer) ) {
4377 PF_RawSend(MASTER, VectorPtr(logBuffer), VectorSize(logBuffer), PF_LOG_MSGTAG);
4378 }
4379}
4380
4381/*
4382 #] PF_MUnlock :
4383 #[ PF_WriteFileToFile :
4384*/
4385
4398LONG PF_WriteFileToFile(int handle, UBYTE *buffer, LONG size)
4399{
4400 if ( PF.me != MASTER && errorMessageLock > 0 ) {
4401 if ( handle == AM.StdOut ) {
4402 VectorPushBacks(stdoutBuffer, buffer, size);
4403 return size;
4404 }
4405 else if ( handle == AC.LogHandle ) {
4406 VectorPushBacks(logBuffer, buffer, size);
4407 return size;
4408 }
4409 }
4410#ifdef PF_ENABLE_STDOUT_BUFFERING
4411 /*
4412 * On my computer, sometimes a single linefeed "\n" sent to the standard
4413 * output is ignored on the execution of mpiexec. A typical example is:
4414 * $ cat foo.c
4415 * #include <unistd.h>
4416 * int main() {
4417 * write(1, " ", 4);
4418 * write(1, "\n", 1);
4419 * write(1, " ", 4);
4420 * write(1, "123\n", 4);
4421 * return 0;
4422 * }
4423 * or even as a shell script:
4424 * $ cat foo.sh
4425 * #! bin/sh
4426 * printf " "
4427 * printf "\n"
4428 * printf " "
4429 * printf "123\n"
4430 * When I ran it on mpiexec
4431 * $ while :; do mpiexec -np 1 ./foo.sh; done
4432 * I observed the single linefeed (printf "\n") was sometimes ignored. Even
4433 * though this phenomenon might be specific to my environment, I added this
4434 * code because someone may encounter a similar phenomenon and feel it
4435 * frustrating. (TU 16 Jun 2011)
4436 *
4437 * Phenomenon:
4438 * A single linefeed sent to the standard output occasionally ignored
4439 * on mpiexec.
4440 *
4441 * Environment:
4442 * openSUSE 11.4 (x86_64)
4443 * kernel: 2.6.37.6-0.5-desktop
4444 * gcc: 4.5.1 20101208
4445 * mpich2-1.3.2p1 configured with '--enable-shared --with-pm=smpd'
4446 *
4447 * Solution:
4448 * In Unix (in which Uwrite() calls write() system call without any buffering),
4449 * we perform the line buffering here. A single linefeed is also buffered.
4450 *
4451 * XXX:
4452 * At the end of the program the buffered output (text without LF) will not be flushed,
4453 * i.e., will not be written to the standard output. This is not problematic at a normal run.
4454 * The buffer can be explicitly flushed by PF_FlushStdOutBuffer().
4455 */
4456 if ( PF.me == MASTER && handle == AM.StdOut ) {
4457 size_t oldsize;
4458 /* Assume the newline character is LF (when UNIX is defined). */
4459 if ( (size > 0 && buffer[size - 1] != LINEFEED) || (size == 1 && buffer[0] == LINEFEED) ) {
4460 VectorPushBacks(stdoutBuffer, buffer, size);
4461 return size;
4462 }
4463 if ( (oldsize = VectorSize(stdoutBuffer)) > 0 ) {
4464 LONG ret;
4465 VectorPushBacks(stdoutBuffer, buffer, size);
4466 ret = WriteFileToFile(handle, VectorPtr(stdoutBuffer), VectorSize(stdoutBuffer));
4467 VectorClear(stdoutBuffer);
4468 if ( ret < 0 ) {
4469 return ret;
4470 }
4471 else if ( ret < (LONG)oldsize ) {
4472 return 0; /* This means the buffered output in previous calls is lost. */
4473 }
4474 else {
4475 return ret - (LONG)oldsize;
4476 }
4477 }
4478 }
4479#endif
4480 return WriteFileToFile(handle, buffer, size);
4481}
4482
4483/*
4484 #] PF_WriteFileToFile :
4485 #[ PF_FlushStdOutBuffer :
4486*/
4487
/*
 * Writes out any standard-output text that the master's line buffering in
 * PF_WriteFileToFile() is still holding back, and empties the buffer.
 * It does nothing on slaves, when the buffer is empty, or when
 * PF_ENABLE_STDOUT_BUFFERING is not defined.
 */
void PF_FlushStdOutBuffer(void)
{
#ifdef PF_ENABLE_STDOUT_BUFFERING
	if ( PF.me == MASTER && VectorSize(stdoutBuffer) > 0 ) {
		WriteFileToFile(AM.StdOut, VectorPtr(stdoutBuffer), VectorSize(stdoutBuffer));
		VectorClear(stdoutBuffer);
	}
#endif
}
4501
4502/*
4503 #] PF_FlushStdOutBuffer :
4504 #[ PF_ReceiveErrorMessage :
4505*/
4506
4515static void PF_ReceiveErrorMessage(int src, int tag)
4516{
4517 assert(PF.me == MASTER);
4518 int size;
4519 int ret = PF_RawProbe(&src, &tag, &size);
4520 CHECK(ret == 0);
4521 switch ( tag ) {
4522 case PF_STDOUT_MSGTAG:
4523 case PF_LOG_MSGTAG:
4524 VectorReserve(recvBuffer, size);
4525 ret = PF_RawRecv(&src, VectorPtr(recvBuffer), size, &tag);
4526 CHECK(ret == size);
4527 if ( size > 0 ) {
4528 int handle = (tag == PF_STDOUT_MSGTAG) ? AM.StdOut : AC.LogHandle;
4529#ifdef PF_ENABLE_STDOUT_BUFFERING
4530 if ( handle == AM.StdOut ) PF_WriteFileToFile(handle, VectorPtr(recvBuffer), size);
4531 else
4532#endif
4533 WriteFileToFile(handle, VectorPtr(recvBuffer), size);
4534 }
4535 break;
4536 }
4537}
4538
4539/*
4540 #] PF_ReceiveErrorMessage :
4541 #[ PF_CatchErrorMessages :
4542*/
4543
4553static void PF_CatchErrorMessages(int *src, int *tag)
4554{
4555 for (;;) {
4556 int asrc = *src;
4557 int atag = *tag;
4558 int ret = PF_RawProbe(&asrc, &atag, NULL);
4559 CHECK(ret == 0);
4560 if ( atag == PF_STDOUT_MSGTAG || atag == PF_LOG_MSGTAG ) {
4561 assert(PF.me == MASTER);
4562 PF_ReceiveErrorMessage(asrc, atag);
4563 continue;
4564 }
4565 if ( atag == PF_RUNTIME_ERROR_MSGTAG ) {
4567 }
4568 *src = asrc;
4569 *tag = atag;
4570 break;
4571 }
4572}
4573
4574/*
4575 #] PF_CatchErrorMessages :
4576 #[ PF_CatchErrorMessagesForAll :
4577*/
4578
4583static void PF_CatchErrorMessagesForAll(void)
4584{
4585 assert(PF.me == MASTER);
4586 int i;
4587 for ( i = 1; i < PF.numtasks; i++ ) {
4588 int src = i;
4589 int tag = PF_ANY_MSGTAG;
4590 PF_CatchErrorMessages(&src, &tag);
4591 }
4592}
4593
4594/*
4595 #] PF_CatchErrorMessagesForAll :
4596 #[ PF_ProbeWithCatchingErrorMessages :
4597*/
4598
4609static int PF_ProbeWithCatchingErrorMessages(int *src)
4610{
4611 for (;;) {
4612 int newsrc = *src;
4613 int tag = PF_Probe(&newsrc);
4614 if ( tag == PF_STDOUT_MSGTAG || tag == PF_LOG_MSGTAG ) {
4615 assert(PF.me == MASTER);
4616 PF_ReceiveErrorMessage(newsrc, tag);
4617 continue;
4618 }
4619 if ( tag == PF_RUNTIME_ERROR_MSGTAG ) {
4621 }
4622 *src = newsrc;
4623 return tag;
4624 }
4625}
4626
4627/*
4628 #] PF_ProbeWithCatchingErrorMessages :
4629 #[ PF_FreeErrorMessageBuffers :
4630*/
4631
4638{
4639 VectorFree(stdoutBuffer);
4640 VectorFree(logBuffer);
4641}
4642
4643/*
4644 #] PF_FreeErrorMessageBuffers :
4645 #] Synchronised output :
4646 #[ Handling runtime errors :
4647 #[ PF_RaiseRuntimeError :
4648*/
4649
4655static void PF_RaiseRuntimeError(void)
4656{
4657 if ( PF.me == MASTER ) {
4658 PF_BroadcastRuntimeError();
4659 }
4660 else {
4661 int ret, dummy;
4662 ret = PF_RawSend(MASTER, &dummy, 0, PF_RUNTIME_ERROR_MSGTAG);
4663 CHECK(ret == 0);
4664 int src = MASTER;
4665 int tag = PF_RUNTIME_ERROR_MSGTAG;
4666 ret = PF_RawRecv(&src, &dummy, 0, &tag);
4667 CHECK(ret == 0);
4668 }
4669}
4670
4671/*
4672 #] PF_RaiseRuntimeError :
4673 #[ PF_BroadcastRuntimeError :
4674*/
4675
4680static void PF_BroadcastRuntimeError(void)
4681{
4682 assert(PF.me == MASTER);
4683
4684 int ret, dummy;
4685 MPI_Request requests[PF.numtasks - 1];
4686
4687 /*
4688 * Notify all slaves of program termination by sending PF_RUNTIME_ERROR_MSGTAG.
4689 * This must be non-blocking to avoid deadlock if some slaves have already
4690 * performed a blocking send.
4691 */
4692 for ( int i = 1; i < PF.numtasks; i++ ) {
4693 ret = PF_RawIsend(i, &dummy, 0, PF_BYTE, PF_RUNTIME_ERROR_MSGTAG, &requests[i - 1]);
4694 CHECK(ret == 0);
4695 }
4696
4697 /*
4698 * Receive exactly one PF_RUNTIME_SYNC_MSGTAG or PF_RUNTIME_ERROR_MSGTAG
4699 * message from each slave.
4700 */
4701 for ( int i = 1; i < PF.numtasks; i++ ) {
4702retry:
4703 int asrc = PF_ANY_SOURCE; // blocking probe
4704 int tag = PF_Probe(&asrc);
4705 CHECK(tag >= 0);
4706 assert(1 <= asrc && asrc < PF.numtasks);
4707 switch ( tag ) {
4708 case PF_STDOUT_MSGTAG:
4709 case PF_LOG_MSGTAG:
4710 PF_ReceiveErrorMessage(asrc, tag);
4711 goto retry;
4712 case PF_RUNTIME_ERROR_MSGTAG:
4713 case PF_RUNTIME_SYNC_MSGTAG:
4714 ret = PF_RawRecv(&asrc, &dummy, 0, &tag);
4715 CHECK(ret == 0);
4716 break;
4717 default:
4718 ret = PF_Discard(&asrc, &tag);
4719 CHECK(ret == 0);
4720 goto retry;
4721 }
4722 }
4723
4724 ret = PF_RawWaitAll(PF.numtasks - 1, requests, MPI_STATUSES_IGNORE);
4725 CHECK(ret == 0);
4726}
4727
4728/*
4729 #] PF_BroadcastRuntimeError :
4730 #[ PF_PostEndSortBarrier :
4731*/
4732
4737void PF_PostEndSortBarrier(void)
4738{
4739 assert(PF_processing);
4740 PF_processing = 0;
4741
4742 int ret, dummy;
4743
4744 /*
4745 * Each slave reports either PF_RUNTIME_SYNC_MSGTAG (completed without errors)
4746 * or PF_RUNTIME_ERROR_MSGTAG (see PF_RaiseRuntimeError())
4747 * to the master. After collecting one message from each slave, the master sends
4748 * PF_RUNTIME_SYNC_MSGTAG to all slaves on success,
4749 * or PF_RUNTIME_ERROR_MSGTAG otherwise.
4750 *
4751 * This matches the behaviour of PF_RaiseRuntimeError() and
4752 * PF_ReceiveRuntimeError(). In all cases, each slave sends exactly one
4753 * PF_RUNTIME_SYNC_MSGTAG or PF_RUNTIME_ERROR_MSGTAG message to the master,
4754 * and the master sends exactly one such message to each slave.
4755 */
4756 if ( PF.me == MASTER ) {
4757 int error = 0;
4758 for ( int i = 1; i < PF.numtasks; i++ ) {
4759retry:
4760 int asrc = PF_ANY_SOURCE; // blocking probe
4761 int tag = PF_Probe(&asrc);
4762 CHECK(tag >= 0);
4763 assert(1 <= asrc && asrc < PF.numtasks);
4764 switch ( tag ) {
4765 case PF_STDOUT_MSGTAG:
4766 case PF_LOG_MSGTAG:
4767 PF_ReceiveErrorMessage(asrc, tag);
4768 goto retry;
4769 case PF_RUNTIME_ERROR_MSGTAG:
4770 error = 1;
4771 ret = PF_RawRecv(&asrc, &dummy, 0, &tag);
4772 CHECK(ret == 0);
4773 break;
4774 case PF_RUNTIME_SYNC_MSGTAG:
4775 ret = PF_RawRecv(&asrc, &dummy, 0, &tag);
4776 CHECK(ret == 0);
4777 break;
4778 default:
4779 MesPrint("!!!Unexpected MPI message src=%d tag=%d.", asrc, tag);
4780 ret = PF_Discard(&asrc, &tag);
4781 CHECK(ret == 0);
4782 goto retry;
4783 }
4784 }
4785 for ( int i = 1; i < PF.numtasks; i++ ) {
4786 ret = PF_RawSend(i, &dummy, 0, error ? PF_RUNTIME_ERROR_MSGTAG : PF_RUNTIME_SYNC_MSGTAG);
4787 CHECK(ret == 0);
4788 }
4789 }
4790 else {
4791 int tag;
4792 ret = PF_RawSend(MASTER, &dummy, 0, PF_RUNTIME_SYNC_MSGTAG);
4793 CHECK(ret == 0);
4794 int src = MASTER;
4795 ret = PF_RawRecv(&src, &dummy, 0, &tag);
4796 CHECK(ret == 0);
4797 switch ( tag ) {
4798 case PF_RUNTIME_SYNC_MSGTAG:
4799 break;
4800 case PF_RUNTIME_ERROR_MSGTAG:
4801 PF.notMyFault = 1;
4802 Terminate(-1);
4803 break;
4804 default:
4805 MesPrint("!!!Unexpected MPI message src=%d tag=%d.", src, tag);
4806 break;
4807 }
4808 }
4809}
4810
4811/*
4812 #] PF_PostEndSortBarrier :
4813 #[ PF_ReceiveRuntimeError :
4814*/
4815
4821{
4822 PF_processing = 0;
4823 PF.notMyFault = 1;
4824 if ( PF.me == MASTER ) {
4825 PF_BroadcastRuntimeError();
4826 }
4827 else {
4828 int ret, dummy;
4829 int src = MASTER;
4830 int tag = PF_RUNTIME_ERROR_MSGTAG;
4831 ret = PF_RawRecv(&src, &dummy, 0, &tag);
4832 CHECK(ret == 0);
4833 ret = PF_RawSend(MASTER, &dummy, 0, PF_RUNTIME_ERROR_MSGTAG);
4834 CHECK(ret == 0);
4835 }
4836 Terminate(-1);
4837}
4838
4839/*
4840 #] PF_ReceiveRuntimeError :
4841 #] Handling runtime errors :
4842*/
void finishcbuf(WORD num)
Definition comtool.c:89
void AddArgs(PHEAD WORD *, WORD *, WORD *)
Definition sort.c:2048
WORD * poly_ratfun_add(PHEAD WORD *, WORD *)
Definition polywrap.cc:633
int poly_unfactorize_expression(EXPRESSIONS)
Definition polywrap.cc:1535
WORD CompCoef(WORD *, WORD *)
Definition reken.c:3048
WORD PutOut(PHEAD WORD *, POSITION *, FILEHANDLE *, WORD)
Definition sort.c:1171
LONG EndSort(PHEAD WORD *, int)
Definition sort.c:454
int Generator(PHEAD WORD *, WORD)
Definition proces.c:3249
void LowerSortLevel(void)
Definition sort.c:4661
int StoreTerm(PHEAD WORD *)
Definition sort.c:4244
int poly_factorize_expression(EXPRESSIONS)
Definition polywrap.cc:1178
void WriteStats(POSITION *, WORD, WORD)
Definition sort.c:129
int NewSort(PHEAD0)
Definition sort.c:359
int NormalModulus(UWORD *, WORD *)
Definition reken.c:1404
int InsertTerm(PHEAD WORD *, WORD, WORD, WORD *, WORD *, WORD)
Definition proces.c:2727
int FlushOut(POSITION *, FILEHANDLE *, int)
Definition sort.c:1533
LONG TimeCPU(WORD)
Definition tools.c:3487
int PutPreVar(UBYTE *, UBYTE *, UBYTE *, int)
Definition pre.c:724
int PF_LongSingleReceive(int src, int tag, int *psrc, int *ptag)
Definition mpi.c:1693
int PF_PackString(const UBYTE *str)
Definition mpi.c:818
int PF_LongSingleSend(int to, int tag)
Definition mpi.c:1650
int PF_PrepareLongSinglePack(void)
Definition mpi.c:1561
int PF_Unpack(void *buffer, size_t count, MPI_Datatype type)
Definition mpi.c:783
int PF_Receive(int src, int tag, int *psrc, int *ptag)
Definition mpi.c:959
int PF_Send(int to, int tag)
Definition mpi.c:933
int PF_Reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype type, MPI_Op op, int root)
Definition mpi.c:475
int PF_PreparePack(void)
Definition mpi.c:736
int PF_LongSingleUnpack(void *buffer, size_t count, MPI_Datatype type)
Definition mpi.c:1613
int PF_Pack(const void *buffer, size_t count, MPI_Datatype type)
Definition mpi.c:754
int PF_PrepareLongMultiPack(void)
Definition mpi.c:1752
int PF_Broadcast(void)
Definition mpi.c:994
int PF_LongMultiBroadcast(void)
Definition mpi.c:1916
int PF_UnpackString(UBYTE *str)
Definition mpi.c:886
int PF_LongSinglePack(const void *buffer, size_t count, MPI_Datatype type)
Definition mpi.c:1579
int PF_Bcast(void *buffer, int count)
Definition mpi.c:452
int PF_ISendSbuf(int to, int tag)
Definition mpi.c:266
int PF_WaitRbuf(PF_BUFFER *, int, LONG *)
Definition mpi.c:412
void PF_MUnlock(void)
Definition parallel.c:4369
struct NoDe NODE
void PF_FreeErrorMessageBuffers(void)
Definition parallel.c:4637
WORD PF_Deferred(WORD *term, WORD level)
Definition parallel.c:1201
int PF_BroadcastRedefinedPreVars(void)
Definition parallel.c:3005
int PF_Init(int *argc, char ***argv)
Definition parallel.c:1947
int PF_BroadcastRHS(void)
Definition parallel.c:3580
#define CHECK(condition)
Definition parallel.c:160
LONG PF_GetSlaveTimes(void)
Definition parallel.c:2077
int PF_BroadcastExpFlags(void)
Definition parallel.c:3258
int PF_BroadcastPreDollar(WORD **dbuffer, LONG *newsize, int *numterms)
Definition parallel.c:2222
#define SWAP(x, y)
Definition parallel.c:131
int PF_BroadcastModifiedDollars(void)
Definition parallel.c:2788
int PF_RawIsend(int dest, const void *buf, int count, MPI_Datatype type, int tag, MPI_Request *request)
Definition mpi.c:574
void PF_PreTerminate(int errorcode)
Definition parallel.c:2041
int PF_RecvWbuf(WORD *, LONG *, int *)
Definition mpi.c:342
int PF_BroadcastCBuf(int bufnum)
Definition parallel.c:3147
void PF_BroadcastBuffer(WORD **buffer, LONG *length)
Definition parallel.c:2125
LONG PF_RawRecv(int *src, void *buf, LONG thesize, int *tag)
Definition mpi.c:518
int PF_CollectModifiedDollars(void)
Definition parallel.c:2509
int PF_RawWaitAll(int count, MPI_Request *request, MPI_Status *status)
Definition mpi.c:594
LONG PF_WriteFileToFile(int handle, UBYTE *buffer, LONG size)
Definition parallel.c:4398
int PF_RawProbe(int *src, int *tag, int *bytesize)
Definition mpi.c:542
int PF_RecvFile(int from, FILE *fd)
Definition parallel.c:4272
int PF_IRecvRbuf(PF_BUFFER *, int, int)
Definition mpi.c:378
int PF_BroadcastString(UBYTE *str)
Definition parallel.c:2167
int PF_Terminate(int errorcode)
Definition parallel.c:2061
int PF_LibTerminate(int)
Definition mpi.c:214
int PF_Discard(int *src, int *tag)
Definition mpi.c:615
void PF_FlushStdOutBuffer(void)
Definition parallel.c:4492
int PF_RawSend(int dest, void *buf, LONG l, int tag)
Definition mpi.c:497
int PF_Probe(int *)
Definition mpi.c:235
LONG PF_BroadcastNumber(LONG x)
Definition parallel.c:2098
int PF_EndSort(void)
Definition parallel.c:874
int PF_LibInit(int *, char ***)
Definition mpi.c:128
void PF_MLock(void)
Definition parallel.c:4353
int PF_SendFile(int to, FILE *fd)
Definition parallel.c:4234
int PF_Processor(EXPRESSIONS e, WORD i, WORD LastExpression)
Definition parallel.c:1533
#define PACK_LONG(p, n)
Definition parallel.c:142
int PF_BroadcastExpr(EXPRESSIONS e, FILEHANDLE *file)
Definition parallel.c:3552
#define UNPACK_LONG(p, n)
Definition parallel.c:151
LONG PF_RealTime(int)
Definition mpi.c:106
void PF_ReceiveRuntimeError(void)
Definition parallel.c:4820
int PF_InParallelProcessor(void)
Definition parallel.c:3627
LONG BufferSize
Definition structs.h:981
WORD * numdum
Definition structs.h:978
LONG * NumTerms
Definition structs.h:977
WORD * Top
Definition structs.h:972
COMPTREE * boomlijst
Definition structs.h:980
WORD * dimension
Definition structs.h:979
WORD ** rhs
Definition structs.h:975
WORD ** lhs
Definition structs.h:974
WORD * Buffer
Definition structs.h:971
WORD * Pointer
Definition structs.h:973
LONG * CanCommu
Definition structs.h:976
int handle
Definition structs.h:709
#define Vector(T, X)
Definition vector.h:84
#define VectorStruct(T)
Definition vector.h:65
#define VectorClear(X)
Definition vector.h:235
#define VectorReserve(X, newcapacity)
Definition vector.h:249
#define VectorFree(X)
Definition vector.h:130
#define VectorSize(X)
Definition vector.h:194
#define VectorPushBack(X, x)
Definition vector.h:277
#define VectorInit(X)
Definition vector.h:113
#define VectorPtr(X)
Definition vector.h:150
#define VectorEmpty(X)
Definition vector.h:222
#define VectorPushBacks(X, src, n)
Definition vector.h:295