Apache Kafka é um serviço de mensagens de grande throughput e baixa latência para publicar mensagens em tempo real.
Softwares que foram utilizados:
- Oracle Database 12c
- Oracle Goldengate 12c para Base de Dados Oracle
- Oracle Goldengate 12c para Big Data (Big Data Adapters)
- Apache Kafka 2.11
Configurações do processo de replicação:
1) Processo de extração (extract):
Configuração de um processo simples de extract (com o SPECIALRUN), mas em um cenário de CDC, é questão de configurar o processo de extração como tal, e sugiro que se utilize o modo Integrated Extract.
Neste teste, o extract, foi configurado da seguinte forma:
GGSCI (ogg.m04m.com) 2> add schematrandata kafka
GGSCI (ogg.m04m.com) 3> add extract ikafka sourceistable
GGSCI (ogg.m04m.com) 4> view params ikafka
EXTRACT ikafka
USERID ogg, Password ogg
RMTHOST ogg.m04m.com, MGRPORT 7810
RMTFILE /u01/app/oracle/product/12.1.0/oggbdhome_1/dirdat/il, MEGABYTES 2, PURGE
TABLE kafka.*;GGSCI (ogg.m04m.com) 5> start extract ikafka
2) Adicionar novo tópico ao Apache Kafka:
Neste exemplo, foi criado o tópico oggtopic para receber os dados da replicação:
bin/kafka-topics.sh –zookeeper ogg:2181 –create –topic oggtopic –partitions 1 –replication-factor 1
3) Processo de réplica (replicat):
Preparação dos ficheiros de configuração necessários, neste exemplo, o Kafka irá receber os dados em formato json:
cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/
Edit dirprm/custom_kafka_producer.properties file:bootstrap.servers=ogg:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
Edit dirprm/kafka.props file (flume):gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.TopicName =oggtopic
gg.handler.kafkahandler.format =json
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=falsegg.handler.kafkahandler.mode =tx
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kbgoldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUEgg.log=log4j
gg.log.level=INFOgg.report.time=30sec
gg.classpath=dirprm/:/u02/kafka/kafka/libs/*:
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
Edit dirprm/irkafka.prm parameter file:
REPLICAT irkafka
END RUNTIME
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP kafka.*, TARGET kafka.*;
No shell do Goldengate:
GGSCI (ogg.m04m.com) 1> add replicat irkafka, specialrun
GGSCI (ogg.m04m.com) 2> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT STOPPED IRKAFKA 00:00:00 00:38:24GGSCI (ogg.m04m.com) 3> start replicat irkafka
4) Validação Kafka e Logs:
Com o Kafka Tool (http://www.kafkatool.com), podemos ver as mensagens que a entrar no Kafka:
Se usarmos uma tool para converter o Hexadecimal para algo legível, podemos perceber a estrutura do json:
Esta é a estrutura json com os dados replicados e informações adicionais do OGG (em azul) + os dados da tabela (em verde):
{“table”:”KAFKA.TABLE2″,“op_type”:”I”,”op_ts”:”2016-10-03 20:50:17.266570″,”current_ts”:”2016-10-03T21:50:56.485000″,”pos”:”00000000000000002044″,”after”:{“A”:”World”,”B”:”3″}}