Skip to content

Commit

Permalink
Merge pull request #1305 from michaelcaterisano/mc/run-docset-callback
Browse files Browse the repository at this point in the history
Add optional run_docset callback to Absinthe.Subscription.PubSub behavior
  • Loading branch information
benwilson512 committed Apr 20, 2024
2 parents d93e490 + 90f868e commit faebd88
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 25 deletions.
51 changes: 30 additions & 21 deletions lib/absinthe/subscription/local.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ defmodule Absinthe.Subscription.Local do
{topic, key_strategy, doc}
end

run_docset(pubsub, docs_and_topics, mutation_result)
run_docset_fn =
if function_exported?(pubsub, :run_docset, 3), do: &pubsub.run_docset/3, else: &run_docset/3

run_docset_fn.(pubsub, docs_and_topics, mutation_result)

:ok
end
Expand All @@ -37,26 +40,7 @@ defmodule Absinthe.Subscription.Local do
defp run_docset(pubsub, docs_and_topics, mutation_result) do
for {topic, key_strategy, doc} <- docs_and_topics do
try do
pipeline =
doc.initial_phases
|> Pipeline.replace(
Phase.Telemetry,
{Phase.Telemetry, event: [:subscription, :publish, :start]}
)
|> Pipeline.without(Phase.Subscription.SubscribeSelf)
|> Pipeline.insert_before(
Phase.Document.Execution.Resolution,
{Phase.Document.OverrideRoot, root_value: mutation_result}
)
|> Pipeline.upto(Phase.Document.Execution.Resolution)

pipeline = [
pipeline,
[
result_phase(doc),
{Absinthe.Phase.Telemetry, event: [:subscription, :publish, :stop]}
]
]
pipeline = pipeline(doc, mutation_result)

{:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline)

Expand All @@ -75,6 +59,31 @@ defmodule Absinthe.Subscription.Local do
end
end

def pipeline(doc, mutation_result) do
pipeline =
doc.initial_phases
|> Pipeline.replace(
Phase.Telemetry,
{Phase.Telemetry, event: [:subscription, :publish, :start]}
)
|> Pipeline.without(Phase.Subscription.SubscribeSelf)
|> Pipeline.insert_before(
Phase.Document.Execution.Resolution,
{Phase.Document.OverrideRoot, root_value: mutation_result}
)
|> Pipeline.upto(Phase.Document.Execution.Resolution)

pipeline = [
pipeline,
[
result_phase(doc),
{Absinthe.Phase.Telemetry, event: [:subscription, :publish, :stop]}
]
]

pipeline
end

defp get_docs(pubsub, field, mutation_result, topic: topic_fun)
when is_function(topic_fun, 1) do
do_get_docs(pubsub, field, topic_fun.(mutation_result))
Expand Down
8 changes: 8 additions & 0 deletions lib/absinthe/subscription/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,12 @@ defmodule Absinthe.Subscription.Pubsub do
only.
"""
@callback publish_subscription(topic :: binary, data :: map) :: term

# @doc """
# This function is called by publish_mutation and is responsible for resolving the documents
# and publishing the results to the appropriate topics.
# """
@callback run_docset(pubsub :: t, docs_and_topics :: list, mutation_result :: term) :: term

@optional_callbacks run_docset: 3
end
93 changes: 90 additions & 3 deletions test/absinthe/execution/subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,47 @@ defmodule Absinthe.Execution.SubscriptionTest do
end
end

defmodule PubSubWithDocsetRunner do
@behaviour Absinthe.Subscription.Pubsub

def start_link() do
Registry.start_link(keys: :duplicate, name: __MODULE__)
end

def node_name() do
node()
end

def subscribe(topic) do
Registry.register(__MODULE__, topic, [])
:ok
end

def publish_subscription(topic, data) do
message = %{
topic: topic,
event: "subscription:data",
result: data
}

Registry.dispatch(__MODULE__, topic, fn entries ->
for {pid, _} <- entries, do: send(pid, {:broadcast, message})
end)
end

def publish_mutation(_proxy_topic, _mutation_result, _subscribed_fields) do
# this pubsub is local and doesn't support clusters
:ok
end

def run_docset(pubsub, docs_and_topics, _mutation_result) do
for {topic, _key_strategy, _doc} <- docs_and_topics do
# publish mutation results to topic
pubsub.publish_subscription(topic, %{data: %{runner: "calls the custom docset runner"}})
end
end
end

defmodule Schema do
use Absinthe.Schema

Expand Down Expand Up @@ -189,6 +230,9 @@ defmodule Absinthe.Execution.SubscriptionTest do
setup_all do
{:ok, _} = PubSub.start_link()
{:ok, _} = Absinthe.Subscription.start_link(PubSub)

{:ok, _} = PubSubWithDocsetRunner.start_link()
{:ok, _} = Absinthe.Subscription.start_link(PubSubWithDocsetRunner)
:ok
end

Expand Down Expand Up @@ -719,12 +763,55 @@ defmodule Absinthe.Execution.SubscriptionTest do
refute_receive({:broadcast, _})
end

defp run_subscription(query, schema, opts \\ []) do
opts = Keyword.update(opts, :context, %{pubsub: PubSub}, &Map.put(&1, :pubsub, PubSub))
@query """
subscription ($userId: ID!) {
user(id: $userId) { id name }
}
"""
test "calls the optional run_docset callback if supplied" do
id = "1"

assert {:ok, %{"subscribed" => topic}} =
run_subscription(
@query,
Schema,
variables: %{"userId" => id},
context: %{pubsub: PubSubWithDocsetRunner}
)

mutation = """
mutation ($userId: ID!) {
updateUser(id: $userId) { id name }
}
"""

assert {:ok, %{data: _}} =
run_subscription(mutation, Schema,
variables: %{"userId" => id},
context: %{pubsub: PubSubWithDocsetRunner}
)

assert_receive({:broadcast, msg})

assert %{
event: "subscription:data",
result: %{data: %{runner: "calls the custom docset runner"}},
topic: topic
} == msg
end

def run_subscription(query, schema, opts \\ []) do
opts =
Keyword.update(
opts,
:context,
%{pubsub: PubSub},
&Map.put(&1, :pubsub, opts[:context][:pubsub] || PubSub)
)

case run(query, schema, opts) do
{:ok, %{"subscribed" => topic}} = val ->
PubSub.subscribe(topic)
opts[:context][:pubsub].subscribe(topic)
val

val ->
Expand Down
3 changes: 2 additions & 1 deletion test/absinthe/middleware/batch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ defmodule Absinthe.Middleware.BatchTest do
end)

wait_for_process_to_exit(pid)
end) =~ "fn: {Absinthe.Middleware.BatchTest.TimeoutModule, :arbitrary_fn_name, %{arbitrary: :data}}"
end) =~
"fn: {Absinthe.Middleware.BatchTest.TimeoutModule, :arbitrary_fn_name, %{arbitrary: :data}}"
end

defp wait_for_process_to_exit(pid) do
Expand Down

0 comments on commit faebd88

Please sign in to comment.