Pagerank on Distributed Dask #9721
-
Hi Manoj, great question! You could try following the (somewhat sparse) directions for creating sparse dask arrays backed by pydata/sparse or scipy.sparse: https://docs.dask.org/en/latest/array-sparse.html Another option is to try using fyi, I think we ought to have an example notebook for distributed PageRank with
-
@Manoj-red-hat you have probably seen this, but nx-scipy's implementation could be a starting point: The tricky part is that dask arrays are lazy, so checking convergence will require triggering compute. Unless there are reasons why you'd want to perform This also is probably going to be OK for smaller proof-of-concept code, but for larger dask arrays this might not scale well due to the lack of sparse-awareness in dask arrays; see issue #7652.
-
Here is an implementation of PageRank that works with dask-grblas. It doesn't handle personalization, but covers the basic algorithm.
import dask_grblas as dgb
import grblas as gb


def is_converged(xprev, x, tol):
    """Check convergence, L1 norm: err = sum(abs(xprev - x)); err < N * tol

    This modifies `xprev`.
    """
    xprev << gb.binary.minus(xprev | x, require_monoid=False)
    xprev << gb.unary.abs(xprev)
    # The reduction is lazy; compute() forces it so the comparison can run
    err = xprev.reduce().value.compute()
    return err < xprev.size * tol


alpha = 0.85
max_iter = 100
tol = 1e-06
A = dgb.Matrix.from_values([0, 1, 1], [0, 0, 1], [1., 2., 3.])
N = A.nrows
# Initial vector
x = dgb.Vector.new(float, N)
x[:] = 1.0 / N
# Personalization vector or scalar
p = 1.0 / N
# Inverse of row_degrees
# Fold alpha constant into S
row_degrees = A.reduce_rowwise("plus")
S = (alpha / row_degrees).new()
semiring = gb.op.plus_times[float]
# Rows with no out-edges have no entry in S
is_dangling = S.nvals.compute() < N
if is_dangling:
    dangling_mask = dgb.Vector.new(float, N)
    dangling_mask(mask=~S.S) << 1.0
    # Fold alpha constant into dangling_weights (or dangling_mask)
    # Fast case (and common case); is iso-valued
    dangling_mask(mask=dangling_mask.S) << alpha * p
# Fold constant into p
p *= 1 - alpha
# Power iteration: make up to max_iter iterations
xprev = dgb.Vector.new(float, N)
w = dgb.Vector.new(float, N)
for _ in range(max_iter):
    xprev, x = x, xprev
    # x << alpha * ((xprev * S) @ A + "dangling_weights") + (1 - alpha) * p
    x << p
    if is_dangling:
        # Fast case: add a scalar; x is still iso-valued (b/c p is also scalar)
        x += xprev @ dangling_mask
    w << xprev * S
    x += semiring(w @ A)  # plus_first if A.ss.is_iso else plus_times
    if is_converged(xprev, x, tol):  # sum(abs(xprev - x)) < N * tol
        break
else:
    # for/else: runs only if the loop finished without hitting `break`
    print('Convergence failure')
x.compute()
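For cross-checking results on small graphs, the same update rule, x = alpha * ((xprev * S) @ A + dangling term) + (1 - alpha) * p, can be written in plain Python over a COO edge list. This is only a sketch: the function name `pagerank_coo` and its signature are hypothetical, it uses no Dask or GraphBLAS at all, and it assumes a uniform personalization p = 1/N as in the code above.

```python
def pagerank_coo(rows, cols, vals, n, alpha=0.85, max_iter=100, tol=1e-6):
    """Reference PageRank over COO triples (rows[k] -> cols[k], weight vals[k])."""
    # Weighted out-degree of every node.
    row_degrees = [0.0] * n
    for i, v in zip(rows, vals):
        row_degrees[i] += v
    # Nodes with no out-edges spread their rank uniformly over all nodes.
    dangling = [i for i in range(n) if row_degrees[i] == 0.0]

    x = [1.0 / n] * n
    for _ in range(max_iter):
        xprev = x
        dangling_mass = alpha * sum(xprev[i] for i in dangling) / n
        # Start from the teleport term plus the dangling contribution.
        x = [(1.0 - alpha) / n + dangling_mass] * n
        # Push rank along every edge, scaled by the source's out-degree.
        for i, j, v in zip(rows, cols, vals):
            x[j] += alpha * xprev[i] * v / row_degrees[i]
        # Same L1 convergence test: sum(abs(xprev - x)) < N * tol.
        if sum(abs(a - b) for a, b in zip(x, xprev)) < n * tol:
            return x
    raise RuntimeError("power iteration did not converge")


# Same 2x2 example matrix as above.
ranks = pagerank_coo([0, 1, 1], [0, 0, 1], [1.0, 2.0, 3.0], n=2)
print(ranks)
```

Because each edge is visited independently inside an iteration, the edge loop is the part that a distributed version would partition across workers.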
-
Hi everyone,
I am trying to understand how to use Dask for some graph algorithms, and to that end I am trying to write PageRank in Dask using the COO graph format.
Question: Is there a library already built on Dask for graph algorithms, other than cuGraph (which is GPU-based) and metagraph (whose development appears to have stopped)?
What approach should I take to write PageRank on Dask?