Building a News-Aware Algo with Newsvibe API
Every article in the Sentiment and AI section builds toward this one. How Newsvibe Works explains the signal format. The news sentiment strategy articles describe the edge. This tutorial is the implementation: a complete, runnable Python strategy that polls Newsvibe sentiment signals, evaluates entry conditions, executes via Alpaca, and logs every trade with its triggering signal for post-trade analysis.
Nothing in this tutorial is theoretical. The code runs. The structure is production-appropriate, not toy-grade.
Prerequisites
Before starting, confirm the following are in place:
- Python 3.11 or higher: f-string improvements and better type annotation support are used throughout
- alpaca-py: the official Alpaca Python SDK (pip install alpaca-py)
- requests: HTTP client for Newsvibe API calls (pip install requests)
- pandas: trade log management and data handling (pip install pandas)
- A Newsvibe API key: available at oyamori.com; required to call the sentiment endpoint
- An Alpaca paper trading account: register at alpaca.markets; paper accounts are free and allow full strategy testing against live market data without real capital at risk
The tutorial uses paper trading throughout. The switch to live trading requires a single parameter change (paper=False) in the TradingClient initialization — but only after the strategy has been validated on paper across multiple market conditions.
If you have not yet set up a Python trading environment, start with the Trading Development Environment Setup guide. The articles covering Alpaca API fundamentals and your first live trade from the CLI provide the groundwork this tutorial builds on.
Store credentials as environment variables, never in source code:
export NEWSVIBE_API_KEY="your-newsvibe-key"
export ALPACA_API_KEY="your-alpaca-key"
export ALPACA_SECRET_KEY="your-alpaca-secret"
Authenticating with Newsvibe API
The Newsvibe API uses Bearer token authentication. Every request includes an Authorization header with your API key. The base URL is https://api.newsvibe.io/v1.
import os
import requests

NEWSVIBE_API_KEY = os.environ['NEWSVIBE_API_KEY']
NEWSVIBE_BASE_URL = "https://api.newsvibe.io/v1"


def newsvibe_get(path: str, params: dict = None) -> dict:
    """Make an authenticated GET request to the Newsvibe API."""
    url = f"{NEWSVIBE_BASE_URL}{path}"
    headers = {"Authorization": f"Bearer {NEWSVIBE_API_KEY}"}
    response = requests.get(url, headers=headers, params=params or {}, timeout=10)
    response.raise_for_status()
    return response.json()
The raise_for_status() call converts any 4xx or 5xx HTTP response into a Python exception, which the calling code must handle. Do not silently swallow API errors: a failed authentication should stop the strategy loop, and a rate limit response should pause it, rather than either one producing a spurious no-signal result.
Rate limits: The Newsvibe API enforces rate limits per API key. The standard tier allows 60 requests per minute. At one poll per ticker every 60 seconds for a single-ticker strategy, you will not approach this limit. Multi-ticker architectures (covered at the end of this article) need to account for the combined request rate across all tickers.
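The arithmetic behind that limit is simple enough to check before choosing a ticker list. This sketch assumes the 60 requests per minute figure quoted above and one request per ticker per poll; both function names and the headroom parameter are illustrative, not part of the Newsvibe API.

```python
def requests_per_minute(n_tickers: int, poll_interval_seconds: float) -> float:
    """Combined request rate when every ticker is polled once per interval."""
    return n_tickers * 60.0 / poll_interval_seconds


def min_poll_interval(n_tickers: int, limit_per_minute: int = 60,
                      headroom: float = 0.8) -> float:
    """Shortest poll interval (seconds) that uses at most `headroom` of the limit.

    headroom=0.8 is an arbitrary safety margin chosen for this sketch.
    """
    return n_tickers * 60.0 / (limit_per_minute * headroom)
```

For example, 20 tickers polled every 15 seconds would generate 80 requests per minute, which is over the standard-tier limit; with the 80% margin the same list needs a poll interval of roughly 25 seconds.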
Error handling pattern for production:
import time
from requests.exceptions import HTTPError, Timeout, ConnectionError


def safe_newsvibe_get(path: str, params: dict = None) -> dict | None:
    """Newsvibe API call with graceful error handling."""
    try:
        return newsvibe_get(path, params)
    except HTTPError as e:
        if e.response.status_code == 401:
            # Bad credentials will not fix themselves: stop the strategy.
            raise RuntimeError("Newsvibe API key invalid or expired") from e
        if e.response.status_code == 429:
            print("Rate limit hit, backing off 60 seconds")
            time.sleep(60)  # Wait here so the next poll does not hit the limit again
            return None
        if e.response.status_code == 404:
            return None  # Ticker not covered by Newsvibe
        raise  # Any other 4xx/5xx propagates to the caller
    except (Timeout, ConnectionError) as e:
        print(f"Network error calling Newsvibe: {e}")
        return None
A 429 (rate limit) should trigger a backoff. A 404 for a specific ticker means Newsvibe does not cover that symbol — remove it from your ticker list. Any other 4xx is an error in your request; any 5xx is a server issue on Newsvibe's end.
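One common way to implement a backoff is to retry with exponentially growing delays. The sketch below is one possible shape, not the tutorial's method: call_with_backoff is a hypothetical wrapper, it treats any None result as retryable (which would also retry a 404 for an uncovered ticker), and the sleep argument is injectable only so the delays can be verified without waiting.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fetch: Callable[[], Optional[T]],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Call fetch() until it returns a non-None result or attempts run out.

    The wait doubles after each failed attempt: base_delay, 2x, 4x, ...
    """
    for attempt in range(max_attempts):
        result = fetch()
        if result is not None:
            return result
        # No point sleeping after the final attempt.
        if attempt < max_attempts - 1:
            sleep(base_delay * 2 ** attempt)
    return None
```

A call such as call_with_backoff(lambda: safe_newsvibe_get("/signal/AAPL")) would wrap the helper defined above; whether retrying inside a single poll cycle is preferable to simply waiting for the next cycle depends on how time-sensitive the strategy is.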
Subscribing to a Sentiment Feed
The single-ticker sentiment endpoint returns the most recent scored signal for a given symbol. In a polling architecture, you call this endpoint on a schedule and compare the latest signal to any previous signal to detect changes.
def get_sentiment(ticker: str) -> dict | None:
    """
    Fetch the current sentiment signal for a ticker.

    Returns a dict with keys:
        ticker (str)       the symbol queried
        score (float)      sentiment score, -1.0 (bearish) to 1.0 (bullish)
        tier (int)         signal strength tier, 1 (weak) to 5 (strong)
        urgency (str)      'low' | 'medium' | 'high'
        confidence (float) model confidence, 0.0 to 1.0
        headline (str)     the driving headline or event summary
        source (str)       news source identifier
        timestamp (str)    ISO 8601 UTC timestamp of the triggering event
    """
    return safe_newsvibe_get(f"/signal/{ticker}")
The full payload is documented in How Newsvibe Works. The fields this strategy uses directly:
- score: the numeric sentiment value. Positive means bullish news, negative means bearish. A score of 0.0 means no clear directional signal.
- tier: signal strength on a 1–5 scale. Tier 1 is a weak signal from a minor source. Tier 5 is a high-confidence signal from a primary source with direct market relevance.
- urgency: categorical. high urgency signals typically carry time pressure: the news is recent, material, and likely to move price quickly. low urgency signals have longer expected impact windows.
- confidence: the model's internal estimate of how reliably the signal score reflects actual sentiment. A high score with low confidence means the model disagrees with itself. Treat confidence below 0.6 as a filter condition, not a trading signal.
- timestamp: critical for preventing stale signal entries. A signal timestamped 4 hours ago is not an actionable intraday signal.
Defining Entry and Exit Logic
The entry conditions translate the signal payload into a binary trading decision. This strategy uses a conservative threshold set appropriate for paper trading validation:
from datetime import datetime, timezone, timedelta


def is_signal_fresh(timestamp: str, max_age_minutes: int = 30) -> bool:
    """Check that the signal was generated within the last N minutes."""
    signal_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
    age = datetime.now(timezone.utc) - signal_time
    return age <= timedelta(minutes=max_age_minutes)


def should_enter_long(signal: dict) -> bool:
    """
    Entry condition for a long position.
    Requires: strong positive score, high urgency, tier 3+, high confidence, and fresh signal.
    """
    return (
        signal.get('score', 0) >= 0.5 and
        signal.get('urgency') == 'high' and
        signal.get('tier', 0) >= 3 and
        signal.get('confidence', 0) >= 0.7 and
        is_signal_fresh(signal.get('timestamp', '2000-01-01T00:00:00Z'))
    )


def should_enter_short(signal: dict) -> bool:
    """
    Entry condition for a short position.
    Requires: strong negative score, high urgency, tier 3+, high confidence, and fresh signal.
    """
    return (
        signal.get('score', 0) <= -0.5 and
        signal.get('urgency') == 'high' and
        signal.get('tier', 0) >= 3 and
        signal.get('confidence', 0) >= 0.7 and
        is_signal_fresh(signal.get('timestamp', '2000-01-01T00:00:00Z'))
    )
These thresholds are conservative by design. Score threshold of 0.5 (rather than 0.3) reduces signal noise. Tier 3+ filters out minor sources. Confidence 0.7+ ensures the model itself is not ambivalent. The freshness check prevents acting on signals from earlier sessions that happen to still be the most recent for a low-volume news ticker.
Exit logic operates on two conditions:
- Signal reversal: if the signal score for an open long position crosses below 0, or an open short position crosses above 0, close the position. The news narrative has changed direction.
- Time-based exit: close any position held longer than N minutes regardless of signal state. Sentiment signals are short-window: the market typically prices in news within 2–6 hours. Holding past that window means holding on price momentum alone, not on the original signal thesis.
def should_exit_long(signal: dict, entry_time: datetime, max_hold_minutes: int = 120) -> bool:
    """Exit a long position on signal reversal or time limit."""
    signal_reversed = signal.get('score', 0) < 0
    time_expired = (datetime.now(timezone.utc) - entry_time) > timedelta(minutes=max_hold_minutes)
    return signal_reversed or time_expired


def should_exit_short(signal: dict, entry_time: datetime, max_hold_minutes: int = 120) -> bool:
    """Exit a short position on signal reversal or time limit."""
    signal_reversed = signal.get('score', 0) > 0
    time_expired = (datetime.now(timezone.utc) - entry_time) > timedelta(minutes=max_hold_minutes)
    return signal_reversed or time_expired
Wiring to Alpaca Execution
The Alpaca trading client handles order submission. Initialize it once at the module level — creating a new client per trade adds unnecessary latency.
from alpaca.trading.client import TradingClient
from alpaca.trading.requests import MarketOrderRequest, GetOrdersRequest
from alpaca.trading.enums import OrderSide, TimeInForce, QueryOrderStatus
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockLatestBarRequest
ALPACA_KEY = os.environ['ALPACA_API_KEY']
ALPACA_SECRET = os.environ['ALPACA_SECRET_KEY']
trading_client = TradingClient(ALPACA_KEY, ALPACA_SECRET, paper=True)
data_client = StockHistoricalDataClient(ALPACA_KEY, ALPACA_SECRET)
The position size for news-based trades should be ATR-adjusted: size to risk a fixed dollar amount based on the stock's recent volatility. A $1,000 risk budget on a stock with a $2.50 ATR means 400 shares. This prevents over-sizing on high-volatility names and under-sizing on low-volatility names.
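def get_atr(ticker: str, period: int = 14) -> float:
    """Fetch recent daily bars and compute the average true range (ATR)."""
    import pandas as pd
    from alpaca.data.requests import StockBarsRequest
    from alpaca.data.timeframe import TimeFrame
    from datetime import date, timedelta

    end = date.today()
    # Request roughly twice as many calendar days as bars needed, so that
    # weekends and holidays still leave at least `period` trading days.
    start = end - timedelta(days=period * 2 + 5)
    req = StockBarsRequest(
        symbol_or_symbols=ticker,
        timeframe=TimeFrame.Day,
        start=str(start),
        end=str(end)
    )
    bars = data_client.get_stock_bars(req).df
    if bars.empty or len(bars) < period:
        return 1.0  # Fallback when too little history is available
    bars = bars.reset_index()
    prev_close = bars['close'].shift(1)
    # True range: largest of high-low, |high - prev close|, |low - prev close|.
    # The first bar has no previous close, so it falls back to high-low.
    bars['tr'] = pd.concat(
        [bars['high'] - bars['low'],
         (bars['high'] - prev_close).abs(),
         (bars['low'] - prev_close).abs()],
        axis=1
    ).max(axis=1)
    return float(bars['tr'].tail(period).mean())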
def compute_position_size(ticker: str, risk_per_trade: float = 500.0) -> int:
    """
    Compute share quantity using ATR-based position sizing.
    Risk per trade / ATR = number of shares.
    Minimum 1, maximum 1000 shares (hard cap for safety).
    """
    atr = get_atr(ticker)
    raw_size = risk_per_trade / max(atr, 0.01)
    return max(1, min(int(raw_size), 1000))
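To make the sizing arithmetic concrete without calling the market data API, here is a small offline sketch. It uses the textbook definition of true range (the largest of high minus low and the two gaps against the previous close) and a simple mean rather than Wilder's smoothing; the function names and the sample bars are invented for illustration.

```python
def true_ranges(bars: list[tuple[float, float, float]]) -> list[float]:
    """True range per bar. `bars` holds (high, low, close) tuples, oldest first."""
    ranges = []
    prev_close = None
    for high, low, close in bars:
        if prev_close is None:
            tr = high - low  # First bar: no previous close to compare against
        else:
            tr = max(high - low, abs(high - prev_close), abs(low - prev_close))
        ranges.append(tr)
        prev_close = close
    return ranges


def average_true_range(bars: list[tuple[float, float, float]], period: int = 14) -> float:
    """Simple mean of the last `period` true ranges (not Wilder's smoothing)."""
    recent = true_ranges(bars)[-period:]
    return sum(recent) / len(recent)


def shares_for_risk(risk_per_trade: float, atr: float, max_shares: int = 1000) -> int:
    """Risk budget divided by ATR, clamped to the range 1..max_shares."""
    return max(1, min(int(risk_per_trade / max(atr, 0.01)), max_shares))


# Made-up sample bars; the second bar gaps up from a 9.5 close to an 11-12 range.
sample_bars = [(10.0, 9.0, 9.5), (12.0, 11.0, 11.5), (11.0, 10.0, 10.5)]
sample_trs = true_ranges(sample_bars)
```

With these sample bars the gap on the second day gives a true range of 2.5 even though the bar's own high-low range is only 1.0, which is why true range is preferred over the plain daily range for stocks that gap on news. The figure from the text also checks out: shares_for_risk(1000, 2.50) gives 400 shares.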
Placing the order:
def place_market_order(ticker: str, qty: int, side: OrderSide):
    """Submit a market order via Alpaca. Returns the Order object."""
    req = MarketOrderRequest(
        symbol=ticker,
        qty=qty,
        side=side,
        time_in_force=TimeInForce.DAY
    )
    return trading_client.submit_order(req)
Check buying power before placing any long order. For short positions, check that the account is margin-enabled:
def check_buying_power(min_required: float = 1000.0) -> bool:
    """Confirm sufficient buying power before entering a position."""
    account = trading_client.get_account()
    return float(account.buying_power) >= min_required


def is_already_positioned(ticker: str) -> bool:
    """Return True if the account already holds a position in this ticker."""
    try:
        pos = trading_client.get_open_position(ticker)
        return pos is not None
    except Exception:
        return False
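The is_already_positioned check prevents doubling into an existing position on a repeated high-urgency signal. The strategy loops later in this tutorial track positions in memory and do not call it, so call it before each entry if the process may restart while a position is still open.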
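The margin check for short entries mentioned above is not implemented in the functions shown. A hedged sketch of what it could look like follows. It assumes the account object exposes a shorting_enabled flag and the asset object exposes shortable and easy_to_borrow flags, which matches the alpaca-py models as far as this sketch is aware; verify the attribute names against your installed SDK version. can_short is a hypothetical helper.

```python
def can_short(account, asset) -> bool:
    """Return True only if both the account and the asset permit shorting.

    Assumed attributes (verify against your alpaca-py version):
      account.shorting_enabled, asset.shortable, asset.easy_to_borrow
    Missing attributes are treated as False, so the check fails closed.
    """
    return bool(
        getattr(account, "shorting_enabled", False)
        and getattr(asset, "shortable", False)
        and getattr(asset, "easy_to_borrow", False)
    )
```

In the strategy this would be called as can_short(trading_client.get_account(), trading_client.get_asset(ticker)) before submitting a sell-to-open order. Failing closed on a missing attribute means an SDK change results in skipped short entries rather than rejected orders.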
Logging Trades with Sentiment Context
Trade logging that captures the triggering signal is what separates a strategy that can be analyzed from one that is a black box. Every entry and exit should record the full signal state at the time of the decision.
import pandas as pd
from pathlib import Path

TRADE_LOG_PATH = Path("trade_log.csv")


def log_trade(
    ticker: str,
    order,
    signal: dict,
    event_type: str = "entry"  # "entry" | "exit"
):
    """
    Append a trade record to the trade log CSV.
    Each row contains the Alpaca order details and the full Newsvibe signal payload.
    """
    record = {
        'event_type': event_type,
        'ticker': ticker,
        'order_id': str(order.id),
        'side': str(order.side),
        'qty': str(order.qty),
        'submitted_at': str(order.submitted_at),
        'sentiment_score': signal.get('score'),
        'tier': signal.get('tier'),
        'urgency': signal.get('urgency'),
        'confidence': signal.get('confidence'),
        'headline': signal.get('headline', ''),
        'signal_timestamp': signal.get('timestamp'),
    }
    df = pd.DataFrame([record])
    df.to_csv(
        TRADE_LOG_PATH,
        mode='a',
        header=not TRADE_LOG_PATH.exists(),
        index=False
    )
The header=not TRADE_LOG_PATH.exists() pattern writes the header row only on the first write, then appends without repeating it. After a week of trading, you can load the log and group by tier or urgency to see which signal quality bands are actually generating positive returns — and tighten your entry filters accordingly.
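As a starting point for that analysis, the sketch below counts logged entries per tier. It uses only the standard library so it runs anywhere, although the same grouping is a one-liner in pandas. Note that the log as defined above records no fill price or P&L, so measuring returns per tier would first require adding those columns; the function name and sample rows are invented for illustration.

```python
import csv
import io
from collections import Counter
from typing import TextIO


def entries_by_tier(log_file: TextIO) -> Counter:
    """Count 'entry' rows per tier in a trade log with the columns used above."""
    counts: Counter = Counter()
    for row in csv.DictReader(log_file):
        if row.get("event_type") == "entry":
            counts[row.get("tier", "")] += 1  # CSV values are strings, e.g. "3"
    return counts


# Made-up sample rows with a subset of the log's columns.
SAMPLE_LOG = """event_type,ticker,tier,urgency
entry,AAPL,3,high
exit,AAPL,3,high
entry,MSFT,5,high
entry,AAPL,3,high
"""
sample_counts = entries_by_tier(io.StringIO(SAMPLE_LOG))
```

Against the real file you would call entries_by_tier(open("trade_log.csv", newline="")). A tier that produces many entries but few profitable exits is the first candidate for a stricter filter.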
Running the Complete Strategy
With all components defined, the main strategy loop is:
import time
from alpaca.trading.enums import OrderSide


def run_strategy(
    ticker: str,
    position_size: int | None = None,
    poll_interval_seconds: int = 60,
    risk_per_trade: float = 500.0
):
    """
    Main strategy loop for a single ticker.
    Polls Newsvibe every poll_interval_seconds, enters/exits based on signal conditions.
    """
    open_position = None  # dict: {side: OrderSide, entry_time: datetime, qty: int}
    print(f"Starting news-aware strategy for {ticker}")
    while True:
        signal = get_sentiment(ticker)
        if signal is None:
            print(f"No signal returned for {ticker}, skipping cycle")
            time.sleep(poll_interval_seconds)
            continue

        # Exit logic: check open positions first
        if open_position is not None:
            if open_position['side'] == OrderSide.BUY and should_exit_long(signal, open_position['entry_time']):
                order = place_market_order(ticker, open_position['qty'], OrderSide.SELL)
                log_trade(ticker, order, signal, event_type="exit")
                print(f"Exited long {ticker}, signal score: {signal.get('score')}")
                open_position = None
            elif open_position['side'] == OrderSide.SELL and should_exit_short(signal, open_position['entry_time']):
                order = place_market_order(ticker, open_position['qty'], OrderSide.BUY)
                log_trade(ticker, order, signal, event_type="exit")
                print(f"Exited short {ticker}, signal score: {signal.get('score')}")
                open_position = None

        # Entry logic: only when no open position
        if open_position is None:
            qty = position_size or compute_position_size(ticker, risk_per_trade)
            # qty * 10 is a rough dollar floor (it assumes about $10 per share),
            # not the order's real notional value.
            if should_enter_long(signal) and check_buying_power(qty * 10):
                order = place_market_order(ticker, qty, OrderSide.BUY)
                log_trade(ticker, order, signal, event_type="entry")
                open_position = {
                    'side': OrderSide.BUY,
                    'entry_time': datetime.now(timezone.utc),
                    'qty': qty
                }
                print(f"Entered long {ticker}, score: {signal.get('score')}, tier: {signal.get('tier')}")
            elif should_enter_short(signal) and check_buying_power(qty * 10):
                order = place_market_order(ticker, qty, OrderSide.SELL)
                log_trade(ticker, order, signal, event_type="entry")
                open_position = {
                    'side': OrderSide.SELL,
                    'entry_time': datetime.now(timezone.utc),
                    'qty': qty
                }
                print(f"Entered short {ticker}, score: {signal.get('score')}, tier: {signal.get('tier')}")

        time.sleep(poll_interval_seconds)


if __name__ == "__main__":
    run_strategy("AAPL", risk_per_trade=500.0, poll_interval_seconds=60)
Run this as python strategy.py. The loop runs until you terminate it with Ctrl+C. Before running against a ticker with real paper account activity, verify your Alpaca paper account is funded (typically $100,000 by default on new paper accounts) and that the ticker is tradeable.
One important operational note: this strategy polls on a fixed interval and does not use websocket streaming. For low-frequency strategies (signals with urgency medium or low), polling every 60 seconds is adequate. For high urgency signals where timing matters, consider shortening the poll interval to 15–30 seconds — but watch the rate limit.
Extending to Multi-Ticker
The single-ticker loop above extends naturally to multiple tickers with two modifications: a dictionary tracking open positions per ticker, and a rate-limit-aware polling schedule.
def run_multi_ticker_strategy(
    tickers: list[str],
    risk_per_trade: float = 500.0,
    max_open_positions: int = 3,
    poll_interval_seconds: int = 90
):
    """
    Multi-ticker news-aware strategy.
    Polls each ticker in rotation, respects position limits.
    """
    open_positions: dict = {}  # {ticker: {side, entry_time, qty}}
    while True:
        for ticker in tickers:
            # Portfolio-level position limit
            if len(open_positions) >= max_open_positions and ticker not in open_positions:
                continue
            signal = get_sentiment(ticker)
            if signal is None:
                continue
            # Exit check
            if ticker in open_positions:
                pos = open_positions[ticker]
                if pos['side'] == OrderSide.BUY and should_exit_long(signal, pos['entry_time']):
                    order = place_market_order(ticker, pos['qty'], OrderSide.SELL)
                    log_trade(ticker, order, signal, event_type="exit")
                    del open_positions[ticker]
                elif pos['side'] == OrderSide.SELL and should_exit_short(signal, pos['entry_time']):
                    order = place_market_order(ticker, pos['qty'], OrderSide.BUY)
                    log_trade(ticker, order, signal, event_type="exit")
                    del open_positions[ticker]
            # Entry check
            elif len(open_positions) < max_open_positions:
                qty = compute_position_size(ticker, risk_per_trade)
                if should_enter_long(signal) and check_buying_power(qty * 10):
                    order = place_market_order(ticker, qty, OrderSide.BUY)
                    log_trade(ticker, order, signal, event_type="entry")
                    open_positions[ticker] = {
                        'side': OrderSide.BUY,
                        'entry_time': datetime.now(timezone.utc),
                        'qty': qty
                    }
                elif should_enter_short(signal) and check_buying_power(qty * 10):
                    order = place_market_order(ticker, qty, OrderSide.SELL)
                    log_trade(ticker, order, signal, event_type="entry")
                    open_positions[ticker] = {
                        'side': OrderSide.SELL,
                        'entry_time': datetime.now(timezone.utc),
                        'qty': qty
                    }
            time.sleep(1)  # Brief pause between tickers to stay within rate limits
        time.sleep(poll_interval_seconds)
The max_open_positions parameter is the portfolio-level position cap. Three simultaneous positions is a reasonable starting point — enough to see multi-ticker behavior, few enough that an adverse move on all three simultaneously does not blow through the account. Adjust after you have reviewed trade log data across at least 30 trading days.
The Oyamori Approach
This tutorial covers one implementation path. The Oyamori platform provides the signal infrastructure — Newsvibe API access, the scoring model, and the validated signal format — so that traders building on it start from a verified signal rather than a scraped or inferred one.
The implementation decisions in this tutorial are defaults, not prescriptions. Entry thresholds, position sizing formulas, poll intervals, and max hold times are all parameters. The correct values for any specific ticker or market regime come from post-trade analysis of the trade log — the log structure here is designed specifically to support that analysis.
The extension paths are incremental: improve signal filters based on log analysis, add stop-loss orders to Alpaca alongside the market entry, incorporate the portfolio risk tracker to monitor combined exposure across concurrent strategy runs, and test against more tickers to identify which Newsvibe-covered symbols respond most reliably to sentiment signals in your strategy's holding window.
Every production trading system started as a loop like this one.