#!/usr/bin/env python3 import argparse import json import socket from typing import List def enc_len(n: int) -> bytes: if n <= 0x7F: return bytes([n]) if n <= 0x3FFF: n |= 0x8000 return n.to_bytes(2, 'big') if n <= 0x1FFFFF: n |= 0xC00000 return n.to_bytes(3, 'big') if n <= 0x0FFFFFFF: n |= 0xE0000000 return n.to_bytes(4, 'big') return bytes([0xF0]) + n.to_bytes(4, 'big') def dec_len(sock: socket.socket) -> int: b = sock.recv(1) if not b: raise ConnectionError('socket closed while reading word length') c = b[0] if c < 0x80: return c if c < 0xC0: return ((c & 0x3F) << 8) | sock.recv(1)[0] if c < 0xE0: b2 = sock.recv(2) return ((c & 0x1F) << 16) | (b2[0] << 8) | b2[1] if c < 0xF0: b3 = sock.recv(3) return ((c & 0x0F) << 24) | (b3[0] << 16) | (b3[1] << 8) | b3[2] if c == 0xF0: return int.from_bytes(sock.recv(4), 'big') raise ValueError(f'unsupported control byte: {hex(c)}') def send_sentence(sock: socket.socket, words: List[str]) -> None: for w in words: data = w.encode('utf-8') sock.sendall(enc_len(len(data)) + data) sock.sendall(b'\x00') def recv_sentence(sock: socket.socket) -> List[str]: out = [] while True: ln = dec_len(sock) if ln == 0: return out data = sock.recv(ln) while len(data) < ln: chunk = sock.recv(ln - len(data)) if not chunk: raise ConnectionError('socket closed while reading word body') data += chunk out.append(data.decode('utf-8', errors='replace')) class RouterOS: def __init__(self, host: str, port: int, timeout: float = 10.0): self.sock = socket.create_connection((host, port), timeout=timeout) def close(self): try: self.sock.close() except Exception: pass def login(self, user: str, password: str) -> None: send_sentence(self.sock, ['/login', f'=name={user}', f'=password={password}']) while True: s = recv_sentence(self.sock) if not s: continue t = s[0] if t == '!done': return if t == '!trap': raise RuntimeError('login failed: ' + ' | '.join(s)) def command(self, words: List[str]) -> List[List[str]]: send_sentence(self.sock, words) replies = [] while True: s = recv_sentence(self.sock) if not s: continue replies.append(s) t = s[0] if t in ('!done', '!fatal'): break return replies def parse_args(): p = argparse.ArgumentParser(description='Minimal MikroTik RouterOS API client (8728).') p.add_argument('--host', required=True) p.add_argument('--port', type=int, default=8728) p.add_argument('--user', required=True) p.add_argument('--password', required=True) sub = p.add_subparsers(dest='mode', required=True) c = sub.add_parser('cmd', help='Send raw command path and attrs') c.add_argument('command', help='e.g. /system/resource/print') c.add_argument('--attr', action='append', default=[], help='attribute in name=value format') c.add_argument('--tag', default='') pr = sub.add_parser('print', help='Convenience wrapper for print/getall style') pr.add_argument('command', help='e.g. /interface/print') pr.add_argument('--proplist', default='') pr.add_argument('--query', action='append', default=[], help='query words, e.g. ?type=ether') pr.add_argument('--tag', default='') return p.parse_args() def main(): a = parse_args() api = RouterOS(a.host, a.port) try: api.login(a.user, a.password) words = [] if a.mode == 'cmd': words = [a.command] for x in a.attr: if '=' not in x: raise ValueError('--attr format must be name=value') words.append('=' + x) if a.tag: words.append(f'.tag={a.tag}') else: words = [a.command] if a.proplist: words.append(f'=.proplist={a.proplist}') if a.tag: words.append(f'.tag={a.tag}') words.extend(a.query) replies = api.command(words) out = [] for s in replies: row = {'type': s[0], 'words': s[1:]} attrs = {} for w in s[1:]: if w.startswith('=') and '=' in w[1:]: k, v = w[1:].split('=', 1) attrs[k] = v if attrs: row['attrs'] = attrs out.append(row) print(json.dumps({'ok': True, 'reply': out}, ensure_ascii=False)) finally: api.close() if __name__ == '__main__': main()