张家口购物:实现简朴延迟行列和分布式延迟行列

2021-09-28 206 views 2

扫一扫用手机浏览

  在我们“‘【(的)】’”工作中,许多地方{使用}延迟〖行{〖列〗}〗,好比订单《到》期没有付款作废订单,<制订一「个」提醒“‘【(的)】’”>义务等都需要延迟〖行{〖列〗}〗,“那”么我们需要实现延迟〖行{〖列〗}〗。我们本文“‘【(的)】’”梗概如下,‘同砚们可以选’择性阅读。

  1. 实现一「个」简『朴』“‘【(的)】’”延迟〖行{〖列〗}〗。

  2.{使用}Redis“‘【(的)】’”list实现分布式延迟〖行{〖列〗}〗。

  3.{使用}Redis“‘【(的)】’”zSet实现分布式延迟〖行{〖列〗}〗。

  4. 总结一下,另外另有哪些可以延迟〖行{〖列〗}〗。

 

1.  实现一「个」简『朴』“‘【(的)】’”延迟〖行{〖列〗}〗。

  我们知道现在JAVA〖可以有〗DelayedQueue,我们首先开一「个」DelayQueue“‘【(的)】’”结构(类)图。DelayQueue‘实现了’Delay、BlockingQueue接口。也就是DelayQueue是一种壅闭〖行{〖列〗}〗。

  

 

  『我们』在看一下Delay“‘【(的)】’”(类)图。Delayed接口也‘实现了’Comparable接口,{也就是我}们{使用}Delayed“‘【(的)】’”时刻需要实现CompareTo「方式」。由于〖行{〖列〗}〗中“‘【(的)】’”《数据》需要排一下先后,凭据我们自己“‘【(的)】’”实现。Delayed接口(《里边》)有一「个」「方式」就是getDelay「方式」,‘用于获取延迟时间’,判断是否时间已经《到》了延迟“‘【(的)】’”时间,若是《到》了延迟“‘【(的)】’”时间就可以从〖行{〖列〗}〗(《里边》)获取了。

 

  我们建立一「个」Message(类),‘实现了’Delayed接口,我们主要把getDelay〖和〗compareTo举行实现。在Message“‘【(的)】’”组织「方式」“‘【(的)】’”地方传入延迟“‘【(的)】’”时间,‘单元是毫『秒』’,盘算好触发时间fireTime。同时根据延迟时间“‘【(的)】’”升『序举行排序』。我重写了(《里边》)“‘【(的)】’”toString「方式」,用于将Message根据我写“‘【(的)】’”「方式」举行输【出】。

package com.hqs.delayQueue.bean;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author huangqingshi
 * @Date 2020-04-18
 */
public class Message implements Delayed {

    private String body;
    private long fireTime;

    public String getBody() {
        return body;
    }

    public long getFireTime() {
        return fireTime;
    }

    public Message(String body, long delayTime) {
        this.body = body;
        this.fireTime = delayTime + System.currentTimeMillis();
    }

    public long getDelay(TimeUnit unit) {

        return unit.convert(this.fireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return System.currentTimeMillis() + ":" + body;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(System.currentTimeMillis() + ":start");
        BlockingQueue<Message> queue = new DelayQueue<>();
        Message message1 = new Message("hello", 1000 * 5L);
        Message message2 = new Message("world", 1000 * 7L);

        queue.put(message1);
        queue.put(message2);

        while (queue.size() > 0) {
            System.out.println(queue.take());
        }
    }
}

  (《里边》)“‘【(的)】’”main「方式」(《里边》)声明晰两「个」Message,一「个」延迟5『秒』,一「个」延迟7『秒』,时间《到》了之后会将接取【出】而且打印。输【出】“‘【(的)】’”效果如下,正是我们所期望“‘【(的)】’”。

1587218430786:start
1587218435789:hello
1587218437793:world

  这「个」「方式」实现<起来>真“‘【(的)】’”异常简『朴』。然则瑕玷也是很明显“‘【(的)】’”,就是《数据》在内存(《里边》),《数》据对照容易丢失。那么我们需要〖接纳〗Redis实现分布式“‘【(的)】’”义务处置。

  2. {使用}Redis“‘【(的)】’”list实现分布式延迟〖行{〖列〗}〗。

  {内陆需要安}装一「个」Redis,我自己是{使用}Docker「构建一」「个」Redis, 异常快速[,下令也没 若干[。我们直接启动Redis而且露【出】6379端口。进入之后直接{使用}客户端下令即可查看〖和〗调试《数据》。

docker pull redis
docker run -itd --name redisLocal -p 6379:6379 redis
docker exec -it redisLocal /bin/bash
redis-cli

  我内陆〖接纳〗spring-boot“‘【(的)】’”「方式」毗邻redis,pom文件{〖列〗}一下,供人人参考。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hqs</groupId>
    <artifactId>delayQueue</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>delayQueue</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  加上Redis“‘【(的)】’”设置放《到》application.properties『(《里边》)即可实现』Redis毗邻,异常“‘【(的)】’”利便。

# redis
redis.host=127.0.0.1
redis.port=6379
redis.password=
redis.maxIdle=100
redis.maxTotal=300
redis.maxWait=10000
redis.testOnBorrow=true
redis.timeout=100000

  【接下】来实现一「个」基于Redis“‘【(的)】’”list《数据》(类)型举行实现“‘【(的)】’”一「个」(类)。我们{使用}RedisTemplate“操作”Redis,这「个」(《里边》)封装好我们所需要“‘【(的)】’”Redis“‘【(的)】’”一些「方式」,用<起来>异常利便。这「个」(类)允许延迟义务做多有10W「个」,也是制止《数据》量过大对Redis‘造成影’响。若是在线上{使用}“‘【(的)】’”时刻也需要思量延迟义务“‘【(的)】’” 若干[。太多几百万几万万“‘【(的)】’”时刻可能《数据》量异常大,{我}们需要盘算Redis“‘【(的)】’”空间是否够。这「个」代码也是异常“‘【(的)】’”简『朴』,一「个」用于存放需要延迟“‘【(的)】’”新闻,〖接纳〗offer“‘【(的)】’”「方式」。另外一「个」是启动一「个」线程, ‘若是新’闻时间《到》了,那么就将《数据》lpush《到》Redis(《里边》)。

package com.hqs.delayQueue.cache;

import com.hqs.delayQueue.bean.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.concurrent.BlockingQueue;

/**
 * @author huangqingshi
 * @Date 2020-04-18
 */
@Slf4j
public class RedisListDelayedQueue{

    private static final int MAX_SIZE_OF_QUEUE = 100000;
    private RedisTemplate<String, String> redisTemplate;
    private String queueName;
    private BlockingQueue<Message> delayedQueue;

    public RedisListDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) {
        this.redisTemplate = redisTemplate;
        this.queueName = queueName;
        this.delayedQueue = delayedQueue;
        init();
    }

    public void offerMessage(Message message) {
        if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) {
            throw new IllegalStateException("跨越〖行{〖列〗}〗要求最大值,〖请检〗查");
        }
        try {
            log.info("offerMessage:" + message);
            delayedQueue.offer(message);
        } catch (Exception e) {
            log.error("offMessage异常", e);
        }
    }

    public void init() {
        new Thread(() -> {
            while(true) {
                try {
                    Message message = delayedQueue.take();
                    redisTemplate.opsForList().leftPush(queueName, message.toString());
                } catch (InterruptedException e) {
                    log.error("作废息错误", e);
                }
            }
        }).start();
    }
}

  接下来我们看一下,我们写一「个」测试“‘【(的)】’”controller。人人看一下这「个」请求/redis/listDelayedQueue“‘【(的)】’”代码位置。我们也是生成了两「个」新闻,然后把新闻放《到》〖行{〖列〗}〗(《里边》),另外我们在启动一「个」线程义务,用于将《数据》从Redis“‘【(的)】’”list中获取。「方式」也异常简『朴』。

package com.hqs.delayQueue.controller;

import com.hqs.delayQueue.bean.Message;
import com.hqs.delayQueue.cache.RedisListDelayedQueue;
import com.hqs.delayQueue.cache.RedisZSetDelayedQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.Set;
import java.util.concurrent.*;

/**
 * @author huangqingshi
 * @Date 2020-04-18
 */
@Slf4j
@Controller
public class DelayQueueController {

    private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors();

    //注重RedisTemplate用“‘【(的)】’”String,String,后续所有用《到》“‘【(的)】’”key〖和〗value都是String“‘【(的)】’”
    @Autowired
    RedisTemplate<String, String> redisTemplate;

    private static ThreadPoolExecutor taskExecPool = new ThreadPoolExecutor(CORE_SIZE, CORE_SIZE, 0, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>());

    @GetMapping("/redisTest")
    @ResponseBody
    public String redisTest() {
        redisTemplate.opsForValue().set("a","b",60L, TimeUnit.SECONDS);
        System.out.println(redisTemplate.opsForValue().get("a"));
        return "s";
    }

    @GetMapping("/redis/listDelayedQueue")
    @ResponseBody
    public String listDelayedQueue() {

        Message message1 = new Message("hello", 1000 * 5L);
        Message message2 = new Message("world", 1000 * 7L);

        String queueName = "list_queue";

        BlockingQueue<Message> delayedQueue = new DelayQueue<>();

        RedisListDelayedQueue redisListDelayedQueue = new RedisListDelayedQueue(redisTemplate, queueName, delayedQueue);

        redisListDelayedQueue.offerMessage(message1);
        redisListDelayedQueue.offerMessage(message2);
        asyncListTask(queueName);

        return "success";
    }

    @GetMapping("/redis/zSetDelayedQueue")
    @ResponseBody
    public String zSetDelayedQueue() {

        Message message1 = new Message("hello", 1000 * 5L);
        Message message2 = new Message("world", 1000 * 7L);

        String queueName = "zset_queue";

        BlockingQueue<Message> delayedQueue = new DelayQueue<>();

        RedisZSetDelayedQueue redisZSetDelayedQueue = new RedisZSetDelayedQueue(redisTemplate, queueName, delayedQueue);

        redisZSetDelayedQueue.offerMessage(message1);
        redisZSetDelayedQueue.offerMessage(message2);
        asyncZSetTask(queueName);

        return "success";
    }

    public void asyncListTask(String queueName) {
        taskExecPool.execute(() -> {
            for(;;) {
                String message = redisTemplate.opsForList().rightPop(queueName);
                if(message != null) {
                    log.info(message);
                }
            }
        });
    }

    public void asyncZSetTask(String queueName) {
        taskExecPool.execute(() -> {
            for(;;) {
                Long nowTimeInMs = System.currentTimeMillis();
                System.out.println("nowTimeInMs:" + nowTimeInMs);
                Set<String> messages = redisTemplate.opsForZSet().rangeByScore(queueName, 0, nowTimeInMs);
                if(messages != null && messages.size() != 0) {
                    redisTemplate.opsForZSet().removeRangeByScore(queueName, 0, nowTimeInMs);
                    for (String message : messages) {
                        log.info("asyncZSetTask:" + message + " " + nowTimeInMs);
                    }
                    log.info(redisTemplate.opsForZSet().zCard(queueName).toString());
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

}

  我就不把运行效果写【出】来了,感兴趣“‘【(的)】’”同砚自己自行 试验[。固然这「个」「方式」也是从内存中拿【出】《数据》,《到》时间之后放《到》Redis(《里边》),照样会存在程序启动“‘【(的)】’”时刻,《义务》举行丢失。我们继续看另外一种「方式」更好“‘【(的)】’”举行这「个」【问题】“‘【(的)】’”处置。

  3. {使用}Redis“‘【(的)】’”zSet实现分布式延迟〖行{〖列〗}〗。

  我们需要再写一「个」ZSet“‘【(的)】’”〖行{〖列〗}〗处置。下边“‘【(的)】’”offerMessage主要是把新闻直接放入缓存中。〖接纳〗Redis“‘【(的)】’”ZSET“‘【(的)】’”zadd「方式」。zadd(key, value, score) 即将key=value“‘【(的)】’”《数据》赋予一「个」score, 放入缓存中。score就是盘算【出】来延迟“‘【(的)】’”毫『秒』数。

package com.hqs.delayQueue.cache;

import com.hqs.delayQueue.bean.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.concurrent.BlockingQueue;

/**
 * @author huangqingshi
 * @Date 2020-04-18
 */
@Slf4j
public class RedisZSetDelayedQueue {

    private static final int MAX_SIZE_OF_QUEUE = 100000;
    private RedisTemplate<String, String> redisTemplate;
    private String queueName;
    private BlockingQueue<Message> delayedQueue;

    public RedisZSetDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) {
        this.redisTemplate = redisTemplate;
        this.queueName = queueName;
        this.delayedQueue = delayedQueue;
    }

    public void offerMessage(Message message) {
        if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) {
            throw new IllegalStateException("跨越〖行{〖列〗}〗要求最大值,〖请检〗查");
        }
        long delayTime = message.getFireTime() - System.currentTimeMillis();
        log.info("zset offerMessage" + message + delayTime);
        redisTemplate.opsForZSet().add(queueName, message.toString(), message.getFireTime());
    }

}

  上边“‘【(的)】’”Controller「方式」已经写好了测试“‘【(的)】’”「方式」。/redis/zSetDelayedQueue,(《里边》)主要{使用}ZSet“‘【(的)】’”zRangeByScore(key, min, max)。主要是从score从0,当前时间“‘【(的)】’”毫『秒』数获取。取【出】《数据》后再〖接纳〗removeRangeByScore,将《数据》删除。这样《数据》可以直接写《到》Redis(《里边》),然后取【出】《数据》后直接处置。这种「方式」比前边“‘【(的)】’”「方式」〖稍微好〗一些,然则实际上还存在一些【问题】,由于依赖Redis,若是Redis内存不足或者连不上“‘【(的)】’”时刻,(系统将变得不)可用。

  4. 总结一下,另外另有哪些可以延迟〖行{〖列〗}〗。

  上面“‘【(的)】’”「方式」实在照样存在【问题】“‘【(的)】’”,好比系统重启“‘【(的)】’”时刻照样会造成义务“‘【(的)】’”丢失。以是我们在生产上{使用}“‘【(的)】’”时刻,我们还需要将义务保存<起来>,好比放《到》《数据》库〖和〗文件存储系统将《数据》存储<起来>,这样做《到》double-check,双重检查,最终《到》达义务“‘【(的)】’”99.999%能够处置。

  <实在另有许多器械可以>实现延迟〖行{〖列〗}〗。

  1) RabbitMQ(就可)以实现此功效。这「个」新闻〖行{〖列〗}〗可以把《数据》保存<起来>而且举行处置。

  2)Kafka也可以实现这「个」功效。

  3)Netty“‘【(的)】’”HashedWheelTimer也可以实现这「个」功效。

  

  有兴趣“‘【(的)】’”同砚可以进一步研究这些内容“‘【(的)】’”实现。

  最后放上我“‘【(的)】’”代码: https://github.com/stonehqs/delayQueue  

 

 

   

 

,

Sunbet

www.0577meeting.com提供官方APP下载,游戏火爆,口碑极好,服务一流,{一直是}sunbet会员“‘【(的)】’”首选。

Allbet网站内容转载自互联网,如有侵权,联系www.ALLbetgame.us删除。

本文链接地址:http://www.lijiweihejin.com/post/1038.html

相关文章

发表评论