FORM v5.0.0-35-g6318119
mpi.c
Go to the documentation of this file.
1
9/* #[ License : */
10/*
11 * Copyright (C) 1984-2026 J.A.M. Vermaseren
12 * When using this file you are requested to refer to the publication
13 * J.A.M.Vermaseren "New features of FORM" math-ph/0010025
14 * This is considered a matter of courtesy as the development was paid
15 * for by FOM the Dutch physics granting agency and we would like to
16 * be able to track its scientific use to convince FOM of its value
17 * for the community.
18 *
19 * This file is part of FORM.
20 *
21 * FORM is free software: you can redistribute it and/or modify it under the
22 * terms of the GNU General Public License as published by the Free Software
23 * Foundation, either version 3 of the License, or (at your option) any later
24 * version.
25 *
26 * FORM is distributed in the hope that it will be useful, but WITHOUT ANY
27 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
28 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
29 * details.
30 *
31 * You should have received a copy of the GNU General Public License along
32 * with FORM. If not, see <http://www.gnu.org/licenses/>.
33 */
34/* #] License : */
35/*
36 #[ Includes and variables :
37*/
38
39/*
40#define MPIDEBUGGING
41#define MPIDEBUGGING_DELAY_US 10000
42*/
43
44#include <limits.h>
45#include "form3.h"
46
47#ifdef MPICH_PROFILING
48# include "mpe.h"
49#endif
50
51#ifdef MPIDEBUGGING
52#include "mpidbg.h"
53#endif
54
55/*[12oct2005 mt]:*/
56/*
57 Today there was some cleanup, some stuff is moved into another place
58 in this file, and PF.packsize is removed and PF_packsize is used
59 instead. It is rather difficult to comment this properly, so not all of these
60 changes are marked by "[12oct2005 mt]"
61*/
62
63#define PF_PACKSIZE 1600
64
65/*
66 Size in bytes, will be initialized soon as
67 PF_packsize=PF_PACKSIZE/sizeof(int)*sizeof(int); for possible
68 future developing we prefer to do this initialization not here,
69 but in PF_LibInit:
70*/
71
/* Size in bytes of the fixed pack buffer. PF_LibInit() sets it to PF_PACKSIZE
   rounded down to a multiple of sizeof(int). */
72static int PF_packsize = 0;
/* Status filled by the probe/receive calls in PF_Probe() and PF_RecvWbuf(). */
73static MPI_Status PF_status;
/* Computed in PF_LibInit(): the number of PF_WORD items whose packed size
   fits into PF_packsize minus one packed PF_INT and one packed PF_LONG. */
74LONG PF_maxDollarChunkSize = 0; /*:[04oct2005 mt]*/
75
/* Forward declarations; both are defined later in this file and are called
   from PF_LibInit(). */
76static int PF_ShortPackInit(void);
77static int PF_longPackInit(void); /*:[12oct2005 mt]*/
78
/*
 * Leaves the calling function when "err" is not MPI_SUCCESS. The argument
 * is evaluated exactly once. The value returned to the caller is the error
 * code itself, unless that code happens to be 0, in which case -1 is
 * returned so that a failure is never reported as 0.
 */
#define MPI_ERRCODE_CHECK(err) \
	do { \
		int mpi_errcode_ = (err); \
		if ( mpi_errcode_ != MPI_SUCCESS ) { \
			if ( mpi_errcode_ == 0 ) return -1; \
			return mpi_errcode_; \
		} \
	} while (0)
94
95/*
96 #] Includes and variables :
97 #[ PF_RealTime :
98*/
99
106LONG PF_RealTime(int i)
107{
108 static double starttime;
109 if ( i == PF_RESET ) {
110 starttime = MPI_Wtime();
111 return((LONG)0);
112 }
113 return((LONG)( 100. * (MPI_Wtime() - starttime) ) );
114}
115
116/*
117 #] PF_RealTime :
118 #[ PF_LibInit :
119*/
120
128int PF_LibInit(int *argcp, char ***argvp)
129{
130 int ret;
131 ret = MPI_Init(argcp,argvp);
132 if ( ret != MPI_SUCCESS ) return(ret);
133 ret = MPI_Comm_rank(PF_COMM,&PF.me);
134 if ( ret != MPI_SUCCESS ) return(ret);
135 ret = MPI_Comm_size(PF_COMM,&PF.numtasks);
136 if ( ret != MPI_SUCCESS ) return(ret);
137
138 /* Initialization of packed communications. */
139 PF_packsize = PF_PACKSIZE/sizeof(int)*sizeof(int);
140 if ( PF_ShortPackInit() ) return -1;
141 if ( PF_longPackInit() ) return -1;
142
143 {/*Block*/
144 int bytes, totalbytes=0;
145/*
146 There is one problem with maximal possible packing: there is no API to
147 convert bytes to the record number. So, here we calculate the buffer
148 size needed for storing dollarvars:
149
150 LONG PF_maxDollarChunkSize is the size for the portion of the dollar
151 variable buffer suitable for broadcasting. This variable should be
152 visible from parallel.c
153
154 Evaluate PF_Pack(numterms,1,PF_INT):
155*/
156 if ( ( ret = MPI_Pack_size(1,PF_INT,PF_COMM,&bytes) )!=MPI_SUCCESS )
157 return(ret);
158
159 totalbytes+=bytes;
160/*
161 Evaluate PF_Pack( newsize,1,PF_LONG):
162*/
163 if ( ( ret = MPI_Pack_size(1,PF_LONG,PF_COMM,&bytes) )!=MPI_SUCCESS )
164 return(ret);
165
166 totalbytes += bytes;
167/*
168 Now available room is PF_packsize-totalbytes
169*/
170 totalbytes = PF_packsize-totalbytes;
171/*
172 Now totalbytes is the size of chunk in bytes.
173 Evaluate this size in number of records:
174
175 Rough estimate:
176*/
177 PF_maxDollarChunkSize=totalbytes/sizeof(WORD);
178/*
179 Go to the up limit:
180*/
181 do {
182 if ( ( ret = MPI_Pack_size(
183 ++PF_maxDollarChunkSize,PF_WORD,PF_COMM,&bytes) )!=MPI_SUCCESS )
184 return(ret);
185 } while ( bytes<totalbytes );
186/*
187 Now the chunk size is too large
188 And now evaluate the exact value:
189*/
190 do {
191 if ( ( ret = MPI_Pack_size(
192 --PF_maxDollarChunkSize,PF_WORD,PF_COMM,&bytes) )!=MPI_SUCCESS )
193 return(ret);
194 } while ( bytes>totalbytes );
195/*
196 Now PF_maxDollarChunkSize is the size of chunk of PF_WORD fitting the
197 buffer <= (PF_packsize-PF_INT-PF_LONG)
198*/
199 }/*Block*/
200 return(0);
201}
202/*
203 #] PF_LibInit :
204 #[ PF_LibTerminate :
205*/
206
/*
 * Shuts MPI down.
 *
 * error : not used.
 *
 * Returns whatever MPI_Finalize() returns.
 */
int PF_LibTerminate(int error)
{
	int rc;
	DUMMYUSE(error);
	rc = MPI_Finalize();
	return(rc);
}
219
220/*
221 #] PF_LibTerminate :
222 #[ PF_Probe :
223*/
224
235int PF_Probe(int *src)
236{
237 int ret, flag;
238 if ( *src == PF_ANY_SOURCE ) { /*Blocking call*/
239 ret = MPI_Probe(*src,MPI_ANY_TAG,PF_COMM,&PF_status);
240 flag = 1;
241 }
242 else { /*Non-blocking call*/
243 ret = MPI_Iprobe(*src,MPI_ANY_TAG,PF_COMM,&flag,&PF_status);
244 }
245 *src = PF_status.MPI_SOURCE;
246 if ( ret != MPI_SUCCESS ) { if ( ret > 0 ) ret *= -1; return(ret); }
247 if ( !flag ) return(0);
248 return(PF_status.MPI_TAG);
249}
250
251/*
252 #] PF_Probe :
253 #[ PF_ISendSbuf :
254*/
255
266int PF_ISendSbuf(int to, int tag)
267{
268 PF_BUFFER *s = PF.sbuf;
269 int a = s->active;
270 int size = s->fill[a] - s->buff[a];
271 int r = 0;
272
273 static int finished;
274
275 s->fill[a] = s->buff[a];
276 if ( s->numbufs == 1 ) {
277 r = MPI_Ssend(s->buff[a],size,PF_WORD,MASTER,tag,PF_COMM);
278 if ( r != MPI_SUCCESS ) {
279 fprintf(stderr,"[%d|%d] PF_ISendSbuf: MPI_Ssend returns: %d \n",
280 PF.me,(int)AC.CModule,r);
281 fflush(stderr);
282 return(r);
283 }
284 return(0);
285 }
286
287 switch ( tag ) { /* things to do before sending */
288 case PF_TERM_MSGTAG:
289 if ( PF.sbuf->request[to] != MPI_REQUEST_NULL)
290 r = MPI_Wait(&PF.sbuf->request[to],&PF.sbuf->retstat[to]);
291 if ( r != MPI_SUCCESS ) return(r);
292 break;
293 default:
294 break;
295 }
296
297 r = MPI_Isend(s->buff[a],size,PF_WORD,to,tag,PF_COMM,&s->request[a]);
298
299 if ( r != MPI_SUCCESS ) return(r);
300
301 switch ( tag ) { /* things to do after initialising sending */
302 case PF_TERM_MSGTAG:
303 finished = 0;
304 break;
305 case PF_ENDSORT_MSGTAG:
306 if ( ++finished == PF.numtasks - 1 )
307 r = MPI_Waitall(s->numbufs,s->request,s->status);
308 if ( r != MPI_SUCCESS ) return(r);
309 break;
310 case PF_BUFFER_MSGTAG:
311 if ( ++s->active >= s->numbufs ) s->active = 0;
312 while ( s->request[s->active] != MPI_REQUEST_NULL ) {
313 r = MPI_Waitsome(s->numbufs,s->request,&size,s->index,s->retstat);
314 if ( r != MPI_SUCCESS ) return(r);
315 }
316 break;
317 case PF_ENDBUFFER_MSGTAG:
318 if ( ++s->active >= s->numbufs ) s->active = 0;
319 r = MPI_Waitall(s->numbufs,s->request,s->status);
320 if ( r != MPI_SUCCESS ) return(r);
321 break;
322 default:
323 return(-99);
324 break;
325 }
326 return(0);
327}
328
329/*
330 #] PF_ISendSbuf :
331 #[ PF_RecvWbuf :
332*/
333
/*
 * Blocking receive of PF_WORD items into a caller-supplied buffer.
 *
 * b   : destination buffer.
 * s   : in  - capacity of b, in PF_WORD items (passed to MPI_Recv as count).
 *       out - number of PF_WORD items actually received.
 * src : in  - source to receive from (passed to MPI_Probe).
 *       out - actual source of the received message.
 *
 * Returns the tag of the received message, or a negative value if an MPI
 * call fails.
 */
342int PF_RecvWbuf(WORD *b, LONG *s, int *src)
343{
344 int i, r = 0;
345
/*
 Probe until the pending message is something other than a runtime-error
 message.
 NOTE(review): as shown here, nothing in the loop consumes a message tagged
 PF_RUNTIME_ERROR_MSGTAG, so the next MPI_Probe would presumably match the
 same message again. The listing jumps over one line number inside this
 loop, so a statement may be missing from this view -- confirm against the
 repository.
*/
346 for (;;) {
347 r = MPI_Probe(*src,PF_ANY_MSGTAG,PF_COMM,&PF_status);
348 if ( r != MPI_SUCCESS ) { if ( r > 0 ) r *= -1; return(r); }
349 if ( PF_status.MPI_TAG != PF_RUNTIME_ERROR_MSGTAG ) break;
351 }
352
/* Receive exactly the message that was probed (same source and tag). */
353 r = MPI_Recv(b,(int)*s,PF_WORD,PF_status.MPI_SOURCE,PF_status.MPI_TAG,PF_COMM,&PF_status);
354 if ( r != MPI_SUCCESS ) { if ( r > 0 ) r *= -1; return(r); }
355
356 r = MPI_Get_count(&PF_status,PF_WORD,&i);
357 if ( r != MPI_SUCCESS ) { if ( r > 0 ) r *= -1; return(r); }
358
359 *s = (LONG)i;
360 *src = PF_status.MPI_SOURCE;
361 return(PF_status.MPI_TAG);
362}
363
364/*
365 #] PF_RecvWbuf :
366 #[ PF_IRecvRbuf :
367*/
368
378int PF_IRecvRbuf(PF_BUFFER *r, int bn, int from)
379{
380 int ret;
381 r->type[bn] = PF_WORD;
382
383 if ( r->numbufs == 1 ) {
384 r->tag[bn] = MPI_ANY_TAG;
385 r->from[bn] = from;
386 }
387 else {
388 ret = MPI_Irecv(r->full[bn],(int)(r->stop[bn] - r->full[bn]),PF_WORD,from,
389 MPI_ANY_TAG,PF_COMM,&r->request[bn]);
390 if (ret != MPI_SUCCESS) { if(ret > 0) ret *= -1; return(ret); }
391 }
392 return(0);
393}
394
395/*
396 #] PF_IRecvRbuf :
397 #[ PF_WaitRbuf :
398*/
399
412int PF_WaitRbuf(PF_BUFFER *r, int bn, LONG *size)
413{
414 int ret, rsize;
415
416 if ( r->numbufs == 1 ) {
417 *size = r->stop[bn] - r->full[bn];
418 ret = MPI_Recv(r->full[bn],(int)*size,r->type[bn],r->from[bn],r->tag[bn],
419 PF_COMM,&(r->status[bn]));
420 if ( ret != MPI_SUCCESS ) { if ( ret > 0 ) ret *= -1; return(ret); }
421 ret = MPI_Get_count(&(r->status[bn]),r->type[bn],&rsize);
422 if ( ret != MPI_SUCCESS ) { if ( ret > 0 ) ret *= -1; return(ret); }
423 if ( rsize > *size ) return(-99);
424 *size = (LONG)rsize;
425 }
426 else {
427 while ( r->request[bn] != MPI_REQUEST_NULL ) {
428 ret = MPI_Waitsome(r->numbufs,r->request,&rsize,r->index,r->retstat);
429 if ( ret != MPI_SUCCESS ) { if ( ret > 0 ) ret *= -1; return(ret); }
430 while ( --rsize >= 0 ) r->status[r->index[rsize]] = r->retstat[rsize];
431 }
432 ret = MPI_Get_count(&(r->status[bn]),r->type[bn],&rsize);
433 if ( ret != MPI_SUCCESS ) { if ( ret > 0 ) ret *= -1; return(ret); }
434 *size = (LONG)rsize;
435 }
436 return(r->status[bn].MPI_TAG);
437}
438
439/*
440 #] PF_WaitRbuf :
441 #[ PF_Bcast :
442*/
443
452int PF_Bcast(void *buffer, int count)
453{
454 if ( MPI_Bcast(buffer,count,MPI_BYTE,MASTER,PF_COMM) != MPI_SUCCESS )
455 return(-1);
456 return(0);
457}
458
459/*
460 #] PF_Bcast :
461 #[ PF_Reduce :
462*/
463
475int PF_Reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype type, MPI_Op op, int root)
476{
477 if ( MPI_Reduce(sendbuf,recvbuf,count,type,op,root,PF_COMM) != MPI_SUCCESS )
478 return(-1);
479 return(0);
480}
481
482/*
483 #] PF_Reduce :
484 #[ PF_RawSend :
485*/
486
497int PF_RawSend(int dest, void *buf, LONG l, int tag)
498{
499 int ret=MPI_Ssend(buf,(int)l,MPI_BYTE,dest,tag,PF_COMM);
500 if ( ret != MPI_SUCCESS ) return(-1);
501 return(0);
502}
503/*
504 #] PF_RawSend :
505 #[ PF_RawRecv :
506*/
507
518LONG PF_RawRecv(int *src,void *buf,LONG thesize,int *tag)
519{
520 MPI_Status stat;
521 int ret=MPI_Recv(buf,(int)thesize,MPI_BYTE,*src,MPI_ANY_TAG,PF_COMM,&stat);
522 if ( ret != MPI_SUCCESS ) return(-1);
523 if ( MPI_Get_count(&stat,MPI_BYTE,&ret) != MPI_SUCCESS ) return(-1);
524 *tag = stat.MPI_TAG;
525 *src = stat.MPI_SOURCE;
526 return(ret);
527}
528
529/*
530 #] PF_RawRecv :
531 #[ PF_RawProbe :
532*/
533
542int PF_RawProbe(int *src, int *tag, int *bytesize)
543{
544 MPI_Status stat;
545 int srcval = src != NULL ? *src : PF_ANY_SOURCE;
546 int tagval = tag != NULL ? *tag : PF_ANY_MSGTAG;
547 int ret = MPI_Probe(srcval, tagval, PF_COMM, &stat);
548 if ( ret != MPI_SUCCESS ) return -1;
549 if ( src != NULL ) *src = stat.MPI_SOURCE;
550 if ( tag != NULL ) *tag = stat.MPI_TAG;
551 if ( bytesize != NULL ) {
552 ret = MPI_Get_count(&stat, MPI_BYTE, bytesize);
553 if ( ret != MPI_SUCCESS ) return -1;
554 }
555 return 0;
556}
557
558/*
559 #] PF_RawProbe :
560 #[ PF_RawIsend :
561*/
562
574int PF_RawIsend(int dest, const void *buf, int count, MPI_Datatype type, int tag, MPI_Request *request)
575{
576 int ret = MPI_Isend(buf, count, type, dest, tag, PF_COMM, request);
577 if ( ret != MPI_SUCCESS ) return(-1);
578 return(0);
579}
580
581/*
582 #] PF_RawIsend :
583 #[ PF_RawWaitAll :
584*/
585
594int PF_RawWaitAll(int count, MPI_Request *request, MPI_Status *status)
595{
596 int ret = MPI_Waitall(count, request, status);
597 if ( ret != MPI_SUCCESS ) return(-1);
598 return(0);
599}
600
601/*
602 #] PF_RawWaitAll :
603 #[ PF_Discard :
604*/
605
615int PF_Discard(int *src, int *tag)
616{
617 enum { DEFAULT_BUF_SIZE = 1024 };
618 MPI_Status stat;
619 int count;
620 void *buf;
621 char default_buf[DEFAULT_BUF_SIZE];
622 int srcval = src != NULL ? *src : PF_ANY_SOURCE;
623 int tagval = tag != NULL ? *tag : PF_ANY_MSGTAG;
624 int ret = MPI_Probe(srcval, tagval, PF_COMM, &stat);
625 if ( ret != MPI_SUCCESS ) return -1;
626 if ( src != NULL ) *src = stat.MPI_SOURCE;
627 if ( tag != NULL ) *tag = stat.MPI_TAG;
628 ret = MPI_Get_count(&stat, MPI_BYTE, &count);
629 if ( ret != MPI_SUCCESS ) return -1;
630 buf = count <= DEFAULT_BUF_SIZE ? default_buf : Malloc1(count, "PF_Discard");
631 ret = MPI_Recv(buf, count, MPI_BYTE, stat.MPI_SOURCE, stat.MPI_TAG, PF_COMM, MPI_STATUS_IGNORE);
632 if ( buf != default_buf ) M_free(buf, "PF_Discard");
633 if ( ret != MPI_SUCCESS ) return -1;
634 return 0;
635}
636
637/*
638 #] PF_Discard :
639 #[ The pack buffer :
640 #[ Variables :
641*/
642
643/*
644 * The pack buffer with the fixed size (= PF_packsize).
645 */
/* Start of the pack buffer; allocated by PF_ShortPackInit(). */
646static UBYTE *PF_packbuf = NULL;
/* One past the end of the pack buffer (PF_packbuf + PF_packsize). */
647static UBYTE *PF_packstop = NULL;
/* Current byte position in the pack buffer. It is advanced by MPI_Pack and
   MPI_Unpack and reset to 0 by PF_InitPackBuf(). */
648static int PF_packpos = 0;
649
650/*
651 #] Variables :
652 #[ PF_ShortPackInit :
653*/
654
661static int PF_ShortPackInit(void)
662{
663 PF_packbuf = (UBYTE *)Malloc1(sizeof(UBYTE) * PF_packsize, "PF_ShortPackInit");
664 if ( PF_packbuf == NULL ) return -1;
665 PF_packstop = PF_packbuf + PF_packsize;
666 return 0;
667}
668
669/*
670 #] PF_ShortPackInit :
671 #[ PF_InitPackBuf :
672*/
673
679static inline int PF_InitPackBuf(void)
680{
681/*
682 This is definitely not the best place for allocating the
683 buffer! Moved to PF_LibInit():
684
685 if ( PF_packbuf == 0 ) {
686 PF_packbuf = (UBYTE *)Malloc1(sizeof(UBYTE)*PF.packsize,"PF_InitPackBuf");
687 if ( PF_packbuf == 0 ) return(-1);
688 PF_packstop = PF_packbuf + PF.packsize;
689 }
690*/
691 PF_packpos = 0;
692 return(0);
693}
694
695/*
696 #] PF_InitPackBuf :
697 #[ PF_PrintPackBuf :
698*/
699
707int PF_PrintPackBuf(char *s, int size)
708{
709#ifdef NOMESPRINTYET
710/*
711 The use of printf should be discouraged. The results are flushed to
712 the output at unpredictable moments. We should use printf only
713 during startup when MesPrint doesn't have its buffers and output
714 channels initialized.
715*/
716 int i;
717 printf("[%d] %s: ",PF.me,s);
718 for(i=0;i<size;i++) printf("%d ",PF_packbuf[i]);
719 printf("\n");
720#else
721 MesPrint("[%d] %s: %a",PF.me,s,size,(WORD *)(PF_packbuf));
722#endif
723 return(0);
724}
725
726/*
727 #] PF_PrintPackBuf :
728 #[ PF_PreparePack :
729*/
730
/*
 * Prepares the pack buffer for a new series of PF_Pack() calls by rewinding
 * it (see PF_InitPackBuf()).
 *
 * Returns the result of PF_InitPackBuf().
 *
 * NOTE(review): the function header was missing from this listing; the name
 * comes from the surrounding section marker, and the int/void signature is
 * inferred from the body -- confirm against the header file.
 */
int PF_PreparePack(void)
{
	return PF_InitPackBuf();
}
740
741/*
742 #] PF_PreparePack :
743 #[ PF_Pack :
744*/
745
754int PF_Pack(const void *buffer, size_t count, MPI_Datatype type)
755{
756 int err, bytes;
757
758 if ( count > INT_MAX ) return -99;
759
760 err = MPI_Pack_size((int)count, type, PF_COMM, &bytes);
762 if ( PF_packpos + bytes > PF_packstop - PF_packbuf ) return -99;
763
764 err = MPI_Pack((void *)buffer, (int)count, type, PF_packbuf, PF_packsize, &PF_packpos, PF_COMM);
766
767 return 0;
768}
769
770/*
771 #] PF_Pack :
772 #[ PF_Unpack :
773*/
774
783int PF_Unpack(void *buffer, size_t count, MPI_Datatype type)
784{
785 int err;
786
787 if ( count > INT_MAX ) return -99;
788
789 err = MPI_Unpack(PF_packbuf, PF_packsize, &PF_packpos, buffer, (int)count, type, PF_COMM);
791
792 return 0;
793}
794
795/*
796 #] PF_Unpack :
797 #[ PF_PackString :
798*/
799
818int PF_PackString(const UBYTE *str)
819{
820 int ret,buflength,bytes,length;
821/*
822 length will be packed in the beginning.
823 Decrement buffer size by the length of the field "length":
824*/
825 if ( ( ret = MPI_Pack_size(1,PF_INT,PF_COMM,&bytes) ) != MPI_SUCCESS )
826 return(ret);
827 buflength = PF_packsize - bytes;
828/*
829 Calculate the string length (INCLUDING the trailing zero!):
830*/
831 for ( length = 0; length < buflength; length++ ) {
832 if ( str[length] == '\0' ) {
833 length++; /* since the trailing zero must be accounted */
834 break;
835 }
836 }
837/*
838 The string "\0!\0" is used as an image of the NULL.
839*/
840 if ( ( str[0] == '\0' ) /* empty string */
841 && ( str[1] == '!' ) /* Special case? */
842 && ( str[2] == '\0' ) /* Yes, pass 3 initial symbols */
843 ) length += 2; /* all 3 characters will be packed */
844 length++; /* Will be decremented in the following loop */
845/*
846 The problem: packed size of byte may be not equal 1! So first, suppose
847 it is 1, and if this is not the case decrease the length of the string
848 until it fits the buffer:
849*/
850 do {
851 if ( ( ret = MPI_Pack_size(--length,PF_BYTE,PF_COMM,&bytes) )
852 != MPI_SUCCESS ) return(ret);
853 } while ( bytes > buflength );
854/*
855 Note, now if str[length-1] == '\0' then the string fits to the buffer
856 (INCLUDING the trailing zero!);if not, the rest must be packed further!
857
858 Pack the length to PF_packbuf:
859*/
860 if ( ( ret = MPI_Pack(&length,1,PF_INT,PF_packbuf,PF_packsize,
861 &PF_packpos,PF_COMM) ) != MPI_SUCCESS ) return(ret);
862/*
863 Pack the string to PF_packbuf:
864*/
865 if ( ( ret = MPI_Pack((UBYTE *)str,length,PF_BYTE,PF_packbuf,PF_packsize,
866 &PF_packpos,PF_COMM) ) != MPI_SUCCESS ) return(ret);
867 return(length);
868}
869
870/*
871 #] PF_PackString :
872 #[ PF_UnpackString :
873*/
874
886int PF_UnpackString(UBYTE *str)
887{
888 int ret,length;
889/*
890 Unpack the length:
891*/
892 if( (ret = MPI_Unpack(PF_packbuf,PF_packsize,&PF_packpos,
893 &length,1,PF_INT,PF_COMM))!= MPI_SUCCESS )
894 return(ret);
895/*
896 Unpack the string:
897*/
898 if ( ( ret = MPI_Unpack(PF_packbuf,PF_packsize,&PF_packpos,
899 str,length,PF_BYTE,PF_COMM) ) != MPI_SUCCESS ) return(ret);
900/*
901 Now if str[length-1]=='\0' then the whole string
902 (INCLUDING the trailing zero!) was unpacked ;if not, the rest
903 must be unpacked to str+length.
904*/
905 return(length);
906}
907
908/*
909 #] PF_UnpackString :
910 #[ PF_Send :
911*/
912
933int PF_Send(int to, int tag)
934{
935 int err;
936 err = MPI_Ssend(PF_packbuf, PF_packpos, MPI_PACKED, to, tag, PF_COMM);
938 return 0;
939}
940
941/*
942 #] PF_Send :
943 #[ PF_Receive :
944*/
945
959int PF_Receive(int src, int tag, int *psrc, int *ptag)
960{
961 int err;
962 MPI_Status status;
963 PF_InitPackBuf();
964 err = MPI_Recv(PF_packbuf, PF_packsize, MPI_PACKED, src, tag, PF_COMM, &status);
966 if ( psrc ) *psrc = status.MPI_SOURCE;
967 if ( ptag ) *ptag = status.MPI_TAG;
968 return 0;
969}
970
971/*
972 #] PF_Receive :
973 #[ PF_Broadcast :
974*/
975
995{
996 int err;
997/*
998 * If PF_SHORTBROADCAST is defined, then the broadcasting will be performed in
999 * 2 steps. First, the size of the buffer will be broadcast, then the buffer of
1000 * exactly used size. This should be faster with slow connections, but slower on
1001 * SMP shmem MPI because of the latency.
1002 */
1003#ifdef PF_SHORTBROADCAST
1004 int pos = PF_packpos;
1005#endif
1006 if ( PF.me != MASTER ) {
1007 err = PF_InitPackBuf();
1008 if ( err ) return err;
1009 }
1010#ifdef PF_SHORTBROADCAST
1011 err = MPI_Bcast(&pos, 1, MPI_INT, MASTER, PF_COMM);
1012 MPI_ERRCODE_CHECK(err);
1013 err = MPI_Bcast(PF_packbuf, pos, MPI_PACKED, MASTER, PF_COMM);
1014#else
1015 err = MPI_Bcast(PF_packbuf, PF_packsize, MPI_PACKED, MASTER, PF_COMM);
1016#endif
1017 MPI_ERRCODE_CHECK(err);
1018 return 0;
1019}
1020
1021/*
1022 #] PF_Broadcast :
1023 #] The pack buffer :
1024 #[ Long pack stuff :
1025 #[ Explanations :
1026
1027 The problems here are:
1028 1. We need to send/receive long dollar variables. For
1029 preprocessor-defined dollarvars we use multiple
1030 packing/broadcasting (see parallel.c:PF_BroadcastPreDollar())
1031 since each variable must be broadcast immediately. For dollar
1032 variables changed at run time, collecting and broadcasting are
1033 performed at the end of the module and all modified dollarvars
1034 are transferred "at once"; that is why the size of the packed and
1035 transferred buffers may be very large.
1036 2. There is some strange feature of MPI_Bcast() on Altix MPI
1037 implementation, namely, sometimes it silently fails with big
1038 buffers. For better performance, it would be useful to send one
1039 big buffer instead of several small ones (since the latency is more
1040 important than the bandwidth). That is why we need two different
1041 sets of routines: for long point-to-point communication we collect
1042 big re-allocatable buffer, the corresponding routines have the
1043 prefix PF_longSingle, and for broadcasting we pack data into
1044 several smaller buffers, the corresponding routines have the
1045 prefix PF_longMulti.
1046 Note, for portability reasons we cannot split a large packed
1047 buffer into small chunks, send them and collect back on the other
1048 side, see "Advice to users" on page 180 MPI--The Complete Reference
1049 Volume1, second edition.
1050 OPTIMIZING:
1051 We assume, for most communications, the single buffer of size
1052 PF_packsize is enough.
1053
1054 How does it work:
1055 For point-to-point, there is one big re-allocatable
1056 buffer PF_longPackBuf with two integer positions: PF_longPackPos
1057 and PF_longPackTop (due to re-allocatable character of the buffer,
1058 it is better to use integers rather than pointers).
1059 Each time of re-allocation, the size of the buffer
1060 PF_longPackBuf is incremented by the same size of a "standard" chunk
1061 PF_packsize.
1062 For broadcasting there is one linked list (PF_longMultiRoot),
1063 which contains either positions of a chunk of PF_longPackBuf, or
1064 its own buffer. This is done for better memory utilisation:
1065 longSingle and longMulti are never used simultaneously.
1066 When a new cell is needed for LongMulti packing, we increment
1067 the counter PF_longPackN and just follow the list. If it is not
1068 possible, we allocate the cell's own buffer and link it to the end
1069 of the list PF_longMultiRoot.
1070 When PF_longPackPos is reallocated, we link new chunks into
1071 existing PF_longMultiRoot list before the first longMulti allocated
1072 cell's own buffer. The pointer PF_longMultiLastChunk points to the last
1073 cell of PF_longMultiRoot containing the pointer to the chunk of
1074 PF_longPackBuf.
1075 Initialization of PF_longPackBuf is done by the function
1076 PF_longSingleReset(). At the beginning of PF_longPackBuf it packs
1077 the size of the last sent buffer. Upon sending, the program checks
1078 whether there was at least one re-allocation (PF_longPackN>1).
1079 If so, the sender first packs and sends small buffer
1080 (PF_longPackSmallBuf) containing one integer number -- the
1081 _negative_ new size of the send buffer. Getting the buffer, a
1082 receiver unpacks one integer and checks whether it is <0 . If so,
1083 the receiver will repeat receiving, but first it checks whether
1084 it has enough buffer and increase it, if necessary.
1085 Initialization of PF_longMultiRoot is done by the function
1086 PF_longMultiReset(). At the beginning of the first chunk it packs
1087 one integer -- the number 1. Upon sending, the program checks
1088 how many cells were packed (PF_longPackN). If more than 1, the
1089 sender packs into the next cell the integer PF_longPackN, then
1090 packs PF_longPackN pairs of integers: how many times the chunk of
1091 each cell was accessed by the packing procedure (the nPacks field
1092 of the cell structure), and how many items of an incomplete last
1093 packing were stored at the end of this chunk (the lastLen field
1094 of the cell structure). Then the sender sends this auxiliary
1095 chunk first.
1096 The receiver unpacks the integer from obtained chunk and, if this
1097 integer is more than 1, it gets more chunks, unpacking information
1098 from the first auxiliary chunk into the corresponding nPacks
1099 fields. Unpacking information from multiple chunks, the receiver
1100 knows, when the chunk is expired and it must switch to the next cell,
1101 successively decrementing corresponding nPacks field.
1102
1103 XXX: There are still some flaws:
1104 PF_LongSingleSend/PF_LongSingleReceive may fail, for example, for data
1105 transfers from the master to many slaves. Suppose that the master sends big
1106 data to slaves, which needs an increase of the buffer of the receivers. For
1107 the first data transfer, the master sends the new buffer size as the first
1108 message, and then sends the data as the second message, because
1109 PF_LongSinglePack records the increase of the buffer size on the master. For
1110 the next time, however, the master sends the data without sending the new
1111 buffer size, and then MPI_Recv fails due to the data overflow.
1112 In parallel.c, they are used for the communication from slaves to the
1113 master. In this case, this problem does not occur because the master always
1114 has enough buffer.
1115 The maximum size that PF_LongMultiBroadcast can broadcast is limited to
1116 around 320kB because the current implementation tries to pack all
1117 information of chained buffers into one buffer, whose size is PF_packsize
1118 = 1600B.
1119
1120 #] Explanations :
1121 #[ Variables :
1122*/
1123
/*
 One cell of the linked list used for "long multi" packing. A cell either
 refers to a chunk of PF_longPackBuf (bufpos >= 0) or owns a private buffer
 (bufpos == -1).
*/
1124typedef struct longMultiStruct {
1125 UBYTE *buffer; /* Start of the cell's data. For a cell that refers to a
 chunk of PF_longPackBuf it is NULL until
 PF_longMultiPack2NextCell() sets it to
 PF_longPackBuf+bufpos. */
1126 int bufpos; /* if >=0, PF_longPackBuf+bufpos is the chunk start */
1127 int packpos; /* the current position */
1128 int nPacks; /* How many times PF_longPack operates on this cell */
1129 int lastLen; /* if > 0, the last packing did not fit completely into
 this chunk: only lastLen items were packed here, the rest
 is in the next cell. */
1132 struct longMultiStruct *next; /* next linked cell, or NULL */
1133} PF_LONGMULTI;
1134
/* The re-allocatable buffer. PF_longPackInit() allocates it with PF_packsize
   bytes; PF_longAddChunk() replaces it with a larger one. */
1135static UBYTE *PF_longPackBuf = NULL;
/* Small buffer allocated in PF_longPackInit() with the packed size of one
   MPI_INT. */
1136static void *PF_longPackSmallBuf = NULL;
/* Position in PF_longPackBuf; set to 0 in PF_longPackInit(). */
1137static int PF_longPackPos = 0;
/* Current size of PF_longPackBuf in bytes; starts at PF_packsize and grows
   by n*PF_packsize in PF_longAddChunk(). */
1138static int PF_longPackTop = 0;
/* First cell of the linked list of cells. */
1139static PF_LONGMULTI *PF_longMultiRoot = NULL;
/* The cell currently in use; PF_longMultiPack2NextCell() advances it. */
1140static PF_LONGMULTI *PF_longMultiTop = NULL;
/* Last cell of the list that refers to a chunk of PF_longPackBuf; new chunk
   cells are inserted after it by PF_longMultiNewChunkAdded(). */
1141static PF_LONGMULTI *PF_longMultiLastChunk = NULL;
/* Starts at 1 in PF_longPackInit() and is increased by n in
   PF_longAddChunk(); PF_longMultiPreparePrefix() packs this many
   nPacks/lastLen pairs. */
1142static int PF_longPackN = 0;
1143
1144/*
1145 #] Variables :
1146 #[ Long pack private functions :
1147 #[ PF_longMultiNewCell :
1148*/
1149
1150static inline int PF_longMultiNewCell(void)
1151{
1152/*
1153 Allocate a new cell:
1154*/
1155 PF_longMultiTop->next = (PF_LONGMULTI *)
1156 Malloc1(sizeof(PF_LONGMULTI),"PF_longMultiCell");
1157 if ( PF_longMultiTop->next == NULL ) return(-1);
1158/*
1159 Allocate a private buffer:
1160*/
1161 PF_longMultiTop->next->buffer=(UBYTE*)
1162 Malloc1(sizeof(UBYTE)*PF_packsize,"PF_longMultiChunk");
1163 if ( PF_longMultiTop->next->buffer == NULL ) return(-1);
1164/*
1165 For the private buffer position is -1:
1166*/
1167 PF_longMultiTop->next->bufpos = -1;
1168/*
1169 This is the last cell in the chain:
1170*/
1171 PF_longMultiTop->next->next = NULL;
1172/*
1173 packpos and nPacks are not initialized!
1174*/
1175 return(0);
1176}
1177
1178/*
1179 #] PF_longMultiNewCell :
1180 #[ PF_longMultiPack2NextCell :
1181*/
1182static inline int PF_longMultiPack2NextCell(void)
1183{
1184/*
1185 Is there a free cell in the chain?
1186*/
1187 if ( PF_longMultiTop->next == NULL ) {
1188/*
1189 No, allocate the new cell with a private buffer:
1190*/
1191 if ( PF_longMultiNewCell() ) return(-1);
1192 }
1193/*
1194 Move to the next cell in the chain:
1195*/
1196 PF_longMultiTop = PF_longMultiTop->next;
1197/*
1198 if >=0, the cell buffer is the chunk of PF_longPackBuf, initialize it:
1199*/
1200 if ( PF_longMultiTop->bufpos > -1 )
1201 PF_longMultiTop->buffer = PF_longPackBuf+PF_longMultiTop->bufpos;
1202/*
1203 else -- the cell has it's own private buffer.
1204 Initialize the cell fields:
1205*/
1206 PF_longMultiTop->nPacks = 0;
1207 PF_longMultiTop->lastLen = 0;
1208 PF_longMultiTop->packpos = 0;
1209 return(0);
1210}
1211
1212/*
1213 #] PF_longMultiPack2NextCell :
1214 #[ PF_longMultiNewChunkAdded :
1215*/
1216
1217static inline int PF_longMultiNewChunkAdded(int n)
1218{
1219/*
1220 Store the list tail:
1221*/
1222 PF_LONGMULTI *MemCell = PF_longMultiLastChunk->next;
1223 int pos = PF_longPackTop;
1224
1225 while ( n-- > 0 ) {
1226/*
1227 Allocate a new cell:
1228*/
1229 PF_longMultiLastChunk->next = (PF_LONGMULTI *)
1230 Malloc1(sizeof(PF_LONGMULTI),"PF_longMultiCell");
1231 if ( PF_longMultiLastChunk->next == NULL ) return(-1);
1232/*
1233 Update the Last Chunk Pointer:
1234*/
1235 PF_longMultiLastChunk = PF_longMultiLastChunk->next;
1236/*
1237 Initialize the new cell:
1238*/
1239 PF_longMultiLastChunk->bufpos = pos;
1240 pos += PF_packsize;
1241 PF_longMultiLastChunk->buffer = NULL;
1242 PF_longMultiLastChunk->packpos = 0;
1243 PF_longMultiLastChunk->nPacks = 0;
1244 PF_longMultiLastChunk->lastLen = 0;
1245 }
1246/*
1247 Hitch the tail:
1248*/
1249 PF_longMultiLastChunk->next = MemCell;
1250 return(0);
1251}
1252
1253/*
1254 #] PF_longMultiNewChunkAdded :
1255 #[ PF_longCopyChunk :
1256*/
1257
/*
 Copies n ints from "from" to "to" by means of the NCOPYI macro.
 NOTE(review): NCOPYI is defined outside this file; the commented-out loop
 below suggests that it is an element-by-element forward copy -- confirm.
*/
1258static inline void PF_longCopyChunk(int *to, int *from, int n)
1259{
1260 NCOPYI(to,from,n)
1261/* for ( ; n > 0; n-- ) *to++ = *from++; */
1262}
1263
1264/*
1265 #] PF_longCopyChunk :
1266 #[ PF_longAddChunk :
1267
1268 The chunk must be increased by n*PF_packsize.
1269*/
1270
1271static int PF_longAddChunk(int n, int mustRealloc)
1272{
1273 UBYTE *newbuf;
1274 if ( ( newbuf = (UBYTE*)Malloc1(sizeof(UBYTE)*(PF_longPackTop+n*PF_packsize),
1275 "PF_longPackBuf") ) == NULL ) return(-1);
1276/*
1277 Allocate and chain a new cell for longMulti:
1278*/
1279 if ( PF_longMultiNewChunkAdded(n) ) return(-1);
1280/*
1281 Copy the content to the new buffer:
1282*/
1283 if ( mustRealloc ) {
1284 PF_longCopyChunk((int*)newbuf,(int*)PF_longPackBuf,PF_longPackTop/sizeof(int));
1285 }
1286/*
1287 Note, PF_packsize is multiple by sizeof(int) by construction!
1288*/
1289 PF_longPackTop += (n*PF_packsize);
1290/*
1291 Free the old buffer and store the new one:
1292*/
1293 M_free(PF_longPackBuf,"PF_longPackBuf");
1294 PF_longPackBuf = newbuf;
1295/*
1296 Count number of re-allocs:
1297*/
1298 PF_longPackN += n;
1299 return(0);
1300}
1301
1302/*
1303 #] PF_longAddChunk :
1304 #[ PF_longMultiHowSplit :
1305
1306 "count" of "type" elements in an input buffer occupy "bytes" bytes.
1307 We know from the algorithm, that it is too many. How to split
1308 the buffer so that the head fits to rest of a storage buffer?*/
1309static inline int PF_longMultiHowSplit(int count, MPI_Datatype type, int bytes)
1310{
1311 int ret, items, totalbytes;
1312
1313 if ( count < 2 ) return(0); /* Nothing to split */
1314/*
1315 A rest of a storage buffer:
1316*/
1317 totalbytes = PF_packsize - PF_longMultiTop->packpos;
1318/*
1319 Rough estimate:
1320*/
1321 items = (int)((double)totalbytes*count/bytes);
1322/*
1323 Go to the up limit:
1324*/
1325 do {
1326 if ( ( ret = MPI_Pack_size(++items,type,PF_COMM,&bytes) )
1327 !=MPI_SUCCESS ) return(ret);
1328 } while ( bytes < totalbytes );
1329/*
1330 Now the value of "items" is too large
1331 And now evaluate the exact value:
1332*/
1333 do {
1334 if ( ( ret = MPI_Pack_size(--items,type,PF_COMM,&bytes) )
1335 !=MPI_SUCCESS ) return(ret);
1336 if ( items == 0 ) /* Nothing about MPI_Pack_size(0) == 0 in standards! */
1337 return(0);
1338 } while ( bytes > totalbytes );
1339 return(items);
1340}
1341/*
1342 #] PF_longMultiHowSplit :
1343 #[ PF_longPackInit :
1344*/
1345
1346static int PF_longPackInit(void)
1347{
1348 int ret;
1349 PF_longPackBuf = (UBYTE*)Malloc1(sizeof(UBYTE)*PF_packsize,"PF_longPackBuf");
1350 if ( PF_longPackBuf == NULL ) return(-1);
1351/*
1352 PF_longPackTop is not initialized yet, use in as a return value:
1353*/
1354 ret = MPI_Pack_size(1,MPI_INT,PF_COMM,&PF_longPackTop);
1355 if ( ret != MPI_SUCCESS ) return(ret);
1356
1357 PF_longPackSmallBuf =
1358 (void*)Malloc1(sizeof(UBYTE)*PF_longPackTop,"PF_longPackSmallBuf");
1359
1360 PF_longPackTop = PF_packsize;
1361 PF_longMultiRoot =
1362 (PF_LONGMULTI *)Malloc1(sizeof(PF_LONGMULTI),"PF_longMultiRoot");
1363 if ( PF_longMultiRoot == NULL ) return(-1);
1364 PF_longMultiRoot->bufpos = 0;
1365 PF_longMultiRoot->buffer = NULL;
1366 PF_longMultiRoot->next = NULL;
1367 PF_longMultiLastChunk = PF_longMultiRoot;
1368
1369 PF_longPackPos = 0;
1370 PF_longMultiRoot->packpos = 0;
1371 PF_longMultiTop = PF_longMultiRoot;
1372 PF_longPackN = 1;
1373 return(0);
1374}
1375
1376/*
1377 #] PF_longPackInit :
1378 #[ PF_longMultiPreparePrefix :
1379*/
1380
1381static inline int PF_longMultiPreparePrefix(void)
1382{
1383 int ret;
1384 PF_LONGMULTI *thePrefix;
1385 int i = PF_longPackN;
1386/*
1387 Here we have PF_longPackN>1!
1388 New cell (at the list end) to create the auxiliary chunk:
1389*/
1390 if ( PF_longMultiPack2NextCell() ) return(-1);
1391/*
1392 Store the pointer to the chunk we will proceed:
1393*/
1394 thePrefix = PF_longMultiTop;
1395/*
1396 Pack PF_longPackN:
1397*/
1398 ret = MPI_Pack(&(PF_longPackN),
1399 1,
1400 MPI_INT,
1401 thePrefix->buffer,
1402 PF_packsize,
1403 &(thePrefix->packpos),
1404 PF_COMM);
1405 if ( ret != MPI_SUCCESS ) return(ret);
1406/*
1407 And start from the beginning:
1408*/
1409 for ( PF_longMultiTop = PF_longMultiRoot; i > 0; i-- ) {
1410/*
1411 Pack number of Pack hits:
1412*/
1413 ret = MPI_Pack(&(PF_longMultiTop->nPacks),
1414 1,
1415 MPI_INT,
1416 thePrefix->buffer,
1417 PF_packsize,
1418 &(thePrefix->packpos),
1419 PF_COMM);
1420/*
1421 Pack the length of the last fit portion:
1422*/
1423 ret |= MPI_Pack(&(PF_longMultiTop->lastLen),
1424 1,
1425 MPI_INT,
1426 thePrefix->buffer,
1427 PF_packsize,
1428 &(thePrefix->packpos),
1429 PF_COMM);
1430/*
1431 Check the size -- not necessary, MPI_Pack did it.
1432*/
1433 if ( ret != MPI_SUCCESS ) return(ret);
1434/*
1435 Go to the next cell:
1436*/
1437 PF_longMultiTop = PF_longMultiTop->next;
1438 }
1439
1440 PF_longMultiTop = thePrefix;
1441/*
1442 PF_longMultiTop is ready!
1443*/
1444 return(0);
1445}
1446
1447/*
1448 #] PF_longMultiPreparePrefix :
1449 #[ PF_longMultiProcessPrefix :
1450*/
1451
1452static inline int PF_longMultiProcessPrefix(void)
1453{
1454 int ret,i;
1455/*
1456 We have PF_longPackN records packed in PF_longMultiRoot->buffer,
1457 pairs nPacks and lastLen. Loop through PF_longPackN cells,
1458 unpacking these integers into proper fields:
1459*/
1460 for ( PF_longMultiTop = PF_longMultiRoot, i = 0; i < PF_longPackN; i++ ) {
1461/*
1462 Go to th next cell, allocating, when necessary:
1463*/
1464 if ( PF_longMultiPack2NextCell() ) return(-1);
1465/*
1466 Unpack the number of Pack hits:
1467*/
1468 ret = MPI_Unpack(PF_longMultiRoot->buffer,
1469 PF_packsize,
1470 &( PF_longMultiRoot->packpos),
1471 &(PF_longMultiTop->nPacks),
1472 1,
1473 MPI_INT,
1474 PF_COMM);
1475 if ( ret != MPI_SUCCESS ) return(ret);
1476/*
1477 Unpack the length of the last fit portion:
1478*/
1479 ret = MPI_Unpack(PF_longMultiRoot->buffer,
1480 PF_packsize,
1481 &( PF_longMultiRoot->packpos),
1482 &(PF_longMultiTop->lastLen),
1483 1,
1484 MPI_INT,
1485 PF_COMM);
1486 if ( ret != MPI_SUCCESS ) return(ret);
1487 }
1488 return(0);
1489}
1490
1491/*
1492 #] PF_longMultiProcessPrefix :
1493 #[ PF_longSingleReset :
1494*/
1495
1503static inline int PF_longSingleReset(int is_sender)
1504{
1505 int ret;
1506 PF_longPackPos=0;
1507 if ( is_sender ) {
1508 ret = MPI_Pack(&PF_longPackTop,1,MPI_INT,
1509 PF_longPackBuf,PF_longPackTop,&PF_longPackPos,PF_COMM);
1510 if ( ret != MPI_SUCCESS ) return(ret);
1511 PF_longPackN = 1;
1512 }
1513 else {
1514 PF_longPackN=0;
1515 }
1516 return(0);
1517}
1518
1519/*
1520 #] PF_longSingleReset :
1521 #[ PF_longMultiReset :
1522*/
1523
1531static inline int PF_longMultiReset(int is_sender)
1532{
1533 int ret = 0, theone = 1;
1534 PF_longMultiRoot->packpos = 0;
1535 if ( is_sender ) {
1536 ret = MPI_Pack(&theone,1,MPI_INT,
1537 PF_longPackBuf,PF_longPackTop,&(PF_longMultiRoot->packpos),PF_COMM);
1538 PF_longPackN = 1;
1539 }
1540 else {
1541 PF_longPackN = 0;
1542 }
1543 PF_longMultiRoot->nPacks = 0; /* The auxiliary field is not counted */
1544 PF_longMultiRoot->lastLen = 0;
1545 PF_longMultiTop = PF_longMultiRoot;
1546 PF_longMultiRoot->buffer = PF_longPackBuf;
1547 return ret;
1548}
1549
1550/*
1551 #] PF_longMultiReset :
1552 #] Long pack private functions :
1553 #[ PF_PrepareLongSinglePack :
1554*/
1555
/**
 * Prepares the long single pack buffer for packing on the sending side.
 *
 * @return  the result of PF_longSingleReset(1): 0 on success, otherwise
 *          the MPI error code.
 */
int PF_PrepareLongSinglePack(void)
{
	return PF_longSingleReset(1);
}
1565
1566/*
1567 #] PF_PrepareLongSinglePack :
1568 #[ PF_LongSinglePack :
1569*/
1570
1579int PF_LongSinglePack(const void *buffer, size_t count, MPI_Datatype type)
1580{
1581 int ret, bytes;
1582 /* XXX: Limited by int size. */
1583 if ( count > INT_MAX ) return -99;
1584 ret = MPI_Pack_size((int)count,type,PF_COMM,&bytes);
1585 if ( ret != MPI_SUCCESS ) return(ret);
1586
1587 while ( PF_longPackPos+bytes > PF_longPackTop ) {
1588 if ( PF_longAddChunk(1, 1) ) return(-1);
1589 }
1590/*
1591 PF_longAddChunk(1, 1) means, the chunk must
1592 be increased by 1 and re-allocated
1593*/
1594 ret = MPI_Pack((void *)buffer,(int)count,type,
1595 PF_longPackBuf,PF_longPackTop,&PF_longPackPos,PF_COMM);
1596 if ( ret != MPI_SUCCESS ) return(ret);
1597 return(0);
1598}
1599
1600/*
1601 #] PF_LongSinglePack :
1602 #[ PF_LongSingleUnpack :
1603*/
1604
1613int PF_LongSingleUnpack(void *buffer, size_t count, MPI_Datatype type)
1614{
1615 int ret;
1616 /* XXX: Limited by int size. */
1617 if ( count > INT_MAX ) return -99;
1618 ret = MPI_Unpack(PF_longPackBuf,PF_longPackTop,&PF_longPackPos,
1619 buffer,(int)count,type,PF_COMM);
1620 if ( ret != MPI_SUCCESS ) return(ret);
1621 return(0);
1622}
1623
1624/*
1625 #] PF_LongSingleUnpack :
1626 #[ PF_LongSingleSend :
1627*/
1628
1650int PF_LongSingleSend(int to, int tag)
1651{
1652 int ret, pos = 0;
1653/*
1654 Note, here we assume that this function couldn't be used
1655 with to == PF_ANY_SOURCE!
1656*/
1657 if ( PF_longPackN > 1 ) {
1658 /* The buffer was incremented, pack send the new size first: */
1659 int tmp = -PF_longPackTop;
1660/*
1661 Negative value means there will be the second buffer
1662*/
1663 ret = MPI_Pack(&tmp, 1,PF_INT,
1664 PF_longPackSmallBuf,PF_longPackTop,&pos,PF_COMM);
1665 if ( ret != MPI_SUCCESS ) return(ret);
1666 ret = MPI_Ssend(PF_longPackSmallBuf,pos,MPI_PACKED,to,tag,PF_COMM);
1667 if ( ret != MPI_SUCCESS ) return(ret);
1668 }
1669 ret = MPI_Ssend(PF_longPackBuf,PF_longPackPos,MPI_PACKED,to,tag,PF_COMM);
1670 if ( ret != MPI_SUCCESS ) return(ret);
1671 return(0);
1672}
1673
1674/*
1675 #] PF_LongSingleSend :
1676 #[ PF_LongSingleReceive :
1677*/
1678
1693int PF_LongSingleReceive(int src, int tag, int *psrc, int *ptag)
1694{
1695 int ret, missed, oncemore;
1696 MPI_Status status;
1697 PF_longSingleReset(0);
1698 do {
1699 ret = MPI_Recv(PF_longPackBuf,PF_longPackTop,MPI_PACKED,src,tag,
1700 PF_COMM,&status);
1701 if ( ret != MPI_SUCCESS ) return(ret);
1702/*
1703 The source and tag must be specified here for the case if
1704 MPI_Recv is performed more than once:
1705*/
1706 src = status.MPI_SOURCE;
1707 tag = status.MPI_TAG;
1708 if ( psrc ) *psrc = status.MPI_SOURCE;
1709 if ( ptag ) *ptag = status.MPI_TAG;
1710/*
1711 Now we got either small buffer with the new PF_longPackTop,
1712 or just a regular chunk.
1713*/
1714 ret = MPI_Unpack(PF_longPackBuf,PF_longPackTop,&PF_longPackPos,
1715 &missed,1,MPI_INT,PF_COMM);
1716 if ( ret != MPI_SUCCESS ) return(ret);
1717
1718 if ( missed < 0 ) { /* The small buffer was received. */
1719 oncemore = 1; /* repeat receiving afterwards */
1720 /* Reallocate the buffer and get the data */
1721 missed = -missed;
1722/*
1723 restore after unpacking small from buffer:
1724*/
1725 PF_longPackPos = 0;
1726 }
1727 else {
1728 oncemore = 0; /* That's all, no repetition */
1729 }
1730 if ( missed > PF_longPackTop ) {
1731 /*
1732 * The room must be increased. We need a re-allocation for the
1733 * case that there is no repetition.
1734 */
1735 if ( PF_longAddChunk( (missed-PF_longPackTop)/PF_packsize, !oncemore ) )
1736 return(-1);
1737 }
1738 } while ( oncemore );
1739 return(0);
1740}
1741
1742/*
1743 #] PF_LongSingleReceive :
1744 #[ PF_PrepareLongMultiPack :
1745*/
1746
/**
 * Prepares the long multi pack buffers for packing on the sending side.
 *
 * @return  the result of PF_longMultiReset(1): 0 on success, otherwise
 *          the MPI error code.
 */
int PF_PrepareLongMultiPack(void)
{
	return PF_longMultiReset(1);
}
1756
1757/*
1758 #] PF_PrepareLongMultiPack :
1759 #[ PF_LongMultiPackImpl :
1760*/
1761
1771int PF_LongMultiPackImpl(const void*buffer, size_t count, size_t eSize, MPI_Datatype type)
1772{
1773 int ret, items;
1774
1775 /* XXX: Limited by int size. */
1776 if ( count > INT_MAX ) return -99;
1777
1778 ret = MPI_Pack_size((int)count,type,PF_COMM,&items);
1779 if ( ret != MPI_SUCCESS ) return(ret);
1780
1781 if ( PF_longMultiTop->packpos + items <= PF_packsize ) {
1782 ret = MPI_Pack((void *)buffer,(int)count,type,PF_longMultiTop->buffer,
1783 PF_packsize,&(PF_longMultiTop->packpos),PF_COMM);
1784 if ( ret != MPI_SUCCESS ) return(ret);
1785 PF_longMultiTop->nPacks++;
1786 return(0);
1787 }
1788/*
1789 The data do not fit to the rest of the buffer.
1790 There are two possibilities here: go to the next cell
1791 immediately, or first try to pack some portion. The function
1792 PF_longMultiHowSplit() returns the number of items could be
1793 packed in the end of the current cell:
1794*/
1795 if ( ( items = PF_longMultiHowSplit((int)count,type,items) ) < 0 ) return(items);
1796
1797 if ( items > 0 ) { /* store the head */
1798 ret = MPI_Pack((void *)buffer,items,type,PF_longMultiTop->buffer,
1799 PF_packsize,&(PF_longMultiTop->packpos),PF_COMM);
1800 if ( ret != MPI_SUCCESS ) return(ret);
1801 PF_longMultiTop->nPacks++;
1802 PF_longMultiTop->lastLen = items;
1803 }
1804/*
1805 Now the rest should be packed to the new cell.
1806 Slide to the new cell:
1807*/
1808 if ( PF_longMultiPack2NextCell() ) return(-1);
1809 PF_longPackN++;
1810/*
1811 Pack the rest to the next cell:
1812*/
1813 return(PF_LongMultiPackImpl((char *)buffer+items*eSize,count-items,eSize,type));
1814}
1815
1816/*
1817 #] PF_LongMultiPackImpl :
1818 #[ PF_LongMultiUnpackImpl :
1819*/
1820
1830int PF_LongMultiUnpackImpl(void *buffer, size_t count, size_t eSize, MPI_Datatype type)
1831{
1832 int ret;
1833
1834 /* XXX: Limited by int size. */
1835 if ( count > INT_MAX ) return -99;
1836
1837 if ( PF_longPackN < 2 ) { /* Just unpack the buffer from the single cell */
1838 ret = MPI_Unpack(
1839 PF_longMultiTop->buffer,
1840 PF_packsize,
1841 &(PF_longMultiTop->packpos),
1842 buffer,
1843 count,type,PF_COMM);
1844 if ( ret != MPI_SUCCESS ) return(ret);
1845 return(0);
1846 }
1847/*
1848 More than one cell is in use.
1849*/
1850 if ( ( PF_longMultiTop->nPacks > 1 ) /* the cell is not expired */
1851 || /* The last cell contains exactly required portion: */
1852 ( ( PF_longMultiTop->nPacks == 1 ) && ( PF_longMultiTop->lastLen == 0 ) )
1853 ) { /* Just unpack the buffer from the current cell */
1854 ret = MPI_Unpack(
1855 PF_longMultiTop->buffer,
1856 PF_packsize,
1857 &(PF_longMultiTop->packpos),
1858 buffer,
1859 count,type,PF_COMM);
1860 if ( ret != MPI_SUCCESS ) return(ret);
1861 (PF_longMultiTop->nPacks)--;
1862 return(0);
1863 }
1864 if ( ( PF_longMultiTop->nPacks == 1 ) && ( PF_longMultiTop->lastLen != 0 ) ) {
1865/*
1866 Unpack the head:
1867*/
1868 ret = MPI_Unpack(
1869 PF_longMultiTop->buffer,
1870 PF_packsize,
1871 &(PF_longMultiTop->packpos),
1872 buffer,
1873 PF_longMultiTop->lastLen,type,PF_COMM);
1874 if ( ret != MPI_SUCCESS ) return(ret);
1875/*
1876 Decrement the counter by read items:
1877*/
1878 count -= PF_longMultiTop->lastLen;
1879 if ( count <= 0 ) return(-1); /*Something is wrong! */
1880/*
1881 Shift the output buffer position:
1882*/
1883 buffer = (char *)buffer + PF_longMultiTop->lastLen * eSize;
1884 (PF_longMultiTop->nPacks)--;
1885 }
1886/*
1887 Here PF_longMultiTop->nPacks == 0
1888*/
1889 if ( ( PF_longMultiTop = PF_longMultiTop->next ) == NULL ) return(-1);
1890 return(PF_LongMultiUnpackImpl(buffer,count,eSize,type));
1891}
1892
1893/*
1894 #] PF_LongMultiUnpackImpl :
1895 #[ PF_LongMultiBroadcast :
1896*/
1897
1917{
1918 int ret, i;
1919
1920 if ( PF.me == MASTER ) {
1921/*
1922 PF_longPackN is the number of packed chunks. If it is more
1923 than 1, we have to pack a new one and send it first
1924*/
1925 if ( PF_longPackN > 1 ) {
1926 if ( PF_longMultiPreparePrefix() ) return(-1);
1927 ret = MPI_Bcast((void*)PF_longMultiTop->buffer,
1928 PF_packsize,MPI_PACKED,MASTER,PF_COMM);
1929 if ( ret != MPI_SUCCESS ) return(ret);
1930/*
1931 PF_longPackN was not incremented by PF_longMultiPreparePrefix()!
1932*/
1933 }
1934/*
1935 Now we start from the beginning:
1936*/
1937 PF_longMultiTop = PF_longMultiRoot;
1938/*
1939 Just broadcast all the chunks:
1940*/
1941 for ( i = 0; i < PF_longPackN; i++ ) {
1942 ret = MPI_Bcast((void*)PF_longMultiTop->buffer,
1943 PF_packsize,MPI_PACKED,MASTER,PF_COMM);
1944 if ( ret != MPI_SUCCESS ) return(ret);
1945 PF_longMultiTop = PF_longMultiTop->next;
1946 }
1947 return(0);
1948 }
1949/*
1950 else - the slave
1951*/
1952 PF_longMultiReset(0);
1953/*
1954 Get the first chunk; it can be either the only data chunk, or
1955 an auxiliary chunk, if the data do not fit the single chunk:
1956*/
1957 ret = MPI_Bcast((void*)PF_longMultiRoot->buffer,
1958 PF_packsize,MPI_PACKED,MASTER,PF_COMM);
1959 if ( ret != MPI_SUCCESS ) return(ret);
1960
1961 ret = MPI_Unpack((void*)PF_longMultiRoot->buffer,
1962 PF_packsize,
1963 &(PF_longMultiRoot->packpos),
1964 &PF_longPackN,1,MPI_INT,PF_COMM);
1965 if ( ret != MPI_SUCCESS ) return(ret);
1966/*
1967 Now in PF_longPackN we have the number of cells used
1968 for broadcasting. If it is >1, then we have to allocate
1969 enough cells, initialize them and receive all the chunks.
1970*/
1971 if ( PF_longPackN < 2 ) /* That's all, the single chunk is received. */
1972 return(0);
1973/*
1974 Here we have to get PF_longPackN chunks. But, first,
1975 initialize cells by info from the received auxiliary chunk.
1976*/
1977 if ( PF_longMultiProcessPrefix() ) return(-1);
1978/*
1979 Now we have free PF_longPackN cells, starting
1980 from PF_longMultiRoot->next, with properly initialized
1981 nPacks and lastLen fields. Get chunks:
1982*/
1983 for ( PF_longMultiTop = PF_longMultiRoot->next, i = 0; i < PF_longPackN; i++ ) {
1984 ret = MPI_Bcast((void*)PF_longMultiTop->buffer,
1985 PF_packsize,MPI_PACKED,MASTER,PF_COMM);
1986 if ( ret != MPI_SUCCESS ) return(ret);
1987 if ( i == 0 ) { /* The first chunk, it contains extra "1". */
1988 int tmp;
1989/*
1990 Extract this 1 into tmp and forget about it.
1991*/
1992 ret = MPI_Unpack((void*)PF_longMultiTop->buffer,
1993 PF_packsize,
1994 &(PF_longMultiTop->packpos),
1995 &tmp,1,MPI_INT,PF_COMM);
1996 if ( ret != MPI_SUCCESS ) return(ret);
1997 }
1998 PF_longMultiTop = PF_longMultiTop->next;
1999 }
2000/*
2001 multiUnPack starts with PF_longMultiTop, skip auxiliary chunk in
2002 PF_longMultiRoot:
2003*/
2004 PF_longMultiTop = PF_longMultiRoot->next;
2005 return(0);
2006}
2007
2008/*
2009 #] PF_LongMultiBroadcast :
2010 #] Long pack stuff :
2011*/
int PF_RecvWbuf(WORD *b, LONG *s, int *src)
Definition mpi.c:342
int PF_LongSingleReceive(int src, int tag, int *psrc, int *ptag)
Definition mpi.c:1693
int PF_PackString(const UBYTE *str)
Definition mpi.c:818
int PF_LibInit(int *argcp, char ***argvp)
Definition mpi.c:128
int PF_LongSingleSend(int to, int tag)
Definition mpi.c:1650
int PF_PrepareLongSinglePack(void)
Definition mpi.c:1561
int PF_Unpack(void *buffer, size_t count, MPI_Datatype type)
Definition mpi.c:783
int PF_IRecvRbuf(PF_BUFFER *r, int bn, int from)
Definition mpi.c:378
int PF_Receive(int src, int tag, int *psrc, int *ptag)
Definition mpi.c:959
int PF_Send(int to, int tag)
Definition mpi.c:933
int PF_Reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype type, MPI_Op op, int root)
Definition mpi.c:475
LONG PF_RealTime(int i)
Definition mpi.c:106
int PF_PreparePack(void)
Definition mpi.c:736
int PF_RawIsend(int dest, const void *buf, int count, MPI_Datatype type, int tag, MPI_Request *request)
Definition mpi.c:574
int PF_LongSingleUnpack(void *buffer, size_t count, MPI_Datatype type)
Definition mpi.c:1613
LONG PF_RawRecv(int *src, void *buf, LONG thesize, int *tag)
Definition mpi.c:518
int PF_Pack(const void *buffer, size_t count, MPI_Datatype type)
Definition mpi.c:754
int PF_PrepareLongMultiPack(void)
Definition mpi.c:1752
int PF_RawWaitAll(int count, MPI_Request *request, MPI_Status *status)
Definition mpi.c:594
int PF_LongMultiPackImpl(const void *buffer, size_t count, size_t eSize, MPI_Datatype type)
Definition mpi.c:1771
int PF_RawProbe(int *src, int *tag, int *bytesize)
Definition mpi.c:542
int PF_Broadcast(void)
Definition mpi.c:994
int PF_LongMultiBroadcast(void)
Definition mpi.c:1916
int PF_UnpackString(UBYTE *str)
Definition mpi.c:886
int PF_PrintPackBuf(char *s, int size)
Definition mpi.c:707
int PF_Discard(int *src, int *tag)
Definition mpi.c:615
int PF_RawSend(int dest, void *buf, LONG l, int tag)
Definition mpi.c:497
int PF_LongMultiUnpackImpl(void *buffer, size_t count, size_t eSize, MPI_Datatype type)
Definition mpi.c:1830
#define MPI_ERRCODE_CHECK(err)
Definition mpi.c:89
int PF_LongSinglePack(const void *buffer, size_t count, MPI_Datatype type)
Definition mpi.c:1579
int PF_Bcast(void *buffer, int count)
Definition mpi.c:452
int PF_ISendSbuf(int to, int tag)
Definition mpi.c:266
int PF_Probe(int *src)
Definition mpi.c:235
int PF_WaitRbuf(PF_BUFFER *r, int bn, LONG *size)
Definition mpi.c:412
int PF_LibTerminate(int error)
Definition mpi.c:214
void PF_ReceiveRuntimeError(void)
Definition parallel.c:4820