Chapter 1 Stream live cryptocurrency prices from the Binance WSS
1.1 Objectives
- create a new umbrella app
- create a supervised application inside an umbrella
- connect to Binance’s WebSocket Stream using the WebSockex module
- define a TradeEvent struct that will hold incoming data
- decode incoming events using the Jason module
1.2 Create a new umbrella app
As we are starting from scratch, we need to create a new umbrella project:
1.3 Create a supervised application inside an umbrella
We can now proceed with creating a new supervised application called streamer
inside our umbrella:
1.4 Connect to Binance’s WebSocket Stream using the WebSockex module
To establish a connection to Binance API’s stream, we will need to use a WebSocket client. The module that we will use is called WebSockex. Scrolling down to the Installation
section inside the module’s readme on Github, we are instructed what dependency we need to add to our project.
We will append :websockex
to the deps
function inside the mix.exs
file of the streamer
application:
As we added a dependency to our project, we need to fetch it using mix deps.get
.
We can now progress with creating a module that will be responsible for streaming. We will create a new file called binance.ex
inside the apps/streamer/lib/streamer
directory.
From the readme of WebSockex module, we can see that to use it we need to create a module that will implement the WebSockex
behavior:
# WebSockex's readme
defmodule WebSocketExample do
use WebSockex
def start_link(url, state) do
WebSockex.start_link(url, __MODULE__, state)
end
def handle_frame({type, msg}, state) do
IO.puts "Received Message - Type: #{inspect type} -- Message: #{inspect msg}"
{:ok, state}
end
def handle_cast({:send, {type, msg} = frame}, state) do
IO.puts "Sending #{type} frame with payload: #{msg}"
{:reply, frame, state}
end
end
We will copy the whole code above across to our new binance.ex
file.
The first step will be to update the module name to match our file name:
In the spirit of keeping things tidy - we will now remove the handle_cast/2
function (the last function in our module) as we won’t be sending any messages back to Binance via WebSocket (to place orders etc - Binance provides a REST API which we will use in the next chapter).
Next, let’s look up what URL should we use to connect to Binance’s API. Binance has a separate WSS (Web Socket Streams) documentation at Github.
Scrolling down we can see the General WSS information
section where 3 important pieces of information are listed:
- The base endpoint is:
wss://stream.binance.com:9443
- Raw streams are accessed at
/ws/<streamName>
- All symbols for streams are lowercase
We can see that the full endpoint for raw streams(we will be using a “raw” stream) will be wss://stream.binance.com:9443/ws/
with stream name at the end (together with lowercased symbol).
Note: In the context of Binance API, “raw” means that no aggregation was performed before broadcasting the data on WebSocket.
Let’s introduce a module attribute that will hold the full raw stream endpoint which will be used across the module:
Now back in Binance’s WSS documentation we need to search for “Trade Streams”. “trade” in the context of this documentation means an exchange of assets(coins/tokens) by two sides (buyer and seller). Our future trading strategy will be interested in the “latest price” which is simply the last trade event’s price.
We can see that docs are pointing to the following stream name:
Stream Name: <symbol>@trade
Together, our full URL looks like: “wss://stream.binance.com:9443/ws/
Back to the IDE, we will now modify the start_link/2
function to use Binance API’s URL:
# /apps/streamer/lib/streamer/binance.ex
def start_link(symbol) do
symbol = String.downcase(symbol)
WebSockex.start_link(
"#{@stream_endpoint}#{symbol}@trade",
__MODULE__,
nil
)
end
Instead of passing an URL, we modified the function to accept a symbol
, downcase it and use it together with the module’s @stream_endpoint
attribute to build a full URL.
At this moment streaming of trade events already works which we can test using iex
:
$ iex -S mix
...
iex(1)> Streamer.Binance.start_link("xrpusdt")
{:ok, #PID<0.335.0>}
Received Message - Type: :text -- Message: "{\"e\":\"trade\", \"E\":1603226394741,
\"s\":\"XRPUSDT\",\"t\":74608889,\"p\":\"0.24373000\",\"q\":\"200.00000000\",
\"b\":948244411,\"a\":948244502,\"T\":1603226394739,\"m\":true,\"M\":true}"
We can see the messages logged above because we copied the sample implementation from WebSockex’s readme where handle_frame/2
function uses IO.puts/1
to print out all incoming data. The lesson here is that every incoming message from Binance will cause the handle_frame/2
callback to be called with the message and the process’ state.
Just for reference, our module should look currently as follows:
# /apps/streamer/lib/streamer/binance.ex
defmodule Streamer.Binance do
use WebSockex
@stream_endpoint "wss://stream.binance.com:9443/ws/"
def start_link(symbol) do
symbol = String.downcase(symbol)
WebSockex.start_link(
"#{@stream_endpoint}#{symbol}@trade",
__MODULE__,
nil
)
end
def handle_frame({type, msg}, state) do
IO.puts "Received Message - Type: #{inspect type} -- Message: #{inspect msg}"
{:ok, state}
end
end
1.5 Decode incoming events using the Jason module
Currently, all incoming data from WebSocket is encoded as a JSON. To decode JSON we will use the jason module.
Scrolling down to the Installation
section inside the module’s readme, we can see that we need to add it to the dependencies and we can start to use it right away.
Let’s open the mix.exs
file of the streamer
application and append the :jason
dependency to the list inside deps
function:
As previously, don’t forget to run mix deps.get
to fetch the new dependency.
Looking through the documentation of the Jason module we can see encode!/2
and decode!/2
functions, both of them have exclamation marks which indicate that they will throw an error whenever they will be unable to successfully encode or decode the passed value.
This is less than perfect for our use case as we would like to handle those errors in our own way(technically we could just use try/rescue but as we will find out both encode/2
and decode/2
are available).
We will go a little bit off-topic but I would highly recommend those sorts of journeys around somebody’s code. Let’s look inside the Jason module. Scrolling down in search of decode/2
(without the exclamation mark) we can see it about line 54:
# /lib/jason.ex
def decode(input, opts \\ []) do
input = IO.iodata_to_binary(input)
Decoder.parse(input, format_decode_opts(opts))
end
It looks like it uses the parse/2
function of a Decoder
module, let’s scroll back up and check where it’s coming from. At line 6:
we can see that Decoder
is an alias of the Jason.Decoder
. Scrolling down to the Jason.Decoder
module we will find a parse/2
function about line 43:
# /lib/decoder.ex
def parse(data, opts) when is_binary(data) do
key_decode = key_decode_function(opts)
string_decode = string_decode_function(opts)
try do
value(data, data, 0, [@terminate], key_decode, string_decode)
catch
{:position, position} ->
{:error, %DecodeError{position: position, data: data}}
{:token, token, position} ->
{:error, %DecodeError{token: token, position: position, data: data}}
else
value ->
{:ok, value}
end
end
Based on the result of decoding it will either return {:ok, value}
or {:error, %Decode.Error{...}}
we can confirm that by digging through documentation of the module on the hexdocs.
Once again, the point of this lengthy investigation was to show that Elixir code is readable and easy to understand so don’t be thrown off when documentation is a little bit light, quite opposite, contribute to docs and code as you gain a better understanding of the codebase.
We can now get back to our Streamer.Binance
module and modify the handle_frame/2
function to decode the incoming JSON message. Based on the result of Jason.decode/2
we will either call the process_event/2
function or log an error. Here’s the new version of the handle_frame/2
function:
# /apps/streamer/lib/streamer/binance.ex
def handle_frame({_type, msg}, state) do
case Jason.decode(msg) do
{:ok, event} -> process_event(event)
{:error, _} -> Logger.error("Unable to parse msg: #{msg}")
end
{:ok, state}
end
Please make note that type
is now prefixed with an underscore as we aren’t using it at the moment.
The second important thing to note is that we are using Logger
so it needs to be require
d at the beginning of the module:
Before implementing the process_event/2
function we need to create a structure that will hold the incoming trade event’s data.
Let’s create a new directory called binance
inside the apps/streamer/lib/streamer/
and a new file called trade_event.ex
inside it.
Our new module will hold all the trade event’s information but we will also use readable field names(you will see the incoming data below). We can start by writing a skeleton module code:
# /apps/streamer/lib/streamer/binance/trade_event.ex
defmodule Streamer.Binance.TradeEvent do
defstruct []
end
We can refer to Binance’s docs to get a list of fields:
{
"e": "trade", // Event type
"E": 123456789, // Event time
"s": "BNBUSDT", // Symbol
"t": 12345, // Trade ID
"p": "0.001", // Price
"q": "100", // Quantity
"b": 88, // Buyer order ID
"a": 50, // Seller order ID
"T": 123456785, // Trade time
"m": true, // Is the buyer the market maker?
"M": true // Ignore
}
Let’s copy them across and convert the comments to update the defstruct
inside the
Streamer.Binance.TradeEvent
module’s struct to following:
# /apps/streamer/lib/streamer/binance/trade_event.ex
defstruct [
:event_type,
:event_time,
:symbol,
:trade_id,
:price,
:quantity,
:buyer_order_id,
:seller_order_id,
:trade_time,
:buyer_market_maker
]
That’s all for this struct, we can now get back to implementing the process_event/2
function inside the Streamer.Binance
module. We will map every field of the response map to the %Streamer.Binance.TradeEvent
struct. A useful trick here would be to copy the list of fields once again from the struct and assign the incoming fields one by one.
Inside the header of the function, we will pattern match on event type(a field called “e” in the message) to confirm that indeed we received a trade event). In the end, the process_event/2
function should look as follows:
# /apps/streamer/lib/streamer/binance.ex
defp process_event(%{"e" => "trade"} = event) do
trade_event = %Streamer.Binance.TradeEvent{
:event_type => event["e"],
:event_time => event["E"],
:symbol => event["s"],
:trade_id => event["t"],
:price => event["p"],
:quantity => event["q"],
:buyer_order_id => event["b"],
:seller_order_id => event["a"],
:trade_time => event["T"],
:buyer_market_maker => event["m"]
}
Logger.debug(
"Trade event received " <>
"#{trade_event.symbol}@#{trade_event.price}"
)
end
We added the Logger.debug/2
function to be able to see logs of incoming trade events.
Lastly, before testing our implementation, let’s add a nice interface to our streamer
application that allows starting streaming:
# /apps/streamer/lib/streamer.ex
defmodule Streamer do
@moduledoc """
Documentation for `Streamer`.
"""
def start_streaming(symbol) do
Streamer.Binance.start_link(symbol)
end
end
The final version of the Streamer.Binance
module should look like this.
The last step will be to add the Logger
configuration into the main config/config.exs
file. We will set the Logger
level to :debug
for a moment to be able to see incoming trade events:
This finishes the implementation part of this chapter, we can now give our implementation a whirl using iex
:
$ iex -S mix
...
iex(1)> Streamer.start_streaming("xrpusdt")
{:ok, #PID<0.251.0>}
23:14:32.217 [debug] Trade event received XRPUSDT@0.25604000
23:14:33.381 [debug] Trade event received XRPUSDT@0.25604000
23:14:35.380 [debug] Trade event received XRPUSDT@0.25605000
23:14:36.386 [debug] Trade event received XRPUSDT@0.25606000
As we can see, the streamer is establishing a WebSocket connection with Binance’s API and its receiving trade events. It decodes them from JSON to %Streamer.Binance.TradeEvent
struct and logs a compiled message. Also, our interface hides implementation details from the “user” of our application.
We will now flip the Logger
level back to info
so the output won’t every incoming trade event:
[Note] Please remember to run the mix format
to keep things nice and tidy.
The source code for this chapter can be found on GitHub