Source code for almir.lib.bconsole

import fcntl
import select
import os
import re
import shlex
import tempfile
from contextlib import contextmanager
from subprocess import Popen, PIPE
from pyramid.threadlocal import get_current_registry

from almir.lib.utils import nl2br


CURRENT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
JOBS_DEF_RE = re.compile("""
                              \s+name=(?P<name>.+)\s+JobType
                              .+
                              level=(?P<level>\S*)
                              .+
                              Priority=(?P<priority>\d*)
                              .+
                              Client
                              .+
                              FileSet:\s+name=(?P<fileset>.*)\n
                              .+
                              Storage
                              .+
                              Pool
                              """, re.X)  # TODO: where/when


[docs]class BConsoleError(Exception): pass
[docs]class DirectorNotRunning(BConsoleError): pass
[docs]class BConsole(object): """Interface to bconsole binary""" # TODO: provide bconsole session through http request to avoid multiple connects in one http request which slow down performance def __init__(self, bconsole_command='bconsole -n -c %s', config_file=None): default_config_file = os.path.realpath(os.path.join(CURRENT_DIRECTORY, '..', '..', 'bconsole.conf')) registry = get_current_registry() self.config_file = registry.settings.get('bconsole_config', config_file or default_config_file) self.bconsole_command = bconsole_command % self.config_file @classmethod @contextmanager
[docs] def from_temp_config(cls, name, address, port, password): """Constructs :class:`BConsole` object with help of passing temporary file for the session. """ with tempfile.NamedTemporaryFile() as f: template = os.path.join(CURRENT_DIRECTORY, '..', '..', 'buildout.d', 'templates', 'bconsole.conf.in') with open(template) as f2: config = f2.read()\ .replace('${almir:director_name}', name)\ .replace('${almir:director_port}', port)\ .replace('${almir:director_password}', password)\ .replace('${almir:director_address}', address) f.write(config) f.flush() yield cls(config_file=f.name)
[docs] def start_process(self): return Popen(shlex.split(self.bconsole_command), stdout=PIPE, stdin=PIPE, stderr=PIPE)
[docs] def is_running(self): try: self.get_version() return True except DirectorNotRunning: return False
[docs] def get_version(self): p = self.start_process() stdout, stderr = p.communicate('version\n') version = filter(lambda s: 'Version' in s, stdout.split('\n')) if version: return version[-1] else: raise DirectorNotRunning
[docs] def get_jobs_settings(self): p = self.start_process() stdout, stderr = p.communicate('show job') jobs = [] for job in stdout.split('Job:'): jobs.append(JOBS_DEF_RE.find(stdout)) return jobs
[docs] def make_backup(self, job, level=None, storage=None, fileset=None, client=None, priority=None, pool=None, when=None): # TODO: figure out how to preselect options based on job definition (bat does it) # TODO: support 'where' for restore # TODO: support datetime for when # TODO: support parameters as database models instances cmd = 'run job=%s' % job if level: cmd += " level=%s" % level if storage: cmd += " storage=%s" % storage if fileset: cmd += " fileset=%s" % fileset if client: cmd += " client=%s" % client if priority: cmd += " priority=%s" % priority if pool: cmd += " pool=%s" % pool if when: cmd += " when=%s" % when p = self.start_process() stdout, stderr = p.communicate(cmd + "\nyes\n") if True: return "jobid" else: # TODO: stderr why job failed? return False
[docs] def get_upcoming_jobs(self, days=1): """""" p = self.start_process() stdout, stderr = p.communicate('.status dir scheduled days=%d\n' % days) #if stderr.strip(): # pass # TODO: display flash? try: unparsed_jobs = stdout.split('===================================================================================\n')[1].split('====\n')[0] except IndexError: return [] jobs = [] for line in unparsed_jobs.split('\n'): if not line.strip(): continue jobs.append({ 'level': line[:14].strip(), 'type': line[14:23].strip(), 'priority': line[23:28].strip(), 'date': line[28:38].strip(), 'time': line[38:44].strip(), 'name': line[47:67].strip(), 'volume': line[67:].strip(), }) return jobs
[docs] def send_command_by_polling(self, command, process=None): """""" if command == 'quit': return process, {'commands': ['Try harder.']} # start bconsole session if it's not initialized if process is None: process = self.start_process() poll = process.poll() if poll is not None: process = None return process, {'error': 'Connection to director terminated with status %d. Refresh to reconnect.' % poll} # send bconsole command if command: process.stdin.write(command.strip().encode('utf-8') + '\n') # make stdout fileobject nonblockable fp = process.stdout.fileno() flags = fcntl.fcntl(fp, fcntl.F_GETFL) fcntl.fcntl(fp, fcntl.F_SETFL, flags | os.O_NONBLOCK) output = '' while 1: # wait for data or timeout [i, o, e] = select.select([fp], [], [], 1) if i: # we have more data output += process.stdout.read(1000) else: # we have a timeout output = nl2br(output) return process, {"commands": [output]}

Project Versions

This Page