拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 一种基于二分法的变步长批量处理算法

一种基于二分法的变步长批量处理算法

白鹭 - 2022-02-10 2128 0 0

1、前言

??变步长批量处理算法,在实作某些功能时,非常需要,如资料传输和资料汇入时,使用可变步长的批量处理算法,可以极大地提高系统的性能,,

1.1、资料传输

??在不稳定的网络环境下,传输失败的几率提高,大的资料块可能会传输失败,如果分为小的资料块,可以传输成功,但由于传输开销,传输效率低,因此希望在网络好的时候,传输大的资料块或高分辨率的影像;网络差的时候,传输小的资料块或对影像进行降质处理,调低影像分辨率,提高压缩比等,因此,可变概念,可以提升服务功能的质量,提升系统的可用性,

1.2、资料汇入

??资料汇入,特别是ETL,大量资料汇入,效率非常重要,对于大多数数据库,如Myql,批量新增(如100条记录)和单记录新增的时间消耗相差无几,但处理能力有百倍之差,

??曾经,笔者使用逐条资料insert,300万条记录汇入,化了半个小时,简直无法忍受,于是,后来改为使用100条批量insert,但由于资料中存在这种那种的例外资料,经常出现一条例外资料导致成批(100条)的资料汇入失败,这样,一次汇入资料中可能有几潭训资料,导致几百上千条资料没有入库成功,于是,再修改代码,针对这些没入库成功的几百上千条记录里,逐潭训入,检测出具体的坏资料,整个程序不堪回首,

??因此,资料汇入需要可变步长算法,这样可用极大提升资料汇入的处理能力,

??另外,还有一种Excel资料汇入,如规定按记录的编码(字符串型别,如身份证号、手机号、订单编号等,唯一键栏位)作为记录的特征栏位,但表格资料中有新增的,还有修改的,即如果为新的编码值,为新增记录,如果数据库中编码值已存在,则需要修改记录,这种汇入,如果采用固定批量值汇入,新增的失败率是很高的,如果批量一旦失败就改为逐潭训入的策略,也是效率不高,可变步长算法可非常高效地解决此问题,

1.3、其它应用场景

??对这种可变的需求,具体很大的通用性,如与搜索相关的,也可以用可变步长算法来提升处理性能,

2、算法原理

2.1、算法概述

??可变步长算法,不是个新鲜的概念,区别在于变化的依据和策略,如最大似然估计、梯度下降等,以及算法复杂度和是否简单易用,

??本算法可以归于简单的决策树,有贪婪算法的因子,计算量很少,界面呼叫可以内嵌方法(类似C++的指标函式)来执行批量处理作业和单条资料修正处理作业,二分法是非常经典的算法,本算法的核心还是二分法,也可以说依据下列公式而开发:

\[1 + 2 + 4 + ... + 2^n = 2^{n+1}-1 \]

??使用QoS(Quality of Service,服务等级)的概念,将处理等级与处理能力(批量值)建立联系,QoS等级对应的批量值为\(2^0\)\(2^n\)的一个连续序列,如:[128,64,32,16,8,4,2,1],上限为\(2^n\),这里n=7,下限为约定为1,等级0对应128,等级7对应1,等级值即为等级阵列的下标,

??具体算法如下:

  1. 对于长度为n的输入资料串列,型别为泛型型别T,即资料为List,另外提供2个T型别的串列自变量,为批量处理成功的资料串列和修正处理成功的资料串列,便于外部进一步处理(如相关资料一致性处理),
  2. 算法回传处理例外记录的日志串列,字符串型别,使用者可以调整日志的格式和内容,以便定位例外资料的具体位置,
  3. 设定一个布尔型的例外标志值bError,用于记录之前是否发生了批量处理的例外,初值为false,该例外标志值在单条记录的修正处理后被复位为false,
  4. 设定一个资料串列下标锚点anchorIdx,表示当前正在处理的资料位置,初始为0,
  5. 设定一个当前等级值levelIdx,初值随意,这里设定初值为0,即从最高等级开始,
  6. 如果bError为false,即当前无例外,按照当前等级对应的批量值进行批量处理,如果成功,则提升一个等级(如果已为等级上限,则维持),并更新锚点anchorIdx到新的位置;如果处理失败,则设定bError为true,锚点anchorIdx不变,如当前等级不为等级下限,则下降一个等级;如果已是等级下限,则按照单条资料的修正处理方法进行处理,如果修正成功,则加入修正资料串列中,如果修正失败,则加入例外日志串列中,不管修正成功与否,修正处理后,anchorIdx均加1,且设定bError为false,表示例外资料已给检测出并按照规则进行处置了,
  7. 如果bError为true,即当前处于例外状态,按照当前等级对应的批量值进行批量处理,如果成功,则下降一个等级(如果已为等级下限,则维持),并更新锚点anchorIdx到新的位置;如果失败,且不为等级下限值,则锚点anchorIdx不变,下降一个等级;如果失败,且当前等级为等级下限值,则按照单条资料的修正处理方法进行处理,如果修正成功,则加入修正资料串列中,如果修正失败,则加入例外日志串列中,不管修正成功与否,修正处理后,anchorIdx均加1,且设定bError为false,
  8. 结束条件,anchorIdx到达n,即所有资料被处理完毕,

??这里有两个外部方法,批量处理方法和单条修正方法,这是一个非常抽象的界面方法,具体对资料进行怎么处理,由外部根据需要自行确定,后面提供的单元测验代码,给出批量资料型别转换的例子,

??当bError为true时,批量处理成功,仍然要降级,是因为前面等级发生例外时,资料范围包括了后面等级批量值之和的范围:

\[2^{n+1}>2^n+...+2+1 \]

??当等级批量值\(2^{n+1}\)批量尝试发生例外时,设定bError为true,然后如果\(2^n\)批量处理成功,意味着后面\(2^n\)批量必然失败,因此必须降级,才可能避免失败;当然如果前\(2^n\)批量处理失败,意味着例外资料至少在这些资料中存在,也需要降级尝试,才可能避免失败,

2.2、算法代码

??算法代码使用java语言,很容易转成python或其它语言,代码在github上:https://github.com/alabo1999/framework_algoset/commit/dacefcbc9bf2ad95c05ab7671a12054178e7f90e 包含一个算法类档案BatchProcess.java和单元测验档案BatchProcessTest.java,其中BatchProcess.java只有一个方法,该方法的界面形式如下:

	/**
	 * @methodName		: varStepBatchProcess
	 * @description		: 可变步长的批量处理算法
	 * @param <T>		: 泛型型别
	 * @param object	: 提供batchProcMethod和singleProcMethod方法的类物件
	 * @param dataRowList		: 待处理的T型别物件资料串列
	 * @param normalList		: 正常处理的物件串列
	 * @param correctList		: 修改处理的物件串列
	 * @param batchProcMethod	: 正常批量处理的方法
	 * @param singleProcMethod	: 单条修正处理的方法
	 * @param debugLevel		: 除错信息输出设定,bit0-输出修正处理信息,bit1-输出详细步骤,bit2:输出尝试次数
	 * @return					: 处理程序产生的例外日志串列
	 */
	public static <T> List<String> varStepBatchProcess(
			Object object,
			List<T> dataRowList,
			List<T> normalList,
			List<T> correctList,			
			Method batchProcMethod,
			Method singleProcMethod,
			int debugLevel);

2.3、算法测验

??单元测验档案BatchProcessTest.java,给出了算法测验,代码如下:

package com.abc.example.service;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;

import com.abc.example.common.impexp.BatchProcess;

/**
 * @className	: BatchProcessTest
 * @description	: 批量处理测验类
 * @summary		:
 * @history		:
 * ------------------------------------------------------------------------------
 * date			version		modifier		remarks                   
 * ------------------------------------------------------------------------------
 * 2022/01/20	1.0.0		sheng.zheng		初版
 *
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
public class BatchProcessTest {
	
	@Test
	// 可变步长的批量处理算法测验
	public void varStepBatchProcessTest() {
		
		// 构造待处理的资料,资料型别为String
		List<String> dataRowList = new ArrayList<String>();
		int idx = 0;
		for (int i = 0; i < 1000; i++) {
			String str = "";
			if (i % 129 == 0) {
				str = i + ".1";
				idx ++;
				if (idx % 2 == 0) {
					str += "abc";
				}
			}else {
				str = i + "";
			}
			dataRowList.add(str);
		}
		
		System.out.println(dataRowList);
		
		// 呼叫算法
		Method method1 = getMethodByName(this,"batchProcMethod");
		Method method2 = getMethodByName(this,"singleProcMethod");
		// 用于存放正常批量处理的资料
		List<String> normalList = new ArrayList<String>();
		// 用于存放修正处理的资料
		List<String> correctList = new ArrayList<String>();
		// 呼叫算法
		List<String> errorList = BatchProcess.varStepBatchProcess(this, dataRowList, 
				normalList, correctList, method1, method2,0x05);
		// 打印errorList
		System.out.println("errorList: " + errorList.toString());
		// 打印correctList
		System.out.println("correctList: " + correctList.toString());
		
	}
	
	// 构造批量处理的方法
	// 将串列中字符串,批量转为整型,被反射呼叫,必须是public的
	public void batchProcMethod(List<String> subDataList) {
		for (String item : subDataList) {
			Integer.valueOf(item);
		}
	}
	
	// 构造单记录处理的方法,被反射呼叫,必须是public的
	public String singleProcMethod(Exception e,String item) {
		String errInfo = "";
		try {
			Double.valueOf(item).intValue();
		}catch(Exception ex) {
			errInfo = ex.getMessage();
		}
		
		return errInfo;
	}
	
	// 根据方法名称获取方法物件
	private Method getMethodByName(Object object,String methodName) {
		Class<?> class1 = object.getClass();
		Method retItem = null;
		Method[] methods = class1.getMethods();
		for (int i = 0; i < methods.length; i++) {
			Method item = methods[i];
			// System.out.println(item.getName());
			if (item.getName() == methodName) {
				retItem = item;
				break;
			}
		}
		return retItem;
	}	

}

??构造了1000个字符串的串列,这里使用String来作为资料型别,除了下标为129的整数倍之外的值=i+""的字符串,其它的取值,如果为129的偶数倍,则为i+"0.1"的字符串;如果为129的奇数倍,则为i+”0.1abc“,

??批量处理方法batchProcMethod,实作对当前批量资料的型别转换,将字符串转为整数,当资料串列包含.1或.1abc的例外资料时,Integer.valueOf(item);会抛出例外,导致该批量处理失败;

??单资料修正方法singleProcMethod,实作了处理"0.1"之类的double型字符串转换成,但针对".1abc"后缀的资料仍然处理失败,

??执行测验代码,输出结果如下:

single tryNum = 9, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 0,bError=true
0.1: can be fixed.
single tryNum = 24, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 129,bError=true
129.1abc: can not be fixed.
single tryNum = 39, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 258,bError=true
258.1: can be fixed.
single tryNum = 54, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 387,bError=true
387.1abc: can not be fixed.
single tryNum = 69, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 516,bError=true
516.1: can be fixed.
single tryNum = 84, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 645,bError=true
645.1abc: can not be fixed.
single tryNum = 99, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 774,bError=true
774.1: can be fixed.
single tryNum = 114, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 903,bError=true
903.1abc: can not be fixed.
tryNum: 120
errorList: [For input string: "129.1abc", For input string: "387.1abc", For input string: "645.1abc", For input string: "903.1abc"]
correctList: [0.1, 258.1, 516.1, 774.1]

??tryNum: 120,表示这个算法总共尝试了120次处理尝试,包括批量处理和修正处理,需要想要看每一次处理尝试的详细信息,将debugLevel设定为7,可以看到详细的输出(摘录部分):

batch  tryNum = 1, levelIdx = 0,levelNum = 128,batchNum = 128,anchorIdx = 0,bError=false
java.lang.reflect.InvocationTargetException
batch  tryNum = 2, levelIdx = 1,levelNum = 64,batchNum = 64,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 3, levelIdx = 2,levelNum = 32,batchNum = 32,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 4, levelIdx = 3,levelNum = 16,batchNum = 16,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 5, levelIdx = 4,levelNum = 8,batchNum = 8,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 6, levelIdx = 5,levelNum = 4,batchNum = 4,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 7, levelIdx = 6,levelNum = 2,batchNum = 2,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 8, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 0,bError=true
java.lang.reflect.InvocationTargetException
single tryNum = 9, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 0,bError=true
0.1: can be fixed.
batch  tryNum = 10, levelIdx = 6,levelNum = 2,batchNum = 2,anchorIdx = 1,bError=false
batch  tryNum = 11, levelIdx = 5,levelNum = 4,batchNum = 4,anchorIdx = 3,bError=false
batch  tryNum = 12, levelIdx = 4,levelNum = 8,batchNum = 8,anchorIdx = 7,bError=false
batch  tryNum = 13, levelIdx = 3,levelNum = 16,batchNum = 16,anchorIdx = 15,bError=false
batch  tryNum = 14, levelIdx = 2,levelNum = 32,batchNum = 32,anchorIdx = 31,bError=false
batch  tryNum = 15, levelIdx = 1,levelNum = 64,batchNum = 64,anchorIdx = 63,bError=false
batch  tryNum = 16, levelIdx = 0,levelNum = 128,batchNum = 128,anchorIdx = 127,bError=false
java.lang.reflect.InvocationTargetException
batch  tryNum = 17, levelIdx = 1,levelNum = 64,batchNum = 64,anchorIdx = 127,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 18, levelIdx = 2,levelNum = 32,batchNum = 32,anchorIdx = 127,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 19, levelIdx = 3,levelNum = 16,batchNum = 16,anchorIdx = 127,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 20, levelIdx = 4,levelNum = 8,batchNum = 8,anchorIdx = 127,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 21, levelIdx = 5,levelNum = 4,batchNum = 4,anchorIdx = 127,bError=true
java.lang.reflect.InvocationTargetException
batch  tryNum = 22, levelIdx = 6,levelNum = 2,batchNum = 2,anchorIdx = 127,bError=true
batch  tryNum = 23, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 129,bError=true
java.lang.reflect.InvocationTargetException
single tryNum = 24, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 129,bError=true
129.1abc: can not be fixed.

3、算法在资料汇入时的应用

??批量处理方法batchProcMethod,实作insertItems陈述句即可,注意,不要使用try/catch,否则拦截了例外,算法会认为批量处理成功,

??单资料修正方法singleProcMethod,实作updateItems陈述句即可,注意由于此时实作按特征栏位的值,修改记录的汇入栏位的值,实际上只修改了一条记录,必须使用选择性栏位更新方法,避免未汇入栏位被item物件的默认值所覆写,

??大致形式如下:

	public <T> void batchProcMethod(List<T> subDataList) {
		xxxDao.insertItems(subDataList);
	}
	
	// 构造单记录处理的方法,被反射呼叫,必须是public的
	public <T> String singleProcMethod(Exception e,T item) {
		String errInfo = "";
		if (e instanceof DuplicateKeyException) {
			// 如果唯一键重复,则update
			// 根据汇入栏位串列,抽取相关栏位
			// String[] importFieldNames = new String[]{"fieldname1","fieldname2",...,"fieldnamen"};
			// Map<String,Object> map = itemToMap(item,importFieldNames);
			xxxDao.updateItems(map)
		}else{
            errInfo = e.getMessage();
        }
		return errInfo;
	}

??其中itemToMap方法,可使用反射方法,撰写为静态方法,供所有物体类使用,

4、算法应用注意事项

??算法代码在github上,公开状态,使用了apache license 2.0,如需使用,请按照许可证要求即可,

作者:阿拉伯1999 出处:http://www.cnblogs.com/alabo1999/ 本文著作权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段宣告,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利. 养成良好习惯,好文章随手顶一下,
标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *