
Conversation

@CorentinJ

The repeated creation of the pool in the inner loop leads to a serious performance hit:

from timerit import Timerit
for _ in Timerit(1, verbose=2):
    for it in steps:
        if self.parallelized and (it > 0):
            # A fresh pool of workers is spawned on every iteration of the inner loop
            with Pool(initializer=_init_pool, initargs=(mutual_information,)) as p:
                mi = p.map(_parallel_mi, candidates)
        else:
            mi = [mutual_information(i) for i in candidates]

        max_ind = np.argmax(mi)
        mutual_information.append(candidates[max_ind])
        del candidates[max_ind]

USPS: Time elapsed: 61.996s
MIRFLICKR: Time elapsed: 359.179s
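
(For context, the helpers referenced above are not shown in this excerpt. A minimal hypothetical sketch, assuming _parallel_mi simply evaluates the globally shared object, might look like the following; the actual implementations in the repository may differ:)

def _init_pool(mi):
    # Store the shared mutual_information object in the worker's globals
    global _mi
    _mi = mi

def _parallel_mi(candidate):
    # Evaluate the shared mutual_information object for a single candidate
    return _mi(candidate)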

Moving the pool outside the loop:

from timerit import Timerit
for _ in Timerit(1, verbose=2):
    if self.parallelized:
        # The pool is created once, before the loop, and reused for every step
        pool = Pool(initializer=_init_pool, initargs=(mutual_information,))

    for it in steps:
        if self.parallelized and (it > 0):
            mi = pool.map(_parallel_mi, candidates)
        else:
            mi = [mutual_information(i) for i in candidates]

        max_ind = np.argmax(mi)
        mutual_information.append(candidates[max_ind])
        del candidates[max_ind]

    if self.parallelized:
        pool.close()

USPS: Time elapsed: 17.587s
MIRFLICKR: Time elapsed: 140.803s

@Callidior Callidior self-requested a review July 20, 2020 12:15
@Callidior Callidior self-assigned this Jul 20, 2020
@Callidior
Contributor

Thanks for pointing this out! The speed could definitely be improved here by setting up the worker processes only once.

However, I am concerned that your suggested fix breaks the correctness of the method because of the mutual_information object that is passed to each worker when the pool is initialized. This object is not constant and changes its state after each step. With your implementation, the workers receive the mutual_information object only once at the beginning and never update their copy to stay in sync with the object in the main process.

Thus, we should update this object in the workers' memory as well, along these lines:

if self.parallelized and (it > 0):
    mi = pool.map(_parallel_mi, candidates)
else:
    mi = [mutual_information(i) for i in candidates]

max_ind = np.argmax(mi)
mutual_information.append(candidates[max_ind])
del candidates[max_ind]

if self.parallelized:
    # Send the updated mutual_information object to every worker process
    for worker_ind in range(pool._processes):
        pool.apply_async(_update_pool, (mutual_information,))

To make sure that the new object reaches all worker processes and to avoid race conditions, we should also introduce a multiprocessing.Barrier for the update:

def _init_pool(mi, barrier):
    global _mi, _barrier
    _mi = mi
    _barrier = barrier

def _update_pool(mi):
    global _mi, _barrier
    _mi = mi
    # Block until every worker has received the updated object
    _barrier.wait()

Since the barrier must be passed to the workers upon initialization, we need to specify the number of worker processes explicitly when initializing the pool:

barrier = Barrier(os.cpu_count())
pool = Pool(barrier.parties, initializer=_init_pool, initargs=(mutual_information, barrier))
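
Putting these pieces together, the step loop would look roughly like this (only a sketch assembled from the snippets above, not the final implementation):

# assumes: from multiprocessing import Pool, Barrier; import os; import numpy as np
if self.parallelized:
    barrier = Barrier(os.cpu_count())
    pool = Pool(barrier.parties, initializer=_init_pool, initargs=(mutual_information, barrier))

for it in steps:
    if self.parallelized and (it > 0):
        mi = pool.map(_parallel_mi, candidates)
    else:
        mi = [mutual_information(i) for i in candidates]

    max_ind = np.argmax(mi)
    mutual_information.append(candidates[max_ind])
    del candidates[max_ind]

    if self.parallelized:
        # One update task per worker; the barrier ensures none of them
        # proceeds until all have received the new object
        for _ in range(barrier.parties):
            pool.apply_async(_update_pool, (mutual_information,))

if self.parallelized:
    pool.close()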

It would be nice if you could implement these changes and also verify that the results are still identical to before, rather than only comparing the run-time. I would then be happy to merge it.
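
(A simple sanity check could run the selection once in serial and once in parallel and compare the selected candidates; the names below are hypothetical, since the surrounding API is not shown in this thread:)

# Hypothetical check; the selector class and its attributes are placeholders.
serial = MISelector(parallelized=False).fit(X, y)
parallel = MISelector(parallelized=True).fit(X, y)
assert list(serial.selected_) == list(parallel.selected_)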

@CorentinJ
Author

Ah, so that's why the speed gain is so large; not surprising then. I completely missed that argument.

It would probably be better to refactor that global variable away too, wouldn't it?

@Callidior
Contributor

That would be preferable, but I currently don't see an elegant way of doing this.

Nevertheless, I would still expect a speed-up even with that variable being synchronized across the processes between individual steps, though not as big as the one you initially observed.
