[转]并发编程:Actors模型和CSP模型

一、前言
不同的编程模型与具体的语言无关,大部分现代语言都可以通过巧妙地结构处理实现不同的模型.杂谈的意思是很杂,想到哪儿写到哪儿,不对正确性负责 :D.

二、Actors模型
传统的并发模型主要由两种实现的形式,一是同一个进程下,多个线程天然的共享内存,由程序对读写做同步控制(有锁或无锁). 二是多个进程通过进程间通讯或者内存映射实现数据的同步.

Actors模型更多的使用消息机制来实现并发,目标是让开发者不再考虑线程这种东西,每个Actor最多同时只能进行一样工作,Actor内部可以有自己的变量和数据.

Actors模型避免了由操作系统进行任务调度的问题,在操作系统进程之上,多个Actor可能运行在同一个进程(或线程)中.这就节省了大量的Context切换.

在Actors模型中,每个Actor都有一个专属的命名”邮箱”, 其他Actor可以随时选择一个Actor通过邮箱收发数据,对于“邮箱”的维护,通常是使用发布订阅的机制实现的,比如我们可以定义发布者是自己,订阅者可以是某个Socket接口,另外的消息总线或者直接是目标Actor.

目前akka库是比较流行的Actors编程模型实现,支持Scala和Java语言.

三、CSP模型
CSP(Communicating Sequential Process)模型提供一种多个进程公用的“管道(channel)”, 这个channel中存放的是一个个”任务”.

目前正流行的go语言中的goroutine就是参考的CSP模型,原始的CSP中channel里的任务都是立即执行的,而go语言为其增加了一个缓存,即任务可以先暂存起来,等待执行进程准备好了再逐个按顺序执行.

四、CSP和Actor的区别
CSP进程通常是同步的(即任务被推送进Channel就立即执行,如果任务执行的线程正忙,则发送者就暂时无法推送新任务),Actor进程通常是异步的(消息传递给Actor后并不一定马上执行).
CSP中的Channel通常是匿名的, 即任务放进Channel之后你并不需要知道是哪个Channel在执行任务,而Actor是有“身份”的,你可以明确的知道哪个Actor在执行任务.
在CSP中,我们只能通过Channel在任务间传递消息, 在Actor中我们可以直接从一个Actor往另一个Actor传输数据.
CSP中消息的交互是同步的,Actor中支持异步的消息交互.

五、参考文档
Scala中的actors和Go中的goroutines对比
CSP Model From Wiki

《OOD启思录》:61条面向对象设计的经验原则

摘自《OOD启思录》 Arthur J.Riel【著】; 鲍志云【译】

“你不必严格遵守这些原则,违背它们也不会被处以宗教刑罚。但你应当把这些原则看成警铃,若违背了其中的一条,那么警铃就会响起。” —– Arthur J.Riel

(1) 所有数据都应该隐藏在所在的类的内部。

(2) 类的使用者必须依赖类的共有接口,但类不能依赖它的使用者。

(3) 尽量减少类的协议中的消息。

(4) 实现所有类都理解的最基本公有接口[例如,拷贝操作(深拷贝和浅拷贝)、相等性判断、正确输出内容、从ASCII描述解析等等]。

(5) 不要把实现细节(例如放置共用代码的私有函数)放到类的公有接口中。 如果类的两个方法有一段公共代码,那么就可以创建一个防止这些公共代码的私有函数。

(6) 不要以用户无法使用或不感兴趣的东西扰乱类的公有接口。

(7) 类之间应该零耦合,或者只有导出耦合关系。也即,一个类要么同另一个类毫无关系,要么只使用另一个类的公有接口中的操作。

(8) 类应该只表示一个关键抽象。包中的所有类对于同一类性质的变化应该是共同封闭的。一个变化若对一个包影响,则将对包中的所有类产生影响,而对其他的包不造成任何影响 .

(9) 把相关的数据和行为集中放置。设计者应当留意那些通过get之类操作从别的对象中获取数据的对象。这种类型的行为暗示着这条经验原则被违反了。

(10) 把不相关的信息放在另一个类中(也即:互不沟通的行为)。朝着稳定的方向进行依赖.

(11) 确保你为之建模的抽象概念是类,而不只是对象扮演的角色。

(12) 在水平方向上尽可能统一地分布系统功能,也即:按照设计,顶层类应当统一地共享工作。

(13) 在你的系统中不要创建全能类/对象。对名字包含Driver、Manager、System、Susystem的类要特别多加小心。规划一个接口而不是实现一个接口。

(14) 对公共接口中定义了大量访问方法的类多加小心。大量访问方法意味着相关数据和行为没有集中存放。

(15) 对包含太多互不沟通的行为的类多加小心。这个问题的另一表现是在你的应用程序中的类的公有接口中创建了很多的get和set函数。

(16) 在由同用户界面交互的面向对象模型构成的应用程序中,模型不应该依赖于界面,界面则应当依赖于模型。

(17) 尽可能地按照现实世界建模(我们常常为了遵守系统功能分布原则、避免全能类原则以及集中放置相关数据和行为的原则而违背这条原则) 。

(18) 从你的设计中去除不需要的类。一般来说,我们会把这个类降级成一个属性。

(19) 去除系统外的类。系统外的类的特点是,抽象地看它们只往系统领域发送消息但并不接受系统领域内其他类发出的消息。

(20) 不要把操作变成类。质疑任何名字是动词或者派生自动词的类,特别是只有一个有意义行为的类。考虑一下那个有意义的行为是否应当迁移到已经存在或者尚未发现的某个类中。

(21) 我们在创建应用程序的分析模型时常常引入代理类。在设计阶段,我们常会发现很多代理没有用的,应当去除。

(22) 尽量减少类的协作者的数量。一个类用到的其他类的数目应当尽量少。

(23) 尽量减少类和协作者之间传递的消息的数量。

(24) 尽量减少类和协作者之间的协作量,也即:减少类和协作者之间传递的不同消息的数量。

(25) 尽量减少类的扇出,也即:减少类定义的消息数和发送的消息数的乘积。

(26) 如果类包含另一个类的对象,那么包含类应当给被包含的对象发送消息。也即:包含关系总是意味着使用关系。

(27) 类中定义的大多数方法都应当在大多数时间里使用大多数数据成员。

(28) 类包含的对象数目不应当超过开发者短期记忆的容量。这个数目常常是6。当类包含多于6个数据成员时,可以把逻辑相关的数据成员划分为一组,然后用一个新的包含类去包含这一组成员。

(29) 让系统功能在窄而深的继承体系中垂直分布。

(30) 在实现语义约束时,最好根据类定义来实现。这常常会导致类泛滥成灾,在这种情况下,约束应当在类的行为中实现,通常是在构造函数中实现,但不是必须如此。

(31) 在类的构造函数中实现语义约束时,把约束测试放在构造函数领域所允许的尽量深的包含层次中。

(32) 约束所依赖的语义信息如果经常改变,那么最好放在一个集中式的第3方对象中。

(33) 约束所依赖的语义信息如果很少改变,那么最好分布在约束所涉及的各个类中。

(34) 类必须知道它包含什么,但是不能知道谁包含它。

(35) 共享字面范围(也就是被同一个类所包含)的对象相互之间不应当有使用关系。

(36) 继承只应被用来为特化层次结构建模。

(37) 派生类必须知道基类,基类不应该知道关于它们的派生类的任何信息。

(38) 基类中的所有数据都应当是私有的,不要使用保护数据。类的设计者永远都不应该把类的使用者不需要的东西放在公有接口中。

(39) 在理论上,继承层次体系应当深一点,越深越好。

(40) 在实践中,继承层次体系的深度不应当超出一个普通人的短期记忆能力。一个广为接受的深度值是6。

(41) 所有的抽象类都应当是基类。

(42) 所有的基类都应当是抽象类。

(43) 把数据、行为和/或接口的共性尽可能地放到继承层次体系的高端。

(44) 如果两个或更多个类共享公共数据(但没有公共行为),那么应当把公共数据放在一个类中,每个共享这个数据的类都包含这个类。

(45) 如果两个或更多个类有共同的数据和行为(就是方法),那么这些类的每一个都应当从一个表示了这些数据和方法的公共基类继承。

(46) 如果两个或更多个类共享公共接口(指的是消息,而不是方法),那么只有他们需要被多态地使用时,他们才应当从一个公共基类继承。

(47) 对对象类型的显示的分情况分析一般是错误的。在大多数这样的情况下,设计者应当使用多态。

(48) 对属性值的显示的分情况分析常常是错误的。类应当解耦合成一个继承层次结构,每个属性值都被变换成一个派生类。

(49) 不要通过继承关系来为类的动态语义建模。试图用静态语义关系来为动态语义建模会导致在运行时切换类型。

(50) 不要把类的对象变成派生类。对任何只有一个实例的派生类都要多加小心。

(51) 如果你觉得需要在运行时刻创建新的类,那么退后一步以认清你要创建的是对象。现在,把这些对象概括成一个类。

(52) 在派生类中用空方法(也就是什么也不做的方法)来覆写基类中的方法应当是非法的。

(53) 不要把可选包含同对继承的需要相混淆。把可选包含建模成继承会带来泛滥成灾的类。

(54) 在创建继承层次时,试着创建可复用的框架,而不是可复用的组件。

(55) 如果你在设计中使用了多重继承,先假设你犯了错误。如果没犯错误,你需要设法证明。

(56) 只要在面向对象设计中用到了继承,问自己两个问题:(1)派生类是否是它继承的那个东西的一个特殊类型?(2)基类是不是派生类的一部分?

(57) 如果你在一个面向对象设计中发现了多重继承关系,确保没有哪个基类实际上是另一个基类的派生类。

(58) 在面向对象设计中如果你需要在包含关系和关联关系间作出选择,请选择包含关系。

(59) 不要把全局数据或全局函数用于类的对象的薄记工作。应当使用类变量或类方法。

(60) 面向对象设计者不应当让物理设计准则来破坏他们的逻辑设计。但是,在对逻辑设计作出决策的过程中我们经常用到物理设计准则。

(61) 不要绕开公共接口去修改对象的状态。

哲学家就餐问题 [go/java 实现练习并发编程]

java

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class Philosopher implements Runnable{
	private final String name;
	private final ReentrantLock left;
	private final ReentrantLock right;
	private final AtomicInteger hunger = new AtomicInteger(3);
	public Philosopher(String name, ReentrantLock left, ReentrantLock right) {
		this.name = name;
		this.left = left;
		this.right = right;
	}
	@Override
	public void run() {
		while (this.hunger.get() > 0) {
			System.out.println(name + " Hungry");
			this.left.lock();
			this.right.lock();
			System.out.println(name + " Eating");
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			this.right.unlock();
			this.left.unlock();
			System.out.println(name + " Thinking");
			this.hunger.decrementAndGet();
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}	
}

public class DiningPhilosopher {
	public static void main(String[] args) {
		ExecutorService executor = Executors.newCachedThreadPool();
		String[] philosophers = new String[] {"Mark", "Russell", "Rocky", "Haris", "Root"};
		ReentrantLock fork0 = new ReentrantLock();
		ReentrantLock forkLeftLock = fork0;
		List<Philosopher> philosopherList = new ArrayList<Philosopher>();
		for(int i=1;i<philosophers.length;i++) {
			String name = philosophers[i];
			ReentrantLock forkRightLock = new ReentrantLock();
			philosopherList.add(new Philosopher(name, forkLeftLock, forkRightLock));
			forkLeftLock = forkRightLock;
		}
		philosopherList.add(new Philosopher(philosophers[0], fork0, forkLeftLock));
		for (Philosopher philosopher : philosopherList) {
			executor.submit(philosopher);
		}
		executor.shutdown();
	}
}

go

package main

import (
	"hash/fnv"
	"log"
	"math/rand"
	"os"
	"sync"
	"time"
)

// 初始化5个人
var ph = []string{"Mark", "Russell", "Rocky", "Haris", "Root"}

const hunger = 3                // 每个哲学家吃几次饭
const think = time.Second / 100 // 思考时间
const eat = time.Second / 100   // 吃饭时间

var fmt = log.New(os.Stdout, "", 0)

var dining sync.WaitGroup

func diningProblem(phName string, dominantHand, otherHand *sync.Mutex) {
	fmt.Println(phName, "Seated")
	h := fnv.New64a()
	h.Write([]byte(phName))
	rg := rand.New(rand.NewSource(int64(h.Sum64())))
	rSleep := func(t time.Duration) {
		time.Sleep(t/2 + time.Duration(rg.Int63n(int64(t))))
	}
	for h := hunger; h > 0; h-- {
		fmt.Println(phName, "Hungry")
		dominantHand.Lock() // 获取资源
		otherHand.Lock()
		fmt.Println(phName, "Eating")
		rSleep(eat)
		dominantHand.Unlock() // 释放资源
		otherHand.Unlock()
		fmt.Println(phName, "Thinking")
		rSleep(think)
	}
	fmt.Println(phName, "Satisfied")
	dining.Done()
	fmt.Println(phName, "Left the table")
}

func main() {
	fmt.Println("Table empty")
	dining.Add(5)
	fork0 := &sync.Mutex{}
	forkLeft := fork0
	for i := 1; i < len(ph); i++ {
		forkRight := &sync.Mutex{}
		go diningProblem(ph[i], forkLeft, forkRight)
		forkLeft = forkRight
	}
	go diningProblem(ph[0], fork0, forkLeft)
	dining.Wait() // 等待结束
	fmt.Println("Table empty")
}

技术提升的一个方法

最近看到知乎上的专栏https://zhuanlan.zhihu.com/c_183152541,感触颇多。

要突破自己首先的热爱编程,敬畏程序。毕竟有道无术,术尚可求,有术无道,止于术也。

你发现你的开发过程中总是需要重复复制-稍微改吧改吧-粘贴这个过程,你有没有去尝试阅读并实践《重构》?

你有没有尝试去自己发布一个公用的库?JCenter/Maven Central都是免费的。

你发现你每次修改完代码要抄起Postman点来点去

你有没有尝试去编写一个集成测试,代替手工的劳动?

你发现你碰到了很多奇奇怪怪你搞不明白的问题,只能一次次地尝试每个搜索结果中提到的解决方案,期望其中的某一个好使。
你有没有尝试过去阅读相关的书籍,查阅相关文档?

如何突破自己
如何成为顶级程序员
跳出弱鸡循环1
跳出弱鸡循环2

总结下来提升手段:
1.实践自动化测试

当你觉得技术上遇到瓶颈的时候,可以通过学习和实践自动化测试,来补全自己在相关工程领域的技术短板,提升自己的技术能力
从测试中学到的不仅是写测试用例的能力,而是一整套的工程化、自动化的能力。

实践流程
第一步,去看一下《Maven实战》,了解一下Maven的测试是怎么工作的。之所以让你去研究Maven,是因为你们这种系统,99%不会采用Gradle这种新技术的。
第二步,写第一个测试,代码如下:

public class MyTest {
    @Test
    public void 跳出弱鸡循环() {
    }
}

第三步,去搞一份你们线上数据库的表结构。各种数据库都有相应的命令dump表结构。有困难的的话,手写建表语句。
第四步,本地用Docker启动一个临时的数据库。
第五步,去研究一下flyway,用自动化方式把表结构灌到这个临时数据库里。
第六步,去了解一下你们的应用是怎么部署的,你们上线的应用不可能是通过在IDE里面点绿色三角来部署的。把部署的命令行要过来。
第七步,研究一下这个命令行,尝试在本地启动起来。碰到数据库没起来的问题,就把连接串改成刚刚那个Docker的临时数据库。
第八步,你平时怎么在网页上点点点测试的,把它翻译成Java。比如你平时会手工测试登录接口,那就用HttpClient写一段代码,模拟登录。
第九步,把上面这些整合起来:

public class MyTest {
    @Test
    public void 跳出弱鸡循环() {
        启动测试数据库();
        把表结构灌进去();
        本地启动应用();
        自动化方式测试接口();
    }
}

继续阅读“技术提升的一个方法”

Java CountDownLatch 和 CyclicBarrier 示例

复习了一下 JCIP 回顾一下同步工具类的使用

CountDownLatch


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExample1 {

	private final static int threadCount = 200;

	public static void main(String[] args) throws Exception {

		ExecutorService exec = Executors.newCachedThreadPool();

		final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

		for (int i = 0; i < threadCount; i++) {
			final int threadNum = i;
			exec.execute(() -> {
				try {
					test(threadNum);
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					countDownLatch.countDown();
				}
			});
		}
		countDownLatch.await();
		System.out.println("finish");
		exec.shutdown();
	}

	private static void test(int threadNum) throws Exception {
		Thread.sleep(100);
		System.out.println("{" + threadNum + "}");
		Thread.sleep(100);
	}
}

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CountDownLatchExample2 {
	private final static int threadCount = 200;

	public static void main(String[] args) throws Exception {

		ExecutorService exec = Executors.newCachedThreadPool();

		final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

		for (int i = 0; i < threadCount; i++) {
			final int threadNum = i;
			exec.execute(() -> {
				try {
					test(threadNum);
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					countDownLatch.countDown();
				}
			});
		}
                //超时之后不阻塞,直接继续运行
		countDownLatch.await(10, TimeUnit.MILLISECONDS);
		System.out.println("finish");
		exec.shutdown();
	}

	private static void test(int threadNum) throws Exception {
		Thread.sleep(100);
		System.out.println("{" + threadNum + "}");
	}
}
import java.util.concurrent.CountDownLatch;

public class TestCountDownLatch {

	public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {

		final CountDownLatch startGate = new CountDownLatch(1);
		final CountDownLatch endGate = new CountDownLatch(nThreads);

		for (int i = 0; i < nThreads; i++) {
			Thread thread = new Thread() {
				@Override
				public void run() {
					try {
						startGate.await();
						try {
							task.run();
						} finally {
							endGate.countDown();
						}
					} catch (InterruptedException e) {

					}

				}
			};
			thread.start();
		}

		long start = System.nanoTime();
		startGate.countDown();
		endGate.await();
		long end = System.nanoTime();
		return end - start;
	}

	public static void main(String[] args) {
		int nTask = 10;
		TestCountDownLatch testCountDownLatch = new TestCountDownLatch();
		try {
			long timeUse = testCountDownLatch.timeTasks(nTask, new Task());
			System.out.println(nTask + " use time " + timeUse);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

	}

}

class Task implements Runnable {

	@Override
	public void run() {
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

CyclicBarrier

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierExample1 {

	private static CyclicBarrier barrier = new CyclicBarrier(5);

	public static void main(String[] args) throws Exception {

		ExecutorService executor = Executors.newCachedThreadPool();

		for (int i = 0; i < 10; i++) {
			final int threadNum = i;
			Thread.sleep(1000);
			executor.execute(() -> {
				try {
					race(threadNum);
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
		executor.shutdown();
	}

	private static void race(int threadNum) throws Exception {
		Thread.sleep(1000);
		System.out.println("{" + threadNum + "} is ready ");
		barrier.await();
		System.out.println("{" + threadNum + "} continue ");
	}
}
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierExample2 {
	private static CyclicBarrier barrier = new CyclicBarrier(5);

	public static void main(String[] args) throws Exception {

		ExecutorService executor = Executors.newCachedThreadPool();

		for (int i = 0; i < 10; i++) {
			final int threadNum = i;
			Thread.sleep(1000);
			executor.execute(() -> {
				try {
					race(threadNum);
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
		executor.shutdown();
	}

	private static void race(int threadNum) throws Exception {
		Thread.sleep(1000);
		System.out.println("{" + threadNum + "} is ready");
		try {
                        //等待超时之后直接放开栅栏,让线程执行,不需要等到规定线程数都就位
			barrier.await(2000, TimeUnit.MILLISECONDS);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("{" + threadNum + "} continue");
	}
}
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierExample3 {
        //开始之前执行回调函数
	private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
		System.out.println("callback is running");
	});

	public static void main(String[] args) throws Exception {

		ExecutorService executor = Executors.newCachedThreadPool();

		for (int i = 0; i < 10; i++) {
			final int threadNum = i;
			Thread.sleep(1000);
			executor.execute(() -> {
				try {
					race(threadNum);
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
		executor.shutdown();
	}

	private static void race(int threadNum) throws Exception {
		Thread.sleep(1000);
		System.out.println("{" + threadNum + "} is ready");
		barrier.await();
		System.out.println("{" + threadNum + "} continue");
	}
}

缓存的使用和设计

原文地址 [本文整理发布]

缓存的一些基本常识

  • Cache(缓存): 从cpu的一级和二级缓存、Internet的DNS、到浏览器缓存都可以看做是一种缓存。(存贮数据(使用频繁的数据)的临时地方,因为取原始数据的代价太大了,所以我可以取得快一些)
  • Cache hit(缓存命中) Cahe miss(缓存未命中)

  • 缓存算法:缓存容量超过预设,如何踢掉“无用”的数据。例如:LRU(Least Recently Used) FIFO(First Input First Output)Least Frequently Used(LFU) 等等

  • System-of-Record(真实数据源): 例如关系型数据库、其他持久性系统等等。 也有英文书叫做authority data(权威数据)

  • serialization-and-deserialization(序列化与反序列化):可以参考:序列化与反序列化(美团工程师写的,非常棒的文章)
    serialization-and-deserialization

  • Scale Up (垂直扩容) 和 Scale out (水平扩容), 驴拉车,通常不是把一头驴养壮(有极限),而通常是一群驴去拉(当然每个个体也不能太差)。

    64258e75-ad29-32ef-ac55-45d95278ff0a

  • Write-through 和 write-behind
    154e49f4-f223-3b4f-9346-fa378dd2efd1

  • 阿姆而达定律:用于计算缓存加速比

  • LocalCache(独立式): 例如Ehcache、BigMemory Go
    (1) 缓存和应用在一个JVM中。
    (2) 缓存间是不通信的,独立的。
    (3) 弱一致性。

  • Standalone(单机):
    (1) 缓存和应用是独立部署的。
    (2) 缓存可以是单台。(例如memcache/redis单机等等)
    (3) 强一致性
    (4) 无高可用、无分布式。
    (5) 跨进程、跨网络

  • Distributed(分布式):例如Redis-Cluster, memcache集群等等
    (1) 缓存和应用是独立部署的。
    (2) 多个实例。(例如memcache/redis等等)
    (3) 强一致性或者最终一致性
    (4) 支持Scale Out、高可用。
    (5) 跨进程、跨网络

  • Replicated(复制式): 缓存数据时同时存放在多个应用节点的,数据复制和失效的事件以同步或者异步的形式在各个集群节点间传播。(也是弱一致性)这种用的不太多。

  • 数据层访问速度
    082dfcc7-1a06-3116-886f-055d4af98cf8
    继续阅读“缓存的使用和设计”

一次Memcached使用问题

收到群里反馈很多PHP请求执行超时,服务已经不可用。

查看服务器监控数据

发现 CPU 负载 在那段时间一直爬坡。 查询fpm slow 日志 发现请求都卡在了 memcache get 的方法上面。

发现出问题的请求有一个方法 频繁的读写一个key 这个 key 保存的内容 内存已经大于1MB, 一秒内有 500 多个请求,同时读写会导致memcache 的性能急剧下降。

自己写代码重现这个问题:
php 脚本 test-memcached.php:

<?php
$time_start = microtime(true);

$ac = new Memcached();
$ac->addServer('localhost', 11211);

#$ac->setOption(Memcached::OPT_SOCKET_SEND_SIZE, 1024*512);
#$ac->setOption(Memcached::OPT_SOCKET_RECV_SIZE, 1024*512);
#$ac->setOption(Memcached::OPT_RECV_TIMEOUT, 5);
#$ac->setOption(Memcached::OPT_SEND_TIMEOUT, 5);

$data = [];

for($i = 0; $i<30000; $i++){
  $data[] = md5($i);
}

$r = $ac->set('key', $data, 3600);
$set_time_end = microtime(true);
$set_time = $set_time_end - $time_start;
echo "Set Key Done in $set_time seconds \n";
$result = $ac->get('key');
$read_time_end = microtime(true);
$read_time = $read_time_end - $time_start;
echo "Read key Done in $read_time seconds \n";
$time_end = microtime(true);
$time = $time_end - $time_start;
echo "Did nothing in $time seconds\n";
?>

shell 测试脚本:

#!/bin/bash
date
for i in `seq 1 300`
do
{
 php test-memcached.php
 sleep 1
}&
done
wait #等待执行完成
date

分别修改 memcache key 保存内容的大小得到结果:

A:

for($i = 0; $i<30000; $i++){
  $data[] = md5($i);
}

B:

for($i = 0; $i<3; $i++){
  $data[] = md5($i);
}

测试结果:

A:

Did nothing in 18.769073009491 seconds
Did nothing in 14.950340032578 seconds
Did nothing in 19.823235034943 seconds
Did nothing in 20.014719009399 seconds
Did nothing in 20.894814014435 seconds
Did nothing in 20.827455997467 seconds
Did nothing in 15.345555067062 seconds
Did nothing in 18.984731197357 seconds
Did nothing in 21.08841586113 seconds

B:

Did nothing in 0.39238500595093 seconds
Did nothing in 0.42710494995117 seconds
Did nothing in 0.34245300292969 seconds
Did nothing in 0.084210872650146 seconds
Did nothing in 0.41426110267639 seconds
Did nothing in 0.12554693222046 seconds
Did nothing in 0.10487604141235 seconds
Did nothing in 0.60212993621826 seconds
Did nothing in 0.054439067840576 seconds
Did nothing in 0.11286902427673 seconds
Did nothing in 0.69583892822266 seconds
Did nothing in 0.14684200286865 seconds
Did nothing in 0.082473993301392 seconds
Did nothing in 0.34351587295532 seconds
Did nothing in 0.10698294639587 seconds


看结果可以得知 但 memcache 保存的内容变大的时候速度会非常慢,所以使用memcache 的时候要注意缓存值得大小。

memcache 缓存值最大大小是1M 引用 Oracle 网站的一个 FAQ

The default maximum object size is 1MB. In memcached 1.4.2 and later, you can change the maximum size of an object using the -I command line option.

For versions before this, to increase this size, you have to re-compile memcached. You can modify the value of the POWER_BLOCK within the slabs.c file within the source.

In memcached 1.4.2 and higher, you can configure the maximum supported object size by using the -I command-line option. For example, to increase the maximum object size to 5MB:

$ memcached -I 5m

If an object is larger than the maximum object size, you must manually split it. memcached is very simple: you give it a key and some data, it tries to cache it in RAM. If you try to store more than the default maximum size, the value is just truncated for speed reasons.

出现服务不可用的一个重要原因也有写缓存的次数和读缓存次数一样多。

最后修改测试脚本 依然保存很大的缓存值,但是只写入一次,测试并发的读取,基本都可以在1秒左右的读取到数据。

结论:
1.使用memcache的要注意缓存内容的大小,不要保存太大的内容在一个key上
2.缓存是读的次数远远大于写的次数,如果代码中发现写缓存的次数和读缓存的次数一样的时候要提高警惕
3.千万不要在数据库事务未提交的时候删除缓存

QA:
1.为啥使用MD5字符串?
因为memcache会自动压缩内容,如果用普通重复字符串测试效果不明显,线上出现问题的场景也是MD5串的缓存值,可以开启和关闭压缩选项?

测试代码下载

打印杨辉三角

构造杨辉三角逻辑并不难,主要打印格式的对齐比较困难.

public class Main {

	/**
	 * 计算行应该占得宽度
	 * 每个数字额外加了一个空格,数字的宽度加空格的宽度
	 * @param lastIntegers
	 * @return
	 */
	static int width_of(int[] lastIntegers) {
		int width = 0;
		int length = 0;
		for (int lastInteger : lastIntegers) {
			if (lastInteger > 0) {
				width += numWidth(lastInteger);
				length++;
			}
		}
		width += length;
		return width;
	}

	static int numWidth(int num) {
		int count = 0;
		while (num > 0) {
			num = num / 10;
			count++;
		}
		return count;
	}

	public static void main(String[] args) {
		int n = 10;
		int[][] result = new int[n][n];
		result[0][0] = 1;
		result[1][0] = 1;
		result[1][1] = 1;
		//构造杨辉三角
		for (int i = 2; i < n; i++) {
			result[i][0] = 1;
			result[i][i] = 1;
			for (int j = 1; j <= i - 1; j++) {
				result[i][j] = result[i - 1][j - 1] + result[i - 1][j];
			}
		}
		int maxWidth = width_of(result[n - 1]);
		for (int i = 0; i < n; i++) {
			int[] temp = result[i];
			int temp_width = width_of(temp);
			int wihteBlankLength = (maxWidth - temp_width) / 2;
			for (int k = 0; k < wihteBlankLength; k++) {
				System.out.print(" ");
			}
			for (int j = 0; j < temp.length; j++) {
				if (temp[j] > 0)
					System.out.print(String.format("%d", temp[j]) + " ");
			}
			System.out.println();
		}
	}

}