Chapter 23 Back to the Monolith

23.1 Objectives

  • reverting the course to the monolith
  • creating a new Phoenix app
  • reintegrating the core app
  • reintegrating the binance_mock app
  • reintegrating the streamer app
  • reintegrating the naive app
  • reintegrating the data_warehouse app
  • reintegrating the indicator app

23.2 Reverting the course to the monolith

We started our journey with the idea that we would use an umbrella to “future-proof” our project in case we later migrated to a microservices architecture.

As we delved deeper into our project (which also included me gaining more knowledge and experience over three years of writing this book), we began to see that the umbrella approach was, at best, an example of overengineering.

Looking back, the umbrella approach introduced significant complexity, a factor that became increasingly difficult to justify as our understanding of the project evolved.

For example, each umbrella app has its own configuration, which resulted in multiple databases and duplicated settings for things like the Binance client, making our deployment unnecessarily expensive and complex.

In this chapter, we will simplify the whole project by ditching the umbrella and migrating to Phoenix, which will be instrumental in our next steps, including deployment and clustering.

23.3 Creating a new Phoenix app

Instead of improving the existing structure, we will start from scratch. We will create a new Phoenix project side by side with the current umbrella app, and then we will reintegrate each nested app one after another.

First, let’s make sure that we have the latest version of the Phoenix application generator installed:

$ mix archive.install hex phx_new

Note: At the time of writing this chapter, the current version of the Phoenix framework is 1.7.12.

With this taken care of, we can progress with the creation of a new Phoenix application and a database:

$ mix phx.new hedgehog
...
$ cd hedgehog
$ mix ecto.create

With the Phoenix app skeleton in place, we can move on to reintegrating the umbrella apps’ contents into it.

23.4 Reintegrating the core app

We will start with the core app, as it is used/referenced by other apps in the umbrella.

It contains just a single module called Core.Struct.TradeEvent (inside the apps/core/lib/core/struct/trade_event.ex file), which we will rename to Hedgehog.Exchange.TradeEvent inside the newly created Phoenix app. To do so, we need to create a new exchange directory inside /lib/hedgehog and paste the updated trade_event.ex file into it.

Besides the TradeEvent module, the core application supervises the PubSub supervision tree. After a quick check inside the /lib/hedgehog/application.ex file, we can confirm that Phoenix already uses PubSub:

  # /lib/hedgehog/application.ex
    children = [
      ...
      {Phoenix.PubSub, name: Hedgehog.PubSub},
    ...

Furthermore, the phoenix_pubsub dependency is already included in the mix.lock file, and its default backend is now Phoenix.PubSub.PG2 (the adapter we have been explicitly specifying before).

The above change finishes merging the core app into our new Phoenix app. We should be able to run our app:

$ iex -S mix phx.server
...
[info] Running HedgehogWeb.Endpoint with Bandit 1.4.2 at 127.0.0.1:4000 (http)
...
iex(1)> alias Hedgehog.Exchange.TradeEvent
...
iex(2)> %TradeEvent{}
%Hedgehog.Exchange.TradeEvent{
...
}

23.5 Reintegrating the binance_mock app

The binance_mock app is used by both the naive and streamer apps, so we will need to focus on it next.

First, we will find a new home for the cached exchange info file, which was previously located in /apps/binance_mock/test/assets/exchange_info.json. We will create a new directory called /priv/cache where we will paste the exchange_info.json file.

Next, we will carry over the configuration flag that decides whether to use the cached exchange info file mentioned above. Inside the new application, it will look as follows:

  # /config/config.exs
config :hedgehog,
  ...
  exchanges: [ # <= added
    binance_mock: [ 
      use_cached_exchange_info: true
    ]
  ]
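
A nested keyword list like this one can be read back by passing a path of keys. The following is a minimal sketch, assuming a throwaway application name (`:hedgehog_demo`) and a demo module, neither of which is part of the project; it sets the value by hand so that the snippet runs on its own outside of Mix:

```elixir
# Stand-in for the config/config.exs entry; `:hedgehog_demo` is a hypothetical app name.
Application.put_env(:hedgehog_demo, :exchanges, [
  binance_mock: [use_cached_exchange_info: true]
])

defmodule ConfigDemo do
  # The path is resolved once, while the module is compiled; a missing key raises.
  @use_cached_exchange_info Application.compile_env!(:hedgehog_demo, [
                              :exchanges,
                              :binance_mock,
                              :use_cached_exchange_info
                            ])

  def use_cached_exchange_info?, do: @use_cached_exchange_info
end

IO.inspect(ConfigDemo.use_cached_exchange_info?())
```

Because the value is captured at compile time, changing the flag in the configuration requires recompiling the module that reads it.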

The main file of interest is apps/binance_mock/lib/binance_mock.ex, which we will move to the /lib/hedgehog/exchange directory.

Inside the module, we need to apply the following changes:

  • change the module name to Hedgehog.Exchange.BinanceMock
  • update the alias to the Core.Struct.TradeEvent struct to be Hedgehog.Exchange.TradeEvent
  • update references to the Core.PubSub with Hedgehog.PubSub
  • extract the boolean config flag to a module attribute and use it inside the get_exchange_info/0 function:
  # /lib/hedgehog/exchange/binance_mock.ex
  @use_cached_exchange_info Application.compile_env!(:hedgehog, [
                              :exchanges,
                              :binance_mock,
                              :use_cached_exchange_info
                            ])
  ...
  def get_exchange_info() do
    case @use_cached_exchange_info do
    ...
  • update the get_cached_exchange_info/0 function to point to the new location of the exchange_info.json file:
  # /lib/hedgehog/exchange/binance_mock.ex    
  defp get_cached_exchange_info do
    {:ok, data} =
      File.cwd!()
      |> Path.split()
      |> Kernel.++([
        "priv",
        "cache",
        "exchange_info.json"
      ])
      |> Path.join()
      |> File.read()
    ...
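
The path-building pipeline above can be tried in isolation. The sketch below only builds the path and does not read the file; the `direct` variant is an alternative spelling added for comparison and is not the chapter's code:

```elixir
# The pipeline from the listing: split the working directory into segments,
# append the new ones, and join everything back together.
from_pipeline =
  File.cwd!()
  |> Path.split()
  |> Kernel.++(["priv", "cache", "exchange_info.json"])
  |> Path.join()

# Path.join/1 accepts a list directly, so the split step can be skipped.
direct = Path.join([File.cwd!(), "priv", "cache", "exchange_info.json"])

IO.puts(direct)
IO.inspect(from_pipeline == direct)
```

Both variants depend on the current working directory being the project root, which holds when the app is started with mix or iex from that directory. If the app were packaged as a release, something like Application.app_dir(:hedgehog, "priv/cache/exchange_info.json") would likely be a safer way to locate the file, although this chapter does not go that far.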

That finishes our changes to the Hedgehog.Exchange.BinanceMock module, which we now need to add to the supervision tree of our application:

  # /lib/hedgehog/application.ex
  def start(_type, _args) do
    children = [
      ...
      Hedgehog.Exchange.BinanceMock
    ...
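
Listing a bare module name among the children works because `use GenServer` defines a default child_spec/1 for it. A minimal sketch of that mechanism, using a hypothetical `Demo.ExchangeMock` module in place of Hedgehog.Exchange.BinanceMock:

```elixir
defmodule Demo.ExchangeMock do
  # Hypothetical stand-in for Hedgehog.Exchange.BinanceMock.
  use GenServer

  def start_link(_args) do
    # Registering under the module name makes the process easy to find later.
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @impl true
  def init(state), do: {:ok, state}
end

# A bare module name in the children list is shorthand for {Demo.ExchangeMock, []}.
{:ok, _supervisor} = Supervisor.start_link([Demo.ExchangeMock], strategy: :one_for_one)

pid = Process.whereis(Demo.ExchangeMock)
IO.inspect(Process.alive?(pid))
```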

The Hedgehog.Exchange.BinanceMock module depends on a few packages that we need to add to the mix.exs dependencies:

  # /mix.exs
  defp deps do
    [
      ...
      {:binance, "~> 1.0"},
      {:decimal, "~> 2.0"},
      ...

Please remember to run mix deps.get before trying out our changes:

$ iex -S mix phx.server
...
iex(1)> alias Hedgehog.Exchange.BinanceMock
iex(2)> Process.whereis(BinanceMock)
#PID<...>
iex(3)> |> Process.alive?()
true
iex(4)> BinanceMock.get_exchange_info()
%{
  ...
}

The above confirms that we now have a BinanceMock process running in the background and we are able to fetch exchange info.

23.6 Reintegrating the streamer app

In the case of the streamer app, there are multiple files to move, so we will first create a new /lib/hedgehog/streamer directory (inside the new Phoenix app) and then copy both the /apps/streamer/lib/streamer directory and the /apps/streamer/lib/streamer.ex file into it.

As we will be updating/renaming all of those files, we will use this opportunity to place all of them inside the Binance namespace.

We now need to modify each file starting with renaming /lib/hedgehog/streamer/streamer.ex to /lib/hedgehog/streamer/binance.ex and updating the module name and alias:

# /lib/hedgehog/streamer/binance.ex
defmodule Hedgehog.Streamer.Binance do
  ...
  alias Hedgehog.Streamer.Binance.DynamicStreamerSupervisor

The next step will be to rename the /lib/hedgehog/streamer/streamer directory to /lib/hedgehog/streamer/binance. We will now move on to the files inside this directory.

23.6.1 Supervisor

Starting with the supervisor.ex file, we need to update the module’s name and alias:

# /lib/hedgehog/streamer/binance/supervisor.ex
defmodule Hedgehog.Streamer.Binance.Supervisor do
   ...
   alias Hedgehog.Streamer.Binance.DynamicStreamerSupervisor

Moving forward, we can remove the repo.ex file as we will use the Hedgehog.Repo module to work with the database.

23.6.2 Worker

Next, we will rename /lib/hedgehog/streamer/binance/binance.ex to /lib/hedgehog/streamer/binance/worker.ex, mainly to avoid the “binance/binance” name after we added the namespace. Following the filename change, we need to update the module’s name and a couple of references to the Core module:

# /lib/hedgehog/streamer/binance/worker.ex
defmodule Hedgehog.Streamer.Binance.Worker do
...
  defp process_event(%{"e" => "trade"} = event) do
    trade_event = %Hedgehog.Exchange.TradeEvent{
...
    Phoenix.PubSub.broadcast(
      Hedgehog.PubSub,
...
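
To make the shape of process_event/1 easier to picture, here is a simplified sketch. `Demo.TradeEvent` and `Demo.Worker` are hypothetical stand-ins with a reduced set of fields, the sample values are made up, and the function returns the struct instead of broadcasting it. The single-letter keys follow the Binance trade stream payload:

```elixir
defmodule Demo.TradeEvent do
  # Simplified stand-in for Hedgehog.Exchange.TradeEvent with fewer fields.
  defstruct [:event_time, :symbol, :trade_id, :price, :quantity]
end

defmodule Demo.Worker do
  # Only messages whose "e" key equals "trade" match this clause.
  def process_event(%{"e" => "trade"} = event) do
    %Demo.TradeEvent{
      event_time: event["E"],
      symbol: event["s"],
      trade_id: event["t"],
      price: event["p"],
      quantity: event["q"]
    }
  end
end

# Made-up sample message; prices and quantities stay strings.
event = %{
  "e" => "trade",
  "E" => 1_715_703_392_316,
  "s" => "XRPUSDT",
  "t" => 1,
  "p" => "0.55080000",
  "q" => "100.00000000"
}

trade_event = Demo.Worker.process_event(event)
IO.inspect(trade_event)
```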

23.6.3 DynamicStreamerSupervisor

Next, we will update the dynamic_streamer_supervisor.ex, where we will update the module’s name and all the aliases:

# /lib/hedgehog/streamer/binance/dynamic_streamer_supervisor.ex
defmodule Hedgehog.Streamer.Binance.DynamicStreamerSupervisor do
...
  alias Hedgehog.Repo
  alias Hedgehog.Streamer.Binance.Worker
  alias Hedgehog.Streamer.Settings
...
  defp start_child(args) do
    DynamicSupervisor.start_child(
      __MODULE__,
      {Worker, args}
...

23.6.4 schema/settings.ex and schema/streaming_status_enum.ex

We will move the /lib/hedgehog/streamer/binance/schema/settings.ex file to /lib/hedgehog/streamer/settings.ex and the /lib/hedgehog/streamer/binance/schema/streaming_status_enum.ex file to /lib/hedgehog/streamer/settings_status_enum.ex (note the new file name). We can now remove the empty /lib/hedgehog/streamer/binance/schema directory. Both modules need their names and references updated:

# /lib/hedgehog/streamer/settings.ex
defmodule Hedgehog.Streamer.Settings do
  ...
  alias Hedgehog.Streamer.SettingsStatusEnum
  ...
  schema "streamer_settings" do
    ...
    field(:status, SettingsStatusEnum)
  ...

and:

# /lib/hedgehog/streamer/settings_status_enum.ex
...
defenum(Hedgehog.Streamer.SettingsStatusEnum, :status, [:on, :off])

23.6.5 application

The final file that we need to look into is /lib/hedgehog/streamer/binance/application.ex, where we will check the children that were supervised by the streamer app:

# /lib/hedgehog/streamer/binance/application.ex
    children = [
      {Streamer.Repo, []},
      {Streamer.Supervisor, []}
    ]

We don’t need to worry about the Streamer.Repo as we will use the Hedgehog.Repo. On the other hand, the Streamer.Supervisor which we renamed to Hedgehog.Streamer.Binance.Supervisor needs to be added to the main Hedgehog.Application module:

# /lib/hedgehog/application.ex
    children = [
      ...
      Hedgehog.Streamer.Binance.Supervisor
    ]

We can now remove /lib/hedgehog/streamer/binance/application.ex as it is no longer required.

23.6.6 DB migrations and seeding

Streaming depends on the database tables and settings (seed data). We will start by copying the apps/streamer/priv/repo/migrations/20210203184805_create_settings.exs migration to the /priv/repo/migrations directory, renaming the file to 20210203184805_create_streamer_settings.exs, and updating most of the code inside:

# /priv/repo/migrations/20210203184805_create_streamer_settings.exs
defmodule Hedgehog.Repo.Migrations.CreateStreamerSettings do
  ...
  alias Hedgehog.Streamer.SettingsStatusEnum

  def change do
    SettingsStatusEnum.create_type()

    create table(:streamer_settings, primary_key: false) do
      ...
      add(:status, SettingsStatusEnum.type(), default: "off", null: false)
    ...
    create(unique_index(:streamer_settings, [:symbol]))
  ...

Next, we will copy the seeding script’s code from /apps/streamer/priv/seed_settings.exs to /priv/repo/seeds.exs and make the following updates:

# /priv/repo/seeds.exs
alias Hedgehog.Repo
alias Hedgehog.Streamer.Settings

binance_client = Application.compile_env(:hedgehog, :binance_client)

23.6.7 Config

Inside the script above, we were reading the application’s configuration expecting the binance_client to be there - let’s append the required settings to the configuration:

# /config/config.exs
config :hedgehog,
  binance_client: Hedgehog.Exchange.BinanceMock, # <= added
  ecto_repos: [Hedgehog.Repo],
  ...
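
Storing the client module under a configuration key means that callers only depend on the key, so the mock can later be swapped for another module without touching them. A minimal sketch of that indirection, assuming a hypothetical `Demo.BinanceMock` module, a throwaway `:hedgehog_demo` app name, and a return shape invented for the example; unlike the chapter's code, it reads the value at runtime with Application.fetch_env!/2:

```elixir
defmodule Demo.BinanceMock do
  # Hypothetical stand-in; the return shape is invented for this sketch.
  def get_exchange_info do
    %{symbols: [%{"symbol" => "XRPUSDT"}]}
  end
end

# Stand-in for the `binance_client:` entry in config/config.exs.
Application.put_env(:hedgehog_demo, :binance_client, Demo.BinanceMock)

# The caller only knows the configuration key, not the concrete module.
binance_client = Application.fetch_env!(:hedgehog_demo, :binance_client)
%{symbols: symbols} = binance_client.get_exchange_info()

IO.inspect(Enum.map(symbols, & &1["symbol"]))
```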

23.6.8 Deps

We need to add a couple of new dependencies that the streaming code is using:

# /mix.exs
  defp deps do
    [
      ...
      {:ecto_enum, "~> 1.4"},
      {:websockex, "~> 0.4.2"}

With the above changes, the reintegration of the streamer app is finished. We need to remember to fetch the new deps, run the migrations, and seed the database (the generated mix setup alias covers the last two) before testing that everything works up to this point:

$ mix deps.get
...
$ mix setup
...
$ iex -S mix phx.server
...
iex(1)> Hedgehog.Streamer.Binance.start_streaming("XRPUSDT")
...
[info] Binance streamer is connecting to websocket stream for XRPUSDT trade events
{:ok, #PID<0.801.0>}
[debug] Trade event received XRPUSDT@0.55080000
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
       (l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
^C
$ iex -S mix phx.server
...
[info] Binance streamer is connecting to websocket stream for XRPUSDT trade events
[debug] Trade event received XRPUSDT@0.55240000
iex(1)> Hedgehog.Streamer.Binance.stop_streaming("XRPUSDT")
[info] Stopping streaming XRPUSDT trade events
...

The above confirms that we can start and stop streaming, and upon startup, streaming starts on its own when enabled in the database.

23.7 Reintegrating the naive app

We will kick off the naive application reintegration by creating a new directory called strategy inside /lib/hedgehog/, where we will place both the naive.ex file and the naive directory (both originally located inside /apps/naive/lib/).

23.7.1 naive.ex

We can update the module name and aliases inside the naive.ex file:

# /lib/hedgehog/strategy/naive.ex
defmodule Hedgehog.Strategy.Naive do
  ...
  alias Hedgehog.Strategy.Naive.DynamicTraderSupervisor
  alias Hedgehog.Strategy.Naive.Trader

23.7.2 strategy.ex -> formula.ex

Moving on to the naive directory, first, we will rename the strategy.ex file to formula.ex and update the references inside:

# /lib/hedgehog/strategy/naive/formula.ex
defmodule Hedgehog.Strategy.Naive.Formula do
  alias Hedgehog.Exchange.TradeEvent
  alias Hedgehog.Repo
  alias Hedgehog.Strategy.Naive.Settings
  ...

  @binance_client Application.compile_env(:hedgehog, :binance_client)
  ...
  defp broadcast_order(%Binance.Order{} = order) do
    Phoenix.PubSub.broadcast(
      Hedgehog.PubSub,
    ...

23.7.3 trader.ex

Next, we will update the trader.ex:

# /lib/hedgehog/strategy/naive/trader.ex
defmodule Hedgehog.Strategy.Naive.Trader do
  ...
  alias Hedgehog.Exchange.TradeEvent
  alias Hedgehog.Strategy.Naive.Formula

as well as update all of the following references:

  • Naive.Strategy or Strategy to Formula
  • Core.PubSub to Hedgehog.PubSub

23.7.4 supervisor.ex

Similarly, for the supervisor.ex file, we need to update the module name and alias:

# /lib/hedgehog/strategy/naive/supervisor.ex
defmodule Hedgehog.Strategy.Naive.Supervisor do
  ...
  alias Hedgehog.Strategy.Naive.DynamicTraderSupervisor

23.7.5 dynamic_trader_supervisor.ex

For the dynamic_trader_supervisor.ex file, we need to update the module name, aliases and all the references to the Strategy module:

# /lib/hedgehog/strategy/naive/dynamic_trader_supervisor.ex
defmodule Hedgehog.Strategy.Naive.DynamicTraderSupervisor do
  ...
  alias Hedgehog.Repo
  alias Hedgehog.Strategy.Naive.Settings
  alias Hedgehog.Strategy.Naive.Formula
  alias Hedgehog.Strategy.Naive.Trader
  ...
  # three occurrences of `Strategy.update_status/2` call
  Strategy.update_status(...) -> Formula.update_status(...)

23.7.6 Repo and schema files

As we will use Hedgehog.Repo, we can remove the /lib/hedgehog/strategy/naive/repo.ex file.

We will move both schema/settings.ex and schema/trading_status_enum.ex out of the schema directory into the naive strategy’s main directory (/lib/hedgehog/strategy/naive). We can now remove the empty schema directory.

In line with the other Enum modules’ changes, we will rename the file from /lib/hedgehog/strategy/naive/trading_status_enum.ex to /lib/hedgehog/strategy/naive/settings_status_enum.ex.

Inside the file, we will update the module name inside the defenum:

# /lib/hedgehog/strategy/naive/settings_status_enum.ex
defenum(
  Hedgehog.Strategy.Naive.SettingsStatusEnum,
  :naive_trading_status,
  [:on, :off, :shutdown]
)

Next is the settings.ex file, where we need to update the module name, the alias, the table name, and the status enum module:

# /lib/hedgehog/strategy/naive/settings.ex
defmodule Hedgehog.Strategy.Naive.Settings do
  ...
  alias Hedgehog.Strategy.Naive.SettingsStatusEnum
  ...
  schema "naive_strategy_settings" do
    ...
    field(:status, SettingsStatusEnum)

23.7.7 application.ex

The final file that we copied across from the naive umbrella app is application.ex, where we will look into the supervised children list. It was supervising the Naive.Repo, which we don’t need any more and Naive.Supervisor, which we renamed and need to put into our new main application.ex file:

# /lib/hedgehog/application.ex
  def start(_type, _args) do
    children = [
      ...
      Hedgehog.Strategy.Naive.Supervisor

After the above addition, we can remove the /lib/hedgehog/strategy/naive/application.ex file.

23.7.8 Migration and seeding

First, we will move two migration files from /apps/naive/priv/repo/migrations/ to /priv/repo/migrations. We will rename 20210202223209_create_settings.exs to 20210202223209_create_naive_strategy_settings.exs and update the module name and all references to modules and the table name:

# /priv/repo/migrations/20210202223209_create_naive_strategy_settings.exs
defmodule Hedgehog.Repo.Migrations.CreateNaiveStrategySettings do
  ...
  alias Hedgehog.Strategy.Naive.SettingsStatusEnum

  def change do
    SettingsStatusEnum.create_type()

    create table(:naive_strategy_settings, primary_key: false) do
      ...
      add(:status, SettingsStatusEnum.type(), default: "off", null: false)
    ...

    create(unique_index(:naive_strategy_settings, [:symbol]))

Next, we will rename 20210205232303_update_trading_status.exs to 20210205232303_update_naive_strategy_settings_status.exs and update the module name and all references to modules:

# /priv/repo/migrations/20210205232303_update_naive_strategy_settings_status.exs
defmodule Hedgehog.Repo.Migrations.UpdateNaiveStrategySettingsStatus do
  ...
  def change do
    Ecto.Migration.execute(
      "ALTER TYPE naive_trading_status ADD VALUE IF NOT EXISTS 'shutdown'"
    )

Finally, we need to merge the code that seeds the naive strategy’s settings into the /priv/repo/seeds.exs file:

# /priv/repo/seeds.exs
alias Hedgehog.Streamer.Settings, as: StreamerSettings # <= updated
alias Hedgehog.Strategy.Naive.Settings, as: NaiveStrategySettings # <= added
...
Logger.info("Inserting default streamer settings for symbols")
...
# updated, `on_conflict` added
{count, nil} = Repo.insert_all(StreamerSettings, maps, on_conflict: :nothing)

Logger.info("Inserted streamer settings for #{count} symbols")

# below added at the end
%{
  chunks: chunks,
  budget: budget,
  buy_down_interval: buy_down_interval,
  profit_interval: profit_interval,
  rebuy_interval: rebuy_interval
} = Application.compile_env(:hedgehog, [:strategy, :naive, :defaults])

base_settings = %{
  symbol: "",
  chunks: chunks,
  budget: Decimal.new(budget),
  buy_down_interval: Decimal.new(buy_down_interval),
  profit_interval: Decimal.new(profit_interval),
  rebuy_interval: Decimal.new(rebuy_interval),
  status: "off",
  inserted_at: timestamp,
  updated_at: timestamp
}

Logger.info("Inserting default naive strategy settings for symbols")

maps = symbols
  |> Enum.map(&(%{base_settings | symbol: &1["symbol"]}))

{count, nil} = Repo.insert_all(NaiveStrategySettings, maps, on_conflict: :nothing)

Logger.info("Inserted naive strategy settings for #{count} symbols")
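
The map-building step of the seeding script can be tried on its own. The sketch below uses hypothetical sample symbols and a shortened base map, and skips Decimal and the database entirely:

```elixir
# Hypothetical sample data standing in for the exchange info symbols.
symbols = [%{"symbol" => "XRPUSDT"}, %{"symbol" => "ETHUSDT"}]
timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)

base_settings = %{
  symbol: "",
  chunks: 5,
  status: "off",
  inserted_at: timestamp,
  updated_at: timestamp
}

# %{map | key: value} only updates keys that already exist, which is why
# base_settings declares an empty :symbol placeholder.
maps = Enum.map(symbols, &%{base_settings | symbol: &1["symbol"]})

IO.inspect(Enum.map(maps, & &1.symbol))
```

In the real script, passing on_conflict: :nothing to Repo.insert_all/3 makes the database skip rows that would violate the unique index on symbol, so the seeds can be run repeatedly without raising on settings that already exist.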

23.7.9 Configuration

The seeding script above requires additional configuration that we will add now to the /config/config.exs file:

# /config/config.exs
config :hedgehog,
  ...
  strategy: [
    naive: [
      defaults: %{
        chunks: 5,
        budget: 1000,
        buy_down_interval: "0.0001",
        profit_interval: "-0.0012",
        rebuy_interval: "0.001"
      }
    ]
  ]

23.7.10 Tests

The final part of the naive application that we need to copy across is its tests, located in /apps/naive/test. We will create the /test/hedgehog/strategy directory and paste the /apps/naive/test/naive_test.exs file and the /apps/naive/test/naive directory into it.

First, we will update the naive_test.exs by updating its module name and aliases:

# /test/hedgehog/strategy/naive_test.exs
defmodule Hedgehog.Strategy.NaiveTest do
  ...
  doctest Hedgehog.Strategy.Naive

  alias Hedgehog.Exchange.Order # <= to be migrated...
  alias Hedgehog.Strategy.Naive.Settings, as: TradingSettings
  alias Hedgehog.Exchange.TradeEvent

Next, we will rename the /test/hedgehog/strategy/naive/strategy_test.exs to

/test/hedgehog/strategy/naive/formula_test.exs and update its module name and aliases:

# /test/hedgehog/strategy/naive/formula_test.exs
defmodule Hedgehog.Strategy.Naive.FormulaTest do
  ...
  alias Hedgehog.Exchange.BinanceMock    # <= added
  alias Hedgehog.Exchange.TradeEvent
  alias Hedgehog.Strategy.Naive.Formula
  ...
  # update all references to `Strategy` with `Formula`

Finally, we will overwrite the test_helper.exs based on the one from the naive app:

# /test/test_helper.exs
ExUnit.start(capture_log: true)

Application.ensure_all_started(:mimic)

Mimic.copy(Hedgehog.Exchange.BinanceMock)
Mimic.copy(Phoenix.PubSub)

23.7.11 Dependencies

The final part of the integration will be to move dependencies to the mix.exs file:

# /mix.exs
  defp deps do
    [
       ...
       {:mimic, "~> 1.7", only: [:test, :integration]}

The final test will be to fetch the required deps and run the unit tests:

$ mix deps.get
...
$ mix setup
...
$ mix test --only unit --no-start
...
......
Finished in 0.1 seconds (0.1s async, 0.00s sync)
17 tests, 0 failures, 6 excluded

We will see a lot of warnings as we haven’t yet migrated the DataWarehouse application. Either way, we received the confirmation that unit tests ran successfully. We are also able to run the application to see that it can go through the trading cycle:

$ iex -S mix phx.server
...
iex(1)> alias Hedgehog.Strategy.Naive
...
iex(2)> alias Hedgehog.Streamer.Binance
...
iex(3)> Binance.start_streaming("XRPUSDT")
...
iex(4)> Naive.start_trading("XRPUSDT")
...
[info] Position (XRPUSDT/1715703392316): Placing a BUY order @ 0.50650000,
quantity: 394.86000000
[info] Position (XRPUSDT/1715704251578): The BUY order is now partially filled
[info] Position (XRPUSDT/1715704251578): The BUY order is now filled. Placing a
SELL order @ 0.50690000, quantity: 394.86000000
[info] Position (XRPUSDT/1715704251578): The SELL order is now partially filled
[info] Position (XRPUSDT/1715704251578): Trade cycle finished
[info] Position (XRPUSDT/1715704342038): Placing a BUY order @ 0.50680000,
quantity: 394.63000000

The above confirms that our trading strategy works - we have successfully integrated the naive app. Things will be easier from now on.

23.8 Reintegrating the data_warehouse app

Inside the new Phoenix application, we will reintegrate the app previously named data_warehouse into the data/collector and data/publisher “namespaces”.

We will start by creating a new directory called data inside the /lib/hedgehog directory. Inside the data directory, we will create a collector directory. We will rename all the subscriber_* modules to collector_*.

We will kick off the transition by copying files from the /apps/data_warehouse/lib/data_warehouse/ directory to /lib/hedgehog/ and renaming them as follows:

  • subscriber_supervisor.ex -> data/collector/collector_supervisor.ex
  • subscriber/dynamic_supervisor.ex -> data/collector/dynamic_worker_supervisor.ex
  • subscriber/worker.ex -> data/collector/worker.ex
  • publisher.ex -> data/publisher.ex
  • schema/order.ex -> exchange/order.ex
  • schema/trade_event.ex -> exchange/trade_event.ex (overwrite)
  • schema/subscriber_settings.ex -> data/collector/settings.ex
  • schema/subscriber_status_enum.ex -> data/collector/settings_status_enum.ex
  • ../data_warehouse.ex -> data/collector.ex

Now, we will update the new (copied/renamed) files one by one.

23.8.1 exchange/order.ex

We will start with exchange/order.ex, where we will update the module name:

# /lib/hedgehog/exchange/order.ex
defmodule Hedgehog.Exchange.Order do

23.8.2 exchange/trade_event.ex

This file was overwritten with the contents from the data_warehouse app, so we just need to update the module name:

# /lib/hedgehog/exchange/trade_event.ex
defmodule Hedgehog.Exchange.TradeEvent do

23.8.3 data/publisher.ex

This module uses many other modules, including Repo, and we need to update all of these references:

# /lib/hedgehog/data/publisher.ex
defmodule Hedgehog.Data.Publisher do
...
  alias Hedgehog.Repo                 # <= added
  alias Hedgehog.Exchange.TradeEvent  # <= added
  ...
  def start(arg) do # <= renamed the `start_link/1` function 
    ...
  end
  ...
  def run(%{
    ...
 }) do
    ...
    Repo.transaction(
      fn ->
        from(te in TradeEvent,
          ...
        )
        |> Repo.stream()
    ...
  end

  defp publish_trade_event(%TradeEvent{} = trade_event) do
    new_trade_event =
      struct(
        TradeEvent,
        ...
      )

    Phoenix.PubSub.broadcast(
      Hedgehog.PubSub,
      ...
    )
...

23.8.4 data/collector.ex

This module was an interface for both collectors and publishers. From now on, it will only cater for collectors:

# /lib/hedgehog/data/collector.ex
defmodule Hedgehog.Data.Collector do
  @moduledoc """
  Documentation for `Hedgehog.Data.Collector`.
  """
  alias Hedgehog.Data.Collector.DynamicWorkerSupervisor

  def start_storing(stream, symbol) do
    ...
    |> DynamicWorkerSupervisor.start_worker()
  end

  def stop_storing(stream, symbol) do
    ...
    |> DynamicWorkerSupervisor.stop_worker()
  end

  # remove `publish_data/1`

23.8.5 data/collector/collector_supervisor.ex

In the case of the main collector supervisor, we need to update the aliases:

# /lib/hedgehog/data/collector/collector_supervisor.ex
defmodule Hedgehog.Data.Collector.CollectorSupervisor do
...
  alias Hedgehog.Data.Collector.DynamicWorkerSupervisor
...
  @registry :collector_workers
...
  def init(_args) do
    children = [
      ...
      {DynamicWorkerSupervisor, []},
      {Task,
        fn ->
          DynamicWorkerSupervisor.autostart_workers()
        end}
     ]
    ...
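
The order of the children matters here: a supervisor starts them one by one, so the dynamic supervisor is already running by the time the one-off task calls autostart_workers/0. A minimal sketch of that ordering, with a hypothetical supervisor name and a message sent back to the calling process in place of the real autostart logic; the :one_for_one strategy is an assumption of the sketch:

```elixir
parent = self()

children = [
  # Started first, so it is already running when the task below executes.
  {DynamicSupervisor, strategy: :one_for_one, name: Demo.CollectorDynamicSupervisor},
  # One-off task standing in for the autostart_workers/0 call.
  {Task,
   fn ->
     running? = is_pid(Process.whereis(Demo.CollectorDynamicSupervisor))
     send(parent, {:autostart_ran, running?})
   end}
]

# The supervisor strategy is an assumption of this sketch.
{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)

result =
  receive do
    {:autostart_ran, running?} -> running?
  after
    1_000 -> :timeout
  end

IO.inspect(result)
```

A Task child is temporary by default, so it is not restarted once the function returns.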

23.8.6 /data/collector/dynamic_worker_supervisor.ex

In the case of the dynamic worker supervisor that we renamed, we need to update the module name:

# /lib/hedgehog/data/collector/dynamic_worker_supervisor.ex
defmodule Hedgehog.Data.Collector.DynamicWorkerSupervisor do
  ...
  alias Hedgehog.Repo
  alias Hedgehog.Data.Collector.Settings
  alias Hedgehog.Data.Collector.Worker
  ...
  @registry :collector_workers
  ...
  def autostart_workers do
    Repo.all(
      from(s in Settings,
   ...
  defp update_status(topic, status)
       when is_binary(topic) and is_binary(status) do
    %Settings{
    ...

23.8.7 /data/collector/settings_status_enum.ex

As in the case of the other settings’ status enums, we need to update the module name and the enum type name:

# /lib/hedgehog/data/collector/settings_status_enum.ex
import EctoEnum

defenum(Hedgehog.Data.Collector.SettingsStatusEnum, :collector_status, [:on, :off])

23.8.8 /data/collector/settings.ex

For the settings schema, we need to update module name, aliases, and table name:

# /lib/hedgehog/data/collector/settings.ex
defmodule Hedgehog.Data.Collector.Settings do
  ...
  alias Hedgehog.Data.Collector.SettingsStatusEnum
  ...
  schema "collector_settings" do
    ...
    field(:status, SettingsStatusEnum)

23.8.9 /data/collector/worker.ex

The final module from the data_warehouse app is a collector worker, where we will update the module name and a few references:

# /lib/hedgehog/data/collector/worker.ex
defmodule Hedgehog.Data.Collector.Worker do
  ...
  alias Hedgehog.Exchange.Order      # <= added
  alias Hedgehog.Exchange.TradeEvent # <= added
  alias Hedgehog.Repo                # <= added
  ...
  def init(topic) do
    Logger.info("Collector worker is subscribing to #{topic}")

    Phoenix.PubSub.subscribe(
      Hedgehog.PubSub,
      ...
     )
  ...
  def handle_info(%TradeEvent{} = trade_event, state) do
    ...

    struct!(TradeEvent, opts)
    |> Repo.insert()
  ...
  def handle_info(%Binance.Order{} = order, state) do
    ...

    struct(Order, data)
    |> Map.merge(%{
      ...
    })
    |> Repo.insert(
    ...
  ...
  defp via_tuple(topic) do
    {:via, Registry, {:collector_workers, topic}}
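
The via tuple lets a worker be addressed by its topic instead of by a pid. A self-contained sketch of the idea, assuming a hypothetical registry name, worker module, and sample topic string:

```elixir
# Hypothetical registry; in the chapter it is named :collector_workers.
{:ok, _registry} = Registry.start_link(keys: :unique, name: :demo_collector_workers)

defmodule Demo.CollectorWorker do
  use GenServer

  def start_link(topic) do
    GenServer.start_link(__MODULE__, topic, name: via_tuple(topic))
  end

  # Callers address the worker by topic; the registry resolves the pid.
  def topic(topic), do: GenServer.call(via_tuple(topic), :topic)

  @impl true
  def init(topic), do: {:ok, topic}

  @impl true
  def handle_call(:topic, _from, topic), do: {:reply, topic, topic}

  defp via_tuple(topic) do
    {:via, Registry, {:demo_collector_workers, topic}}
  end
end

# Made-up sample topic.
{:ok, pid} = Demo.CollectorWorker.start_link("TRADE_EVENTS:XRPUSDT")
[{^pid, _value}] = Registry.lookup(:demo_collector_workers, "TRADE_EVENTS:XRPUSDT")

IO.inspect(Demo.CollectorWorker.topic("TRADE_EVENTS:XRPUSDT"))
```

Because the registry uses unique keys, a second worker for the same topic cannot be registered while the first one is alive.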

This finishes the module updates. We can move on to the other files from the data_warehouse application.

23.8.10 Supervision tree

When we were copying the modules from the data_warehouse app, we skipped over its application.ex module. Looking inside, we can see that it was supervising the DataWarehouse.SubscriberSupervisor (now renamed to Hedgehog.Data.Collector.CollectorSupervisor), which we need to add to the main supervision tree of our new application:

# /lib/hedgehog/application.ex
  def start(_type, _args) do
    children = [
      ...
      Hedgehog.Data.Collector.CollectorSupervisor

23.8.11 Migrations

We will copy three migration files across from the data_warehouse app (the /apps/data_warehouse/priv/repo/migrations/ directory) to the new Phoenix application (the priv/repo/migrations/ directory).

In the case of 20210227230123_create_subscriber_settings.exs, we need to rename it to 20210227230123_create_collector_settings.exs.

For all three of the migration files, we need to update module names:

# /priv/repo/migrations/20210222224514_create_trade_events.exs
defmodule Hedgehog.Repo.Migrations.CreateTradeEvents do
# /priv/repo/migrations/20210222224522_create_orders.exs
defmodule Hedgehog.Repo.Migrations.CreateOrders do

For the final file, we will also change the alias and the table name:

# /priv/repo/migrations/20210227230123_create_collector_settings.exs
defmodule Hedgehog.Repo.Migrations.CreateSubscriberSettings do
  ...
  alias Hedgehog.Data.Collector.SettingsStatusEnum

  def change do
    SettingsStatusEnum.create_type()

    create table(:collector_settings, primary_key: false) do
      ...
      add(:status, SettingsStatusEnum.type(), default: "off", null: false)
    ...
    create(unique_index(:collector_settings, [:topic]))

The above changes finish the integration of the data_warehouse application. We can go ahead and drop the database, set up the application again, and confirm that it still works:

$ mix ecto.drop
...
$ mix setup
...
$ mix test --only unit --no-start
...
..........
Finished in 0.1 seconds (0.1s async, 0.00s sync)
17 tests, 0 failures, 6 excluded
$ iex -S mix phx.server
...
iex(1)> alias Hedgehog.Strategy.Naive
...
iex(2)> alias Hedgehog.Streamer.Binance
...
iex(3)> alias Hedgehog.Data.Collector
...
iex(4)> Collector.start_storing("TRADE_EVENTS", "XRPUSDT")
...
iex(5)> Collector.start_storing("ORDERS", "XRPUSDT")
...
iex(6)> Binance.start_streaming("XRPUSDT")
...
iex(7)> Naive.start_trading("XRPUSDT")
...
[debug] QUERY OK source="trade_events" ...
INSERT INTO "trade_events"
...
[debug] QUERY OK source="orders" ...
INSERT INTO "orders"

The above log messages confirm that we are streaming trade events from Binance, placing orders, and storing both in the database.

This finishes the integration of the data_warehouse application.

23.9 Reintegrating the indicator app

The final application that we will integrate into our new Phoenix app is indicator, which luckily has only three files that we will move across.

We will use this opportunity to rename the indicators to aggregators.

Let’s start by creating a new directory called aggregator inside the /lib/hedgehog/data/ directory.

We will copy the /apps/indicator/lib/indicator/ohlc directory and the /apps/indicator/lib/indicator/ohlc.ex file into it.

We will also copy the /apps/indicator/lib/indicator.ex to the /lib/hedgehog/data directory and rename it to aggregator.ex.

Now, we can update each file to fit the new naming convention.

23.9.1 /data/aggregator.ex

Here, we need to update the module name and the reference to the worker module:

# /lib/hedgehog/data/aggregator.ex
defmodule Hedgehog.Data.Aggregator do
  @moduledoc """
  Documentation for `Hedgehog.Data.Aggregator`.
  """

  def aggregate_ohlcs(symbol) do
    DynamicSupervisor.start_child(
      Hedgehog.Data.Aggregator.DynamicWorkerSupervisor,
      {Hedgehog.Data.Aggregator.Ohlc.Worker, symbol}

23.9.2 /data/aggregator/ohlc.ex

Inside the ohlc module we need to update the reference to the trade event and pubsub:

# /lib/hedgehog/data/aggregator/ohlc.ex
defmodule Hedgehog.Data.Aggregator.Ohlc do
  ...
  alias Hedgehog.Exchange.TradeEvent
  ...
  defp maybe_broadcast(%__MODULE__{} = ohlc) do
    ...
    Phoenix.PubSub.broadcast(
      Hedgehog.PubSub,
    ...

23.9.3 /data/aggregator/ohlc/worker.ex

The final file to update will be the worker module, where we need to update the references to pubsub and the ohlc module:

# /lib/hedgehog/data/aggregator/ohlc/worker.ex
defmodule Hedgehog.Data.Aggregator.Ohlc.Worker do
  ...
  alias Hedgehog.Data.Aggregator.Ohlc # <= added
  alias Hedgehog.Exchange.TradeEvent
  ...
  def init(symbol) do
    ...
    Phoenix.PubSub.subscribe(
      Hedgehog.PubSub,
  ...
  def handle_info(%TradeEvent{} = trade_event, ohlc) do
    {:noreply, Ohlc.process(ohlc, trade_event)}
    ...

23.9.4 Supervision tree

A quick look at the application.ex module of the indicator app will tell us that we need to add a dynamic supervisor to the supervision tree of the new app (we already referred to it as Hedgehog.Data.Aggregator.DynamicWorkerSupervisor inside the Hedgehog.Data.Aggregator module as we were updating it):

# /lib/hedgehog/application.ex
  def start(_type, _args) do
    children = [
      ...
      {DynamicSupervisor,
       strategy: :one_for_one, name: Hedgehog.Data.Aggregator.DynamicWorkerSupervisor}
    ...
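
A named DynamicSupervisor starts with no children, and workers are attached to it at runtime, which is what aggregate_ohlcs/1 relies on. A minimal sketch with a hypothetical worker and supervisor name:

```elixir
defmodule Demo.OhlcWorker do
  # Hypothetical stand-in for the OHLC worker; it only remembers its symbol.
  use GenServer

  def start_link(symbol), do: GenServer.start_link(__MODULE__, symbol)

  @impl true
  def init(symbol), do: {:ok, symbol}
end

children = [
  # Same child spec shape as in the listing above, with a demo name.
  {DynamicSupervisor, strategy: :one_for_one, name: Demo.AggregatorSupervisor}
]

{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)

# Workers are attached at runtime, one per symbol.
{:ok, worker} =
  DynamicSupervisor.start_child(Demo.AggregatorSupervisor, {Demo.OhlcWorker, "XRPUSDT"})

IO.inspect(DynamicSupervisor.count_children(Demo.AggregatorSupervisor))
```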

The above change finishes integrating the indicator app and the changes in this chapter, as it was the last application to be merged in.

We can make the last check that everything works by starting the streaming and aggregating:

$ mix ecto.drop
...
$ mix setup
...
$ iex -S mix phx.server
...
iex(1)> alias Hedgehog.Streamer.Binance
...
iex(2)> alias Hedgehog.Data.Aggregator
...
iex(3)> Aggregator.aggregate_ohlcs("XRPUSDT")
...
iex(4)> Binance.start_streaming("XRPUSDT")
...
[debug] Broadcasting OHLC: %Hedgehog.Data.Aggregator.Ohlc{symbol: "XRPUSDT",
start_time: 1717173780, duration: 1, open: "0.51350000", high: "0.51380000",
low: "0.51350000", close: "0.51370000"}

The above log confirms that we have a fully working aggregation that gets broadcast and could be stored in the database or used inside the strategy.

It was a lot of repetitive copying, moving, and renaming, so thank you for sticking with me through this. With the migration to a Phoenix application behind us, we have a solid starting point for deployment, which we will focus on in the upcoming chapter!

[Note] Please remember to run mix format to keep things nice and tidy.

The source code for this chapter can be found on GitHub