Skip to content

Commit

Permalink
re-add notifications to queue in case of throttling error (#1436)
Browse files Browse the repository at this point in the history
* re-add notifications to queue in case of throttling error

* log as error
  • Loading branch information
ErinLMoore committed Apr 29, 2024
1 parent 4fdeabe commit 15324a4
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 7 deletions.
32 changes: 29 additions & 3 deletions apps/alert_processor/lib/dissemination/notification_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,31 @@ defmodule AlertProcessor.NotificationWorker do
log(id, notification, "event=pop time=#{now() - pop_start}")

send_start = now()
NotificationSender.send(notification)
log(id, notification, "event=send time=#{now() - send_start}")

send(self(), :work)
case NotificationSender.send(notification) do
{:ok, _} ->
log(id, notification, "event=send time=#{now() - send_start}")
send(self(), :work)

{:error, {:http_error, 400, _}} ->
log_error(
id,
notification,
"event=send_failure action=put_back_on_queue time=#{now() - send_start}"
)

SendingQueue.push(notification)
send(self(), :work)

{:error, _} ->
log_error(
id,
notification,
"event=send_failure action=none time=#{now() - send_start}"
)

send(self(), :work)
end

:error ->
Process.send_after(self(), :work, @idle_wait)
Expand All @@ -66,6 +87,11 @@ defmodule AlertProcessor.NotificationWorker do
Logger.info("worker_log id=#{id} #{message}")
end

defp log_error(id, notification, message) do
mode = if(is_nil(notification.phone_number), do: "email", else: "sms")
Logger.error("worker_log id=#{id} mode=#{mode} notification=#{notification.id} #{message}")
end

defp now() do
System.monotonic_time(:microsecond)
end
Expand Down
1 change: 1 addition & 0 deletions apps/alert_processor/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ defmodule AlertProcessor.Mixfile do
{:fast_local_datetime, "~> 1.0.0"},
{:gettext, "~> 0.11"},
{:httpoison, "~> 2.2.1"},
{:mock, "~> 0.3.0", only: :test},
{:paper_trail, "0.8.3"},
{:poison, "~> 2.0"},
{:poolboy, "~> 1.5.0"},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule AlertProcessor.NotificationWorkerTest do
@moduledoc false
use AlertProcessor.DataCase
use AlertProcessor.DataCase, async: false
import Mock

alias AlertProcessor.{Model, NotificationWorker, SendingQueue}
alias Model.{Alert, InformedEntity, Notification}
Expand All @@ -12,7 +13,7 @@ defmodule AlertProcessor.NotificationWorkerTest do
]
}

@notification %Notification{
@email_notification %Notification{
header: "This is a test alert",
service_effect: "effect",
alert_id: "1",
Expand All @@ -23,12 +24,66 @@ defmodule AlertProcessor.NotificationWorkerTest do
alert: @alert
}

test "passes jobs from sending queue to notification" do
@sms_notification %Notification{
header: "This is a test alert",
service_effect: "effect",
alert_id: "1",
email: nil,
phone_number: 3,
send_after: DateTime.utc_now(),
status: :sent,
alert: @alert
}

test "passes email jobs from sending queue to notification" do
{:ok, pid} = start_supervised(NotificationWorker)
:erlang.trace(pid, true, [:receive])

SendingQueue.push(@notification)
SendingQueue.push(@email_notification)

assert_receive {:trace, ^pid, :receive, {:sent_email, _}}
end

test "passes sms jobs from sending queue to notification" do
{:ok, pid} = start_supervised(NotificationWorker)
:erlang.trace(pid, true, [:receive])

SendingQueue.push(@sms_notification)
assert SendingQueue.length() == 1
assert_receive {:trace, ^pid, :receive, {:publish, _}}
assert SendingQueue.length() == 0
end

test "re-adds notification to queue in case of 400 throttling error" do
with_mock ExAws.Mock,
request: fn operation, _ ->
:timer.sleep(40)
send(self(), {operation.action, operation.params})
{:error, {:http_error, 400, %{code: "Throttling", message: "Rate exceeded"}}}
end do
{:ok, pid} = start_supervised(NotificationWorker)
:erlang.trace(pid, true, [:receive])
assert SendingQueue.length() == 0
SendingQueue.push(@sms_notification)
assert SendingQueue.length() == 1
assert_receive {:trace, ^pid, :receive, {:publish, _}}
end
end

test "does not re-add to queue in case of other error" do
with_mock ExAws.Mock,
request: fn operation, _ ->
:timer.sleep(40)
send(self(), {operation.action, operation.params})
{:error, {:other_error, 500, %{}}}
end do
{:ok, pid} = start_supervised(NotificationWorker)
:erlang.trace(pid, true, [:receive])
assert SendingQueue.length() == 0
SendingQueue.push(@sms_notification)
assert SendingQueue.length() == 1
assert_receive {:trace, ^pid, :receive, {:publish, _}}
assert SendingQueue.length() == 0
end
end
end
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"},
"mjml": {:hex, :mjml, "1.5.0", "20a4ed2490a60c6928d45a69b64fb45ce8d8bdac686ef689315b0adda69c6406", [:mix], [{:rustler, ">= 0.0.0", [hex: :rustler, repo: "hexpm", optional: true]}, {:rustler_precompiled, "~> 0.6.0", [hex: :rustler_precompiled, repo: "hexpm", optional: false]}], "hexpm", "44dc36c0fccf52eeb8e0afcb26a863ba41a5f9adcb71bb32e084619a13bb4cdf"},
"mock": {:hex, :mock, "0.3.8", "7046a306b71db2488ef54395eeb74df0a7f335a7caca4a3d3875d1fc81c884dd", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "7fa82364c97617d79bb7d15571193fc0c4fe5afd0c932cef09426b3ee6fe2022"},
"nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"},
"nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"},
"oidcc": {:hex, :oidcc, "3.1.1", "4016f35f08131053bddaae3bf644a6b3ce33cf8b297e6f46d638005b3e68d8fd", [:mix, :rebar3], [{:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.2", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.3.1", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "4401959a3674071345c7a8041f3962dfce68b588a9a270f927602dde10a801c4"},
Expand Down

0 comments on commit 15324a4

Please sign in to comment.