Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the option to skip into the raw data and only read a set number of time steps #2

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 71 additions & 29 deletions cdmt.cu
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
#include <helper_cuda.h>
#include <getopt.h>
#include <hdf5.h>
#include <limits.h>

#define HEADERSIZE 4096
#define DMCONSTANT 2.41e-10

// Struct for header information
struct header {
int64_t headersize,buffersize;
unsigned int nchan,nsamp,nbit,nif,nsub;
unsigned int nchan,nsamp,nbit=8,nif,nsub;
int machine_id,telescope_id,nbeam,ibeam,sumif;
double tstart,tsamp,fch1,foff,fcen,bwchan;
double src_raj,src_dej,az_start,za_start;
Expand All @@ -44,7 +45,7 @@ void write_filterbank_header(struct header h,FILE *file);
// Usage
void usage()
{
printf("cdmt -P <part> -d <DM start,step,num> -D <GPU device> -b <ndec> -N <forward FFT size> -n <overlap region> -o <outputname> <file.h5>\n\n");
printf("cdmt -P <part> -d <DM start,step,num> -D <GPU device> -b <ndec> -N <forward FFT size> -n <overlap region> -r <time steps to process> -s <time steps to skip> -o <outputname> <file.h5>\n\n");
printf("Compute coherently dedispersed SIGPROC filterbank files from LOFAR complex voltage data in HDF5 format.\n");
printf("-P <part> Specify part number for input file [integer, default: 0]\n");
printf("-D <GPU device> Select GPU device [integer, default: 0]\n");
Expand All @@ -53,7 +54,8 @@ void usage()
printf("-o <outputname> Output filename [default: cdmt]\n");
printf("-N <forward FFT size> Forward FFT size [integer, default: 65536]\n");
printf("-n <overlap region> Overlap region [integer, default: 2048]\n");

printf("-s <bytes> Number of time samples to skip in the filterbank before stating processing [integer, default: 0]\n");
printf("-r <bytes> Number of time samples to read in total from the -s offset [integer, default: length of file]\n");
return;
}

Expand All @@ -75,15 +77,18 @@ int main(int argc,char *argv[])
float *dm,*ddm,dm_start,dm_step;
char fname[128],fheader[1024],*h5fname,obsid[128]="cdmt";
int bytes_read;
long int ts_read=LONG_MAX,ts_skip=0;
long int total_ts_read=0,bytes_skip=0;
int part=0,device=0;
int arg=0;
FILE **outfile;
double timeInSeconds;

// Read options
if (argc>1) {
while ((arg=getopt(argc,argv,"P:d:D:ho:b:N:n:"))!=-1) {
while ((arg=getopt(argc,argv,"P:d:D:ho:b:N:n:r:s:"))!=-1) {
switch (arg) {

case 'n':
noverlap=atoi(optarg);
break;
Expand All @@ -99,25 +104,34 @@ int main(int argc,char *argv[])
case 'o':
strcpy(obsid,optarg);
break;

case 'P':
part=atoi(optarg);
break;

case 'D':
device=atoi(optarg);
break;

case 'd':
sscanf(optarg,"%f,%f,%d",&dm_start,&dm_step,&ndm);
break;

case 's':
ts_skip=atol(optarg);
break;

case 'r':
ts_read=atol(optarg);
break;

case 'h':
usage();
return 0;
}
}
} else {
printf("Unknown option '%c'\n", arg);
usage();
return 0;
}
Expand All @@ -126,9 +140,19 @@ int main(int argc,char *argv[])
// Read HDF5 header
h5=read_h5_header(h5fname);

// Handle skip flag
if (ts_skip > 0) {
// If it's not initialised by default...
if (h5.nbit == 0) h5.nbit = 8;

bytes_skip = (long int) (ts_skip * (float) h5.nsub * (float) h5.nbit / 8.0);
// Account for the difference in time in the new header if we skip bytes // tstart = MJD, tsamp = seconds, 1 byte = 8 bits = 1 sample per file by default
h5.tstart += (double) ts_skip * h5.tsamp / 86400.0;
}

// Set number of subbands
nsub=h5.nsub;
double timeOffset = h5.tsamp / nsub;

// Adjust header for filterbank format
h5.tsamp*=nchan*ndec;
Expand All @@ -151,51 +175,55 @@ int main(int argc,char *argv[])
// Set device
checkCudaErrors(cudaSetDevice(device));

// DMcK: cuFFT docs say it's best practice to plan before allocating memory
// cuda-memcheck fails initialisation before this block is run?
// Generate FFT plan (batch in-place forward FFT)
idist=nbin; odist=nbin; iembed=nbin; oembed=nbin; istride=1; ostride=1;
checkCudaErrors(cufftPlanMany(&ftc2cf,1,&nbin,&iembed,istride,idist,&oembed,ostride,odist,CUFFT_C2C,nfft*nsub));
cudaDeviceSynchronize();

// Generate FFT plan (batch in-place backward FFT)
idist=mbin; odist=mbin; iembed=mbin; oembed=mbin; istride=1; ostride=1;
checkCudaErrors(cufftPlanMany(&ftc2cb,1,&mbin,&iembed,istride,idist,&oembed,ostride,odist,CUFFT_C2C,nchan*nfft*nsub));
cudaDeviceSynchronize();

// Allocate memory for complex timeseries
checkCudaErrors(cudaMalloc((void **) &cp1,sizeof(cufftComplex)*nbin*nfft*nsub));
checkCudaErrors(cudaMalloc((void **) &cp2,sizeof(cufftComplex)*nbin*nfft*nsub));
checkCudaErrors(cudaMalloc((void **) &cp1p,sizeof(cufftComplex)*nbin*nfft*nsub));
checkCudaErrors(cudaMalloc((void **) &cp2p,sizeof(cufftComplex)*nbin*nfft*nsub));
checkCudaErrors(cudaMalloc((void **) &cp1, (size_t) sizeof(cufftComplex)*nbin*nfft*nsub));
checkCudaErrors(cudaMalloc((void **) &cp2, (size_t) sizeof(cufftComplex)*nbin*nfft*nsub));
checkCudaErrors(cudaMalloc((void **) &cp1p,(size_t) sizeof(cufftComplex)*nbin*nfft*nsub));
checkCudaErrors(cudaMalloc((void **) &cp2p,(size_t) sizeof(cufftComplex)*nbin*nfft*nsub));

// Allocate device memory for chirp
checkCudaErrors(cudaMalloc((void **) &dc,sizeof(cufftComplex)*nbin*nsub*ndm));
checkCudaErrors(cudaMalloc((void **) &dc, (size_t) sizeof(cufftComplex)*nbin*nsub*ndm));

// Allocate device memory for block sums
checkCudaErrors(cudaMalloc((void **) &bs1,sizeof(float)*mblock*mchan));
checkCudaErrors(cudaMalloc((void **) &bs2,sizeof(float)*mblock*mchan));
checkCudaErrors(cudaMalloc((void **) &bs1, (size_t) sizeof(float)*mblock*mchan));
checkCudaErrors(cudaMalloc((void **) &bs2, (size_t) sizeof(float)*mblock*mchan));

// Allocate device memory for channel averages and standard deviations
checkCudaErrors(cudaMalloc((void **) &zavg,sizeof(float)*mchan));
checkCudaErrors(cudaMalloc((void **) &zstd,sizeof(float)*mchan));
checkCudaErrors(cudaMalloc((void **) &zavg, (size_t) sizeof(float)*mchan));
checkCudaErrors(cudaMalloc((void **) &zstd, (size_t) sizeof(float)*mchan));

// Allocate memory for redigitized output and header
header=(char *) malloc(sizeof(char)*HEADERSIZE);
for (i=0;i<4;i++) {
h5buf[i]=(char *) malloc(sizeof(char)*nsamp*nsub);
checkCudaErrors(cudaMalloc((void **) &dh5buf[i],sizeof(char)*nsamp*nsub));
checkCudaErrors(cudaMalloc((void **) &dh5buf[i], (size_t) sizeof(char)*nsamp*nsub));
}

// Allocate output buffers
fbuf=(float *) malloc(sizeof(float)*nsamp*nsub);
checkCudaErrors(cudaMalloc((void **) &dfbuf,sizeof(float)*nsamp*nsub));
checkCudaErrors(cudaMalloc((void **) &dfbuf, (size_t) sizeof(float)*nsamp*nsub));
cbuf=(unsigned char *) malloc(sizeof(unsigned char)*msamp*mchan/ndec);
checkCudaErrors(cudaMalloc((void **) &dcbuf,sizeof(unsigned char)*msamp*mchan/ndec));
checkCudaErrors(cudaMalloc((void **) &dcbuf, (size_t) sizeof(unsigned char)*msamp*mchan/ndec));

// Allocate DMs and copy to device
dm=(float *) malloc(sizeof(float)*ndm);
for (idm=0;idm<ndm;idm++)
dm[idm]=dm_start+(float) idm*dm_step;
checkCudaErrors(cudaMalloc((void **) &ddm,sizeof(float)*ndm));
checkCudaErrors(cudaMalloc((void **) &ddm, (size_t) sizeof(float)*ndm));
checkCudaErrors(cudaMemcpy(ddm,dm,sizeof(float)*ndm,cudaMemcpyHostToDevice));

// Generate FFT plan (batch in-place forward FFT)
idist=nbin; odist=nbin; iembed=nbin; oembed=nbin; istride=1; ostride=1;
checkCudaErrors(cufftPlanMany(&ftc2cf,1,&nbin,&iembed,istride,idist,&oembed,ostride,odist,CUFFT_C2C,nfft*nsub));

// Generate FFT plan (batch in-place backward FFT)
idist=mbin; odist=mbin; iembed=mbin; oembed=mbin; istride=1; ostride=1;
checkCudaErrors(cufftPlanMany(&ftc2cb,1,&mbin,&iembed,istride,idist,&oembed,ostride,odist,CUFFT_C2C,nchan*nfft*nsub));

// Compute chirp
blocksize.x=32; blocksize.y=32; blocksize.z=1;
gridsize.x=nsub/blocksize.x+1; gridsize.y=nchan/blocksize.y+1; gridsize.z=ndm/blocksize.z+1;
Expand Down Expand Up @@ -226,6 +254,10 @@ int main(int argc,char *argv[])
// Read files
for (i=0;i<4;i++) {
rawfile[i]=fopen(h5.rawfname[i],"r");
if (bytes_skip > 0) {
fseek(rawfile[i],bytes_skip,SEEK_SET);
printf("Skipping to timestep %ld (byte %ld ftell %ld)\n", ts_skip, bytes_skip, ftell(rawfile[i]));
}
}

// Loop over input file contents
Expand All @@ -234,8 +266,13 @@ int main(int argc,char *argv[])
startclock=clock();
for (i=0;i<4;i++)
nread=fread(h5buf[i],sizeof(char),nsamp*nsub,rawfile[i])/nsub;
if (nread==0)
if (nread==0) {
printf("No data read from last file; assuming EOF, finishng up.\n");
break;
}

// Count up the total bytes read
total_ts_read += nread;
printf("Block: %d: Read %d MB in %.2f s\n",iblock,sizeof(char)*nread*nsub*4/(1<<20),(float) (clock()-startclock)/CLOCKS_PER_SEC);

// Copy buffers to device
Expand Down Expand Up @@ -305,6 +342,11 @@ int main(int argc,char *argv[])
fwrite(cbuf,sizeof(char),nread*nsub/ndec,outfile[idm]);
}
printf("Processed %d DMs in %.2f s\n",ndm,(float) (clock()-startclock)/CLOCKS_PER_SEC);
timeInSeconds = (double) ftell(rawfile[0]) * timeOffset;
printf("Current file position: %02ld:%02ld:%05.2lf (%1.2lfs)\n\n", (long int) (timeInSeconds / 3600.0), (long int) ((fmod(timeInSeconds, 3600.0)) / 60.0), fmod(timeInSeconds, 60.0), timeInSeconds);
// Exit when we pass the read length limit
if (total_ts_read > ts_read)
break;
}

// Close files
Expand Down