Chapter 1 Streaming Live Cryptocurrency Prices from Binance WebSocket

In this chapter, we’ll build an Elixir application to stream real-time cryptocurrency prices - a perfect starting point that introduces several core concepts in a practical way. You’ll learn how to manage dependencies, connect to a live data feed, and process incoming information. These are all essential skills for building robust Elixir applications.

1.1 Objectives

  • Create a new umbrella project.
  • Create a supervised application.
  • Connect to the Binance WebSocket with the WebSockex library.
  • Define a %Streamer.Binance.TradeEvent{} struct to represent incoming data.
  • Decode incoming JSON events with the Jason library.

1.2 Create a New Umbrella Project

Since we’re starting from scratch, let’s create a new umbrella project:

mix new hedgehog --umbrella

1.3 Create a Supervised Application

Next, we’ll create a new supervised application called streamer inside our umbrella:

cd hedgehog/apps
mix new streamer --sup

Creating the streamer as a supervised application within an umbrella project allows us to enforce strict boundaries between different parts of our logic, such as data streaming and trading. While this isolation makes the system easier to reason about early on, it comes with an architectural tax: every boundary we draw now is one we must manage through explicit configuration and dependency rules. We’re choosing this structure deliberately so you can observe how those costs grow as the system expands.

1.4 Connect to the Binance WebSocket with the WebSockex Library

To connect to the Binance WebSocket stream, we’ll use WebSockex, a popular WebSocket client library that handles the low-level connection details for us. According to the library’s README, the first step is to add :websockex to the deps function in the streamer application’s mix.exs file:

  # /apps/streamer/mix.exs
  defp deps do
    [
      {:websockex, "~> 0.5"}
    ]
  end

After adding a new dependency, we’ll need to fetch it by running mix deps.get from the apps/streamer directory. This command downloads the library’s source code (and any of its own dependencies) into our project’s deps folder.

Now let’s create a module responsible for streaming. We’ll create a new file called binance.ex inside the apps/streamer/lib/streamer directory.

The WebSockex README provides a great starting point. To handle the WebSocket connection, we’ll create a module that uses the WebSockex behavior, which looks like this:

# From the WebSockex README
defmodule WebSocketExample do
  use WebSockex

  def start_link(url, state) do
    WebSockex.start_link(url, __MODULE__, state)
  end

  def handle_frame({type, msg}, state) do
    IO.puts "Received Message - Type: #{inspect type} -- Message: #{inspect msg}"
    {:ok, state}
  end

  def handle_cast({:send, {type, msg} = frame}, state) do
    IO.puts "Sending #{type} frame with payload: #{msg}"
    {:reply, frame, state}
  end
end

We’ll copy this code to our new binance.ex file.

First, let’s update the module name to Streamer.Binance to match our file name:

# /apps/streamer/lib/streamer/binance.ex
defmodule Streamer.Binance do

We can simplify this example right away. The handle_cast/2 callback is for sending messages to the WebSocket server. Since our application will only be receiving data for now, we can remove it.

Next, let’s find the correct URL for the Binance API. Binance provides its WSS (Web Socket Streams) documentation on GitHub.

The “General WSS information” section provides three key details:

  • The base endpoint is: wss://stream.binance.com:9443 (use wss://stream.binance.us:9443 if you’re US-based)
  • Raw streams are accessed at /ws/<streamName>
  • All symbols for streams are lowercase

The full URL for the raw streams we’ll use is wss://stream.binance.com:9443/ws/ followed by the stream name (e.g., <symbol>@trade).

A quick note: in Binance’s terminology, a “raw” stream is a single stream accessed at /ws/<streamName> whose payload arrives unwrapped, as opposed to a combined stream, which wraps each payload together with its stream name. The trade stream itself is not aggregated, so you receive every single trade as it happens. In a high-volume market like BTC/USDT, this can mean dozens of messages per second. Elixir’s lightweight processes are a good fit for this: we can ingest thousands of messages without blocking the rest of our application logic.

Let’s define a module attribute to hold the base stream endpoint:

# /apps/streamer/lib/streamer/binance.ex
@stream_endpoint "wss://stream.binance.com:9443/ws/"

Heading back to the Binance documentation, let’s search for ‘Trade Streams’. In this context, a “trade” is a completed transaction - someone bought and someone sold at a specific price. For our purposes, the “latest price” is simply the price from the most recent trade.

The docs point to the following stream name:

Stream Name: <symbol>@trade

Together, our full URL looks like: wss://stream.binance.com:9443/ws/<symbol>@trade. For example, the raw trade stream URL for the XRPUSDT symbol is: wss://stream.binance.com:9443/ws/xrpusdt@trade

Heads up: Binance requires the symbol to be lowercase. If you provide an uppercase symbol like “XRPUSDT”, the connection will succeed, but you will not receive any data, and no error will be returned. This can be tricky to debug, so always remember to downcase the symbol!

Back in our editor, let’s replace the README’s start_link/2 with a start_link/1 function that takes a symbol and constructs the correct URL:

  # /apps/streamer/lib/streamer/binance.ex
  def start_link(symbol) do
    symbol = String.downcase(symbol)

    WebSockex.start_link(
      "#{@stream_endpoint}#{symbol}@trade",
      __MODULE__,
      nil
    )
  end

Our start_link/1 function now accepts a symbol instead of a full URL. It ensures the symbol is lowercase before combining it with our @stream_endpoint to build the complete URL.
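If you’d like to check the URL construction without opening a connection, the same logic can be pulled into a pure function. The following is a minimal sketch: the StreamUrlSketch module and its trade_stream_url/1 function are hypothetical names used only for illustration and are not part of the chapter’s Streamer.Binance module.

```elixir
defmodule StreamUrlSketch do
  # Hypothetical helper for illustration; the chapter builds the URL
  # inline inside Streamer.Binance.start_link/1 instead.
  @stream_endpoint "wss://stream.binance.com:9443/ws/"

  # Builds the raw trade stream URL, downcasing the symbol first.
  def trade_stream_url(symbol) when is_binary(symbol) do
    "#{@stream_endpoint}#{String.downcase(symbol)}@trade"
  end
end

IO.puts(StreamUrlSketch.trade_stream_url("XRPUSDT"))
# prints wss://stream.binance.com:9443/ws/xrpusdt@trade
```

Because the function has no side effects, an uppercase symbol can be checked in iex before any connection is attempted.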

Our trade event streaming is now functional! Let’s test it using iex:

$ iex -S mix
...
iex(1)> Streamer.Binance.start_link("xrpusdt")
{:ok, #PID<0.335.0>}
Received Message - Type: :text -- Message: "{\"e\":\"trade\", \"E\":1603226394741,
  \"s\":\"XRPUSDT\",\"t\":74608889,\"p\":\"0.24373000\",\"q\":\"200.00000000\",
  \"T\":1603226394739,\"m\":true,\"M\":true}"

These messages appear because the code we copied from the WebSockex README uses IO.puts/1 to print all incoming data.

The key takeaway is that WebSockex automatically calls our handle_frame/2 function for each message it receives from the server. This makes it the perfect place to process incoming data.
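To make the “one callback call per message” idea concrete, here is a small simulation that runs without WebSockex or a network connection. It is only a sketch: FrameSketch is a hypothetical plain module that mirrors the shape of handle_frame/2, and it keeps an integer counter as state (our real module passes nil).

```elixir
defmodule FrameSketch do
  # Stand-in for a WebSockex callback module: same handle_frame/2 shape,
  # but no `use WebSockex`, so it runs without the library.

  # Text frames carry the JSON payload; count them in the state.
  def handle_frame({:text, _msg}, count), do: {:ok, count + 1}

  # Any other frame type is ignored and the state is passed through.
  def handle_frame({_type, _msg}, count), do: {:ok, count}

  # Simulates the library invoking handle_frame/2 once per incoming frame,
  # threading the returned state into the next call.
  def simulate(frames) do
    Enum.reduce(frames, 0, fn frame, count ->
      {:ok, new_count} = handle_frame(frame, count)
      new_count
    end)
  end
end

frames = [{:text, "{\"e\":\"trade\"}"}, {:binary, <<1, 2>>}, {:text, "{}"}]
IO.inspect(FrameSketch.simulate(frames))
# prints 2
```

The state returned from one call is what the next call receives, which is the same contract the {:ok, state} return value expresses in our module.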

For reference, our module should currently look like this:

# /apps/streamer/lib/streamer/binance.ex
defmodule Streamer.Binance do
  use WebSockex

  @stream_endpoint "wss://stream.binance.com:9443/ws/"

  def start_link(symbol) do
    symbol = String.downcase(symbol)

    WebSockex.start_link(
      "#{@stream_endpoint}#{symbol}@trade",
      __MODULE__,
      nil
    )
  end

  def handle_frame({type, msg}, state) do
    IO.puts "Received Message - Type: #{inspect type} -- Message: #{inspect msg}"
    {:ok, state}
  end
end

1.5 Decode Incoming JSON Events With the Jason Library

The incoming WebSocket data is a JSON string. To parse it into a usable Elixir map, we’ll use the popular Jason library.

The library’s README shows the standard installation procedure: add :jason to your dependencies, and you’re ready to go.

Let’s open the mix.exs file of the streamer application and add the :jason dependency to the list inside the deps function:

  # /apps/streamer/mix.exs
  defp deps do
    [
      {:jason, "~> 1.4"},
      {:websockex, "~> 0.5"}
    ]
  end

As before, don’t forget to run mix deps.get to fetch the new dependency.

In the Jason documentation, you’ll find functions like encode!/2 and decode!/2. In Elixir, a trailing exclamation mark (!) is a convention indicating that a function will raise an error on failure. While this is useful, it’s better to handle potential errors gracefully when working with external data. Instead of using try/rescue, we’ll look for a function that returns a tagged tuple like {:ok, result} or {:error, reason}. This pattern is more idiomatic in Elixir.
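The same naming convention appears throughout Elixir’s standard library, so we can try it out without any dependencies. The sketch below uses Date.from_iso8601/1 and Date.from_iso8601!/1 as stand-ins for the decode/decode! pair; TaggedTupleSketch and its function names are hypothetical.

```elixir
defmodule TaggedTupleSketch do
  # Non-raising variant: the caller pattern matches on the tagged tuple
  # and decides what to do with each outcome.
  def describe(input) do
    case Date.from_iso8601(input) do
      {:ok, date} -> "parsed #{date}"
      {:error, reason} -> "failed: #{inspect(reason)}"
    end
  end

  # Raising variant: invalid input raises ArgumentError, which is rescued
  # here only to show the contrast with the version above.
  def describe_with_bang(input) do
    "parsed #{Date.from_iso8601!(input)}"
  rescue
    error in ArgumentError -> "raised: #{Exception.message(error)}"
  end
end

IO.puts(TaggedTupleSketch.describe("2020-10-20"))
IO.puts(TaggedTupleSketch.describe("not a date"))
IO.puts(TaggedTupleSketch.describe_with_bang("not a date"))
```

With the tagged tuple, the failure is just another value flowing through a case expression; with the bang variant, it becomes an exception that has to be rescued somewhere.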

Pro Tip: Don’t Be Afraid to Read the Source!

I highly recommend this kind of code exploration. Let’s look inside the Jason module’s source code, which we can find in our project’s deps/jason/lib/jason.ex file.

If we scroll down in search of decode/2 (without the exclamation mark), we find it defined like this:

  # /deps/jason/lib/jason.ex
  def decode(input, opts \\ []) do
    input = IO.iodata_to_binary(input)
    Decoder.parse(input, format_decode_opts(opts))
  end

This function uses parse/2 from a Decoder module. Let’s scroll back up to see where that module is defined:

# /deps/jason/lib/jason.ex
alias Jason.{Encode, Decoder, DecodeError, EncodeError, Formatter}

We can see Decoder is an alias for the Jason.Decoder module.

The Jason.Decoder module lives in its own file, deps/jason/lib/decoder.ex. Opening it, we’ll find a parse/2 function:

  # /deps/jason/lib/decoder.ex
  def parse(data, opts) when is_binary(data) do
    key_decode = key_decode_function(opts)
    string_decode = string_decode_function(opts)
    float_decode = float_decode_function(opts)
    object_decode = object_decode_function(opts)
    decode = decode(
      keys: key_decode, strings: string_decode,
      objects: object_decode, floats: float_decode
    )
    try do
      value(data, data, 0, [@terminate], decode)
    catch
      {:position, position} ->
        {:error, %DecodeError{position: position, data: data}}
      {:token, token, position} ->
        {:error, %DecodeError{token: token, position: position, data: data}}
    else
      value ->
        {:ok, value}
    end
  end

As you can see, the function returns either {:ok, value} on success or {:error, %DecodeError{...}} on failure.
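The try/catch/else shape used by parse/2 is worth trying in isolation. The following sketch is not Jason’s code: ThrowSketch is a hypothetical mini-parser that accepts only ASCII digits, throws the position of the first unexpected byte, and converts both outcomes into tagged tuples at the boundary.

```elixir
defmodule ThrowSketch do
  # Hypothetical mini-parser used only to illustrate try/catch/else.
  def parse(data) when is_binary(data) do
    try do
      digits(data, 0, [])
    catch
      # A thrown value lands here and becomes an error tuple.
      {:position, position} -> {:error, {:unexpected_byte, position}}
    else
      # No throw happened: wrap the result in an :ok tuple.
      value -> {:ok, value}
    end
  end

  # End of input: return the collected digits in their original order.
  defp digits(<<>>, _position, acc), do: Enum.reverse(acc)

  # An ASCII digit: convert it to an integer and keep going.
  defp digits(<<byte, rest::binary>>, position, acc) when byte in ?0..?9 do
    digits(rest, position + 1, [byte - ?0 | acc])
  end

  # Anything else: abort the whole parse from deep inside the recursion.
  defp digits(_other, position, _acc), do: throw({:position, position})
end

IO.inspect(ThrowSketch.parse("123"))
# prints {:ok, [1, 2, 3]}
IO.inspect(ThrowSketch.parse("12x"))
# prints {:error, {:unexpected_byte, 2}}
```

The throw lets the inner functions bail out from any depth, while callers of parse/1 only ever see {:ok, value} or {:error, reason}.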

This side quest shows that Elixir code is often quite readable. Don’t be afraid to dive into the source code when you’re curious about the implementation details. It’s the best way to learn! Better yet, if you spot an opportunity for improvement, consider it a chance to contribute back to the project’s documentation or code!

Now let’s modify the handle_frame/2 function in our Streamer.Binance module to decode incoming JSON messages.

We’ll use Elixir’s built-in Logger to report any parsing failures. To use Logger macros like Logger.error/1, we need to require the module at the top of our file. Let’s add that now:

  # /apps/streamer/lib/streamer/binance.ex
  require Logger

We’ll use a case statement to handle the result from Jason.decode/2: if it’s successful, we’ll process the event; otherwise, we’ll log the error.

Here’s the new version of the handle_frame/2 function:

  # /apps/streamer/lib/streamer/binance.ex
  def handle_frame({_type, msg}, state) do
    case Jason.decode(msg) do
      {:ok, event} -> process_event(event)
      {:error, _} -> Logger.error("Unable to parse msg: #{msg}")
    end

    {:ok, state}
  end

Note that we prefixed type with an underscore (_type). This is an Elixir convention to signal an intentionally unused variable, which prevents a compiler warning.

Before implementing process_event/1, we need a struct to hold the data from the incoming trade events.

Let’s create a new directory called binance in apps/streamer/lib/streamer/ and add a file called trade_event.ex.

This new module will define a struct to hold our trade event data. We’ll use more descriptive field names instead of the abbreviated ones from the JSON payload. We can start by creating a skeleton module:

# /apps/streamer/lib/streamer/binance/trade_event.ex
defmodule Streamer.Binance.TradeEvent do
  defstruct []
end

Binance’s documentation lists these fields:

{
  "e": "trade",     // Event type
  "E": 123456789,   // Event time
  "s": "XRPUSDT",   // Symbol
  "t": 12345,       // Trade ID
  "p": "0.001",     // Price
  "q": "100",       // Quantity
  "T": 123456785,   // Trade time
  "m": true,        // Is the buyer the market maker?
  "M": true         // Ignore
}

Now, let’s define our struct, translating the abbreviated keys from the JSON payload into more descriptive field names.

Add a @type definition and update the defstruct in the Streamer.Binance.TradeEvent module as follows:

  # /apps/streamer/lib/streamer/binance/trade_event.ex
  @type t :: %__MODULE__{
          event_type: String.t(),
          event_time: integer(),
          symbol: String.t(),
          trade_id: integer(),
          price: float(),
          quantity: String.t(),
          trade_time: integer(),
          buyer_market_maker: boolean()
        }

  defstruct [
    :event_type,
    :event_time,
    :symbol,
    :trade_id,
    :price,
    :quantity,
    :trade_time,
    :buyer_market_maker
  ]

The @type definition describes what a TradeEvent struct is supposed to look like. Today, it’s mainly used by tools like Dialyzer and by your editor - not by the compiler itself.

This is an important detail to understand. Even though we say price is a float() and symbol is a String.t(), the compiler won’t stop us from doing something silly like passing a string as the price. There are no warnings and no errors - Elixir is still very much dynamically typed here.
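Here is a small experiment you can run to see this for yourself. It is a sketch with hypothetical module names (TypespecSketch and TypespecSketch.TradeEvent) and a trimmed-down struct, not the chapter’s code.

```elixir
defmodule TypespecSketch.TradeEvent do
  # Trimmed-down struct for the experiment: only two fields.
  @type t :: %__MODULE__{symbol: String.t(), price: float()}

  defstruct [:symbol, :price]
end

defmodule TypespecSketch do
  alias TypespecSketch.TradeEvent

  # The spec promises a TradeEvent.t(), yet the body stores a string in
  # :price. The module still compiles and the function still runs.
  @spec bad_event() :: TradeEvent.t()
  def bad_event do
    %TradeEvent{symbol: "XRPUSDT", price: "not a float"}
  end
end

IO.inspect(TypespecSketch.bad_event().price)
# prints "not a float"
```

Running Dialyzer over a project containing this code is where such a mismatch would have a chance of being reported.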

So why bother with typespecs at all? Because they still pull their weight. They make the code easier to read, help static analysis tools catch some mistakes, and set us up nicely for future improvements to Elixir’s gradual type system.

When it comes to actual safety, we rely on more traditional Elixir tools: pattern matching (%TradeEvent{}) to make sure we’re dealing with the right struct, and well-defined boundaries between parts of the system. The typespecs don’t enforce correctness, but they do help us reason about the code as it grows.
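As an illustration of what runtime checks look like, the sketch below matches on the struct in a function head and adds a guard on the price. MatchSketch, MatchSketch.TradeEvent, and describe/1 are hypothetical names, and the struct is reduced to two fields.

```elixir
defmodule MatchSketch.TradeEvent do
  defstruct [:symbol, :price]
end

defmodule MatchSketch do
  alias MatchSketch.TradeEvent

  # Matches only our struct, and only when the price is really a float.
  def describe(%TradeEvent{symbol: symbol, price: price}) when is_float(price) do
    {:ok, "#{symbol}@#{price}"}
  end

  # Everything else (plain maps, other structs, wrong price type) ends here.
  def describe(_other), do: {:error, :not_a_trade_event}

  def examples do
    [
      describe(%TradeEvent{symbol: "XRPUSDT", price: 0.25604}),
      describe(%TradeEvent{symbol: "XRPUSDT", price: "0.25604"}),
      describe(%{symbol: "XRPUSDT", price: 0.25604})
    ]
  end
end

IO.inspect(MatchSketch.examples())
```

Unlike the typespec, the function head is checked on every call, so a plain map or a string price is rejected at the boundary instead of travelling further into the system.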

With our struct defined, let’s return to the Streamer.Binance module to implement the process_event/1 function. We’ll use pattern matching in the function head to ensure we’re only handling events where the “e” key is “trade”. Inside the function, we’ll map the raw event data into our clean %Streamer.Binance.TradeEvent{} struct. The process_event/1 function should now look like this:

  # /apps/streamer/lib/streamer/binance.ex
  defp process_event(%{"e" => "trade"} = event) do
    {trade_price, ""} = Float.parse(event["p"])

    trade_event = %Streamer.Binance.TradeEvent{
      event_type: event["e"],
      event_time: event["E"],
      symbol: event["s"],
      trade_id: event["t"],
      price: trade_price,
      quantity: event["q"],
      trade_time: event["T"],
      buyer_market_maker: event["m"]
    }

    Logger.debug(
      "Trade event received " <>
        "#{trade_event.symbol}@#{trade_event.price}"
    )
  end

Two details here are noteworthy:

  • The Float Warning: We use Float.parse/1 here strictly for simplicity. In production financial systems, never use floats - rounding errors can lead to “dust” (tiny, untradeable amounts left in your wallet) or to failed orders due to insufficient funds. In a professional system, you’d use a library like Decimal. We’ll stick with floats for now to keep our initial logic readable.
  • We added a Logger.debug/1 message to see our formatted trade events in the console.
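To see the rounding problem behind the float warning, the sketch below compares float arithmetic with an integer-based alternative. Note the swap: a real system would more likely reach for Decimal, as mentioned above, but integers keep the example free of dependencies. FloatSketch and to_units/1 are hypothetical, and the 8 decimal places are an assumption taken from the sample payloads.

```elixir
defmodule FloatSketch do
  # Assumed precision for the sketch: 8 decimal places.
  @decimals 8

  # Binary floats cannot represent 0.1 or 0.2 exactly, so this is false.
  def floats_add_up?, do: 0.1 + 0.2 == 0.3

  # Converts a decimal string such as "0.24373000" into an integer number
  # of the smallest unit. Assumes exactly one "." and at most 8 decimals.
  def to_units(amount) when is_binary(amount) do
    [whole, fraction] = String.split(amount, ".")
    String.to_integer(whole <> String.pad_trailing(fraction, @decimals, "0"))
  end

  # Integer arithmetic is exact, so this is true.
  def units_add_up? do
    to_units("0.1") + to_units("0.2") == to_units("0.3")
  end
end

# Float.parse/1 returns the parsed float plus any unparsed remainder.
IO.inspect(Float.parse("0.24373000"))
# prints {0.24373, ""}
IO.inspect(FloatSketch.floats_add_up?())
# prints false
IO.inspect(FloatSketch.units_add_up?())
# prints true
```

The Float.parse/1 result also explains the {trade_price, ""} pattern in process_event/1: the empty string asserts that the whole price was consumed.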

Lastly, before testing our implementation, let’s add a clean, public API to our streamer application. The Streamer.start_streaming/1 function will serve as our public interface, hiding the Streamer.Binance implementation details from the rest of the project.

# /apps/streamer/lib/streamer.ex
defmodule Streamer do
  @moduledoc """
  Documentation for `Streamer`.
  """

  def start_streaming(symbol) do
    Streamer.Binance.start_link(symbol)
  end
end

For reference, the final version of the Streamer.Binance module can be found in the book’s source code repository.

Now let’s configure the Logger in our main config/config.exs file. We’ll temporarily set the Logger level to :debug so we can see the incoming trade events:

# /config/config.exs
config :logger,
  level: :debug

Architecture Note: We are using the umbrella’s root configuration file for simplicity. While this works perfectly for now, it’s worth noting that as we add more applications to our umbrella, the root config can become a “junk drawer” where settings for different apps get tangled together. This shared configuration is one of the reasons we will eventually consolidate into a single Phoenix application, moving away from the overhead of managing multiple app-specific environments in favor of a simpler, unified deployment.

With our implementation complete, it’s time to see it in action in an iex session:

$ iex -S mix
...
iex(1)> Streamer.start_streaming("xrpusdt")
{:ok, #PID<0.251.0>}
23:14:32.217 [debug] Trade event received XRPUSDT@0.25604
23:14:33.381 [debug] Trade event received XRPUSDT@0.25604
23:14:35.380 [debug] Trade event received XRPUSDT@0.25605
23:14:36.386 [debug] Trade event received XRPUSDT@0.25606

Our application now establishes a WebSocket connection, receives trade events, decodes the JSON into %Streamer.Binance.TradeEvent{} structs, and logs a formatted message.

Notice that we are starting this process manually. In Elixir, a process without a supervisor is an “orphan” - if the WebSocket connection drops or Binance rate-limits us, our streamer will simply die and stay dead, leaving our bot blind to the market. We’ll address this vulnerability by implementing a proper supervision tree in Phase 2.

Finally, let’s reset the Logger level back to :info to avoid cluttering our console with debug messages.

# /config/config.exs
config :logger,
  level: :info

You’ve just built a real-time data pipeline in Elixir. Take a moment to appreciate what you’ve accomplished: you’ve connected to a live WebSocket stream, parsed incoming events, and structured the data cleanly. This isn’t just a toy example; it’s the core of a serious application, and you built it from scratch. As a final step, remember to run mix format to keep your code nice and tidy.

The source code for this chapter can be found in the book’s source code repository (branch: chapter_01).