#!/usr/bin/python """Daemon that performs mergebot operations and manages the associated work. """ import sys import socket import select import os import time import trac import base64 from threading import Thread from mergebot.BranchActor import BranchActor from mergebot.RebranchActor import RebranchActor from mergebot.CheckMergeActor import CheckMergeActor from mergebot.MergeActor import MergeActor from mergebot.daemonize import daemonize class Task(object): """Represents a task in the queue with its dependencies. """ task_id = 0 task_name_to_actor = { 'merge': MergeActor, 'branch': BranchActor, 'rebranch': RebranchActor, 'checkmerge': CheckMergeActor, } def __init__(self, master, ticket, action, component, version, summary, requestor): self.id = Task.task_id = Task.task_id + 1 self.master = master self.action = action.lower() if action not in Task.task_name_to_actor.keys(): raise Exception('Invalid task type %s' % action) self.ticket = ticket self.component = component self.version = version self.summary = summary self.requestor = requestor self.blocked_by = [] self._queued = False self._running = False self._completed = False def find_blockers(self, tasks): """Adds existing tasks that block this task to this task's blocked_by list. """ for task in tasks: # Tasks for different components are completely independent if task.component == self.component: is_blocked = False # if self.version is a ticket ID, then any action other than # 'checkmerge' on the parent ticket blocks this action if self.version.startswith('#'): parent_ticket = int(self.version.lstrip('#')) if task.ticket == parent_ticket and task.action != 'checkmerge': is_blocked = True # Any action other than checkmerge on a child ticket of ours blocks us if task.version.startswith('#'): parent_ticket = int(task.version.lstrip('#')) if self.ticket == parent_ticket and task.action != 'checkmerge': is_blocked = True # If (re)branching, then blocked by other tickets that are merging _to_ self.version if self.action == 'branch' or self.action == 'rebranch': if task.action == 'merge' and task.version == self.version: is_blocked = True # A merge operation targeting our version blocks us if task.action == 'merge' and task.version == self.version: is_blocked = True # If there is another queued operation for this same ticket, # that task blocks this one if self.ticket == task.ticket: is_blocked = True if is_blocked: self.blocked_by.append(task) return len(self.blocked_by) def other_task_completed(self, task): """Remove the given task from this task's blocked_by list. """ if task in self.blocked_by: self.blocked_by.remove(task) return len(self.blocked_by) def queued(self): """Mark this task ask queued to run. """ self._queued = True def started(self): """Mark this task as running/started. """ self._running = True def completed(self): """Mark this task as completed/zombie. """ self._completed = True def get_state(self): """Return a single-character indicator of this task's current state. Tasks are: Pending if they are ready to run. Waiting if they are blocked by another task. Running if they are currently being done. Zombie if they have been completed. """ if self._completed: state = 'Z'#ombie elif self._running: state = 'R'#unning elif self._queued: state = 'Q'#ueued elif self.blocked_by: state = 'W'#aiting else: state = 'P'#ending return state def execute(self): """Performs the actions for this task. """ work_dir = os.path.join(self.master.work_dir, 'worker-%s' % self.id) actor = Task.task_name_to_actor[self.action](work_dir, self.master.repo_url, self.master.repo_dir, self.ticket, self.component, self.version, self.summary, self.requestor) self.results, self.result_comment, self.success = actor.execute() def __str__(self): summary = base64.b64encode(self.summary) return ','.join([str(e) for e in (self.id, self.get_state(), self.ticket, self.action, self.component, self.version, self.requestor, summary)]) class Worker(object): """Thread to do the work for an operation; has a work area it is responsible for. """ def __init__(self, num, work_dir): self.number = num self.work_dir = work_dir self.task = None self.inbox_read, self.inbox_write = os.pipe() self.notifier_read, self.notifier_write = os.pipe() self.thread = Thread(target=self.work) self.thread.setDaemon(True) self.thread.start() def queue(self, task): task.queued() self.task = task os.write(self.inbox_write, 'q') def _dequeue(self): os.read(self.inbox_read, 1) def _completed(self): self.task.completed() os.write(self.notifier_write, 'd') def ack_complete(self): os.read(self.notifier_read, 1) return self.task def notifier(self): return self.notifier_read def work(self): while True: # get a task -- blocking read on pipe? self._dequeue() # do the task log_filename = os.path.join(self.work_dir, 'worker.%s' % self.number) open(log_filename, 'a').write(str(self.task) + ' started %s\n' % time.time()) self.task.started() self.task.execute() open(log_filename, 'a').write(str(self.task) + ' completed %s\n' % time.time()) # notify master of completion -- write to pipe? self._completed() class Mergebot(object): # Maybe I should just pass this the trac environment dir and have it create # an environment, then pull config info from that. def __init__(self, trac_dir): self.trac_dir = os.path.abspath(trac_dir) self.trac_env = trac.env.open_environment(self.trac_dir) config = self.trac_env.config self.listen_on = (config.get('mergebot', 'listen.ip'), config.getint('mergebot', 'listen.port')) self.work_dir = config.get('mergebot', 'work_dir') if not os.path.isabs(self.work_dir): self.work_dir = os.path.join(self.trac_dir, self.work_dir) self.repo_url = config.get('mergebot', 'repository_url') repo_dir = config.get('trac', 'repository_dir') if not os.path.isabs(repo_dir): repo_dir = os.path.join(self.trac_dir, repo_dir) self.repo_dir = repo_dir self.listening = None self.worker_count = config.getint('mergebot', 'worker_count') self.task_list = [] def run(self): """Run forever, handling requests. """ self.listening = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.listening.bind(self.listen_on) self.listening.listen(10) open_sockets = [] workers = [Worker(i, work_dir=self.work_dir) for i in range(self.worker_count)] active = [] try: while True: fds = [self.listening] + open_sockets + [w.notifier() for w in active] readable, _writeable, _other = select.select(fds, [], []) for s in readable: if s is self.listening: new_socket, _address = self.listening.accept() open_sockets.append(new_socket) elif s in open_sockets: data = s.recv(4096) for line in data.rstrip('\n').split('\n'): if line == '': open_sockets.remove(s) s.close() elif line[:4].upper() == 'QUIT': open_sockets.remove(s) s.close() else: response = self.handle_command(line) if response: s.send(response) # I think we're going to want to make this a # single-shot deal #s.close() #open_sockets.remove(s) else: # Must be an active worker telling us it is done worker = [w for w in active if w.notifier() == s][0] task = worker.ack_complete() active.remove(worker) workers.insert(0, worker) # On failure, cancel all tasks that depend on this one. if not task.success: self._remove_dependant_tasks(task) self._remove_task(task) self._update_ticket(task) # TODO: need to handle connections that the other end # terminates? # Assign a task to a worker available_workers = list(workers) pending = [t for t in self.task_list if t.get_state() == 'P'] for w, t in zip(available_workers, pending): w.queue(t) workers.remove(w) active.append(w) except KeyboardInterrupt: print 'Exiting due to keyboard interrupt.' except Exception, e: print 'Exiting due to: ', e raise self.listening.close() def handle_command(self, command): """Takes input from clients, and calls the appropriate sub-command. """ parts = command.strip().split() if not parts: return '\n' command = parts[0].upper() args = parts[1:] response = 'unrecognized command "%s"' % command if command == 'LIST': response = self.command_list() elif command == 'ADD': response = self.command_add(args) elif command == 'CANCEL': response = self.command_cancel(args) # etc... return response + '\n' def command_list(self): """Returns a listing of all active tasks. """ listing = [] for task in self.task_list: listing.append(str(task)) return '\n'.join(listing) + '\nOK' def command_add(self, args): # create a new task object and add it to the pool try: ticket, action, component, version, requestor = args except ValueError: return 'Error: wrong number of args: add ' \ ' \nGot: %r' % args try: ticket = int(ticket.strip('#')) except ValueError: return 'Error: invalid ticket number "%s"' % ticket trac_ticket = trac.ticket.Ticket(self.trac_env, ticket) summary = trac_ticket['summary'] new_task = Task(self, ticket, action, component, version, summary, requestor) new_task.find_blockers(self.task_list) self.task_list.append(new_task) # and trigger the worker threads if needed return 'OK' def command_cancel(self, args): try: tasknum = int(args[0]) except ValueError: return 'Error' except IndexError: return 'Error' found = [t for t in self.task_list if t.id == tasknum] if len(found) != 1: return 'Error: Not found' dead = found[0] dead_state = dead.get_state() if dead_state not in ['W', 'P']: return 'Error: Cannot kill task (state %s)' % dead_state self._remove_dependant_tasks(dead) self._remove_task(dead) return 'OK' def _remove_task(self, dead): """Removes the given task from the queue. Removes task as a dependency from any other tasks in the queue. Assumes the task is in the task_list. """ try: self.task_list.remove(dead) except ValueError: self.trac_env.log.error("Task %s not in task_list when asked to remove" % dead) for t in self.task_list: if dead in t.blocked_by: t.blocked_by.remove(dead) def _remove_dependant_tasks(self, task): for t in self.task_list: if task in t.blocked_by: self._remove_dependant_tasks(t) self._remove_task(t) def _update_ticket(self, task): ticket = trac.ticket.Ticket(self.trac_env, task.ticket) if task.results or task.result_comment: for key, value in task.results.items(): ticket[key] = value ticket.save_changes('mergebot', task.result_comment) def main(args): foreground = False if args[0] == '-f': foreground = True args = args[1:] trac_dir = args[0] if not foreground: daemonize() bot = Mergebot(trac_dir) bot.run() def run(): main(sys.argv[1:]) if __name__ == '__main__': run()