From cbfa1745b7ded2f8f3c43ad5b5f5daa02626abf9 Mon Sep 17 00:00:00 2001 From: rene Date: Sat, 28 Mar 2026 20:24:30 +0100 Subject: [PATCH] asciiquarium-ng: initial Python implementation with true RGB colors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Single-file animation engine: Canvas with per-cell true-color, ocean gradient background, waves, castle, seaweed, fish, shark, jellyfish, ship, bubbles. No Curses dependency — pure ANSI escape codes. --- asciiquarium_ng.py | 716 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 716 insertions(+) create mode 100755 asciiquarium_ng.py diff --git a/asciiquarium_ng.py b/asciiquarium_ng.py new file mode 100755 index 0000000..1dffb6e --- /dev/null +++ b/asciiquarium_ng.py @@ -0,0 +1,716 @@ +#!/usr/bin/env python3 +"""asciiquarium-ng — modernized aquarium animation with true RGB colors.""" + +import argparse +import math +import os +import random +import signal +import sys +import time +import tty +import termios +import threading +from dataclasses import dataclass, field +from typing import Optional + + +# ────────────────────────────────────────────────────────────────────────────── +# Terminal helpers +# ────────────────────────────────────────────────────────────────────────────── + +def truecolor(r: int, g: int, b: int, bg: bool = False) -> str: + """Return ANSI true-color escape for fg (bg=False) or bg (bg=True).""" + layer = 48 if bg else 38 + return f"\033[{layer};2;{r};{g};{b}m" + + +RESET = "\033[0m" +HIDE_CURSOR = "\033[?25l" +SHOW_CURSOR = "\033[?25h" +ALT_SCREEN_ON = "\033[?1049h" +ALT_SCREEN_OFF = "\033[?1049l" +CLEAR = "\033[2J\033[H" + + +def move(row: int, col: int) -> str: + return f"\033[{row+1};{col+1}H" + + +def terminal_size() -> tuple[int, int]: + import shutil + s = shutil.get_terminal_size() + return s.lines, s.columns + + +# ────────────────────────────────────────────────────────────────────────────── +# Color palette +# ────────────────────────────────────────────────────────────────────────────── + +@dataclass(frozen=True) +class RGB: + r: int + g: int + b: int + + def blend(self, other: "RGB", t: float) -> "RGB": + return RGB( + int(self.r + (other.r - self.r) * t), + int(self.g + (other.g - self.g) * t), + int(self.b + (other.b - self.b) * t), + ) + + def fg(self) -> str: + return truecolor(self.r, self.g, self.b, bg=False) + + def bg(self) -> str: + return truecolor(self.r, self.g, self.b, bg=True) + + +# Ocean gradient: deep midnight blue at bottom → teal at mid-depth → dark cyan at surface +OCEAN_TOP = RGB(0, 60, 80) +OCEAN_MID = RGB(0, 40, 70) +OCEAN_BOTTOM = RGB(0, 20, 50) + +# Entity colors +C_WHITE = RGB(255, 255, 255) +C_CYAN = RGB(0, 220, 220) +C_GREEN = RGB(50, 220, 80) +C_YELLOW = RGB(240, 200, 0) +C_ORANGE = RGB(255, 140, 0) +C_RED = RGB(220, 50, 50) +C_MAGENTA = RGB(200, 80, 200) +C_BLUE = RGB(80, 140, 255) +C_GRAY = RGB(160, 160, 160) +C_DARK = RGB(60, 60, 80) +C_SAND = RGB(180, 160, 100) +C_BUBBLE = RGB(150, 200, 230) +C_WAVE = RGB(100, 180, 220) +C_SHIP = RGB(140, 120, 100) +C_SHARK = RGB(120, 130, 145) + + +# ────────────────────────────────────────────────────────────────────────────── +# Canvas: per-cell fg color + character, rendered in one write() +# ────────────────────────────────────────────────────────────────────────────── + +@dataclass +class Cell: + ch: str = " " + fg: Optional[RGB] = None # None → no fg escape (use bg color as text color) + + +class Canvas: + def __init__(self, rows: int, cols: int): + self.rows = rows + self.cols = cols + self._cells: list[list[Cell]] = [[Cell() for _ in range(cols)] for _ in range(rows)] + self._ocean: list[RGB] = [] + self._build_ocean() + + def _build_ocean(self): + """Precompute ocean background color per row.""" + self._ocean = [] + for r in range(self.rows): + t = r / max(self.rows - 1, 1) + if t < 0.5: + color = OCEAN_TOP.blend(OCEAN_MID, t * 2) + else: + color = OCEAN_MID.blend(OCEAN_BOTTOM, (t - 0.5) * 2) + self._ocean.append(color) + + def resize(self, rows: int, cols: int): + self.rows = rows + self.cols = cols + self._cells = [[Cell() for _ in range(cols)] for _ in range(rows)] + self._build_ocean() + + def clear(self): + for row in self._cells: + for cell in row: + cell.ch = " " + cell.fg = None + + def put(self, row: int, col: int, ch: str, fg: Optional[RGB]): + if 0 <= row < self.rows and 0 <= col < self.cols: + self._cells[row][col].ch = ch + self._cells[row][col].fg = fg + + def put_str(self, row: int, col: int, s: str, fg: Optional[RGB]): + for i, ch in enumerate(s): + self.put(row, col + i, ch, fg) + + def render(self) -> str: + buf: list[str] = ["\033[H"] # move to top-left without clearing + last_fg: Optional[RGB] = None + last_bg: Optional[RGB] = None + + for r, row in enumerate(self._cells): + ocean_bg = self._ocean[r] + for c, cell in enumerate(row): + # Background + if ocean_bg is not last_bg: + buf.append(ocean_bg.bg()) + last_bg = ocean_bg + # Foreground + fg = cell.fg if cell.fg is not None else ocean_bg + if fg is not last_fg: + buf.append(fg.fg()) + last_fg = fg + buf.append(cell.ch) + # End of row — no newline needed (cursor wraps), but reset bg to avoid + # terminal smearing the last cell's bg color into line padding + if r < self.rows - 1: + buf.append(RESET) + last_fg = None + last_bg = None + + buf.append(RESET) + return "".join(buf) + + +# ────────────────────────────────────────────────────────────────────────────── +# Entity base class +# ────────────────────────────────────────────────────────────────────────────── + +class Entity: + def __init__(self, row: int, col: int, speed: float = 1.0): + self.row = row + self.col = col # float for sub-pixel movement + self.speed = speed # cols per second (negative = left) + self.alive = True + + def update(self, dt: float, rows: int, cols: int): + """Advance physics. Set self.alive = False to remove.""" + self.col += self.speed * dt + + def draw(self, canvas: Canvas): + pass + + def _offscreen(self, cols: int, width: int) -> bool: + c = int(self.col) + return c + width < 0 or c >= cols + + +# ────────────────────────────────────────────────────────────────────────────── +# Small fish ><> or <>< +# ────────────────────────────────────────────────────────────────────────────── + +SMALL_FISH_R = ["><>", "><°>", ">°<>"] # swimming right +SMALL_FISH_L = ["<><", "<°><", "<>°<"] # swimming left + +FISH_COLORS = [C_CYAN, C_YELLOW, C_ORANGE, C_GREEN, C_MAGENTA, C_BLUE, C_RED] + + +class SmallFish(Entity): + def __init__(self, row: int, col: int, speed: float, color: RGB): + super().__init__(row, col, speed) + self.color = color + forms = SMALL_FISH_R if speed > 0 else SMALL_FISH_L + self.body = random.choice(forms) + + def update(self, dt: float, rows: int, cols: int): + super().update(dt, rows, cols) + if self._offscreen(cols, len(self.body)): + self.alive = False + + def draw(self, canvas: Canvas): + canvas.put_str(int(self.row), int(self.col), self.body, self.color) + + +# ────────────────────────────────────────────────────────────────────────────── +# Big fish >==[}> or <{[==< +# ────────────────────────────────────────────────────────────────────────────── + +BIG_FISH_R = [ + [" __ ", ">==[}> ", " ~~ "], +] +BIG_FISH_L = [ + [" __ ", " <{[==<", " ~~ "], +] + + +class BigFish(Entity): + def __init__(self, row: int, col: int, speed: float, color: RGB): + super().__init__(row, col, speed) + self.color = color + self.sprite = random.choice(BIG_FISH_R if speed > 0 else BIG_FISH_L) + + def update(self, dt: float, rows: int, cols: int): + super().update(dt, rows, cols) + w = max(len(l) for l in self.sprite) + if self._offscreen(cols, w): + self.alive = False + + def draw(self, canvas: Canvas): + for dr, line in enumerate(self.sprite): + canvas.put_str(int(self.row) + dr - 1, int(self.col), line, self.color) + + +# ────────────────────────────────────────────────────────────────────────────── +# Shark +# ────────────────────────────────────────────────────────────────────────────── + +SHARK_R = [ + " __ ", + " =-_ =-_ =-_ ==>| ,----( o>", + " ~~-~ ", +] +SHARK_L = [ + " __ ", + " 0 else SHARK_L + + def update(self, dt: float, rows: int, cols: int): + super().update(dt, rows, cols) + w = max(len(l) for l in self.sprite) + if self._offscreen(cols, w): + self.alive = False + + def draw(self, canvas: Canvas): + for dr, line in enumerate(self.sprite): + canvas.put_str(int(self.row) + dr - 1, int(self.col), line, C_SHARK) + + +# ────────────────────────────────────────────────────────────────────────────── +# Jellyfish +# ────────────────────────────────────────────────────────────────────────────── + +JELLY_BODY = [" ,., ", "( o )", "|'|'|", "| | |"] +JELLY_ALT = [" ,., ", "( o )", "| | |", "|'|'|"] + + +class Jellyfish(Entity): + def __init__(self, row: int, col: int, color: RGB): + # Jellyfish drift slowly horizontally and bob vertically + speed = random.uniform(-0.3, 0.3) + super().__init__(row, col, speed) + self.color = color + self.base_row = float(row) + self.bob_phase = random.uniform(0, math.tau) + self.bob_amp = random.uniform(0.5, 1.5) + self.bob_freq = random.uniform(0.3, 0.7) + self.t = 0.0 + self.frame = 0 + self.frame_timer = 0.0 + + def update(self, dt: float, rows: int, cols: int): + self.t += dt + self.col += self.speed * dt + self.row = self.base_row + math.sin(self.bob_phase + self.t * self.bob_freq * math.tau) * self.bob_amp + self.frame_timer += dt + if self.frame_timer > 0.5: + self.frame = 1 - self.frame + self.frame_timer = 0.0 + if self._offscreen(cols, 5): + self.alive = False + + def draw(self, canvas: Canvas): + sprite = JELLY_BODY if self.frame == 0 else JELLY_ALT + for dr, line in enumerate(sprite): + canvas.put_str(int(self.row) + dr, int(self.col), line, self.color) + + +# ────────────────────────────────────────────────────────────────────────────── +# Bubble +# ────────────────────────────────────────────────────────────────────────────── + +class Bubble(Entity): + def __init__(self, row: int, col: int): + super().__init__(row, float(col), speed=0) + self.float_row = float(row) + self.float_speed = random.uniform(0.8, 2.0) # rows per second upward + self.wobble_phase = random.uniform(0, math.tau) + self.wobble_amp = random.uniform(0.3, 0.8) + self.t = 0.0 + + def update(self, dt: float, rows: int, cols: int): + self.t += dt + self.float_row -= self.float_speed * dt + self.col = self.col + math.sin(self.wobble_phase + self.t * 1.5) * self.wobble_amp * dt + self.row = self.float_row + if self.float_row < 1: + self.alive = False + + def draw(self, canvas: Canvas): + canvas.put(int(self.row), int(self.col), "o", C_BUBBLE) + + +# ────────────────────────────────────────────────────────────────────────────── +# Seaweed +# ────────────────────────────────────────────────────────────────────────────── + +class Seaweed(Entity): + """Stationary swaying seaweed at the bottom.""" + def __init__(self, row: int, col: int, height: int): + super().__init__(row, col, speed=0) + self.height = height + self.phase = random.uniform(0, math.tau) + self.t = 0.0 + + def update(self, dt: float, rows: int, cols: int): + self.t += dt + + def draw(self, canvas: Canvas): + for i in range(self.height): + r = int(self.row) - i + offset = int(math.sin(self.phase + self.t * 1.2 + i * 0.4) * 1) + ch = "(" if (i + int(self.t * 2)) % 2 == 0 else ")" + canvas.put(r, int(self.col) + offset, ch, C_GREEN) + + +# ────────────────────────────────────────────────────────────────────────────── +# Ship (on surface) +# ────────────────────────────────────────────────────────────────────────────── + +SHIP_R = [ + " | ", + " | ", + " __|__ ", + " / \\ ", + "/ * * \\", + "~~~~~~~~~~", +] +SHIP_L = [ + " | ", + " | ", + " __|__ ", + " / \\ ", + "/ * * \\", + "~~~~~~~~~~", +] + + +class Ship(Entity): + def __init__(self, col: int, speed: float): + super().__init__(0, col, speed) # row adjusted in draw + self.sprite = SHIP_R if speed > 0 else SHIP_L + + def update(self, dt: float, rows: int, cols: int): + super().update(dt, rows, cols) + self.row = 0 # always at surface + w = max(len(l) for l in self.sprite) + if self._offscreen(cols, w): + self.alive = False + + def draw(self, canvas: Canvas): + for dr, line in enumerate(self.sprite): + canvas.put_str(dr, int(self.col), line, C_SHIP) + + +# ────────────────────────────────────────────────────────────────────────────── +# Waves +# ────────────────────────────────────────────────────────────────────────────── + +class Waves: + """Animated wave band at the top of the screen.""" + def __init__(self): + self.t = 0.0 + self.offset = 0.0 + + def update(self, dt: float): + self.t += dt + self.offset += dt * 4.0 + + def draw(self, canvas: Canvas): + cols = canvas.cols + # Two rows of waves + for c in range(cols): + phase = (c + self.offset) * 0.3 + # Row 0: crests + ch = "~" if math.sin(phase) > 0 else " " + canvas.put(0, c, ch, C_WAVE) + # Row 1: troughs + ch2 = "~" if math.sin(phase + 1.0) > -0.3 else " " + canvas.put(1, c, ch2, C_WAVE.blend(C_BLUE, 0.3)) + + +# ────────────────────────────────────────────────────────────────────────────── +# Castle (static decoration) +# ────────────────────────────────────────────────────────────────────────────── + +CASTLE = [ + " T~~", + " | ", + " /|\\\\", + " T~~ /_|_\\\\", + " | | |", + " /|\\\\ | |", + " /_|_\\\\ | |", + " | | | |", + "===|===|===|===|===", +] + + +class Castle: + def __init__(self, row: int, col: int): + self.row = row + self.col = col + + def draw(self, canvas: Canvas): + for dr, line in enumerate(CASTLE): + canvas.put_str(self.row + dr, self.col, line, C_SAND) + + +# ────────────────────────────────────────────────────────────────────────────── +# Aquarium orchestrator +# ────────────────────────────────────────────────────────────────────────────── + +class Aquarium: + def __init__(self, rows: int, cols: int): + self.rows = rows + self.cols = cols + self.canvas = Canvas(rows, cols) + self.entities: list[Entity] = [] + self.waves = Waves() + self.castle: Optional[Castle] = None + self.seaweeds: list[Seaweed] = [] + self.t = 0.0 + self.next_fish = 0.0 + self.next_shark = random.uniform(30, 60) + self.next_jelly = random.uniform(5, 15) + self.next_ship = random.uniform(20, 45) + self._setup_static() + self._initial_population() + self._resize_lock = threading.Lock() + self._needs_resize = False + self._new_size: tuple[int, int] = (rows, cols) + + def _setup_static(self): + # Castle at bottom-right + castle_row = self.rows - len(CASTLE) + castle_col = max(0, self.cols - 22) + self.castle = Castle(castle_row, castle_col) + + # Seaweeds scattered at bottom + self.seaweeds = [] + n_weeds = max(3, self.cols // 15) + used_cols = set() + # Avoid castle area + castle_zone = range(castle_col - 2, castle_col + 22) + for _ in range(n_weeds): + for _ in range(20): + c = random.randint(2, self.cols - 4) + if c not in used_cols and c not in castle_zone: + used_cols.add(c) + h = random.randint(3, min(7, self.rows // 4)) + self.seaweeds.append(Seaweed(self.rows - 2, c, h)) + break + + def _initial_population(self): + """Spawn a few fish to start.""" + for _ in range(4): + self._spawn_fish() + for _ in range(2): + self._spawn_bubble_cluster() + + def _fish_row(self) -> int: + return random.randint(3, self.rows - 5) + + def _spawn_fish(self): + going_right = random.random() < 0.5 + speed = random.uniform(4, 10) * (1 if going_right else -1) + col = -10 if going_right else self.cols + 10 + color = random.choice(FISH_COLORS) + if random.random() < 0.3: + self.entities.append(BigFish(self._fish_row(), col, speed * 0.6, color)) + else: + self.entities.append(SmallFish(self._fish_row(), col, speed, color)) + + def _spawn_bubble_cluster(self): + col = random.randint(2, self.cols - 3) + base_row = random.randint(self.rows // 2, self.rows - 3) + for _ in range(random.randint(1, 3)): + self.entities.append(Bubble(base_row + random.randint(-1, 1), + col + random.randint(-1, 1))) + + def _spawn_shark(self): + going_right = random.random() < 0.5 + speed = random.uniform(5, 8) * (1 if going_right else -1) + col = -40 if going_right else self.cols + 40 + self.entities.append(Shark(self._fish_row(), col, speed)) + + def _spawn_jellyfish(self): + col = random.randint(0, self.cols - 6) + row = random.randint(3, self.rows - 8) + color = random.choice([C_CYAN, C_MAGENTA, C_BLUE, C_YELLOW]) + self.entities.append(Jellyfish(row, col, color)) + + def _spawn_ship(self): + going_right = random.random() < 0.5 + speed = random.uniform(2, 5) * (1 if going_right else -1) + col = -12 if going_right else self.cols + 12 + self.entities.append(Ship(col, speed)) + + def signal_resize(self, rows: int, cols: int): + with self._resize_lock: + self._needs_resize = True + self._new_size = (rows, cols) + + def _apply_resize(self): + rows, cols = self._new_size + self.rows = rows + self.cols = cols + self.canvas.resize(rows, cols) + self._setup_static() + self._needs_resize = False + + def update(self, dt: float): + with self._resize_lock: + if self._needs_resize: + self._apply_resize() + + self.t += dt + self.waves.update(dt) + + # Spawn timers + self.next_fish -= dt + if self.next_fish <= 0: + self._spawn_fish() + self.next_fish = random.uniform(3, 8) + + self.next_shark -= dt + if self.next_shark <= 0: + self._spawn_shark() + self.next_shark = random.uniform(25, 55) + + self.next_jelly -= dt + if self.next_jelly <= 0: + self._spawn_jellyfish() + self.next_jelly = random.uniform(8, 20) + + self.next_ship -= dt + if self.next_ship <= 0: + self._spawn_ship() + self.next_ship = random.uniform(20, 45) + + # Occasional bubble clusters + if random.random() < dt * 0.5: + self._spawn_bubble_cluster() + + # Update entities + for e in self.entities: + e.update(dt, self.rows, self.cols) + self.entities = [e for e in self.entities if e.alive] + + for w in self.seaweeds: + w.update(dt, self.rows, self.cols) + + def draw(self) -> str: + self.canvas.clear() + self.waves.draw(self.canvas) + if self.castle: + self.castle.draw(self.canvas) + for w in self.seaweeds: + w.draw(self.canvas) + for e in self.entities: + e.draw(self.canvas) + return self.canvas.render() + + +# ────────────────────────────────────────────────────────────────────────────── +# Input handling (non-blocking, raw mode) +# ────────────────────────────────────────────────────────────────────────────── + +class InputHandler: + def __init__(self): + self._fd = sys.stdin.fileno() + self._old_settings = termios.tcgetattr(self._fd) + tty.setraw(self._fd) + import fcntl + import os + flags = fcntl.fcntl(self._fd, fcntl.F_GETFL) + fcntl.fcntl(self._fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + def read_key(self) -> Optional[str]: + try: + ch = os.read(self._fd, 1) + return ch.decode("utf-8", errors="ignore") + except BlockingIOError: + return None + + def restore(self): + termios.tcsetattr(self._fd, termios.TCSADRAIN, self._old_settings) + + +# ────────────────────────────────────────────────────────────────────────────── +# Main loop +# ────────────────────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser(description="asciiquarium-ng — animated aquarium with true RGB colors") + parser.add_argument("--fps", type=int, default=30, help="Target frames per second (default: 30)") + parser.add_argument("--no-ship", action="store_true", help="Disable the surface ship") + parser.add_argument("--no-shark", action="store_true", help="Disable the shark") + args = parser.parse_args() + + frame_time = 1.0 / args.fps + rows, cols = terminal_size() + + inp = InputHandler() + aquarium = Aquarium(rows, cols) + + if args.no_shark: + aquarium.next_shark = float("inf") + if args.no_ship: + aquarium.next_ship = float("inf") + + # SIGWINCH: terminal resize + def on_resize(signum, frame): + r, c = terminal_size() + aquarium.signal_resize(r, c) + + signal.signal(signal.SIGWINCH, on_resize) + + # Enter alternate screen, hide cursor + os.write(sys.stdout.fileno(), (ALT_SCREEN_ON + HIDE_CURSOR + CLEAR).encode()) + + try: + last = time.monotonic() + while True: + now = time.monotonic() + dt = now - last + last = now + + # Input + key = inp.read_key() + if key in ("q", "Q", "\x03", "\x1b"): + break + elif key == "p": + # Pause: wait for another p or q + while True: + k2 = inp.read_key() + if k2 in ("p", "q", "Q", "\x03", "\x1b"): + if k2 in ("q", "Q", "\x03", "\x1b"): + raise KeyboardInterrupt + break + time.sleep(0.05) + last = time.monotonic() + continue + + aquarium.update(dt) + frame = aquarium.draw() + os.write(sys.stdout.fileno(), frame.encode()) + + # Sleep to hit target fps + elapsed = time.monotonic() - now + sleep = frame_time - elapsed + if sleep > 0: + time.sleep(sleep) + + except KeyboardInterrupt: + pass + finally: + inp.restore() + os.write(sys.stdout.fileno(), (SHOW_CURSOR + ALT_SCREEN_OFF + RESET).encode()) + + +if __name__ == "__main__": + main()