国产成人精品18p,天天干成人网,无码专区狠狠躁天天躁,美女脱精光隐私扒开免费观看

如何設計一個(gè)高性能網(wǎng)關(guān)?

發(fā)布時(shí)間:2021-08-02 23:37 來(lái)源:https://blog.51cto.com/u_15233 閱讀:168 作者:編程技術(shù)圈 欄目: 云計算 歡迎投稿:712375056

日英文

Close your eyes. Clear your heart. Let it go.?

往日回顧:

責編:樂(lè )樂(lè )?|?來(lái)自:網(wǎng)絡(luò )

后臺回復“大禮包”有驚喜禮包!

??? ?

-? ? ?背景? ? ?- 最近在github上看了soul網(wǎng)關(guān)的設計,突然就來(lái)了興趣準備自己從零開(kāi)始寫(xiě)一個(gè)高性能的網(wǎng)關(guān)。經(jīng)過(guò)兩周時(shí)間的開(kāi)發(fā),我的網(wǎng)關(guān)ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差沒(méi)有管理后臺。 -? ? ?設計? ? ?- 1、技術(shù)選型網(wǎng)關(guān)是所有請求的入口,所以要求有很高的吞吐量,為了實(shí)現這點(diǎn)可以使用請求異步化來(lái)解決。目前一般有以下兩種方案: Tomcat/Jetty+NIO+Servlet3Servlet3已經(jīng)支持異步,這種方案使用比較多,京東,有贊和Zuul,都用的是這種方案。 Netty+NIONetty為高并發(fā)而生,目前唯品會(huì )的網(wǎng)關(guān)使用這個(gè)策略,在唯品會(huì )的技術(shù)文章中在相同的情況下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己處理HTTP協(xié)議,這一塊比較麻煩。 后面發(fā)現Soul網(wǎng)關(guān)是基于Spring WebFlux(底層Netty)的,不用太關(guān)心HTTP協(xié)議的處理,于是決定也用Spring WebFlux。 網(wǎng)關(guān)的第二個(gè)特點(diǎn)是具備可擴展性,比如Netflix Zuul有preFilters,postFilters等在不同的階段方便處理不同的業(yè)務(wù),基于責任鏈模式將請求進(jìn)行鏈式處理即可實(shí)現。 在微服務(wù)架構下,服務(wù)都會(huì )進(jìn)行多實(shí)例部署來(lái)保證高可用,請求到達網(wǎng)關(guān)時(shí),網(wǎng)關(guān)需要根據URL找到所有可用的實(shí)例,這時(shí)就需要服務(wù)注冊和發(fā)現功能,即注冊中心。 現在流行的注冊中心有Apache的Zookeeper和阿里的Nacos兩種(consul有點(diǎn)小眾),因為之前寫(xiě)RPC框架時(shí)已經(jīng)用過(guò)了Zookeeper,所以這次就選擇了Nacos。 2、需求清單首先要明確目標,即開(kāi)發(fā)一個(gè)具備哪些特性的網(wǎng)關(guān),總結下后如下: 自定義路由規則可基于version的路由規則設置,路由對象包括DEFAUL,HEADER和QUERY三種,匹配方式包括=、regex、like三種。 跨語(yǔ)言HTTP協(xié)議天生跨語(yǔ)言 高性能Netty本身就是一款高性能的通信框架,同時(shí)server將一些路由規則等數據緩存到JVM內存避免請求admin服務(wù)。 高可用支持集群模式防止單節點(diǎn)故障,無(wú)狀態(tài)。 灰度發(fā)布灰度發(fā)布(又名金絲雀發(fā)布)是指在黑與白之間,能夠平滑過(guò)渡的一種發(fā)布方式。在其上可以進(jìn)行A/B testing,即讓一部分用戶(hù)繼續用產(chǎn)品特性A,一部分用戶(hù)開(kāi)始用產(chǎn)品特性B,如果用戶(hù)對B沒(méi)有什么反對意見(jiàn),那么逐步擴大范圍,把所有用戶(hù)都遷移到B上面來(lái)。通過(guò)特性一可以實(shí)現。 接口鑒權基于責任鏈模式,用戶(hù)開(kāi)發(fā)自己的鑒權插件即可。 負載均衡支持多種負載均衡算法,如隨機,輪詢(xún),加權輪詢(xún)等。利用SPI機制可以根據配置進(jìn)行動(dòng)態(tài)加載。 3、架構設計在參考了一些優(yōu)秀的網(wǎng)關(guān)Zuul,Spring Cloud Gateway,Soul后,將項目劃分為以下幾個(gè)模塊。 它們之間的關(guān)系如圖: 網(wǎng)關(guān)設計 注意:?這張圖與實(shí)際實(shí)現有點(diǎn)出入,Nacos push到本地緩存的那個(gè)環(huán)節沒(méi)有實(shí)現,目前只有ship-sever定時(shí)輪詢(xún)pull的過(guò)程。ship-admin從Nacos獲取注冊服務(wù)信息的過(guò)程,也改成了ServiceA啟動(dòng)時(shí)主動(dòng)發(fā)生HTTP請求通知ship-admin。 4、表結構設計 -? ? ?編碼? ? ?- 1、ship-client-spring-boot-starter 首先創(chuàng )建一個(gè)spring-boot-starter命名為ship-client-spring-boot-starter,不知道如何自定義starter的可以看我以前寫(xiě)的《開(kāi)發(fā)自己的starter》。 其核心類(lèi)?AutoRegisterListener?就是在項目啟動(dòng)時(shí)做了兩件事:1.將服務(wù)信息注冊到Nacos注冊中心2.通知ship-admin服務(wù)上線(xiàn)了并注冊下線(xiàn)hook。 代碼如下:*?Created?by?2YSP?on?2020/12/21 */ public?class?AutoRegisterListener?implements?ApplicationListener<ContextRefreshedEvent>?{ ???private?final?static?Logger?LOGGER?=?LoggerFactory.getLogger(AutoRegisterListener.class); ???private?volatile?AtomicBoolean?registered?=?new?AtomicBoolean(false); ???private?final?ClientConfigProperties?properties; ???@NacosInjected ???private?NamingService?namingService; ???@Autowired ???private?RequestMappingHandlerMapping?handlerMapping; ???private?final?ExecutorService?pool; ???/** *?url?list?to?ignore */ ???private?static?List<String>?ignoreUrlList?=?new?LinkedList<>(); ???static?{ ???????ignoreUrlList.add("/error"); ???} ???public?AutoRegisterListener(ClientConfigProperties?properties)?{ ???????if?(!check(properties))?{ ???????????LOGGER.error("client?config?port,contextPath,appName?adminUrl?and?version?can't?be?empty!"); ???????????throw?new?ShipException("client?config?port,contextPath,appName?adminUrl?and?version?can't?be?empty!"); ???????} ???????this.properties?=?properties; ???????pool?=?new?ThreadPoolExecutor(1,?4,?0,?TimeUnit.SECONDS,?new?LinkedBlockingQueue<>()); ???} ???/** *?check?the?ClientConfigProperties * *?@param?properties *?@return */ ???private?boolean?check(ClientConfigProperties?properties)?{ ???????if?(properties.getPort()?==?null|?properties.getContextPath()?==?null ??????????????|?properties.getVersion()?==?null|?properties.getAppName()?==?null ??????????????|?properties.getAdminUrl()?==?null)?{ ???????????return?false; ???????} ???????return?true; ???} ???@Override ???public?void?onApplicationEvent(ContextRefreshedEvent?event)?{ ???????if?(!registered.compareAndSet(false,?true))?{ ???????????return; ???????} ???????doRegister(); ???????registerShutDownHook(); ???} ???/** *?send?unregister?request?to?admin?when?jvm?shutdown */ ???private?void?registerShutDownHook()?{ ???????final?String?url?=?"http://"?+?properties.getAdminUrl()?+?AdminConstants.UNREGISTER_PATH; ???????final?UnregisterAppDTO?unregisterAppDTO?=?new?UnregisterAppDTO(); ???????unregisterAppDTO.setAppName(properties.getAppName()); ???????unregisterAppDTO.setVersion(properties.getVersion()); ???????unregisterAppDTO.setIp(IpUtil.getLocalIpAddress()); ???????unregisterAppDTO.setPort(properties.getPort()); ???????Runtime.getRuntime().addShutdownHook(new?Thread(()?->?{ ???????????OkhttpTool.doPost(url,?unregisterAppDTO); ???????????LOGGER.info("[{}:{}]?unregister?from?ship-admin?success!",?unregisterAppDTO.getAppName(),?unregisterAppDTO.getVersion()); ???????})); ???} ???/** *?register?all?interface?info?to?register?center */ ???private?void?doRegister()?{ ???????Instance?instance?=?new?Instance(); ???????instance.setIp(IpUtil.getLocalIpAddress()); ???????instance.setPort(properties.getPort()); ???????instance.setEphemeral(true); ???????Map<String,?String>?metadataMap?=?new?HashMap<>(); ???????metadataMap.put("version",?properties.getVersion()); ???????metadataMap.put("appName",?properties.getAppName()); ???????instance.setMetadata(metadataMap); ???????try?{ ???????????namingService.registerInstance(properties.getAppName(),?NacosConstants.APP_GROUP_NAME,?instance); ???????}?catch?(NacosException?e)?{ ???????????LOGGER.error("register?to?nacos?fail",?e); ???????????throw?new?ShipException(e.getErrCode(),?e.getErrMsg()); ???????} ???????LOGGER.info("register?interface?info?to?nacos?success!"); ???????//?send?register?request?to?ship-admin ???????String?url?=?"http://"?+?properties.getAdminUrl()?+?AdminConstants.REGISTER_PATH; ???????RegisterAppDTO?registerAppDTO?=?buildRegisterAppDTO(instance); ???????OkhttpTool.doPost(url,?registerAppDTO); ???????LOGGER.info("register?to?ship-admin?success!"); ???} ???private?RegisterAppDTO?buildRegisterAppDTO(Instance?instance)?{ ???????RegisterAppDTO?registerAppDTO?=?new?RegisterAppDTO(); ???????registerAppDTO.setAppName(properties.getAppName()); ???????registerAppDTO.setContextPath(properties.getContextPath()); ???????registerAppDTO.setIp(instance.getIp()); ???????registerAppDTO.setPort(instance.getPort()); ???????registerAppDTO.setVersion(properties.getVersion()); ???????return?registerAppDTO; ???} } 2、ship-servership-sever項目主要包括了兩個(gè)部分內容, 1.請求動(dòng)態(tài)路由的主流程 2.本地緩存數據和ship-admin及nacos同步,這部分在后面3.3再講。 ship-server實(shí)現動(dòng)態(tài)路由的原理是利用WebFilter攔截請求,然后將請求教給plugin chain去鏈式處理。 PluginFilter根據URL解析出appName,然后將啟用的plugin組裝成plugin chain。 public?class?PluginFilter?implements?WebFilter?{ ???private?ServerConfigProperties?properties; ???public?PluginFilter(ServerConfigProperties?properties)?{ ???????this.properties?=?properties; ???} ???@Override ???public?Mono<Void>?filter(ServerWebExchange?exchange,?WebFilterChain?chain)?{ ???????String?appName?=?parseAppName(exchange); ???????if?(CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName)))?{ ???????????throw?new?ShipException(ShipExceptionEnum.SERVICE_NOT_FIND); ???????} ???????PluginChain?pluginChain?=?new?PluginChain(properties,?appName); ???????pluginChain.addPlugin(new?DynamicRoutePlugin(properties)); ???????pluginChain.addPlugin(new?AuthPlugin(properties)); ???????return?pluginChain.execute(exchange,?pluginChain); ???} ???private?String?parseAppName(ServerWebExchange?exchange)?{ ???????RequestPath?path?=?exchange.getRequest().getPath(); ???????String?appName?=?path.value().split("http://blog.51cto.com/")[1]; ???????return?appName; ???} }``` PluginChain繼承了AbstractShipPlugin并持有所有要執行的插件。 ```java *?@Author:?Ship *?@Description: *?@Date:?Created?in?2020/12/25 */ public?class?PluginChain?extends?AbstractShipPlugin?{ ???/** *?the?pos?point?to?current?plugin */ ???private?int?pos; ???/** *?the?plugins?of?chain */ ???private?List<ShipPlugin>?plugins; ???private?final?String?appName; ???public?PluginChain(ServerConfigProperties?properties,?String?appName)?{ ???????super(properties); ???????this.appName?=?appName; ???} ???/** *?add?enabled?plugin?to?chain * *?@param?shipPlugin */ ???public?void?addPlugin(ShipPlugin?shipPlugin)?{ ???????if?(plugins?==?null)?{ ???????????plugins?=?new?ArrayList<>(); ???????} ???????if?(!PluginCache.isEnabled(appName,?shipPlugin.name()))?{ ???????????return; ???????} ???????plugins.add(shipPlugin); ???????//?order?by?the?plugin's?order ???????plugins.sort(Comparator.comparing(ShipPlugin::order)); ???} ???@Override ???public?Integer?order()?{ ???????return?null; ???} ???@Override ???public?String?name()?{ ???????return?null; ???} ???@Override ???public?Mono<Void>?execute(ServerWebExchange?exchange,?PluginChain?pluginChain)?{ ???????if?(pos?==?plugins.size())?{ ???????????return?exchange.getResponse().setComplete(); ???????} ???????return?pluginChain.plugins.get(pos++).execute(exchange,?pluginChain); ???} ???public?String?getAppName()?{ ???????return?appName; ???} } AbstractShipPlugin實(shí)現了ShipPlugin接口,并持有ServerConfigProperties配置對象。 public?abstract?class?AbstractShipPlugin?implements?ShipPlugin?{ ???protected?ServerConfigProperties?properties; ???public?AbstractShipPlugin(ServerConfigProperties?properties)?{ ???????this.properties?=?properties; ???} }``` ShipPlugin接口定義了所有插件必須實(shí)現的三個(gè)方法order(),name()和execute()。 ```java public?interface?ShipPlugin?{ ???/** *?lower?values?have?higher?priority * *?@return */ ???Integer?order(); ???/** *?return?current?plugin?name * *?@return */ ???String?name(); ???Mono<Void>?execute(ServerWebExchange?exchange,PluginChain?pluginChain); }``` DynamicRoutePlugin繼承了抽象類(lèi)AbstractShipPlugin,包含了動(dòng)態(tài)路由的主要業(yè)務(wù)邏輯。 ```java *?@Author:?Ship *?@Description: *?@Date:?Created?in?2020/12/25 */ public?class?DynamicRoutePlugin?extends?AbstractShipPlugin?{ ???private?final?static?Logger?LOGGER?=?LoggerFactory.getLogger(DynamicRoutePlugin.class); ???private?static?WebClient?webClient; ???private?static?final?Gson?gson?=?new?GsonBuilder().create(); ???static?{ ???????HttpClient?httpClient?=?HttpClient.create() ???????????????.tcpConfiguration(client?-> ???????????????????????client.doOnConnected(conn?-> ???????????????????????????????conn.addHandlerLast(new?ReadTimeoutHandler(3)) ???????????????????????????????????????.addHandlerLast(new?WriteTimeoutHandler(3))) ???????????????????????????????.option(ChannelOption.TCP_NODELAY,?true) ???????????????); ???????webClient?=?WebClient.builder().clientConnector(new?ReactorClientHttpConnector(httpClient)) ???????????????.build(); ???} ???public?DynamicRoutePlugin(ServerConfigProperties?properties)?{ ???????super(properties); ???} ???@Override ???public?Integer?order()?{ ???????return?ShipPluginEnum.DYNAMIC_ROUTE.getOrder(); ???} ???@Override ???public?String?name()?{ ???????return?ShipPluginEnum.DYNAMIC_ROUTE.getName(); ???} ???@Override ???public?Mono<Void>?execute(ServerWebExchange?exchange,?PluginChain?pluginChain)?{ ???????String?appName?=?pluginChain.getAppName(); ???????ServiceInstance?serviceInstance?=?chooseInstance(appName,?exchange.getRequest()); //????????LOGGER.info("selected?instance?is?[{}]",?gson.toJson(serviceInstance)); ???????//?request?service ???????String?url?=?buildUrl(exchange,?serviceInstance); ???????return?forward(exchange,?url); ???} ???/** *?forward?request?to?backend?service * *?@param?exchange *?@param?url *?@return */ ???private?Mono<Void>?forward(ServerWebExchange?exchange,?String?url)?{ ???????ServerHttpRequest?request?=?exchange.getRequest(); ???????ServerHttpResponse?response?=?exchange.getResponse(); ???????HttpMethod?method?=?request.getMethod(); ???????WebClient.RequestBodySpec?requestBodySpec?=?webClient.method(method).uri(url).headers((headers)?->?{ ???????????headers.addAll(request.getHeaders()); ???????}); ???????WebClient.RequestHeadersSpec<?>?reqHeadersSpec; ???????if?(requireHttpBody(method))?{ ???????????reqHeadersSpec?=?requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody())); ???????}?else?{ ???????????reqHeadersSpec?=?requestBodySpec; ???????} ???????//?nio->callback->nio ???????return?reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis())) ???????????????.onErrorResume(ex?->?{ ???????????????????return?Mono.defer(()?->?{ ???????????????????????String?errorResultJson?=?""; ???????????????????????if?(ex?instanceof?TimeoutException)?{ ???????????????????????????errorResultJson?=?"{\"code\":5001,\"message\":\"network?timeout\"}"; ???????????????????????}?else?{ ???????????????????????????errorResultJson?=?"{\"code\":5000,\"message\":\"system?error\"}"; ???????????????????????} ???????????????????????return?ShipResponseUtil.doResponse(exchange,?errorResultJson); ???????????????????}).then(Mono.empty()); ???????????????}).flatMap(backendResponse?->?{ ???????????????????response.setStatusCode(backendResponse.statusCode()); ???????????????????response.getHeaders().putAll(backendResponse.headers().asHttpHeaders()); ???????????????????return?response.writeWith(backendResponse.bodyToFlux(DataBuffer.class)); ???????????????}); ???} ???/** *?weather?the?http?method?need?http?body * *?@param?method *?@return */ ???private?boolean?requireHttpBody(HttpMethod?method)?{ ???????if?(method.equals(HttpMethod.POST)|?method.equals(HttpMethod.PUT)|?method.equals(HttpMethod.PATCH))?{ ???????????return?true; ???????} ???????return?false; ???} ???private?String?buildUrl(ServerWebExchange?exchange,?ServiceInstance?serviceInstance)?{ ???????ServerHttpRequest?request?=?exchange.getRequest(); ???????String?query?=?request.getURI().getQuery(); ???????String?path?=?request.getPath().value().replaceFirst("http://blog.51cto.com/"?+?serviceInstance.getAppName(),?""); ???????String?url?=?"http://"?+?serviceInstance.getIp()?+?":"?+?serviceInstance.getPort()?+?path; ???????if?(!StringUtils.isEmpty(query))?{ ???????????url?=?url?+?"?"?+?query; ???????} ???????return?url; ???} ???/** *?choose?an?ServiceInstance?according?to?route?rule?config?and?load?balancing?algorithm * *?@param?appName *?@param?request *?@return */ ???private?ServiceInstance?chooseInstance(String?appName,?ServerHttpRequest?request)?{ ???????List<ServiceInstance>?serviceInstances?=?ServiceCache.getAllInstances(appName); ???????if?(CollectionUtils.isEmpty(serviceInstances))?{ ???????????LOGGER.error("service?instance?of?{}?not?find",?appName); ???????????throw?new?ShipException(ShipExceptionEnum.SERVICE_NOT_FIND); ???????} ???????String?version?=?matchAppVersion(appName,?request); ???????if?(StringUtils.isEmpty(version))?{ ???????????throw?new?ShipException("match?app?version?error"); ???????} ???????//?filter?serviceInstances?by?version ???????List<ServiceInstance>?instances?=?serviceInstances.stream().filter(i?->?i.getVersion().equals(version)).collect(Collectors.toList()); ???????//Select?an?instance?based?on?the?load?balancing?algorithm ???????LoadBalance?loadBalance?=?LoadBalanceFactory.getInstance(properties.getLoadBalance(),?appName,?version); ???????ServiceInstance?serviceInstance?=?loadBalance.chooseOne(instances); ???????return?serviceInstance; ???} ???private?String?matchAppVersion(String?appName,?ServerHttpRequest?request)?{ ???????List<AppRuleDTO>?rules?=?RouteRuleCache.getRules(appName); ???????rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed()); ???????for?(AppRuleDTO?rule?:?rules)?{ ???????????if?(match(rule,?request))?{ ???????????????return?rule.getVersion(); ???????????} ???????} ???????return?null; ???} ???private?boolean?match(AppRuleDTO?rule,?ServerHttpRequest?request)?{ ???????String?matchObject?=?rule.getMatchObject(); ???????String?matchKey?=?rule.getMatchKey(); ???????String?matchRule?=?rule.getMatchRule(); ???????Byte?matchMethod?=?rule.getMatchMethod(); ???????if?(MatchObjectEnum.DEFAULT.getCode().equals(matchObject))?{ ???????????return?true; ???????}?else?if?(MatchObjectEnum.QUERY.getCode().equals(matchObject))?{ ???????????String?param?=?request.getQueryParams().getFirst(matchKey); ???????????if?(!StringUtils.isEmpty(param))?{ ???????????????return?StringTools.match(param,?matchMethod,?matchRule); ???????????} ???????}?else?if?(MatchObjectEnum.HEADER.getCode().equals(matchObject))?{ ???????????HttpHeaders?headers?=?request.getHeaders(); ???????????String?headerValue?=?headers.getFirst(matchKey); ???????????if?(!StringUtils.isEmpty(headerValue))?{ ???????????????return?StringTools.match(headerValue,?matchMethod,?matchRule); ???????????} ???????} ???????return?false; ???} } 3、數據同步app數據同步后臺服務(wù)(如訂單服務(wù))啟動(dòng)時(shí),只將服務(wù)名,版本,ip地址和端口號注冊到了Nacos,并沒(méi)有實(shí)例的權重和啟用的插件信息怎么辦? 搜索后端架構師公眾號回復“架構整潔”,送你一份驚喜禮包。 一般在線(xiàn)的實(shí)例權重和插件列表都是在管理界面配置,然后動(dòng)態(tài)生效的,所以需要ship-admin定時(shí)更新實(shí)例的權重和插件信息到注冊中心。 對應代碼ship-admin的NacosSyncListener: *?@Author:?Ship *?@Description: *?@Date:?Created?in?2020/12/30 */ @Configuration public?class?NacosSyncListener?implements?ApplicationListener<ContextRefreshedEvent>?{ ???private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(NacosSyncListener.class); ???private?static?ScheduledThreadPoolExecutor?scheduledPool?=?new?ScheduledThreadPoolExecutor(1, ???????????new?ShipThreadFactory("nacos-sync",?true).create()); ???@NacosInjected ???private?NamingService?namingService; ???@Value("${nacos.discovery.server-addr}") ???private?String?baseUrl; ???@Resource ???private?AppService?appService; ???@Override ???public?void?onApplicationEvent(ContextRefreshedEvent?event)?{ ???????if?(event.getApplicationContext().getParent()?!=?null)?{ ???????????return; ???????} ???????String?url?=?"http://"?+?baseUrl?+?NacosConstants.INSTANCE_UPDATE_PATH; ???????scheduledPool.scheduleWithFixedDelay(new?NacosSyncTask(namingService,?url,?appService),?0,?30L,?TimeUnit.SECONDS); ???} ???class?NacosSyncTask?implements?Runnable?{ ???????private?NamingService?namingService; ???????private?String?url; ???????private?AppService?appService; ???????private?Gson?gson?=?new?GsonBuilder().create(); ???????public?NacosSyncTask(NamingService?namingService,?String?url,?AppService?appService)?{ ???????????this.namingService?=?namingService; ???????????this.url?=?url; ???????????this.appService?=?appService; ???????} ???????/** *?Regular?update?weight,enabled?plugins?to?nacos?instance */ ???????@Override ???????public?void?run()?{ ???????????try?{ ???????????????//?get?all?app?names ???????????????ListView<String>?services?=?namingService.getServicesOfServer(1,?Integer.MAX_VALUE,?NacosConstants.APP_GROUP_NAME); ???????????????if?(CollectionUtils.isEmpty(services.getData()))?{ ???????????????????return; ???????????????} ???????????????List<String>?appNames?=?services.getData(); ???????????????List<AppInfoDTO>?appInfos?=?appService.getAppInfos(appNames); ???????????????for?(AppInfoDTO?appInfo?:?appInfos)?{ ???????????????????if?(CollectionUtils.isEmpty(appInfo.getInstances()))?{ ???????????????????????continue; ???????????????????} ???????????????????for?(ServiceInstance?instance?:?appInfo.getInstances())?{ ???????????????????????Map<String,?Object>?queryMap?=?buildQueryMap(appInfo,?instance); ???????????????????????String?resp?=?OkhttpTool.doPut(url,?queryMap,?""); ???????????????????????LOGGER.debug("response?:{}",?resp); ???????????????????} ???????????????} ???????????}?catch?(Exception?e)?{ ???????????????LOGGER.error("nacos?sync?task?error",?e); ???????????} ???????} ???????private?Map<String,?Object>?buildQueryMap(AppInfoDTO?appInfo,?ServiceInstance?instance)?{ ???????????Map<String,?Object>?map?=?new?HashMap<>(); ???????????map.put("serviceName",?appInfo.getAppName()); ???????????map.put("groupName",?NacosConstants.APP_GROUP_NAME); ???????????map.put("ip",?instance.getIp()); ???????????map.put("port",?instance.getPort()); ???????????map.put("weight",?instance.getWeight().doubleValue()); ???????????NacosMetadata?metadata?=?new?NacosMetadata(); ???????????metadata.setAppName(appInfo.getAppName()); ???????????metadata.setVersion(instance.getVersion()); ???????????metadata.setPlugins(String.join(",",?appInfo.getEnabledPlugins())); ???????????map.put("metadata",?StringTools.urlEncode(gson.toJson(metadata))); ???????????map.put("ephemeral",?true); ???????????return?map; ???????} ???} } ship-server再定時(shí)從Nacos拉取app數據更新到本地Map緩存。 *?@Author:?Ship *?@Description:?sync?data?to?local?cache *?@Date:?Created?in?2020/12/25 */ @Configuration public?class?DataSyncTaskListener?implements?ApplicationListener<ContextRefreshedEvent>?{ ???private?static?ScheduledThreadPoolExecutor?scheduledPool?=?new?ScheduledThreadPoolExecutor(1, ???????????new?ShipThreadFactory("service-sync",?true).create()); ???@NacosInjected ???private?NamingService?namingService; ???@Autowired ???private?ServerConfigProperties?properties; ???@Override ???public?void?onApplicationEvent(ContextRefreshedEvent?event)?{ ???????if?(event.getApplicationContext().getParent()?!=?null)?{ ???????????return; ???????} ???????scheduledPool.scheduleWithFixedDelay(new?DataSyncTask(namingService) ???????????????,?0L,?properties.getCacheRefreshInterval(),?TimeUnit.SECONDS); ???????WebsocketSyncCacheServer?websocketSyncCacheServer?=?new?WebsocketSyncCacheServer(properties.getWebSocketPort()); ???????websocketSyncCacheServer.start(); ???} ???class?DataSyncTask?implements?Runnable?{ ???????private?NamingService?namingService; ???????public?DataSyncTask(NamingService?namingService)?{ ???????????this.namingService?=?namingService; ???????} ???????@Override ???????public?void?run()?{ ???????????try?{ ???????????????//?get?all?app?names ???????????????ListView<String>?services?=?namingService.getServicesOfServer(1,?Integer.MAX_VALUE,?NacosConstants.APP_GROUP_NAME); ???????????????if?(CollectionUtils.isEmpty(services.getData()))?{ ???????????????????return; ???????????????} ???????????????List<String>?appNames?=?services.getData(); ???????????????//?get?all?instances ???????????????for?(String?appName?:?appNames)?{ ???????????????????List<Instance>?instanceList?=?namingService.getAllInstances(appName,?NacosConstants.APP_GROUP_NAME); ???????????????????if?(CollectionUtils.isEmpty(instanceList))?{ ???????????????????????continue; ???????????????????} ???????????????????ServiceCache.add(appName,?buildServiceInstances(instanceList)); ???????????????????List<String>?pluginNames?=?getEnabledPlugins(instanceList); ???????????????????PluginCache.add(appName,?pluginNames); ???????????????} ???????????????ServiceCache.removeExpired(appNames); ???????????????PluginCache.removeExpired(appNames); ???????????}?catch?(NacosException?e)?{ ???????????????e.printStackTrace(); ???????????} ???????} ???????private?List<String>?getEnabledPlugins(List<Instance>?instanceList)?{ ???????????Instance?instance?=?instanceList.get(0); ???????????Map<String,?String>?metadata?=?instance.getMetadata(); ???????????//?plugins:?DynamicRoute,Auth ???????????String?plugins?=?metadata.getOrDefault("plugins",?ShipPluginEnum.DYNAMIC_ROUTE.getName()); ???????????return?Arrays.stream(plugins.split(",")).collect(Collectors.toList()); ???????} ???????private?List<ServiceInstance>?buildServiceInstances(List<Instance>?instanceList)?{ ???????????List<ServiceInstance>?list?=?new?LinkedList<>(); ???????????instanceList.forEach(instance?->?{ ???????????????Map<String,?String>?metadata?=?instance.getMetadata(); ???????????????ServiceInstance?serviceInstance?=?new?ServiceInstance(); ???????????????serviceInstance.setAppName(metadata.get("appName")); ???????????????serviceInstance.setIp(instance.getIp()); ???????????????serviceInstance.setPort(instance.getPort()); ???????????????serviceInstance.setVersion(metadata.get("version")); ???????????????serviceInstance.setWeight((int)?instance.getWeight()); ???????????????list.add(serviceInstance); ???????????}); ???????????return?list; ???????} ???} } 路由規則數據同步 同時(shí),如果用戶(hù)在管理后臺更新了路由規則,ship-admin需要推送規則數據到ship-server,這里參考了soul網(wǎng)關(guān)的做法利用websocket在第一次建立連接后進(jìn)行全量同步,此后路由規則發(fā)生變更就只作增量同步。 搜索頂級架構師公眾號回復“架構”,送你一份驚喜禮包。 服務(wù)端WebsocketSyncCacheServer: *?@Author:?Ship *?@Description: *?@Date:?Created?in?2020/12/28 */ public?class?WebsocketSyncCacheServer?extends?WebSocketServer?{ ???private?final?static?Logger?LOGGER?=?LoggerFactory.getLogger(WebsocketSyncCacheServer.class); ???private?Gson?gson?=?new?GsonBuilder().create(); ???private?MessageHandler?messageHandler; ???public?WebsocketSyncCacheServer(Integer?port)?{ ???????super(new?InetSocketAddress(port)); ???????this.messageHandler?=?new?MessageHandler(); ???} ???@Override ???public?void?onOpen(WebSocket?webSocket,?ClientHandshake?clientHandshake)?{ ???????LOGGER.info("server?is?open"); ???} ???@Override ???public?void?onClose(WebSocket?webSocket,?int?i,?String?s,?boolean?b)?{ ???????LOGGER.info("websocket?server?close..."); ???} ???@Override ???public?void?onMessage(WebSocket?webSocket,?String?message)?{ ???????LOGGER.info("websocket?server?receive?message:\n[{}]",?message); ???????this.messageHandler.handler(message); ???} ???@Override ???public?void?onError(WebSocket?webSocket,?Exception?e)?{ ???} ???@Override ???public?void?onStart()?{ ???????LOGGER.info("websocket?server?start..."); ???} ???class?MessageHandler?{ ???????public?void?handler(String?message)?{ ???????????RouteRuleOperationDTO?operationDTO?=?gson.fromJson(message,?RouteRuleOperationDTO.class); ???????????if?(CollectionUtils.isEmpty(operationDTO.getRuleList()))?{ ???????????????return; ???????????} ???????????Map<String,?List<AppRuleDTO>>?map?=?operationDTO.getRuleList() ???????????????????.stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName)); ???????????if?(OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType()) ??????????????????|?OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType()))?{ ???????????????RouteRuleCache.add(map); ???????????}?else?if?(OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType()))?{ ???????????????RouteRuleCache.remove(map); ???????????} ???????} ???} } 客戶(hù)端WebsocketSyncCacheClient: *?@Author:?Ship *?@Description: *?@Date:?Created?in?2020/12/28 */ @Component public?class?WebsocketSyncCacheClient?{ ???private?final?static?Logger?LOGGER?=?LoggerFactory.getLogger(WebsocketSyncCacheClient.class); ???private?WebSocketClient?client; ???private?RuleService?ruleService; ???private?Gson?gson?=?new?GsonBuilder().create(); ???public?WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}")?String?serverWebSocketUrl, RuleService?ruleService)?{ ???????if?(StringUtils.isEmpty(serverWebSocketUrl))?{ ???????????throw?new?ShipException(ShipExceptionEnum.CONFIG_ERROR); ???????} ???????this.ruleService?=?ruleService; ???????ScheduledThreadPoolExecutor?executor?=?new?ScheduledThreadPoolExecutor(1, ???????????????new?ShipThreadFactory("websocket-connect",?true).create()); ???????try?{ ???????????client?=?new?WebSocketClient(new?URI(serverWebSocketUrl))?{ ???????????????@Override ???????????????public?void?onOpen(ServerHandshake?serverHandshake)?{ ???????????????????LOGGER.info("client?is?open"); ???????????????????List<AppRuleDTO>?list?=?ruleService.getEnabledRule(); ???????????????????String?msg?=?gson.toJson(new?RouteRuleOperationDTO(OperationTypeEnum.INSERT,?list)); ???????????????????send(msg); ???????????????} ???????????????@Override ???????????????public?void?onMessage(String?s)?{ ???????????????} ???????????????@Override ???????????????public?void?onClose(int?i,?String?s,?boolean?b)?{ ???????????????} ???????????????@Override ???????????????public?void?onError(Exception?e)?{ ???????????????????LOGGER.error("websocket?client?error",?e); ???????????????} ???????????}; ???????????client.connectBlocking(); ???????????//使用調度線(xiàn)程池進(jìn)行斷線(xiàn)重連,30秒進(jìn)行一次 ???????????executor.scheduleAtFixedRate(()?->?{ ???????????????if?(client?!=?null?&&?client.isClosed())?{ ???????????????????try?{ ???????????????????????client.reconnectBlocking(); ???????????????????}?catch?(InterruptedException?e)?{ ???????????????????????LOGGER.error("reconnect?server?fail",?e); ???????????????????} ???????????????} ???????????},?10,?30,?TimeUnit.SECONDS); ???????}?catch?(Exception?e)?{ ???????????LOGGER.error("websocket?sync?cache?exception",?e); ???????????throw?new?ShipException(e.getMessage()); ???????} ???} ???public?<T>?void?send(T?t)?{ ???????while?(!client.getReadyState().equals(ReadyState.OPEN))?{ ???????????LOGGER.debug("connecting?...please?wait"); ???????} ???????client.send(gson.toJson(t)); ???} } -? ? ?測試? ? ?- 1、動(dòng)態(tài)路由測試本地啟動(dòng)nacos ,sh startup.sh -m standalone; 啟動(dòng)ship-admin; 本地啟動(dòng)兩個(gè)ship-example實(shí)例。 實(shí)例1配置:ship: ?http: ???app-name:?order ???version:?gray_1.0 ???context-path:?/order ???port:?8081 ???admin-url:?127.0.0.1:9001 ?server: ?port:?8081 ?nacos: ?discovery: ???server-addr:?127.0.0.1:8848 實(shí)例2配置: ship: ?http: ???app-name:?order ???version:?prod_1.0 ???context-path:?/order ???port:?8082 ???admin-url:?127.0.0.1:9001 ?server: ?port:?8082 ?nacos: ?discovery: ???server-addr:?127.0.0.1:8848 在數據庫添加路由規則配置,該規則表示當http header 中的name=ship時(shí)請求路由到gray_1.0版本的節點(diǎn)。 啟動(dòng)ship-server,看到以下日志時(shí)則可以進(jìn)行測試了。2021-01-02?19:57:09.159??INFO?30413?---?[SocketWorker-29]?cn.sp.sync.WebsocketSyncCacheServer??????:?websocket?server?receive?message: ?[{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}] 用Postman請求:9000/order/user/add,POST方式,header設置name=ship,可以看到只有實(shí)例1有日志顯示。 ==========add?user,version:gray_1.0 2、性能壓測壓測環(huán)境: MacBook Pro 13英寸處理器 2.3 GHz 四核Intel Core i7內存 16 GB 3733 MHz LPDDR4X后端節點(diǎn)個(gè)數一個(gè)壓測工具:wrk壓測結果:20個(gè)線(xiàn)程,500個(gè)連接數,吞吐量大概每秒9400個(gè)請求。 壓測結果 -? ? ?總結?? ?- 千里之行始于足下,開(kāi)始以為寫(xiě)一個(gè)網(wǎng)關(guān)會(huì )很難,但當你實(shí)際開(kāi)始行動(dòng)時(shí)就會(huì )發(fā)現其實(shí)沒(méi)那么難,所以邁出第一步很重要。過(guò)程中也遇到了很多問(wèn)題,還在github上給soul和nacos這兩個(gè)開(kāi)源項目提了兩個(gè)issue,后來(lái)發(fā)現是自己的問(wèn)題,尷尬。 本文代碼已全部上傳到:https://github.com/2YSP/ship-gate 。 PS:歡迎在留言區留下你的觀(guān)點(diǎn),一起討論提高。如果今天的文章讓你有新的啟發(fā),歡迎轉發(fā)分享給更多人。版權申明:內容來(lái)源網(wǎng)絡(luò ),版權歸原創(chuàng )者所有。除非無(wú)法確認,我們都會(huì )標明作者及出處,如有侵權煩請告知,我們會(huì )立即刪除并表示歉意。謝謝! 歡迎加入后端架構師交流群,在后臺回復“學(xué)習”即可。 猜你還想看 阿里、騰訊、百度、華為、京東最新面試題匯集 費解!為什么那么多人用“ji32k7au4a83”作密碼? 微軟在日本嘗試了每周4天工作制,生產(chǎn)力躍升了40% 996引起公憤,要到頭了? BAT等大廠(chǎng)Java面試經(jīng)驗總結 別找了,想獲取 Java大廠(chǎng)面試題學(xué)習資料 掃下方二維碼回復「手冊」就好了 嘿,你在看嗎?

閉上眼睛,清理你的心,過(guò)去的就讓它過(guò)去吧。

每日掏心話(huà)

? ?正文? ?

點(diǎn)擊上方 ""關(guān)注,?星標或置頂一起成長(cháng)

清淡的人生,步履更輕松。一粥一勺是清淡,健康、溫暖、妥帖;一瓢一簞是清淡,隨意、自在、安心。

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。

国产成AV人片在线观看天堂无码| 四虎永久在线精品免费A| 成年妇女免费播放| 亚洲 欧美 激情 小说 另类| FREE性欧美媓妇喷水| 少妇撒尿BBwBBwBBwBBW毛|