国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|JavaScript|易語言|

服務器之家 - 編程語言 - JAVA教程 - 淺談使用java實現阿里云消息隊列簡單封裝

淺談使用java實現阿里云消息隊列簡單封裝

2021-04-09 11:45狂盜一枝梅 JAVA教程

這篇文章主要介紹了淺談使用java實現阿里云消息隊列簡單封裝,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

一、前言

最近公司有使用阿里云消息隊列的需求,為了更加方便使用,本人用了幾天時間將消息隊列封裝成api調用方式以方便內部系統的調用,現在已經完成,特此記錄其中過程和使用到的相關技術,與君共勉。

現在阿里云提供了兩種消息服務:mns服務和ons服務,其中我認為mns是簡化版的ons,而且mns的消息消費需要自定義輪詢策略的,相比之下,ons的發布與訂閱模式功能更加強大(比如相對于mns,ons提供了消息追蹤、日志、監控等功能),其api使用起來更加方便,而且聽聞阿里內部以后不再對mns進行新的開發,只做維護,ons服務則會逐步替代mns服務成為阿里消息服務的主打產品,所以,如果有使用消息隊列的需求,建議不要再使用mns,使用ons是最好的選擇。

涉及到的技術:Spring,反射、動態代理、Jackson序列化和反序列化

在看下面的文章之前,需要先看上面的文檔以了解相關概念(Topic、Consumer、Producer、Tag等)以及文檔中提供的簡單的發送和接收代碼實現。

該博文只針對有消息隊列知識基礎的朋友看,能幫上大家的忙我自然很高興,看不懂的也不要罵,說明你路子不對。

二、設計方案

1.消息發送

在一個簡單的cs架構中,假設server會監聽一個Topic的Producer發送的消息,那么它首先應該提供client一個api,client只需要簡單的調用該api,就可以通過producer來生產消息

2.消息接收

由于api是server制定的,所以server當然也知道如何消費這些消息

在這個過程中,server實際充當著消費者的角色,client實際充當著生產者的角色,但是生產者生產消息的規則則由消費者制定以滿足消費者消費需求。

3.最終目標

我們要創建一個單獨的jar包,起名為queue-core為生產者和消費者提供依賴和發布訂閱的具體實現。

三、消息發送

1.消費者提供接口

java" id="highlighter_443537">
?
1
2
3
4
5
6
7
8
9
@Topic(name="kdyzm",producerId="kdyzm_producer")
public interface UserQueueResource {
  
  @Tag("test1")
  public void handleUserInfo(@Body @Key("userInfoHandler") UserModel user);
  
  @Tag("test2")
  public void handleUserInfo1(@Body @Key("userInfoHandler1") UserModel user);
}

由于Topic和producer之間是N:1的關系,所以這里直接將producerId作為Topic的一個屬性;Tag是一個很關鍵的過濾條件,消費者通過它進行消息的分類做不同的業務處理,所以,這里使用Tag作為路由條件。

2.生產者使用消費者提供的api發送消息

由于消費者只提供了接口給生產者使用,接口是沒有辦法直接使用的,因為沒有辦法實例化,這里使用動態代理生成對象,在消費者提供的api中,添加如下config,以方便生產者直接導入config即可使用,這里使用了基于java的spring config,請知悉。

?
1
2
3
4
5
6
7
8
9
@Configuration
public class QueueConfig {
 
  @Autowired
  @Bean
  public UserQueueResource userQueueResource() {
    return QueueResourceFactory.createProxyQueueResource(UserQueueResource.class);
  }
}

3.queue-core對生產者發送消息的封裝

以上1中所有的注解(Topic、Tag、Body 、Key)以及2中使用到的QueueResourceFactory類都要在queue-core中定義,其中注解的定義只是定義了規則,真正的實現實際上是在QueueResourceFactory中

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.wy.queue.core.api.MQConnection;
import com.wy.queue.core.utils.JacksonSerializer;
import com.wy.queue.core.utils.MQUtils;
import com.wy.queue.core.utils.QueueCoreSpringUtils;
 
public class QueueResourceFactory implements InvocationHandler {
 
  private static final Logger logger=LoggerFactory.getLogger(QueueResourceFactory.class);
  
  private String topicName;
 
  private String producerId;
  
  private JacksonSerializer serializer=new JacksonSerializer();
  
  private static final String PREFIX="PID_";
  
  public QueueResourceFactory(String topicName,String producerId) {
    this.topicName = topicName;
    this.producerId=producerId;
  }
 
  public static <T> T createProxyQueueResource(Class<T> clazz) {
    String topicName = MQUtils.getTopicName(clazz);
    String producerId = MQUtils.getProducerId(clazz);
    T target = (T) Proxy.newProxyInstance(QueueResourceFactory.class.getClassLoader(),
        new Class<?>[] { clazz }, new QueueResourceFactory(topicName,producerId));
    return target;
  }
 
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if(args.length == 0 || args.length>1){
      throw new RuntimeException("only accept one param at queueResource interface.");
    }
    String tagName=MQUtils.getTagName(method);
    ProducerFactory producerFactory = QueueCoreSpringUtils.getBean(ProducerFactory.class);
    MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class);
    
    Producer producer = producerFactory.createProducer(PREFIX+connectionInfo.getPrefix()+"_"+producerId);
    
    //發送消息
    Message msg = new Message( //
        // 在控制臺創建的 Topic,即該消息所屬的 Topic 名稱
        connectionInfo.getPrefix()+"_"+topicName,
        // Message Tag,
        // 可理解為 Gmail 中的標簽,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾
        tagName,
        // Message Body
        // 任何二進制形式的數據, MQ 不做任何干預,
        // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
        serializer.serialize(args[0]).getBytes());
    SendResult sendResult = producer.send(msg);
    logger.info("Send Message success. Message ID is: " + sendResult.getMessageId());
    return null;
  
}

這里特意將自定義包和第三方使用的包名都貼過來了,以便于區分。

這里到底做了哪些事情呢?

發送消息的過程就是動態代理創建一個代理對象,該對象調用方法的時候會被攔截,首先解析所有的注解,比如topicName、producerId、tag等關鍵信息從注解中取出來,然后調用阿里sdk發送消息,過程很簡單,但是注意,這里發送消息的時候是分環境的,一般來講現在企業中會區分QA、staging、product三種環境,其中QA和staging是測試環境,對于消息隊列來講,也是會有三種環境的,但是QA和staging環境往往為了降低成本使用同一個阿里賬號,所以創建的topic和productId會放到同一個區域下,這樣同名的TopicName是不允許存在的,所以加上了環境前綴加以區分,比如QA_TopicName,PID_Staging_ProducerId等等;另外,queue-core提供了MQConnection接口,以獲取配置信息,生產者服務只需要實現該接口即可。

4.生產者發送消息

?
1
2
3
4
5
6
7
8
9
10
@Autowired
private UserQueueResource userQueueResource;
 
@Override
public void sendMessage() {
  UserModel userModel=new UserModel();
  userModel.setName("kdyzm");
  userModel.setAge(25);
  userQueueResource.handleUserInfo(userModel);
}

只需要數行代碼即可將消息發送到指定的Topic,相對于原生的發送代碼,精簡了太多。

四、消息消費

相對于消息發送,消息的消費要復雜一些。

1.消息消費設計

由于Topic和Consumer之間是N:N的關系,所以將ConsumerId放到消費者具體實現的方法上

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Controller
@QueueResource
public class UserQueueResourceImpl implements UserQueueResource {
 
  private Logger logger = LoggerFactory.getLogger(this.getClass());
 
  @Override
  @ConsumerAnnotation("kdyzm_consumer")
  public void handleUserInfo(UserModel user) {
    logger.info("收到消息1:{}", new Gson().toJson(user));
  }
 
  @Override
  @ConsumerAnnotation("kdyzm_consumer1")
  public void handleUserInfo1(UserModel user) {
    logger.info("收到消息2:{}", new Gson().toJson(user));
  }
}

這里又有兩個新的注解@QueueResource和@ConsumerAnnotation,這兩個注解后續會討論如何使用。有人會問我為什么要使用ConsumerAnnotation這個名字而不使用Consumer這個名字,因為Consumer這個名字和aliyun提供的sdk中的名字沖突了。。。。

在這里, 消費者提供api 接口給生產者以方便生產者發送消息,消費者則實現該接口以消費生產者發送的消息,如何實現api接口就實現了監聽,這點是比較關鍵的邏輯。

2.queue-core實現消息隊列監聽核心邏輯

第一步:使用sping 容器的監聽方法獲取所有加上QueueResource注解的Bean

第二步:分發處理Bean

如何處理這些Bean呢,每個Bean實際上都是一個對象,有了對象,比如上面例子中的UserQueueResourceImpl 對象,我們可以拿到該對象實現的接口字節碼對象,進而可以拿到該接口UserQueueRerousce上的注解以及方法上和方法中的注解,當然UserQueueResourceImpl實現方法上的注解也能拿得到,這里我將獲取到的信息以consumerId為key,其余相關信息封裝為Value緩存到了一個Map對象中,核心代碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Class<?> clazz = resourceImpl.getClass();
    Class<?> clazzIf = clazz.getInterfaces()[0];
    Method[] methods = clazz.getMethods();
    String topicName = MQUtils.getTopicName(clazzIf);
    for (Method m : methods) {
      ConsumerAnnotation consumerAnno = m.getAnnotation(ConsumerAnnotation.class);
 
      if (null == consumerAnno) {
//        logger.error("method={} need Consumer annotation.", m.getName());
        continue;
      }
      String consuerId = consumerAnno.value();
      if (StringUtils.isEmpty(consuerId)) {
        logger.error("method={} ConsumerId can't be null", m.getName());
        continue;
      }
      Class<?>[] parameterTypes = m.getParameterTypes();
      Method resourceIfMethod = null;
      try {
        resourceIfMethod = clazzIf.getMethod(m.getName(), parameterTypes);
      } catch (NoSuchMethodException | SecurityException e) {
        logger.error("can't find method={} at super interface={} .", m.getName(), clazzIf.getCanonicalName(),
            e);
        continue;
      }
      String tagName = MQUtils.getTagName(resourceIfMethod);
      consumersMap.put(consuerId, new MethodInfo(topicName, tagName, m));
    }

第三步:通過反射實現消費的動作

首先,先確定好反射動作執行的時機,那就是監聽到了新的消息

其次,如何執行反射動作?不贅述,有反射相關基礎的童鞋都知道怎么做,核心代碼如下所示:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class);
    String topicPrefix=connectionInfo.getPrefix()+"_";
    String consumerIdPrefix=PREFIX+connectionInfo.getPrefix()+"_";
    for(String consumerId:consumersMap.keySet()){
      MethodInfo methodInfo=consumersMap.get(consumerId);
      Properties connectionProperties=convertToProperties(connectionInfo);
      // 您在控制臺創建的 Consumer ID
      connectionProperties.put(PropertyKeyConst.ConsumerId, consumerIdPrefix+consumerId);
      Consumer consumer = ONSFactory.createConsumer(connectionProperties);
      consumer.subscribe(topicPrefix+methodInfo.getTopicName(), methodInfo.getTagName(), new MessageListener() { //訂閱多個Tag
        public Action consume(Message message, ConsumeContext context) {
          try {
            String messageBody=new String(message.getBody(),"UTF-8");
            logger.info("receive message from topic={},tag={},consumerId={},message={}",topicPrefix+methodInfo.getTopicName(),methodInfo.getTagName(),consumerIdPrefix+consumerId,messageBody);
            Method method=methodInfo.getMethod();
            Class<?> parameType = method.getParameterTypes()[0];
            Object arg = jacksonSerializer.deserialize(messageBody, parameType);
            Object[] args={arg};
            method.invoke(resourceImpl, args);
          } catch (Exception e) {
            logger.error("",e);
          }
          return Action.CommitMessage;
        }
      });
      consumer.start();
      logger.info("consumer={} has started.",consumerIdPrefix+consumerId);
    }

五、完整代碼見下面的git鏈接

 https://github.com/kdyzm/queue-core.git

 以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://www.cnblogs.com/kuangdaoyizhimei/p/8508357.html

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 久久久久久亚洲一区二区三区蜜臀 | 午夜视频免费在线观看 | 国产精品三级视频 | 国产91视频在线观看 | 一级特黄a免费观看视频 | 日韩欧美在线一区二区 | 精品96久久久久久中文字幕无 | 精品国产乱码久久久久久影片 | 欧美激情一区二区 | 成人免费在线电影 | 国产日韩一区二区三免费高清 | 成人午夜精品久久久久久久蜜臀 | 久久国产精品久久 | 亚洲日韩中文字幕一区 | 亚洲欧美一区二区三区国产精品 | 精品无码久久久久久国产 | 伊人在线| 欧美在线综合 | 中文字幕在线电影观看 | 亚洲福利一区 | 亚洲国产中文字幕 | 国产精品高潮呻吟久久av野狼 | av中文字幕第一页 | 日日操av | 久草视频在线播放 | 国产精品久久久久久亚洲调教 | 少妇一区二区三区免费观看 | 久久青青 | 九色 在线 | 久久精品青青大伊人av | 日韩成人在线免费视频 | 亚洲一区二区av | 国内精品视频一区 | 最近韩国日本免费观看mv免费版 | 久久久精品国产亚洲 | 中文字幕不卡 | 天天干天天操天天射 | 日韩av电影在线观看 | 国产精品高清在线 | www.国产.com | 精品在线一区二区三区 |