Quickstart

Let's get this show on the road

⚠️

Have you gotten your API key yet? If not, grab it here

Creating your first Subscription

  1. Navigate to the Datastreams tab
  2. Click the "New Datastream" button
  3. Choose the data source that you would like to receive data from. For a description of each data source, see Streaming Data Sources.
  4. Use the Query build to choose how to filter what data you would like to receive. (The content of box is considered an "OR" join with each other and each box is considered an "AND" join with the other boxes)
  5. Configure how you would like to receive your data: WebSocket or Webhook. If you choose Webhook, provide a valid endpoint that can receive a POST request. Using a WebSocket is covered in a different section

Connecting to a WebSocket

📘

Now supports the JSONRPC standard

  1. If you have not created a Subscription, please do that first
  2. Go to the Datastreams table and find the stream that you'd like to connect to
  3. Copy the Subscription Id of the stream that you'd like to subscribe to.
  4. Go to the dashboard table and copy your API key from that page, you will need both the Subscription ID and API Key to connect
  5. We will be using wscat for this example, but you can use any WebSocket implementation (see below).
  6. Install wscat if you don't have it already:
npm install -g wscat
  1. Open a websocket connection to our service.
wscat -c wss://kiki-stream.hellomoon.io --auth user:YOUR_API_KEY
  1. At the blank prompt enter:
> { "method": "subscribe", "params": { "subscriptionId": "YOUR_SUBSCRIPTION_ID" }, "jsonrpc": "2.0", "id": "AN_OPTIONAL_UNIQUE_ID" }
  1. You should see the receive the following message:
< "You have successfully subscribed"

Data from your stream will now flow to this socket when it comes through.

**Note: There is a max number of concurrent connections allowed at once**

Additional Websocket Implementations

  const websocket = require("websocket");
  const client = new websocket.client();

  client.on("connect", (connection) => {
    connection.on("message", (message) => {
      if (!message || message.type !== "utf8") return;
      const data = JSON.parse(message.utf8Data);
      if (data === "You have successfully subscribed") {
        return;
      }

      // do logic under here
    });

    connection.sendUTF(
      JSON.stringify({
        method: "subscribe",
        jsonrpc: "2.0",
        params: { subscriptionId: "<your_subscription_id>" },
        requestId: "AN_OPTIONAL_REQUEST_ID",
      })
    );
  });

  client.connect("wss://kiki-stream.hellomoon.io");
  
use url::Url;
use tungstenite::{connect, Message};

fn main() {
    let (mut socket, _response) = connect(
        Url::parse("wss://kiki-stream.hellomoon.io").unwrap()
    ).expect("Can't connect");

    socket.write_message(Message::Text(r#"{
        "method": "authenticate",
        "params": { "apiKey": "<your_api_key>" },
        "jsonrpc": "2.0",
        "id": "AN_OPTIONAL_REQUEST"
    }"#.into()));

    let auth_message = socket.read_message().expect("Error reading message");

    // An example to subscribe to a stream
    socket.write_message(Message::Text(r#"{
        "method": "subscribe",
        "params": { "subscriptionId": "<your_subscription_id>" },
        "jsonrpc": "2.0",
        "id": "AN_OPTIONAL_REQUEST_ID"
    }"#.into()));

    // Example of how to query the REST API
    socket.write_message(Message::Text(r#"{
        "method": "/defi/swaps",
        "params": {  },
        "jsonrpc": "2.0",
        "id": "AN_OPTIONAL_REQUEST_ID_2"
    }"#.into()));

    loop {
        let msg = socket.read_message().expect("Error reading message");
        println!("Received: {}", msg);
    }
}


import asyncio
import websockets

async def subscribe():
    headers = websockets.Headers()
    # The authorization header is a base64 encoded key of ("user:<your_api_key>")
    headers["Authorization"] = "Basic BASE64_ENCODED_KEY"
    async with websockets.connect("wss://kiki-stream.hellomoon.io", extra_headers=headers) as websocket:
        try:
            await websocket.send('{ "jsonrpc": "2.0", "method": "subscribe", "params": { "subscriptionId": "<your_subscription_id>" }, "id": "OPTIONAL_REQUEST_ID" }')
            async for message in websocket:
                print(message)
        except websockets.ConnectionClosed:
            return

async def query():
    headers = websockets.Headers()
    # The authorization header is a base64 encoded key of ("user:<your_api_key>")
    headers["Authorization"] = "Basic BASE64_ENCODED_KEY"
    async with websockets.connect("wss://kiki-stream.hellomoon.io", extra_headers=headers) as websocket:
        try:
            await websocket.send('{ "jsonrpc": "2.0", "method": "/citrus/loan-events", "params": {  }, "id": "OPTIONAL_REQUEST_ID_2" }')
            async for message in websocket:
                print(message)
        except websockets.ConnectionClosed:
            return


asyncio.run(query())

package org.example;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

public class ExampleClient extends WebSocketClient {


    public ExampleClient(URI serverURI, String apiKey) {
        super(serverURI, Collections.singletonMap("Authorization", "Basic " + Base64.getEncoder().encodeToString(("user:" + apiKey).getBytes(StandardCharsets.UTF_8))));
    }


    public void onOpen(ServerHandshake handshakedata) {
    }

    private void _send(String method, String params, String id) {
        send("{ \"method\": \"" + method + "\", \"params\": \"" + params+ "\", \"jsonrpc\": \"2.0\", \"id\": \"" + id + "\" }");
    }

    public void subscribe(String subscriptionId) {
        this._send("subscribe", "{ \"subscriptionId\": \"" + subscriptionId + "\" }", "authentication_id");
    }

    public void query(String method, Object params) {
        this._send(method, "{}", "AN_OPTIONAL_REQUEST_ID");
    }

    @Override
    public void onMessage(String message) {
        System.out.println("received: " + message);
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        // The close codes are documented in class org.java_websocket.framing.CloseFrame
        System.out.println(
                "Connection closed by " + (remote ? "remote peer" : "us") + " Code: " + code + " Reason: "
                        + reason);
    }

    @Override
    public void onError(Exception ex) {
        ex.printStackTrace();
        // if the error is fatal then onClose will be called additionally
    }

    public static void main(String[] args) throws URISyntaxException, InterruptedException {
        ExampleClient client = new ExampleClient(new URI(
                "wss://kiki-stream.hellomoon.io"), "<your_api_key>");
        client.connect();
        while (client.isOpen() == false) {
            Thread.sleep(10);
        }
        client.subscribe("<your_subscription_id>");
        client.query("/defi/swaps", "{}");
    }
}
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;

using (var ws = new ClientWebSocket())
{
    await ws.ConnectAsync(new Uri("wss://kiki-stream.hellomoon.io"), CancellationToken.None);
    var buffer = new byte[256];
    var settings = new JsonSerializerOptions {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    var authenticateRequest = JsonSerializer.Serialize(new Request("authenticate") {
        Params = new { ApiKey = "<your_api_key>" },
    }, settings);

    await ws.SendAsync(Encoding.UTF8.GetBytes(authenticateRequest), WebSocketMessageType.Text, true, CancellationToken.None);
    var message = await ws.ReceiveAsync(buffer, CancellationToken.None);

    var subscribeRequest = JsonSerializer.Serialize(new Request("subscribe") {
        Params = new { SubscriptionId = "<your_subscription_id>" },
    }, settings);

    await ws.SendAsync(Encoding.UTF8.GetBytes(subscribeRequest), WebSocketMessageType.Text, true, CancellationToken.None);
    while (ws.State == WebSocketState.Open)
    {
        var result = await ws.ReceiveAsync(buffer, CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Close)
        {
            await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);
        }
        else
        {
            Console.WriteLine(Encoding.UTF8.GetString(buffer, 0, result.Count));
        }
    }
}

class Request {
    public Request(string method) {
        Method = method;
    }
    public string Method { get; set; }
    public object? Params { get; set; }
    // An optional Id that helps match requests to responses
    public string? Id { get; set; }
}


Query Websocket for data

The method input is any Rest Api Route that we have. i.e. "/defi/swaps"

The params are any filtering params, they match the schema of that route's Rest Api Params

requestId is an optional id to track the response when it comes back.

  const websocket = require("websocket");
  const client = new websocket.client();

  client.on("connect", (connection) => {
    connection.on("message", (message) => {
      if (!message || message.type !== "utf8") return;
      const data = JSON.parse(message.utf8Data);
      if (data === "You have successfully subscribed") {
        return;
      }

      // do logic under here
    });

    connection.sendUTF(
      JSON.stringify({
        method: "/citrus/loan-events",
        jsonrpc: "2.0",
        params: { lender: "9376CqRajWNVAcVxwb8wGpZj1bDGdXQTxYfhQ5VeFS3N" },
        requestId: "AN_OPTIONAL_REQUEST_ID",
      })
    );
  });

  client.connect("wss://kiki-stream.hellomoon.io");
  

Querying the REST API

Using the API key found in the developer portal under https://www.hellomoon.io/dashboard head over to the API Reference tab.


What’s Next