Chapter 23 Back to the Monolith
23.1 Objectives
- reverting the course to the monolith
- creating a new Phoenix app
- reintegrating the core app
- reintegrating the binance_mock app
- reintegrating the streamer app
- reintegrating the naive app
- reintegrating the data_warehouse app
- reintegrating the indicator app
23.2 Reverting the course to the monolith
We started our journey with the idea that we would use an umbrella project to “future-proof” our project in case we ever migrated to a microservices architecture.
As we delved deeper into our project (which also included me gaining more knowledge and experience over three years of writing this book), we began to see that the umbrella approach was, at best, an example of overengineering.
Looking back, the umbrella approach introduced significant complexity, a factor that became increasingly difficult to justify as our understanding of the project evolved.
For example, each umbrella app has its own configuration, which resulted in multiple databases and duplicated settings for things like the Binance client, making our deployment unnecessarily expensive and complex.
In this chapter, we will simplify the whole project by ditching the umbrella and migrating to a single Phoenix application, which will be instrumental in our next steps, including deployment and clustering.
23.3 Creating a new Phoenix app
Instead of improving the existing structure, we will start from scratch. We will create a new Phoenix project next to the current umbrella app, and then we will reintegrate each nested app one after another.
First, let’s make sure that we have the latest version of the Phoenix application generator installed:
$ mix archive.install hex phx_new
Note: At the time of writing this chapter, the current version of the Phoenix framework is 1.7.12.
With this taken care of, we can progress with the creation of a new Phoenix application and a database:
$ mix phx.new hedgehog
...
$ cd hedgehog
$ mix ecto.create
With the Phoenix app skeleton in place, we can move on to reintegrating the umbrella apps’ contents into it.
23.4 Reintegrating the core app
We will start with the core app, as it is used/referenced by other apps in the umbrella.
It contains just a single module called Core.Struct.TradeEvent (inside the apps/core/lib/core/struct/trade_event.ex file), which we will rename to Hedgehog.Exchange.TradeEvent inside the newly created Phoenix app (we need to create a new exchange directory inside /lib/hedgehog and paste the updated trade_event.ex file there).
Besides the TradeEvent module, the core application supervises the PubSub supervision tree. After a quick check inside the /lib/hedgehog/application.ex file, we can confirm that Phoenix already starts PubSub as one of its supervised children, via the {Phoenix.PubSub, name: Hedgehog.PubSub} entry.
Furthermore, the phoenix_pubsub dependency is already included in the mix.lock file, and its default backend is now Phoenix.PubSub.PG2 (the adapter we have been explicitly specifying before).
The above change finishes merging the core app into our new Phoenix app. We should be able to run our app:
$ iex -S mix phx.server
...
[info] Running HedgehogWeb.Endpoint with Bandit 1.4.2 at 127.0.0.1:4000 (http)
...
iex(1)> alias Hedgehog.Exchange.TradeEvent
...
iex(2)> %TradeEvent{}
%Hedgehog.Exchange.TradeEvent{
...
}
23.5 Reintegrating the binance_mock app
The binance_mock app is used by both the naive and streamer apps, so we will focus on it next.
First, we will find a new home for the cached exchange info file, which was previously located at /apps/binance_mock/test/assets/exchange_info.json. We will create a new directory called /priv/cache, where we will paste the exchange_info.json file.
Next, we will take care of the configuration that decides whether we should use the cached exchange info file mentioned above. Inside the new application, it will look as follows:
# /config/config.exs
config :hedgehog,
...
exchanges: [ # <= added
binance_mock: [
use_cached_exchange_info: true
]
]
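As a small illustration of how a nested setting like this can be read back, the sketch below walks the keyword lists using only the standard library. It sets the value by hand with Application.put_env/3 as a stand-in for config.exs and performs the lookup at runtime, so it is an approximation of the idea rather than the code used in the project.

```elixir
# Stand-in for what /config/config.exs sets (normally loaded by Mix at startup).
Application.put_env(:hedgehog, :exchanges,
  binance_mock: [use_cached_exchange_info: true]
)

# Fetch the top-level key, then walk the nested keyword lists.
use_cached? =
  :hedgehog
  |> Application.fetch_env!(:exchanges)
  |> get_in([:binance_mock, :use_cached_exchange_info])

IO.inspect(use_cached?, label: "use_cached_exchange_info")
```

Application.fetch_env!/2 raises when the top-level key is missing, which surfaces a misconfiguration early instead of silently falling back to nil.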
Moving on to the main file of interest, apps/binance_mock/lib/binance_mock.ex, which we will move to the /lib/hedgehog/exchange directory.
Inside the module, we need to apply the following changes:
- change the module name to Hedgehog.Exchange.BinanceMock
- update the alias of the Core.Struct.TradeEvent struct to be Hedgehog.Exchange.TradeEvent
- replace references to Core.PubSub with Hedgehog.PubSub
- extract the boolean config flag to a module attribute and use it inside the get_exchange_info/0 function:
# /lib/hedgehog/exchange/binance_mock.ex
@use_cached_exchange_info Application.compile_env!(:hedgehog, [
:exchanges,
:binance_mock,
:use_cached_exchange_info
])
...
def get_exchange_info() do
case @use_cached_exchange_info do
...
- update the get_cached_exchange_info/0 function to point to the new location of the exchange_info.json file:
# /lib/hedgehog/exchange/binance_mock.ex
defp get_cached_exchange_info do
{:ok, data} =
File.cwd!()
|> Path.split()
|> Kernel.++([
"priv",
"cache",
"exchange_info.json"
])
|> Path.join()
|> File.read()
...
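The split/append/join pipeline above builds a path relative to the current working directory. The standalone sketch below checks that it produces the same result as passing a list of segments straight to Path.join/1, which may be easier to read. Note that both forms assume the app is started from the project root; in a release, something like Application.app_dir/2 might be a better fit, although this chapter does not go there.

```elixir
# The form used in the listing above: split the cwd, append segments, join again.
long_form =
  File.cwd!()
  |> Path.split()
  |> Kernel.++(["priv", "cache", "exchange_info.json"])
  |> Path.join()

# Equivalent shorter form: Path.join/1 accepts a list of segments.
short_form = Path.join([File.cwd!(), "priv", "cache", "exchange_info.json"])

IO.puts("same path? #{long_form == short_form}")
```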
That finishes our changes to the Hedgehog.Exchange.BinanceMock module, which we need to add to the supervision tree of our application:
# /lib/hedgehog/application.ex
def start(_type, _args) do
children = [
...
Hedgehog.Exchange.BinanceMock
...
The Hedgehog.Exchange.BinanceMock module depends on a few packages that we need to add to the mix.exs dependencies:
Please remember to run mix deps.get before trying out our changes:
$ iex -S mix phx.server
...
iex(1)> alias Hedgehog.Exchange.BinanceMock
iex(2)> Process.whereis(BinanceMock)
#PID<...>
iex(3)> |> Process.alive?()
true
iex(4)> BinanceMock.get_exchange_info()
%{
...
}
The above confirms that we now have a BinanceMock process running in the background and we are able to fetch exchange info.
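To make the mechanics of that check more concrete, here is a minimal standalone sketch of a GenServer that registers itself under its module name and is started by a supervisor, which is what makes Process.whereis/1 return a PID. MockExchange is a hypothetical stand-in, not the real BinanceMock module.

```elixir
defmodule MockExchange do
  use GenServer

  # Registering under the module name lets other code find the process by name.
  def start_link(_args) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @impl true
  def init(state), do: {:ok, state}
end

# Listing the bare module as a child calls MockExchange.start_link([]).
{:ok, _supervisor} = Supervisor.start_link([MockExchange], strategy: :one_for_one)

pid = Process.whereis(MockExchange)
IO.inspect(Process.alive?(pid), label: "alive?")
```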
23.6 Reintegrating the streamer app
In the case of the streamer app, there are multiple files to move, so we will first create a new /lib/hedgehog/streamer directory (inside the new Phoenix app) and then copy both the /apps/streamer/lib/streamer directory and the /apps/streamer/lib/streamer.ex file into that new directory.
As we will be updating/renaming all of those files, we will use this opportunity to place all of them inside the Binance namespace.
We now need to modify each file, starting with renaming /lib/hedgehog/streamer/streamer.ex to /lib/hedgehog/streamer/binance.ex and updating the module name and alias:
# /lib/hedgehog/streamer/binance.ex
defmodule Hedgehog.Streamer.Binance do
...
alias Hedgehog.Streamer.Binance.DynamicStreamerSupervisor
The next step will be to rename the /lib/hedgehog/streamer/streamer directory to /lib/hedgehog/streamer/binance. We will now move on to the files inside this directory.
23.6.1 Supervisor
Starting with the supervisor.ex file, we need to update the module’s name and alias:
# /lib/hedgehog/streamer/binance/supervisor.ex
defmodule Hedgehog.Streamer.Binance.Supervisor do
...
alias Hedgehog.Streamer.Binance.DynamicStreamerSupervisor
Moving forward, we can remove the repo.ex file as we will use the Hedgehog.Repo module to work with the database.
23.6.2 Worker
Next, we will rename /lib/hedgehog/streamer/binance/binance.ex to /lib/hedgehog/streamer/binance/worker.ex, mainly to avoid the “binance/binance” name after we added the namespace. Following the filename change, we need to update the module’s name and a couple of references to the Core module:
23.6.3 DynamicStreamerSupervisor
Next, we will update the dynamic_streamer_supervisor.ex file, where we will update the module’s name and all the aliases:
# /lib/hedgehog/streamer/binance/dynamic_streamer_supervisor.ex
defmodule Hedgehog.Streamer.Binance.DynamicStreamerSupervisor do
...
alias Hedgehog.Repo
alias Hedgehog.Streamer.Binance.Worker
alias Hedgehog.Streamer.Settings
...
defp start_child(args) do
DynamicSupervisor.start_child(
__MODULE__,
{Worker, args}
...
23.6.4 schema/settings.ex and schema/streaming_status_enum.ex
We will move the /lib/hedgehog/streamer/binance/schema/settings.ex file to /lib/hedgehog/streamer/settings.ex and the /lib/hedgehog/streamer/binance/schema/streaming_status_enum.ex file to /lib/hedgehog/streamer/settings_status_enum.ex (file renamed). We can now remove the empty /lib/hedgehog/streamer/binance/schema directory. Both of those modules need their names and references updated:
# /lib/hedgehog/streamer/settings.ex
defmodule Hedgehog.Streamer.Settings do
...
alias Hedgehog.Streamer.SettingsStatusEnum
...
schema "streamer_settings" do
...
field(:status, SettingsStatusEnum)
...
and:
23.6.5 application
The final file that we need to look into is /lib/hedgehog/streamer/binance/application.ex, where we will check which children were supervised by the streamer app:
# /lib/hedgehog/streamer/binance/application.ex
children = [
{Streamer.Repo, []},
{Streamer.Supervisor, []}
]
We don’t need to worry about Streamer.Repo, as we will use Hedgehog.Repo. On the other hand, Streamer.Supervisor, which we renamed to Hedgehog.Streamer.Binance.Supervisor, needs to be added to the main Hedgehog.Application module:
We can now remove /lib/hedgehog/streamer/binance/application.ex as it is no longer required.
23.6.6 DB migrations and seeding
Streaming depends on the database tables and settings (seed data). We will start by copying the apps/streamer/priv/repo/migrations/20210203184805_create_settings.exs migration to the /priv/repo/migrations directory (renaming the file to 20210203184805_create_streamer_settings.exs) and updating most of the code inside:
# /priv/repo/migrations/20210203184805_create_streamer_settings.exs
defmodule Hedgehog.Repo.Migrations.CreateStreamerSettings do
...
alias Hedgehog.Streamer.SettingsStatusEnum
def change do
SettingsStatusEnum.create_type()
create table(:streamer_settings, primary_key: false) do
...
add(:status, SettingsStatusEnum.type(), default: "off", null: false)
...
create(unique_index(:streamer_settings, [:symbol]))
...
Next, we will copy the seeding script’s code from /apps/streamer/priv/seed_settings.exs to /priv/repo/seeds.exs and make the following updates:
23.6.7 Config
Inside the script above, we were reading the application’s configuration, expecting the binance_client to be there. Let’s append the required settings to the configuration:
23.6.8 Deps
We need to add a couple of new dependencies that the streaming code is using:
With the above changes, the reintegration of the streamer app is finished. We need to remember to fetch the new deps, run the migrations, and seed the database. After that, we can test that everything works up to this point:
$ mix deps.get
...
$ mix setup
...
$ iex -S mix phx.server
...
iex(1)> Hedgehog.Streamer.Binance.start_streaming("XRPUSDT")
...
[info] Binance streamer is connecting to websocket stream for XRPUSDT trade events
{:ok, #PID<0.801.0>}
[debug] Trade event received XRPUSDT@0.55080000
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
(l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
^C
$ iex -S mix phx.server
...
[info] Binance streamer is connecting to websocket stream for XRPUSDT trade events
[debug] Trade event received XRPUSDT@0.55240000
iex(1)> Hedgehog.Streamer.Binance.stop_streaming("XRPUSDT")
[info] Stopping streaming XRPUSDT trade events
...
The above confirms that we can start and stop streaming, and upon startup, streaming starts on its own when enabled in the database.
23.7 Reintegrating the naive app
We will kick off the naive application reintegration by creating a new directory called strategy inside /lib/hedgehog/, where we will place both the naive.ex file and the naive directory (both originally located inside /apps/naive/lib/).
23.7.2 strategy.ex -> formula.ex
Moving on to the naive directory, first, we will rename the strategy.ex file to formula.ex and update the references inside:
# /lib/hedgehog/strategy/naive/formula.ex
defmodule Hedgehog.Strategy.Naive.Formula do
alias Hedgehog.Exchange.TradeEvent
alias Hedgehog.Repo
alias Hedgehog.Strategy.Naive.Settings
...
@binance_client Application.compile_env(:hedgehog, :binance_client)
...
defp broadcast_order(%Binance.Order{} = order) do
Phoenix.PubSub.broadcast(
Hedgehog.PubSub,
...
23.7.3 trader.ex
Next, we will update the trader.ex file:
# /lib/hedgehog/strategy/naive/trader.ex
defmodule Hedgehog.Strategy.Naive.Trader do
...
alias Hedgehog.Exchange.TradeEvent
alias Hedgehog.Strategy.Naive.Formula
as well as update all references:
- Naive.Strategy or Strategy to Formula
- Core.PubSub to Hedgehog.PubSub
23.7.4 supervisor.ex
Similarly, for the supervisor.ex file, we need to update the module name and alias:
23.7.5 dynamic_trader_supervisor.ex
For the dynamic_trader_supervisor.ex file, we need to update the module name, aliases and all the references to the Strategy module:
# /lib/hedgehog/strategy/naive/dynamic_trader_supervisor.ex
defmodule Hedgehog.Strategy.Naive.DynamicTraderSupervisor do
...
alias Hedgehog.Repo
alias Hedgehog.Strategy.Naive.Settings
alias Hedgehog.Strategy.Naive.Formula
alias Hedgehog.Strategy.Naive.Trader
...
# three occurrences of `Strategy.update_status/2` call
Strategy.update_status(...) -> Formula.update_status(...)
23.7.6 Repo and schema files
As we will use Hedgehog.Repo, we can remove the /lib/hedgehog/strategy/naive/repo.ex file.
We will move both schema/settings.ex and schema/trading_status_enum.ex out of the schema directory into the main naive strategy directory (/lib/hedgehog/strategy/naive). We can now remove the empty schema directory.
In line with the other Enum modules’ changes, we will rename the file from /lib/hedgehog/strategy/naive/trading_status_enum.ex to /lib/hedgehog/strategy/naive/settings_status_enum.ex.
Inside the file, we will update the module name inside the defenum:
# /lib/hedgehog/strategy/naive/settings_status_enum.ex
defenum(
Hedgehog.Strategy.Naive.SettingsStatusEnum,
:naive_trading_status,
[:on, :off, :shutdown]
)
Moving on to the settings.ex file, where we need to update the module name and alias, as well as the table name and status enum module:
23.7.7 application.ex
The final file that we copied across from the naive umbrella app is application.ex, where we will look into the list of supervised children. It was supervising Naive.Repo, which we don’t need any more, and Naive.Supervisor, which we renamed and need to put into our new main application.ex file:
# /lib/hedgehog/application.ex
def start(_type, _args) do
children = [
...
Hedgehog.Strategy.Naive.Supervisor
After the above addition, we can remove the /lib/hedgehog/strategy/naive/application.ex file.
23.7.8 Migration and seeding
First, we will move two migration files from /apps/naive/priv/repo/migrations/ to /priv/repo/migrations. We will rename 20210202223209_create_settings.exs to 20210202223209_create_naive_strategy_settings.exs, where we will update the module name and all references to modules and the table name:
# /priv/repo/migrations/20210202223209_create_naive_strategy_settings.exs
defmodule Hedgehog.Repo.Migrations.CreateNaiveStrategySettings do
...
alias Hedgehog.Strategy.Naive.SettingsStatusEnum
def change do
SettingsStatusEnum.create_type()
create table(:naive_strategy_settings, primary_key: false) do
...
add(:status, SettingsStatusEnum.type(), default: "off", null: false)
...
create(unique_index(:naive_strategy_settings, [:symbol]))
Next, we will rename 20210205232303_update_trading_status.exs to 20210205232303_update_naive_strategy_settings_status.exs, where we will update the module name and all references to modules:
# /priv/repo/migrations/20210205232303_update_naive_strategy_settings_status.exs
defmodule Hedgehog.Repo.Migrations.UpdateNaiveStrategySettingsStatus do
...
def change do
Ecto.Migration.execute(
"ALTER TYPE naive_trading_status ADD VALUE IF NOT EXISTS 'shutdown'"
)
Finally, we need to merge the code that seeds the naive strategy’s settings into the /priv/repo/seeds.exs file:
# /priv/repo/seeds.exs
alias Hedgehog.Streamer.Settings, as: StreamerSettings # <= updated
alias Hedgehog.Strategy.Naive.Settings, as: NaiveStrategySettings # <= added
...
Logger.info("Inserting default streamer settings for symbols")
...
# updated, `on_conflict` added
{count, nil} = Repo.insert_all(StreamerSettings, maps, on_conflict: :nothing)
Logger.info("Inserted streamer settings for #{count} symbols")
# below added at the end
%{
chunks: chunks,
budget: budget,
buy_down_interval: buy_down_interval,
profit_interval: profit_interval,
rebuy_interval: rebuy_interval
} = Application.compile_env(:hedgehog, [:strategy, :naive, :defaults])
base_settings = %{
symbol: "",
chunks: chunks,
budget: Decimal.new(budget),
buy_down_interval: Decimal.new(buy_down_interval),
profit_interval: Decimal.new(profit_interval),
rebuy_interval: Decimal.new(rebuy_interval),
status: "off",
inserted_at: timestamp,
updated_at: timestamp
}
Logger.info("Inserting default naive strategy settings for symbols")
maps = symbols
|> Enum.map(&(%{base_settings | symbol: &1["symbol"]}))
{count, nil} = Repo.insert_all(NaiveStrategySettings, maps, on_conflict: :nothing)
Logger.info("Inserted naive strategy settings for #{count} symbols")
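The per-symbol maps in the script are built with Elixir's map update syntax. The standalone sketch below isolates that step using made-up values (the chunks value and the symbol list are illustrative, not the project's real defaults).

```elixir
# Hypothetical values; in the script they come from the application's config.
base_settings = %{symbol: "", chunks: 5, status: "off"}

# Same shape as the entries accessed via `&1["symbol"]` in the seeding script.
symbols = [%{"symbol" => "XRPUSDT"}, %{"symbol" => "ADAUSDT"}]

# `%{map | key: value}` only updates keys that already exist,
# which is why base_settings carries a placeholder `symbol: ""`.
maps = Enum.map(symbols, &%{base_settings | symbol: &1["symbol"]})

IO.inspect(maps)
```

Using the update syntax instead of Map.put/3 means a typo in the key raises a KeyError rather than silently adding a new field.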
23.7.9 Configuration
The seeding script above requires additional configuration that we will now add to the /config/config.exs file:
23.7.10 Tests
The final part of the naive application that we need to copy across is the tests located in /apps/naive/test. We will create a new /test/hedgehog/strategy directory and paste the /apps/naive/test/naive_test.exs file and the /apps/naive/test/naive directory there.
First, we will update naive_test.exs by updating its module name and aliases:
# /test/hedgehog/strategy/naive_test.exs
defmodule Hedgehog.Strategy.NaiveTest do
...
doctest Hedgehog.Strategy.Naive
alias Hedgehog.Exchange.Order # <= to be migrated...
alias Hedgehog.Strategy.Naive.Settings, as: TradingSettings
alias Hedgehog.Exchange.TradeEvent
Next, we will rename /test/hedgehog/strategy/naive/strategy_test.exs to /test/hedgehog/strategy/naive/formula_test.exs and update its module name and aliases:
# /test/hedgehog/strategy/naive/formula_test.exs
defmodule Hedgehog.Strategy.Naive.FormulaTest do
...
alias Hedgehog.Exchange.BinanceMock # <= added
alias Hedgehog.Exchange.TradeEvent
alias Hedgehog.Strategy.Naive.Formula
...
# update all references to `Strategy` with `Formula`
Finally, we will overwrite test_helper.exs based on the one from the naive app:
23.7.11 Dependencies
The final part of the integration will be to move dependencies to the mix.exs file:
The final test will be to fetch the required deps and run the unit tests:
$ mix deps.get
...
$ mix setup
...
$ mix test --only unit --no-start
...
......
Finished in 0.1 seconds (0.1s async, 0.00s sync)
17 tests, 0 failures, 6 excluded
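The --only unit flag relies on ExUnit tags: it is equivalent to including the unit tag and excluding everything else, which is why some tests are reported as excluded. As a rough standalone illustration (the module and test names below are made up and unrelated to the project's test suite):

```elixir
# Start ExUnit without running automatically at exit, so we can call run/0 ourselves.
ExUnit.start(autorun: false)

# Mirrors `mix test --only unit`: exclude all tests, then re-include the tagged ones.
ExUnit.configure(exclude: [:test], include: [:unit])

defmodule TagDemoTest do
  use ExUnit.Case, async: true

  @tag :unit
  test "runs because it is tagged :unit" do
    assert 1 + 1 == 2
  end

  test "is excluded because it has no :unit tag" do
    flunk("should not run")
  end
end

result = ExUnit.run()
IO.inspect(result, label: "suite result")
```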
We will see a lot of warnings as we haven’t yet migrated the DataWarehouse application. Either way, we received confirmation that the unit tests ran successfully. We are also able to run the application to see that it can go through the trading cycle:
$ iex -S mix phx.server
...
iex(1)> alias Hedgehog.Strategy.Naive
...
iex(2)> alias Hedgehog.Streamer.Binance
...
iex(3)> Binance.start_streaming("XRPUSDT")
...
iex(4)> Naive.start_trading("XRPUSDT")
...
[info] Position (XRPUSDT/1715703392316): Placing a BUY order @ 0.50650000,
quantity: 394.86000000
[info] Position (XRPUSDT/1715704251578): The BUY order is now partially filled
[info] Position (XRPUSDT/1715704251578): The BUY order is now filled. Placing a
SELL order @ 0.50690000, quantity: 394.86000000
[info] Position (XRPUSDT/1715704251578): The SELL order is now partially filled
[info] Position (XRPUSDT/1715704251578): Trade cycle finished
[info] Position (XRPUSDT/1715704342038): Placing a BUY order @ 0.50680000,
quantity: 394.63000000
The above confirms that our trading strategy works, so we have successfully integrated the naive app. Things will be easier from now on.
23.8 Reintegrating the data_warehouse app
Inside the new Phoenix application, we will reintegrate the former data_warehouse app into the data/collector and data/publisher “namespaces”.
We will start by creating a new directory called data inside the /lib/hedgehog directory. Inside the data directory, we will create a collector directory. We will rename all the subscriber_* modules to collector_*.
We will kick off the transition by copying files from the /apps/data_warehouse/lib/data_warehouse/ directory to /lib/hedgehog/ and renaming them as follows:
- subscriber_supervisor.ex -> data/collector/collector_supervisor.ex
- subscriber/dynamic_supervisor.ex -> data/collector/dynamic_worker_supervisor.ex
- subscriber/worker.ex -> data/collector/worker.ex
- publisher.ex -> data/publisher.ex
- schema/order.ex -> exchange/order.ex
- schema/trade_event.ex -> exchange/trade_event.ex (overwrite)
- schema/subscriber_settings.ex -> data/collector/settings.ex
- schema/subscriber_status_enum.ex -> data/collector/settings_status_enum.ex
- ../data_warehouse.ex -> data/collector.ex
Now, we will update the new (copied/renamed) files one by one.
23.8.1 exchange/order.ex
We will start with exchange/order.ex, where we will update the module name:
23.8.2 exchange/trade_event.ex
This file got overwritten by the contents from the data_warehouse app, so we just need to update the module name:
23.8.3 data/publisher.ex
This module uses many other modules, including Repo, so we need to update all of these references:
# /lib/hedgehog/data/publisher.ex
defmodule Hedgehog.Data.Publisher do
...
alias Hedgehog.Repo # <= added
alias Hedgehog.Exchange.TradeEvent # <= added
...
def start(arg) do # <= renamed the `start_link/1` function
...
end
...
def run(%{
...
}) do
...
Repo.transaction(
fn ->
from(te in TradeEvent,
...
)
|> Repo.stream()
...
end
defp publish_trade_event(%TradeEvent{} = trade_event) do
new_trade_event =
struct(
TradeEvent,
...
)
Phoenix.PubSub.broadcast(
Hedgehog.PubSub,
...
)
...
23.8.4 data/collector.ex
This module was an interface for both collectors and publishers. From now on, it will only cater for collectors:
# /lib/hedgehog/data/collector.ex
defmodule Hedgehog.Data.Collector do
@moduledoc """
Documentation for `Hedgehog.Data.Collector`.
"""
alias Hedgehog.Data.Collector.DynamicWorkerSupervisor
def start_storing(stream, symbol) do
...
|> DynamicWorkerSupervisor.start_worker()
end
def stop_storing(stream, symbol) do
...
|> DynamicWorkerSupervisor.stop_worker()
end
# remove `publish_data/1`
23.8.5 data/collector/collector_supervisor.ex
In the case of the main collector supervisor, we need to update the aliases:
# /lib/hedgehog/data/collector/collector_supervisor.ex
defmodule Hedgehog.Data.Collector.CollectorSupervisor do
...
alias Hedgehog.Data.Collector.DynamicWorkerSupervisor
...
@registry :collector_workers
...
def init(_args) do
children = [
...
{DynamicWorkerSupervisor, []},
{Task,
fn ->
DynamicWorkerSupervisor.autostart_workers()
end}
]
...
23.8.6 /data/collector/dynamic_worker_supervisor.ex
In the case of the dynamic worker supervisor that we renamed, we need to update the module name:
# /lib/hedgehog/data/collector/dynamic_worker_supervisor.ex
defmodule Hedgehog.Data.Collector.DynamicWorkerSupervisor do
...
alias Hedgehog.Repo
alias Hedgehog.Data.Collector.Settings
alias Hedgehog.Data.Collector.Worker
...
@registry :collector_workers
...
def autostart_workers do
Repo.all(
from(s in Settings,
...
defp update_status(topic, status)
when is_binary(topic) and is_binary(status) do
%Settings{
...
23.8.7 /data/collector/settings_status_enum.ex
As in the case of other settings’ status enums - we need to update the module name and field name:
23.8.8 /data/collector/settings.ex
For the settings schema, we need to update module name, aliases, and table name:
23.8.9 /data/collector/worker.ex
The final module from the data_warehouse app is a collector worker, where we will update the module name and a few references:
# /lib/hedgehog/data/collector/worker.ex
defmodule Hedgehog.Data.Collector.Worker do
...
alias Hedgehog.Exchange.Order # <= added
alias Hedgehog.Exchange.TradeEvent # <= added
alias Hedgehog.Repo # <= added
...
def init(topic) do
Logger.info("Collector worker is subscribing to #{topic}")
Phoenix.PubSub.subscribe(
Hedgehog.PubSub,
...
)
...
def handle_info(%TradeEvent{} = trade_event, state) do
...
struct!(TradeEvent, opts)
|> Repo.insert()
...
def handle_info(%Binance.Order{} = order, state) do
...
struct(Order, data)
|> Map.merge(%{
...
})
|> Repo.insert(
...
...
defp via_tuple(topic) do
{:via, Registry, {:collector_workers, topic}}
This finishes the module updates. We can move on to the other files from the data_warehouse application.
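The collector workers are addressed through a :via tuple backed by a Registry. As a self-contained sketch of that pattern, assuming a registry named :collector_workers as in the listings above, the example below uses a hypothetical DemoWorker module and an illustrative topic string rather than the real collector worker.

```elixir
# Unique-key registry: each topic maps to at most one process.
{:ok, _registry} = Registry.start_link(keys: :unique, name: :collector_workers)

defmodule DemoWorker do
  use GenServer

  def start_link(topic) do
    GenServer.start_link(__MODULE__, topic, name: via_tuple(topic))
  end

  # Callers address the worker by topic instead of by PID.
  def topic(topic), do: GenServer.call(via_tuple(topic), :topic)

  @impl true
  def init(topic), do: {:ok, topic}

  @impl true
  def handle_call(:topic, _from, topic), do: {:reply, topic, topic}

  defp via_tuple(topic), do: {:via, Registry, {:collector_workers, topic}}
end

{:ok, pid} = DemoWorker.start_link("TRADE_EVENTS:XRPUSDT")
IO.inspect(Registry.lookup(:collector_workers, "TRADE_EVENTS:XRPUSDT"))
IO.inspect(DemoWorker.topic("TRADE_EVENTS:XRPUSDT"))
```

Because the registry uses unique keys, starting a second worker for the same topic returns an {:error, {:already_started, pid}} tuple instead of creating a duplicate.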
23.8.10 Supervision tree
When we were copying the modules from the data_warehouse app, we skipped over its application.ex module. Looking inside, we can see that it was supervising the DataWarehouse.SubscriberSupervisor (now renamed to Hedgehog.Data.Collector.CollectorSupervisor), so we need to add it to the main supervision tree of our new application:
23.8.11 Migrations
We will copy three migration files across from the data_warehouse app (the /apps/data_warehouse/priv/repo/migrations/ directory) to the new Phoenix application (the priv/repo/migrations/ directory).
In the case of 20210227230123_create_subscriber_settings.exs, we need to rename it to 20210227230123_create_collector_settings.exs.
For all three of the migration files, we need to update module names:
# /priv/repo/migrations/20210222224514_create_trade_events.exs
defmodule Hedgehog.Repo.Migrations.CreateTradeEvents do
# /priv/repo/migrations/20210222224522_create_orders.exs
defmodule Hedgehog.Repo.Migrations.CreateOrders do
For the final file, we will also change the alias and table name:
# /priv/repo/migrations/20210227230123_create_collector_settings.exs
defmodule Hedgehog.Repo.Migrations.CreateSubscriberSettings do
...
alias Hedgehog.Data.Collector.SettingsStatusEnum
def change do
SettingsStatusEnum.create_type()
create table(:collector_settings, primary_key: false) do
...
add(:status, SettingsStatusEnum.type(), default: "off", null: false)
...
create(unique_index(:collector_settings, [:topic]))
The above changes finish the integration of the data_warehouse application. We can go ahead and drop the database, set up the application again, and confirm that it still works:
$ mix ecto.drop
...
$ mix setup
...
$ mix test --only unit --no-start
...
..........
Finished in 0.1 seconds (0.1s async, 0.00s sync)
17 tests, 0 failures, 6 excluded
$ iex -S mix phx.server
...
iex(1)> alias Hedgehog.Strategy.Naive
...
iex(2)> alias Hedgehog.Streamer.Binance
...
iex(3)> alias Hedgehog.Data.Collector
...
iex(4)> Collector.start_storing("TRADE_EVENTS", "XRPUSDT")
...
iex(5)> Collector.start_storing("ORDERS", "XRPUSDT")
...
iex(6)> Binance.start_streaming("XRPUSDT")
...
iex(7)> Naive.start_trading("XRPUSDT")
...
[debug] QUERY OK source="trade_events" ...
INSERT INTO "trade_events"
...
[debug] QUERY OK source="orders" ...
INSERT INTO "orders"
The above log messages confirm that we are streaming trade events from Binance, placing orders, and storing both in the database.
This finishes the integration of the data_warehouse application.
23.9 Reintegrating the indicator app
The final application that we will integrate into our new Phoenix app is indicator, which luckily has only three files that we will move across.
We will use this opportunity to rename the indicators to aggregators.
Let’s start by creating a new directory called aggregator inside the /lib/hedgehog/data/ directory.
We will copy the /apps/indicator/lib/indicator/ohlc directory and the /apps/indicator/lib/indicator/ohlc.ex file into it.
We will also copy /apps/indicator/lib/indicator.ex to the /lib/hedgehog/data directory and rename it to aggregator.ex.
Now, we can update each file to fit the new naming convention.
23.9.1 /data/aggregator.ex
Here, we need to update the module name and a reference to the worker module:
23.9.2 /data/aggregator/ohlc.ex
Inside the ohlc module we need to update the reference to the trade event and pubsub:
23.9.3 /data/aggregator/ohlc/worker.ex
The final file to update will be the worker module, where we need to update the references to pubsub and the ohlc module:
# /lib/hedgehog/data/aggregator/ohlc/worker.ex
defmodule Hedgehog.Data.Aggregator.Ohlc.Worker do
...
alias Hedgehog.Data.Aggregator.Ohlc # <= added
alias Hedgehog.Exchange.TradeEvent
...
def init(symbol) do
...
Phoenix.PubSub.subscribe(
Hedgehog.PubSub,
...
def handle_info(%TradeEvent{} = trade_event, ohlc) do
{:noreply, Ohlc.process(ohlc, trade_event)}
...
23.9.4 Supervision tree
A quick look at the application.ex module of the indicator app will tell us that we need to add a dynamic supervisor to the supervision tree of the new app (we already referred to it as Hedgehog.Data.Aggregator.DynamicWorkerSupervisor inside Hedgehog.Data.Aggregator as we were updating it):
# /lib/hedgehog/application.ex
def start(_type, _args) do
children = [
...
{DynamicSupervisor,
strategy: :one_for_one, name: Hedgehog.Data.Aggregator.DynamicWorkerSupervisor}
...
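For readers less familiar with named dynamic supervisors, here is a minimal standalone sketch of the same idea: a DynamicSupervisor is declared as a child under a name, and workers are attached to it later at runtime. DemoDynamicSupervisor and the Agent worker are placeholders for illustration, not the project's aggregator modules.

```elixir
# Declare a named DynamicSupervisor as a child, as in the listing above.
children = [
  {DynamicSupervisor, strategy: :one_for_one, name: DemoDynamicSupervisor}
]

{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)

# Later, at runtime, workers are started on demand under that name.
{:ok, worker} =
  DynamicSupervisor.start_child(DemoDynamicSupervisor, {Agent, fn -> "XRPUSDT" end})

IO.inspect(Agent.get(worker, & &1), label: "worker state")
IO.inspect(DynamicSupervisor.count_children(DemoDynamicSupervisor))
```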
The above change finishes integrating the indicator app and the changes in this chapter, as it was the last application to be merged in.
We can make the last check that everything works by starting the streaming and aggregating:
$ mix ecto.drop
...
$ mix setup
...
$ iex -S mix phx.server
...
iex(1)> alias Hedgehog.Streamer.Binance
...
iex(2)> alias Hedgehog.Data.Aggregator
...
iex(3)> Aggregator.aggregate_ohlcs("XRPUSDT")
...
iex(4)> Binance.start_streaming("XRPUSDT")
...
[debug] Broadcasting OHLC: %Hedgehog.Data.Aggregator.Ohlc{symbol: "XRPUSDT",
start_time: 1717173780, duration: 1, open: "0.51350000", high: "0.51380000",
low: "0.51350000", close: "0.51370000"}
The above log confirms that we have fully working aggregation whose results are broadcast and could be stored in the database or used inside the strategy.
It was a lot of repetitive copying, moving, and renaming, so thank you for sticking with me through this. With the migration to a Phoenix application done, we are in a great position to start deploying, which we will focus on in the upcoming chapter!
[Note] Please remember to run mix format to keep things nice and tidy.
The source code for this chapter can be found on GitHub