Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why is the curve of my permutation test analysis not smooth?

I am using a permutation test (pulling random sub-samples) to test the difference between 2 experiments. Each experiment was carried out 100 times (=100 replicas of each). Each replica consists of 801 measurement points over time. Now I would like to perform a kind of permutation (or boot strapping) in order to test how many replicas per experiment (and how many (time) measurement points) I need to obtain a certain reliability level.

For this purpose I have written a code from which I have extracted the minimal working example (with lots of things hard-coded) (please see below). The input data is generated as random numbers. Here np.random.rand(100, 801) for 100 replicas and 801 time points.

This code works in principle however the produced curves are sometimes not smoothly falling as one would expect if choosing random sub-samples for 5000 times. Here is the output of the code below:

Result of the code below

It can be seen that at 2 of the x-axis there is a peak up which should not be there. If I change the random seed from 52389 to 324235 it is gone and the curve is smooth. It seems there is something wrong with the way the random numbers are chosen?

Why is this the case? I have the semantically similar code in Matlab and there the curves are completely smooth at already 1000 permutations (here 5000).

Do I have a coding mistake or is the numpy random number generator not good?

Does anyone see the problem here?

import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import current_process, cpu_count, Process, Queue
import matplotlib.pylab as pl


def groupDiffsInParallel (queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter):

    allResults = np.zeros([nrOfReplicas, nrOfPermuts])  # e.g. 100 x 3000
    for repsPerGroupIdx in range(1, nrOfReplicas + 1):
        for permutIdx in range(nrOfPermuts):
            d1TimeCut = d1[:, 0:int(timesOfInterestFramesIter)]
            d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d1Sel = d1TimeCut[d1Idxs, :]
            d1Mean = np.mean(d1Sel.flatten())

            d2TimeCut = d2[:, 0:int(timesOfInterestFramesIter)]
            d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d2Sel = d2TimeCut[d2Idxs, :]
            d2Mean = np.mean(d2Sel.flatten())

            diff = d1Mean - d2Mean

            allResults[repsPerGroupIdx - 1, permutIdx] = np.abs(diff)

    queue.put(allResults)



def evalDifferences_parallel (d1, d2):
    # d1 and d2 are of size reps x time (e.g. 100x801)

    nrOfReplicas = d1.shape[0]
    nrOfFrames = d1.shape[1]
    timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]  # 17
    nrOfTimesOfInterest = len(timesOfInterestNs)
    framesPerNs = (nrOfFrames-1)/100  # sim time == 100 ns
    timesOfInterestFrames = [x*framesPerNs for x in timesOfInterestNs]

    nrOfPermuts = 5000

    allResults = np.zeros([nrOfTimesOfInterest, nrOfReplicas, nrOfPermuts]) # e.g. 17 x 100 x 3000
    nrOfProcesses = cpu_count()
    print('{} cores available'.format(nrOfProcesses))
    queue = Queue()
    jobs = []
    print('Starting ...')

    # use one process for each time cut
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
        p = Process(target=groupDiffsInParallel, args=(queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter))
        p.start()
        jobs.append(p)
        print('Process {} started work on time \"{} ns\"'.format(timesOfInterestFramesIterIdx, timesOfInterestNs[timesOfInterestFramesIterIdx]), end='\n', flush=True)
    # collect the results
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
        oneResult = queue.get()
        allResults[timesOfInterestFramesIterIdx, :, :] = oneResult
        print('Process number {} returned the results.'.format(timesOfInterestFramesIterIdx), end='\n', flush=True)
    # hold main thread and wait for the child process to complete. then join back the resources in the main thread
    for proc in jobs:
        proc.join()
    print("All parallel done.")

    allResultsMeanOverPermuts = allResults.mean(axis=2)  # size: 17 x 100

    replicaNumbersToPlot = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
    replicaNumbersToPlot -= 1  # zero index!
    colors = pl.cm.jet(np.linspace(0, 1, len(replicaNumbersToPlot)))

    ctr = 0

    f, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
    axId = (1, 0)
    for lineIdx in replicaNumbersToPlot:
        lineData = allResultsMeanOverPermuts[:, lineIdx]
        ax[axId].plot(lineData, ".-", color=colors[ctr], linewidth=0.5, label="nReps="+str(lineIdx+1))
        ctr+=1

    ax[axId].set_xticks(range(nrOfTimesOfInterest))  # careful: this is not the same as plt.xticks!!
    ax[axId].set_xticklabels(timesOfInterestNs)
    ax[axId].set_xlabel("simulation length taken into account")
    ax[axId].set_ylabel("average difference between mean values boot strapping samples")
    ax[axId].set_xlim([ax[axId].get_xlim()[0], ax[axId].get_xlim()[1]+1])  # increase x max by 2

    plt.show()


##### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2)

------------- UPDATE ---------------

Changing the random number generator from numpy to "from random import randint" does not fix the problem:

from:

d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)

to:

d1Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]
d2Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]

--- UPDATE 2 ---

timesOfInterestNs can just be set to:

timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50]

to speed it up on machines with fewer cores.

--- UPDATE 3 ---

Re-initialising the random seed generator in each child process (Random seed is replication across child processes) does also not fix the problem:

pid = str(current_process())
pid = int(re.split("(\W)", pid)[6])
ms = int(round(time.time() * 1000))
mySeed = np.mod(ms, 4294967295)
mySeed = mySeed + 25000 * pid + 100 * pid + pid
mySeed = np.mod(mySeed, 4294967295)
np.random.seed(seed=mySeed)

--- UPDATE 4 --- On a windows machine you will need a:

if __name__ == '__main__':

to avoid creating subprocesses recursively (and a crash).

like image 850
lordy Avatar asked Mar 21 '19 11:03

lordy


1 Answers

I guess this is the classical multiprocessing mistake. Nothing guarantees that the processes will finish in the same order as the one they started. This means that you cannot be sure that the instruction allResults[timesOfInterestFramesIterIdx, :, :] = oneResult will store the result of process timesOfInterestFramesIterIdx at the location timesOfInterestFramesIterIdx in allResults. To make it clearer, let's say timesOfInterestFramesIterIdx is 2, then you have absolutely no guarantee that oneResult is the output of process 2.

I have implemented a very quick fix below. The idea is to track the order in which the processes have been launched by adding an extra argument to groupDiffsInParallel which is then stored in the queue and thereby serves as a process identifier when the results are gathered.

import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import cpu_count, Process, Queue
import matplotlib.pylab as pl


def groupDiffsInParallel(queue, d1, d2, nrOfReplicas, nrOfPermuts,
                         timesOfInterestFramesIter,
                         timesOfInterestFramesIterIdx):

    allResults = np.zeros([nrOfReplicas, nrOfPermuts])  # e.g. 100 x 3000
    for repsPerGroupIdx in range(1, nrOfReplicas + 1):
        for permutIdx in range(nrOfPermuts):
            d1TimeCut = d1[:, 0:int(timesOfInterestFramesIter)]
            d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d1Sel = d1TimeCut[d1Idxs, :]
            d1Mean = np.mean(d1Sel.flatten())

            d2TimeCut = d2[:, 0:int(timesOfInterestFramesIter)]
            d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d2Sel = d2TimeCut[d2Idxs, :]
            d2Mean = np.mean(d2Sel.flatten())

            diff = d1Mean - d2Mean

            allResults[repsPerGroupIdx - 1, permutIdx] = np.abs(diff)

    queue.put({'allResults': allResults,
               'number': timesOfInterestFramesIterIdx})


def evalDifferences_parallel (d1, d2):
    # d1 and d2 are of size reps x time (e.g. 100x801)

    nrOfReplicas = d1.shape[0]
    nrOfFrames = d1.shape[1]
    timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 60, 70,
                         80, 90, 100]  # 17
    nrOfTimesOfInterest = len(timesOfInterestNs)
    framesPerNs = (nrOfFrames-1)/100  # sim time == 100 ns
    timesOfInterestFrames = [x*framesPerNs for x in timesOfInterestNs]

    nrOfPermuts = 5000

    allResults = np.zeros([nrOfTimesOfInterest, nrOfReplicas,
                           nrOfPermuts])  # e.g. 17 x 100 x 3000
    nrOfProcesses = cpu_count()
    print('{} cores available'.format(nrOfProcesses))
    queue = Queue()
    jobs = []
    print('Starting ...')

    # use one process for each time cut
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter \
            in enumerate(timesOfInterestFrames):
        p = Process(target=groupDiffsInParallel,
                    args=(queue, d1, d2, nrOfReplicas, nrOfPermuts,
                          timesOfInterestFramesIter,
                          timesOfInterestFramesIterIdx))
        p.start()
        jobs.append(p)
        print('Process {} started work on time \"{} ns\"'.format(
            timesOfInterestFramesIterIdx,
            timesOfInterestNs[timesOfInterestFramesIterIdx]),
              end='\n', flush=True)
    # collect the results
    resultdict = {}
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter \
            in enumerate(timesOfInterestFrames):
        resultdict.update(queue.get())
        allResults[resultdict['number'], :, :] = resultdict['allResults']
        print('Process number {} returned the results.'.format(
            resultdict['number']), end='\n', flush=True)
    # hold main thread and wait for the child process to complete. then join
    # back the resources in the main thread
    for proc in jobs:
        proc.join()
    print("All parallel done.")

    allResultsMeanOverPermuts = allResults.mean(axis=2)  # size: 17 x 100

    replicaNumbersToPlot = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40,
                                     50, 60, 70, 80, 90, 100])
    replicaNumbersToPlot -= 1  # zero index!
    colors = pl.cm.jet(np.linspace(0, 1, len(replicaNumbersToPlot)))

    ctr = 0

    f, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
    axId = (1, 0)
    for lineIdx in replicaNumbersToPlot:
        lineData = allResultsMeanOverPermuts[:, lineIdx]
        ax[axId].plot(lineData, ".-", color=colors[ctr], linewidth=0.5,
                      label="nReps="+str(lineIdx+1))
        ctr += 1

    ax[axId].set_xticks(range(nrOfTimesOfInterest))
    # careful: this is not the same as plt.xticks!!
    ax[axId].set_xticklabels(timesOfInterestNs)
    ax[axId].set_xlabel("simulation length taken into account")
    ax[axId].set_ylabel("average difference between mean values boot "
                        + "strapping samples")
    ax[axId].set_xlim([ax[axId].get_xlim()[0], ax[axId].get_xlim()[1]+1])
    # increase x max by 2

    plt.show()


# #### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2)

This is the output I get, which obviously shows that the order in which the processes return is shuffled compared to the starting order.

20 cores available
Starting ...
Process 0 started work on time "0.25 ns"
Process 1 started work on time "0.5 ns"
Process 2 started work on time "1 ns"
Process 3 started work on time "2 ns"
Process 4 started work on time "3 ns"
Process 5 started work on time "4 ns"
Process 6 started work on time "5 ns"
Process 7 started work on time "10 ns"
Process 8 started work on time "20 ns"
Process 9 started work on time "30 ns"
Process 10 started work on time "40 ns"
Process 11 started work on time "50 ns"
Process 12 started work on time "60 ns"
Process 13 started work on time "70 ns"
Process 14 started work on time "80 ns"
Process 15 started work on time "90 ns"
Process 16 started work on time "100 ns"
Process number 3 returned the results.
Process number 0 returned the results.
Process number 4 returned the results.
Process number 7 returned the results.
Process number 1 returned the results.
Process number 2 returned the results.
Process number 5 returned the results.
Process number 8 returned the results.
Process number 6 returned the results.
Process number 9 returned the results.
Process number 10 returned the results.
Process number 11 returned the results.
Process number 12 returned the results.
Process number 13 returned the results.
Process number 14 returned the results.
Process number 15 returned the results.
Process number 16 returned the results.
All parallel done.

And the figure which is produced.

Correct output

like image 78
Patol75 Avatar answered Sep 29 '22 03:09

Patol75