- 資訊首頁(yè) > 開(kāi)發(fā)技術(shù) > 編程語(yǔ)言 >
- spring kafka中怎么批量給topic加前綴
spring kafka中怎么批量給topic加前綴,很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細講解,有這方面需求的人可以來(lái)學(xué)習下,希望你能有所收獲。
可以通過(guò)生產(chǎn)者攔截器,來(lái)給topic加前綴
a、編寫(xiě)一個(gè)生產(chǎn)者攔截器
@Slf4j public class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> { /** * 運行在用戶(hù)主線(xiàn)程中,在消息被序列化之前調用 * @param record * @return */ @Override public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) { log.info("原始topic:{}",record.topic()); return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(), record.partition(),record.timestamp(),record.key(), record.value()); } /** * 在消息被應答之前或者消息發(fā)送失敗時(shí)調用,通常在producer回調邏輯觸發(fā)之前,運行在produer的io線(xiàn)程中 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { log.info("實(shí)際topic:{}",metadata.topic()); } /** * 清理工作 */ @Override public void close() { } /** * 初始化工作 * @param configs */ @Override public void configure(Map<String, ?> configs) { }
b、配置攔截器
kafka: producer: # 生產(chǎn)者攔截器配置 properties: interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor
c、測試
這個(gè)就稍微有點(diǎn)難搞了,因為業(yè)務(wù)開(kāi)發(fā)部門(mén)他們是直接用@KafkaListener的注解,形如下
@KafkaListener(id = "msgId",topics = {Constant.TOPIC})
像這種也沒(méi)啥好的辦法,就只能通過(guò)源碼了,通過(guò)源碼可以發(fā)現在如下地方
KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization
會(huì )把@KafkaListener的值賦值給消費者,如果對spring有了解的朋友,可能會(huì )知道postProcessAfterInitialization是spring后置處理器的方法,主要用來(lái)bean初始化后的一些操作,既然我們知道@KafkaListener會(huì )在bean初始化后再進(jìn)行賦值,那我們就可以在bean初始化前,修改掉@KafkaListener的值。具體實(shí)現如下
@Component public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor { @SneakyThrows @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { List<String> packageNames = AutoConfigurationPackages.get(beanFactory); for (String packageName : packageNames) { Reflections reflections = new Reflections(new ConfigurationBuilder() .forPackages(packageName) // 指定路徑URL .addScanners(new SubTypesScanner()) // 添加子類(lèi)掃描工具 .addScanners(new FieldAnnotationsScanner()) // 添加 屬性注解掃描工具 .addScanners(new MethodAnnotationsScanner() ) // 添加 方法注解掃描工具 .addScanners(new MethodParameterScanner() ) // 添加方法參數掃描工具 ); Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class); if(!CollectionUtils.isEmpty(methodSet)){ for (Method method : methodSet) { KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class); changeTopics(kafkaListener); } } } } private void changeTopics(KafkaListener kafkaListener) throws Exception{ InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener); Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues"); memberValuesField.setAccessible(true); Map<String,Object> memberValues = (Map<String,Object>)memberValuesField.get(invocationHandler); String[] topics = (String[])memberValues.get("topics"); System.out.println("修改前topics:" + Lists.newArrayList(topics)); for (int i = 0; i < topics.length; i++) { topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i]; } memberValues.put("topics", topics); System.out.println("修改后topics:" + Lists.newArrayList(kafkaListener.topics())); } }
測試
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。
Copyright ? 2009-2022 56dr.com. All Rights Reserved. 特網(wǎng)科技 特網(wǎng)云 版權所有 特網(wǎng)科技 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 百度云 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2
建議您使用Chrome、Firefox、Edge、IE10及以上版本和360等主流瀏覽器瀏覽本網(wǎng)站