Replicação para Apache Kafka via OGG

Apache Kafka é um serviço de mensagens de grande throughput e baixa latência para publicar mensagens em tempo real. kafka_logo

Softwares que foram utilizados:

  • Oracle Database 12c
  • Oracle Goldengate 12c para Base de Dados Oracle
  • Oracle Goldengate 12c para Big Data (Big Data Adapters)
  • Apache Kafka 2.11

Configurações do processo de replicação:

1) Processo de extração (extract):

Configuração de um processo simples de extract (com o SPECIALRUN), mas em um cenário de CDC, é questão de configurar o processo de extração como tal, e sugiro que se utilize o modo Integrated Extract.

Neste teste, o extract, foi configurado da seguinte forma:

GGSCI (ogg.m04m.com) 2> add schematrandata kafka

GGSCI (ogg.m04m.com) 3> add extract ikafka  sourceistable

GGSCI (ogg.m04m.com) 4> view params ikafka

EXTRACT ikafka
USERID ogg, Password ogg
RMTHOST ogg.m04m.com, MGRPORT 7810
RMTFILE /u01/app/oracle/product/12.1.0/oggbdhome_1/dirdat/il, MEGABYTES 2, PURGE
TABLE kafka.*;

GGSCI (ogg.m04m.com) 5> start extract ikafka

2) Adicionar novo tópico ao Apache Kafka:

Neste exemplo, foi criado o tópico oggtopic para receber os dados da replicação:

bin/kafka-topics.sh –zookeeper ogg:2181 –create –topic oggtopic –partitions 1 –replication-factor 1

3) Processo de réplica (replicat):

Preparação dos ficheiros de configuração necessários, neste exemplo, o Kafka irá receber os dados em formato json:

cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/
Edit dirprm/custom_kafka_producer.properties file:

bootstrap.servers=ogg:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
Edit dirprm/kafka.props file (flume):

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.TopicName =oggtopic
gg.handler.kafkahandler.format =json
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false

gg.handler.kafkahandler.mode =tx
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

gg.classpath=dirprm/:/u02/kafka/kafka/libs/*:

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

Edit dirprm/irkafka.prm parameter file:
REPLICAT irkafka
END RUNTIME
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP kafka.*, TARGET kafka.*;

 

No shell do Goldengate:

GGSCI (ogg.m04m.com) 1> add replicat irkafka, specialrun

GGSCI (ogg.m04m.com) 2> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING
REPLICAT STOPPED IRKAFKA 00:00:00 00:38:24

GGSCI (ogg.m04m.com) 3> start replicat irkafka

4) Validação Kafka e Logs:

Com o Kafka Tool (http://www.kafkatool.com), podemos ver as mensagens que a entrar no Kafka:

screen-shot-2016-10-03-at-21-51-34

Se usarmos uma tool para converter o Hexadecimal para algo legível, podemos perceber a estrutura do json:

screen-shot-2016-10-03-at-21-52-15

Esta é a estrutura json com os dados replicados e informações adicionais do OGG (em azul)  + os dados da tabela (em verde):

{“table”:”KAFKA.TABLE2″,“op_type”:”I”,”op_ts”:”2016-10-03 20:50:17.266570″,”current_ts”:”2016-10-03T21:50:56.485000″,”pos”:”00000000000000002044″,”after”:{“A”:”World”,”B”:”3″}}

 

Deixe um comentário

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair / Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair / Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair / Alterar )

Foto do Google+

Você está comentando utilizando sua conta Google+. Sair / Alterar )

Conectando a %s