国产成人精品18p,天天干成人网,无码专区狠狠躁天天躁,美女脱精光隐私扒开免费观看

spring kafka中怎么批量給topic加前綴

發(fā)布時(shí)間:2021-07-27 11:45 來(lái)源:億速云 閱讀:0 作者:Leah 欄目: 編程語(yǔ)言 歡迎投稿:712375056

spring kafka中怎么批量給topic加前綴,很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細講解,有這方面需求的人可以來(lái)學(xué)習下,希望你能有所收獲。

實(shí)現思路

1、生產(chǎn)者端

可以通過(guò)生產(chǎn)者攔截器,來(lái)給topic加前綴

2、實(shí)現步驟

a、編寫(xiě)一個(gè)生產(chǎn)者攔截器

@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> {



    /**
     * 運行在用戶(hù)主線(xiàn)程中,在消息被序列化之前調用
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) {
        log.info("原始topic:{}",record.topic());
        return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(),
                record.partition(),record.timestamp(),record.key(), record.value());
    }




    /**
     * 在消息被應答之前或者消息發(fā)送失敗時(shí)調用,通常在producer回調邏輯觸發(fā)之前,運行在produer的io線(xiàn)程中
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
      log.info("實(shí)際topic:{}",metadata.topic());
    }


    /**
     *  清理工作
     */
    @Override
    public void close() {
    }


    /**
     * 初始化工作
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }

b、配置攔截器

kafka:
    producer:
      # 生產(chǎn)者攔截器配置
      properties:
        interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor

c、測試

2、消費者端

這個(gè)就稍微有點(diǎn)難搞了,因為業(yè)務(wù)開(kāi)發(fā)部門(mén)他們是直接用@KafkaListener的注解,形如下

 @KafkaListener(id = "msgId",topics = {Constant.TOPIC})

像這種也沒(méi)啥好的辦法,就只能通過(guò)源碼了,通過(guò)源碼可以發(fā)現在如下地方

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

會(huì )把@KafkaListener的值賦值給消費者,如果對spring有了解的朋友,可能會(huì )知道postProcessAfterInitialization是spring后置處理器的方法,主要用來(lái)bean初始化后的一些操作,既然我們知道@KafkaListener會(huì )在bean初始化后再進(jìn)行賦值,那我們就可以在bean初始化前,修改掉@KafkaListener的值。具體實(shí)現如下

@Component
public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor {

    @SneakyThrows
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

        List<String> packageNames = AutoConfigurationPackages.get(beanFactory);

        for (String packageName : packageNames) {
            Reflections reflections = new Reflections(new ConfigurationBuilder()
                    .forPackages(packageName) // 指定路徑URL
                    .addScanners(new SubTypesScanner()) // 添加子類(lèi)掃描工具
                    .addScanners(new FieldAnnotationsScanner()) // 添加 屬性注解掃描工具
                    .addScanners(new MethodAnnotationsScanner() ) // 添加 方法注解掃描工具
                    .addScanners(new MethodParameterScanner() ) // 添加方法參數掃描工具
            );

            Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
            if(!CollectionUtils.isEmpty(methodSet)){
                for (Method method : methodSet) {
                    KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
                    changeTopics(kafkaListener);
                }
            }
        }

    }


    private void changeTopics(KafkaListener kafkaListener) throws Exception{
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
        Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
        memberValuesField.setAccessible(true);
        Map<String,Object> memberValues = (Map<String,Object>)memberValuesField.get(invocationHandler);
        String[] topics = (String[])memberValues.get("topics");
        System.out.println("修改前topics:" + Lists.newArrayList(topics));
        for (int i = 0; i < topics.length; i++) {
            topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i];
        }
        memberValues.put("topics", topics);
        System.out.println("修改后topics:" + Lists.newArrayList(kafkaListener.topics()));

    }
}

測試

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。

免费A级作爱片免费观看中国| 亚洲人成无码网站在线观看| 秋霞无码久久久精品| 中国免费高清在线观看| 日本亚欧乱色视频免费观看| 髙清国产性猛交XXXAND|