Chapter 14 Store trade events and orders inside the database
14.1 Objectives
- overview of requirements
- create a new
data_warehouseapplication in the umbrella - connect to the database using Ecto
- store trade events’ data
- store orders’ data
- implement supervision
14.2 Overview of requirements
In the next chapter, we will move on to testing our strategy against historical data(aka backtesting - I will explain that process in the next chapter). What we need to have in place before we will be able to do that is both trade events and orders stored in the database.
Starting with the trade events. The streamer application could store trade events from Binance inside its database but how would that work if we would like to introduce another source of non-streamed trade events(ie. flat files, HTTP polling). It would be better if the Streamer.Binance process would keep on streaming those trade events as it is and we would create a new application that would subscribe to the existing TRADE_EVENTS:#{symbol} topic and store them in the database.
A similar idea applies to the orders’ data. At this moment the naive application uses the Binance module to place orders. We could store them inside the naive application’s database but how would that work if we would like to introduce another trading strategy. Holding data in separate databases for each strategy would cause further complications in future reporting, auditing, etc.
To store trade events’ and orders’ data we will create a new application called data_warehouse inside our umbrella project. It will subscribe to a TRADE_EVENTS:#{symbol} stream as well as ORDERS:#{symbol} stream, convert broadcasted data to its own representations(structs) and store it inside the database.
Trade events are already broadcasted to the PubSub topic, orders on the other hand aren’t. We will need to modify the Naive.Trader module to broadcast the new and updated orders to the ORDERS:#{symbol} topic.
After implementing the basic worker that will store the incoming data(trade events and orders) inside the database, we will look into adding a supervision tree utilizing Elixir Registry. It will allow us to skip registering every worker with a unique atom and will offer an easy lookup to fetch PIDs instead.
14.3 Create a new data_warehouse application in the umbrella
Let’s start by creating a new application called data_warehouse inside our umbrella:
$ cd apps
$ mix new data_warehouse --sup
* creating README.md
* creating .formatter.exs
* creating .gitignore
* creating mix.exs
* creating lib
* creating lib/data_warehouse.ex
* creating lib/data_warehouse/application.ex
* creating test
* creating test/test_helper.exs
* creating test/data_warehouse_test.exs
...14.4 Connect to the database using Ecto
We can now follow similar steps as previously and add required dependencies (like the ecto) to its deps by modifying its mix.exs file:
# /apps/data_warehouse/mix.exs
defp deps do
[
{:binance, "~> 1.0"},
{:ecto_sql, "~> 3.0"},
{:ecto_enum, "~> 1.4"},
{:phoenix_pubsub, "~> 2.0"},
{:postgrex, ">= 0.0.0"},
{:streamer, in_umbrella: true}
]
endAdditionally, we added the phoenix_pubsub(to subscribe to the PubSub topic), the streamer application(to use its Streamer.Binance.TradeEvent struct) and the binance package(to pattern match it’s structs).
We can now jump back to the terminal to install added dependencies and generate a new Ecto.Repo module:
$ mix deps.get
...
$ cd apps/data_warehouse
$ mix ecto.gen.repo -r DataWarehouse.Repo
* creating lib/data_warehouse
* creating lib/data_warehouse/repo.ex
* updating ../../config/config.exsBefore we will be able to create migrations that will create our tables we need to update the generated configuration inside the config/config.exs file:
# /config/config.exs
...
config :data_warehouse, # <= added line
ecto_repos: [DataWarehouse.Repo] # <= added line
config :data_warehouse, DataWarehouse.Repo,
database: "data_warehouse", # <= updated line
username: "postgres", # <= updated line
password: "hedgehogSecretPassword", # <= updated line
hostname: "localhost"
...and add the DataWarehouse.Repo module to the children list of the DataWarehouse.Application’s process:
# /apps/data_warehouse/lib/data_warehouse/application.ex
...
children = [
{DataWarehouse.Repo, []}
]
...The last step will be to create a database by running mix ecto.create -r DataWarehouse.Repo command.
This ends up the setup of the Ecto - we can now move on to the implementation of storing the orders and the trade events.
14.5 Store trade events’ data
The first step to store trade events inside the database will be to create a table that will hold our data. We will start by creating the migration:
$ cd apps/data_warehouse
$ mix ecto.gen.migration create_trade_events
* creating priv/repo/migrations
* creating priv/repo/migrations/20210222224514_create_trade_events.exsThe Streamer.Binance.TradeEvent struct will serve as a list of columns for our new trade_events table. Here’s the full implementation of our migration:
# /apps/data_warehouse/priv/repo/migrations/20210222224514_create_trade_events.exs
defmodule DataWarehouse.Repo.Migrations.CreateTradeEvents do
use Ecto.Migration
def change do
create table(:trade_events, primary_key: false) do
add(:id, :uuid, primary_key: true)
add(:event_type, :text)
add(:event_time, :bigint)
add(:symbol, :text)
add(:trade_id, :integer)
add(:price, :text)
add(:quantity, :text)
add(:buyer_order_id, :bigint)
add(:seller_order_id, :bigint)
add(:trade_time, :bigint)
add(:buyer_market_maker, :bool)
timestamps()
end
end
endWe added the additional id field to easily identify each trade event and our timestamps for monitoring.
Let’s run the migration so it will create a new trade_events table for us:
The next step will be to create a new directory called schema inside the
apps/data_warehouse/lib/data_warehouse directory. Inside it, we need to create a new schema file called trade_event.ex. We can copy across the same columns from the migration straight to schema:
# /apps/data_warehouse/lib/data_warehouse/schema/trade_event.ex
defmodule DataWarehouse.Schema.TradeEvent do
use Ecto.Schema
@primary_key {:id, :binary_id, autogenerate: true}
schema "trade_events" do
field(:event_type, :string)
field(:event_time, :integer)
field(:symbol, :string)
field(:trade_id, :integer)
field(:price, :string)
field(:quantity, :string)
field(:buyer_order_id, :integer)
field(:seller_order_id, :integer)
field(:trade_time, :integer)
field(:buyer_market_maker, :boolean)
timestamps()
end
endAt this moment we should be able to execute crud(create, read[select], update, delete) operations over the table using the above struct.
Currently, we can already store the trade events’ data inside the database so we can move on to collecting it. Trade events are getting broadcasted by the Streamer.Binance process here:
# /apps/streamer/lib/streamer/binance.ex
...
Phoenix.PubSub.broadcast(
Streamer.PubSub,
"TRADE_EVENTS:#{trade_event.symbol}",
trade_event
)
...We will implement a subscriber process that will be given a PubSub topic and will store incoming data inside the database.
Let’s start by creating a new folder called subscriber inside the
apps/data_warehouse/lib/data_warehouse directory together with a new file called worker.ex inside it:
# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
defmodule DataWarehouse.Subscriber.Worker do
use GenServer
require Logger
defmodule State do
@enforce_keys [:topic]
defstruct [:topic]
end
def start_link(topic) do
GenServer.start_link(
__MODULE__,
topic,
name: :"#{__MODULE__}-#{topic}"
)
end
def init(topic) do
{:ok,
%State{
topic: topic
}}
end
endAt this moment it’s just a box standard implementation of the GenServer with a state struct containing a single key(:topic). We need to update the init/1 function to subscribe to the PubSub topic:
# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
def init(topic) do
Logger.info("DataWarehouse worker is subscribing to #{topic}")
Phoenix.PubSub.subscribe(
Streamer.PubSub,
topic
)
...Next, we need to add a handler for received messages:
# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
def handle_info(%Streamer.Binance.TradeEvent{} = trade_event, state) do
opts =
trade_event
|> Map.from_struct()
struct!(DataWarehouse.Schema.TradeEvent, opts)
|> DataWarehouse.Repo.insert()
{:noreply, state}
endAs we did in the case of the Naive.Trader, all incoming messages trigger a handle_info/2 callback with the contents of the message and the current state of the subscriber worker. We just convert that incoming trade event to a map and then that map to the TradeEvent struct that gets inserted into the database.
This finishes storing of trade events implementation which we can test by in the interactive shell by running:
$ iex -S mix
...
iex(1)> Streamer.start_streaming("XRPUSDT")
00:48:30.147 [info] Starting Elixir.Streamer.Binance worker for XRPUSDT
{:ok, #PID<0.395.0>}
iex(2)> DataWarehouse.Subscriber.Worker.start_link("TRADE_EVENTS:XRPUSDT")
00:49:48.204 [info] DataWarehouse worker is subscribing to TRADE_EVENTS:XRPUSDT
{:ok, #PID<0.405.0>}After a couple of minutes we can check the database using psql:
$ psql -Upostgres -h127.0.0.1
Password for user postgres:
...
postgres=# \c data_warehouse;
You are now connected to database "data_warehouse" as user "postgres".
data_warehouse=# \x
Expanded display is on.
data_warehouse=# SELECT * FROM trade_events;
-[ RECORD 1 ]------+-------------------------------------
id | f6eae686-946a-4e34-9c33-c7034c2cad5d
event_type | trade
event_time | 1614041388236
symbol | XRPUSDT
trade_id | 152765072
price | 0.56554000
quantity | 1199.10000000
buyer_order_id | 1762454848
seller_order_id | 1762454775
trade_time | 1614041388235
buyer_market_maker | f
inserted_at | 2021-02-23 00:49:48
...As we can see in the above output, trade events are now getting stored inside the database.
14.6 Store orders’ data
In the same fashion as with trade events’ data above, to store orders data we will create an orders table inside a new migration:
$ cd apps/data_warehouse
$ mix ecto.gen.migration create_orders
* creating priv/repo/migrations/20210222224522_create_orders.exsThe list of columns for this table will be a copy of Binance.Order struct returned from the Binance exchange:
# /apps/data_warehouse/priv/repo/migrations/20210222224522_create_orders.exs
defmodule DataWarehouse.Repo.Migrations.CreateOrders do
use Ecto.Migration
def change do
create table(:orders, primary_key: false) do
add(:order_id, :bigint, primary_key: true)
add(:client_order_id, :text)
add(:symbol, :text)
add(:price, :text)
add(:original_quantity, :text)
add(:executed_quantity, :text)
add(:cummulative_quote_quantity, :text)
add(:status, :text)
add(:time_in_force, :text)
add(:type, :text)
add(:side, :text)
add(:stop_price, :text)
add(:iceberg_quantity, :text)
add(:time, :bigint)
add(:update_time, :bigint)
timestamps()
end
end
endWe updated all of the shortened names like orig_qty to full names like original_quantity.
Let’s run the migration so it will create a new orders table for us:
We can copy the above fields list to create a schema module. First, let’s create a new file called order.ex inside the apps/data_warehouse/lib/data_warehouse/schema directory:
# /apps/data_warehouse/lib/data_warehouse/schema/order.ex
defmodule DataWarehouse.Schema.Order do
use Ecto.Schema
@primary_key {:order_id, :integer, autogenerate: false}
schema "orders" do
field(:client_order_id, :string)
field(:symbol, :string)
field(:price, :string)
field(:original_quantity, :string)
field(:executed_quantity, :string)
field(:cummulative_quote_quantity, :string)
field(:status, :string)
field(:time_in_force, :string)
field(:type, :string)
field(:side, :string)
field(:stop_price, :string)
field(:iceberg_quantity, :string)
field(:time, :integer)
field(:update_time, :integer)
timestamps()
end
endWe can now add a handler to our DataWarehouse.Subscriber.Worker that will convert the Binance.Order struct to DataWarehouse.Schema.Order and store data inside the database:
# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
def handle_info(%Binance.Order{} = order, state) do
data =
order
|> Map.from_struct()
struct(DataWarehouse.Schema.Order, data)
|> Map.merge(%{
original_quantity: order.orig_qty,
executed_quantity: order.executed_qty,
cummulative_quote_quantity: order.cummulative_quote_qty,
iceberg_quantity: order.iceberg_qty
})
|> DataWarehouse.Repo.insert(
on_conflict: :replace_all,
conflict_target: :order_id
)
{:noreply, state}
end
...In the above code, we are copying the matching fields using the struct/2 function but all other fields that aren’t 1 to 1 between two structs won’t be copied, so we need to merge them in the second step(using the Map.merge/2 function). We are also using the on_conflict: :replace_all option to make the insert/2 function act as it would be upsert/2(to avoid writing separate logic for inserting and updating the orders).
Having all of this in place we will now be able to store broadcasted orders’ data in the database but there’s nothing actually broadcasting them.
We need to modify the Naive.Trader module to broadcast the Binance.Order whenever it places buy/sell orders or fetches them again:
# /apps/naive/lib/naive/trader.ex
...
# inside placing initial buy order callback
{:ok, %Binance.OrderResponse{} = order} =
@binance_client.order_limit_buy(symbol, quantity, price, "GTC")
:ok = broadcast_order(order)
...
# inside buy order (partially) filled callback
{:ok, %Binance.Order{} = current_buy_order} =
@binance_client.get_order(
symbol,
timestamp,
order_id
)
:ok = broadcast_order(current_buy_order)
...
# inside the same callback in case of buy order filled
{:ok, %Binance.OrderResponse{} = order} =
@binance_client.order_limit_sell(symbol, quantity, sell_price, "GTC")
:ok = broadcast_order(order)
...
# inside sell order (partially) filled callback
{:ok, %Binance.Order{} = current_sell_order} =
@binance_client.get_order(
symbol,
timestamp,
order_id
)
:ok = broadcast_order(current_sell_order)
...Above 4 places send both the Binance.OrderResponse and the Binance.Order structs - our broadcast_order/1 function needs to be able to handle them both. Add the following at the bottom of the Naive.Trader module:
# /apps/naive/lib/naive/trader.ex
defp broadcast_order(%Binance.OrderResponse{} = response) do
response
|> convert_to_order()
|> broadcast_order()
end
defp broadcast_order(%Binance.Order{} = order) do
Phoenix.PubSub.broadcast(
Streamer.PubSub,
"ORDERS:#{order.symbol}",
order
)
end
defp convert_to_order(%Binance.OrderResponse{} = response) do
data =
response
|> Map.from_struct()
struct(Binance.Order, data)
|> Map.merge(%{
cummulative_quote_qty: "0.00000000",
stop_price: "0.00000000",
iceberg_qty: "0.00000000",
is_working: true
})
endAs DataWarehouse.Subscriber.Worker process expects only the Binance.Order structs to be broadcasted, we first check is it the Binance.OrderResponse struct and convert the passed value to the Binance.Order struct (if that’s the case) and only then broadcast it to the PubSub topic.
The converting logic as previously uses the struct/2 function but it also merges in default values that are missing from the much smaller Binance.OrderResponse struct(with comparison to the Binance.Order).
At this moment we will be able to store orders inside the database and we can check that by running:
$ iex -S mix
...
iex(1)> DataWarehouse.Subscriber.Worker.start_link("ORDERS:NEOUSDT")
22:37:43.043 [info] DataWarehouse worker is subscribing to ORDERS:XRPUSDT
{:ok, #PID<0.400.0>}
iex(2)> Naive.start_trading("NEOUSDT")
22:38:39.741 [info] Starting Elixir.Naive.SymbolSupervisor worker for NEOUSDT
22:38:39.832 [info] Starting new supervision tree to trade on NEOUSDT
{:ok, #PID<0.402.0>}
22:38:41.654 [info] Initializing new trader(1614119921653) for NEOUSDT
iex(3)> Streamer.start_streaming("NEOUSDT")
22:39:23.786 [info] Starting Elixir.Streamer.Binance worker for NEOUSDT
{:ok, #PID<0.412.0>}
22:39:27.187 [info] The trader(1614119921653) is placing a BUY order for NEOUSDT @ 37.549,
quantity: 5.326
22:39:27.449 [info] The trader(1614119921653) is placing a SELL order for NEOUSDT @ 37.578,
quantity: 5.326.At this moment inside the DataWarehouse’s database we should see orders:
$ psql -Upostgres -h127.0.0.1
Password for user postgres:
...
postgres=# \c data_warehouse;
You are now connected to database "data_warehouse" as user "postgres".
data_warehouse=# \x
Expanded display is on.
data_warehouse=# SELECT * FROM orders;
-[ RECORD 1 ]--------------+---------------------------------
order_id | 1
client_order_id | C81E728D9D4C2F636F067F89CC14862C
symbol | NEOUSDT
price | 38.16
original_quantity | 5.241
executed_quantity | 0.00000000
cummulative_quote_quantity | 0.00000000
status | FILLED
time_in_force | GTC
type | LIMIT
side | BUY
stop_price | 0.00000000
iceberg_quantity | 0.00000000
time | 1614120906320
update_time | 1614120906320
inserted_at | 2021-02-23 22:55:10
updated_at | 2021-02-23 22:55:10
-[ RECORD 2 ]--------------+---------------------------------
order_id | 2
client_order_id | ECCBC87E4B5CE2FE28308FD9F2A7BAF3
symbol | NEOUSDT
price | 38.19
original_quantity | 5.241
executed_quantity | 0.00000000
cummulative_quote_quantity | 0.00000000
status | NEW
time_in_force | GTC
type | LIMIT
side | SELL
stop_price | 0.00000000
iceberg_quantity | 0.00000000
time |
update_time |
inserted_at | 2021-02-23 22:55:10
updated_at | 2021-02-23 22:55:10The first record above got inserted and updated as its state is “FILLED”, the second one wasn’t updated yet as it’s still in “NEW” state - that confirms that the upsert trick works.
That finishes the implementation of storing orders inside the database.
14.7 Implement supervision
Currently, we have a DataWarehouse.Subscriber.Worker process that will take care of storing data into the database, but sadly if anything will go wrong inside our worker and it will crash there’s no supervision in place to restart it.
The supervision tree for the data_warehouse application will be similar to ones from the naive and streamer apps but different enough to not use the Core.ServiceSupervisor abstraction.
For example, it doesn’t use the symbol column, it works based on the topic column. This would require changes to the Core.ServiceSupervisor’s functions like update_status/4 or fetch_symbols_to_start/2, we could update them to accept column name but that would need to be passed through other functions. We can see that this is probably not the best approach and the further we will get the more complex it will become. The second issue would be that we are registering all processes with names and that can be problematic as the list of processes will start to grow(as we can imagine in the case of the data_warehouse application).
The better approach would be to mix the DynamicSupervisor together with Registry.
The DynamicSupervisor will supervise the Subscriber.Workers and instead of keeping track of them by registering them using atoms we will start them :via Elixir Registry.
We will add all functionality that we implemented for naive and streamer applications. We will provide the functions to start and stop storing data on passed PubSub topics as well as store those topics inside the database so storing will be autostarted.
14.7.1 Create subscriber_settings table
To provide autostarting function we need to create a new migration that will create the subscriber_settings table:
$ cd apps/data_warehouse
$ mix ecto.gen.migration create_subscriber_settings
* creating priv/repo/migrations/20210227230123_create_subscriber_settings.exsAt this moment we can copy the code to create the settings table(enum and index as well) from the streamer application and tweak it to fit the data_warehouse application. So the first important change (besides updating namespaces from Streamer to DataWarehouse) will be to make a note that we have a setting per topic - not per symbol as for the naive and streamer applications:
# /apps/data_warehouse/priv/repo/migrations/20210227230123_create_subscriber_settings.exs
defmodule DataWarehouse.Repo.Migrations.CreateSubscriberSettings do
use Ecto.Migration
alias DataWarehouse.Schema.SubscriberStatusEnum
def change do
SubscriberStatusEnum.create_type()
create table(:subscriber_settings, primary_key: false) do
add(:id, :uuid, primary_key: true)
add(:topic, :text, null: false)
add(:status, SubscriberStatusEnum.type(), default: "off", null: false)
timestamps()
end
create(unique_index(:subscriber_settings, [:topic]))
end
endBoth schema and enum will be almost identical to the ones from the streamer application - we can simply copy those files and apply basic tweaks like updating the namespace:
$ cp apps/streamer/lib/streamer/schema/settings.ex \
apps/data_warehouse/lib/data_warehouse/schema/subscriber_settings.ex
$ cp apps/streamer/lib/streamer/schema/streaming_status_enum.ex \
apps/data_warehouse/lib/data_warehouse/schema/subscriber_status_enum.exRemember about updating the symbol column to topic as well as table name inside the
DataWarehouse.Schema.SubscriberSettings:
# /apps/data_warehouse/lib/data_warehouse/schema/subscriber_settings.ex
defmodule DataWarehouse.Schema.SubscriberSettings do
use Ecto.Schema
alias DataWarehouse.Schema.SubscriberStatusEnum
@primary_key {:id, :binary_id, autogenerate: true}
schema "subscriber_settings" do
field(:topic, :string)
field(:status, SubscriberStatusEnum)
timestamps()
end
endInside apps/data_warehouse/lib/data_warehouse/schema/subscriber_status_enum.ex we need to swap references of Streamer to DataWarehouse and references of StreamingStatusEnum to SubscriberStatusEnum:
# /apps/data_warehouse/lib/data_warehouse/schema/subscriber_status_enum.ex
import EctoEnum
defenum(DataWarehouse.Schema.SubscriberStatusEnum, :subscriber_status, [:on, :off])Don’t forget to run the migration:
At this moment we have all pieces in place to execute queries on our new table. In this place, we can think about the seeding script. For the data_warehouse specifically, we won’t need to provide that script as we don’t know in advance what topic names we will use. Instead of seeding settings in advance, our code will “upsert”(using insert function) settings when start_storing/1 or stop_storing/1 are called.
14.7.2 Redesign supervision using Registry
We can now focus on drafting a supervision tree for the data_warehouse application. At this moment we have only the DataWarehouse.Subscriber.Worker and the DataWarehouse.Application modules.
As it was with the case of naive and streamer applications, we will need an additional level of supervision to cater for “autostarting” Task as well as, in the case of the data_warehouse application the Registry.
The full supervision tree will look as follows:

Everything looks very similar to the supervision tree that we created in the streamer and the naive applications but there’s an additional Registry that is supervised by the SubscriberSupervisior process.
The idea is that inside the Worker module’s start_link/1 we will register worker processes using :via tuple. Internally, GenServer will utilize Registry’s functions like register_name/2 to add process to the registry under the topic string. This way we will be able to retrieve PIDs assigned to topics using those topic strings instead of registering each worker process with an atom name.
Just as previously the DynamicSupervisor will be in charge of supervising the Worker processes and it won’t be even aware that we are using the Registry to keep track of topic => PID association.
14.7.3 Create the DataWarehouse.Subscriber.DynamicSupervisor module
Let’s start by creating a new file called dynamic_supervisor.ex inside the
apps/data_warehouse/lib/data_warehouse/subscriber directory and put default dynamic supervisor implementation inside:
# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
defmodule DataWarehouse.Subscriber.DynamicSupervisor do
use DynamicSupervisor
def start_link(_arg) do
DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)
end
def init(_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end
endAs we will put all our logic related to autostarting, starting and stopping inside this module we can already add aliases, import and require:
# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
require Logger
alias DataWarehouse.Repo
alias DataWarehouse.Schema.SubscriberSettings
alias DataWarehouse.Subscriber.Worker
import Ecto.Query, only: [from: 2]
@registry :subscriber_workersAdditionally, we added the @registry module attribute that we will use to retrieve PID for the specific topic.
We can move on to implementing autostart_workers/0 which will look very similar to the ones that we implemented in the streamer and the naive applications:
# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
...
def autostart_workers do
Repo.all(
from(s in SubscriberSettings,
where: s.status == "on",
select: s.topic
)
)
|> Enum.map(&start_child/1)
end
defp start_child(args) do
DynamicSupervisor.start_child(
__MODULE__,
{Worker, args}
)
endWe can see that we are querying the database for a list of topics(not symbols) and we are calling start_child/2 for each result.
The start_worker/1 is where the Registry will shine as we won’t need to check is there already a process running for that topic - we can leave that check to the Registry. If there’s a process already running for that topic it will just return a tuple starting with :error atom:
# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
...
def start_worker(topic) do
Logger.info("Starting storing data from #{topic} topic")
update_status(topic, "on")
start_child(topic)
end
...
defp update_status(topic, status)
when is_binary(topic) and is_binary(status) do
%SubscriberSettings{
topic: topic,
status: status
}
|> Repo.insert(
on_conflict: :replace_all,
conflict_target: :topic
)
endAs we are not seeding the database with the default settings we will use the insert/2 function with options(as previously) to make it work as it would be an “upsert” function.
Last function in this module will be stop_worker/1 which uses private stop_child/1 function. The stop_child/1 function shows how to retrieve PID of the process assigned to the passed topic:
# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
...
def stop_worker(topic) do
Logger.info("Stopping storing data from #{topic} topic")
update_status(topic, "off")
stop_child(topic)
end
...
defp stop_child(args) do
case Registry.lookup(@registry, args) do
[{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
_ -> Logger.warning("Unable to locate process assigned to #{inspect(args)}")
end
endThat is a full implementation of the DataWarehouse.Subscriber.DynamicSupervisor module and it’s almost as slim as one from the last chapter where we leveraged macros to achieve that lightness. Using the Registry is the preferred way to manage a list of identifiable processes. We won’t run into an issue of overusing the atoms(as they are not garbage collected, we could hit that limit sooner or later).
14.7.4 Register Worker processes using :via
The above DynamicSupervisor module assumes that Workers are registered inside the Registry - to make this happen we will need to update the start_link/1 function of the
DataWarehouse.Subscriber.Worker module:
# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
...
def start_link(topic) do
GenServer.start_link(
__MODULE__,
topic,
name: via_tuple(topic)
)
end
...
defp via_tuple(topic) do
{:via, Registry, {:subscriber_workers, topic}}
end
... Passing the :name option to the GenServer’s start_link/3 function we instruct it to utilize the Registry module to register processes under topic names.
14.7.5 Create a new supervision level for Registry, Task and the DynamicSupervisor
We have the lowest level modules - the Worker and the DynamicSupervisor implemented - time to add a new Supervisor that will start the Registry, the DynamicSupervisor, and the autostart storing Task. First create a new file called subscriber_supervisor.ex inside the apps/data_warehouse/lib/data_warehouse directory:
# /apps/data_warehouse/lib/data_warehouse/subscriber_supervisor.ex
defmodule DataWarehouse.SubscriberSupervisor do
use Supervisor
alias DataWarehouse.Subscriber.DynamicSupervisor
@registry :subscriber_workers
def start_link(_args) do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end
def init(_args) do
children = [
{Registry, [keys: :unique, name: @registry]},
{DynamicSupervisor, []},
{Task,
fn ->
DynamicSupervisor.autostart_workers()
end}
]
Supervisor.init(children, strategy: :rest_for_one)
end
endThe important part here will be to match the Registry name to the one defined inside the DynamicSupervisor and the Worker modules.
14.7.6 Link the SubscriberSupervisor to the Application
We need to update the DataWarehouse.Application module to start our new
DataWarehouse.SubscriberSupervisor process as well as register itself under name matching to its module(just for consistency with other applications):
# /apps/data_warehouse/lib/data_warehouse/application.ex
...
def start(_type, _args) do
children = [
{DataWarehouse.Repo, []},
{DataWarehouse.SubscriberSupervisor, []} # <= new module added
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: __MODULE__] # <= name updated
Supervisor.start_link(children, opts)
end
...14.7.7 Add interface
The final step will be to add an interface to the DataWarehouse application to start and stop storing:
# /apps/data_warehouse/lib/data_warehouse.ex
alias DataWarehouse.Subscriber.DynamicSupervisor
def start_storing(stream, symbol) do
to_topic(stream, symbol)
|> DynamicSupervisor.start_worker()
end
def stop_storing(stream, symbol) do
to_topic(stream, symbol)
|> DynamicSupervisor.stop_worker()
end
defp to_topic(stream, symbol) do
[stream, symbol]
|> Enum.map(&String.upcase/1)
|> Enum.join(":")
endInside the above functions, we are just doing a couple of sanity checks on the case of the passed arguments assuming that both topics and stream are uppercase.
14.7.8 Test
The interface above was the last step in our implementation, we can now test that all works as expected:
$ iex -S mix
...
iex(1)> DataWarehouse.start_storing("TRADE_EVENTS", "NEOUSDT")
19:34:00.740 [info] Starting storing data from TRADE_EVENTS:NEOUSDT topic
19:34:00.847 [info] DataWarehouse worker is subscribing to TRADE_EVENTS:NEOUSDT
{:ok, #PID<0.429.0>}
iex(2)> DataWarehouse.start_storing("TRADE_EVENTS", "NEOUSDT")
19:34:04.753 [info] Starting storing data from TRADE_EVENTS:NEOUSDT topic
{:error, {:already_started, #PID<0.459.0>}}
iex(3)> DataWarehouse.start_storing("ORDERS", "NEOUSDT")
19:34:09.386 [info] Starting storing data from ORDERS:NEOUSDT topic
19:34:09.403 [info] DataWarehouse worker is subscribing to ORDERS:NEOUSDT
{:ok, #PID<0.431.0>}
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
(l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
^C%
$ iex -S mix
...
19:35:30.058 [info] DataWarehouse worker is subscribing to TRADE_EVENTS:NEOUSDT
19:35:30.062 [info] DataWarehouse worker is subscribing to ORDERS:NEOUSDT
# autostart works ^^^
iex(1)> Naive.start_trading("NEOUSDT")
19:36:45.316 [info] Starting Elixir.Naive.SymbolSupervisor worker for NEOUSDT
19:36:45.417 [info] Starting new supervision tree to trade on NEOUSDT
{:ok, #PID<0.419.0>}
iex(3)>
19:36:47.484 [info] Initializing new trader(1615221407466) for NEOUSDT
iex(2)> Streamer.start_streaming("NEOUSDT")
16:37:39.660 [info] Starting Elixir.Streamer.Binance worker for NEOUSDT
{:ok, #PID<0.428.0>}
...
iex(3)> DataWarehouse.stop_storing("trade_events", "NEOUSDT")
19:39:26.398 [info] Stopping storing data from trade_events:NEOUSDT topic
:ok
iex(4)> DataWarehouse.stop_storing("trade_events", "NEOUSDT")
19:39:28.151 [info] Stopping storing data from trade_events:NEOUSDT topic
19:39:28.160 [warn] Unable to locate process assigned to "trade_events:NEOUSDT"
:ok
iex(5)> [{pid, nil}] = Registry.lookup(:subscriber_workers, "ORDERS:NEOUSDT")
[{#PID<0.417.0>, nil}]
iex(6)> Process.exit(pid, :crash)
true
16:43:40.812 [info] DataWarehouse worker is subscribing to ORDERS:NEOUSDTAs we can see even this simple implementation handles starting, autostarting, and stopping. It also gracefully handles starting workers when one is already running as well as stopping when there none running.
As a challenge, you could update the naive and the streamer application to use the Registry and remove Core.ServiceSupervisor module as it was superseded by the above solution - here’s the link to PR(pull request) that sums up the required changes.
[Note] Please remember to run the mix format to keep things nice and tidy.
The source code for this chapter can be found on GitHub