source: mergebot/trunk/mergebot/mergebotdaemon.py @ 17

Last change on this file since 17 was 17, checked in by retracile, 16 years ago

Mergebot: redesigned implementation. Still has rough edges.

File size: 13.7 KB
Line 
1#!/usr/bin/python
2"""Daemon that performs mergebot operations and manages the associated work.
3"""
4
5import sys
6import socket
7import select
8import os
9import time
10import trac
11import base64
12
13from threading import Thread
14
15from mergebot.BranchActor import BranchActor
16from mergebot.RebranchActor import RebranchActor
17from mergebot.CheckMergeActor import CheckMergeActor
18from mergebot.MergeActor import MergeActor
19
20from mergebot.daemonize import daemonize
21
22
23class Task(object):
24    """Represents a task in the queue with its dependencies.
25    """
26    task_id = 0
27    task_name_to_actor = {
28        'merge': MergeActor,
29        'branch': BranchActor,
30        'rebranch': RebranchActor,
31        'checkmerge': CheckMergeActor,
32    }
33    def __init__(self, master, ticket, action, component, version, summary,
34                 requestor):
35        self.id = Task.task_id = Task.task_id + 1
36        self.master = master
37        self.action = action.lower()
38        if action not in Task.task_name_to_actor.keys():
39            raise Exception('Invalid task type %s' % action)
40        self.ticket = ticket
41        self.component = component
42        self.version = version
43        self.summary = summary
44        self.requestor = requestor
45        self.blocked_by = []
46        self._queued = False
47        self._running = False
48        self._completed = False
49
50    def find_blockers(self, tasks):
51        """Adds existing tasks that block this task to this task's blocked_by
52        list.
53        """
54        for task in tasks:
55            # Tasks for different components are completely independent
56            if task.component == self.component:
57                is_blocked = False
58                # if self.version is a ticket ID, then any action other than
59                # 'checkmerge' on the parent ticket blocks this action
60                if self.version.startswith('#'):
61                    parent_ticket = int(self.version.lstrip('#'))
62                    if task.ticket == parent_ticket and task.action != 'checkmerge':
63                        is_blocked = True
64                # Any action other than checkmerge on a child ticket of ours blocks us
65                if task.version.startswith('#'):
66                    parent_ticket = int(task.version.lstrip('#'))
67                    if self.ticket == parent_ticket and task.action != 'checkmerge':
68                        is_blocked = True
69                # If (re)branching, then blocked by other tickets that are merging _to_ self.version
70                if self.action == 'branch' or self.action == 'rebranch':
71                    if task.action == 'merge' and task.version == self.version:
72                        is_blocked = True
73                # If there is another queued operation for this same ticket,
74                # that task blocks this one
75                if self.ticket == task.ticket:
76                    is_blocked = True
77                if is_blocked:
78                    self.blocked_by.append(task)
79        return len(self.blocked_by)
80
81    def other_task_completed(self, task):
82        """Remove the given task from this task's blocked_by list.
83        """
84        if task in self.blocked_by:
85            self.blocked_by.remove(task)
86        return len(self.blocked_by)
87
88    def queued(self):
89        """Mark this task ask queued to run.
90        """
91        self._queued = True
92
93    def started(self):
94        """Mark this task as running/started.
95        """
96        self._running = True
97
98    def completed(self):
99        """Mark this task as completed/zombie.
100        """
101        self._completed = True
102
103    def get_state(self):
104        """Return a single-character indicator of this task's current state.
105
106        Tasks are:
107            Pending if they are ready to run.
108            Waiting if they are blocked by another task.
109            Running if they are currently being done.
110            Zombie if they have been completed.
111        """
112        if self._completed:
113            state = 'Z'#ombie
114        elif self._running:
115            state = 'R'#unning
116        elif self._queued:
117            state = 'Q'#ueued
118        elif self.blocked_by:
119            state = 'W'#aiting
120        else:
121            state = 'P'#ending
122        return state
123
124    def execute(self):
125        """Performs the actions for this task.
126        """
127        work_dir = os.path.join(self.master.work_dir, 'worker-%s' % self.id)
128        actor = Task.task_name_to_actor[self.action](work_dir,
129            self.master.repo_url, self.master.repo_dir, self.ticket,
130            self.component, self.version, self.summary, self.requestor)
131        self.results, self.result_comment, self.success = actor.execute()
132       
133    def __str__(self):
134        summary = base64.b64encode(self.summary)
135        return ','.join([str(e) for e in (self.id, self.get_state(),
136            self.ticket, self.action, self.component, self.version,
137            self.requestor, summary)])
138
139
140class Worker(object):
141    """Thread to do the work for an operation; has a work area it is
142    responsible for.
143    """
144    def __init__(self, num, work_dir):
145        self.number = num
146        self.work_dir = work_dir
147        self.task = None
148        self.inbox_read, self.inbox_write = os.pipe()
149        self.notifier_read, self.notifier_write = os.pipe()
150        self.thread = Thread(target=self.work)
151        self.thread.setDaemon(True)
152        self.thread.start()
153
154    def queue(self, task):
155        task.queued()
156        self.task = task
157        os.write(self.inbox_write, 'q')
158
159    def _dequeue(self):
160        os.read(self.inbox_read, 1)
161
162    def _completed(self):
163        self.task.completed()
164        os.write(self.notifier_write, 'd')
165
166    def ack_complete(self):
167        os.read(self.notifier_read, 1)
168        return self.task
169
170    def notifier(self):
171        return self.notifier_read
172
173    def work(self):
174        while True:
175            # get a task -- blocking read on pipe?
176            self._dequeue()
177            # do the task
178            log_filename = os.path.join(self.work_dir, 'worker.%s' % self.number)
179            open(log_filename, 'a').write(str(self.task) + ' started %s\n' % time.time())
180            self.task.started()
181            self.task.execute()
182            open(log_filename, 'a').write(str(self.task) + ' completed %s\n' % time.time())
183            # notify master of completion -- write to pipe?
184            self._completed()
185
186
187class Mergebot(object):
188    # Maybe I should just pass this the trac environment dir and have it create
189    # an environment, then pull config info from that.
190    def __init__(self, trac_dir):
191        self.trac_dir = os.path.abspath(trac_dir)
192        self.trac_env = trac.env.open_environment(self.trac_dir)
193        config = self.trac_env.config
194        self.listen_on = (config.get('mergebot', 'listen.ip'),
195                          config.getint('mergebot', 'listen.port'))
196        self.work_dir = config.get('mergebot', 'work_dir')
197        if not os.path.isabs(self.work_dir):
198            self.work_dir = os.path.join(self.trac_dir, self.work_dir)
199        self.repo_url = config.get('mergebot', 'repository_url')
200        repo_dir = config.get('trac', 'repository_dir')
201        if not os.path.isabs(repo_dir):
202            repo_dir = os.path.join(self.trac_dir, repo_dir)
203        self.repo_dir = repo_dir
204
205        self.listening = None
206        self.worker_count = config.getint('mergebot', 'worker_count')
207        self.task_list = []
208
209    def run(self):
210        """Run forever, handling requests.
211        """
212        self.listening = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
213        self.listening.bind(self.listen_on)
214        self.listening.listen(10)
215        open_sockets = []
216        workers = [Worker(i, work_dir=self.work_dir) for i in range(self.worker_count)]
217        active = []
218        try:
219            while True:
220                fds = [self.listening] + open_sockets + [w.notifier() for w in
221                                                         active]
222                readable, _writeable, _other = select.select(fds, [], [])
223                for s in readable:
224                    if s is self.listening:
225                        new_socket, _address = self.listening.accept()
226                        open_sockets.append(new_socket)
227                    elif s in open_sockets:
228                        data = s.recv(4096)
229                        for line in data.rstrip('\n').split('\n'):
230                            if line == '':
231                                open_sockets.remove(s)
232                                s.close()
233                            elif line[:4].upper() == 'QUIT':
234                                open_sockets.remove(s)
235                                s.close()
236                            else:
237                                response = self.handle_command(line)
238                                if response:
239                                    s.send(response)
240                                # I think we're going to want to make this a
241                                # single-shot deal
242                                #s.close()
243                                #open_sockets.remove(s)
244                    else:
245                        # Must be an active worker telling us it is done
246                        worker = [w for w in active if w.notifier() == s][0]
247                        task = worker.ack_complete()
248                        active.remove(worker)
249                        workers.insert(0, worker)
250                        # On failure, cancel all tasks that depend on this one.
251                        if not task.success:
252                            self._remove_dependant_tasks(task)
253                        self._remove_task(task)
254                        self._update_ticket(task)
255                # TODO: need to handle connections that the other end
256                # terminates?
257
258                # Assign a task to a worker
259                available_workers = list(workers)
260                pending = [t for t in self.task_list if t.get_state() == 'P']
261                for w, t in zip(available_workers, pending):
262                    w.queue(t)
263                    workers.remove(w)
264                    active.append(w)
265
266        except KeyboardInterrupt:
267            print 'Exiting due to keyboard interrupt.'
268        except Exception, e:
269            print 'Exiting due to: ', e
270            raise
271        self.listening.close()
272
273    def handle_command(self, command):
274        """Takes input from clients, and calls the appropriate sub-command.
275        """
276        parts = command.strip().split()
277        if not parts:
278            return '\n'
279        command = parts[0].upper()
280        args = parts[1:]
281
282        response = 'unrecognized command "%s"' % command
283        if command == 'LIST':
284            response = self.command_list()
285        elif command == 'ADD':
286            response = self.command_add(args)
287        elif command == 'CANCEL':
288            response = self.command_cancel(args)
289        # etc...
290        return response + '\n'
291
292    def command_list(self):
293        """Returns a listing of all active tasks.
294        """
295        listing = []
296        for task in self.task_list:
297            listing.append(str(task))
298        return '\n'.join(listing) + '\nOK'
299
300    def command_add(self, args):
301        # create a new task object and add it to the pool
302        try:
303            ticket, action, component, version, requestor = args
304        except ValueError:
305            return 'Error: wrong number of args: add <ticket> <action> ' \
306                '<component> <version> <requestor>\nGot: %r' % args
307        try:
308            ticket = int(ticket.strip('#'))
309        except ValueError:
310            return 'Error: invalid ticket number "%s"' % ticket
311
312        trac_ticket = trac.ticket.Ticket(self.trac_env, ticket)       
313        summary = trac_ticket['summary']
314        new_task = Task(self, ticket, action, component, version, summary,
315                        requestor)
316        new_task.find_blockers(self.task_list)
317        self.task_list.append(new_task)
318        # and trigger the worker threads if needed
319        return 'OK'
320
321    def command_cancel(self, args):
322        try:
323            tasknum = int(args[0])
324        except ValueError:
325            return 'Error'
326        except IndexError:
327            return 'Error'
328        found = [t for t in self.task_list if t.id == tasknum]
329        if len(found) != 1:
330            return 'Error: Not found'
331        dead = found[0]
332        dead_state = dead.get_state()
333        if dead_state not in ['W', 'P']:
334            return 'Error: Cannot kill task (state %s)' % dead_state
335        self._remove_dependant_tasks(dead)
336        self._remove_task(dead)
337        return 'OK'
338
339    def _remove_task(self, dead):
340        """Removes the given task from the queue.
341        Removes task as a dependency from any other tasks in the queue.
342        Assumes the task is in the task_list.
343        """
344        try:
345            self.task_list.remove(dead)
346        except ValueError:
347            self.trac_env.log.error("Task %s not in task_list when asked to remove" % dead)
348        for t in self.task_list:
349            if dead in t.blocked_by:
350                t.blocked_by.remove(dead)
351
352    def _remove_dependant_tasks(self, task):
353        for t in self.task_list:
354            if task in t.blocked_by:
355                self._remove_dependant_tasks(t)
356                self._remove_task(t)
357
358    def _update_ticket(self, task):
359        ticket = trac.ticket.Ticket(self.trac_env, task.ticket)       
360        if task.results or task.result_comment:
361            for key, value in task.results.items():
362                ticket[key] = value
363            ticket.save_changes('mergebot', task.result_comment)
364
365
366def main(args):
367    foreground = False
368    if args[0] == '-f':
369        foreground = True
370        args = args[1:]
371    trac_dir = args[0]
372    if not foreground:
373        daemonize()
374    bot = Mergebot(trac_dir)
375    bot.run()
376
377def run():
378    main(sys.argv[1:])
379
380if __name__ == '__main__':
381    run()
Note: See TracBrowser for help on using the repository browser.