Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ltask is inspired by skynet (https://github.com/cloudwu/skynet) , but it's a lib

It implement an n:m scheduler , so that you can run M lua VMs on N OS threads.

Each lua service (an indepentent lua VM) works in request/response mode, they use message channels to inter-communicate.
Each lua service (an independent lua VM) works in request/response mode, they use message channels to inter-communicate.

`root` is a special service that can spawn new services. For example,

Expand Down Expand Up @@ -45,4 +45,4 @@ Test
====
```
lua test.lua
```
```
4 changes: 2 additions & 2 deletions lualib/bootstrap.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
local boot = require "ltask.bootstrap"

local SERVICE_ROOT <const> = 1
local MESSSAGE_SYSTEM <const> = 0
local MESSAGE_SYSTEM <const> = 0

local function bootstrap_root(initfunc, config)
local sid = assert(boot.new_service("root", config.service_source, config.service_chunkname, SERVICE_ROOT))
Expand All @@ -18,7 +18,7 @@ local function bootstrap_root(initfunc, config)
from = SERVICE_ROOT,
to = SERVICE_ROOT,
session = 1, -- 1 for root init
type = MESSSAGE_SYSTEM,
type = MESSAGE_SYSTEM,
message = init_msg,
size = sz,
}
Expand Down
4 changes: 2 additions & 2 deletions lualib/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ local function post_response_message(addr, session, type, msg, sz)
end
end

function ltask.rasie_error(addr, session, message)
function ltask.raise_error(addr, session, message)
if session == SESSION_SEND_MESSAGE then
return
end
Expand Down Expand Up @@ -702,7 +702,7 @@ function ltask.quit()
ltask.fork(function ()
for co, addr in pairs(session_coroutine_address) do
local session = session_coroutine_response[co]
ltask.rasie_error(addr, session, "Service has been quit.")
ltask.raise_error(addr, session, "Service has been quit.")
end
quit = true
end)
Expand Down
6 changes: 3 additions & 3 deletions service/root.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ local MESSAGE_SCHEDULE_DEL <const> = 1

local RECEIPT_ERROR <const> = 2
local RECEIPT_BLOCK <const> = 3
local RECEIPT_RESPONCE <const> = 4
local RECEIPT_RESPONSE <const> = 4

local S = {}

Expand Down Expand Up @@ -117,7 +117,7 @@ end

local function spawn(t)
local type, address = ltask.post_message(SERVICE_SYSTEM, 0, MESSAGE_SCHEDULE_NEW)
if type ~= RECEIPT_RESPONCE then
if type ~= RECEIPT_RESPONSE then
-- RECEIPT_ERROR
error("send MESSAGE_SCHEDULE_NEW failed.")
end
Expand Down Expand Up @@ -244,7 +244,7 @@ local function del_service(address)
for i=1, #msg, 2 do
local addr = msg[i]
local session = msg[i+1]
ltask.rasie_error(addr, session, err)
ltask.raise_error(addr, session, err)
end
end
end
Expand Down
16 changes: 8 additions & 8 deletions src/ltask.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ dispatch_schedule_message(struct ltask *task, service_id id, struct message *msg
if (msg->to.id == 0) {
service_write_receipt(P, id, MESSAGE_RECEIPT_ERROR, msg);
} else {
service_write_receipt(P, id, MESSAGE_RECEIPT_RESPONCE, msg);
service_write_receipt(P, id, MESSAGE_RECEIPT_RESPONSE, msg);
}
break;
case MESSAGE_SCHEDULE_DEL:
Expand Down Expand Up @@ -222,7 +222,7 @@ collect_done_job(struct ltask *task, service_id done_job[]) {
}

static void
dispath_out_messages(struct ltask *task, const service_id done_job[], int done_job_n) {
dispatch_out_messages(struct ltask *task, const service_id done_job[], int done_job_n) {
struct service_pool *P = task->services;
int i;

Expand Down Expand Up @@ -483,7 +483,7 @@ schedule_dispatch(struct ltask *task) {
schedule_back(task, id);
}

// Step 1 : dispatch external messsages
// Step 1 : dispatch external messages

if (task->external_message) {
dispatch_external_messages(task);
Expand All @@ -495,7 +495,7 @@ schedule_dispatch(struct ltask *task) {
int done_job_n = collect_done_job(task, jobs);

// Step 3: Dispatch out message by service_done
dispath_out_messages(task, jobs, done_job_n);
dispatch_out_messages(task, jobs, done_job_n);

// Step 4: get pending jobs
int job_n = get_pending_jobs(task, jobs);
Expand Down Expand Up @@ -857,7 +857,7 @@ ltask_init(lua_State *L) {
static void *
get_ptr(lua_State *L, const char *key) {
if (lua_getfield(L, LUA_REGISTRYINDEX, key) == LUA_TNIL) {
luaL_error(L, "%s is absense", key);
luaL_error(L, "%s is absence", key);
return NULL;
}
void * v = lua_touserdata(L, -1);
Expand Down Expand Up @@ -984,10 +984,10 @@ ltask_deinit(lua_State *L) {

int i;
for (i=0;i<task->config->worker;i++) {
worker_destory(&task->workers[i]);
worker_destroy(&task->workers[i]);
}

service_destory(task->services);
service_destroy(task->services);
queue_delete(task->schedule);
timer_destroy(task->timer);

Expand Down Expand Up @@ -1419,7 +1419,7 @@ lmessage_receipt(lua_State *L) {
lua_pushinteger(L, receipt);
if (m == NULL)
return 1;
if (receipt == MESSAGE_RECEIPT_RESPONCE) {
if (receipt == MESSAGE_RECEIPT_RESPONSE) {
// Only for schedule message NEW
lua_pushinteger(L, m->to.id);
message_delete(m);
Expand Down
2 changes: 1 addition & 1 deletion src/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ typedef unsigned int session_t;
#define MESSAGE_RECEIPT_DONE 1
#define MESSAGE_RECEIPT_ERROR 2
#define MESSAGE_RECEIPT_BLOCK 3
#define MESSAGE_RECEIPT_RESPONCE 4
#define MESSAGE_RECEIPT_RESPONSE 4

// If to == 0, it's a schedule message. It should be post from root service (1).
// type is MESSAGE_SCHEDULE_* from is the parameter (for DEL service_id).
Expand Down
2 changes: 1 addition & 1 deletion src/service.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ free_service(struct service *S) {
}

void
service_destory(struct service_pool *p) {
service_destroy(struct service_pool *p) {
if (p == NULL)
return;
int i;
Expand Down
2 changes: 1 addition & 1 deletion src/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ typedef struct {
} service_id;

struct service_pool * service_create(struct ltask_config *config);
void service_destory(struct service_pool *p);
void service_destroy(struct service_pool *p);
service_id service_new(struct service_pool *p, unsigned int id);
// 0 succ
int service_init(struct service_pool *p, service_id id, void *ud, size_t sz, void *pL);
Expand Down
4 changes: 2 additions & 2 deletions src/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ worker_quit(struct worker_thread *w) {
}

static inline void
worker_destory(struct worker_thread *worker) {
worker_destroy(struct worker_thread *worker) {
cond_release(&worker->trigger);
}

Expand Down Expand Up @@ -131,7 +131,7 @@ worker_assign_job(struct worker_thread *worker, service_id id) {
if (q->head == q->tail)
q->head = q->tail = 0;
}
// only one producer (Woker) except itself (worker_steal_job), so don't need use CAS to set
// only one producer (Worker) except itself (worker_steal_job), so don't need use CAS to set
worker->service_ready = id.id;
return id;
} else {
Expand Down