"""
===============================================================================
DYEVERSE DIGITAL ASSETS LLC — Sovereign V4 Crypto Ensemble Router
─────────────────────────────────────────────────────────────────
V4.1 — Live tick ingest wired (DexScreener + Birdeye + Helius WS)
       Advanced Solana screener with Jupiter route validation
       Screener dynamically feeds new pairs into TickIngester

Strategies:
  1. Avellaneda-Stoikov AMM Mean-Reversion Maker
  2. Hodrick-Prescott Filter Momentum
  3. Statistical Arbitrage Pairs Trading
  4. KNN Machine Learning Predictor
  5. RL Market Maker (shadow mode: registered with zero ensemble weight)

Risk: $100 daily cap, $1 base size, 60s per-ticker mutex, kill switch.
Execution: Solana Web3 direct (Jupiter V6). Paper mode until key loaded.
===============================================================================
"""
from __future__ import annotations

import asyncio
import logging
import math
import time
import json
import os
from typing import Dict, List, Optional

import numpy as np

from core.risk.global_risk import GlobalRiskManager
from core.execution.web3_signer import SolanaWeb3Signer
from core.strategies.avellaneda_maker import AS_MarketMaker
from core.strategies.momentum_hp import HPMomentumFilter
from core.strategies.stat_arb import PairsTrader
from core.strategies.knn_predictor import KNNPredictor
from core.strategies.rl_agent import RL_MarketMaker

# New V4.1 modules
from tick_ingest import TickIngester
from solana_screener import SolanaScreener, CandidateToken
from rl_recorder import RLRecorder
from core.execution.position_tracker import PositionTracker

logger = logging.getLogger(__name__)

LOG_PATH = "/opt/dyeverse/crypto_agency/logs/sovereign_v4.log"
DEFAULT_ENV_PATH = "/opt/dyeverse/crypto_agency/.env"

# Configure logging.
# logging.FileHandler opens its file immediately, so the log directory has to
# exist before basicConfig runs; otherwise importing this module fails with
# FileNotFoundError on a fresh deployment.
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(LOG_PATH),
    ],
)

TAPE_PATH = "/opt/dyeverse/crypto_agency/data/ticks/tape.jsonl"
SCREENER_RESULTS = "/opt/dyeverse/crypto_agency/data/screener_results.json"
CONFLUENCE_THRESHOLD = 0.35


class SovereignEnsembleBot:
    """V4.1 Ensemble Router: live ticks + dynamic screener + multi-strategy confluence."""

    def __init__(self, config: dict | None = None):
        """Build the risk manager, executor, strategies and bookkeeping state.

        No I/O against the tape, screener or ingest layer happens here; those
        are set up by ``initialize()``.

        Args:
            config: Optional flat mapping of overrides. Every key is read with
                ``config.get(key, default)``, so missing keys fall back to the
                defaults written inline below. ``None`` is treated as ``{}``.
        """
        config = config or {}

        # ── Risk manager ─────────────────────────────────────────────────────
        self.risk_manager = GlobalRiskManager(
            daily_cap=config.get("daily_cap", 100.0),
            base_size=config.get("base_size", 1.0),
            cooldown_s=config.get("cooldown_s", 60.0),
            max_orders_per_minute=config.get("max_orders_per_minute", 30),
            mode=config.get("mode", "paper"),
        )

        # ── Execution engine (paper until SOLANA_PRIVATE_KEY loaded) ─────────
        self.executor = SolanaWeb3Signer(
            env_path=config.get("env_path", DEFAULT_ENV_PATH)
        )

        # ── Position tracker ─────────────────────────────────────────────────
        self.positions = PositionTracker()

        # ── Strategy modules ─────────────────────────────────────────────────
        self.strategies: Dict[str, object] = {
            "as_maker": AS_MarketMaker(
                gamma=config.get("as_gamma", 0.1),
                k=config.get("as_k", 1.5),
                tau=config.get("as_tau", 300.0),
            ),
            "hp_momentum": HPMomentumFilter(
                hp_lambda=config.get("hp_lambda", 100_000),
                slope_threshold=config.get("hp_slope_threshold", 0.001),
            ),
            "stat_arb": PairsTrader(
                entry_z=config.get("stat_arb_entry_z", 2.5),
                exit_z=config.get("stat_arb_exit_z", 0.5),
                stop_z=config.get("stat_arb_stop_z", 6.0),
                min_obs=config.get("stat_arb_min_obs", 100),
            ),
            "knn": KNNPredictor(
                k=config.get("knn_k", 7),
                min_samples=config.get("knn_min_samples", 100),
            ),
            "rl_agent": RL_MarketMaker(
                min_conviction=config.get("rl_min_conviction", 0.40),
            ),
        }

        # Default weights, one entry per strategy key. The original literal
        # repeated four of these keys after "rl_agent"; the duplicates carried
        # the same values, so dropping them leaves the resulting dict unchanged.
        self.strategy_weights: Dict[str, float] = config.get("strategy_weights", {
            "as_maker": 1.0,
            "hp_momentum": 1.0,
            "stat_arb": 1.2,
            "knn": 0.8,
            "rl_agent": 0.0,  # SHADOW MODE: signals logged but zero weight
        })
        self.confluence_threshold = config.get(
            "confluence_threshold", CONFLUENCE_THRESHOLD
        )
        self._config = config

        # ── Live price cache for stat-arb pair sync ──────────────────────────
        self._price_cache: Dict[str, float] = {}   # ticker -> latest price
        self._volume_cache: Dict[str, float] = {}  # ticker -> latest volume

        # ── Tape logger ──────────────────────────────────────────────────────
        self._tape_path = config.get("tape_path", TAPE_PATH)
        self._tape_file = None  # opened in initialize()
        self._tick_count = 0
        self._signal_count = 0
        self._execution_count = 0
        self._start_ts = 0.0

        # ── Ingest + Screener (initialised in initialize()) ──────────────────
        self._ingest: Optional[TickIngester] = None
        self._screener: Optional[SolanaScreener] = None

        # ── 3-Bucket system ──────────────────────────────────────────────────
        self._pair_buckets: Dict[str, int] = {}  # ticker -> bucket (1/2/3)
        self._rl_recorder = RLRecorder(
            output_dir=config.get(
                "rl_output_dir", "/opt/dyeverse/crypto_agency/data/rl"
            ),
        )
        self._bucket_tick_counts: Dict[int, int] = {1: 0, 2: 0, 3: 0}

    # ── Initialisation ───────────────────────────────────────────────────────
    async def initialize(self):
        """Set up executor, tape, ingest layer, and screener.

        Records the start timestamp, awaits the executor's own
        ``initialize()``, opens the tape file in append mode, then constructs
        the screener and the tick ingester. Neither is started here.
        """
        cfg = self._config
        self._start_ts = time.time()
        await self.executor.initialize()

        # Open tape. os.path.dirname() returns "" for a bare filename and
        # os.makedirs("") raises FileNotFoundError, so only create the parent
        # directory when the path actually has one.
        tape_dir = os.path.dirname(self._tape_path)
        if tape_dir:
            os.makedirs(tape_dir, exist_ok=True)
        self._tape_file = open(self._tape_path, "a", encoding="utf-8")

        # Advanced Solana screener
        birdeye_key = self._load_env_key("BIRDEYE_API_KEY")
        self._screener = SolanaScreener(
            min_liquidity=cfg.get("min_liquidity", 500_000),
            min_volume_24h=cfg.get("min_volume_24h", 200_000),
            min_age_hours=cfg.get("min_age_hours", 24.0),
            # top_n is read from the same config key as bucket1_slots.
            top_n=cfg.get("bucket1_slots", 15),
            scan_interval=cfg.get("scan_interval", 120.0),
            birdeye_api_key=birdeye_key,
            on_candidates=self._on_screener_candidates,
            # 3-Bucket config
            bucket1_min_liquidity=cfg.get("bucket1_min_liquidity", 150_000),
            bucket1_slots=cfg.get("bucket1_slots", 15),
            bucket2_min_liquidity=cfg.get("bucket2_min_liquidity", 10_000),
            bucket2_max_liquidity=cfg.get("bucket2_max_liquidity", 500_000),
            bucket2_min_change_5m=cfg.get("bucket2_min_change_5m", 3.0),
            bucket2_min_change_1h=cfg.get("bucket2_min_change_1h", 15.0),
            bucket2_max_age_hours=cfg.get("bucket2_max_age_hours", 24.0),
            bucket2_slots=cfg.get("bucket2_slots", 5),
            bucket3_slots=cfg.get("bucket3_slots", 5),
        )

        # Tick ingest layer
        self._ingest = TickIngester(
            bot=self,
            env_path=cfg.get("env_path", DEFAULT_ENV_PATH),
        )

        logger.info(
            f"[SOVEREIGN V4.1] Online — strategies={list(self.strategies.keys())} "
            f"confluence={self.confluence_threshold} daily_cap=${self.risk_manager.daily_cap:.0f} "
            f"3-bucket RL harvesting ENABLED"
        )

    def _load_env_key(self, key: str) -> Optional[str]:
        env_path = self._config.get("env_path", DEFAULT_ENV_PATH)
        if not os.path.exists(env_path):
            return None
        with open(env_path) as f:
            for line in f:
                line = line.strip()