国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - spring與disruptor集成的簡單示例

spring與disruptor集成的簡單示例

2021-04-07 13:09Muroidea Java教程

本篇文章主要介紹了spring與disruptor集成的簡單示例,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

disruptor不過多介紹了,描述下當前的業務場景,兩個應用A,B,應用 A 向應用 B 傳遞數據 . 數據傳送比較快,如果用http直接push數據然后入庫,效率不高.有可能導致A應用比較大的壓力. 使用mq 太重量級,所以選擇了disruptor. 也可以使用Reactor

BaseQueueHelper.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/**
 * lmax.disruptor 高效隊列處理模板. 支持初始隊列,即在init()前進行發布。
 *
 * 調用init()時才真正啟動線程開始處理 系統退出自動清理資源.
 *
 * @author xielongwang
 * @create 2018-01-18 下午3:49
 * @email xielong.wang@nvr-china.com
 * @description
 */
public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {
 
  /**
   * 記錄所有的隊列,系統退出時統一清理資源
   */
  private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
  /**
   * Disruptor 對象
   */
  private Disruptor<E> disruptor;
  /**
   * RingBuffer
   */
  private RingBuffer<E> ringBuffer;
  /**
   * initQueue
   */
  private List<D> initQueue = new ArrayList<D>();
 
  /**
   * 隊列大小
   *
   * @return 隊列長度,必須是2的冪
   */
  protected abstract int getQueueSize();
 
  /**
   * 事件工廠
   *
   * @return EventFactory
   */
  protected abstract EventFactory<E> eventFactory();
 
  /**
   * 事件消費者
   *
   * @return WorkHandler[]
   */
  protected abstract WorkHandler[] getHandler();
 
  /**
   * 初始化
   */
  public void init() {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();
    disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
    disruptor.setDefaultExceptionHandler(new MyHandlerException());
    disruptor.handleEventsWithWorkerPool(getHandler());
    ringBuffer = disruptor.start();
 
    //初始化數據發布
    for (D data : initQueue) {
      ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
        @Override
        public void translateTo(E event, long sequence, D data) {
          event.setValue(data);
        }
      }, data);
    }
 
    //加入資源清理鉤子
    synchronized (queueHelperList) {
      if (queueHelperList.isEmpty()) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            for (BaseQueueHelper baseQueueHelper : queueHelperList) {
              baseQueueHelper.shutdown();
            }
          }
        });
      }
      queueHelperList.add(this);
    }
  }
 
  /**
   * 如果要改變線程執行優先級,override此策略. YieldingWaitStrategy會提高響應并在閑時占用70%以上CPU,
   * 慎用SleepingWaitStrategy會降低響應更減少CPU占用,用于日志等場景.
   *
   * @return WaitStrategy
   */
  protected abstract WaitStrategy getStrategy();
 
  /**
   * 插入隊列消息,支持在對象init前插入隊列,則在隊列建立時立即發布到隊列處理.
   */
  public synchronized void publishEvent(D data) {
    if (ringBuffer == null) {
      initQueue.add(data);
      return;
    }
    ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
      @Override
      public void translateTo(E event, long sequence, D data) {
        event.setValue(data);
      }
    }, data);
  }
 
  /**
   * 關閉隊列
   */
  public void shutdown() {
    disruptor.shutdown();
  }
}

EventFactory.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * @author xielongwang
 * @create 2018-01-18 下午6:24
 * @email xielong.wang@nvr-china.com
 * @description
 */
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {
 
  @Override
  public SeriesDataEvent newInstance() {
    return new SeriesDataEvent();
  }
}

MyHandlerException.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class MyHandlerException implements ExceptionHandler {
 
  private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);
 
  /*
   * (non-Javadoc) 運行過程中發生時的異常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
   * , long, java.lang.Object)
   */
  @Override
  public void handleEventException(Throwable ex, long sequence, Object event) {
    ex.printStackTrace();
    logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
  }
 
  /*
   * (non-Javadoc) 啟動時的異常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
   * Throwable)
   */
  @Override
  public void handleOnStartException(Throwable ex) {
    logger.error("start disruptor error ==[{}]!", ex.getMessage());
  }
 
  /*
   * (non-Javadoc) 關閉時的異常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
   * .Throwable)
   */
  @Override
  public void handleOnShutdownException(Throwable ex) {
    logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
  }
}

SeriesData.java (代表應用A發送給應用B的消息)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SeriesData {
  private String deviceInfoStr;
  public SeriesData() {
  }
 
  public SeriesData(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }
 
  public String getDeviceInfoStr() {
    return deviceInfoStr;
  }
 
  public void setDeviceInfoStr(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }
 
  @Override
  public String toString() {
    return "SeriesData{" +
        "deviceInfoStr='" + deviceInfoStr + '\'' +
        '}';
  }
}

SeriesDataEvent.java

?
1
2
public class SeriesDataEvent extends ValueWrapper<SeriesData> {
}

SeriesDataEventHandler.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {
  private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);
  @Autowired
  private DeviceInfoService deviceInfoService;
 
  @Override
  public void onEvent(SeriesDataEvent event) {
    if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
      logger.warn("receiver series data is empty!");
    }
    //業務處理
    deviceInfoService.processData(event.getValue().getDeviceInfoStr());
  }
}

SeriesDataEventQueueHelper.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Component
public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {
  private static final int QUEUE_SIZE = 1024;
  @Autowired
  private List<SeriesDataEventHandler> seriesDataEventHandler;
 
  @Override
  protected int getQueueSize() {
    return QUEUE_SIZE;
  }
 
  @Override
  protected com.lmax.disruptor.EventFactory eventFactory() {
    return new EventFactory();
  }
 
  @Override
  protected WorkHandler[] getHandler() {
    int size = seriesDataEventHandler.size();
    SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]);
    return paramEventHandlers;
  }
 
  @Override
  protected WaitStrategy getStrategy() {
    return new BlockingWaitStrategy();
    //return new YieldingWaitStrategy();
  }
 
  @Override
  public void afterPropertiesSet() throws Exception {
    this.init();
  }
}

ValueWrapper.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class ValueWrapper<T> {
  private T value;
  public ValueWrapper() {}
  public ValueWrapper(T value) {
    this.value = value;
  }
 
  public T getValue() {
    return value;
  }
 
  public void setValue(T value) {
    this.value = value;
  }
}

DisruptorConfig.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Configuration
@ComponentScan(value = {"com.portal.disruptor"})
//多實例幾個消費者
public class DisruptorConfig {
 
  /**
   * smsParamEventHandler1
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler1() {
    return new SeriesDataEventHandler();
  }
 
  /**
   * smsParamEventHandler2
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler2() {
    return new SeriesDataEventHandler();
  }
 
  /**
   * smsParamEventHandler3
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler3() {
    return new SeriesDataEventHandler();
  }
 
 
  /**
   * smsParamEventHandler4
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler4() {
    return new SeriesDataEventHandler();
  }
 
  /**
   * smsParamEventHandler5
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler5() {
    return new SeriesDataEventHandler();
  }
}

測試

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//注入SeriesDataEventQueueHelper消息生產者
@Autowired
private SeriesDataEventQueueHelper seriesDataEventQueueHelper;
 
@RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public DataResponseVo<String> receiverDeviceData(@RequestBody String deviceData) {
  long startTime1 = System.currentTimeMillis();
 
  if (StringUtils.isEmpty(deviceData)) {
    logger.info("receiver data is empty !");
    return new DataResponseVo<String>(400, "failed");
  }
  seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));
  long startTime2 = System.currentTimeMillis();
  logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1);
  return new DataResponseVo<String>(200, "success");
}

應用A通過/data 接口把數據發送到應用B ,然后通過seriesDataEventQueueHelper 把消息發給disruptor隊列,消費者去消費,整個過程對不會堵塞應用A. 可接受消息丟失, 可以通過擴展SeriesDataEventQueueHelper來達到對disruptor隊列的監控

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://blog.csdn.net/u014087707/article/details/79340463

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 欧洲成人午夜免费大片 | 亚洲福利| 亚洲欧美另类久久久精品2019 | 五月激情综合网 | 国产色网 | 亚洲成人久久久 | 国产精品日韩欧美一区二区三区 | 亚洲国产aⅴ成人精品无吗 久久久91 | 欧美一级在线 | 欧美日韩三级在线 | 国产成人福利在线 | 综合精品久久久 | 日韩在线不卡 | 亚洲五码中文字幕 | 亚洲成人久久久 | 国产精品久久久久久久久久久久 | 色老板在线视频 | 日韩欧美中文字幕在线视频 | 亚洲国产精品自拍视频 | 在线国v免费看 | 国产精品久久久久久久久久久新郎 | 国产1页 | 欧美一区二区三区 | 国产98色在线 | 日韩 | 国产97在线播放 | 欧美日韩精品一区二区在线观看 | 曰本人一级毛片免费完整视频 | www.亚洲成人 | 亚洲性在线| 精品国产一区二区在线 | 淫片一级国产 | 另类国产ts人妖高潮系列视频 | 日本一区二区三区在线视频 | 亚洲 自拍 另类 欧美 丝袜 | 欧美亚洲国产一区二区三区 | 美女视频一区二区三区 | 国产美女视频网站 | 在线观看av国产一区二区 | 一级做a| 欧美视频精品 | 欧美精品乱码久久久久久按摩 |