How to integrate a Kafka message queue with KOA + egg.js
This article shows, step by step, how to integrate a Kafka message queue into a KOA + egg.js application, using a complete logging-system example.
Egg.js: an enterprise-grade framework built on KOA2
Kafka: a high-throughput, distributed publish/subscribe messaging system
This article builds an example logging system that integrates egg + kafka + mysql.
System requirement: record log entries, with Kafka acting as the message queue in between.
Architecture sketch: both the Kafka producer and the consumer are provided by the logging system itself.
λ.1 Environment setup
①Kafka
官網(wǎng)下載kafka后,解壓
啟動(dòng)zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka server:
(in config/server.properties we set num.partitions=5, i.e. topics get five partitions)
bin/kafka-server-start.sh config/server.properties
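With num.partitions=5 the topic used later in this article (logAction_p5) is auto-created with five partitions on first use; it can also be created explicitly. A sketch, assuming an older ZooKeeper-based Kafka release (newer versions use --bootstrap-server instead of --zookeeper):

# optional: create the topic explicitly with 5 partitions (ZooKeeper-based Kafka)
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 5 --topic logAction_p5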
② egg + mysql
Scaffold an egg project with the official boilerplate, then additionally install kafka-node and egg-mysql (a sketch of the commands follows below).
MySQL user: root, password: 123456.
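The article does not list the setup commands; assuming the official egg boilerplate and npm, they would look roughly like this (the project directory name is just a placeholder; egg-sequelize and uuid are added because the service and model code below use app.model and uuid/v1):

# scaffold an egg project (official boilerplate)
mkdir egg-kafka-log && cd egg-kafka-log
npm init egg --type=simple
npm install

# extra dependencies used in this article
npm install kafka-node egg-mysql egg-sequelize uuid --save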
λ.2 Integration
1. Create app.js in the project root; this file is executed every time the application starts.
'use strict';

const kafka = require('kafka-node');

module.exports = app => {
  app.beforeStart(async () => {
    const ctx = app.createAnonymousContext();

    const Producer = kafka.Producer;
    const client = new kafka.KafkaClient({ kafkaHost: app.config.kafkaHost });
    const producer = new Producer(client, app.config.producerConfig);

    producer.on('error', function(err) {
      console.error('ERROR: [Producer] ' + err);
    });

    app.producer = producer;

    const consumer = new kafka.Consumer(client, app.config.consumerTopics, {
      autoCommit: false,
    });

    consumer.on('message', async function(message) {
      try {
        await ctx.service.log.insert(JSON.parse(message.value));
        consumer.commit(true, (err, data) => {
          console.error('commit:', err, data);
        });
      } catch (error) {
        console.error('ERROR: [GetMessage] ', message, error);
      }
    });

    consumer.on('error', function(err) {
      console.error('ERROR: [Consumer] ' + err);
    });
  });
};
The code above creates a producer and a consumer.
The producer is attached to the global app object right after creation; at this point it is only instantiated, and messages are produced later, when requests come in.
When the consumer receives a message, it calls the insert method in the service layer, which writes the record into the database.
For details on the individual options, see the official kafka-node API documentation; the producer and consumer configuration used here appears further down.
2. controller · log.js
Here we grab the producer from app and pass it on to the service layer.
'use strict';

const Controller = require('egg').Controller;

class LogController extends Controller {
  /**
   * @description Send log messages through Kafka
   * @host /log/notice
   * @method POST
   * @param {Log} log log payload
   */
  async notice() {
    const producer = this.ctx.app.producer;
    const Response = new this.ctx.app.Response();
    const requestBody = this.ctx.request.body;
    const backInfo = await this.ctx.service.log.send(producer, requestBody);
    this.ctx.body = Response.success(backInfo);
  }
}

module.exports = LogController;
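The original article does not show the route registration. In a standard egg.js layout, the POST /log/notice endpoint would be wired up in app/router.js; a minimal sketch (this file is not part of the original article):

// app/router.js — assumed layout, not shown in the original article
'use strict';

module.exports = app => {
  const { router, controller } = app;
  // Map POST /log/notice to LogController.notice()
  router.post('/log/notice', controller.log.notice);
};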
3. service · log.js
The send method here calls producer.send, which actually produces the message.
The insert method writes the record into the database.
'use strict';

const Service = require('egg').Service;
const uuidv1 = require('uuid/v1');

class LogService extends Service {
  async send(producer, params) {
    const payloads = [
      {
        topic: this.ctx.app.config.topic,
        messages: JSON.stringify(params),
      },
    ];

    producer.send(payloads, function(err, data) {
      console.log('send : ', data);
    });

    return 'success';
  }

  async insert(message) {
    try {
      const logDB = this.ctx.app.mysql.get('log');
      const ip = this.ctx.ip;

      const Logs = this.ctx.model.Log.build({
        id: uuidv1(),
        type: message.type || '',
        level: message.level || 0,
        operator: message.operator || '',
        content: message.content || '',
        ip,
        user_agent: message.user_agent || '',
        error_stack: message.error_stack || '',
        url: message.url || '',
        request: message.request || '',
        response: message.response || '',
        created_at: new Date(),
        updated_at: new Date(),
      });

      const result = await logDB.insert('logs', Logs.dataValues);

      if (result.affectedRows === 1) {
        console.log(`SUCCESS: [Insert ${message.type}]`);
      } else console.error('ERROR: [Insert DB] ', result);
    } catch (error) {
      console.error('ERROR: [Insert] ', message, error);
    }
  }
}

module.exports = LogService;
4. config · config.default.js
The configuration used by the code above lives here; note that five partitions are configured.
'use strict';

module.exports = appInfo => {
  const config = (exports = {});

  const topic = 'logAction_p5';

  // add your config here
  config.middleware = [];

  config.security = {
    csrf: {
      enable: false,
    },
  };

  // mysql database configuration
  config.mysql = {
    clients: {
      basic: {
        host: 'localhost',
        port: '3306',
        user: 'root',
        password: '123456',
        database: 'merchants_basic',
      },
      log: {
        host: 'localhost',
        port: '3306',
        user: 'root',
        password: '123456',
        database: 'merchants_log',
      },
    },
    default: {},
    app: true,
    agent: false,
  };

  // sequelize config
  config.sequelize = {
    dialect: 'mysql',
    database: 'merchants_log',
    host: 'localhost',
    port: '3306',
    username: 'root',
    password: '123456',
    dialectOptions: {
      requestTimeout: 999999,
    },
    pool: {
      acquire: 999999,
    },
  };

  // kafka config
  config.kafkaHost = 'localhost:9092';
  config.topic = topic;
  config.producerConfig = {
    // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
    partitionerType: 1,
  };
  config.consumerTopics = [
    { topic, partition: 0 },
    { topic, partition: 1 },
    { topic, partition: 2 },
    { topic, partition: 3 },
    { topic, partition: 4 },
  ];

  return config;
};
5. Entity class: model · log.js
Sequelize is used here.
'use strict';

module.exports = app => {
  const { STRING, INTEGER, DATE, TEXT } = app.Sequelize;

  const Log = app.model.define('log', {
    /** UUID */
    id: { type: STRING(36), primaryKey: true },
    /** Log type */
    type: STRING(100),
    /** Priority level (the higher the number, the higher the priority) */
    level: INTEGER,
    /** Operator */
    operator: STRING(50),
    /** Log content */
    content: TEXT,
    /** IP */
    ip: STRING(36),
    /** User agent of the current request */
    user_agent: STRING(150),
    /** Error stack */
    error_stack: TEXT,
    /** URL */
    url: STRING(255),
    /** Request object */
    request: TEXT,
    /** Response object */
    response: TEXT,
    /** Created at */
    created_at: DATE,
    /** Updated at */
    updated_at: DATE,
  });

  return Log;
};
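One file the article leaves out: since the code relies on both app.mysql and the Sequelize model (app.model), the corresponding egg plugins presumably have to be enabled in config/plugin.js. A sketch under that assumption:

// config/plugin.js — assumed, not shown in the original article
'use strict';

exports.mysql = {
  enable: true,
  package: 'egg-mysql',
};

exports.sequelize = {
  enable: true,
  package: 'egg-sequelize',
};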
6. Python test script:

import requests
from threading import Thread


def loop():
    # each thread fires 1000 POST requests at the log endpoint
    t = 1000
    while t:
        url = "http://localhost:7001/log/notice"
        payload = "{\n\t\"type\": \"ERROR\",\n\t\"level\": 1,\n\t\"content\": \"URL send ERROR\",\n\t\"operator\": \"Knove\"\n}"
        headers = {
            'Content-Type': "application/json",
            'Cache-Control': "no-cache"
        }
        response = requests.request("POST", url, data=payload, headers=headers)
        print(response.text)
        t -= 1


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=loop)
        t.start()
7. Table creation SQL:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for logs
-- ----------------------------
DROP TABLE IF EXISTS `logs`;
CREATE TABLE `logs` (
  `id` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT 'Log type',
  `level` int(11) NULL DEFAULT NULL COMMENT 'Priority level (the higher the number, the higher the priority)',
  `operator` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'Operator',
  `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT 'Log content',
  `ip` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'IP',
  `user_agent` varchar(150) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'User agent of the current request',
  `error_stack` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT 'Error stack',
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'Current URL',
  `request` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT 'Request object',
  `response` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT 'Response object',
  `created_at` datetime(0) NULL DEFAULT NULL COMMENT 'Created at',
  `updated_at` datetime(0) NULL DEFAULT NULL COMMENT 'Updated at',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;