Chapter 14 Store trade events and orders inside the database

14.1 Objectives

  • overview of requirements
  • create a new data_warehouse application in the umbrella
  • connect to the database using Ecto
  • store trade events’ data
  • store orders’ data
  • implement supervision

14.2 Overview of requirements

In the next chapter, we will move on to testing our strategy against historical data (aka backtesting, which I will explain there). Before we can do that, we need both trade events and orders stored in the database.

Let’s start with the trade events. The streamer application could store trade events from Binance inside its own database, but how would that work if we wanted to introduce another source of non-streamed trade events (e.g. flat files or HTTP polling)? It would be better if the Streamer.Binance process kept streaming those trade events as it does now, and we created a new application that subscribes to the existing TRADE_EVENTS:#{symbol} topic and stores them in the database.

A similar idea applies to the orders’ data. At this moment, the naive application uses the Binance module to place orders. We could store them inside the naive application’s database, but how would that work if we wanted to introduce another trading strategy? Holding data in separate databases for each strategy would complicate future reporting, auditing, etc.

To store trade events’ and orders’ data, we will create a new application called data_warehouse inside our umbrella project. It will subscribe to the TRADE_EVENTS:#{symbol} topic as well as the ORDERS:#{symbol} topic, convert the broadcast data to its own representations (structs) and store them inside the database.

Trade events are already broadcast to the PubSub topic; orders, on the other hand, aren’t. We will need to modify the Naive.Trader module to broadcast the new and updated orders to the ORDERS:#{symbol} topic.

After implementing the basic worker that will store the incoming data (trade events and orders) inside the database, we will look into adding a supervision tree utilizing the Elixir Registry. It will allow us to skip registering every worker with a unique atom and will offer an easy way to look up PIDs instead.

14.3 Create a new data_warehouse application in the umbrella

Let’s start by creating a new application called data_warehouse inside our umbrella:

$ cd apps
$ mix new data_warehouse --sup
* creating README.md
* creating .formatter.exs
* creating .gitignore
* creating mix.exs
* creating lib
* creating lib/data_warehouse.ex
* creating lib/data_warehouse/application.ex
* creating test
* creating test/test_helper.exs
* creating test/data_warehouse_test.exs
...

14.4 Connect to the database using Ecto

We can now follow similar steps as previously and add the required dependencies (like ecto_sql) to the deps function by modifying the application’s mix.exs file:

  # /apps/data_warehouse/mix.exs
  defp deps do
    [
      {:binance, "~> 1.0"},
      {:ecto_sql, "~> 3.0"},
      {:ecto_enum, "~> 1.4"},
      {:phoenix_pubsub, "~> 2.0"},
      {:postgrex, ">= 0.0.0"},
      {:streamer, in_umbrella: true}
    ]
  end

Additionally, we added phoenix_pubsub (to subscribe to the PubSub topic), the streamer application (to use its Streamer.Binance.TradeEvent struct) and the binance package (to pattern match its structs).

We can now jump back to the terminal to install added dependencies and generate a new Ecto.Repo module:

$ mix deps.get
  ...
$ cd apps/data_warehouse 
$ mix ecto.gen.repo -r DataWarehouse.Repo
* creating lib/data_warehouse
* creating lib/data_warehouse/repo.ex
* updating ../../config/config.exs

Before we can create migrations that will create our tables, we need to update the generated configuration inside the config/config.exs file:

# /config/config.exs
...
config :data_warehouse,            # <= added line
  ecto_repos: [DataWarehouse.Repo] # <= added line

config :data_warehouse, DataWarehouse.Repo,
  database: "data_warehouse",         # <= updated line
  username: "postgres",               # <= updated line
  password: "hedgehogSecretPassword", # <= updated line
  hostname: "localhost"
...

and add the DataWarehouse.Repo module to the children list of the DataWarehouse.Application’s process:

    # /apps/data_warehouse/lib/data_warehouse/application.ex
    ...
    children = [
      {DataWarehouse.Repo, []}
    ]
    ...

The last step will be to create the database by running the mix ecto.create -r DataWarehouse.Repo command.

This finishes the Ecto setup - we can now move on to storing the orders and the trade events.

14.5 Store trade events’ data

The first step to store trade events inside the database will be to create a table that will hold our data. We will start by creating the migration:

$ cd apps/data_warehouse
$ mix ecto.gen.migration create_trade_events
* creating priv/repo/migrations
* creating priv/repo/migrations/20210222224514_create_trade_events.exs

The Streamer.Binance.TradeEvent struct will serve as a list of columns for our new trade_events table. Here’s the full implementation of our migration:

# /apps/data_warehouse/priv/repo/migrations/20210222224514_create_trade_events.exs
defmodule DataWarehouse.Repo.Migrations.CreateTradeEvents do
  use Ecto.Migration

  def change do
    create table(:trade_events, primary_key: false) do
      add(:id, :uuid, primary_key: true)
      add(:event_type, :text)
      add(:event_time, :bigint)
      add(:symbol, :text)
      add(:trade_id, :integer)
      add(:price, :text)
      add(:quantity, :text)
      add(:buyer_order_id, :bigint)
      add(:seller_order_id, :bigint)
      add(:trade_time, :bigint)
      add(:buyer_market_maker, :bool)

      timestamps()
    end
  end
end

We added an additional id field to easily identify each trade event, and timestamps for monitoring.

Let’s run the migration so it will create a new trade_events table for us:

$ mix ecto.migrate

The next step will be to create a new directory called schema inside the apps/data_warehouse/lib/data_warehouse directory. Inside it, we need to create a new schema file called trade_event.ex. We can copy the same columns from the migration straight into the schema:

# /apps/data_warehouse/lib/data_warehouse/schema/trade_event.ex
defmodule DataWarehouse.Schema.TradeEvent do
  use Ecto.Schema

  @primary_key {:id, :binary_id, autogenerate: true}

  schema "trade_events" do
    field(:event_type, :string)
    field(:event_time, :integer)
    field(:symbol, :string)
    field(:trade_id, :integer)
    field(:price, :string)
    field(:quantity, :string)
    field(:buyer_order_id, :integer)
    field(:seller_order_id, :integer)
    field(:trade_time, :integer)
    field(:buyer_market_maker, :boolean)

    timestamps()
  end
end

At this moment, we should be able to execute CRUD (create, read [select], update, delete) operations over the table using the above struct.

We can now store trade events’ data inside the database, so we can move on to collecting it. Trade events are broadcast by the Streamer.Binance process here:

    # /apps/streamer/lib/streamer/binance.ex
    ...
    Phoenix.PubSub.broadcast(
      Streamer.PubSub,
      "TRADE_EVENTS:#{trade_event.symbol}",
      trade_event
    )
    ...

We will implement a subscriber process that will be given a PubSub topic and will store incoming data inside the database.

Let’s start by creating a new folder called subscriber inside the apps/data_warehouse/lib/data_warehouse directory together with a new file called worker.ex inside it:

# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
defmodule DataWarehouse.Subscriber.Worker do
  use GenServer

  require Logger

  defmodule State do
    @enforce_keys [:topic]
    defstruct [:topic]
  end

  def start_link(topic) do
    GenServer.start_link(
      __MODULE__,
      topic,
      name: :"#{__MODULE__}-#{topic}"
    )
  end

  def init(topic) do
    {:ok,
     %State{
       topic: topic
     }}
  end
end

At this moment, it’s just a bog-standard implementation of a GenServer with a state struct containing a single key (:topic). We need to update the init/1 function to subscribe to the PubSub topic:

# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
  def init(topic) do
    Logger.info("DataWarehouse worker is subscribing to #{topic}")

    Phoenix.PubSub.subscribe(
      Streamer.PubSub,
      topic
    )
    ...

Next, we need to add a handler for received messages:

# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
  def handle_info(%Streamer.Binance.TradeEvent{} = trade_event, state) do
    opts =
      trade_event
      |> Map.from_struct()

    struct!(DataWarehouse.Schema.TradeEvent, opts)
    |> DataWarehouse.Repo.insert()

    {:noreply, state}
  end

As in the case of the Naive.Trader, all incoming messages trigger the handle_info/2 callback with the contents of the message and the current state of the subscriber worker. We just convert the incoming trade event to a map and then that map to the TradeEvent struct that gets inserted into the database.
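
To see the subscribe-and-handle flow in isolation, here is a minimal sketch that runs without Phoenix.PubSub or a database. It is not the chapter’s implementation: a Registry with duplicate keys (named DemoPubSub here) stands in for Streamer.PubSub, the Demo.Subscriber module and the {:trade_event, symbol, price} message shape are made up for illustration, and events are kept in the process state instead of being inserted into the database:

```elixir
defmodule Demo.Subscriber do
  use GenServer

  # DemoPubSub is a hypothetical Registry name standing in for Streamer.PubSub.
  @pubsub DemoPubSub

  def start_link(topic), do: GenServer.start_link(__MODULE__, topic)

  def stored(pid), do: GenServer.call(pid, :stored)

  @impl true
  def init(topic) do
    # "Subscribe": register this process under the topic.
    {:ok, _owner} = Registry.register(@pubsub, topic, nil)
    {:ok, %{topic: topic, stored: []}}
  end

  @impl true
  def handle_info({:trade_event, _symbol, _price} = event, state) do
    # The chapter's worker inserts into the database at this point;
    # this sketch keeps the events in memory instead.
    {:noreply, %{state | stored: [event | state.stored]}}
  end

  @impl true
  def handle_call(:stored, _from, state) do
    {:reply, Enum.reverse(state.stored), state}
  end
end

{:ok, _registry} = Registry.start_link(keys: :duplicate, name: DemoPubSub)
{:ok, worker} = Demo.Subscriber.start_link("TRADE_EVENTS:XRPUSDT")

# "Broadcast": send the event to every process registered under the topic.
Registry.dispatch(DemoPubSub, "TRADE_EVENTS:XRPUSDT", fn entries ->
  for {pid, _value} <- entries do
    send(pid, {:trade_event, "XRPUSDT", "0.56554000"})
  end
end)

stored = Demo.Subscriber.stored(worker)
IO.inspect(stored, label: "stored events")
```

The worker never asks for data; it only reacts to whatever lands in its mailbox, which is why a single handle_info/2 clause per message type is all the storing logic it needs.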

This finishes the implementation of storing trade events, which we can test in the interactive shell by running:

$ iex -S mix
...
iex(1)> Streamer.start_streaming("XRPUSDT")
00:48:30.147 [info]  Starting Elixir.Streamer.Binance worker for XRPUSDT
{:ok, #PID<0.395.0>}
iex(2)> DataWarehouse.Subscriber.Worker.start_link("TRADE_EVENTS:XRPUSDT")
00:49:48.204 [info]  DataWarehouse worker is subscribing to TRADE_EVENTS:XRPUSDT
{:ok, #PID<0.405.0>}

After a couple of minutes we can check the database using psql:

$ psql -Upostgres -h127.0.0.1
Password for user postgres:
...
postgres=# \c data_warehouse;
You are now connected to database "data_warehouse" as user "postgres".
data_warehouse=# \x
Expanded display is on.
data_warehouse=# SELECT * FROM trade_events;
-[ RECORD 1 ]------+-------------------------------------
id                 | f6eae686-946a-4e34-9c33-c7034c2cad5d
event_type         | trade
event_time         | 1614041388236
symbol             | XRPUSDT
trade_id           | 152765072
price              | 0.56554000
quantity           | 1199.10000000
buyer_order_id     | 1762454848
seller_order_id    | 1762454775
trade_time         | 1614041388235
buyer_market_maker | f
inserted_at        | 2021-02-23 00:49:48
...

As we can see in the above output, trade events are now getting stored inside the database.

14.6 Store orders’ data

In the same fashion as with the trade events’ data above, to store orders’ data we will create an orders table inside a new migration:

$ cd apps/data_warehouse
$ mix ecto.gen.migration create_orders
* creating priv/repo/migrations/20210222224522_create_orders.exs

The list of columns for this table will be a copy of the Binance.Order struct returned from the Binance exchange:

# /apps/data_warehouse/priv/repo/migrations/20210222224522_create_orders.exs
defmodule DataWarehouse.Repo.Migrations.CreateOrders do
  use Ecto.Migration

  def change do
    create table(:orders, primary_key: false) do
      add(:order_id, :bigint, primary_key: true)
      add(:client_order_id, :text)
      add(:symbol, :text)
      add(:price, :text)
      add(:original_quantity, :text)
      add(:executed_quantity, :text)
      add(:cummulative_quote_quantity, :text)
      add(:status, :text)
      add(:time_in_force, :text)
      add(:type, :text)
      add(:side, :text)
      add(:stop_price, :text)
      add(:iceberg_quantity, :text)
      add(:time, :bigint)
      add(:update_time, :bigint)

      timestamps()
    end
  end
end

We replaced all of the shortened names like orig_qty with full names like original_quantity.

Let’s run the migration so it will create a new orders table for us:

$ mix ecto.migrate

We can copy the above fields list to create a schema module. First, let’s create a new file called order.ex inside the apps/data_warehouse/lib/data_warehouse/schema directory:

# /apps/data_warehouse/lib/data_warehouse/schema/order.ex
defmodule DataWarehouse.Schema.Order do
  use Ecto.Schema

  @primary_key {:order_id, :integer, autogenerate: false}

  schema "orders" do
    field(:client_order_id, :string)
    field(:symbol, :string)
    field(:price, :string)
    field(:original_quantity, :string)
    field(:executed_quantity, :string)
    field(:cummulative_quote_quantity, :string)
    field(:status, :string)
    field(:time_in_force, :string)
    field(:type, :string)
    field(:side, :string)
    field(:stop_price, :string)
    field(:iceberg_quantity, :string)
    field(:time, :integer)
    field(:update_time, :integer)

    timestamps()
  end
end

We can now add a handler to our DataWarehouse.Subscriber.Worker that will convert the Binance.Order struct to DataWarehouse.Schema.Order and store data inside the database:

# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
  def handle_info(%Binance.Order{} = order, state) do
    data =
      order
      |> Map.from_struct()

    struct(DataWarehouse.Schema.Order, data)
    |> Map.merge(%{
      original_quantity: order.orig_qty,
      executed_quantity: order.executed_qty,
      cummulative_quote_quantity: order.cummulative_quote_qty,
      iceberg_quantity: order.iceberg_qty
    })
    |> DataWarehouse.Repo.insert(
      on_conflict: :replace_all,
      conflict_target: :order_id
    )

    {:noreply, state}
  end
  ...

In the above code, we are copying the matching fields using the struct/2 function, but fields whose names differ between the two structs won’t be copied, so we need to merge them in as a second step (using the Map.merge/2 function). We are also using the on_conflict: :replace_all option to make the insert/2 function act as an upsert (to avoid writing separate logic for inserting and updating the orders).
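
The difference between copying and merging is easy to check in isolation. The following is a small sketch using two made-up, trimmed-down structs (Demo.ExchangeOrder and Demo.StoredOrder are stand-ins for Binance.Order and DataWarehouse.Schema.Order, not the real modules):

```elixir
defmodule Demo.ExchangeOrder do
  # Hypothetical, trimmed-down stand-in for Binance.Order.
  defstruct [:order_id, :symbol, :orig_qty, :executed_qty]
end

defmodule Demo.StoredOrder do
  # Hypothetical, trimmed-down stand-in for DataWarehouse.Schema.Order.
  defstruct [:order_id, :symbol, :original_quantity, :executed_quantity]
end

order =
  struct!(Demo.ExchangeOrder,
    order_id: 1,
    symbol: "NEOUSDT",
    orig_qty: "5.241",
    executed_qty: "0.00000000"
  )

data = Map.from_struct(order)

# struct/2 copies :order_id and :symbol and silently drops the keys that the
# target struct doesn't define (:orig_qty and :executed_qty).
copied = struct(Demo.StoredOrder, data)

# struct!/2 is the strict variant: it raises a KeyError on unknown keys.
strict_result =
  try do
    struct!(Demo.StoredOrder, data)
  rescue
    KeyError -> :key_error
  end

# The renamed fields have to be merged in explicitly.
stored =
  Map.merge(copied, %{
    original_quantity: order.orig_qty,
    executed_quantity: order.executed_qty
  })

IO.inspect(copied.original_quantity, label: "after struct/2")
IO.inspect(stored.original_quantity, label: "after Map.merge/2")
IO.inspect(strict_result, label: "struct!/2 with unknown keys")
```

This is also why the trade events handler can use struct!/2 (every source field has a counterpart in the schema) while the orders handler has to use the lenient struct/2.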

Having all of this in place, we are now able to store broadcast orders’ data in the database, but there’s nothing actually broadcasting them.

We need to modify the Naive.Trader module to broadcast the Binance.Order whenever it places buy/sell orders or fetches them again:

    # /apps/naive/lib/naive/trader.ex
    ...
    # inside placing initial buy order callback
    {:ok, %Binance.OrderResponse{} = order} =
      @binance_client.order_limit_buy(symbol, quantity, price, "GTC")

    :ok = broadcast_order(order)
    ...

    # inside buy order (partially) filled callback
    {:ok, %Binance.Order{} = current_buy_order} =
      @binance_client.get_order(
        symbol,
        timestamp,
        order_id
      )

    :ok = broadcast_order(current_buy_order)
    ...

        # inside the same callback in case of buy order filled
        {:ok, %Binance.OrderResponse{} = order} =
          @binance_client.order_limit_sell(symbol, quantity, sell_price, "GTC")
        
        :ok = broadcast_order(order)
    ...

    # inside sell order (partially) filled callback
    {:ok, %Binance.Order{} = current_sell_order} =
      @binance_client.get_order(
        symbol,
        timestamp,
        order_id
      )
    
    :ok = broadcast_order(current_sell_order)
    ...

The above 4 places send both Binance.OrderResponse and Binance.Order structs - our broadcast_order/1 function needs to be able to handle both. Add the following at the bottom of the Naive.Trader module:

  # /apps/naive/lib/naive/trader.ex
  defp broadcast_order(%Binance.OrderResponse{} = response) do
    response
    |> convert_to_order()
    |> broadcast_order()
  end

  defp broadcast_order(%Binance.Order{} = order) do
    Phoenix.PubSub.broadcast(
      Streamer.PubSub,
      "ORDERS:#{order.symbol}",
      order
    )
  end

  defp convert_to_order(%Binance.OrderResponse{} = response) do
    data =
      response
      |> Map.from_struct()

    struct(Binance.Order, data)
    |> Map.merge(%{
      cummulative_quote_qty: "0.00000000",
      stop_price: "0.00000000",
      iceberg_qty: "0.00000000",
      is_working: true
    })
  end

As the DataWarehouse.Subscriber.Worker process expects only Binance.Order structs to be broadcast, we first check whether the passed value is a Binance.OrderResponse struct, convert it to a Binance.Order struct if that’s the case, and only then broadcast it to the PubSub topic.

The conversion logic, as previously, uses the struct/2 function, but it also merges in default values for fields that are missing from the much smaller Binance.OrderResponse struct (in comparison to Binance.Order).

At this moment, we are able to store orders inside the database, and we can check that by running:

$ iex -S mix
...
iex(1)> DataWarehouse.Subscriber.Worker.start_link("ORDERS:NEOUSDT")
22:37:43.043 [info]  DataWarehouse worker is subscribing to ORDERS:NEOUSDT
{:ok, #PID<0.400.0>}
iex(2)> Naive.start_trading("NEOUSDT")
22:38:39.741 [info]  Starting Elixir.Naive.SymbolSupervisor worker for NEOUSDT
22:38:39.832 [info]  Starting new supervision tree to trade on NEOUSDT
{:ok, #PID<0.402.0>}
22:38:41.654 [info]  Initializing new trader(1614119921653) for NEOUSDT
iex(3)> Streamer.start_streaming("NEOUSDT")
22:39:23.786 [info]  Starting Elixir.Streamer.Binance worker for NEOUSDT
{:ok, #PID<0.412.0>}
22:39:27.187 [info]  The trader(1614119921653) is placing a BUY order for NEOUSDT @ 37.549, quantity: 5.326
22:39:27.449 [info]  The trader(1614119921653) is placing a SELL order for NEOUSDT @ 37.578, quantity: 5.326.

At this moment, we should see the orders inside the DataWarehouse’s database:

$ psql -Upostgres -h127.0.0.1
Password for user postgres: 
...
postgres=# \c data_warehouse;
You are now connected to database "data_warehouse" as user "postgres".
data_warehouse=# \x
Expanded display is on.
data_warehouse=# SELECT * FROM orders;
-[ RECORD 1 ]--------------+---------------------------------
order_id                   | 1
client_order_id            | C81E728D9D4C2F636F067F89CC14862C
symbol                     | NEOUSDT
price                      | 38.16
original_quantity          | 5.241
executed_quantity          | 0.00000000
cummulative_quote_quantity | 0.00000000
status                     | FILLED
time_in_force              | GTC
type                       | LIMIT
side                       | BUY
stop_price                 | 0.00000000
iceberg_quantity           | 0.00000000
time                       | 1614120906320
update_time                | 1614120906320
inserted_at                | 2021-02-23 22:55:10
updated_at                 | 2021-02-23 22:55:10
-[ RECORD 2 ]--------------+---------------------------------
order_id                   | 2
client_order_id            | ECCBC87E4B5CE2FE28308FD9F2A7BAF3
symbol                     | NEOUSDT
price                      | 38.19
original_quantity          | 5.241
executed_quantity          | 0.00000000
cummulative_quote_quantity | 0.00000000
status                     | NEW
time_in_force              | GTC
type                       | LIMIT
side                       | SELL
stop_price                 | 0.00000000
iceberg_quantity           | 0.00000000
time                       | 
update_time                | 
inserted_at                | 2021-02-23 22:55:10
updated_at                 | 2021-02-23 22:55:10

The first record above was inserted and then updated, as its status is “FILLED”; the second one hasn’t been updated yet, as it’s still in the “NEW” status - that confirms that the upsert trick works.

That finishes the implementation of storing orders inside the database.

14.7 Implement supervision

Currently, we have a DataWarehouse.Subscriber.Worker process that takes care of storing data in the database, but sadly, if anything goes wrong inside our worker and it crashes, there’s no supervision in place to restart it.

The supervision tree for the data_warehouse application will be similar to the ones from the naive and streamer apps but different enough not to use the Core.ServiceSupervisor abstraction.

For example, it doesn’t use the symbol column; it works based on the topic column. This would require changes to Core.ServiceSupervisor’s functions like update_status/4 or fetch_symbols_to_start/2. We could update them to accept a column name, but that would need to be passed through other functions. This is probably not the best approach, and the further we go, the more complex it will become. The second issue is that we are registering all processes with atom names, which can be problematic as the list of processes starts to grow (as we can imagine in the case of the data_warehouse application).

A better approach is to combine the DynamicSupervisor with the Registry.

The DynamicSupervisor will supervise the Subscriber.Workers, and instead of keeping track of them by registering them using atoms, we will start them using :via tuples and the Elixir Registry.

We will add all the functionality that we implemented for the naive and streamer applications. We will provide functions to start and stop storing data from the passed PubSub topics as well as store those topics inside the database so that storing will be autostarted.

14.7.1 Create subscriber_settings table

To provide the autostarting functionality, we need to create a new migration that will create the subscriber_settings table:

$ cd apps/data_warehouse
$ mix ecto.gen.migration create_subscriber_settings
* creating priv/repo/migrations/20210227230123_create_subscriber_settings.exs

At this moment, we can copy the code that creates the settings table (including the enum and the index) from the streamer application and tweak it to fit the data_warehouse application. The first important change (besides updating namespaces from Streamer to DataWarehouse) is that we have a setting per topic - not per symbol as in the naive and streamer applications:

# /apps/data_warehouse/priv/repo/migrations/20210227230123_create_subscriber_settings.exs
defmodule DataWarehouse.Repo.Migrations.CreateSubscriberSettings do
  use Ecto.Migration

  alias DataWarehouse.Schema.SubscriberStatusEnum

  def change do
    SubscriberStatusEnum.create_type()

    create table(:subscriber_settings, primary_key: false) do
      add(:id, :uuid, primary_key: true)
      add(:topic, :text, null: false)
      add(:status, SubscriberStatusEnum.type(), default: "off", null: false)
      
      timestamps()
    end

    create(unique_index(:subscriber_settings, [:topic]))
  end
end

Both schema and enum will be almost identical to the ones from the streamer application - we can simply copy those files and apply basic tweaks like updating the namespace:

$ cp apps/streamer/lib/streamer/schema/settings.ex \
apps/data_warehouse/lib/data_warehouse/schema/subscriber_settings.ex
$ cp apps/streamer/lib/streamer/schema/streaming_status_enum.ex \
apps/data_warehouse/lib/data_warehouse/schema/subscriber_status_enum.ex

Remember to update the symbol column to topic as well as the table name inside the DataWarehouse.Schema.SubscriberSettings:

# /apps/data_warehouse/lib/data_warehouse/schema/subscriber_settings.ex
defmodule DataWarehouse.Schema.SubscriberSettings do
  use Ecto.Schema

  alias DataWarehouse.Schema.SubscriberStatusEnum

  @primary_key {:id, :binary_id, autogenerate: true}

  schema "subscriber_settings" do
    field(:topic, :string)
    field(:status, SubscriberStatusEnum)

    timestamps()
  end
end

Inside apps/data_warehouse/lib/data_warehouse/schema/subscriber_status_enum.ex we need to swap references of Streamer to DataWarehouse and references of StreamingStatusEnum to SubscriberStatusEnum:

# /apps/data_warehouse/lib/data_warehouse/schema/subscriber_status_enum.ex
import EctoEnum

defenum(DataWarehouse.Schema.SubscriberStatusEnum, :subscriber_status, [:on, :off])

Don’t forget to run the migration:

$ mix ecto.migrate

At this moment, we have all the pieces in place to execute queries on our new table. This is where we would normally think about a seeding script. For the data_warehouse specifically, we won’t need one, as we don’t know in advance what topic names we will use. Instead of seeding settings in advance, our code will “upsert” (using the insert function) settings when start_storing/2 or stop_storing/2 are called.

14.7.2 Redesign supervision using Registry

We can now focus on drafting a supervision tree for the data_warehouse application. At this moment we have only the DataWarehouse.Subscriber.Worker and the DataWarehouse.Application modules.

As in the case of the naive and streamer applications, we will need an additional level of supervision to cater for the “autostarting” Task as well as, in the case of the data_warehouse application, the Registry.

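The full supervision tree will look as follows:

DataWarehouse.Application
  DataWarehouse.Repo
  DataWarehouse.SubscriberSupervisor
    Registry (:subscriber_workers)
    DataWarehouse.Subscriber.DynamicSupervisor
      DataWarehouse.Subscriber.Worker (one per topic)
    Task (autostarts the workers)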

Everything looks very similar to the supervision trees that we created in the streamer and the naive applications, but there’s an additional Registry that is supervised by the SubscriberSupervisor process.

The idea is that inside the Worker module’s start_link/1 we will register worker processes using a :via tuple. Internally, GenServer will utilize the Registry’s functions like register_name/2 to add the process to the registry under the topic string. This way, we will be able to retrieve PIDs assigned to topics using those topic strings instead of registering each worker process with an atom name.

Just as previously, the DynamicSupervisor will be in charge of supervising the Worker processes, and it won’t even be aware that we are using the Registry to keep track of the topic => PID association.

14.7.3 Create the DataWarehouse.Subscriber.DynamicSupervisor module

Let’s start by creating a new file called dynamic_supervisor.ex inside the apps/data_warehouse/lib/data_warehouse/subscriber directory and put the default dynamic supervisor implementation inside:

# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
defmodule DataWarehouse.Subscriber.DynamicSupervisor do
  use DynamicSupervisor

  def start_link(_arg) do
    DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

As we will put all our logic related to autostarting, starting and stopping inside this module, we can already add the require, aliases and import:

# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
  require Logger

  alias DataWarehouse.Repo
  alias DataWarehouse.Schema.SubscriberSettings
  alias DataWarehouse.Subscriber.Worker

  import Ecto.Query, only: [from: 2]

  @registry :subscriber_workers

Additionally, we added the @registry module attribute that we will use to retrieve the PID for a specific topic.

We can move on to implementing autostart_workers/0 which will look very similar to the ones that we implemented in the streamer and the naive applications:

  # /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
  ...
  def autostart_workers do
    Repo.all(
      from(s in SubscriberSettings,
        where: s.status == "on",
        select: s.topic
      )
    )
    |> Enum.map(&start_child/1)
  end

  defp start_child(args) do
    DynamicSupervisor.start_child(
      __MODULE__,
      {Worker, args}
    )
  end

We can see that we are querying the database for a list of topics (not symbols) and calling start_child/1 for each result.

The start_worker/1 function is where the Registry will shine, as we won’t need to check whether there’s already a process running for that topic - we can leave that check to the Registry. If there’s a process already running for that topic, it will just return a tuple starting with the :error atom:

  # /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
  ...
  def start_worker(topic) do
    Logger.info("Starting storing data from #{topic} topic")
    update_status(topic, "on")
    start_child(topic)
  end
  ...
  defp update_status(topic, status)
       when is_binary(topic) and is_binary(status) do
    %SubscriberSettings{
      topic: topic,
      status: status
    }
    |> Repo.insert(
      on_conflict: :replace_all,
      conflict_target: :topic
    )
  end

As we are not seeding the database with default settings, we will use the insert/2 function with options (as previously) to make it work as an “upsert”.

The last function in this module will be stop_worker/1, which uses the private stop_child/1 function. The stop_child/1 function shows how to retrieve the PID of the process assigned to the passed topic:

  # /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
  ...
  def stop_worker(topic) do
    Logger.info("Stopping storing data from #{topic} topic")
    update_status(topic, "off")
    stop_child(topic)
  end
  ...
  defp stop_child(args) do
    case Registry.lookup(@registry, args) do
      [{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
      _ -> Logger.warning("Unable to locate process assigned to #{inspect(args)}")
    end
  end

That is the full implementation of the DataWarehouse.Subscriber.DynamicSupervisor module, and it’s almost as slim as the one from the last chapter, where we leveraged macros to achieve that lightness. Using the Registry is the preferred way to manage a list of identifiable processes. We won’t run into the issue of overusing atoms (as they are not garbage collected, we could hit the atom limit sooner or later).
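
The start and stop behaviour can be tried out without the database or PubSub. Below is a rough sketch under a few assumptions: Demo.TopicWorker, :demo_registry and :demo_dynamic_supervisor are hypothetical names, the status updates are left out, and the {:error, :not_running} return value replaces the logged warning so the result is easier to inspect:

```elixir
defmodule Demo.TopicWorker do
  use GenServer

  def start_link(topic) do
    GenServer.start_link(__MODULE__, topic, name: {:via, Registry, {:demo_registry, topic}})
  end

  @impl true
  def init(topic), do: {:ok, topic}
end

{:ok, _registry} = Registry.start_link(keys: :unique, name: :demo_registry)

{:ok, _supervisor} =
  DynamicSupervisor.start_link(strategy: :one_for_one, name: :demo_dynamic_supervisor)

start_child = fn topic ->
  DynamicSupervisor.start_child(:demo_dynamic_supervisor, {Demo.TopicWorker, topic})
end

stop_child = fn topic ->
  # The topic string is all we need to find the worker's PID.
  case Registry.lookup(:demo_registry, topic) do
    [{pid, _value}] -> DynamicSupervisor.terminate_child(:demo_dynamic_supervisor, pid)
    _ -> {:error, :not_running}
  end
end

{:ok, pid} = start_child.("TRADE_EVENTS:NEOUSDT")

# Starting a second worker for the same topic is rejected by the Registry.
duplicate = start_child.("TRADE_EVENTS:NEOUSDT")

first_stop = stop_child.("TRADE_EVENTS:NEOUSDT")

# No worker was ever started for this topic.
unknown_stop = stop_child.("ORDERS:UNKNOWN")

IO.inspect(duplicate, label: "duplicate start")
IO.inspect(first_stop, label: "stop")
IO.inspect(unknown_stop, label: "stop of unknown topic")
```

Note that the supervisor itself never sees a topic; it only deals with child specifications and PIDs, while the topic-to-PID mapping lives entirely in the Registry.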

14.7.4 Register Worker processes using :via

The above DynamicSupervisor module assumes that Workers are registered inside the Registry - to make this happen we will need to update the start_link/1 function of the DataWarehouse.Subscriber.Worker module:

  # /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
  ...
  def start_link(topic) do
    GenServer.start_link(
      __MODULE__,
      topic,
      name: via_tuple(topic)
    )
  end
  ...
  defp via_tuple(topic) do
    {:via, Registry, {:subscriber_workers, topic}}
  end
  ...    

By passing the :name option to the GenServer’s start_link/3 function, we instruct it to utilize the Registry module to register processes under topic names.
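
A :via tuple is not only useful at registration time; the same tuple can be passed wherever a process name is accepted. Here is a minimal sketch (Demo.ViaWorker, its fetch_topic/1 helper and the :demo_via_registry name are assumptions made for this example):

```elixir
defmodule Demo.ViaWorker do
  use GenServer

  # :demo_via_registry is a hypothetical registry name used only in this sketch.
  @registry :demo_via_registry

  def start_link(topic) do
    GenServer.start_link(__MODULE__, topic, name: via_tuple(topic))
  end

  # The same :via tuple addresses the process later, no PID needed.
  def fetch_topic(topic), do: GenServer.call(via_tuple(topic), :fetch_topic)

  defp via_tuple(topic), do: {:via, Registry, {@registry, topic}}

  @impl true
  def init(topic), do: {:ok, topic}

  @impl true
  def handle_call(:fetch_topic, _from, topic), do: {:reply, topic, topic}
end

{:ok, _registry} = Registry.start_link(keys: :unique, name: :demo_via_registry)
{:ok, pid} = Demo.ViaWorker.start_link("ORDERS:NEOUSDT")

# A second process cannot claim a key that is already taken.
second_attempt = Demo.ViaWorker.start_link("ORDERS:NEOUSDT")

lookup = Registry.lookup(:demo_via_registry, "ORDERS:NEOUSDT")
reply = Demo.ViaWorker.fetch_topic("ORDERS:NEOUSDT")

IO.inspect(second_attempt, label: "second start_link")
IO.inspect(lookup, label: "lookup")
IO.inspect(reply, label: "call through the :via tuple")
```

Because the keys are plain strings, any number of topics can be registered this way without creating a single new atom.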

14.7.5 Create a new supervision level for Registry, Task and the DynamicSupervisor

We have the lowest-level modules - the Worker and the DynamicSupervisor - implemented, so it’s time to add a new Supervisor that will start the Registry, the DynamicSupervisor, and the autostarting Task. First, create a new file called subscriber_supervisor.ex inside the apps/data_warehouse/lib/data_warehouse directory:

# /apps/data_warehouse/lib/data_warehouse/subscriber_supervisor.ex
defmodule DataWarehouse.SubscriberSupervisor do
  use Supervisor

  alias DataWarehouse.Subscriber.DynamicSupervisor

  @registry :subscriber_workers

  def start_link(_args) do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_args) do
    children = [
      {Registry, [keys: :unique, name: @registry]},
      {DynamicSupervisor, []},
      {Task,
       fn ->
         DynamicSupervisor.autostart_workers()
       end}
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end
end

The important part here is to match the Registry name to the one used inside the DynamicSupervisor and the Worker modules.
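
The order of the children matters as well: they are started top to bottom, so the Registry is ready before any worker tries to register, and with the :rest_for_one strategy a crash of an earlier child also restarts the children listed after it. The sketch below mirrors that shape using only the standard library. It rests on a few assumptions: all Demo.* modules and :demo_auto_* names are made up, and the list of topics is passed in directly instead of being read from the subscriber_settings table:

```elixir
defmodule Demo.AutoWorker do
  use GenServer

  def start_link(topic) do
    GenServer.start_link(__MODULE__, topic,
      name: {:via, Registry, {:demo_auto_registry, topic}}
    )
  end

  @impl true
  def init(topic), do: {:ok, topic}
end

defmodule Demo.TopSupervisor do
  use Supervisor

  def start_link(topics) do
    Supervisor.start_link(__MODULE__, topics, name: __MODULE__)
  end

  @impl true
  def init(topics) do
    children = [
      # Started first, so it is ready before any worker tries to register.
      {Registry, [keys: :unique, name: :demo_auto_registry]},
      {DynamicSupervisor, [strategy: :one_for_one, name: :demo_auto_dynamic]},
      # Started last: by now both the registry and the supervisor are running.
      {Task,
       fn ->
         Enum.each(topics, fn topic ->
           DynamicSupervisor.start_child(:demo_auto_dynamic, {Demo.AutoWorker, topic})
         end)
       end}
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end
end

topics = ["TRADE_EVENTS:NEOUSDT", "ORDERS:NEOUSDT"]
{:ok, _supervisor} = Demo.TopSupervisor.start_link(topics)

# The Task runs asynchronously, so we poll the registry for up to a second.
registered? = fn topic ->
  Enum.any?(1..100, fn _attempt ->
    case Registry.lookup(:demo_auto_registry, topic) do
      [{_pid, _value}] ->
        true

      [] ->
        Process.sleep(10)
        false
    end
  end)
end

autostarted = Enum.map(topics, registered?)
IO.inspect(autostarted, label: "autostarted")
```

In the real application, the supervisor also has to be added to the children of DataWarehouse.Application so that it starts together with the Repo.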

14.7.7 Add interface

The final step will be to add an interface to the DataWarehouse application to start and stop storing:

  # /apps/data_warehouse/lib/data_warehouse.ex
  alias DataWarehouse.Subscriber.DynamicSupervisor

  def start_storing(stream, symbol) do
    to_topic(stream, symbol)
    |> DynamicSupervisor.start_worker()
  end

  def stop_storing(stream, symbol) do
    to_topic(stream, symbol)
    |> DynamicSupervisor.stop_worker()
  end

  defp to_topic(stream, symbol) do
    [stream, symbol]
    |> Enum.map(&String.upcase/1)
    |> Enum.join(":")
  end

Inside the above functions, we are just normalizing the case of the passed arguments, assuming that both the stream and the symbol parts of a topic are uppercase.
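
The effect of that normalization can be checked on its own. Here the private to_topic/2 function is reproduced as an anonymous function, purely for illustration:

```elixir
# Same pipeline as to_topic/2 above, as an anonymous function.
to_topic = fn stream, symbol ->
  [stream, symbol]
  |> Enum.map(&String.upcase/1)
  |> Enum.join(":")
end

lowercase = to_topic.("trade_events", "neousdt")
uppercase = to_topic.("TRADE_EVENTS", "NEOUSDT")

IO.inspect(lowercase, label: "lowercase arguments")
IO.inspect(uppercase, label: "uppercase arguments")
```

Both calls end up with the same topic, so a worker started with uppercase arguments can be stopped with lowercase ones.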

14.7.8 Test

The interface above was the last step in our implementation. We can now test that everything works as expected:

$ iex -S mix
...
iex(1)> DataWarehouse.start_storing("TRADE_EVENTS", "NEOUSDT")
19:34:00.740 [info]  Starting storing data from TRADE_EVENTS:NEOUSDT topic
19:34:00.847 [info]  DataWarehouse worker is subscribing to TRADE_EVENTS:NEOUSDT
{:ok, #PID<0.429.0>}
iex(2)> DataWarehouse.start_storing("TRADE_EVENTS", "NEOUSDT")
19:34:04.753 [info]  Starting storing data from TRADE_EVENTS:NEOUSDT topic
{:error, {:already_started, #PID<0.429.0>}}
iex(3)> DataWarehouse.start_storing("ORDERS", "NEOUSDT")
19:34:09.386 [info]  Starting storing data from ORDERS:NEOUSDT topic
19:34:09.403 [info]  DataWarehouse worker is subscribing to ORDERS:NEOUSDT
{:ok, #PID<0.431.0>}
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
       (l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
^C%
$ iex -S mix
...
19:35:30.058 [info]  DataWarehouse worker is subscribing to TRADE_EVENTS:NEOUSDT
19:35:30.062 [info]  DataWarehouse worker is subscribing to ORDERS:NEOUSDT
# autostart works ^^^
iex(1)> Naive.start_trading("NEOUSDT")
19:36:45.316 [info]  Starting Elixir.Naive.SymbolSupervisor worker for NEOUSDT
19:36:45.417 [info]  Starting new supervision tree to trade on NEOUSDT
{:ok, #PID<0.419.0>}
iex(2)> 
19:36:47.484 [info]  Initializing new trader(1615221407466) for NEOUSDT
iex(2)> Streamer.start_streaming("NEOUSDT")
16:37:39.660 [info]  Starting Elixir.Streamer.Binance worker for NEOUSDT
{:ok, #PID<0.428.0>}
...
iex(3)> DataWarehouse.stop_storing("trade_events", "NEOUSDT")
19:39:26.398 [info]  Stopping storing data from TRADE_EVENTS:NEOUSDT topic
:ok
iex(4)> DataWarehouse.stop_storing("trade_events", "NEOUSDT")
19:39:28.151 [info]  Stopping storing data from TRADE_EVENTS:NEOUSDT topic
19:39:28.160 [warn]  Unable to locate process assigned to "TRADE_EVENTS:NEOUSDT"
:ok
iex(5)> [{pid, nil}] = Registry.lookup(:subscriber_workers, "ORDERS:NEOUSDT")
[{#PID<0.417.0>, nil}]
iex(6)> Process.exit(pid, :crash)
true
16:43:40.812 [info]  DataWarehouse worker is subscribing to ORDERS:NEOUSDT

As we can see, even this simple implementation handles starting, autostarting, and stopping. It also gracefully handles starting a worker when one is already running as well as stopping when there is none running.

As a challenge, you could update the naive and the streamer applications to use the Registry and remove the Core.ServiceSupervisor module, as it was superseded by the above solution - here’s the link to the PR (pull request) that sums up the required changes.

[Note] Please remember to run mix format to keep things nice and tidy.

The source code for this chapter can be found on GitHub