1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-28 16:20:57 -05:00

refactor(cli/js/net): Cleanup iterable APIs (#4236)

Listener and UDPConn are AsyncIterables instead of AsyncIterators.
The [Symbol.asyncIterator]()s are defined as generators and the
next() methods are gone.

"Listener/Socket has been closed" errors are now BadResource.
This commit is contained in:
Nayeem Rahman 2020-03-10 19:14:22 +00:00 committed by GitHub
parent 8078d976d2
commit 55119aaee2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 44 additions and 56 deletions

View file

@ -1522,7 +1522,7 @@ declare namespace Deno {
/** **UNSTABLE**: new API, yet to be vetted.
*
* A generic transport listener for message-oriented protocols. */
export interface UDPConn extends AsyncIterator<[Uint8Array, Addr]> {
export interface UDPConn extends AsyncIterable<[Uint8Array, Addr]> {
/** **UNSTABLE**: new API, yet to be vetted.
*
* Waits for and resolves to the next message to the `UDPConn`. */
@ -1542,7 +1542,7 @@ declare namespace Deno {
}
/** A generic network listener for stream-oriented protocols. */
export interface Listener extends AsyncIterator<Conn> {
export interface Listener extends AsyncIterable<Conn> {
/** Waits for and resolves to the next connection to the `Listener`. */
accept(): Promise<Conn>;
/** Close closes the listener. Any pending accept promises will be rejected

View file

@ -1,4 +1,5 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { errors } from "./errors.ts";
import { EOF, Reader, Writer, Closer } from "./io.ts";
import { read, write } from "./ops/io.ts";
import { close } from "./ops/resources.ts";
@ -19,7 +20,7 @@ export interface UDPAddr {
}
/** A socket is a generic transport listener for message-oriented protocols */
export interface UDPConn extends AsyncIterator<[Uint8Array, Addr]> {
export interface UDPConn extends AsyncIterable<[Uint8Array, Addr]> {
/** Waits for and resolves to the next message to the `Socket`. */
receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>;
@ -38,7 +39,7 @@ export interface UDPConn extends AsyncIterator<[Uint8Array, Addr]> {
}
/** A Listener is a generic transport listener for stream-oriented protocols. */
export interface Listener extends AsyncIterator<Conn> {
export interface Listener extends AsyncIterable<Conn> {
/** Waits for and resolves to the next connection to the `Listener`. */
accept(): Promise<Conn>;
@ -88,11 +89,7 @@ export class ConnImpl implements Conn {
}
export class ListenerImpl implements Listener {
constructor(
readonly rid: number,
readonly addr: Addr,
private closing: boolean = false
) {}
constructor(readonly rid: number, readonly addr: Addr) {}
async accept(): Promise<Conn> {
const res = await netOps.accept(this.rid);
@ -100,29 +97,20 @@ export class ListenerImpl implements Listener {
}
close(): void {
this.closing = true;
close(this.rid);
}
async next(): Promise<IteratorResult<Conn>> {
if (this.closing) {
return { value: undefined, done: true };
async *[Symbol.asyncIterator](): AsyncIterator<Conn> {
while (true) {
try {
yield await this.accept();
} catch (error) {
if (error instanceof errors.BadResource) {
break;
}
return await this.accept()
.then(value => ({ value, done: false }))
.catch(e => {
// It wouldn't be correct to simply check this.closing here.
// TODO: Get a proper error kind for this case, don't check the message.
// The current error kind is Other.
if (e.message == "Listener has been closed") {
return { value: undefined, done: true };
throw error;
}
throw e;
});
}
[Symbol.asyncIterator](): AsyncIterator<Conn> {
return this;
}
}
@ -138,8 +126,7 @@ export class UDPConnImpl implements UDPConn {
constructor(
readonly rid: number,
readonly addr: Addr,
public bufSize: number = 1024,
private closing: boolean = false
public bufSize: number = 1024
) {}
async receive(p?: Uint8Array): Promise<[Uint8Array, Addr]> {
@ -157,29 +144,20 @@ export class UDPConnImpl implements UDPConn {
}
close(): void {
this.closing = true;
close(this.rid);
}
async next(): Promise<IteratorResult<[Uint8Array, Addr]>> {
if (this.closing) {
return { value: undefined, done: true };
async *[Symbol.asyncIterator](): AsyncIterator<[Uint8Array, Addr]> {
while (true) {
try {
yield await this.receive();
} catch (error) {
if (error instanceof errors.BadResource) {
break;
}
return await this.receive()
.then(value => ({ value, done: false }))
.catch(e => {
// It wouldn't be correct to simply check this.closing here.
// TODO: Get a proper error kind for this case, don't check the message.
// The current error kind is Other.
if (e.message == "Socket has been closed") {
return { value: undefined, done: true };
throw error;
}
throw e;
});
}
[Symbol.asyncIterator](): AsyncIterator<[Uint8Array, Addr]> {
return this;
}
}

View file

@ -56,7 +56,7 @@ fn op_accept(
let listener_resource = resource_table
.get_mut::<TcpListenerResource>(rid)
.ok_or_else(|| {
OpError::other("Listener has been closed".to_string())
OpError::bad_resource("Listener has been closed".to_string())
})?;
let listener = &mut listener_resource.listener;
match listener.poll_accept(cx).map_err(OpError::from) {
@ -122,7 +122,9 @@ fn op_receive(
let resource_table = &mut state_.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<UdpSocketResource>(rid)
.ok_or_else(|| OpError::other("Socket has been closed".to_string()))?;
.ok_or_else(|| {
OpError::bad_resource("Socket has been closed".to_string())
})?;
let socket = &mut resource.socket;
socket.poll_recv_from(cx, &mut buf).map_err(OpError::from)
});
@ -168,7 +170,9 @@ fn op_send(
let resource = state
.resource_table
.get_mut::<UdpSocketResource>(rid)
.ok_or_else(|| OpError::other("Socket has been closed".to_string()))?;
.ok_or_else(|| {
OpError::bad_resource("Socket has been closed".to_string())
})?;
let socket = &mut resource.socket;
let addr = resolve_addr(&args.hostname, args.port).await?;

View file

@ -283,7 +283,7 @@ fn op_accept_tls(
let listener_resource = resource_table
.get_mut::<TlsListenerResource>(rid)
.ok_or_else(|| {
OpError::other("Listener has been closed".to_string())
OpError::bad_resource("Listener has been closed".to_string())
})?;
let listener = &mut listener_resource.listener;
match listener.poll_accept(cx).map_err(OpError::from) {

View file

@ -198,9 +198,15 @@ export class Server implements AsyncIterable<ServerRequest> {
): AsyncIterableIterator<ServerRequest> {
if (this.closing) return;
// Wait for a new connection.
const { value, done } = await this.listener.next();
if (done) return;
const conn = value as Conn;
let conn: Conn;
try {
conn = await this.listener.accept();
} catch (error) {
if (error instanceof Deno.errors.BadResource) {
return;
}
throw error;
}
// Try to accept another connection and add it to the multiplexer.
mux.add(this.acceptConnAndIterateHttpRequests(mux));
// Yield the requests that arrive on the just-accepted connection.