Chapter 13 Abstract duplicated supervision code
13.1 Objectives
- overview of requirements
- pseudo generalize Core.ServiceSupervisor module
- utilize pseudo generalized code inside the Naive.DynamicSymbolSupervisor
- implement a truly generic Core.ServiceSupervisor
- use the Core.ServiceSupervisor module inside the streamer application
13.2 Overview of requirements
In the last few chapters, we went through adding and modifying the dynamic supervision tree around the naive and streamer applications’ workers. Initially, we just copied the implementation from the streamer application to the naive application (with a few minor tweaks like log messages). That wasn’t the most sophisticated solution and we will address this copy-paste pattern in this chapter.
We will write an “extension” of the DynamicSupervisor that allows us to start, stop, and autostart workers. Just to keep things simple, we will create a new application inside our umbrella project where we will place the logic. This will save us from creating a new repo for the time being.
Our new Core.ServiceSupervisor module will hold all the logic responsible for starting, stopping, and autostarting worker processes. To limit the boilerplate inside the implementation modules (like Naive.DynamicSymbolSupervisor or Streamer.DynamicStreamerSupervisor) we will utilize the use macro that will dynamically generate low-level wiring for us.
13.3 Pseudo generalize Core.ServiceSupervisor module
Let’s start by creating a new non-supervised application called core inside our umbrella project. For now, our “abstraction code” will sit inside it just to keep things simple; otherwise, we would need to create a new repo and jump between codebases, which we will avoid for the time being:
$ cd apps
$ mix new core
* creating README.md
* creating .formatter.exs
* creating .gitignore
* creating mix.exs
* creating lib
* creating lib/core.ex
* creating test
* creating test/test_helper.exs
* creating test/core_test.exs
...
We can now create a new directory called core inside the apps/core/lib directory and a new file called service_supervisor.ex inside it, where we will put all of the abstracted starting/stopping/autostarting logic.
Let’s start with an empty module:
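The listing for this step is not reproduced here. Going only by the module and file names given in the text, the starting point would presumably be nothing more than a bare module definition:

```elixir
# /apps/core/lib/core/service_supervisor.ex
# Assumed starting point: an empty module that the following steps fill in.
defmodule Core.ServiceSupervisor do
end
```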
The first step in our refactoring process will be to move (cut) all of the functions from the Naive.DynamicSymbolSupervisor (excluding start_link/1, init/1, and shutdown_trading/1) and put them inside the Core.ServiceSupervisor module, which should look as follows:
# /apps/core/lib/core/service_supervisor.ex
defmodule Core.ServiceSupervisor do
def autostart_trading do
...
end
def start_trading(symbol) when is_binary(symbol) do
...
end
def stop_trading(symbol) when is_binary(symbol) do
...
end
defp get_pid(symbol) do
...
end
defp update_trading_status(symbol, status)
when is_binary(symbol) and is_binary(status) do
...
end
defp start_symbol_supervisor(symbol) do
...
end
defp fetch_symbols_to_trade do
...
end
end
All of the above code is trading related - we need to rename functions/logs to be more generic.
Starting with autostart_trading/0 we can rename it to autostart_workers/0:
# /apps/core/lib/core/service_supervisor.ex
...
def autostart_workers do # <= updated function name
fetch_symbols_to_start() # <= updated function name
|> Enum.map(&start_worker/1) # <= updated function name
end
...
As we updated two function names inside autostart_workers/0, we need to update their implementations.
The start_trading/1 function will become start_worker/1. Internally, we will inline the start_symbol_supervisor/1 function (move its contents into start_worker/1 and remove start_symbol_supervisor/1), as it’s used just once inside this module. The calls to update_trading_status/2 also need to be renamed to update_status/2.
The fetch_symbols_to_trade/0 will get updated to fetch_symbols_to_start/0:
# /apps/core/lib/core/service_supervisor.ex
def start_worker(symbol) when is_binary(symbol) do # <= updated name
case get_pid(symbol) do
nil ->
Logger.info("Starting trading of #{symbol}")
{:ok, _settings} = update_status(symbol, "on") # <= updated name
{:ok, _pid} =
DynamicSupervisor.start_child(
Naive.DynamicSymbolSupervisor,
{Naive.SymbolSupervisor, symbol}
) # ^^^^^^ inlined `start_symbol_supervisor/1`
pid ->
Logger.warning("Trading on #{symbol} already started")
{:ok, _settings} = update_status(symbol, "on") # <= updated name
{:ok, pid}
end
end
...
defp fetch_symbols_to_start do # <= updated name
...
end
Inside the above code we updated the update_trading_status/2 call to update_status/2, so we need to update the function header to match:
# /apps/core/lib/core/service_supervisor.ex
defp update_status(symbol, status) # <= updated name
when is_binary(symbol) and is_binary(status) do
...
end
The last function to rename in this module is stop_trading/1, which becomes stop_worker/1. We also need to update its calls to update_trading_status/2 to update_status/2, as that function was renamed:
# /apps/core/lib/core/service_supervisor.ex
def stop_worker(symbol) when is_binary(symbol) do # <= updated name
case get_pid(symbol) do
nil ->
Logger.warning("Trading on #{symbol} already stopped")
{:ok, _settings} = update_status(symbol, "off") # <= updated name
pid ->
Logger.info("Stopping trading of #{symbol}")
:ok =
DynamicSupervisor.terminate_child(
Naive.DynamicSymbolSupervisor,
pid
)
{:ok, _settings} = update_status(symbol, "off") # <= updated name
end
end
At this moment we have a pseudo-generic implementation of start_worker/1 and stop_worker/1 inside the Core.ServiceSupervisor module. Function names are generic, but they still refer to Repo, Settings, and other modules specific to the naive app’s implementation. We are probably in a worse situation than we were before starting this refactoring ;) but don’t fear, this was just the first step on the way to abstracting away the starting, stopping, and autostarting code.
13.4 Utilize pseudo generalized code inside the Naive.DynamicSymbolSupervisor
Before we jump back to the naive application’s modules, we need to add the core application to the dependencies of the naive application:
# /apps/naive/mix.exs
defp deps do
[
{:binance, "~> 1.0"},
{:binance_mock, in_umbrella: true},
{:core, in_umbrella: true}, # <= core dep added
....
Let’s get back to the Naive.DynamicSymbolSupervisor, where we expect the functions that we just cut out (like start_trading/1 or stop_trading/1) to exist.
Let’s reimplement more generic versions of those functions as simple calls to the Core.ServiceSupervisor module:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
...
def autostart_workers do
Core.ServiceSupervisor.autostart_workers()
end
def start_worker(symbol) do
Core.ServiceSupervisor.start_worker(symbol)
end
def stop_worker(symbol) do
Core.ServiceSupervisor.stop_worker(symbol)
end
We also need to update the shutdown_trading/1 function, as we removed all the private functions that it relies on:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def shutdown_worker(symbol) when is_binary(symbol) do # <= updated name
case Core.ServiceSupervisor.get_pid(symbol) do # <= module added
nil ->
Logger.warning("Trading on #{symbol} already stopped")
{:ok, _settings} = Core.ServiceSupervisor.update_status(symbol, "off")
# ^^^ updated name + module
_pid ->
Logger.info("Shutdown of trading on #{symbol} initialized")
{:ok, settings} = Core.ServiceSupervisor.update_status(symbol, "shutdown")
# ^^^ updated name + module
Naive.Leader.notify(:settings_updated, settings)
{:ok, settings}
end
end
When we moved the private helper functions, we didn’t make them public, so the Naive.DynamicSymbolSupervisor module can’t call them yet. We will fix that now, together with adding temporary aliases/require/imports at the top of the Core.ServiceSupervisor:
# /apps/core/lib/core/service_supervisor.ex
defmodule Core.ServiceSupervisor do
require Logger # <= added require
import Ecto.Query, only: [from: 2] # <= added import
alias Naive.Repo # <= added alias
alias Naive.Schema.Settings # <= added alias
...
def get_pid(symbol) do # <= updated from private to public
...
end
def update_status(symbol, status) # <= updated from private to public
when is_binary(symbol) and is_binary(status) do
...
end
As fetch_symbols_to_start/0 is only used internally by the Core.ServiceSupervisor module itself, we don’t need to make it public.
We can also remove the aliases and import from the Naive.DynamicSymbolSupervisor, as it won’t need them anymore.
The next step will be to add ecto to the deps of the core application (inside its mix.exs file), as it now makes database queries.
As we modified the interface of the Naive.DynamicSymbolSupervisor (for example, renamed start_trading/1 to start_worker/1), we need to modify the Naive.Supervisor’s children list, more specifically the Task process:
# /apps/naive/lib/naive/supervisor.ex
...
{Task,
fn ->
Naive.DynamicSymbolSupervisor.autostart_workers() # <= func name updated
end}
...
The last step will be to update the interface of the naive application:
# /apps/naive/lib/naive.ex
alias Naive.DynamicSymbolSupervisor
def start_trading(symbol) do
symbol
|> String.upcase()
|> DynamicSymbolSupervisor.start_worker()
end
def stop_trading(symbol) do
symbol
|> String.upcase()
|> DynamicSymbolSupervisor.stop_worker()
end
def shutdown_trading(symbol) do
symbol
|> String.upcase()
|> DynamicSymbolSupervisor.shutdown_worker()
end
Believe it or not, at this moment (ignoring all of the warnings caused by the circular dependency that we created between the core and the naive applications, which we will fix in the next steps) our application runs just fine! We are able to start and stop trading, and autostarting works as well.
13.5 Implement a truly generic Core.ServiceSupervisor
Ok. Why did we even do this? What we are aiming for is a separation between the interface of our Naive.DynamicSymbolSupervisor module (like start_worker/1, autostart_workers/0 and stop_worker/1) and the implementation which is now placed inside the Core.ServiceSupervisor module.
That’s all nice and to some extent understandable, but the Core.ServiceSupervisor module is not generic yet. We can’t use it inside the streamer application to supervise the Streamer.Binance processes.
So, what’s the point? Well, we can make it even more generic!
13.5.1 First path starting with the fetch_symbols_to_start/0 function
Let’s move on to the full generalization of the Core.ServiceSupervisor module. We will start with the helper functions, as they are the ones doing the work and need to be truly generalized first:
# /apps/core/lib/core/service_supervisor.ex
def fetch_symbols_to_start do
Repo.all(
from(s in Settings,
where: s.status == "on",
select: s.symbol
)
)
end
The fetch_symbols_to_start/0 function uses Repo and Settings that are aliased at the top of the Core.ServiceSupervisor module. This won’t work with any other application, as the streamer application will require its own Repo and Settings modules.
To fix that, we will pass both repo and schema as arguments to the fetch_symbols_to_start/0 function, which will become fetch_symbols_to_start/2:
# /apps/core/lib/core/service_supervisor.ex
def fetch_symbols_to_start(repo, schema) do # <= args added
repo.all( # <= lowercase `repo` is an argument not aliased module
from(s in schema, # <= settings schema module passed as arg
where: s.status == "on",
select: s.symbol
)
)
end
This will have a knock-on effect on any function that uses fetch_symbols_to_start/0: it now needs to call fetch_symbols_to_start/2 and pass the appropriate Repo and Schema modules.
The fetch_symbols_to_start/0 function is referenced by autostart_workers/0. We need to modify it to pass the repo and schema to fetch_symbols_to_start/2, and as it lives inside the Core.ServiceSupervisor module, it needs to receive them as arguments itself:
# /apps/core/lib/core/service_supervisor.ex
def autostart_workers(repo, schema) do # <= args added
fetch_symbols_to_start(repo, schema) # <= args passed
|> Enum.map(&start_worker/1)
end
Going further up the call chain, autostart_workers/2 (previously autostart_workers/0) is called by the autostart_workers/0 function inside the Naive.DynamicSymbolSupervisor module. As this module is specific to the naive application, it is the place where repo and schema are known from the context: for the naive application, repo is the Naive.Repo module and schema is the Naive.Schema.Settings module:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
...
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
Naive.Repo, # <= new value passed
Naive.Schema.Settings # <= new value passed
)
end
This finishes the first of multiple paths that we need to follow to fully refactor the Core.ServiceSupervisor module.
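The technique behind this first path is that a module name is just an atom, so it can be passed around in a variable and called at runtime. A minimal sketch without a database, where FakeNaiveRepo, FakeStreamerRepo, and ModuleArgDemo are hypothetical stand-ins (not modules from the project) and the "query" is a plain tuple rather than an Ecto query:

```elixir
# Hypothetical stand-ins for Naive.Repo and Streamer.Repo; no database involved.
defmodule FakeNaiveRepo do
  def all(_query), do: ["NEOUSDT", "HNTUSDT"]
end

defmodule FakeStreamerRepo do
  def all(_query), do: ["XRPUSDT"]
end

defmodule ModuleArgDemo do
  # `repo` is an ordinary variable holding a module name (an atom),
  # so `repo.all(...)` is dispatched to whichever module was passed in.
  def fetch_symbols_to_start(repo, schema) do
    repo.all({:symbols_with_status_on, schema})
  end
end

IO.inspect(ModuleArgDemo.fetch_symbols_to_start(FakeNaiveRepo, :naive_settings))
IO.inspect(ModuleArgDemo.fetch_symbols_to_start(FakeStreamerRepo, :streamer_settings))
```

The same generic function serves both callers; only the arguments differ, which is the property the refactoring relies on.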
13.5.2 Second path starting with the update_status/2
Let’s not waste time and start from the other helper function inside the Core.ServiceSupervisor module. This time we will make the update_status/2 function fully generic:
# /apps/core/lib/core/service_supervisor.ex
def update_status(symbol, status, repo, schema) # <= args added
when is_binary(symbol) and is_binary(status) do
repo.get_by(schema, symbol: symbol) # <= using dynamic repo and schema modules
|> Ecto.Changeset.change(%{status: status})
|> repo.update() # <= using dynamic repo module
end
As before, we added repo and schema as arguments and modified the body of the function to use them instead of the hardcoded modules (aliased at the top of the Core.ServiceSupervisor module).
In the same fashion as before, we need to check “who” is using update_status/2 and update those calls to update_status/4.
The function is used inside start_worker/1 and stop_worker/1 in the Core.ServiceSupervisor module, so we need to bubble repo and schema up (pass them via arguments to both start_worker/1 and stop_worker/1):
# /apps/core/lib/core/service_supervisor.ex
def start_worker(symbol, repo, schema) when is_binary(symbol) do # <= new args
...
{:ok, _settings} = update_status(symbol, "on", repo, schema) # <= args passed
...
{:ok, _settings} = update_status(symbol, "on", repo, schema) # <= args passed
...
end
def stop_worker(symbol, repo, schema) when is_binary(symbol) do # <= new args
...
{:ok, _settings} = update_status(symbol, "off", repo, schema) # <= args passed
...
{:ok, _settings} = update_status(symbol, "off", repo, schema) # <= args passed
...
end
As we modified both start_worker/1 and stop_worker/1 by adding two arguments, we need to update all references to them, and here is where things branch out a bit.
We will start with the start_worker/1 function (which is now start_worker/3). It’s used by autostart_workers/2 inside the Core.ServiceSupervisor module. The autostart_workers/2 function already has repo and schema, so we can just pass them to start_worker/3:
# /apps/core/lib/core/service_supervisor.ex
def autostart_workers(repo, schema) do
fetch_symbols_to_start(repo, schema)
|> Enum.map(&start_worker(&1, repo, schema)) # <= args passed
end
Both the start_worker/3 and the stop_worker/3 functions are used by functions inside the Naive.DynamicSymbolSupervisor module. We need to pass the Repo and Schema in the same fashion as we did with the autostart_workers/2 function:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def start_worker(symbol) do
Core.ServiceSupervisor.start_worker(
symbol,
Naive.Repo, # <= new arg passed
Naive.Schema.Settings # <= new arg passed
)
end
def stop_worker(symbol) do
Core.ServiceSupervisor.stop_worker(
symbol,
Naive.Repo, # <= new arg passed
Naive.Schema.Settings # <= new arg passed
)
end
At this moment there’s no code inside the Core.ServiceSupervisor module referencing the aliased Repo or Settings modules, so we can safely remove both aliases. We are definitely moving in the right direction!
By the way, our project still works at this stage: we can start/stop trading and it autostarts trading.
13.5.3 Third path starting with the get_pid/1 function
Starting again from the most nested helper function - this time the get_pid/1:
# /apps/core/lib/core/service_supervisor.ex
def get_pid(symbol) do
Process.whereis(:"Elixir.Naive.SymbolSupervisor-#{symbol}")
end
We can see that it has a hardcoded Naive.SymbolSupervisor worker module. We need to make this part dynamic by using the worker_module argument:
# /apps/core/lib/core/service_supervisor.ex
def get_pid(worker_module, symbol) do # <= arg added
Process.whereis(:"#{worker_module}-#{symbol}") # <= arg used
end
Moving up to the functions that reference get_pid/1, those are start_worker/3 and stop_worker/3.
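As an aside on get_pid/2 itself: interpolating a module into a string yields its full name including the Elixir. prefix, which is why the dynamic version finds the same registered names as the hardcoded one. A small sketch of that lookup in isolation, where GetPidDemo and Demo.Worker are hypothetical names and a throwaway Agent plays the role of the worker process:

```elixir
defmodule GetPidDemo do
  # Same lookup as get_pid/2 in the text: the registered name is an atom
  # built from the worker module and the symbol.
  def get_pid(worker_module, symbol) do
    Process.whereis(:"#{worker_module}-#{symbol}")
  end
end

# Hypothetical worker module name; it is only used to build the atom,
# so the module itself does not need to exist.
worker_module = Demo.Worker

# Register a throwaway Agent under the name that get_pid/2 will look for.
{:ok, agent} = Agent.start_link(fn -> :ok end, name: :"#{worker_module}-XRPUSDT")

IO.inspect("#{worker_module}")
IO.inspect(GetPidDemo.get_pid(worker_module, "XRPUSDT") == agent)
IO.inspect(GetPidDemo.get_pid(worker_module, "NEOUSDT"))
```

The lookup returns the pid for a registered symbol and nil for one that was never started, which is exactly what the case expressions in start_worker and stop_worker branch on.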
As those are the last two functions to be updated, we will look at them more closely to finish our refactoring in this third run. Both need a worker_module argument, as both call the get_pid/2 function. Looking at both functions, we can see a few other hardcoded details:
- inside the log messages there is the word “trading” - we will replace those messages so they utilize the worker_module and symbol arguments
- there are two references to the Naive.DynamicSymbolSupervisor which we will replace with the module argument
- there is one more reference to the Naive.SymbolSupervisor module which we will replace with the worker_module argument
Let’s look at updated functions:
# /apps/core/lib/core/service_supervisor.ex
# module and worker_module args added vvvv
def start_worker(symbol, repo, schema, module, worker_module)
when is_binary(symbol) do
case get_pid(worker_module, symbol) do # <= worker_module passed
nil ->
Logger.info("Starting #{worker_module} worker for #{symbol}")
# ^^^ dynamic text
{:ok, _settings} = update_status(symbol, "on", repo, schema)
{:ok, _pid} = DynamicSupervisor.start_child(module, {worker_module, symbol})
# ^^^ args used
pid ->
Logger.warning("#{worker_module} worker for #{symbol} already started")
# ^^^ dynamic text
{:ok, _settings} = update_status(symbol, "on", repo, schema)
{:ok, pid}
end
end
# module and worker_module added as args vvvv
def stop_worker(symbol, repo, schema, module, worker_module)
when is_binary(symbol) do
case get_pid(worker_module, symbol) do # <= worker_module passed
nil ->
Logger.warning("#{worker_module} worker for #{symbol} already stopped")
# ^^^ dynamic text
{:ok, _settings} = update_status(symbol, "off", repo, schema)
pid ->
Logger.info("Stopping #{worker_module} worker for #{symbol}")
# ^^^ dynamic text
:ok = DynamicSupervisor.terminate_child(module, pid) # <= arg used
{:ok, _settings} = update_status(symbol, "off", repo, schema)
end
end
Inside both the start_worker/5 and the stop_worker/5 functions we modified:
- the get_pid/1 call to pass the worker_module (making it a get_pid/2 call)
- the Logger’s messages to use the worker_module and symbol
- the DynamicSupervisor’s functions to use the module and the worker_module
Again, as we modified start_worker/5, we need to make one last change inside the Core.ServiceSupervisor module, as autostart_workers/2 uses the start_worker/5 function:
# /apps/core/lib/core/service_supervisor.ex
def autostart_workers(repo, schema, module, worker_module) do # <= args added
fetch_symbols_to_start(repo, schema)
|> Enum.map(&start_worker(&1, repo, schema, module, worker_module)) # <= args added
end
Just for reference, the final function headers look as follows:
# /apps/core/lib/core/service_supervisor.ex
defmodule Core.ServiceSupervisor do
def autostart_workers(repo, schema, module, worker_module) do
...
end
def start_worker(symbol, repo, schema, module, worker_module)
when is_binary(symbol) do
...
end
def stop_worker(symbol, repo, schema, module, worker_module)
when is_binary(symbol) do
...
end
def get_pid(worker_module, symbol) do
...
end
def update_status(symbol, status, repo, schema)
when is_binary(symbol) and is_binary(status) do
...
end
def fetch_symbols_to_start(repo, schema) do
...
end
end
That finishes the third round of updates inside the Core.ServiceSupervisor module. Now we need to update the Naive.DynamicSymbolSupervisor module to use the updated functions (and pass the required arguments):
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
...
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
Naive.Repo,
Naive.Schema.Settings,
__MODULE__, # <= added arg
Naive.SymbolSupervisor # <= added arg
)
end
def start_worker(symbol) do
Core.ServiceSupervisor.start_worker(
symbol,
Naive.Repo,
Naive.Schema.Settings,
__MODULE__, # <= added arg
Naive.SymbolSupervisor # <= added arg
)
end
def stop_worker(symbol) do
Core.ServiceSupervisor.stop_worker(
symbol,
Naive.Repo,
Naive.Schema.Settings,
__MODULE__, # <= added arg
Naive.SymbolSupervisor # <= added arg
)
end
def shutdown_worker(symbol) when is_binary(symbol) do
case Core.ServiceSupervisor.get_pid(Naive.SymbolSupervisor, symbol) do # <= arg added
nil ->
Logger.warning("#{Naive.SymbolSupervisor} worker for #{symbol} already stopped")
# ^^^ updated
{:ok, _settings} =
Core.ServiceSupervisor.update_status(
symbol,
"off",
Naive.Repo,
Naive.Schema.Settings
) # ^^^ args added
_pid ->
Logger.info(
"Initializing shutdown of #{Naive.SymbolSupervisor} worker for #{symbol}"
) # ^^^ updated
{:ok, settings} =
Core.ServiceSupervisor.update_status(
symbol,
"shutdown",
Naive.Repo,
Naive.Schema.Settings
) # ^^^ additional args passed
Naive.Leader.notify(:settings_updated, settings)
{:ok, settings}
end
end
We needed to update the references inside the shutdown_worker/1 function as well, as it calls the get_pid/2 and update_status/4 functions.
We are now done with refactoring the Core.ServiceSupervisor module. It’s completely generic and can be used inside both the streamer and the naive applications.
At this moment, to use the Core.ServiceSupervisor module we need to write interface functions in our supervisors and pass multiple arguments in each one. Again, we would need to keep copies of those functions inside both the streamer and the naive applications. In the next section, we will look into how we could leverage Elixir macros to remove that boilerplate.
13.6 Remove boilerplate using use macro
Elixir provides a way to use other modules. The idea is that inside the Naive.DynamicSymbolSupervisor module we are currently useing the DynamicSupervisor module, but we could use Core.ServiceSupervisor instead:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
defmodule Naive.DynamicSymbolSupervisor do
use Core.ServiceSupervisor
To be able to use the Core.ServiceSupervisor module, it needs to provide the __using__/1 macro. The simplest content of that macro that we can use here is just a use of the DynamicSupervisor:
# /apps/core/lib/core/service_supervisor.ex
defmacro __using__(opts) do
IO.inspect(opts)
quote location: :keep do
use DynamicSupervisor
end
end
How does this work? As an oversimplification, you can think of it as follows: Elixir will look through the contents of the quote’s body (everything between quote ... do and end) in search of the unquote function, which can inject dynamic content. All of this will become much clearer as we go through the first example, but the important part is that after executing any unquotes inside the quote’s body, Elixir will grab that code as if it were just ordinary code and place it inside the Naive.DynamicSymbolSupervisor module at compile time (we will also see the result of IO.inspect/1 at compilation).
At this moment (after swapping to use Core.ServiceSupervisor) our code still works exactly as if we simply had use DynamicSupervisor inside the Naive.DynamicSymbolSupervisor module, as at compilation it will be swapped to it either way (as per the contents of the __using__/1 macro).
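To see the injection mechanism on its own, here is a minimal sketch that is independent of the supervisor code. UsingDemo, UsingDemo.Client, the greeting option, and the injected?/0 function are all hypothetical names invented for this illustration:

```elixir
defmodule UsingDemo do
  defmacro __using__(opts) do
    # This line runs while the *calling* module is being compiled.
    IO.inspect(opts, label: "compile-time opts")

    quote location: :keep do
      # Everything inside this block is injected into the calling module.
      def injected?, do: true
    end
  end
end

defmodule UsingDemo.Client do
  # Expands to the quoted code returned by UsingDemo.__using__/1.
  use UsingDemo, greeting: "hello"
end

# The client module never defined injected?/0 itself, yet it has it.
IO.inspect(UsingDemo.Client.injected?())
```

The options are printed once, when UsingDemo.Client compiles, and not on each call to injected?/0.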
As the autostart_workers/0 function is a part of the boilerplate, we will move it from the Naive.DynamicSymbolSupervisor module to the Core.ServiceSupervisor module inside the __using__/1 macro.
Ok, but it has all of those other naive application-specific arguments - where will we get those?
That’s what the opts argument is for inside the __using__/1 macro. When we call use Core.ServiceSupervisor, we can pass an additional keyword list that will contain all of the naive application-specific details:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
defmodule Naive.DynamicSymbolSupervisor do
use Core.ServiceSupervisor,
repo: Naive.Repo,
schema: Naive.Schema.Settings,
module: __MODULE__,
worker_module: Naive.SymbolSupervisor
We can now update the __using__/1 macro to assign all of those details to variables (instead of using IO.inspect/1):
# /apps/core/lib/core/service_supervisor.ex
defmacro __using__(opts) do
{:ok, repo} = Keyword.fetch(opts, :repo)
{:ok, schema} = Keyword.fetch(opts, :schema)
{:ok, module} = Keyword.fetch(opts, :module)
{:ok, worker_module} = Keyword.fetch(opts, :worker_module)
...
At this moment we can use those dynamic values to generate code that will be specific to the implementation module, for example the autostart_workers/0 function that we moved from the Naive.DynamicSymbolSupervisor module, which will need different values passed to it (like Streamer.Binance as worker_module) for the streamer application. This requires inserting those dynamic values inside autostart_workers/0, but how do we dynamically inject arguments? unquote to the rescue. We will update the autostart_workers/0 function from:
# sample moved code from the `Naive.DynamicSymbolSupervisor` module
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
Naive.Repo,
Naive.Schema.Settings,
__MODULE__,
Naive.SymbolSupervisor
)
end
to:
# updated code that will become part of the `__using__/1` macro
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
unquote(repo),
unquote(schema),
unquote(module),
unquote(worker_module)
)
end
In the end, the generated code that will be “pasted” into the Naive.DynamicSymbolSupervisor module at compile time will be:
# compiled code attached to the `Naive.DynamicSymbolSupervisor` module
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
Naive.Repo,
Naive.Schema.Settings,
Naive.DynamicSymbolSupervisor,
Naive.SymbolSupervisor
)
end
This way we can dynamically create functions for any application (for the streamer application, it will generate a function call with the Streamer.Repo, Streamer.Schema.Settings args, etc.).
We can apply that to all of the variables passed inside the autostart_workers/0 function. Just for reference, the full macro will look as follows:
# /apps/core/lib/core/service_supervisor.ex
defmacro __using__(opts) do
{:ok, repo} = Keyword.fetch(opts, :repo)
{:ok, schema} = Keyword.fetch(opts, :schema)
{:ok, module} = Keyword.fetch(opts, :module)
{:ok, worker_module} = Keyword.fetch(opts, :worker_module)
quote location: :keep do
use DynamicSupervisor
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
unquote(repo),
unquote(schema),
unquote(module),
unquote(worker_module)
)
end
end
end
You can think of the above macro as substituting the unquote(..) parts with the passed values, then grabbing the whole contents between quote ... do and end and pasting it into the Naive.DynamicSymbolSupervisor module at compile time. We can visualize the generated/“pasted” code as:
# generated by the `__using__/1` macro injected into the `Naive.DynamicSymbolSupervisor`
use DynamicSupervisor
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
Naive.Repo,
Naive.Schema.Settings,
Naive.DynamicSymbolSupervisor,
Naive.SymbolSupervisor
)
end
This is exactly the code that we had before inside the Naive.DynamicSymbolSupervisor module, but now it’s stored away inside the Core.ServiceSupervisor’s __using__/1 macro and it doesn’t need to be implemented/copied into two apps anymore.
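One way to check this mental model is to build a quoted expression by hand and print it back as source code with Macro.to_string/1. The sketch below does this outside of any macro and, to stay short, passes only two of the four arguments; that shortened call is a simplification for illustration, not the signature used in the chapter:

```elixir
# Values that the `__using__/1` macro would receive through `opts`.
# These are plain atoms here; the modules do not need to exist.
repo = Naive.Repo
worker_module = Naive.SymbolSupervisor

# Build the quoted code outside of any macro, purely for inspection.
quoted =
  quote do
    def autostart_workers do
      Core.ServiceSupervisor.autostart_workers(
        unquote(repo),
        unquote(worker_module)
      )
    end
  end

# Turn the quoted expression back into source code and print it.
generated = Macro.to_string(quoted)
IO.puts(generated)
```

The printed code contains the concrete module names in place of the unquote calls, which matches the “pasted” listing above.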
We can now follow the same principle and move start_worker/1 and stop_worker/1 from the Naive.DynamicSymbolSupervisor module into the __using__/1 macro inside the Core.ServiceSupervisor module:
# /apps/core/lib/core/service_supervisor.ex
# append the below before the end of the __using__/1 macro
def start_worker(symbol) do
Core.ServiceSupervisor.start_worker(
symbol, # <= this needs to stay as variable
unquote(repo),
unquote(schema),
unquote(module),
unquote(worker_module)
)
end
def stop_worker(symbol) do
Core.ServiceSupervisor.stop_worker(
symbol, # <= this needs to stay as variable
unquote(repo),
unquote(schema),
unquote(module),
unquote(worker_module)
)
end
Here we have an example of an execution-time variable called symbol that we should not unquote, as it will be different for each function call (the source code should have the symbol variable there, not, for example, "NEOUSDT").
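The difference between the two kinds of values can be sketched with a much smaller macro. WrapperDemo, its prefix option, and the tag/1 function are hypothetical and exist only to show a compile-time value next to a runtime argument:

```elixir
defmodule WrapperDemo do
  defmacro __using__(opts) do
    # Known at compile time, once per module that calls `use WrapperDemo`.
    {:ok, prefix} = Keyword.fetch(opts, :prefix)

    quote location: :keep do
      def tag(symbol) do
        # `unquote(prefix)` is replaced by its value when the module compiles;
        # `symbol` stays a normal variable that is bound on every call.
        unquote(prefix) <> "-" <> symbol
      end
    end
  end
end

defmodule WrapperDemo.Naive do
  use WrapperDemo, prefix: "naive"
end

defmodule WrapperDemo.Streamer do
  use WrapperDemo, prefix: "streamer"
end

IO.inspect(WrapperDemo.Naive.tag("NEOUSDT"))
IO.inspect(WrapperDemo.Streamer.tag("XRPUSDT"))
```

Each module gets its own prefix baked in, while the symbol is supplied by the caller at runtime.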
At this moment the Naive.DynamicSymbolSupervisor consists of only start_link/1, init/1, and shutdown_worker/1. It’s under 50 lines of code and works exactly as it did before refactoring. All of the boilerplate was moved to the Core.ServiceSupervisor module.
We left the shutdown_worker/1 function as it’s specific to the naive application, but inside it we use both the get_pid/2 and the update_status/4 functions, to which we pass naive application-specific values (like Naive.Repo).
To make things even nicer we can create convenience wrappers for those two functions inside the __using__/1 macro:
# /apps/core/lib/core/service_supervisor.ex
# add below at the end of `quote` block inside `__using__/1`
defp get_pid(symbol) do
Core.ServiceSupervisor.get_pid(
unquote(worker_module),
symbol
)
end
defp update_status(symbol, status) do
Core.ServiceSupervisor.update_status(
symbol,
status,
unquote(repo),
unquote(schema)
)
end
As those will get compiled and “pasted” into the Naive.DynamicSymbolSupervisor module, we can use them inside the shutdown_worker/1 function as if they were much simpler, naive application-specific local functions:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def shutdown_worker(symbol) when is_binary(symbol) do
case get_pid(symbol) do # <= macro provided function
nil ->
Logger.warning("#{Naive.SymbolSupervisor} worker for #{symbol} already stopped")
{:ok, _settings} = update_status(symbol, "off") # <= macro provided function
_pid ->
Logger.info(
"Initializing shutdown of #{Naive.SymbolSupervisor} worker for #{symbol}"
)
{:ok, settings} = update_status(symbol, "shutdown") # <= macro provided function
Naive.Leader.notify(:settings_updated, settings)
{:ok, settings}
end
end
And now, the very last change - I promise ;) Both the start_link/1 and the init/1 functions still reference the DynamicSupervisor module, which could be a little confusing. Let’s swap those calls to use the Core.ServiceSupervisor module (both to avoid confusing people and to be consistent with the use macro):
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def start_link(init_arg) do
Core.ServiceSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
def init(_init_arg) do
Core.ServiceSupervisor.init(strategy: :one_for_one)
end
As we don’t want/need the Core.ServiceSupervisor module to do anything different from what the DynamicSupervisor does, we can just delegate both of those functions inside the Core.ServiceSupervisor module:
# /apps/core/lib/core/service_supervisor.ex
defdelegate start_link(module, args, opts), to: DynamicSupervisor
defdelegate init(opts), to: DynamicSupervisor
That finishes our refactoring of both the Naive.DynamicSymbolSupervisor and the Core.ServiceSupervisor modules.
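The delegation can be tried in isolation with a pair of throwaway modules. In this sketch DelegateDemo.ServiceSupervisor is a hypothetical stand-in for Core.ServiceSupervisor, and an Agent is used as the child only because it needs no extra code:

```elixir
defmodule DelegateDemo.ServiceSupervisor do
  # Both functions are forwarded unchanged to DynamicSupervisor.
  defdelegate start_link(module, args, opts), to: DynamicSupervisor
  defdelegate init(opts), to: DynamicSupervisor
end

defmodule DelegateDemo.Supervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DelegateDemo.ServiceSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    DelegateDemo.ServiceSupervisor.init(strategy: :one_for_one)
  end
end

# The delegating supervisor behaves like a regular DynamicSupervisor.
{:ok, sup} = DelegateDemo.Supervisor.start_link([])
{:ok, _child} = DynamicSupervisor.start_child(sup, {Agent, fn -> 0 end})
IO.inspect(DynamicSupervisor.count_children(sup))
```

Because defdelegate only forwards the call, the supervisor started this way is an ordinary DynamicSupervisor process and accepts children as usual.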
We can test to confirm that everything works as expected:
$ iex -S mix
iex(1)> Naive.start_trading("NEOUSDT")
21:42:37.741 [info] Starting Elixir.Naive.SymbolSupervisor worker for NEOUSDT
21:42:37.768 [info] Starting new supervision tree to trade on NEOUSDT
{:ok, #PID<0.464.0>}
21:42:39.455 [info] Initializing new trader(1614462159452) for NEOUSDT
iex(2)> Naive.stop_trading("NEOUSDT")
21:43:08.362 [info] Stopping Elixir.Naive.SymbolSupervisor worker for NEOUSDT
{:ok,
%Naive.Schema.Settings{
...
}}
iex(3)> Naive.start_trading("HNTUSDT")
21:44:08.689 [info] Starting Elixir.Naive.SymbolSupervisor worker for HNTUSDT
21:44:08.723 [info] Starting new supervision tree to trade on HNTUSDT
{:ok, #PID<0.475.0>}
21:44:11.182 [info] Initializing new trader(1614462251182) for HNTUSDT
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
(l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
$ iex -S mix
21:47:22.119 [info] Starting Elixir.Naive.SymbolSupervisor worker for HNTUSDT
21:47:22.161 [info] Starting new supervision tree to trade on HNTUSDT
21:47:24.213 [info] Initializing new trader(1614462444212) for HNTUSDT
iex(1)> Naive.shutdown_trading("HNTUSDT")
21:48:42.003 [info] Initializing shutdown of Elixir.Naive.SymbolSupervisor worker for HNTUSDT
{:ok,
%Naive.Schema.Settings{
...
}}
The above test confirms that we can start, stop, and shut down trading on a symbol, and that autostarting of trading works.
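In this project, autostarting is triggered by a Task child of the application’s supervisor that calls autostart_workers/0 once the dynamic supervisor is up. The sketch below shows that mechanism in isolation. All Sketch.* modules are hypothetical, the hard-coded symbol list stands in for the database query, plain Agents stand in for the real workers, and the :rest_for_one strategy is an assumption made for this example:

```elixir
# Hypothetical dynamic supervisor with a simplified autostart_workers/0.
defmodule Sketch.AutostartSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end

  # Assumption: a fixed list replaces fetching symbols with status "on"
  # from the database; each "worker" is just an Agent holding its symbol.
  def autostart_workers do
    for symbol <- ["HNTUSDT"] do
      DynamicSupervisor.start_child(__MODULE__, {Agent, fn -> symbol end})
    end
  end
end

# Hypothetical top-level supervisor: children start in order, so the Task
# runs only after the dynamic supervisor is registered under its name.
defmodule Sketch.AppSupervisor do
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    children = [
      Sketch.AutostartSupervisor,
      {Task, fn -> Sketch.AutostartSupervisor.autostart_workers() end}
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end
end

{:ok, _sup} = Sketch.AppSupervisor.start_link([])
# The Task runs asynchronously, so give it a moment before inspecting.
Process.sleep(200)
IO.inspect(DynamicSupervisor.count_children(Sketch.AutostartSupervisor))
```

The Task is a temporary child by default, so it runs once at boot and is not restarted after it finishes.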
13.7 Use the Core.ServiceSupervisor module inside the streamer application
As we are happy with the implementation of the Core.ServiceSupervisor module we can upgrade the streamer application to use it.
We need to start with adding the core application to the list of dependencies of the streamer application:
# /apps/streamer/mix.exs
  defp deps do
    [
      {:binance, "~> 1.0"},
      {:core, in_umbrella: true}, # <= core added to deps
      ...
We can now move on to the Streamer.DynamicStreamerSupervisor module, where we will remove everything (really everything, including imports, aliases, and even require) besides the start_link/1 and the init/1 functions. As with the Naive.DynamicSymbolSupervisor, we will use the Core.ServiceSupervisor and pass all required options. The full implementation of the Streamer.DynamicStreamerSupervisor module should look as follows:
# /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
defmodule Streamer.DynamicStreamerSupervisor do
  use Core.ServiceSupervisor,
    repo: Streamer.Repo,
    schema: Streamer.Schema.Settings,
    module: __MODULE__,
    worker_module: Streamer.Binance

  def start_link(init_arg) do
    Core.ServiceSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def init(_init_arg) do
    Core.ServiceSupervisor.init(strategy: :one_for_one)
  end
end
Not much to add here - we use the Core.ServiceSupervisor module and pass options to it so that its macro can generate streamer application-specific wrappers (like start_worker/1 or stop_worker/1, with the required repo, schema, etc.) around the generic logic from the Core.ServiceSupervisor module.
Using the Core.ServiceSupervisor module will have an impact on the interface of the Streamer.DynamicStreamerSupervisor, as it will now provide functions like start_worker/1 instead of start_streaming/1, etc.
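To illustrate how a use macro can generate such wrappers, here is a heavily simplified sketch. It is not the Core.ServiceSupervisor implementation: the Sketch.* modules are hypothetical, only the worker_module option is handled, there is no repo or schema, and an Agent stands in for the real worker:

```elixir
# Hypothetical worker: an Agent that just remembers its symbol.
defmodule Sketch.DemoWorker do
  use Agent

  def start_link(symbol), do: Agent.start_link(fn -> symbol end)
end

# Hypothetical, cut-down version of the "generic logic + use macro" idea.
defmodule Sketch.MacroSupervisor do
  defmacro __using__(opts) do
    # Read the option at compile time; it is injected into the wrapper below.
    worker_module = Keyword.fetch!(opts, :worker_module)

    quote do
      use DynamicSupervisor

      # Generated wrapper: the caller passes only the symbol, the macro
      # fills in the module-specific arguments.
      def start_worker(symbol) do
        Sketch.MacroSupervisor.start_worker(
          symbol,
          __MODULE__,
          unquote(worker_module)
        )
      end
    end
  end

  # Generic logic shared by every module that uses the macro.
  def start_worker(symbol, module, worker_module) do
    DynamicSupervisor.start_child(module, {worker_module, symbol})
  end
end

defmodule Sketch.StreamerSupervisor do
  use Sketch.MacroSupervisor, worker_module: Sketch.DemoWorker

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

{:ok, _sup} = Sketch.StreamerSupervisor.start_link([])
{:ok, worker} = Sketch.StreamerSupervisor.start_worker("NEOUSDT")
IO.inspect(Agent.get(worker, & &1), label: "worker symbol")
```

The generated start_worker/1 exists only in the module that calls use, which is why the public interface of that module changes once it adopts the macro.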
As with the naive application, we need to update the Task function inside the Streamer.Supervisor module:
# /apps/streamer/lib/streamer/supervisor.ex
      ...
      {Task,
       fn ->
         Streamer.DynamicStreamerSupervisor.autostart_workers()
       end}
      ...
The main Streamer module also needs to forward calls instead of delegating:
# /apps/streamer/lib/streamer.ex
  alias Streamer.DynamicStreamerSupervisor

  def start_streaming(symbol) do
    symbol
    |> String.upcase()
    |> DynamicStreamerSupervisor.start_worker()
  end

  def stop_streaming(symbol) do
    symbol
    |> String.upcase()
    |> DynamicStreamerSupervisor.stop_worker()
  end
We can run a quick test to confirm that everything indeed works as expected:
$ iex -S mix
iex(1)> Streamer.start_streaming("NEOUSDT")
22:10:38.813 [info] Starting Elixir.Streamer.Binance worker for NEOUSDT
{:ok, #PID<0.465.0>}
iex(2)> Streamer.stop_streaming("NEOUSDT")
22:10:48.212 [info] Stopping Elixir.Streamer.Binance worker for NEOUSDT
{:ok,
%Streamer.Schema.Settings{
__meta__: #Ecto.Schema.Metadata<:loaded, "settings">,
id: "db8c9429-2356-4243-a08f-0d0e89b74986",
inserted_at: ~N[2021-02-25 22:15:16],
status: "off",
symbol: "NEOUSDT",
updated_at: ~N[2021-02-27 22:10:48]
}}
iex(3)> Streamer.start_streaming("LTCUSDT")
22:26:03.361 [info] Starting Elixir.Streamer.Binance worker for LTCUSDT
{:ok, #PID<0.490.0>}
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
(l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
^C
$ iex -S mix
...
22:26:30.775 [info] Starting Elixir.Streamer.Binance worker for LTCUSDT
This finishes the implementation for both the streamer and the naive applications. We are generating functions dynamically (metaprogramming) using Elixir macros, which is a cool exercise to go through and feels like superpowers ;)
[Note] Please remember to run mix format to keep things nice and tidy.
The source code for this chapter can be found on GitHub