Skip to content

Commit 8c1fc2c

Browse files
committed
Added volatile, Update send big package code
1 parent 2a3554b commit 8c1fc2c

File tree

17 files changed

+133
-77
lines changed

17 files changed

+133
-77
lines changed

examples/server/echo.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
$serv->set(array(
44
//'tcp_defer_accept' => 5,
55
//'ipc_mode' => 2,
6-
'worker_num' => 4,
6+
'worker_num' => 1,
77
'max_request' => 0,
88
//'daemonize' => true,
99
//'log_file' => '/tmp/swoole.log'
@@ -28,7 +28,9 @@
2828
$serv->on('receive', function (swoole_server $serv, $fd, $from_id, $data) {
2929
//echo "[#".posix_getpid()."]\tClient[$fd]: $data\n";
3030
//$info = $serv->connection_info($fd);
31+
//$t = microtime(true);
3132
$serv->send($fd, str_repeat('B', 1024*rand(40, 70)).rand(10000, 99999)."\n");
33+
//echo "use. ".((microtime(true) - $t)*1000)."ms\n";
3234
//$serv->send($fd, json_encode(array("hello" => '1213', "bat" => "ab")).PHP_EOL);
3335
//$serv->close($fd);
3436
});

include/Server.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -485,8 +485,8 @@ SWINLINE int swConnection_sendfile_blocking(int fd, char *filename, int timeout)
485485
SWINLINE swString* swConnection_get_string_buffer(swConnection *conn);
486486
SWINLINE int swConnection_send_string_buffer(swConnection *conn);
487487
SWINLINE void swConnection_clear_string_buffer(swConnection *conn);
488-
SWINLINE swBuffer_trunk* swConnection_get_out_buffer(swConnection *conn, uint32_t type);
489-
SWINLINE swBuffer_trunk* swConnection_get_in_buffer(swConnection *conn);
488+
SWINLINE volatile swBuffer_trunk* swConnection_get_out_buffer(swConnection *conn, uint32_t type);
489+
SWINLINE volatile swBuffer_trunk* swConnection_get_in_buffer(swConnection *conn);
490490
int swConnection_send_in_buffer(swConnection *conn);
491491

492492
int swServer_master_onAccept(swReactor *reactor, swDataHead *event);

include/atomic.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33

44
#if defined(__x86_64__)
55
#define SW_ATOMIC_T_LEN (sizeof("-9223372036854775808") - 1)
6-
typedef int64_t atomic_int_t;
7-
typedef uint64_t atomic_uint_t;
6+
typedef volatile int64_t atomic_int_t;
7+
typedef volatile uint64_t atomic_uint_t;
88
#else
99
#define SW_ATOMIC_T_LEN (sizeof("-2147483648") - 1)
10-
typedef int32_t atomic_int_t;
11-
typedef uint32_t atomic_uint_t;
10+
typedef volatile int32_t atomic_int_t;
11+
typedef volatile uint32_t atomic_uint_t;
1212
#endif
1313

1414
typedef volatile atomic_uint_t atomic_t;

include/buffer.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ extern "C"
1515

1616
typedef struct _swBuffer_trunk
1717
{
18-
uint32_t type;
19-
uint32_t length;
20-
uint32_t offset;
18+
volatile uint32_t type;
19+
volatile uint32_t length;
20+
volatile uint32_t offset;
2121
union
2222
{
2323
void *ptr;
@@ -27,25 +27,25 @@ typedef struct _swBuffer_trunk
2727
uint32_t val2;
2828
} data;
2929
} store;
30-
struct _swBuffer_trunk *next;
30+
volatile struct _swBuffer_trunk *next;
3131
} swBuffer_trunk;
3232

3333
typedef struct _swBuffer
3434
{
35-
int fd;
36-
uint8_t trunk_num; //trunk数量
37-
uint16_t trunk_size;
38-
uint32_t length;
39-
swBuffer_trunk *head;
40-
swBuffer_trunk *tail;
35+
volatile int fd;
36+
volatile uint8_t trunk_num; //trunk数量
37+
volatile uint16_t trunk_size;
38+
volatile uint32_t length;
39+
volatile swBuffer_trunk *head;
40+
volatile swBuffer_trunk *tail;
4141
} swBuffer;
4242

4343
#define swBuffer_get_trunk(buffer) (buffer->head)
4444
#define swBuffer_empty(buffer) (buffer == NULL || buffer->head == NULL)
4545

4646
SWINLINE swBuffer* swBuffer_new(int trunk_size);
4747
swBuffer_trunk *swBuffer_new_trunk(swBuffer *buffer, uint32_t type, uint32_t size);
48-
SWINLINE void swBuffer_pop_trunk(swBuffer *buffer, swBuffer_trunk *trunk);
48+
SWINLINE void swBuffer_pop_trunk(swBuffer *buffer, volatile swBuffer_trunk *trunk);
4949
int swBuffer_append(swBuffer *buffer, void *data, uint32_t size);
5050
int swBuffer_send(swBuffer *buffer, int fd);
5151

include/swoole.h

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ typedef struct _swPipe
309309
} swPipe;
310310

311311
int swPipeBase_create(swPipe *p, int blocking);
312-
int swPipeEventfd_create(swPipe *p, int blocking, int semaphore);
312+
int swPipeEventfd_create(swPipe *p, int blocking, int semaphore, int timeout);
313313
int swPipeUnsock_create(swPipe *p, int blocking, int protocol);
314314
int swPipeNotify_auto(swPipe *p, int blocking, int semaphore);
315315
void swBreakPoint(void);
@@ -663,9 +663,13 @@ struct _swWorker
663663
swPipe *notify;
664664

665665
/**
666-
* share memory
666+
* share memory store
667667
*/
668-
void *shm; //for taskwait
668+
struct
669+
{
670+
volatile uint8_t lock;
671+
void *ptr;
672+
} store;
669673

670674
int pipe_master;
671675
int pipe_worker;
@@ -807,7 +811,7 @@ typedef struct _swChannel
807811
swPipe notify_fd;
808812
} swChannel;
809813

810-
swChannel* swChannel_create(int size, int maxlen, int flag);
814+
swChannel* swChannel_new(int size, int maxlen, int flag);
811815
int swChannel_pop(swChannel *object, void *out, int buffer_length);
812816
int swChannel_push(swChannel *object, void *in, int data_length);
813817
int swChannel_out(swChannel *object, void *out, int buffer_length);
@@ -912,6 +916,12 @@ typedef struct
912916
*/
913917
uint16_t task_worker_num;
914918

919+
/**
920+
* Unix socket default buffer size
921+
*/
922+
uint32_t unixsock_buffer_size;
923+
924+
915925
swServer *serv;
916926
swFactory *factory;
917927
swLock lock;
@@ -954,8 +964,8 @@ typedef struct
954964

955965
typedef struct
956966
{
957-
uint8_t factory_lock_target;
958-
int16_t factory_target_worker;
967+
volatile uint8_t factory_lock_target;
968+
volatile int16_t factory_target_worker;
959969
atomic_uint_t worker_round_i;
960970
} swThreadG;
961971

php_swoole.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ ZEND_BEGIN_MODULE_GLOBALS(swoole)
232232
uint8_t task_ipc_mode;
233233
uint8_t task_auto_start;
234234
key_t message_queue_key;
235+
uint32_t unixsock_buffer_size;
235236
ZEND_END_MODULE_GLOBALS(swoole)
236237

237238
#ifdef ZTS

src/core/Base.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ int swPipeNotify_auto(swPipe *p, int blocking, int semaphore)
514514
{
515515
//eventfd是2.6.26提供的,timerfd是2.6.27提供的
516516
#ifdef HAVE_EVENTFD
517-
return swPipeEventfd_create(p, blocking, semaphore);
517+
return swPipeEventfd_create(p, blocking, semaphore, 0);
518518
#else
519519
return swPipeBase_create(p, blocking);
520520
#endif

src/core/Channel.c

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ void swChannel_debug(swChannel *chan)
4040
printf("RingBuffer: num=%d|head=%d|tail=%d|tail_tag=%d|head_tag=%d\n", chan->num, chan->head, chan->tail, (int)chan->tail_tag, (int)chan->head_tag);
4141
}
4242

43-
swChannel* swChannel_create(int size, int maxlen, int flag)
43+
swChannel* swChannel_new(int size, int maxlen, int flag)
4444
{
4545
assert(size > SW_CHANNEL_MIN_MEM + maxlen);
4646
int ret;
@@ -85,11 +85,7 @@ swChannel* swChannel_create(int size, int maxlen, int flag)
8585
//use notify
8686
if (flag & SW_CHAN_NOTIFY)
8787
{
88-
#ifdef HAVE_EVENTFD
89-
ret = swPipeEventfd_create(&object->notify_fd, 1, 1);
90-
#else
91-
ret = swPipeBase_create(&object->notify_fd, 1);
92-
#endif
88+
ret = swPipeNotify_auto(&object->notify_fd, 1, 1);
9389
if (ret < 0)
9490
{
9591
swWarn("swChannel_create: notify_fd init fail");

src/factory/FactoryProcess.c

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ int swFactoryProcess_start(swFactory *factory)
148148
{
149149
worker = swServer_get_worker(serv, i);
150150
worker->notify = &(worker_notify[i]);
151-
worker->shm = worker_shm + (i * serv->response_max_length);
151+
worker->store.ptr = worker_shm + (i * serv->response_max_length);
152+
worker->store.lock = 0;
152153

153154
if (swPipeNotify_auto(worker->notify, 1, 0))
154155
{
@@ -609,6 +610,15 @@ int swFactoryProcess_finish(swFactory *factory, swSendData *resp)
609610
*/
610611
if (resp->length > 0)
611612
{
613+
int64_t wait_reactor;
614+
615+
/**
616+
* Storage is in use right now, wait notify.
617+
*/
618+
if (worker->store.lock == 1)
619+
{
620+
worker->notify->read(worker->notify, &wait_reactor, sizeof(wait_reactor));
621+
}
612622
swPackage_response response;
613623

614624
response.length = resp->length;
@@ -618,7 +628,12 @@ int swFactoryProcess_finish(swFactory *factory, swSendData *resp)
618628
sdata._send.info.len = sizeof(response);
619629

620630
memcpy(sdata._send.data, &response, sizeof(response));
621-
memcpy(worker->shm, resp->data, resp->length);
631+
632+
/**
633+
* Lock the worker storage
634+
*/
635+
worker->store.lock = 1;
636+
memcpy(worker->store.ptr, resp->data, resp->length);
622637
}
623638
else
624639
{
@@ -684,11 +699,6 @@ int swFactoryProcess_finish(swFactory *factory, swSendData *resp)
684699
{
685700
swWarn("sendto to reactor failed. Error: %s [%d]", strerror(errno), errno);
686701
}
687-
else if (resp->length > 0)
688-
{
689-
int64_t wait_reactor;
690-
worker->notify->read(worker->notify, &wait_reactor, sizeof(wait_reactor));
691-
}
692702
return ret;
693703
}
694704

@@ -895,7 +905,8 @@ static int swFactoryProcess_worker_loop(swFactory *factory, int worker_pti)
895905
* for msg queue
896906
* 头部放一个long让msg queue可以直接插入到消息队列中
897907
*/
898-
static __thread struct {
908+
static __thread struct
909+
{
899910
long pti;
900911
swDataHead _send;
901912
} sw_notify_data;

src/memory/RingBuffer.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@ typedef struct _swRingBuffer
44
{
55
uint8_t shared;
66
size_t size;
7-
off_t alloc_offset;
8-
off_t collect_offset;
9-
uint32_t free_n;
7+
volatile off_t alloc_offset;
8+
volatile off_t collect_offset;
9+
volatile uint32_t free_n;
1010
void *memory;
1111

1212
} swRingBuffer;
1313

1414
typedef struct _swRingBuffer_item
1515
{
16-
uint32_t lock;
17-
uint32_t length;
16+
volatile uint32_t lock;
17+
volatile uint32_t length;
1818
} swRingBuffer_head;
1919

2020
static void swRingBuffer_destory(swMemoryPool *pool);

0 commit comments

Comments
 (0)