diff --git a/campaign/src/main/java/com/pudonghot/yo/campaign/service/CallingListService.java b/campaign/src/main/java/com/pudonghot/yo/campaign/service/CallingListService.java new file mode 100644 index 00000000..bc6cf63d --- /dev/null +++ b/campaign/src/main/java/com/pudonghot/yo/campaign/service/CallingListService.java @@ -0,0 +1,27 @@ +package com.pudonghot.yo.campaign.service; + +import java.util.List; +import com.wacai.tigon.service.BaseCrudService; +import com.pudonghot.yo.model.domain.CallingList; + +/** + * @author Donghuang + * @date Sep 13, 2020 10:32:54 + */ +public interface CallingListService extends BaseCrudService { + + /** + * list limit ready records + * + * @param campaignId campaignId + * @param limit limit + * @return CallingLists + */ + List listReady(final Integer campaignId, final int limit); + + /** + * release calling list lock + * @param callingList callingList + */ + void releaseCallingListLock(final CallingList callingList); +} diff --git a/campaign/src/main/java/com/pudonghot/yo/campaign/service/impl/CallingListServiceImpl.java b/campaign/src/main/java/com/pudonghot/yo/campaign/service/impl/CallingListServiceImpl.java new file mode 100644 index 00000000..5234cb4d --- /dev/null +++ b/campaign/src/main/java/com/pudonghot/yo/campaign/service/impl/CallingListServiceImpl.java @@ -0,0 +1,85 @@ +package com.pudonghot.yo.campaign.service.impl; + +import java.util.Map; +import java.util.Date; +import java.util.List; +import java.util.HashMap; +import lombok.extern.slf4j.Slf4j; +import com.pudonghot.yo.util.TimeUtils; +import com.wacai.tigon.mybatis.Search; +import com.wacai.tigon.sequence.IdSequence; +import org.apache.commons.lang3.time.DateUtils; +import org.springframework.stereotype.Service; +import com.pudonghot.yo.mapper.CallingListMapper; +import com.pudonghot.yo.model.domain.CallingList; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import com.pudonghot.yo.campaign.service.CallingListService; +import com.wacai.tigon.service.support.BaseCrudServiceSupport; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author Donghuang + * @date Sep 13, 2020 10:33:54 + */ +@Slf4j +@Service +public class CallingListServiceImpl + extends BaseCrudServiceSupport + implements CallingListService { + + @Autowired + private IdSequence idSeq; + + @Value("${yo.br.campaign.callinglist-lock-expire.seconds:300}") + private int expireLockDuration; + + @Scheduled(fixedRateString = "${yo.br.campaign.callinglist-lock-expire.rate:180000}") + public void expireLock() { + log.info("Expire calling list lock task"); + final Date expireTime = DateUtils.addSeconds(new Date(), -expireLockDuration); + scan(new Search(CallingList.ACTIVE, true) + .gt(CallingList.CREATED_TIME, DateUtils.addDays(new Date(), -1)) + .notNull(CallingList.LAST_CONN_ID) + .lt(CallingList.LOCK_TIME, expireTime), callingList -> { + callingList.setLockTime(null); + callingList.setLockKey(null); + mapper.update(callingList); + }); + } + + /** + * {@inheritDoc} + */ + @Override + public List listReady(final Integer campaignId, int limit) { + final String lockKey = idSeq.get(); + final Map update = new HashMap<>(8); + update.put(CallingList.LOCK_KEY, lockKey); + update.put(CallingList.LOCK_TIME, new Date()); + update.put(CallingList.STATUS, CallingList.Status.ACHIEVED); + + final int time = TimeUtils.secondOfDay(new Date()); + + mapper.update(update, + new Search(CallingList.CAMPAIGN_ID, campaignId) + .eq(CallingList.STATUS, CallingList.Status.READY) + .isNull(CallingList.LOCK_KEY) + .lt(CallingList.DAILY_FROM, time) + .gt(CallingList.DAILY_TO, time) + .limit(limit)); + + return mapper.list(new Search(CallingList.CAMPAIGN_ID, campaignId) + .eq(CallingList.LOCK_KEY, lockKey).limit(limit)); + } + + /** + * {@inheritDoc} + */ + @Override + public void releaseCallingListLock(final CallingList callingList) { + callingList.setLockKey(null); + callingList.setLockTime(null); + mapper.update(callingList); + } +} diff --git a/campaign/src/main/java/com/pudonghot/yo/campaign/service/impl/CampaignServiceImpl.java b/campaign/src/main/java/com/pudonghot/yo/campaign/service/impl/CampaignServiceImpl.java index 858d5135..4df8705c 100644 --- a/campaign/src/main/java/com/pudonghot/yo/campaign/service/impl/CampaignServiceImpl.java +++ b/campaign/src/main/java/com/pudonghot/yo/campaign/service/impl/CampaignServiceImpl.java @@ -1,21 +1,26 @@ package com.pudonghot.yo.campaign.service.impl; import java.util.Date; +import java.util.List; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; +import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.concurrent.TimeUnit; import com.pudonghot.yo.util.LogMDC; import lombok.RequiredArgsConstructor; import com.wacai.tigon.mybatis.Search; import com.pudonghot.yo.util.TimeUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.StringUtils; import com.pudonghot.yo.mapper.CampaignMapper; -import com.pudonghot.yo.model.domain.Campaign; import org.springframework.stereotype.Service; +import com.pudonghot.yo.model.domain.Campaign; import com.pudonghot.yo.mapper.AgentStatusMapper; import com.pudonghot.yo.service.CommonCallDataService; import com.pudonghot.yo.fsagent.api.CampaignDialService; import com.pudonghot.yo.campaign.service.CampaignService; +import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import com.pudonghot.yo.fsagent.api.request.ReqCampaignDial; import org.springframework.beans.factory.annotation.Autowired; @@ -37,6 +42,10 @@ public class CampaignServiceImpl implements CampaignService { private final ThreadPoolTaskExecutor taskExecutor; + @Value("${yo.campaign.dial-batch:8}") + private int dialBatchSize; + @Value("${yo.campaign.dial-sleep:12000}") + private int dialBatchSleep; @Autowired private AgentStatusMapper agentStatusMapper; @Autowired @@ -94,9 +103,6 @@ public class CampaignServiceImpl return; } - // TODO campaign dial - // Outbound = (readyAgent + expNum) * z * speedAdjust - taskNum; - final int optimizationValue = campaign.getOptimizationValue(); int fetchCount = 0; if (countIdleAgentOfQueue > 4) { @@ -125,13 +131,32 @@ public class CampaignServiceImpl campaignId, campaignKey, StringUtils.join(data, "|")); } - // TODO insert DB - final ReqCampaignDial req= new ReqCampaignDial(); req.setCampaignId(campaignId); - req.setCallingList(Stream.of(data) - .map(RespCallingList.CallingData::toPair) - .collect(Collectors.toList())); - dialService.queueDial(req); + final List> dataList = Stream.of(data) + .map(RespCallingList.CallingData::toPair) + .collect(Collectors.toList()); + eachBatch(dataList, dialBatchSize, cl -> { + req.setCallingList(cl); + dialService.queueDial(req); + try { + log.debug("Sleep [{}] milliseconds.", dialBatchSize); + TimeUnit.MILLISECONDS.sleep(dialBatchSleep); + } + catch (InterruptedException e) { + log.warn("Campaign dial sleep exception caused.", e); + } + }); + } + + void eachBatch( + final List list, + final int batch, + final Consumer> consumer) { + + final int size = list.size(); + for (int i = 0; i < size; i += batch) { + consumer.accept(list.subList(i, Math.min(size, i + batch))); + } } } diff --git a/campaign/src/test/java/com/pudonghot/yo/campaign/service/impl/CampaignServiceImplTest.java b/campaign/src/test/java/com/pudonghot/yo/campaign/service/impl/CampaignServiceImplTest.java new file mode 100644 index 00000000..ffc9b692 --- /dev/null +++ b/campaign/src/test/java/com/pudonghot/yo/campaign/service/impl/CampaignServiceImplTest.java @@ -0,0 +1,24 @@ +package com.pudonghot.yo.campaign.service.impl; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author Donghuang + * @date Sep 13, 2020 10:25:44 + */ +@Slf4j +public class CampaignServiceImplTest { + + @Test + public void testEachBatch() { + List list = new ArrayList<>(100); + for (int i = 0; i < 100; i++) { + list.add(i); + } + new CampaignServiceImpl(null).eachBatch(list, 8, sub -> log.info("{}", sub)); + } +} diff --git a/fsagent/src/main/java/com/pudonghot/yo/fsagent/listener/CampaignChannelDestroy.java b/fsagent/src/main/java/com/pudonghot/yo/fsagent/listener/CampaignChannelDestroy.java index 2c00c01c..a9028a85 100644 --- a/fsagent/src/main/java/com/pudonghot/yo/fsagent/listener/CampaignChannelDestroy.java +++ b/fsagent/src/main/java/com/pudonghot/yo/fsagent/listener/CampaignChannelDestroy.java @@ -72,6 +72,8 @@ public class CampaignChannelDestroy { rec.setLastCallDuration(billSec); rec.setLastConnected(billSec > 0); rec.setLastHangupCause(event.getHangupCause()); + rec.setLockKey(null); + rec.setLockTime(null); rec.setUpdatedTime(new Date()); callingListMapper.update(rec); }