进程间通信(IPC, inter-process communication)是多个执行上下文实现数据交互的重要功能,也是 Linux Kernel 一个重要的模块。本篇主要着眼于 Linux 基于 System V 引入的 3 种 IPC 机制——信号量、消息队列、共享内存。除此之外,Linux 还有更多的方式能够实现进程间通信,但本文不做介绍。

本篇基于 Linux 4.9.87 版本源码

IPC 命名空间

读过前几篇的同学应该都能够了解,内核统一维护了所有的资源,并有限地向各个执行流暴露所需的资源。这保证了各个执行流看似独立地运行,但也为进程间的协作制造了障碍。基于这个原因,内核又提供了额外的机制来超脱这个限制。

struct kern_ipc_perm
{
	spinlock_t	lock;
	bool		deleted;
	int		id;             /* 内核态内部 id */
	key_t		key;        /* 保存用户程序用来唯一区分信号量的一个魔数 */
	kuid_t		uid;
	kgid_t		gid;
	kuid_t		cuid;
	kgid_t		cgid;
	umode_t		mode;
	unsigned long	seq;
	void		*security;
};

struct ipc_ids {
	int in_use;                 /* 使用中的 IPC 对象数量 */
	unsigned short seq;         /* 用户空间 IPC 对象 ID */
	struct rw_semaphore rwsem;  /* 内核信号量,内核操作必备的锁机制 */
	struct idr ipcs_idr;        /* id 管理器,ipcs_idr 总是指向 kern_ipc_perm 对象 */
	int next_id;
};

struct ipc_namespace {
	atomic_t	count;          /* 被引用的次数 */
    /* “核心”,每个数组元素对应一种 IPC 机制:信号量、消息队列、共享内存 */
    /*
       from ipc/util.h
        #define IPC_SEM_IDS 0   信号量
        #define IPC_MSG_IDS 1   消息队列
        #define IPC_SHM_IDS 2   共享内存
    */
	struct ipc_ids	ids[3];

    /* 以下都是对三种 IPC 机制设置的限制,诸如共享内存页的最大数量等 */
	int		sem_ctls[4];
	int		used_sems;

	unsigned int	msg_ctlmax;
	unsigned int	msg_ctlmnb;
	unsigned int	msg_ctlmni;
	atomic_t	msg_bytes;
	atomic_t	msg_hdrs;

	size_t		shm_ctlmax;
	size_t		shm_ctlall;
	unsigned long	shm_tot;
	int		shm_ctlmni;
	int		shm_rmid_forced;

	struct notifier_block ipcns_nb;

	/* The kern_mount of the mqueuefs sb.  We take a ref on it */
	struct vfsmount	*mq_mnt;

	/* # queues in this ns, protected by mq_lock */
	unsigned int    mq_queues_count;

	/* next fields are set through sysctl */
	unsigned int    mq_queues_max;   /* initialized to DFLT_QUEUESMAX */
	unsigned int    mq_msg_max;      /* initialized to DFLT_MSGMAX */
	unsigned int    mq_msgsize_max;  /* initialized to DFLT_MSGSIZEMAX */
	unsigned int    mq_msg_default;
	unsigned int    mq_msgsize_default;

	/* user_ns which owns the ipc ns */
	struct user_namespace *user_ns;
	struct ucounts *ucounts;

	struct ns_common ns;
};

IPC 由一个被称为 ipc_namespace 的数据结构维护,如果熟悉 pid_namespace 之类的,应该能够理解内核通过命名空间实现了资源的隔离。这里的 ipc_namespace 也是为了实现多个 ipc 环境所做的抽象。首先看下初始化 IPC 命名空间的流程。

/* from init/init_task.c */
/* 内核抽象的第一个任务 */
struct task_struct init_task = INIT_TASK(init_task);

/* from include/linux/init_task.h */
#define INIT_TASK(tsk) \
{                                       \
    .nsproxy	= &init_nsproxy,        \
}

/* from kernel/nsproxy.c */
struct nsproxy init_nsproxy = {
	.count			= ATOMIC_INIT(1),
	.uts_ns			= &init_uts_ns,
#if defined(CONFIG_POSIX_MQUEUE) || defined(CONFIG_SYSVIPC)
	.ipc_ns			= &init_ipc_ns,
#endif
	.mnt_ns			= NULL,
	.pid_ns_for_children	= &init_pid_ns,
#ifdef CONFIG_NET
	.net_ns			= &init_net,
#endif
#ifdef CONFIG_CGROUPS
	.cgroup_ns		= &init_cgroup_ns,
#endif
};

/* from ipc/msgutil.c */ 
struct ipc_namespace init_ipc_ns = {
	.count		= ATOMIC_INIT(1),
	.user_ns = &init_user_ns,
	.ns.inum = PROC_IPC_INIT_INO,
#ifdef CONFIG_IPC_NS
	.ns.ops = &ipcns_operations,
#endif
};

任务通过 forkclone 等操作构建新的任务,并由 flag CLONE_NEWIPC 决定与父任务共享 IPC 命名空间或创建新的 IPC 命名空间。

struct ipc_namespace *copy_ipcs(unsigned long flags,
	struct user_namespace *user_ns, struct ipc_namespace *ns)
{
	if (!(flags & CLONE_NEWIPC))
        /* 返回原来的 IPC 命名空间 */
		return get_ipc_ns(ns);
    /* 返回一个新的 IPC 命名空间 */
	return create_ipc_ns(user_ns, ns);
}

信号量

上图给出了各个结构间的关系,通过当前任务指向的 IPC 命名空间,找到 struct ipc_ids ,内核通过 ipcs_idr 找到 ID 到指针的映射关系,从而得到所需的 kern_ipc_perm 实例。同时 kern_ipc_perm 作为结构 sem_array 的第一个元素,使用技巧就可以直接定位到 struct sem_array

syscall semget

获取一个信号量

/* from ipc/sem.c */
SYSCALL_DEFINE3(semget, key_t, key, int, nsems, int, semflg)
{
	struct ipc_namespace *ns;
	static const struct ipc_ops sem_ops = {
		.getnew = newary,
		.associate = sem_security,
		.more_checks = sem_more_checks,
	};
	struct ipc_params sem_params;

    /* 获取当前任务的 IPC 命名空间 */
	ns = current->nsproxy->ipc_ns;

	if (nsems < 0 || nsems > ns->sc_semmsl)
		return -EINVAL;

	sem_params.key = key;
	sem_params.flg = semflg;
	sem_params.u.nsems = nsems;

	return ipcget(ns, &sem_ids(ns), &sem_ops, &sem_params);
}

/* from ipc/util.c */
int ipcget(struct ipc_namespace *ns, struct ipc_ids *ids,
			const struct ipc_ops *ops, struct ipc_params *params)
{
    /* 如果标志位为“私有”,则创建新的 IPC 命名空间 */
	if (params->key == IPC_PRIVATE)
		return ipcget_new(ns, ids, ops, params);
    /* 使用当前任务的 IPC 命名空间 */
	else
		return ipcget_public(ns, ids, ops, params);
}

这里额外展示了 msgget, shmget,分别意味着消息队列、共享内存。获取 IPC 对象的流程都是相同的,有所区别的只是获取、写入和存储数据的方式。

// from 'ipc/util.c'
int ipcget_new(struct ipc_namespace *ns, struct ipc_ids *ids,
		struct ipc_ops *ops, struct ipc_params *params)
{
	int err;
retry:
	err = idr_pre_get(&ids->ipcs_idr, GFP_KERNEL);

	if (!err)
		return -ENOMEM;

	down_write(&ids->rw_mutex);
	err = ops->getnew(ns, params);
	up_write(&ids->rw_mutex);

	if (err == -EAGAIN)
		goto retry;

	return err;
}

int ipcget_public(struct ipc_namespace *ns, struct ipc_ids *ids,
		struct ipc_ops *ops, struct ipc_params *params)
{
	struct kern_ipc_perm *ipcp;
	int flg = params->flg;
	int err;
retry:
	err = idr_pre_get(&ids->ipcs_idr, GFP_KERNEL);

  // 读写锁保护临界区
	down_write(&ids->rw_mutex);
  // 确认 KEY 是否存在
	ipcp = ipc_findkey(ids, params->key);
	if (ipcp == NULL) {
		/* KEY 不存在,创建新的 */
		if (!(flg & IPC_CREAT))
			err = -ENOENT;
		else if (!err)
			err = -ENOMEM;
		else
			err = ops->getnew(ns, params);
	} else {
		/* KEY 存在,check ACL */
		if (flg & IPC_CREAT && flg & IPC_EXCL)
			err = -EEXIST;
		else {
			err = 0;
			if (ops->more_checks)
				err = ops->more_checks(ipcp, params);
			if (!err)
				err = ipc_check_perms(ipcp, ops, params);
		}
		ipc_unlock(ipcp);
	}
  // 离开临界区
	up_write(&ids->rw_mutex);

	if (err == -EAGAIN)
		goto retry;

	return err;
}

ipcget_newipcget_public ,核心的目标用 key 去换取一个 id (代表 struct kern_ipc_permipcs_idr 整数指针管理器中的 id )。

如果这个 key 不存在,就考虑创建一个新的 struct kern_ipc_perm

// from 'include/linux/sem.h'
struct sem_array {
	struct kern_ipc_perm	sem_perm;	/* permissions .. see ipc.h */
	time_t			sem_otime;	/* last semop time */
	time_t			sem_ctime;	/* last change time */
	struct sem		*sem_base;	/* ptr to first semaphore in array */
	struct sem_queue	*sem_pending;	/* pending operations to be processed */
	struct sem_queue	**sem_pending_last; /* last pending operation */
	struct sem_undo		*undo;		/* undo requests on this array */
	unsigned long		sem_nsems;	/* no. of semaphores in array */
};

// from 'ipc/sem.c'
static int newary(struct ipc_namespace *ns, struct ipc_params *params)
{
	int id;
	int retval;
	struct sem_array *sma;
	int size;
	key_t key = params->key;
	int nsems = params->u.nsems;
	int semflg = params->flg;

	if (!nsems)	// 信号量集合中信号量数量不能为0.
		return -EINVAL;
	if (ns->used_sems + nsems > ns->sc_semmns) // 不能超过 IPC 信号量数量上限
		return -ENOSPC;

	size = sizeof (*sma) + nsems * sizeof (struct sem);
	sma = ipc_rcu_alloc(size);
	if (!sma) {
		return -ENOMEM;
	}
	memset (sma, 0, size);

	sma->sem_perm.mode = (semflg & S_IRWXUGO);
	sma->sem_perm.key = key;

	sma->sem_perm.security = NULL;
	retval = security_sem_alloc(sma);
	if (retval) {
		ipc_rcu_putref(sma);
		return retval;
	}

	id = ipc_addid(&sem_ids(ns), &sma->sem_perm, ns->sc_semmni);
	if (id < 0) {
		security_sem_free(sma);
		ipc_rcu_putref(sma);
		return id;
	}
	ns->used_sems += nsems;

	sma->sem_perm.id = sem_buildid(id, sma->sem_perm.seq);
	sma->sem_base = (struct sem *) &sma[1];
	/* sma->sem_pending = NULL; */
	sma->sem_pending_last = &sma->sem_pending;
	/* sma->undo = NULL; */
	sma->sem_nsems = nsems;
	sma->sem_ctime = get_seconds();
	sem_unlock(sma);

	return sma->sem_perm.id;
}

syscall semctl

信号量控制操作。根据 cmd 的不同,可以获得信号集合中所有值(GETALL),获取最后一个操作信号集合的进程号(GETPID),统一设置信号集合中所有值(SETALL)

SYSCALL_DEFINE4(semctl, int, semid, int, semnum, int, cmd, unsigned long, arg)
{
	int version;
	struct ipc_namespace *ns;
	void __user *p = (void __user *)arg;

	if (semid < 0)
		return -EINVAL;

	version = ipc_parse_version(&cmd);
	ns = current->nsproxy->ipc_ns;

	switch (cmd) {
	case IPC_INFO:
	case SEM_INFO:
	case IPC_STAT:
	case SEM_STAT:
		return semctl_nolock(ns, semid, cmd, version, p);
	case GETALL:
	case GETVAL:
	case GETPID:
	case GETNCNT:
	case GETZCNT:
	case SETALL:
		return semctl_main(ns, semid, semnum, cmd, p);
	case SETVAL:
		return semctl_setval(ns, semid, semnum, arg);
	case IPC_RMID:
	case IPC_SET:
		return semctl_down(ns, semid, cmd, version, p);
	default:
		return -EINVAL;
	}
}

syscall semop

信号的 P/V 操作

// from 'ipc/sem.c'
asmlinkage long sys_semop (int semid, struct sembuf __user *tsops, unsigned nsops)
{
    return sys_semtimedop(semid, tsops, nsops, NULL);
}

asmlinkage long sys_semtimedop(int semid, struct sembuf __user *tsops,
			unsigned nsops, const struct timespec __user *timeout)
{
	int error = -EINVAL;
	struct sem_array *sma;
	struct sembuf fast_sops[SEMOPM_FAST];
	struct sembuf* sops = fast_sops, *sop;
	struct sem_undo *un;
	int undos = 0, alter = 0, max;
	struct sem_queue queue;
	unsigned long jiffies_left = 0;
	struct ipc_namespace *ns;

  // 获取当前任务的 ipc 命名空间
	ns = current->nsproxy->ipc_ns;

	if (nsops < 1 || semid < 0)
		return -EINVAL;
	if (nsops > ns->sc_semopm)
		return -E2BIG;
	if(nsops > SEMOPM_FAST) {
		sops = kmalloc(sizeof(*sops)*nsops,GFP_KERNEL);
		if(sops==NULL)
			return -ENOMEM;
	}
	if (copy_from_user (sops, tsops, nsops * sizeof(*tsops))) {
		error=-EFAULT;
		goto out_free;
	}
	if (timeout) {
		struct timespec _timeout;
		if (copy_from_user(&_timeout, timeout, sizeof(*timeout))) {
			error = -EFAULT;
			goto out_free;
		}
		if (_timeout.tv_sec < 0 || _timeout.tv_nsec < 0 ||
			_timeout.tv_nsec >= 1000000000L) {
			error = -EINVAL;
			goto out_free;
		}
		jiffies_left = timespec_to_jiffies(&_timeout);
	}
	max = 0;
	for (sop = sops; sop < sops + nsops; sop++) {
		if (sop->sem_num >= max)
			max = sop->sem_num;
		if (sop->sem_flg & SEM_UNDO)
			undos = 1;
		if (sop->sem_op != 0)
			alter = 1;
	}

retry_undos:
	if (undos) {
		un = find_undo(ns, semid);
		if (IS_ERR(un)) {
			error = PTR_ERR(un);
			goto out_free;
		}
	} else
		un = NULL;

	sma = sem_lock_check(ns, semid);
	if (IS_ERR(sma)) {
		error = PTR_ERR(sma);
		goto out_free;
	}

	/*
	 * semid identifiers are not unique - find_undo may have
	 * allocated an undo structure, it was invalidated by an RMID
	 * and now a new array with received the same id. Check and retry.
	 */
	if (un && un->semid == -1) {
		sem_unlock(sma);
		goto retry_undos;
	}
	error = -EFBIG;
	if (max >= sma->sem_nsems)
		goto out_unlock_free;

	error = -EACCES;
	if (ipcperms(&sma->sem_perm, alter ? S_IWUGO : S_IRUGO))
		goto out_unlock_free;

	error = security_sem_semop(sma, sops, nsops, alter);
	if (error)
		goto out_unlock_free;

	error = try_atomic_semop (sma, sops, nsops, un, task_tgid_vnr(current));
	if (error <= 0) {
		if (alter && error == 0)
			update_queue (sma);
		goto out_unlock_free;
	}

	/* We need to sleep on this operation, so we put the current
	 * task into the pending queue and go to sleep.
	 */

	queue.sma = sma;
	queue.sops = sops;
	queue.nsops = nsops;
	queue.undo = un;
	queue.pid = task_tgid_vnr(current);
	queue.id = semid;
	queue.alter = alter;
	if (alter)
		append_to_queue(sma ,&queue);
	else
		prepend_to_queue(sma ,&queue);

	queue.status = -EINTR;
	queue.sleeper = current;
	current->state = TASK_INTERRUPTIBLE;
	sem_unlock(sma);

	if (timeout)
		jiffies_left = schedule_timeout(jiffies_left);
	else
		schedule();

	error = queue.status;
	while(unlikely(error == IN_WAKEUP)) {
		cpu_relax();
		error = queue.status;
	}

	if (error != -EINTR) {
		/* fast path: update_queue already obtained all requested
		 * resources */
		goto out_free;
	}

	sma = sem_lock(ns, semid);
	if (IS_ERR(sma)) {
		BUG_ON(queue.prev != NULL);
		error = -EIDRM;
		goto out_free;
	}

	/*
	 * If queue.status != -EINTR we are woken up by another process
	 */
	error = queue.status;
	if (error != -EINTR) {
		goto out_unlock_free;
	}

	/*
	 * If an interrupt occurred we have to clean up the queue
	 */
	if (timeout && jiffies_left == 0)
		error = -EAGAIN;
	remove_from_queue(sma,&queue);
	goto out_unlock_free;

out_unlock_free:
	sem_unlock(sma);
out_free:
	if(sops != fast_sops)
		kfree(sops);
	return error;
}

消息队列

至于消息队列和共享内存,都利用了类似的技术,通过在 IPC 命名空间下,使用 struct idr 做 KEY, VALUE 的管理,从而维护起了一系列互不干扰的消息队列、共享内存。

共享内存

结束

局限于目前未能找到 SysV IPC 的经典利用场景,加之平时工作接触甚少。本篇匆匆结尾,未详细展示不同 IPC 技术在不同使用场景下的优劣。

相对而言,日常更多使用的 IPC 技术反而是管道、多进程共同读写文档等。

此坑估计不会再填了…

  __                    __                  
 / _| __ _ _ __   __ _ / _| ___ _ __   __ _ 
| |_ / _` | '_ \ / _` | |_ / _ \ '_ \ / _` |
|  _| (_| | | | | (_| |  _|  __/ | | | (_| |
|_|  \__,_|_| |_|\__, |_|  \___|_| |_|\__, |
                 |___/                |___/