quarta-feira, 13 de fevereiro de 2008

Semáforo

Este vai ser um artigo mais leve… =D

Na lista do PythonBrasil, um dos participantes perguntou como controlar a quantidade de threads em um processo.

Uma forma legal de fazer isso é usando semáforos.

Observação: o exemplo neste artigo foi desenvolvido em Python 2.5.


Vamos usar um exemplo: vamos criar um servidor que aceite conexões TCP, peça alguma string, imprima-a num arquivo de log, envie um sinal de sucesso e encerre a conexão.

Podemos começar com um arquivo de configurações. Pode ser kodserv.conf:
[global]
ip = 0.0.0.0
port = 8001
max = 3
logfile = /tmp/kodserv.log


Agora vamos começar nosso executável – kodserv.py, por exemplo – criando um abeçalho com os requerimentos de módulos um procedimento principal:
#!/usr/bin/env python
# coding: utf-8
# Ou a codificação que você usa

from __future__ import with_statement
from ConfigParser import ConfigParser, NoOptionError
from logging import getLogger, FileHandler
from socket import (
AF_INET,
SO_REUSEADDR,
SOCK_STREAM,
SOL_SOCKET,
socket
)
from threading import Semaphore, Thread
import os, sys


def main():
if len(sys.argv) < 2:
print >>sys.stderr, "Use: %s configfile" % sys.argv[0]
sys.exit(1)

configfile = sys.argv[1]

# Lê o arquivo de configurações
c = ConfigParser()
try:
with open(configfile, 'r') as fd:
c.readfp(fd)
except IOError:
print >>sys.stderr, "File %s not found" % configfile
sys.exit(2)

try:
logfile = c.get("global", "logfile").strip()
except NoOptionError:
# Não tem logfile? Usa STDOUT
logfile = "/proc/%d/fd/1" % os.getpid()

try:
ip = c.get("global", "ip").strip()
except NoOptionError:
# Não tem IP? Usa todos os endereços
ip = "0.0.0.0"

try:
port = c.get("global", "port").strip()
except NoOptionError:
# Não tem port? Usa porta 8001/tcp
port = "8001"

try:
# Máximo de conexões simultâneas
max = c.get("global", "max").strip()
except NoOptionError:
# Não tem max? Vamos aceitar 1000 conexões
max = "1000"

# Endereço
addr = (ip, int(port))

# Cria objeto de log
logger = getLogger()
logger.addHandler(FileHandler(logfile))

# Cria semáforo: o cara da contagem
sem = Semaphore(int(max))

# Cria aplicação principal e inicia
app = AppServ(logger=logger, addr=addr, semaphore=sem)
app.loop()

# Encerra o logger
for handler in logger.handlers:
handler.close()

# Fim!
sys.exit(0)


Agora precisamos criar as classes.

Aplicação principal


A classe principal será AppServ. Seu construtor deve receber o logger, o endereço e o semáforo, criar o socket servidor e armazenar tudo. O método loop() será responsável pelo ciclo de conexões.

O destruidor (__del__()) será responsável por fechar o socket:
class AppServ:
def __init__(self, addr, logger, semaphore):
# Armazenamento dos objetos
self.skt = self.createSocket(addr)
self.logger = logger
self.sem = semaphore

def __del__(self):
# Destruidor
try:
self.skt.close()
except Exception, err:
self.logger.error(err.message)

def createSocket(self, addr):
# Cria o socket
skt = socket(AF_INET, SOCK_STREAM)
skt.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)

# Associa socket ao endereço e prepara socket para ouvir
skt.bind(addr)
skt.listen(1)

# Retorna socket
return skt

def loop(self):
while True:
try:
conn, addr = self.skt.accept()
# Classe que tratará cada conexão
DealConn(
conn=conn,
logger=self.logger,
semaphore=self.sem
).start()
except KeyboardInterrupt:
# Sai com C-c
break


Lidando com as conexões


Precisamos agora criar a classe DealConn para lidar com as conexões recebidas.

Essa classe precisa ser filha de Thread e receber a conexão, o logger e o semáforo. É agora que vamos usar esse útimo!

O semáforo basicamente faz uma contagem regressiva a cada chamada do método acquire() a partir do valor informado no construtor. Caso a contagem chegue a zero, ele trava a execução do fluxo até que alguma thread chame o método release().

O método release() reincrementa a contagem:
class DealConn(Thread):
def __init__(self, conn, logger, semaphore):
# Chama construtor pai
Thread.__init__(self)

# Avisa ao semáforo que foi criado e armazena os objetos
semaphore.acquire()
self.sem = semaphore
self.conn = conn
self.logger = logger

def __del__(self):
# Destruidor encerra socket e libera semáforo
self.conn.close()
self.sem.release()

def run(self):
# Método chamado na thread criada
conn = self.conn

# Envia prompt e aguarda string arbitrária de até
# 1024 caracteres
conn.send("> ")
data = conn.recv(1024).strip()

# Armazena o que foi recebido
msg = "%s:%d sends: %s" % (conn.getpeername() + (data,))
self.logger.warn(msg)

# Avisa da saída
conn.send("Ciau\r\n")


Fazendo funcionar


Só falta agora fazer funcionar!
if __name__ == "__main__":
main()


Se estiver usando algum sistema operacional parecido com Unix, não esqueça de tornar o arquivo executável:
$ chmod +x kodserv.py


Agora execute seu servidor, elimine possíveis erros e tente conectar-se à porta 8001/tcp de localhost. Veja que os logs serão gerados no arquivo informado pela opção logfile do arquivo de configuração, kodserv.conf.

Tente fazer mais de três conexões simultâneas. =D

Depois brinque com o arquivo de configuração e boa diversão! Dúvidas e sugestões nos comentários, por favor.

[]'s
Cacilhas, La Batalema
blog comments powered by Disqus