.gitignore:

echo-server-epoll
echo-server-poll
talk
talk.dSYM
echo-server-epoll.c:

#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>

// Quick and dirty assertion that a call succeeds
int Check(int rc, const char* call) {
  if (rc < 0) {
    perror(call);
    abort();
  }
  printf("%s => %d\n", call, rc);
  return rc;
}

#define CHECK(X) Check(X, #X)

/*
epoll_create()
epoll_ctl()
epoll_wait()
uv... edge or level?
strace a node echo server?
*/

int main(int argc, char* argv[]) {
  if (!argv[1]) {
    printf("usage: %s <port>\n", argv[0]);
    return 1;
  }

  signal(SIGPIPE, SIG_IGN);

  struct sockaddr_in sa;
  sa.sin_family = AF_INET;
  sa.sin_port = htons(atoi(argv[1]));
  sa.sin_addr.s_addr = htonl(INADDR_ANY);

  int server = CHECK(socket(AF_INET, SOCK_STREAM, 0));

  // SO_REUSEADDR must be set before bind() to have any effect.
  int optval = 1;
  CHECK(setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));

  CHECK(bind(server, (struct sockaddr*) &sa, sizeof(sa)));
  CHECK(listen(server, 512));

  // F_GETFL returns the flags; it does not take a pointer argument.
  int flags = CHECK(fcntl(server, F_GETFL, 0));
  CHECK(fcntl(server, F_SETFL, flags | O_NONBLOCK));

  struct epoll_event ev;
#define EVENTSZ 10
  struct epoll_event events[EVENTSZ];

  int epollfd = CHECK(epoll_create1(0));
  ev.events = EPOLLIN;
  ev.data.fd = server;
  // Why the fd twice? epoll_ctl() keys on the fd argument; ev.data is an
  // opaque token returned by epoll_wait(), and storing the fd there is
  // simply the most convenient choice.
  CHECK(epoll_ctl(epollfd, EPOLL_CTL_ADD, server, &ev));

  // timeout of -1 means "wait forever"
  printf("polling...\n");
  for (;;) {
    int nfds = CHECK(epoll_wait(epollfd, events, EVENTSZ, -1));
    for (int n = 0; n < nfds; n++) {
      if (events[n].data.fd == server) {
        printf("accepting new connection...\n");
        int client = CHECK(accept(server, NULL, NULL));
        ev.events = EPOLLIN;
        ev.data.fd = client;
        CHECK(epoll_ctl(epollfd, EPOLL_CTL_ADD, client, &ev));
      } else {
        printf("client %d, reading...\n", events[n].data.fd);
        char buf[4096];
        ssize_t buflen = read(events[n].data.fd, buf, sizeof(buf));
        if (buflen > 0) {
          printf(" writing %zd bytes...\n", buflen);
          buflen = write(events[n].data.fd, buf, buflen);
        }
        if (buflen < 1) {
          printf(" clearing client!\n");
          // close() also drops the fd from the epoll interest list
          // (no duplicated references exist here).
          close(events[n].data.fd);
        }
      }
    }
  }
  return 0;
}
echo-server-poll.c:

#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <poll.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>

// Quick and dirty assertion that a call succeeds
int Check(int rc, const char* call) {
  if (rc < 0) {
    perror(call);
    abort();
  }
  printf("%s => %d\n", call, rc);
  return rc;
}

#define CHECK(X) Check(X, #X)

int main(int argc, char* argv[]) {
  if (!argv[1]) {
    printf("usage: %s <port>\n", argv[0]);
    return 1;
  }

  signal(SIGPIPE, SIG_IGN);

  struct sockaddr_in sa;
  sa.sin_family = AF_INET;
  sa.sin_port = htons(atoi(argv[1]));
  sa.sin_addr.s_addr = htonl(INADDR_ANY);

  int server = CHECK(socket(AF_INET, SOCK_STREAM, 0));

  // SO_REUSEADDR must be set before bind() to have any effect.
  int optval = 1;
  CHECK(setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));

  CHECK(bind(server, (struct sockaddr*) &sa, sizeof(sa)));
  CHECK(listen(server, 512));

  // F_GETFL returns the flags; it does not take a pointer argument.
  int flags = CHECK(fcntl(server, F_GETFL, 0));
  CHECK(fcntl(server, F_SETFL, flags | O_NONBLOCK));

  struct pollfd pfds[1024] = {
    { .events = POLLIN, .fd = server },
  };
  size_t pfdlen = sizeof(pfds) / sizeof(pfds[0]);
  for (size_t i = 1; i < pfdlen; i++) {
    pfds[i].events = POLLIN;
    pfds[i].fd = -1;  // negative fds are ignored by poll()
  }

  // timeout of -1 means "wait forever"
  printf("polling...\n");
  while (CHECK(poll(pfds, pfdlen, -1))) {
    for (size_t i = 0; i < pfdlen; i++) {
      struct pollfd* pfd = pfds + i;
      if (!(pfd->revents & POLLIN)) continue;
      if (i == 0) {
        printf("accepting new connection...\n");
        int client = CHECK(accept(pfd->fd, NULL, NULL));
        int j;
        for (j = 0; pfds[j].fd > -1; j++);  // find a free slot (no bounds check: quick and dirty)
        pfds[j].fd = client;
      } else {
        printf("client %d, reading...\n", pfd->fd);
        char buf[4096];
        ssize_t buflen = read(pfd->fd, buf, sizeof(buf));
        if (buflen > 0) {
          printf(" writing %zd bytes...\n", buflen);
          // XXX a proper server would buffer the data, stop waiting for POLLIN
          // for more data to read, start waiting for POLLOUT to allow data to
          // be written, handle only partial data being written, and keep
          // writing until all data was written, then start reading again...
          // which is why it's easier to use Node's stream.pipe()!
          buflen = write(pfd->fd, buf, buflen);
        }
        if (buflen < 1) {
          printf(" clearing client!\n");
          close(pfd->fd);
          pfd->fd = -1;
        }
      }
    }
  }
  return 0;
}
<!DOCTYPE html>
<html>
<head>
<title>Title</title>
<meta charset="utf-8">
<style>
@import url(https://fonts.googleapis.com/css?family=Yanone+Kaffeesatz);
@import url(https://fonts.googleapis.com/css?family=Droid+Serif:400,700,400italic);
@import url(https://fonts.googleapis.com/css?family=Ubuntu+Mono:400,700,400italic);
body { font-family: 'Droid Serif'; }
h1, h2, h3 {
font-family: 'Yanone Kaffeesatz';
font-weight: normal;
}
.remark-code, .remark-inline-code { font-family: 'Ubuntu Mono'; }
@media print {
.remark-slide-number {
/* hide slide numbers on print/PDF, viewer has its own */
display: none;
}
}
@page {
size: 16cm 12cm;
}
</style>
</head>
<body>
<textarea id="source">
class: center, middle
- <https://goo.gl/EIPclI> or
<https://www.youtube.com/watch?v=PNa9OMajw9w&list=PLfMzBWSH11xaxRcsreXF-jB16geIJ8Foc&index=35>
</textarea>
<script src="https://gnab.github.io/remark/downloads/remark-latest.min.js">
</script>
<script>
var slideshow = remark.create();
</script>
</body>
</html>
Makefile:

do: exes

SRC := $(wildcard *.c)
EXE := $(SRC:.c=)

exes: $(EXE)

%: %.c
	gcc -Wall -Werror -o $@ $^

Scale solution 1: poll() or select()

    int server = ... // like before

    struct pollfd pfds[1024] = {{ .events = POLLIN, .fd = server }};

    while(poll(pfds, npfds, -1)) { // This is the "event loop"
      foreach(pfd in pfds) {
        if (pfd->revents & POLLIN) {
          if (pfd->fd == server) {  // Server socket has connection!
            int connection = accept(server);
            push(pfds, { .events = POLLIN, .fd = connection })
          } else {                  // Connection socket has data!
            char buf[4096];
            int size = read(pfd->fd, buf, sizeof buf);
            write(pfd->fd, buf, size);
          }
     }}}

Scale problem 2: linear scan of file descriptors

With thousands of fds, copying the entire list into the kernel, and scanning it for readiness on every pass, becomes a bottleneck: most of the descriptors are unready on any single tick.

Addressed back in the 2000s.

The event loop from the inside out

Sam Roberts

github: @sam-github

email: [email protected]

twitter: @octetcloud


Goal: to be able to answer these questions:

  • What is the event loop? (Hint: it's not an EventEmitter)
  • When is node multi-threaded?
  • Why is Node.js said to "scale well"?

A primer in Unix system programming

Warning: Pseudo "C" code lies ahead!

Network connections use "sockets", named after the system call used:

  int s = socket();

Sockets are referred to (confusingly) via "file descriptors"; these are not necessarily references to the file system. Sorry.

File descriptors are O/S "object orientation": they point to objects in the kernel with a virtual "interface" (read/write/close/etc.).
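
To make that "interface" concrete, here is a minimal sketch (an illustration added here, not a slide from the talk; error handling omitted): three different kernel objects, all driven through the same calls.

    #include <fcntl.h>
    #include <unistd.h>
    #include <sys/socket.h>

    int main(void) {
      int file = open("/etc/hosts", O_RDONLY);     // a disk file
      int sock = socket(AF_INET, SOCK_STREAM, 0);  // a TCP socket
      int fds[2];
      pipe(fds);                                   // a pipe

      char buf[64];
      write(fds[1], "hi", 2);          // same write() for pipes...
      read(fds[0], buf, sizeof(buf));  // ...same read() for pipes,
      read(file, buf, sizeof(buf));    // and for files,

      close(file);                     // and one close() for everything
      close(sock);
      close(fds[0]);
      close(fds[1]);
      return 0;
    }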


Scale problem: thread-per-connection

    int server = socket();
    bind(server, 80)
    listen(server)
    while(int connection = accept(server)) {
      pthread_create(echo, connection)
    }

    void echo(int connection) {
      char buf[4096];
      while(int size = read(connection, buf, sizeof buf)) {
        write(connection, buf, size);
      }
    }

Scale solution 2: kqueue, epoll, overlapped I/O

    int server = ... // like before

    int eventfd = epoll_create1(0);
    struct epoll_event events[10];
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = server };
    epoll_ctl(eventfd, EPOLL_CTL_ADD, server, &ev);

    // This *is* the "event loop", every pass is a "tick"
    while((int max = epoll_wait(eventfd, events, 10, -1))) {
      for(n = 0; n < max; n++) {
        if (events[n].data.fd == server) {
          // Server socket has connection!
          int connection = accept(server);
          ev.events = EPOLLIN; ev.data.fd = connection;
          epoll_ctl(eventfd, EPOLL_CTL_ADD, connection, &ev);
        } else {
          // Connection socket has data!
          char buf[4096];
          int size = read(events[n].data.fd, buf, sizeof buf);
          write(events[n].data.fd, buf, size);
        }
     }}

What is the node event loop?

A semi-infinite loop, polling and blocking on the O/S until one or more of a set of file descriptors are ready.


When does node exit?

It exits when it no longer has any events to epoll_wait() for, and so can never have any more events to process. At that point the epoll loop completes.

Note: .unref() marks handles that are being waited on in the loop as "not counting" towards keeping node alive.


Can we poll for all Node.js events?

Yes and no.

  • "file" descriptors: yes, but not actual disk files (sorry)
  • time: yes
  • anything else... indirectly

Pollable: sockets (net/dgram/http/tls/https)

Classic, well supported.


Pollable: time (timeouts and intervals)

    poll(..., int timeout)
    kevent(..., const struct timespec* timeout)
    epoll_wait(..., int timeout)

poll() and epoll_wait() timeouts have millisecond resolution; kevent()'s timespec is nanoseconds. Either way, the value is rounded up to the system clock granularity.

Only one timeout can be passed per call, but Node.js keeps all pending timeouts sorted, and sets the timeout value to the next (earliest) one; a sketch follows.
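
A sketch of that computation (the names next_timeout_ms and deadlines are hypothetical; assume the array is kept sorted ascending, as Node.js keeps its timer list):

    #include <stddef.h>
    #include <time.h>

    // Milliseconds until the earliest deadline, clamped for epoll_wait().
    int next_timeout_ms(const struct timespec* deadlines, size_t ndeadlines) {
      if (ndeadlines == 0) return -1;  // no timers pending: block forever

      struct timespec now;
      clock_gettime(CLOCK_MONOTONIC, &now);

      long ms = (deadlines[0].tv_sec - now.tv_sec) * 1000
              + (deadlines[0].tv_nsec - now.tv_nsec) / 1000000;
      return ms < 0 ? 0 : (int)ms;     // already expired: poll, don't block
    }

    // usage: epoll_wait(epollfd, events, EVENTSZ, next_timeout_ms(t, n));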


Not pollable: file system

fs.* use the uv thread pool (unless they are sync).

The blocking call is made by a pool thread, and when it completes, readiness is signalled back to the epoll loop using either an eventfd or a self-pipe; a sketch of the eventfd variant follows.
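
A sketch of the eventfd variant (Linux-only; worker_done and on_wakeup are hypothetical names, error handling omitted):

    #include <stdint.h>
    #include <unistd.h>
    #include <sys/eventfd.h>

    int wakeup = -1;  // created once: wakeup = eventfd(0, EFD_NONBLOCK);
                      // and registered for EPOLLIN like any other fd

    void worker_done(void) {  // runs on the pool thread
      uint64_t one = 1;
      write(wakeup, &one, sizeof(one));  // increments the eventfd counter
    }

    void on_wakeup(void) {    // runs when epoll reports wakeup readable
      uint64_t count;
      read(wakeup, &count, sizeof(count));  // drains/resets the counter
      // ...dequeue completed requests and run their callbacks...
    }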


Aside: self-pipe

A pipe, where one end is written to by a thread or signal handler, and the other end is polled in the epoll loop.

The traditional way to "wake up" a polling loop when the event to wait for is not directly representable as a file descriptor.
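
A minimal sketch of the pattern (assumes an epollfd like the echo server's; wake and setup_selfpipe are hypothetical names, error handling omitted):

    #include <fcntl.h>
    #include <unistd.h>
    #include <sys/epoll.h>

    int selfpipe[2];

    void wake(int signo) {  // async-signal-safe: only calls write()
      char byte = (char)signo;
      write(selfpipe[1], &byte, 1);
    }

    void setup_selfpipe(int epollfd) {
      pipe(selfpipe);
      fcntl(selfpipe[0], F_SETFL, O_NONBLOCK);
      fcntl(selfpipe[1], F_SETFL, O_NONBLOCK);

      struct epoll_event ev = { .events = EPOLLIN, .data.fd = selfpipe[0] };
      epoll_ctl(epollfd, EPOLL_CTL_ADD, selfpipe[0], &ev);
      // install with signal(SIGUSR1, wake), or call wake() from a thread;
      // when selfpipe[0] polls readable, drain it with read() and dispatch
    }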


Sometimes pollable: dns

  • dns.lookup() calls getaddrinfo(), a function in the system resolver library that makes blocking socket calls and cannot be integrated into a polling loop.
  • dns.<everything else> uses non-blocking I/O, and integrates with the epoll loop

Docs bend over backwards to explain this, but once you know how the event loop works, and how blocking library calls must be shunted off to the thread pool, it always makes sense; a sketch of the blocking call follows.
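
A sketch of why getaddrinfo() can't be polled (an illustration, not from the talk): it returns its results directly, there is no fd to hand to epoll, so the calling thread simply stalls until the resolver finishes.

    #include <netdb.h>
    #include <stdio.h>
    #include <sys/socket.h>

    int main(void) {
      struct addrinfo hints = { .ai_socktype = SOCK_STREAM };
      struct addrinfo* res;

      // May sleep inside the resolver library; nothing else can run on
      // this thread meanwhile, which is why Node.js shunts it to a pool
      // thread.
      int rc = getaddrinfo("nodejs.org", "443", &hints, &res);
      if (rc != 0) {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rc));
        return 1;
      }
      freeaddrinfo(res);
      return 0;
    }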


Important notes about the UV thread pool

It is shared by:

  • fs,
  • dns,
  • http.request() (when given a host name rather than an IP address, dns.lookup() is used to resolve it), and
  • any C++ addons that use it.

The default number of threads is 4; significantly parallel users of the above should increase the size.

Hints:

  • Resolve DNS names yourself, using the direct dns APIs, to avoid dns.lookup().
  • Increase the thread pool size with UV_THREADPOOL_SIZE.

Pollable: signals

The ultimate async... uses the self-pipe pattern to communicate with the epoll loop.

Note that attaching callbacks for signals doesn't "ref" the event loop, which is consistent with their usage as a "probably won't happen" IPC mechanism.


Pollable: child processes

  • Unix signals child process termination with SIGCHLD
  • Pipes between the parent and child are pollable; see the sketch below.
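
A sketch of the pipe half (spawn_and_watch is a hypothetical helper; error handling omitted): the child's stdout becomes just another fd in the epoll set, and EOF (read() returning 0) tells the parent the child closed its end.

    #include <unistd.h>
    #include <sys/epoll.h>
    #include <sys/types.h>

    pid_t spawn_and_watch(int epollfd, char* const argv[]) {
      int out[2];
      pipe(out);

      pid_t pid = fork();
      if (pid == 0) {        // child: stdout -> pipe, then exec
        dup2(out[1], 1);
        close(out[0]);
        close(out[1]);
        execvp(argv[0], argv);
        _exit(127);
      }
      close(out[1]);         // parent keeps only the read end

      struct epoll_event ev = { .events = EPOLLIN, .data.fd = out[0] };
      epoll_ctl(epollfd, EPOLL_CTL_ADD, out[0], &ev);
      return pid;            // reap with waitpid() when SIGCHLD arrives
    }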

Sometimes pollable: C++ addons

Addons should use the UV thread pool, but can do anything, including making blocking calls which will block the loop (perhaps unintentionally).

Hints:

  • Review their code
  • Track loop metrics

You should now be able to describe:

  • What is the event loop
  • When is node multi-threaded
  • Why it "scales well"

End

This talk, including a compilable version of the pseudo "C" for playing with:

Bert Belder's talk about the Node.js event loop from a higher level, the "outside in":

- XXX Bert, Ben: how would contention in the thread pool size be noticeable?
  I am wondering if appmetrics should have a threads-used metric (similar to
  the loop time metric), so it's possible to see over time how many threads
  are in use, and how deep the work queue waiting for them is.
- XXX appmetrics: does it give information about the latency of fs and
  dns.lookup(), the typical pooled things?
- XXX other than fs and dns.lookup(), does anything in node core use the pool?
  (well, and the debugger and cpu profiling)
- XXX TBD, my talk is too long... I have only practiced once, but I think I
  will have to skip poll and go directly to epoll, and while I find the
  difference interesting, I think it may not be useful to have to understand
  two chunks of C-ish code.