Run only one function in parallel with pvbatch

I try to use pvbatch to optimize my post-processing code. In fact one method function of the code consists to slice a square into several squares keeping associated data. I wrote a multi threads version of this function with MPI but I can’t correctly run the script. For instance if I execute with the command “mpiexec -n 4 pvbatch /path/to/”, the script runs correctly but not the parallel function : it runs only with the process of rank 0… and if I execute with “mpiexec -n 4 pvbatch --symmetric /path/to/”, the execution gives an error because the entire script is reading by each process.

How to proceed in order to run in parallel only the MPI function during the script execution ?

Thanks you

ParaView Python scripts cannot really be mixed with multithreaded algorithms. If you can post a short snippet, it’ll be clearer what you’re trying to do.

Sorry, I am a beginner with paraview. I want to parallelize the clipping of my square domain like this :

def clipCellsMPI(self):

    # Slice the domain into NbCell^2 cells
    # set cell centres, boundaries, and the attributes
        from  mpi4py import MPI
        comm = MPI.COMM_WORLD 
        size = comm.Get_size()
        rank = comm.Get_rank()
        N = self.__Par.getNbCell() # number of cells along each axis, ie: NbCell**2 cells in the domain
        Pts = self.__PtsL[0]
        dl = (np.max(Pts[:,0])-np.min(Pts[:,0]))*(1.0/N) # cell length
        XRawData = self.__XRawDataL[0]
        YRawData = self.__YRawDataL[0]
        # synchronize processes before deleting
        # set the job for each process
        if rank ==0:
            nwork = N//size # number of job per process when N is divisible by size
            shape_sub_arrays = np.ones(size,dtype=int)*(nwork) # final number of job per process
            start_ind = np.zeros(size,dtype=int) # index to know cells to slice per process (in linear mapping)
            # set shape_sub_arrays and start_ind arrays
            if(nwork*size != N):
                for i in range(N-nwork*size):
                    shape_sub_arrays[i] += 1
            for i in range(size):
                start_ind[i] = np.sum(shape_sub_arrays[:i])
                print("shape of sub arrays :",shape_sub_arrays)
                print("start indice :",start_ind)
        else :
            shape_sub_arrays = None
            start_ind = None
        # send to each process the job to do
        local_start_ind = comm.scatter(start_ind, root=0)
        local_array_shape = comm.scatter(shape_sub_arrays, root=0)
        # set cell indices in linear mapping for each process
        local_cell_ind = np.arange(local_start_ind,local_start_ind+local_array_shape,1)
        # loop on local cells to clip
        localXRawDataL = []
        localYRawDataL = []
        localnbPtsL = []
        localPtsL = []
        localXPL = []
        localYPL = []
        localXVelocityL = []
        localYVelocityL = []
        for ind in local_cell_ind :
            i = ind//N # indice row in matrix mapping
            j = N - i # indice column in matrix mapping
            Xcell = Clip(Input=XRawData)
            Ycell = Clip(Input=YRawData)
            Xcell.Scalars=['POINTS' , 'PR']
            Ycell.Scalars=['POINTS' , 'PR']
            Xcell.ClipType.UseReferenceBounds = 1
            Ycell.ClipType.UseReferenceBounds = 1
            l = self.__Par.getl()
            Xcell.ClipType.Bounds = [i*dl/l, (i+1)*dl/l, j*dl/l, (j+1)*dl/l, 0.0, 0.0]
            Ycell.ClipType.Bounds = [i*dl/l, (i+1)*dl/l, j*dl/l, (j+1)*dl/l, 0.0, 0.0]
            Xcell.ClipType.Length = [l,l,l]
            Ycell.ClipType.Length = [l,l,l]
            # Updating the attributes of the PostProcessing class
            XFetchedData = servermanager.Fetch(Xcell)
            YFetchedData = servermanager.Fetch(Ycell)
            nbPts = XFetchedData.GetNumberOfPoints()
        # results gathering
        XRawDataL = comm.gather(localXRawDataL, root=0)
        YRawDataL = comm.gather(localYRawDataL, root=0)
        nbPtsL = comm.gather(localnbPtsL, root=0)
        PtsL = comm.gather(localPtsL, root=0)
        XPL = comm.gather(localXPL, root=0)
        YPL = comm.gather(localYPL, root=0)
        XVelocityL = comm.gather(localXVelocityL, root=0)
        YVelocityL = comm.gather(localYVelocityL, root=0)
        #result = comm.gather(results, root=0)
        if rank == 0:
            # Computing CellBoundaries
            CellBoundariesX = np.arange(0,N+1,1).reshape(N+1,1)*np.ones((1,N+1))*dl
            CellBoundariesY = np.arange(0,N+1,1)*np.ones((N+1,1))*dl
            CellCentres = np.zeros((N*N,2))
            for j in range(N):
                # Computing the CellCentre
                CellCentres[j::N] = (np.array([np.ones(N)*j,np.arange(0,N,1)]).T+0.5)*dl
            self.__CellCentres = CellCentres
            self.__CellBoundariesX = CellBoundariesX.T
            self.__CellBoundariesY = CellBoundariesY.T
            self.__tag = 'Divided'
            self.__XRawDataL = [array for sublist in XRawDataL for array in sublist] # store in the good format values
            self.__YRawDataL = [array for sublist in YRawDataL for array in sublist]
            self.__nbPtsL = [array for sublist in nbPtsL for array in sublist]
            self.__PtsL = [array for sublist in PtsL for array in sublist]
            self.__XPL = [array for sublist in XPL for array in sublist]
            self.__YPL = [array for sublist in YPL for array in sublist]
            self.__XVelocityL = [array for sublist in XVelocityL for array in sublist]
            self.__YVelocityL = [array for sublist in YVelocityL for array in sublist]

That’s definitely not the way. From what I gather, your code needs to be placed in a Programmable Filter or PythonAlgorithm. But it’s unclear to me where your input data is coming from. I’d suspect that also need to become a data producer algorithm.

Alternatively, you may want to using VTK directly with MPI using pvtkpython.

Input data is coming from a vtk file with these commands :
data = XMLUnstructuredGridReader(file.vtk)
XRawData = servermanager.Fetch(data)
and after that I can extract lists of interest with for example :
“numpy_support.vtk_to_numpy(XFetchedData.GetPoints().GetData())” to take points list.

Previously the sequential version was used with pvpython an it did the “for ind in local_cell_ind” with two for loop on i in range(N) and j in range(N) but the computation time of this function was as long as the solver part… That’s why I’m trying to improve this function.

Can I do a multi-clip with a paraview command ? I have some difficulties to find examples or documentation on the subject.

Can I do a multi-clip with a paraview command ?

Here’s a simple example:

reader = OpenDataFile("file.vtk")
# VTK files are not distributed, and hence even when running with MPI, only rank 0 will have
# all the data. You need to apply a filter to redistribute the data.
redistributor = D3(Input=reader)
# if you're using ParaView 5.8 or newer, you can use the newer implementation of this `D3` filter
# called `RedistributeDataSet` instead
# redistributor = RedistributeDataSet(Input=reader)

# lets clip
clip = Clip(Input=redistributor)
# setup properties are appropriate
# The above will cause the clip to execute. If you ran this script using
#  mpirun -np [> 1] pvbatch .... , then the Clip filter will execute on all the ranks
# with each rank operating only on a part of the dataset.

Hope that helps.

If I understand correctly, the proposed solution allows to make a clip by sharing the input datafile into X processes which realize all together the clip, isn’t it ?
So I set reader and redistributor before clip loops, in loops (over i and j) I set the clip_ij with Clip(Input=redistributor) and at the end of the clip_ij I use UpdatePipeline(clip) to put together clip data. That’s right ?

But when I executed with mpiexec -n 4 pvbatch, I have the following error :
TypeError: UpdatePipeline argument 1: must be real number, not Clip

I do not follow. What is this i/j loop?

I had a typo in the script. I’ve updated it. Use UpdatePipeline(proxy=clip).

Since my try of parallel version was bad, I restart with the sequential and your advices like this :

        Xreader = OpenDataFile(Xfilename)
        Yreader = OpenDataFile(Yfilename)
        Xredistributor = RedistributeDataSet(Input=Xreader)
        Yredistributor = RedistributeDataSet(Input=Yreader)
        for j in range(N):
            for i in range(N):
                # Computing the Clip to build the cells
                Xcell = Clip(Input=Xredistributor)
                Ycell = Clip(Input=Yredistributor)
                Xcell.Scalars=['POINTS' , 'PR']
                Ycell.Scalars=['POINTS' , 'PR']
                Xcell.ClipType.UseReferenceBounds = 1
                Ycell.ClipType.UseReferenceBounds = 1
                l = self.__Par.getl()
                Xcell.ClipType.Bounds = [i*dl/l, (i+1)*dl/l, j*dl/l, (j+1)*dl/l, 0.0, 0.0]
                Ycell.ClipType.Bounds = [i*dl/l, (i+1)*dl/l, j*dl/l, (j+1)*dl/l, 0.0, 0.0]
                Xcell.ClipType.Length = [l,l,l]
                Ycell.ClipType.Length = [l,l,l]
                XFetchedData = servermanager.Fetch(Xcell)
                YFetchedData = servermanager.Fetch(Ycell)
                nbPts = XFetchedData.GetNumberOfPoints()

Indices i and j are the position of the subsquare in the entire square domain (like a matrix).

When I ran the previous code, I had an another error, the clip seems empty :
in vtk_to_numpy typ = vtk_array.GetDataType()
AttributeError: ‘NoneType’ object has no attribute ‘GetDataType’

If you’re looping over your entire domain, then no, that is not the way.

You need to write a new VTK algorithm i.e vtkAlgorithm subclass to do it. ParaView Python scripts are primarily intended to setup the pipeline. Actual data processing happens in VTK algorithms.

There are ways to write VTK algorithms in Python (see links referenced earlier).

Here are some pointers:

  • I’d start simple. Start by figuring out how to write a simple VTK algorithm that takes in 2 inputs and iterates over cells.
  • Next, update this filter to use vtkTableBasedClipDataSet or vtkBoxClipDataSet to clip and implement your algorithm in non-distributed fashion.
  • In distributed mode, however, things get quite complicated especially since you cannot assume all ranks have matching input data bounds. At that point, I’d probably write a C++ filter.