PRODUTOR E CONSUMIDOR (BOUNDED BUFFER) - JAVA
1- DEFINIÇÃO:
O problema do produtor e consumidor, também referido como Bounded Buffer Problem (BBP), consiste em um desafio de sincronização de duas ou mais threads (tarefas) concorrentes que têm acesso a um mesmo recurso do programa que está sendo executado. O fato de o recurso estar sendo compartilhado, implica na possibilidade de conflitos entre as threads que disputam pelo controle da CPU para acessar esse mesmo recurso.
No caso do BBP, o recurso disputado é um buffer de memória (um vetor), de tamanho limitado/fixo, daí o motivo do nome. O produtor é o responsável por enfileirar novos jobs ou dados nesse buffer, e o consumidor assume a responsabilidade de extrair esses dados do buffer. Porém o seguinte problema poderá emergir: E se o produtor produzir muito rápido, e o consumidor não for capaz de acompanhá-lo?

Se isso acontecer, o produtor irá descartar todos os novos itens produzidos, até que haja espaço novamente no buffer, ou seja, dados serão perdidos (na prática, o efeito ocorre um pouco diferente, consulte o Apendice ). Mas ainda podemos contornar esse problema, o que discutiremos na seção a seguir.
2- INTERCOMUNICAÇÃO DE THREADS
Há uma solução para controlar o acesso ao Buffer, basta bloquearmos a execução do Produtor enquanto o buffer estiver cheio para reter a produção. O mesmo vale para o Consumidor, quando este se deparar com uma fila vazia deverá ser bloqueado para evitar ócio da CPU. Ambos os threads deverão ser despertados novamente assim que houver trabalho disponível - espaço na fila para o produtor e dados na fila para o consumidor - para ser processado.
Esta última consideração, nos leva a concluir que o Produtor precisará receber um sinal para ser acordado após ter sido bloqueado por falta de espaço no buffer, e este sinal terá que ser enviado pelo Consumidor que, após consumir um dado, liberará um novo espaço no buffer. De modo análogo, o Consumidor bloqueado por falta de dados no buffer, receberá um sinal do Produtor, após este último ter inserido um novo dado no buffer. Esse envio de sinais para controlar o acesso à uma região de memória, recebe o nome de Intercomunicação de Processos (IPC). Porém há ainda a possibilidade de o programa incorrer certas situações inesperadas, como será discutido na seção adiante.

3- DEAD LOCKS
Quando a execução de um programa multi-thread envolve a comunicação entre threads, ou IPC, para controlar o uso de um recurso, chamamos esse recurso de região crítica do programa. Na nossa discussão, o ponto crítico é o buffer de memória. O uso do IPC para acessar regiões críticas deve ser feito com cautela, pois problemas de sincronização podem surgir, como os Dead Locks, que são becos sem saída em que um programa pode entrar.
Como já dito anteriormente, o Consumidor precisa bloquear ao se deparar com uma fila vazia - quantidade de elementos é zero - e aguardar um sinal do Produtor. Se o Consumidor inicia o teste da quantidade de elementos quando a fila está vazia mas perde o controle do programa antes de bloquear, o Produtor que agora assume o controle produz um novo dado no buffer antes vazio e envia um sinal para o Consumidor acordar. Se após o envio do sinal o controle for passado novamente para o Consumidor, este irá ignorar o sinal para despertar, pois ainda não está bloqueado, e irá retomar o bloqueio que havia antes iniciado após verificar que a quantidade de elementos era zero nas suas variáveis salvas durante a troca de contexto. Após o Consumidor ser bloqueado, o controle será passado novamente ao Produtor, que irá produzir o máximo de dados possíveis achando que o Consumidor foi desperto e por fim irá bloquear aguardando um sinal do Consumidor. Ambos ficarão bloqueados e o programa continuará executando em um Dead Lock, sem que qualquer tarefa seja executada.

Isso acontece porque o Escalonador do sistema Operacional pode alocar o controle da CPU de uma forma inesperada. Para superar a barreira do Escalonador e evitar que Dead Locks ocorram, a operação de bloqueio deve ser atômica, ou seja, executada como se representasse uma única instrução para CPU, de modo que o Consumidor não perderá o controle da CPU no meio de um bloqueio, mas somente quando estiver totalmente bloqueado.
4- EXEMPLIFICAÇÃO - ANIMAÇÃO
No vídeo abaixo, toda a discussão das seções anteriores foi exemplificada e sumarizada em uma animação, na mesma ordem em que os assuntos foram abordados:
5- IMPLEMENTAÇÃO
A seguir descreveremos detalhes da implementação. Na primeira subseção abordaremos uma classe de fila criada para representar o buffer, na subseção seguinte criaremos a classe Produtor&Consumidor e por fim a classe de execução do programa responsável por integrar as anteriores.
5.1- FILA
Para implementar a solução do problema, utilizamos a linguagem JAVA com bibliotecas para controle de buffer já disponíveis, porém decidimos implementar também um buffer utilizando a estrutura simples First In First Out (fifo), ou fila, para facilitar o entendimento do problema, com métodos enque() e dequeue() para inserir e extrair da fila, respectivamente. Criamos uma fila que armazena valores inteiros para simplificar a solução:
public class FIFO
{
//VARIAVEIS
private int cabeca, cauda, qtd_elementos; //indices e contador
//cabeca -> primeiro item
//cauda-1 -> ultimo item
private int fila[]; //vetor de elementos
//METODOS
//constructor -- determina capacidade da fila / innicializando fila
public FIFO(int tamanho)
{
this.fila = new int[tamanho]; //cria vetor com capacidade = 'tamanho'
cauda = 0;
cabeca = 0;
qtd_elementos = 0;
}
//Funcao de insecao na fila
public void enqueue(int elemento)
{
//SE HOUVER ESPACO NA FILA
this.fila[this.cauda] = elemento; //insere elemento na cauda
//incrementa indice cauda
if(this.cauda == this.fila.length-1) /*se indice cauda aponta fim do vetor(length-1 pois indice vai de 0 a length-1)*/
this.cauda = 0; //circula e volta para o inicio
else //indice cauda não aponta o fim
this.cauda +=1; //incrementa indice cauda
this.qtd_elementos++; // incrementa qtd de elementos
//elemento inserido
return;
}
//Funcao remocao da fila
public int dequeue()
{
//SE HOUVER ITEM NA FILA
int valor = this.fila[this.cabeca]; //extrai o primeiro da fila
//incrementa indice cabeca
if(this.cabeca == this.fila.length-1) //se indice cabeca aponta fim do vetor(cabeca vai de 0 a length-1)
this.cabeca = 0; //circula e volta pro inicio
else //indice cabeca não aponta o fim
this.cabeca +=1; //incrementa indice cabeca
this.qtd_elementos--; // decrementa qtd de elementos
//elemento retirado
return valor;
}
//Funcao que indica fila cheia
public boolean cheia()
{
if(this.qtd_elementos == this.fila.length)
return true;
else
return false;
}
//Funcao que indica fila vazia
public boolean vazia()
{
if(this.qtd_elementos == 0)
return true;
else
return false;
}
}
5.2- PRODUTOR & CONSUMIDOR
E seguida, implementamos uma classe consumidor e produtor, onde os métodos produce() e consume(), enfileiram e extraem itens da fila, respectivamente. Como sabemos, o buffer/fila, é a região crítica que precisará ser acessada, e por esse motivo, utilizamos dentro dos métodos de produtor e consumidor o método synchronized() com parâmetro "this" que é o objeto chave, de modo que o acesso à fila desse objeto seja atômico e os threads não consigam acessa-la simultaneamente.
Os dois métodos verificam se a fila está vazia ou cheia, e caso aconteça, chamam o método wait() que bloqueia a thread em execução. O produtor e o consumidor também enviam um sinal após a inserção ou remoção de itens através do método notify():
public class ConsumerProducer
{
//VARIAVEIS
FIFO fila; //fila que ser usada como memoria compartilhada entre threads consumidor e produtor
//METODOS
//constructor
public ConsumerProducer(FIFO fila)
{
this.fila = fila; //variavel fila referencia uma fila passada como argumento
}
//produtor
public void produce()
{
int elemento = 0;
while(true)// executa em loop
{
synchronized (this)//acesso atomico à fila
{
while(this.fila.cheia())//se fila cheia
{
//System.out.println("produtor Dormindo");
try
{
wait(); //bloqueia o thread
}
catch (InterruptedException e){}
}
//Se houver espaco na fila
this.fila.enqueue(elemento); //insere elemento
System.out.println("Elemento "+elemento+" produzido");
elemento++;
notify(); //notifica que há elementos na fila
}
}
}
//consumidor
public void consume()
{
int elemento;
while(true)
{
synchronized (this)//inicia operacao atomica
{
while(this.fila.vazia())//se fila vazia
{
//System.out.println("consumidor Dormindo");
try{
wait(); //bloqueia o thread
}
catch (InterruptedException e){}
}
//Se houver elemento na fila
elemento = this.fila.dequeue(); //retira primeiro da fila
System.out.println("Elemento "+elemento+" consumido");
notify(); //notifica que há espaço na fila
}
}
}
}
Toda a operação de inserir na fila ou de retirar da fila é atômica. Isso é necessários por conta da variável qtd_elementos da classe FIFO. Lembre-se da analogia feita na seção 3, sobre os deadLocks. No nosso caso em específico, o Consumidor pode encontrar a fila vazia, perder o controle, e o Produtor poderá produzir todos os itens e bloquear. Depois o Consumidor restaura seu contexto e também bloqueia. Com o uso de synchronized e notify, podemos atualizar a quantidade de elementos de forma atômica.
5.3- EXECUÇÃO
Por fim, no método main() criamos um objeto fila e um consumidor/produtor. O Java nos permite rodar funções de um objeto em diferentes threads através da criação de um objeto Thread que recebe como parâmetro uma interface Runnable. Essa interface nos permite sobrescrever seu método run() com o método que desejamos rodar em uma thread.
Criamos duas Threads, uma para o método consume() e outra para produce(), e em seguida iniciamos a execução de ambas com Thread.start(). O método Thread.join() permite que a função principal abdique do controle da CPU, de modo que as threads possam ser executadas sem interrupção:
public class Programa
{
public static void main(String[] args) throws InterruptedException
{
//OBJETOS
FIFO fila = new FIFO(3); //fila de tamanho 3
ConsumerProducer programa = new ConsumerProducer(fila); /*fila é passado como buffer para novo Objeto ConsumerProducer*/
Thread produtor = new Thread //cria um objeto Thread para o metodo produtor
(
new Runnable() /*Thread recebe interface Runnable como parametro, e roda seu método run como uma thread*/
{
public void run() //Runnable permite sobrescrever seu método run
{
programa.produce(); //metodo produce do objeto ConsumerProducer sobrescreve metodo run
}
}
);
Thread consumidor = new Thread //cria uma thread para o metodo consumidor
(
new Runnable()
{
public void run()
{
programa.consume(); //método consume de ProducerConsumer rodará como thread
}
}
);
//INICIA PROGRAMA
//inicia as threads criadas
produtor.start();
consumidor.start();
//main abdica do controle - aguarda execucao das threads
produtor.join();
consumidor.join();
}
}
Output:
Elemento 0 produzido
Elemento 1 produzido
Elemento 2 produzido
.....
Elemento 3953 consumido
Elemento 3954 consumido
Elemento 3955 consumido
Elemento 3956 produzido
Elemento 3957 produzido
Elemento 3956 consumido
Elemento 3957 consumido
Elemento 3958 produzido
Elemento 3959 produzido
Elemento 3960 produzido
Elemento 3958 consumido
Elemento 3959 consumido
Elemento 3960 consumido
.....
Perceba que, apesar de a nossa fila ter tamanho 3 nesse exemplo, nem sempre serão produzidos 3 elementos de uma só vez. O Escalonador pode passar o controle para a Thread Consumidora a qualquer momento, como destacado na saída acima.
6- CONCLUSÃO
A classe Thread e a interface Runnable da linguagem Java tornam possível a execução de threads paralelamente, de modo que diversas soluções tornem-se possíveis para os casos em que a programação concorrente é necessária. Podemos criar vários threads para um mesmo método, ou para execução métodos distintos, o que permite dar ao usuário do programa a impressão de paralelismo - como quando vemos uma barra de progresso sendo executada concomitantemente com o carregamento de uma informação. A utilidade da programação concorrente é ampla, e além de abranger a interação com o usuário, também exerce um papel importante em outras aplicações que exigem a execução de múltiplas tarefas simultaneamente, como no caso dos sistemas operacionais.
Apesar das vantagens, alguns cuidados precisam ser tomados em relação aos recursos compartilhados pelas aplicações concorrentes, para que não haja perda de informações. É necessário ter cautela com as áreas críticas que devem impor regras de acesso, de maneira a evitar conflitos gerados pela disputa por controle entre as tarefas. Um sistema bancário por exemplo, deve atomizar as operações de rendimento e depósitos, pois se um usuário realizar um depósito, interrompendo uma operação de rendimento, seu depósito poderá ser ignorado após o rendimento ser calculado e registrado mais tarde. Por esse motivo, é importante sincronizar as tarefas e restringir o acesso às variáveis que não são thread safe (passíveis de gerar conflito).
O problema do produtor e consumidor é importante para abordagem dessas características da programação concorrente, que exigem uma solução baseada no uso das ferramentas de gerenciamento de execução de tarefas paralelas.
7- REFERÊNCIAS
Caelum Escola de Tecnologia Cursos Online. Disponível em: <https://www.caelum.com.br/apostila-java-orientacao-objetos/>. Acesso em: 19 jul. 2021.
Producer-Consumer solution using threads in Java - GeeksforGeeks. Disponível em: <https://www.geeksforgeeks.org/producer-consumer-solution-using-threads-java/>. Acesso em: 19 jul. 2021.
TANENBAUM, A. S. Modern operating systems. Fourth edition ed. Boston: Pearson, 2015.
APÊNDICE
Apêndice A - Definição original do problema
Originalmente, o problema do produtor e consumidor reside no fato de que uma operação para realizar o incremento de uma variável, exige que três instruções de máquina sejam utilizadas.
Abaixo estão representações de ambas as funções responsáveis pelo consumo e produção dos itens no buffer:
Produtor:
proximo_produto = produzir();
while(qtd_elementos == n); //bloquear se fila estiver cheia
buffer[cauda] = proximo_produto;
cauda = (cauda+1) % qtd_elementos; //maneira de circular o índice cauda na fila
qtd_elementos++;
Consumidor:
while(qtd_elementos ==0);
proximo_consumo = buffer[cabeca];
cabeca = (cabeca+1)%qtd_elementos;
qtd_elementos++;
consumir(proximo_consumo);
O while fica responsável por manter cada processo aguardando a disponibilidade do buffer, sem a necessidade de bloqueio. Itens são inseridos no fim da fila e consumidos do início da fila, e após cada inserção ou consumo, a variável qtd_elementos é atualizada, e é aí que reside o problema.
Na linguagem Assembly, para a maioria dos sistemas, o comando utilizado para incrementar a variável qtd_elementos é composto de três instruções, como a seguir:
registrador_auxiliar = qtd_elementos
registrador_auxiliar = registrador_auxiliar+1
qtd_elementos = registrador_auxiliar
A variável é incrementada com o uso de um registrador, e portanto são 3 etapas para que o seu valor seja calculado e registrado.
Se o produtor perder o controle antes que a primeira instrução seja executada, nenhum problema ocorrerá. O máximo que pode acontecer é o consumidor tomar controle e pensar que fila ainda está vazia, mas não haverá nenhuma inconsistência externa. O consumidor apenas devolverá o controle para o produtor, que em seguida incrementará a qtd_elementos.
O problema ocorre se o Produtor perder o controle depois da primeira ou segunda instrução. Porque o valor antigo de qtd_elementos será registrado e operado mais tarde, independente de qualquer atualização feita por quem tomou o controle. Exemplo: O produtor produz o segundo item da fila, e executa a primeira instrução, ou seja, salva o valor de qtd_elementos=1. Depois desse momento o consumidor toma o controle e consome o item que já havia no buffer, e decrementa a qtd_elementos para 0. Quando o produtor reassumir o controle, irá incrementar de 1 para 2. Eis uma inconsistência interna (que ocorre no código). A inconsistência externa (que ocorre na saída), é de que o Consumidor irá consumir um item duplicado ou nulo, já que irá iterar uma unidade além do devido (já que qtd_elementos vale uma unidade maior que o devido).
Do lado do consumidor, também são três as instruções de modificação da variável qtd_elementos, porém ela é decrementada. Imagine que existam 2 itens no buffer, e o consumidor consuma um deles, mas antes de decrementar qtd_elementos, ele perde o controle, com qtd_elementos=2 registrado. O produtor assume, produz um elemento e incrementa qtd_elementos=3. O consumidor retoma o controle, e decrementa o valor antigo para qtd_elementos=1, então um item deixou de ser contado. Por esse motivo o produtor produzirá uma unidade além do permitido, e irá sobrescrever algum item.
E por fim, nada impediria que o consumidor registrasse qtd_elementos=1 após consumir o último item, em seguida perdesse o controle e o produtor enchesse o buffer, bloqueando em seguida no while. Ao retomar o controle novamente, o consumidor decrementaria a qtd_elementos=1 para zero, ignorando as atualizações do produtor. E isso desencadearia um deadlock.