O desenvolvimento através do Apache Beam é realizado por meio de diferentes SDKs (Software Development Kits). Cada SDK geralmente suporta uma diferente linguagem de programação. Uma das linguagens suportados e a linguagem utilizada no exemplo é o Python.
Frequência de letras em um texto
O código abaixo mostra o código de uma pipeline desenvolvida em Python SDK que lê um arquivo de texto e calcula a frequência das letras no texto:
from __future__ import print_function
from string import ascii_lowercase
import apache_beam as beam
class CalculateFrequency(beam.DoFn):
def process(self, element, total_characters):
letter, counts = element
yield letter, '{:.2%}'.format(counts / float(total_characters))
def run():
with beam.Pipeline() as p:
letters = (p | beam.io.ReadFromText('romeojuliet.txt')
| beam.FlatMap(lambda line: (ch for ch in line.lower() if ch
in
ascii_lowercase))
| beam.Map(lambda x: (x, 1)))
counts = (letters | beam.CombinePerKey(sum))
total_characters = (letters | beam.MapTuple(lambda x, y: y)
total_characters = (letters | beam.MapTuple(lambda x, y: y)
(counts | beam.ParDo(CalculateFrequency(),
beam.pvalue.AsSingleton(total_characters))
| beam.Map(lambda x: print(x)))
if __name__ == '__main__':
run()
Passo a passo
letters = (p | beam.io.ReadFromText('romeojuliet.txt')
Em primeiro lugar, especifica-se a origem dos dados. A transformação ReadFromText retorna uma PCollection contendo todas as linhas do arquivo 'romeojuliet.txt'.
| beam.FlatMap(lambda line: (ch for ch in line.lower() if ch
in ascii_lowercase))
Nesse passo, processa-se todas as linhas e emite-se letras minúsculas do alfabeto inglês, cada uma como elemento único.
| beam.Map(lambda x: (x, 1)))
Para cada letra retorna-se uma sequência contendo uma letra e 1.
counts = (letters | beam.CombinePerKey(sum))
Esse passo pega todos os pares que possuem a mesma chave que, para esse exemplo, é a mesma letra, e calcula a soma de 1. Os resultados são retornados para a PCollection counts.
total_characters = (letters | beam.MapTuple(lambda x, y: y)
| beam.CombineGlobally(sum))
CombineGlobally junta todos os elementos do PCollection input e utiliza como argumento sum. Como sum - que é nativo do Python - aceita apenas inteiros, deve-se abandonar a primeira parte da sequência.
(counts | beam.ParDo(CalculateFrequency(),
beam.pvalue.AsSingleton(total_characters))
| beam.Map(lambda x: print(x)))
Agora é possível calcular a frequência. Nossa transformação pega duas PCollections - counts e total_characters. Para cada gravação de count ele simplesmente divide count por total_characters. Os resultados são mostrados na tela:
(u'n', '6.19%')
(u'o', '8.20%')
(u'l', '4.58%')
(u'm', '3.29%')
(u'j', '0.27%')
(u'k', '0.81%')
(u'h', '6.60%')
(u'i', '6.42%')
(u'f', '2.00%')
(u'g', '1.77%')
(u'd', '3.74%')
(u'e', '11.89%')
(u'b', '1.66%')
(u'c', '2.05%')
(u'a', '7.78%')
(u'z', '0.03%')
(u'x', '0.13%')
(u'y', '2.50%')
(u'v', '1.01%')
(u'w', '2.47%')
(u't', '9.12%')
(u'u', '3.42%')
(u'r', '6.20%')
(u's', '6.33%')
(u'p', '1.46%')
(u'q', '0.06%')