refactor calling list

This commit is contained in:
东煌 2020-09-13 11:24:07 +08:00
parent 73d9ad7bd6
commit 109532b21a
5 changed files with 173 additions and 10 deletions

View File

@ -0,0 +1,27 @@
package com.pudonghot.yo.campaign.service;
import java.util.List;
import com.wacai.tigon.service.BaseCrudService;
import com.pudonghot.yo.model.domain.CallingList;
/**
* @author Donghuang
* @date Sep 13, 2020 10:32:54
*/
public interface CallingListService extends BaseCrudService<Integer, CallingList> {
/**
* list limit ready records
*
* @param campaignId campaignId
* @param limit limit
* @return CallingLists
*/
List<CallingList> listReady(final Integer campaignId, final int limit);
/**
* release calling list lock
* @param callingList callingList
*/
void releaseCallingListLock(final CallingList callingList);
}

View File

@ -0,0 +1,85 @@
package com.pudonghot.yo.campaign.service.impl;
import java.util.Map;
import java.util.Date;
import java.util.List;
import java.util.HashMap;
import lombok.extern.slf4j.Slf4j;
import com.pudonghot.yo.util.TimeUtils;
import com.wacai.tigon.mybatis.Search;
import com.wacai.tigon.sequence.IdSequence;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.stereotype.Service;
import com.pudonghot.yo.mapper.CallingListMapper;
import com.pudonghot.yo.model.domain.CallingList;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import com.pudonghot.yo.campaign.service.CallingListService;
import com.wacai.tigon.service.support.BaseCrudServiceSupport;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @author Donghuang
* @date Sep 13, 2020 10:33:54
*/
@Slf4j
@Service
public class CallingListServiceImpl
extends BaseCrudServiceSupport<Integer, CallingList, CallingListMapper>
implements CallingListService {
@Autowired
private IdSequence idSeq;
@Value("${yo.br.campaign.callinglist-lock-expire.seconds:300}")
private int expireLockDuration;
@Scheduled(fixedRateString = "${yo.br.campaign.callinglist-lock-expire.rate:180000}")
public void expireLock() {
log.info("Expire calling list lock task");
final Date expireTime = DateUtils.addSeconds(new Date(), -expireLockDuration);
scan(new Search(CallingList.ACTIVE, true)
.gt(CallingList.CREATED_TIME, DateUtils.addDays(new Date(), -1))
.notNull(CallingList.LAST_CONN_ID)
.lt(CallingList.LOCK_TIME, expireTime), callingList -> {
callingList.setLockTime(null);
callingList.setLockKey(null);
mapper.update(callingList);
});
}
/**
* {@inheritDoc}
*/
@Override
public List<CallingList> listReady(final Integer campaignId, int limit) {
final String lockKey = idSeq.get();
final Map<String, Object> update = new HashMap<>(8);
update.put(CallingList.LOCK_KEY, lockKey);
update.put(CallingList.LOCK_TIME, new Date());
update.put(CallingList.STATUS, CallingList.Status.ACHIEVED);
final int time = TimeUtils.secondOfDay(new Date());
mapper.update(update,
new Search(CallingList.CAMPAIGN_ID, campaignId)
.eq(CallingList.STATUS, CallingList.Status.READY)
.isNull(CallingList.LOCK_KEY)
.lt(CallingList.DAILY_FROM, time)
.gt(CallingList.DAILY_TO, time)
.limit(limit));
return mapper.list(new Search(CallingList.CAMPAIGN_ID, campaignId)
.eq(CallingList.LOCK_KEY, lockKey).limit(limit));
}
/**
* {@inheritDoc}
*/
@Override
public void releaseCallingListLock(final CallingList callingList) {
callingList.setLockKey(null);
callingList.setLockTime(null);
mapper.update(callingList);
}
}

View File

@ -1,21 +1,26 @@
package com.pudonghot.yo.campaign.service.impl;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
import com.pudonghot.yo.util.LogMDC;
import lombok.RequiredArgsConstructor;
import com.wacai.tigon.mybatis.Search;
import com.pudonghot.yo.util.TimeUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.StringUtils;
import com.pudonghot.yo.mapper.CampaignMapper;
import com.pudonghot.yo.model.domain.Campaign;
import org.springframework.stereotype.Service;
import com.pudonghot.yo.model.domain.Campaign;
import com.pudonghot.yo.mapper.AgentStatusMapper;
import com.pudonghot.yo.service.CommonCallDataService;
import com.pudonghot.yo.fsagent.api.CampaignDialService;
import com.pudonghot.yo.campaign.service.CampaignService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import com.pudonghot.yo.fsagent.api.request.ReqCampaignDial;
import org.springframework.beans.factory.annotation.Autowired;
@ -37,6 +42,10 @@ public class CampaignServiceImpl
implements CampaignService {
private final ThreadPoolTaskExecutor taskExecutor;
@Value("${yo.campaign.dial-batch:8}")
private int dialBatchSize;
@Value("${yo.campaign.dial-sleep:12000}")
private int dialBatchSleep;
@Autowired
private AgentStatusMapper agentStatusMapper;
@Autowired
@ -94,9 +103,6 @@ public class CampaignServiceImpl
return;
}
// TODO campaign dial
// Outbound = (readyAgent + expNum) * z * speedAdjust - taskNum;
final int optimizationValue = campaign.getOptimizationValue();
int fetchCount = 0;
if (countIdleAgentOfQueue > 4) {
@ -125,13 +131,32 @@ public class CampaignServiceImpl
campaignId, campaignKey, StringUtils.join(data, "|"));
}
// TODO insert DB
final ReqCampaignDial req= new ReqCampaignDial();
req.setCampaignId(campaignId);
req.setCallingList(Stream.of(data)
.map(RespCallingList.CallingData::toPair)
.collect(Collectors.toList()));
dialService.queueDial(req);
final List<Pair<String, String>> dataList = Stream.of(data)
.map(RespCallingList.CallingData::toPair)
.collect(Collectors.toList());
eachBatch(dataList, dialBatchSize, cl -> {
req.setCallingList(cl);
dialService.queueDial(req);
try {
log.debug("Sleep [{}] milliseconds.", dialBatchSize);
TimeUnit.MILLISECONDS.sleep(dialBatchSleep);
}
catch (InterruptedException e) {
log.warn("Campaign dial sleep exception caused.", e);
}
});
}
<T> void eachBatch(
final List<T> list,
final int batch,
final Consumer<List<T>> consumer) {
final int size = list.size();
for (int i = 0; i < size; i += batch) {
consumer.accept(list.subList(i, Math.min(size, i + batch)));
}
}
}

View File

@ -0,0 +1,24 @@
package com.pudonghot.yo.campaign.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
* @author Donghuang
* @date Sep 13, 2020 10:25:44
*/
@Slf4j
public class CampaignServiceImplTest {
@Test
public void testEachBatch() {
List<Integer> list = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
list.add(i);
}
new CampaignServiceImpl(null).eachBatch(list, 8, sub -> log.info("{}", sub));
}
}

View File

@ -72,6 +72,8 @@ public class CampaignChannelDestroy {
rec.setLastCallDuration(billSec);
rec.setLastConnected(billSec > 0);
rec.setLastHangupCause(event.getHangupCause());
rec.setLockKey(null);
rec.setLockTime(null);
rec.setUpdatedTime(new Date());
callingListMapper.update(rec);
}