Chapter 12 Abstract duplicated supervision code
In the previous two chapters, we implemented start, stop, and autostart functionality for both the streamer and naive applications.
If you look at the code in Streamer.DynamicStreamerSupervisor and Naive.DynamicSymbolSupervisor, you’ll notice they’re almost identical -
we essentially copy-pasted the same patterns with minor tweaks.
That’s a code smell. When we need to fix a bug or add a feature, we’d have to make the same change in multiple places. And as we add more applications to our umbrella, the duplication would only grow.
In this chapter, we’ll fix this by creating a Core.ServiceSupervisor module that extracts the common supervision logic.
We’ll use Elixir’s powerful macro system to generate the boilerplate code at compile time.
By the end, our supervisor modules will be slim wrappers that simply declare their configuration,
while all the heavy lifting happens in the shared Core module.
If you’ve never worked with macros before, here’s the key insight: regular functions operate on values at runtime, but macros operate on code itself at compile time. Think of macros as “code templates” that Elixir fills in and pastes into your modules before your application even starts.
This is a great opportunity to explore Elixir metaprogramming. We’ll use macros to solve our duplication problem, and along the way you’ll learn how tools like Phoenix’s router work under the hood. In the next chapter, we’ll discover an even simpler approach - but understanding macros first will make that “aha moment” much more satisfying!
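To make the function-versus-macro distinction concrete, here is a minimal sketch. The MacroIntro and MacroIntroDemo modules are hypothetical names made up for this illustration; they are not part of the project.

```elixir
defmodule MacroIntro do
  # A regular function receives the *value* of its argument at runtime.
  def describe_value(value), do: "value: #{inspect(value)}"

  # A macro receives the *code* of its argument (as an AST) at compile time.
  defmacro describe_code(expression) do
    # This line runs while the calling module is being compiled.
    code_as_string = Macro.to_string(expression)

    # The quoted block is what gets pasted into the caller.
    quote do
      "code: " <> unquote(code_as_string)
    end
  end
end

defmodule MacroIntroDemo do
  # Macros must be required before they can be called.
  require MacroIntro

  def run do
    {MacroIntro.describe_value(1 + 2), MacroIntro.describe_code(1 + 2)}
  end
end
```

Calling MacroIntroDemo.run() should return {"value: 3", "code: 1 + 2"}: the function saw the result of the addition, while the macro saw the expression itself.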
12.1 Objectives
- overview of requirements
- pseudo generalize Core.ServiceSupervisor module
- utilize pseudo generalized code inside the Naive.DynamicSymbolSupervisor
- implement a truly generic Core.ServiceSupervisor
- use the Core.ServiceSupervisor module inside the streamer application
12.2 Overview of requirements
In the last few chapters, we went through adding and modifying the dynamic supervision tree around the naive and
streamer applications’ workers. Initially, we just copied the implementation from the streamer application to
the naive application (with a few minor tweaks like log messages). That wasn’t the most sophisticated solution, and we will
address this copy-paste pattern in this chapter.
We will write an “extension” of the DynamicSupervisor that allows us to start, stop and autostart workers.
Just to keep things simple we will create a new application inside our umbrella project where we will place the logic.
This will save us from creating a new repo for the time being.
Our new Core.ServiceSupervisor module will hold all the logic responsible for starting, stopping, and autostarting worker processes.
To limit the boilerplate inside the implementation modules (like Naive.DynamicSymbolSupervisor or Streamer.DynamicStreamerSupervisor),
we will utilize the use macro, which will generate the low-level wiring for us at compile time.
12.3 Pseudo generalize Core.ServiceSupervisor module
Let’s start by creating a new non-supervised application called core inside our umbrella project.
At this moment our “abstraction code” will sit inside it just to keep things simple as otherwise,
we would need to create a new repo and jump between codebases which we will avoid for the time being:
$ cd apps
$ mix new core
* creating README.md
* creating .formatter.exs
* creating .gitignore
* creating mix.exs
* creating lib
* creating lib/core.ex
* creating test
* creating test/test_helper.exs
* creating test/core_test.exs
...
We can now create a new directory called core inside the apps/core/lib directory and a new file called service_supervisor.ex inside it, where we will put all of the abstracted starting/stopping/autostarting logic.
Let’s start with an empty Core.ServiceSupervisor module (just the defmodule shell with no functions in it yet).
The first step in our refactoring process will be to move (cut) all of the functions from the Naive.DynamicSymbolSupervisor
(excluding start_link/1, init/1 and shutdown_trading/1) and put them inside the Core.ServiceSupervisor module, which should look as follows:
# /apps/core/lib/core/service_supervisor.ex
defmodule Core.ServiceSupervisor do
def autostart_trading do
...
end
def start_trading(symbol) when is_binary(symbol) do
...
end
def stop_trading(symbol) when is_binary(symbol) do
...
end
defp get_pid(symbol) do
...
end
defp update_trading_status(symbol, status)
when is_binary(symbol) and is_binary(status) do
...
end
defp start_symbol_supervisor(symbol) do
...
end
defp fetch_symbols_to_trade do
...
end
end
All of the above code is trading-related - we need to rename functions/logs to be more generic.
Starting with autostart_trading/0 we can rename it to autostart_workers/0:
# /apps/core/lib/core/service_supervisor.ex
...
def autostart_workers do # <= updated function name
fetch_symbols_to_start() # <= updated function name
|> Enum.map(&start_worker/1) # <= updated function name
end
...
As we renamed two functions that are called inside autostart_workers/0, we need to update their implementations.
The start_trading/1 will become start_worker/1; internally we will inline the
start_symbol_supervisor/1 function (move its contents inside the start_worker/1 function and remove the start_symbol_supervisor/1 function)
as it’s used just once inside this module. We also need to rename update_trading_status/2 to update_status/2.
The fetch_symbols_to_trade/0 will get updated to fetch_symbols_to_start/0:
# /apps/core/lib/core/service_supervisor.ex
def start_worker(symbol) when is_binary(symbol) do # <= updated name
case get_pid(symbol) do
nil ->
Logger.info("Starting trading of #{symbol}")
{:ok, _settings} = update_status(symbol, "on") # <= updated name
{:ok, _pid} =
DynamicSupervisor.start_child(
Naive.DynamicSymbolSupervisor,
{Naive.SymbolSupervisor, symbol}
) # ^^^^^^ inlined `start_symbol_supervisor/1`
pid ->
Logger.warning("Trading on #{symbol} already started")
{:ok, _settings} = update_status(symbol, "on") # <= updated name
{:ok, pid}
end
end
...
defp fetch_symbols_to_start do # <= updated name
...
end
Inside the above code we updated the update_trading_status/2 call to update_status/2, so we need to update the function header to match:
# /apps/core/lib/core/service_supervisor.ex
defp update_status(symbol, status) # <= updated name
when is_binary(symbol) and is_binary(status) do
...
end
The last function to rename in this module will be stop_trading/1, which becomes stop_worker/1.
We also need to update calls to update_trading_status/2 to update_status/2 as it was renamed:
# /apps/core/lib/core/service_supervisor.ex
def stop_worker(symbol) when is_binary(symbol) do # <= updated name
case get_pid(symbol) do
nil ->
Logger.warning("Trading on #{symbol} already stopped")
{:ok, _settings} = update_status(symbol, "off") # <= updated name
pid ->
Logger.info("Stopping trading of #{symbol}")
:ok =
DynamicSupervisor.terminate_child(
Naive.DynamicSymbolSupervisor,
pid
)
{:ok, _settings} = update_status(symbol, "off") # <= updated name
end
end
At this moment we have a pseudo-generic implementation of start_worker/1 and stop_worker/1 inside the Core.ServiceSupervisor module.
Function names are generic but they still refer to Repo, Settings, and other modules specific to the naive app’s implementation.
We’re probably in a worse situation than before we started this refactoring ;)
But don’t fear, this was just the first step on the way to abstracting away the starting, stopping, and autostarting code.
12.4 Utilize pseudo generalized code inside the Naive DynamicSymbolSupervisor
Before we jump back to the naive application’s modules, we need to add the core application to the dependencies of the naive application:
# /apps/naive/mix.exs
defp deps do
[
{:binance, "~> 1.0"},
{:binance_mock, in_umbrella: true},
{:core, in_umbrella: true}, # <= core dep added
...
Let’s get back to the Naive.DynamicSymbolSupervisor, where callers still expect the functions that we just cut out (like start_trading/1 or stop_trading/1) to exist.
Let’s reimplement more generic versions of those functions as simple calls to the
Core.ServiceSupervisor module:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
...
def autostart_workers do
Core.ServiceSupervisor.autostart_workers()
end
def start_worker(symbol) do
Core.ServiceSupervisor.start_worker(symbol)
end
def stop_worker(symbol) do
Core.ServiceSupervisor.stop_worker(symbol)
end
We also need to update the shutdown_trading/1 function as we removed all the private functions that it relies on:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def shutdown_worker(symbol) when is_binary(symbol) do # <= updated name
case Core.ServiceSupervisor.get_pid(symbol) do # <= module added
nil ->
Logger.warning("Trading on #{symbol} already stopped")
{:ok, _settings} = Core.ServiceSupervisor.update_status(symbol, "off")
# ^^^ updated name + module
_pid ->
Logger.info("Shutdown of trading on #{symbol} initialized")
{:ok, settings} = Core.ServiceSupervisor.update_status(symbol, "shutdown")
# ^^^ updated name + module
Naive.Leader.notify(:settings_updated, settings)
{:ok, settings}
end
end
When we moved the private helper functions, we didn’t make them public, so the
Naive.DynamicSymbolSupervisor module can’t call them yet - we will fix that now, together with adding temporary aliases/require/imports at the top of the Core.ServiceSupervisor:
# /apps/core/lib/core/service_supervisor.ex
defmodule Core.ServiceSupervisor do
require Logger # <= added require
import Ecto.Query, only: [from: 2] # <= added import
alias Naive.Repo # <= added alias
alias Naive.Schema.Settings # <= added alias
...
def get_pid(symbol) do # <= updated from private to public
...
end
def update_status(symbol, status) # <= updated from private to public
when is_binary(symbol) and is_binary(status) do
...
end
As fetch_symbols_to_start/0 is only used internally by the Core.ServiceSupervisor module itself, we don’t need to make it public.
We can also remove the aliases and import from the Naive.DynamicSymbolSupervisor as it won’t need them anymore.
The next step will be to add ecto to the deps of the core application (inside apps/core/mix.exs) as it will make db queries now.
As we modified the interface of the Naive.DynamicSymbolSupervisor (for example, renamed start_trading/1 to start_worker/1) we need to modify the Naive.Supervisor’s children list - more specifically the Task process:
# /apps/naive/lib/naive/supervisor.ex
...
{Task,
fn ->
Naive.DynamicSymbolSupervisor.autostart_workers() # <= func name updated
end}
...
The last step will be to update the interface of the naive application:
# /apps/naive/lib/naive.ex
alias Naive.DynamicSymbolSupervisor
def start_trading(symbol) do
symbol
|> String.upcase()
|> DynamicSymbolSupervisor.start_worker()
end
def stop_trading(symbol) do
symbol
|> String.upcase()
|> DynamicSymbolSupervisor.stop_worker()
end
def shutdown_trading(symbol) do
symbol
|> String.upcase()
|> DynamicSymbolSupervisor.shutdown_worker()
end
Believe it or not, at this moment (ignoring all of the warnings caused by the circular dependency that we created between the core and
the naive applications - which we will fix in the next steps) our application runs just fine! We are able to start and stop trading,
and autostarting works as well.
12.5 Implement a truly generic Core.ServiceSupervisor
Ok. Why did we even do this? What we are aiming for is a separation between the interface of our Naive.DynamicSymbolSupervisor module
(like start_worker/1, autostart_workers/0 and stop_worker/1) and the implementation which is now placed inside the Core.ServiceSupervisor module.
That’s all nice and to-some-extent understandable but Core.ServiceSupervisor module is not a generic module.
We can’t use it inside the streamer application to supervise the Streamer.Binance processes.
So, what’s the point? Well, we can make it even more generic!
12.5.1 First path starting with the fetch_symbols_to_start/0 function
Moving on to full generalization of the Core.ServiceSupervisor module. We will start with the helper functions first as they are
the ones doing the work and they need to be truly generalized first:
# /apps/core/lib/core/service_supervisor.ex
defp fetch_symbols_to_start do
Repo.all(
from(s in Settings,
where: s.status == "on",
select: s.symbol
)
)
end
The fetch_symbols_to_start/0 function uses Repo and Settings that are aliased at the top of the
Core.ServiceSupervisor module. This won’t work with any other application, as the streamer application will require its own Repo and Settings modules.
To fix that we will pass both repo and schema as arguments to the fetch_symbols_to_start/0 function, which will become fetch_symbols_to_start/2:
# /apps/core/lib/core/service_supervisor.ex
defp fetch_symbols_to_start(repo, schema) do # <= args added
repo.all( # <= lowercase `repo` is an argument not aliased module
from(s in schema, # <= settings schema module passed as arg
where: s.status == "on",
select: s.symbol
)
)
end
This will have a knock-on effect on any functions that are using fetch_symbols_to_start/0 - now they need to use fetch_symbols_to_start/2 and pass the appropriate Repo and Schema modules.
The fetch_symbols_to_start/0 is referenced by autostart_workers/0 - we will need to modify it to pass the repo and schema to fetch_symbols_to_start/2, and as it lives inside the Core.ServiceSupervisor module it needs to receive them as arguments itself:
# /apps/core/lib/core/service_supervisor.ex
def autostart_workers(repo, schema) do # <= args added
fetch_symbols_to_start(repo, schema) # <= args passed
|> Enum.map(&start_worker/1)
end
Going even further down the line, autostart_workers/2 (previously autostart_workers/0) is called by the autostart_workers/0 function inside the Naive.DynamicSymbolSupervisor module. As this module is specific to the naive application, it is the place where repo and schema are known from the context - for the naive application repo is the Naive.Repo module and schema is the Naive.Schema.Settings module:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
...
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
Naive.Repo, # <= new value passed
Naive.Schema.Settings # <= new value passed
)
end
This finishes the first of multiple paths that we need to follow to fully refactor the Core.ServiceSupervisor module.
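The idea of passing a module as an argument can be tried out without a database. The sketch below uses hypothetical stand-in modules (Sketch.NaiveRepo, Sketch.StreamerRepo and Sketch.SymbolFetcher are invented for illustration and return canned data instead of running an Ecto query):

```elixir
defmodule Sketch.NaiveRepo do
  # Hypothetical stand-in for an Ecto repo: returns canned data.
  def all(_query), do: ["XRPUSDT", "ETHUSDT"]
end

defmodule Sketch.StreamerRepo do
  # A second stand-in, to show that the caller decides which repo is used.
  def all(_query), do: ["BTCUSDT"]
end

defmodule Sketch.SymbolFetcher do
  # `repo` is a plain variable holding a module name (an atom),
  # so `repo.all(...)` is dispatched at runtime to whichever module was passed.
  def fetch_symbols_to_start(repo, schema) do
    # The tuple below is a placeholder for the real Ecto query.
    repo.all({schema, status: "on"})
  end
end
```

The same function now serves both applications: Sketch.SymbolFetcher.fetch_symbols_to_start(Sketch.StreamerRepo, :settings) returns ["BTCUSDT"], while passing Sketch.NaiveRepo returns the other list.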
12.5.2 Second path starting with the update_status/2
Let’s not waste time and start from the other helper function inside the Core.ServiceSupervisor module.
This time we will make the update_status/2 function fully generic:
# /apps/core/lib/core/service_supervisor.ex
def update_status(symbol, status, repo, schema) # <= args added
when is_binary(symbol) and is_binary(status) do
repo.get_by(schema, symbol: symbol) # <= using dynamic repo and schema modules
|> Ecto.Changeset.change(%{status: status})
|> repo.update() # <= using dynamic repo module
end
As previously, we added repo and schema as arguments and modified the body of the function to utilize them
instead of the hardcoded modules (aliased at the top of the Core.ServiceSupervisor module).
In the same fashion as before, we need to check “who” is using update_status/2 and update those calls to update_status/4.
The function is used inside the start_worker/1 and the stop_worker/1 functions inside the
Core.ServiceSupervisor module, so again we need to bubble the new arguments up (accept them as arguments of both start_worker/1 and stop_worker/1):
# /apps/core/lib/core/service_supervisor.ex
def start_worker(symbol, repo, schema) when is_binary(symbol) do # <= new args
...
{:ok, _settings} = update_status(symbol, "on", repo, schema) # <= args passed
...
{:ok, _settings} = update_status(symbol, "on", repo, schema) # <= args passed
...
end
def stop_worker(symbol, repo, schema) when is_binary(symbol) do # <= new args
...
{:ok, _settings} = update_status(symbol, "off", repo, schema) # <= args passed
...
{:ok, _settings} = update_status(symbol, "off", repo, schema) # <= args passed
...
end
As we modified both start_worker/1 and stop_worker/1 by adding two additional arguments, we need to update all references to them, and here is where things branch out a bit.
We will start with the start_worker/1 function (which is now start_worker/3) - it’s used by autostart_workers/2 inside the Core.ServiceSupervisor module. The autostart_workers/2 function already has repo and schema, so we can just pass them to start_worker/3:
# /apps/core/lib/core/service_supervisor.ex
def autostart_workers(repo, schema) do
fetch_symbols_to_start(repo, schema)
|> Enum.map(&start_worker(&1, repo, schema)) # <= args passed
end
Both the start_worker/3 and the stop_worker/3 functions are used by the functions inside the Naive.DynamicSymbolSupervisor module. We need to pass the Repo and Schema in the same fashion as previously with the autostart_workers/2 function:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def start_worker(symbol) do
Core.ServiceSupervisor.start_worker(
symbol,
Naive.Repo, # <= new arg passed
Naive.Schema.Settings # <= new arg passed
)
end
def stop_worker(symbol) do
Core.ServiceSupervisor.stop_worker(
symbol,
Naive.Repo, # <= new arg passed
Naive.Schema.Settings # <= new arg passed
)
end
At this moment there’s no code inside the Core.ServiceSupervisor module referencing the aliased Repo or Settings modules, so we can safely remove both aliases - we are definitely moving in the right direction!
By the way, our project still works at this stage: we can start/stop trading and it autostarts trading.
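The &start_worker(&1, repo, schema) capture used in this path may look unfamiliar, so here is a small sketch of how it behaves. Sketch.Autostart is a hypothetical module for illustration only; its start_worker/3 merely returns a tuple instead of starting a process.

```elixir
defmodule Sketch.Autostart do
  # Simplified stand-in for start_worker/3: nothing is actually started here.
  def start_worker(symbol, repo, schema), do: {:started, symbol, repo, schema}

  def autostart_workers(symbols, repo, schema) do
    # &1 is the element handed over by Enum.map/2;
    # repo and schema are taken from the enclosing function's arguments.
    Enum.map(symbols, &start_worker(&1, repo, schema))
  end

  # The same logic written with an explicit anonymous function.
  def autostart_workers_explicit(symbols, repo, schema) do
    Enum.map(symbols, fn symbol -> start_worker(symbol, repo, schema) end)
  end
end
```

Both variants return the same list, so the capture syntax is only a shorter way of fixing the extra arguments.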
12.5.3 Third path starting with the get_pid/1 function
Starting again from the most nested helper function - this time the get_pid/1:
# /apps/core/lib/core/service_supervisor.ex
def get_pid(symbol) do
Process.whereis(:"Elixir.Naive.SymbolSupervisor-#{symbol}")
end
We can see that it has a hardcoded Naive.SymbolSupervisor worker module - we need to make this part dynamic by using the worker_module argument:
# /apps/core/lib/core/service_supervisor.ex
def get_pid(worker_module, symbol) do # <= arg added
Process.whereis(:"#{worker_module}-#{symbol}") # <= arg used
end
Note about atoms usage: As noted back in Chapter 5, this dynamic atom creation is safe for our finite set of trading symbols, but we’ll replace it with a Registry-based approach in the next chapter.
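To see why interpolating worker_module produces the same Elixir.-prefixed name as the hardcoded version, consider the sketch below. Sketch.PidLookup and Sketch.LookupWorker are hypothetical names; an Agent stands in for a real worker process.

```elixir
defmodule Sketch.PidLookup do
  def get_pid(worker_module, symbol) do
    # Interpolating a module name yields its full atom name, "Elixir." prefix included.
    Process.whereis(:"#{worker_module}-#{symbol}")
  end

  def demo do
    # Sketch.LookupWorker is never defined: a module alias is just an atom.
    name = :"#{Sketch.LookupWorker}-XRPUSDT"
    {:ok, pid} = Agent.start_link(fn -> :ok end, name: name)

    found = get_pid(Sketch.LookupWorker, "XRPUSDT")
    missing = get_pid(Sketch.LookupWorker, "ETHUSDT")

    :ok = Agent.stop(pid)
    {name, found == pid, missing}
  end
end
```

Sketch.PidLookup.demo() should return {:"Elixir.Sketch.LookupWorker-XRPUSDT", true, nil}: the registered process is found under the generated name, and an unknown symbol gives nil.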
Moving up to the functions that call get_pid (now get_pid/2), those will be the start_worker/3 and the stop_worker/3 functions.
As those are the two last functions to be updated, we will look into them more closely to finish our refactoring in this 3rd run.
At this moment both need to accept worker_module, as both are calling the get_pid/2 function.
Looking at both functions we can see three other hardcoded details:
- inside the log messages there is the word “trading” - we will replace it with text built from the worker_module and symbol arguments
- there are two references to the Naive.DynamicSymbolSupervisor, which we will replace with the module argument
- there is one more reference to the Naive.SymbolSupervisor module, which we will replace with the worker_module argument
Let’s look at updated functions:
# /apps/core/lib/core/service_supervisor.ex
# module and worker_module args added vvvv
def start_worker(symbol, repo, schema, module, worker_module)
when is_binary(symbol) do
case get_pid(worker_module, symbol) do # <= worker_module passed
nil ->
Logger.info("Starting #{worker_module} worker for #{symbol}")
# ^^^ dynamic text
{:ok, _settings} = update_status(symbol, "on", repo, schema)
{:ok, _pid} = DynamicSupervisor.start_child(module, {worker_module, symbol})
# ^^^ args used
pid ->
Logger.warning("#{worker_module} worker for #{symbol} already started")
# ^^^ dynamic text
{:ok, _settings} = update_status(symbol, "on", repo, schema)
{:ok, pid}
end
end
# module and worker_module added as args vvvv
def stop_worker(symbol, repo, schema, module, worker_module)
when is_binary(symbol) do
case get_pid(worker_module, symbol) do # <= worker_module passed
nil ->
Logger.warning("#{worker_module} worker for #{symbol} already stopped")
# ^^^ dynamic text
{:ok, _settings} = update_status(symbol, "off", repo, schema)
pid ->
Logger.info("Stopping #{worker_module} worker for #{symbol}")
# ^^^ dynamic text
:ok = DynamicSupervisor.terminate_child(module, pid) # <= arg used
{:ok, _settings} = update_status(symbol, "off", repo, schema)
end
end
Inside both the start_worker/5 and the stop_worker/5 functions we modified:
- the get_pid/1 call (now get_pid/2) to pass the worker_module
- Logger’s messages to use the worker_module and symbol
- DynamicSupervisor’s functions to use the module and the worker_module
Again, as we modified start_worker/5 we need to make the last change inside the
Core.ServiceSupervisor module - autostart_workers/2 uses the start_worker/5 function:
# /apps/core/lib/core/service_supervisor.ex
def autostart_workers(repo, schema, module, worker_module) do # <= args added
fetch_symbols_to_start(repo, schema)
|> Enum.map(&start_worker(&1, repo, schema, module, worker_module)) # <= args added
end
Just for reference - the final function headers look as follows:
# /apps/core/lib/core/service_supervisor.ex
defmodule Core.ServiceSupervisor do
def autostart_workers(repo, schema, module, worker_module) do
...
end
def start_worker(symbol, repo, schema, module, worker_module)
when is_binary(symbol) do
...
end
def stop_worker(symbol, repo, schema, module, worker_module)
when is_binary(symbol) do
...
end
def get_pid(worker_module, symbol) do
...
end
def update_status(symbol, status, repo, schema)
when is_binary(symbol) and is_binary(status) do
...
end
defp fetch_symbols_to_start(repo, schema) do
...
end
end
That finishes the 3rd round of updates inside the Core.ServiceSupervisor module. Now we need to update the Naive.DynamicSymbolSupervisor module to use the updated functions (and pass the required arguments):
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
...
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
Naive.Repo,
Naive.Schema.Settings,
__MODULE__, # <= added arg
Naive.SymbolSupervisor # <= added arg
)
end
def start_worker(symbol) do
Core.ServiceSupervisor.start_worker(
symbol,
Naive.Repo,
Naive.Schema.Settings,
__MODULE__, # <= added arg
Naive.SymbolSupervisor # <= added arg
)
end
def stop_worker(symbol) do
Core.ServiceSupervisor.stop_worker(
symbol,
Naive.Repo,
Naive.Schema.Settings,
__MODULE__, # <= added arg
Naive.SymbolSupervisor # <= added arg
)
end
def shutdown_worker(symbol) when is_binary(symbol) do
case Core.ServiceSupervisor.get_pid(Naive.SymbolSupervisor, symbol) do # <= arg added
nil ->
Logger.warning("#{Naive.SymbolSupervisor} worker for #{symbol} already stopped")
# ^^^ updated
{:ok, _settings} =
Core.ServiceSupervisor.update_status(
symbol,
"off",
Naive.Repo,
Naive.Schema.Settings
) # ^^^ args added
_pid ->
Logger.info(
"Initializing shutdown of #{Naive.SymbolSupervisor} worker for #{symbol}"
) # ^^^ updated
{:ok, settings} =
Core.ServiceSupervisor.update_status(
symbol,
"shutdown",
Naive.Repo,
Naive.Schema.Settings
) # ^^^ additional args passed
Naive.Leader.notify(:settings_updated, settings)
{:ok, settings}
end
end
We needed to update references inside the shutdown_worker/1 function as well, as it calls the get_pid/2 and update_status/4 functions.
We are now done with refactoring the Core.ServiceSupervisor module; it’s completely generic and can be used inside both the streamer and the naive applications.
At this moment, to use the Core.ServiceSupervisor module we need to write interface functions in our supervisors
and pass multiple arguments in each one - we’d essentially be copying those functions into both the streamer and naive applications again.
In the next section we will look into how we could leverage Elixir macros
to remove that boilerplate.
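The generic start/stop flow from this section can be exercised in isolation. The sketch below is a simplified assumption, not the project code: it drops the repo and schema arguments (so no status is persisted), and Sketch.ToyWorker and Sketch.GenericSupervisor are hypothetical modules, with an Agent playing the role of the worker.

```elixir
defmodule Sketch.ToyWorker do
  # `use Agent` provides the child_spec/1 that DynamicSupervisor.start_child/2 needs.
  use Agent

  def start_link(symbol) do
    # Register under the same "<module>-<symbol>" naming scheme as in the chapter.
    Agent.start_link(fn -> symbol end, name: :"#{__MODULE__}-#{symbol}")
  end
end

defmodule Sketch.GenericSupervisor do
  def get_pid(worker_module, symbol) do
    Process.whereis(:"#{worker_module}-#{symbol}")
  end

  # `module` is the DynamicSupervisor to use; `worker_module` is what gets started.
  def start_worker(symbol, module, worker_module) when is_binary(symbol) do
    case get_pid(worker_module, symbol) do
      nil -> DynamicSupervisor.start_child(module, {worker_module, symbol})
      pid -> {:ok, pid}
    end
  end

  def stop_worker(symbol, module, worker_module) when is_binary(symbol) do
    case get_pid(worker_module, symbol) do
      nil -> :ok
      pid -> DynamicSupervisor.terminate_child(module, pid)
    end
  end
end
```

Nothing in Sketch.GenericSupervisor mentions a concrete application, which is the property we were after: any dynamic supervisor and any worker module following the naming scheme can be plugged in.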
12.6 Remove boilerplate using use macro
Elixir’s use macro lets a module inject code into the module that calls it.
Currently, the Naive.DynamicSymbolSupervisor module uses the DynamicSupervisor module,
but we could make it use Core.ServiceSupervisor instead:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
defmodule Naive.DynamicSymbolSupervisor do
use Core.ServiceSupervisor
To be able to use the Core.ServiceSupervisor module, it needs to provide the __using__/1 macro.
The simplest content we can use for that macro would be to just use the DynamicSupervisor inside:
# /apps/core/lib/core/service_supervisor.ex
defmacro __using__(opts) do
IO.inspect(opts)
quote location: :keep do
use DynamicSupervisor
end
end
How does this work? Think of quote as a “code template” and unquote as the “fill in the blank” markers.
When Elixir compiles a module that uses ours, it:
1. Takes everything inside the quote ... do ... end block as a template
2. Looks for unquote(...) calls and replaces them with actual values
3. Pastes the resulting code into the module that called use
It’s similar to string interpolation ("Hello #{name}"), but instead of building strings, we’re building code.
The IO.inspect(opts) will print at compile time - not runtime - which lets us see what values are being passed in.
This will become clearer as we work through our first example.
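The template analogy can be checked outside of a macro as well, because quote works in any function. In this sketch (Sketch.QuoteDemo is a hypothetical module), the template is built and then evaluated with Code.eval_quoted/1 to show what the generated code does:

```elixir
defmodule Sketch.QuoteDemo do
  # Returns code (an AST), not a string: `unquote(name)` is the blank that gets filled in.
  def greeting_template(name) do
    quote do
      "Hello " <> unquote(name)
    end
  end

  def run do
    ast = greeting_template("Naive")
    # Evaluate the generated code to see its result.
    {result, _bindings} = Code.eval_quoted(ast)
    result
  end
end
```

Sketch.QuoteDemo.run() returns "Hello Naive". Inside a real macro there is no need to evaluate anything by hand; the compiler places the returned code into the calling module.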
At this moment (after swapping to use Core.ServiceSupervisor) our code still works exactly as if
we simply had use DynamicSupervisor inside the Naive.DynamicSymbolSupervisor module -
at compilation, it will be expanded to that either way (as per the contents of the __using__/1 macro).
As the autostart_workers/0 function is a part of the boilerplate, we will move it from the Naive.DynamicSymbolSupervisor module
to the Core.ServiceSupervisor module inside the __using__/1 macro.
Ok, but it has all of those other naive application-specific arguments - where will we get those?
That’s what that opts argument is for inside the __using__/1 macro. When we call
use Core.ServiceSupervisor we can pass an additional keyword list which will contain all naive application-specific details:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
defmodule Naive.DynamicSymbolSupervisor do
use Core.ServiceSupervisor,
repo: Naive.Repo,
schema: Naive.Schema.Settings,
module: __MODULE__,
worker_module: Naive.SymbolSupervisor
We can now update the __using__/1 macro to assign all of those details to variables (instead of using IO.inspect/1):
# /apps/core/lib/core/service_supervisor.ex
defmacro __using__(opts) do
{:ok, repo} = Keyword.fetch(opts, :repo)
{:ok, schema} = Keyword.fetch(opts, :schema)
{:ok, module} = Keyword.fetch(opts, :module)
{:ok, worker_module} = Keyword.fetch(opts, :worker_module)
...
At this moment we can use those dynamic values to generate code that will be specific to the implementation module,
for example the autostart_workers/0 function that we moved from the Naive.DynamicSymbolSupervisor module, which
will need to have different values passed to it (like Streamer.Binance as worker_module) for the streamer application.
It requires inserting those dynamic values inside autostart_workers/0, but how do we dynamically inject arguments?
unquote to the rescue. We will update the autostart_workers/0 function from:
# sample moved code from the `Naive.DynamicSymbolSupervisor` module
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
Naive.Repo,
Naive.Schema.Settings,
__MODULE__,
Naive.SymbolSupervisor
)
end
to:
# updated code that will become part of the `__using__/1` macro
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
unquote(repo),
unquote(schema),
unquote(module),
unquote(worker_module)
)
end
In the end, the generated code that will be “pasted” into the Naive.DynamicSymbolSupervisor module at compile time will be:
# compiled code attached to the `Naive.DynamicSymbolSupervisor` module
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
Naive.Repo,
Naive.Schema.Settings,
Naive.DynamicSymbolSupervisor,
Naive.SymbolSupervisor
)
end
This way we can dynamically create functions for any application (for the streamer application, it will generate a function call with the Streamer.Repo, Streamer.Schema.Settings args, etc.).
We can apply that to all of the passed variables inside the autostart_workers/0 function - just for reference, the full macro will look as follows:
# /apps/core/lib/core/service_supervisor.ex
defmacro __using__(opts) do
{:ok, repo} = Keyword.fetch(opts, :repo)
{:ok, schema} = Keyword.fetch(opts, :schema)
{:ok, module} = Keyword.fetch(opts, :module)
{:ok, worker_module} = Keyword.fetch(opts, :worker_module)
quote location: :keep do
use DynamicSupervisor
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
unquote(repo),
unquote(schema),
unquote(module),
unquote(worker_module)
)
end
end
end
You can think of the macro as a sophisticated copy-paste machine.
It takes the values you passed to opts, substitutes them wherever it sees unquote(...),
and then injects the resulting code block directly into the Naive.DynamicSymbolSupervisor at compile time
- we can visualize the generated/“pasted” code as:
# generated by the `__using__/1` macro injected into the `Naive.DynamicSymbolSupervisor`
use DynamicSupervisor
def autostart_workers do
Core.ServiceSupervisor.autostart_workers(
Naive.Repo,
Naive.Schema.Settings,
Naive.DynamicSymbolSupervisor,
Naive.SymbolSupervisor
)
end
This is exactly the code that we had before inside the Naive.DynamicSymbolSupervisor module, but now it’s stored away inside the Core.ServiceSupervisor’s __using__/1 macro and doesn’t need to be copied into both apps anymore.
We can now follow the same principle and move start_worker/1 and stop_worker/1 from the
Naive.DynamicSymbolSupervisor module into __using__/1 macro inside the Core.ServiceSupervisor module:
# /apps/core/lib/core/service_supervisor.ex
# append the below before the end of the __using__/1 macro
def start_worker(symbol) do
Core.ServiceSupervisor.start_worker(
symbol, # <= this needs to stay as variable
unquote(repo),
unquote(schema),
unquote(module),
unquote(worker_module)
)
end
def stop_worker(symbol) do
Core.ServiceSupervisor.stop_worker(
symbol, # <= this needs to stay as variable
unquote(repo),
unquote(schema),
unquote(module),
unquote(worker_module)
)
end
Here we have a crucial distinction:
symbol is a runtime variable that changes with each function call, while repo,
schema, etc. are compile-time constants that stay fixed for the entire module.
Common Pitfall: If you accidentally unquote(symbol), the macro would try to evaluate symbol at compile time -
when it doesn’t exist yet - and you’d get a compilation error.
The rule of thumb: unquote values you know at compile time (from opts), leave runtime variables alone.
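The rule of thumb can be demonstrated with a stripped-down macro. Everything in this sketch is hypothetical (Sketch.Service, the two supervisors and the worker names are not part of the project); it keeps only one option so that the compile-time and runtime parts are easy to tell apart.

```elixir
defmodule Sketch.Service do
  defmacro __using__(opts) do
    # Runs at compile time, once per module that calls `use Sketch.Service`.
    {:ok, worker_module} = Keyword.fetch(opts, :worker_module)

    quote location: :keep do
      # `symbol` is a runtime argument, so it is NOT unquoted;
      # `worker_module` is known at compile time, so it is.
      def describe(symbol) do
        "#{unquote(worker_module)} worker for #{symbol}"
      end
    end
  end
end

defmodule Sketch.NaiveSupervisor do
  use Sketch.Service, worker_module: Sketch.NaiveWorker
end

defmodule Sketch.StreamerSupervisor do
  use Sketch.Service, worker_module: Sketch.StreamerWorker
end
```

Each module gets its own describe/1 with a different worker module baked in, while the symbol still varies per call: Sketch.NaiveSupervisor.describe("XRPUSDT") gives "Elixir.Sketch.NaiveWorker worker for XRPUSDT".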
At this moment the Naive.DynamicSymbolSupervisor consists of only start_link/1, init/1 and
shutdown_worker/1, it’s under 50 lines of code and works exactly as before refactoring.
All of the boilerplate was moved to the Core.ServiceSupervisor module.
We left the shutdown_worker/1 function as it’s specific to the naive application, but inside it,
we utilize both the get_pid/2 and the update_status/4 functions where we are passing the naive application-specific variables(like Naive.Repo).
To make things even nicer we can create convenience wrappers for those two functions inside the __using__/1 macro:
# /apps/core/lib/core/service_supervisor.ex
# add below at the end of `quote` block inside `__using__/1`
defp get_pid(symbol) do
Core.ServiceSupervisor.get_pid(
unquote(worker_module),
symbol
)
end
defp update_status(symbol, status) do
Core.ServiceSupervisor.update_status(
symbol,
status,
unquote(repo),
unquote(schema)
)
end
As those will get compiled and “pasted” into the Naive.DynamicSymbolSupervisor module, we can use them inside the shutdown_worker/1 function as if they were much simpler, naive application-specific local functions:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def shutdown_worker(symbol) when is_binary(symbol) do
  case get_pid(symbol) do # <= macro provided function
    nil ->
      Logger.warning("#{Naive.SymbolSupervisor} worker for #{symbol} already stopped")
      {:ok, _settings} = update_status(symbol, "off") # <= macro provided function
    _pid ->
      Logger.info(
        "Initializing shutdown of #{Naive.SymbolSupervisor} worker for #{symbol}"
      )
      {:ok, settings} = update_status(symbol, "shutdown") # <= macro provided function
      Naive.Leader.notify(:settings_updated, settings)
      {:ok, settings}
  end
end
And now, one last change - I promise ;) Both the start_link/1 and the init/1 functions still reference the DynamicSupervisor module,
which could be a little confusing - let’s swap those calls to use the Core.ServiceSupervisor module (both to avoid confusing people and
to be consistent with the use macro):
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def start_link(init_arg) do
  Core.ServiceSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
def init(_init_arg) do
  Core.ServiceSupervisor.init(strategy: :one_for_one)
end
As we don’t want or need the Core.ServiceSupervisor module to do anything different from what the DynamicSupervisor does, we can simply delegate both functions from the Core.ServiceSupervisor module to the DynamicSupervisor:
# /apps/core/lib/core/service_supervisor.ex
defdelegate start_link(module, args, opts), to: DynamicSupervisor
defdelegate init(opts), to: DynamicSupervisor
That finishes our refactoring of both the Naive.DynamicSymbolSupervisor and the
Core.ServiceSupervisor modules.
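For readers who have not used defdelegate before, the following is a small standalone sketch of the same idea. The `SupervisorFacade` and `DemoSupervisor` modules are hypothetical names invented for this illustration; only `DynamicSupervisor.start_link/3`, `DynamicSupervisor.init/1` and `DynamicSupervisor.count_children/1` are real library functions:

```elixir
defmodule SupervisorFacade do
  # Each defdelegate defines a public function that forwards its arguments
  # unchanged to the function with the same name and arity in DynamicSupervisor.
  defdelegate start_link(module, args, opts), to: DynamicSupervisor
  defdelegate init(opts), to: DynamicSupervisor
end

defmodule DemoSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    # Reads as a call to our own module, but runs DynamicSupervisor.start_link/3.
    SupervisorFacade.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    SupervisorFacade.init(strategy: :one_for_one)
  end
end

{:ok, pid} = DemoSupervisor.start_link([])

# The process is a regular dynamic supervisor with no children yet.
IO.inspect(DynamicSupervisor.count_children(pid))
```

Delegation adds no behaviour of its own, so the supervisor started through the facade behaves exactly like one started through DynamicSupervisor directly.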
12.7 Manual testing in IEx
We can test to confirm that everything works as expected:
$ iex -S mix
iex(1)> Naive.start_trading("XRPUSDT")
21:42:37.741 [info] Starting Elixir.Naive.SymbolSupervisor worker for XRPUSDT
21:42:37.768 [info] Starting new supervision tree to trade on XRPUSDT
{:ok, #PID<0.464.0>}
21:42:39.455 [info] Initializing new trader(1614462159452) for XRPUSDT
iex(2)> Naive.stop_trading("XRPUSDT")
21:43:08.362 [info] Stopping Elixir.Naive.SymbolSupervisor worker for XRPUSDT
{:ok,
%Naive.Schema.Settings{
...
}}
iex(3)> Naive.start_trading("ETHUSDT")
21:44:08.689 [info] Starting Elixir.Naive.SymbolSupervisor worker for ETHUSDT
21:44:08.723 [info] Starting new supervision tree to trade on ETHUSDT
{:ok, #PID<0.475.0>}
21:44:11.182 [info] Initializing new trader(1614462251182) for ETHUSDT
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
(l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
$ iex -S mix
21:47:22.119 [info] Starting Elixir.Naive.SymbolSupervisor worker for ETHUSDT
21:47:22.161 [info] Starting new supervision tree to trade on ETHUSDT
21:47:24.213 [info] Initializing new trader(1614462444212) for ETHUSDT
iex(1)> Naive.shutdown_trading("ETHUSDT")
21:48:42.003 [info] Initializing shutdown of Elixir.Naive.SymbolSupervisor worker for ETHUSDT
{:ok,
%Naive.Schema.Settings{
...
}}
The above test confirms that we can start, stop, and shut down trading on a symbol, and that autostarting of trading works after a restart.
12.8 Use the Core.ServiceSupervisor module inside the streamer application
As we are happy with the implementation of the Core.ServiceSupervisor module we can upgrade the streamer application to use it.
We need to start with adding the core application to the list of dependencies of the streamer application:
# /apps/streamer/mix.exs
defp deps do
  [
    {:binance, "~> 1.0"},
    {:core, in_umbrella: true}, # <= core added to deps
    ...
We can now move on to the Streamer.DynamicStreamerSupervisor, where we will remove everything (really everything, including imports, aliases and even require) besides the start_link/1 and the init/1. As with the Naive.DynamicSymbolSupervisor, we will use the Core.ServiceSupervisor and pass all required options - the full implementation of the Streamer.DynamicStreamerSupervisor module should look as follows:
# /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
defmodule Streamer.DynamicStreamerSupervisor do
  use Core.ServiceSupervisor,
    repo: Streamer.Repo,
    schema: Streamer.Schema.Settings,
    module: __MODULE__,
    worker_module: Streamer.Binance
  def start_link(init_arg) do
    Core.ServiceSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end
  def init(_init_arg) do
    Core.ServiceSupervisor.init(strategy: :one_for_one)
  end
end
Not much to add here - we use the Core.ServiceSupervisor module and
pass options to it so the macro can generate streamer application-specific wrappers
(like start_worker/1 or stop_worker/1 with the required repo, schema, etc.) around the generic logic from the Core.ServiceSupervisor module.
Using the Core.ServiceSupervisor module will have an impact on the interface of the
Streamer.DynamicStreamerSupervisor as it will now provide functions like start_worker/1 instead of start_streaming/1 etc.
As with the naive application, we need to update the Task function inside the Streamer.Supervisor module:
# /apps/streamer/lib/streamer/supervisor.ex
...
{Task,
 fn ->
   Streamer.DynamicStreamerSupervisor.autostart_workers()
 end}
...
The main Streamer module also needs to forward calls instead of delegating:
# /apps/streamer/lib/streamer.ex
alias Streamer.DynamicStreamerSupervisor
def start_streaming(symbol) do
  symbol
  |> String.upcase()
  |> DynamicStreamerSupervisor.start_worker()
end
def stop_streaming(symbol) do
  symbol
  |> String.upcase()
  |> DynamicStreamerSupervisor.stop_worker()
end
We can run a quick test to confirm that indeed everything works as expected:
$ iex -S mix
iex(1)> Streamer.start_streaming("XRPUSDT")
22:10:38.813 [info] Starting Elixir.Streamer.Binance worker for XRPUSDT
{:ok, #PID<0.465.0>}
iex(2)> Streamer.stop_streaming("XRPUSDT")
22:10:48.212 [info] Stopping Elixir.Streamer.Binance worker for XRPUSDT
{:ok,
%Streamer.Schema.Settings{
__meta__: #Ecto.Schema.Metadata<:loaded, "settings">,
id: "db8c9429-2356-4243-a08f-0d0e89b74986",
inserted_at: ~N[2021-02-25 22:15:16],
status: "off",
symbol: "XRPUSDT",
updated_at: ~N[2021-02-27 22:10:48]
}}
iex(3)> Streamer.start_streaming("LTCUSDT")
22:26:03.361 [info] Starting Elixir.Streamer.Binance worker for LTCUSDT
{:ok, #PID<0.490.0>}
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
(l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
^C
$ iex -S mix
...
22:26:30.775 [info] Starting Elixir.Streamer.Binance worker for LTCUSDT
This finishes the implementation for both the streamer and the naive applications. We are generating functions dynamically (metaprogramming) using Elixir macros, which is a cool exercise to go through and feels like superpowers ;)
We’ve successfully eliminated the duplication! Here’s what we achieved:
- Created a generic Core.ServiceSupervisor module with fully parameterized functions
- Used the __using__/1 macro to generate application-specific wrapper functions at compile time
- Reduced Naive.DynamicSymbolSupervisor to under 50 lines while keeping all functionality
- Applied the same pattern to Streamer.DynamicStreamerSupervisor with just a few lines of configuration
The key insight is that macros let us write code that writes code.
Instead of copying the same functions into every supervisor,
we define them once with unquote placeholders, and Elixir fills in the application-specific values at compile time.
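The whole pattern can be condensed into a toy example. The sketch below is a heavily simplified stand-in for Core.ServiceSupervisor, not the chapter's implementation: `GenericDemo`, `NaiveDemo`, `StreamerDemo` and the `label/2` function are hypothetical names, and the "worker" is just a string so that the example stays self-contained:

```elixir
defmodule GenericDemo do
  # Generic, fully parameterized logic is written once.
  def label(worker_module, symbol) do
    "#{inspect(worker_module)} worker for #{symbol}"
  end

  defmacro __using__(opts) do
    # Read at compile time, while the module calling `use` is being compiled.
    worker_module = Keyword.fetch!(opts, :worker_module)

    quote do
      # This definition is "pasted" into every module that uses GenericDemo,
      # with unquote(worker_module) already filled in.
      def start_worker(symbol) do
        GenericDemo.label(unquote(worker_module), symbol)
      end
    end
  end
end

defmodule NaiveDemo do
  use GenericDemo, worker_module: NaiveDemo.Worker
end

defmodule StreamerDemo do
  use GenericDemo, worker_module: StreamerDemo.Worker
end

IO.puts(NaiveDemo.start_worker("XRPUSDT"))
IO.puts(StreamerDemo.start_worker("XRPUSDT"))
```

Both modules end up with their own start_worker/1 even though neither defines it by hand, and each one carries the worker module it was configured with.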
A learning milestone, not the final destination:
Working through this macro implementation taught us how Elixir’s metaprogramming works under the hood - and that knowledge will serve us well when reading Phoenix source code or other libraries that use macros heavily. However, as the Elixir documentation notes, “macros should only be used as a last resort” because they make debugging harder and can confuse developers unfamiliar with the codebase.
Here’s the good news: in the next chapter, we’ll discover that the Registry module provides a simpler way to manage named processes -
no macros required. We’ll use it to build the data_warehouse application, and you’ll see firsthand how it compares to our macro-based approach.
By the end of chapter 13, we’ll have a cleaner solution that supersedes the Core.ServiceSupervisor entirely.
This is exactly how real-world development works: we try one approach, learn from it, and find something better.
The macro exercise wasn’t wasted - understanding quote, unquote, and __using__/1 is valuable Elixir knowledge.
We just won’t need it for this particular problem.
So, What’s Next?
Our trading bot can now start, stop, and autostart both streaming and trading. But we’re missing something crucial for any serious trading system: data persistence.
In the next chapter, we’ll create a data_warehouse application that subscribes to trade events and orders,
storing them in a database. Along the way, we’ll discover a simpler pattern for managing dynamic workers using the Registry -
one that’s so clean it will make us question whether we needed macros at all.
(Spoiler: we’ll end up removing the Core.ServiceSupervisor module entirely as a challenge exercise!)
This historical data will be essential for the chapter after that, where we’ll implement backtesting - running our strategy against past market data to see how it would have performed.
[Note] Please remember to run mix format to keep things nice and tidy.
The source code for this chapter can be found in the book’s source code repository (branch: chapter_12).