Files
openclaw/skills/mikrotik-api/scripts/routeros_api.py

165 lines
4.9 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import argparse
import json
import socket
from typing import List
def enc_len(n: int) -> bytes:
    """Encode a word length as the variable-width prefix used on the wire.

    The prefix grows with the value: one byte up to 0x7F, two bytes up to
    0x3FFF, three up to 0x1FFFFF, four up to 0x0FFFFFFF, and otherwise a
    0xF0 marker byte followed by the length as four big-endian bytes. For
    the two- to four-byte forms the high bits of the first byte carry the
    marker that tells the decoder how many bytes follow.

    Args:
        n: Length of the word body in bytes.

    Returns:
        The encoded length prefix.

    Raises:
        ValueError: If ``n`` is negative.
        OverflowError: If ``n`` does not fit in four bytes.
    """
    # Check the widest form first and work down to the single-byte case.
    if n > 0x0FFFFFFF:
        return b'\xf0' + n.to_bytes(4, 'big')
    if n > 0x1FFFFF:
        return (n | 0xE0000000).to_bytes(4, 'big')
    if n > 0x3FFF:
        return (n | 0xC00000).to_bytes(3, 'big')
    if n > 0x7F:
        return (n | 0x8000).to_bytes(2, 'big')
    return bytes([n])
def _recv_exact(sock: socket.socket, count: int, what: str) -> bytes:
    """Read exactly ``count`` bytes from ``sock`` or raise ConnectionError."""
    buf = bytearray()
    while len(buf) < count:
        # recv() may legally return fewer bytes than requested, so keep
        # asking for the remainder until the full amount has arrived.
        chunk = sock.recv(count - len(buf))
        if not chunk:
            raise ConnectionError(f'socket closed while reading {what}')
        buf += chunk
    return bytes(buf)


def dec_len(sock: socket.socket) -> int:
    """Read one variable-width length prefix from ``sock`` and decode it.

    The first byte selects the width of the prefix: values below 0x80 are
    the length itself; 0x80-0xBF, 0xC0-0xDF and 0xE0-0xEF are followed by
    one, two and three more bytes respectively, with the low bits of the
    first byte forming the most significant part of the length; 0xF0 is
    followed by the length as four big-endian bytes.

    Args:
        sock: Connected socket positioned at the start of a length prefix.

    Returns:
        The decoded length.

    Raises:
        ConnectionError: If the peer closes the connection before the whole
            prefix has been received.
        ValueError: If the first byte is above 0xF0 (unsupported control
            byte).
    """
    first = _recv_exact(sock, 1, 'word length')[0]
    if first < 0x80:
        return first

    # Number of continuation bytes, and which bits of the first byte are
    # payload rather than width marker.
    if first < 0xC0:
        extra, mask = 1, 0x3F
    elif first < 0xE0:
        extra, mask = 2, 0x1F
    elif first < 0xF0:
        extra, mask = 3, 0x0F
    elif first == 0xF0:
        extra, mask = 4, 0x00
    else:
        raise ValueError(f'unsupported control byte: {hex(first)}')

    tail = _recv_exact(sock, extra, 'word length')
    return ((first & mask) << (8 * extra)) | int.from_bytes(tail, 'big')
def send_sentence(sock: socket.socket, words: List[str]) -> None:
    """Write one sentence to ``sock``.

    Each word is UTF-8 encoded and sent as its length prefix (from
    ``enc_len``) immediately followed by the encoded bytes. A single zero
    byte is sent after the last word to terminate the sentence.

    Args:
        sock: Connected socket to write to.
        words: The words of the sentence, in order.
    """
    payloads = (word.encode('utf-8') for word in words)
    frames = (enc_len(len(payload)) + payload for payload in payloads)
    for frame in frames:
        sock.sendall(frame)
    # A zero-length word marks the end of the sentence.
    sock.sendall(b'\x00')
def recv_sentence(sock: socket.socket) -> List[str]:
    """Read words from ``sock`` until the zero-length terminator.

    Every word is read as a length prefix (via ``dec_len``) followed by that
    many bytes, which are decoded as UTF-8 with undecodable bytes replaced.

    Args:
        sock: Connected socket to read from.

    Returns:
        The words of the sentence in arrival order; empty if the first
        length read is zero.

    Raises:
        ConnectionError: If the socket is closed part-way through a word.
    """
    words = []
    # iter() with a sentinel stops as soon as a zero length is decoded.
    for size in iter(lambda: dec_len(sock), 0):
        body = sock.recv(size)
        # recv() may deliver less than requested; top up until complete.
        while len(body) < size:
            more = sock.recv(size - len(body))
            if not more:
                raise ConnectionError('socket closed while reading word body')
            body += more
        words.append(body.decode('utf-8', errors='replace'))
    return words
class RouterOS:
    """Blocking client for one API connection to a router.

    The connection is opened in the constructor. Instances can be used as a
    context manager, in which case the socket is closed on exit.
    """

    def __init__(self, host: str, port: int, timeout: float = 10.0):
        """Open a TCP connection to ``host:port``.

        Args:
            host: Router address.
            port: API service port.
            timeout: Timeout in seconds passed to
                ``socket.create_connection``.
        """
        self.sock = socket.create_connection((host, port), timeout=timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def close(self):
        """Close the socket, ignoring errors from an already-broken socket."""
        try:
            self.sock.close()
        except OSError:
            # Best-effort cleanup: nothing useful can be done if close fails.
            pass

    def _read_login_reply(self) -> List[str]:
        """Return the ``!done`` sentence of a login exchange or raise."""
        while True:
            s = recv_sentence(self.sock)
            if not s:
                continue
            t = s[0]
            if t == '!done':
                return s
            if t in ('!trap', '!fatal'):
                raise RuntimeError('login failed: ' + ' | '.join(s))

    def login(self, user: str, password: str) -> None:
        """Authenticate on the open connection.

        Sends ``/login`` with the name and password. If the ``!done`` reply
        carries a ``=ret=`` word, the router is asking for the legacy
        challenge-response login (used by RouterOS before 6.43 according to
        MikroTik's API documentation); a second ``/login`` is then sent with
        ``=response=`` set to ``00`` followed by the hex MD5 of a zero byte,
        the password and the decoded challenge.

        Args:
            user: Account name.
            password: Account password.

        Raises:
            RuntimeError: If the router answers ``!trap`` or ``!fatal``, or
                sends a challenge that is not valid hex.
        """
        send_sentence(self.sock, ['/login', f'=name={user}', f'=password={password}'])
        reply = self._read_login_reply()

        challenge = next((w[len('=ret='):] for w in reply[1:] if w.startswith('=ret=')), None)
        if challenge is None:
            return

        import hashlib  # only needed on the legacy path

        try:
            challenge_bytes = bytes.fromhex(challenge)
        except ValueError as err:
            raise RuntimeError('login failed: malformed challenge') from err
        digest = hashlib.md5(b'\x00' + password.encode('utf-8') + challenge_bytes).hexdigest()
        send_sentence(self.sock, ['/login', f'=name={user}', f'=response=00{digest}'])
        self._read_login_reply()

    def command(self, words: List[str]) -> List[List[str]]:
        """Send one sentence and collect replies until the command ends.

        Args:
            words: The command sentence (command word, then attribute,
                query and tag words).

        Returns:
            Every non-empty reply sentence in arrival order, up to and
            including the first one whose leading word is ``!done`` or
            ``!fatal``.
        """
        send_sentence(self.sock, words)
        replies = []
        while True:
            s = recv_sentence(self.sock)
            if not s:
                continue
            replies.append(s)
            if s[0] in ('!done', '!fatal'):
                return replies
def parse_args():
    """Parse ``sys.argv`` for the client.

    Global options are ``--host``, ``--port`` (default 8728), ``--user`` and
    ``--password``. One of two sub-commands is required and is stored in
    ``mode``:

    * ``cmd``: a raw command path with repeatable ``--attr name=value``
      and an optional ``--tag``.
    * ``print``: a command path with optional ``--proplist``, repeatable
      ``--query`` words and an optional ``--tag``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description='Minimal MikroTik RouterOS API client (8728).')

    # Connection and credential options shared by both modes.
    parser.add_argument('--host', required=True)
    parser.add_argument('--port', type=int, default=8728)
    parser.add_argument('--user', required=True)
    parser.add_argument('--password', required=True)

    modes = parser.add_subparsers(dest='mode', required=True)

    raw_mode = modes.add_parser('cmd', help='Send raw command path and attrs')
    raw_mode.add_argument('command', help='e.g. /system/resource/print')
    raw_mode.add_argument('--attr', action='append', default=[], help='attribute in name=value format')
    raw_mode.add_argument('--tag', default='')

    print_mode = modes.add_parser('print', help='Convenience wrapper for print/getall style')
    print_mode.add_argument('command', help='e.g. /interface/print')
    print_mode.add_argument('--proplist', default='')
    print_mode.add_argument('--query', action='append', default=[], help='query words, e.g. ?type=ether')
    print_mode.add_argument('--tag', default='')

    return parser.parse_args()
def _build_words(a) -> List[str]:
    """Turn parsed arguments into the sentence words to send."""
    words = [a.command]
    if a.mode == 'cmd':
        for x in a.attr:
            if '=' not in x:
                raise ValueError('--attr format must be name=value')
            words.append('=' + x)
        if a.tag:
            words.append(f'.tag={a.tag}')
    else:
        if a.proplist:
            words.append(f'=.proplist={a.proplist}')
        if a.tag:
            words.append(f'.tag={a.tag}')
        words.extend(a.query)
    return words


def _reply_rows(replies: List[List[str]]) -> List[dict]:
    """Convert reply sentences into JSON-serialisable rows."""
    rows = []
    for sentence in replies:
        kind, *rest = sentence
        row = {'type': kind, 'words': rest}
        attrs = {}
        for w in rest:
            # Attribute words look like "=name=value"; anything else stays
            # only in the raw 'words' list.
            if w.startswith('=') and '=' in w[1:]:
                k, _, v = w[1:].partition('=')
                attrs[k] = v
        if attrs:
            row['attrs'] = attrs
        rows.append(row)
    return rows


def main():
    """Run one command against the router and print the result as JSON.

    The output is a single JSON object with ``reply`` (one row per reply
    sentence, each with ``type``, ``words`` and, when attribute words are
    present, ``attrs``) and ``ok``, which is false when any reply sentence
    is ``!trap`` or ``!fatal``.

    Raises:
        ValueError: If an ``--attr`` value is not in ``name=value`` form.
    """
    a = parse_args()
    # Validate and build the sentence before opening a connection so a
    # malformed --attr fails without touching the router.
    words = _build_words(a)

    api = RouterOS(a.host, a.port)
    try:
        api.login(a.user, a.password)
        replies = api.command(words)
    finally:
        api.close()

    rows = _reply_rows(replies)
    ok = not any(row['type'] in ('!trap', '!fatal') for row in rows)
    print(json.dumps({'ok': ok, 'reply': rows}, ensure_ascii=False))
# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()