Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

关于 [Issue 142][Why not “p.Running()<capacity”] 的问题 #143

Open
math345 opened this issue Mar 18, 2021 · 4 comments
Open

关于 [Issue 142][Why not “p.Running()<capacity”] 的问题 #143

math345 opened this issue Mar 18, 2021 · 4 comments
Assignees
Labels
needs investigation This issue or proposal needs some investigations proposal Proposal for this repo

Comments

@math345
Copy link

math345 commented Mar 18, 2021

由于 Issue 142 Why not “p.Running()<capacity” 已经 Closed ,所以我新提交了一个 Issue 。

这个问题,我想过。我觉得目前的做法不是很优雅。我之前对这段代码进行了一些推敲,我把我的理解整理如下。

为什么要加下面这段代码呢?而且 p.lock.Unlock() 后再调用 spawnWorker(),running 个数失去了保护,这个次数就有可能超过了设定的最大值 capacity。

		if p.Running() == 0 {
			p.lock.Unlock()
			spawnWorker()
			return
		}

好吧,我们先忽略上面这个小问题。再来看下面这段代码,为什么不能在 w == nil 时,直接 spawnWorker() 个新的 worker? 因为有可能是收到广播信号,此时不能保证 p.Running() <= capacity 。

		w = p.workers.detach()
		if w == nil {
			goto Reentry
		}

但是,我觉得可以通过修改成下面的代码解决掉这个问题

		w = p.workers.detach()
		if w == nil {
			if p.Running() < capacity {
				spawnWorker()
				p.lock.Unlock()
				return
			}
			goto Reentry
		}

由于作者没有采用类似这样的写法,导致需要增加 if p.Running() == 0 这段代码处理广播信号。

再来看广播信号,广播信号是在定时回收时,满足下面的条件触发的。

		// There might be a situation that all workers have been cleaned up(no any worker is running)
		// while some invokers still get stuck in "p.cond.Wait()",
		// then it ought to wakes all those invokers.
		if p.Running() == 0 {
			p.cond.Broadcast()
		}

为什么需要这段代码?原因在于需要考虑这么一种情况,当 retrieveWorker() 发现 runing() 个数超过了容量,因此在 p.cond.Wait() 中。之后 runing() 的所有 worker f() 都发生了 panic,按当前的代码逻辑,是无法正常归还到工作池中的,revertWorker 不会被调用,也就是 revertWorker 不会发出 Signal 通知。因此,如果我们检测到没有1个worker running,就 Broadcast 信号。

for f := range w.task {
	
	if f == nil {
		return
	}
	
	// 需要考虑 f() panic 的情况,此时不会归还到工作池中
	f()
	
	if ok := w.pool.revertWorker(w); !ok {
		return
	}
}

因此广播信号的加入,确实增加了复杂度。能不能在work run() 中包装调用 f() ,处理 panic ?

@math345 math345 added the proposal Proposal for this repo label Mar 18, 2021
@panjf2000
Copy link
Owner

可以尝试提个 PR。

@xuyue86
Copy link

xuyue86 commented Mar 24, 2021

今天刚看完这块的代码,也有疑虑,如果广播意义在于处理全部worker panic的话(没有放回池子中)。

		if p.Running() == 0 {
			p.lock.Unlock()
                        //Running()==0的判断,不是安全的
                        //另外如果p.cond.Wait()的等待很多,广播唤醒时,会产生大量的worker
			if !p.IsClosed() {
				spawnWorker()
			}
			return
		}

另外还有个疑问,如果是大部分worker发生panic,那么如果有大量的p.cond.Wait()也只能等待仅剩下的正常的worker回池,此间如果没新submit,那么worker是无法补充的。

@SummerLius
Copy link

看了下最近的提交内容,潘少已在 worker 的 derfer 中加入了,w.pool.cond.Signal(),这种情况下,还有必要 p.cond.Broadcast()

@panjf2000 panjf2000 added the needs investigation This issue or proposal needs some investigations label May 23, 2021
@molon
Copy link

molon commented Aug 30, 2021

关于 spawnWorker 的调用时机在Lock外这点有相同的疑惑。
另外无意冒犯,想请教:
如果在固定了 pool 的 size 的前提下,想把一个 task 丢进 pool 里运行,大家一般都会想到预先开启size个gorountine 内部对一个chan进行loop,然后想往 pool 里塞任务的话只要往chan里丢就可以了,而且还直接 select ctx 之类的方法。
那ants选用 cond 锁来自己实现这块逻辑的最直接的好处在哪里呢?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
needs investigation This issue or proposal needs some investigations proposal Proposal for this repo
Projects
None yet
Development

No branches or pull requests

5 participants